RabbitMQ: 使用 Python 連接 RabbitMQ

此文件屬於入門指南指南的一部分。請在此處查看完整指南:如何使用 Stackhero 連接 RabbitMQ

以下將說明如何利用 Aio Pika library,讓您的 Python 應用程式連接至 RabbitMQ。在大多數情況下,您只需提供 AMQPS URL,即可建立安全連線:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

以下是一個完整範例,展示如何建立安全連線至 RabbitMQ、建立 channel 及宣告一個基本 queue。這是驗證您的設定是否正確的好方法:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # 如果您想查看 debug log,可以取消註解下一行
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

如果您在 Python 連線時遇到如下錯誤:

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

這通常代表您的系統缺少 Let's Encrypt 的 CA 憑證。您可以透過安裝作業系統常見的 CA 憑證套件來解決:

  1. Ubuntu/Debian,請執行:

    sudo apt install ca-certificates
    
  2. Alpine Linux,請執行:

    apk add ca-certificates
    

如果您無法使用上述指令,也可以手動安裝 CA 憑證:

  1. https://letsencrypt.org/certs/isrgrootx1.pem 下載 Let's Encrypt CA 憑證。

  2. 然後在 Python 程式中指定 CA 憑證檔案來連接 RabbitMQ:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

以下是使用 Let's Encrypt CA 憑證進行安全連線的完整範例:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # 如需啟用 debug log,可取消註解下行
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # 載入您下載的 Let's Encrypt CA 憑證
    # 例如:wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())