RabbitMQ: 使用 Python 連接 RabbitMQ
本文件是快速開始指南的一部分。請在此處查看完整指南:如何使用 Stackhero 連接 RabbitMQ。
以下將說明如何使用 Aio Pika 函式庫,讓您的 Python 應用程式連接到 RabbitMQ。在大多數情況下,您只需要提供 AMQPS URL,即可建立安全連線:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
以下是一個完整範例,展示如何建立與 RabbitMQ 的安全連線、建立 channel 並宣告一個基本佇列。這是驗證您的設定是否正確的好方法:
import asyncio
import logging
import aio_pika
async def main() -> None:
# 若您想查看除錯日誌,可取消註解下一行
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
處理 unable to get local issuer certificate 錯誤
如果您在 Python 連線時遇到如下錯誤:
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
這通常表示您的系統缺少 Let's Encrypt 的 CA 憑證。請依照您的作業系統安裝常用的 CA 憑證套件來解決:
-
在 Ubuntu/Debian 上,請執行:
sudo apt install ca-certificates -
在 Alpine Linux 上,請執行:
apk add ca-certificates
如果您無法使用上述指令,也可以手動安裝 CA 憑證:
-
從 https://letsencrypt.org/certs/isrgrootx1.pem 下載 Let's Encrypt CA 憑證。
-
然後在 Python 程式碼中指定 CA 憑證檔案來連接 RabbitMQ:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
以下是一個完整範例,使用 Let's Encrypt CA 憑證進行安全連線:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# 若要啟用除錯日誌,可取消註解這一行
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# 載入您剛下載的 Let's Encrypt CA 憑證
# 例如:wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())