RabbitMQ: Python で RabbitMQ に接続する

このドキュメントははじめにガイドの一部です。完全なガイドはこちらからご覧いただけます:Stackhero で RabbitMQ を利用する方法

ここでは、Aio Pika ライブラリを使って Python アプリケーションから RabbitMQ に接続する方法をご案内します。ほとんどの場合、セキュアな接続には AMQPS URL を指定するだけで十分です:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

以下は、RabbitMQ へのセキュアな接続を確立し、チャネルを作成し、基本的なキューを宣言する完全なサンプルです。セットアップの確認にも最適です:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # デバッグログを表示したい場合は、次の行のコメントを外してください
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

Python から接続する際に、次のようなエラーが発生する場合:

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

これは、Let's Encrypt の CA 証明書がシステムにインストールされていないことが原因である場合がほとんどです。これを解決するには、ご利用の OS に応じて CA 証明書パッケージをインストールしてください:

  1. Ubuntu/Debian の場合:

    sudo apt install ca-certificates
    
  2. Alpine Linux の場合:

    apk add ca-certificates
    

これらのコマンドが利用できない場合は、CA 証明書を手動でインストールすることも可能です:

  1. https://letsencrypt.org/certs/isrgrootx1.pem から Let's Encrypt の CA 証明書をダウンロードします。

  2. その後、Python コードで CA 証明書ファイルを指定して RabbitMQ に接続します:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

Let's Encrypt の CA 証明書を使ったセキュアな接続の完全な例は以下の通りです:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # デバッグログを有効にする場合は、次の行のコメントを外してください
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # ダウンロードした Let's Encrypt の CA 証明書を読み込みます
    # 例: wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())