RabbitMQ: 使用 Python 连接 RabbitMQ

本文件是入门指南指南的一部分。您可以在这里查看完整指南:如何使用 Stackhero 连接 RabbitMQ

以下将为您介绍如何通过 Aio Pika 库,将您的 Python 应用程序连接到 RabbitMQ。大多数情况下,您只需提供 AMQPS URL 即可建立安全连接:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

下面是一个完整示例,展示如何建立到 RabbitMQ 的安全连接、创建 channel,并声明一个基础队列。这也是验证您配置是否正确的好方法:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # 如果您希望查看调试日志,可以取消下一行的注释
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

如果您在使用 Python 连接时遇到如下错误:

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

这通常意味着您的系统缺少 Let's Encrypt 的 CA 证书。为了解决这个问题,您可以为您的操作系统安装常用的 CA 证书包:

  1. Ubuntu/Debian 上,运行:

    sudo apt install ca-certificates
    
  2. Alpine Linux 上,运行:

    apk add ca-certificates
    

如果您无法使用上述命令,您也可以手动安装 CA 证书:

  1. https://letsencrypt.org/certs/isrgrootx1.pem 下载 Let's Encrypt CA 证书。

  2. 然后,在您的 Python 代码中指定 CA 证书文件来连接 RabbitMQ:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

以下是一个完整示例,演示如何使用 Let's Encrypt CA 证书进行安全连接:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # 如需启用调试日志,可取消下方注释
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # 加载您下载的 Let's Encrypt CA 证书
    # 例如:wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())