RabbitMQ: Python で RabbitMQ に接続する
このドキュメントははじめにガイドの一部です。完全なガイドはこちらからご覧いただけます:Stackhero で RabbitMQ を利用する方法。
ここでは、Aio Pika ライブラリを使って Python アプリケーションから RabbitMQ に接続する方法をご案内します。ほとんどの場合、セキュアな接続には AMQPS URL を指定するだけで十分です:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
以下は、RabbitMQ へのセキュアな接続を確立し、チャネルを作成し、基本的なキューを宣言する完全なサンプルです。セットアップの確認にも最適です:
import asyncio
import logging
import aio_pika
async def main() -> None:
# デバッグログを表示したい場合は、次の行のコメントを外してください
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
unable to get local issuer certificate エラーの対処方法
Python から接続する際に、次のようなエラーが発生する場合:
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
これは、Let's Encrypt の CA 証明書がシステムにインストールされていないことが原因である場合がほとんどです。これを解決するには、ご利用の OS に応じて CA 証明書パッケージをインストールしてください:
-
Ubuntu/Debian の場合:
sudo apt install ca-certificates -
Alpine Linux の場合:
apk add ca-certificates
これらのコマンドが利用できない場合は、CA 証明書を手動でインストールすることも可能です:
-
https://letsencrypt.org/certs/isrgrootx1.pem から Let's Encrypt の CA 証明書をダウンロードします。
-
その後、Python コードで CA 証明書ファイルを指定して RabbitMQ に接続します:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
Let's Encrypt の CA 証明書を使ったセキュアな接続の完全な例は以下の通りです:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# デバッグログを有効にする場合は、次の行のコメントを外してください
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# ダウンロードした Let's Encrypt の CA 証明書を読み込みます
# 例: wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())