RabbitMQ: 使用 Python 連接 RabbitMQ
此文件屬於入門指南指南的一部分。請在此處查看完整指南:如何使用 Stackhero 連接 RabbitMQ。
以下將說明如何利用 Aio Pika library,讓您的 Python 應用程式連接至 RabbitMQ。在大多數情況下,您只需提供 AMQPS URL,即可建立安全連線:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
以下是一個完整範例,展示如何建立安全連線至 RabbitMQ、建立 channel 及宣告一個基本 queue。這是驗證您的設定是否正確的好方法:
import asyncio
import logging
import aio_pika
async def main() -> None:
# 如果您想查看 debug log,可以取消註解下一行
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
處理 unable to get local issuer certificate 錯誤
如果您在 Python 連線時遇到如下錯誤:
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
這通常代表您的系統缺少 Let's Encrypt 的 CA 憑證。您可以透過安裝作業系統常見的 CA 憑證套件來解決:
-
在 Ubuntu/Debian,請執行:
sudo apt install ca-certificates -
在 Alpine Linux,請執行:
apk add ca-certificates
如果您無法使用上述指令,也可以手動安裝 CA 憑證:
-
從 https://letsencrypt.org/certs/isrgrootx1.pem 下載 Let's Encrypt CA 憑證。
-
然後在 Python 程式中指定 CA 憑證檔案來連接 RabbitMQ:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
以下是使用 Let's Encrypt CA 憑證進行安全連線的完整範例:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# 如需啟用 debug log,可取消註解下行
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# 載入您下載的 Let's Encrypt CA 憑證
# 例如:wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())