RabbitMQ: 使用 Python 连接 RabbitMQ
本文件是入门指南指南的一部分。您可以在这里查看完整指南:如何使用 Stackhero 连接 RabbitMQ。
以下将为您介绍如何通过 Aio Pika 库,将您的 Python 应用程序连接到 RabbitMQ。大多数情况下,您只需提供 AMQPS URL 即可建立安全连接:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
下面是一个完整示例,展示如何建立到 RabbitMQ 的安全连接、创建 channel,并声明一个基础队列。这也是验证您配置是否正确的好方法:
import asyncio
import logging
import aio_pika
async def main() -> None:
# 如果您希望查看调试日志,可以取消下一行的注释
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
处理 unable to get local issuer certificate 错误
如果您在使用 Python 连接时遇到如下错误:
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
这通常意味着您的系统缺少 Let's Encrypt 的 CA 证书。为了解决这个问题,您可以为您的操作系统安装常用的 CA 证书包:
-
在 Ubuntu/Debian 上,运行:
sudo apt install ca-certificates -
在 Alpine Linux 上,运行:
apk add ca-certificates
如果您无法使用上述命令,您也可以手动安装 CA 证书:
-
从 https://letsencrypt.org/certs/isrgrootx1.pem 下载 Let's Encrypt CA 证书。
-
然后,在您的 Python 代码中指定 CA 证书文件来连接 RabbitMQ:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
以下是一个完整示例,演示如何使用 Let's Encrypt CA 证书进行安全连接:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# 如需启用调试日志,可取消下方注释
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# 加载您下载的 Let's Encrypt CA 证书
# 例如:wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())