RabbitMQ: Łączenie się z RabbitMQ za pomocą Pythona
Ta dokumentacja jest częścią przewodnika Pierwsze kroki. Pełny przewodnik znajdziesz tutaj: Jak korzystać ze Stackhero dla RabbitMQ.
Pokażemy, jak połączyć swoją aplikację Python z RabbitMQ, korzystając z biblioteki Aio Pika. W większości przypadków wystarczy podać adres URL AMQPS, aby nawiązać bezpieczne połączenie:
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)
Poniżej znajduje się kompletny przykład pokazujący, jak nawiązać bezpieczne połączenie z RabbitMQ, utworzyć kanał i zadeklarować podstawową kolejkę. To dobry sposób na weryfikację konfiguracji:
import asyncio
import logging
import aio_pika
async def main() -> None:
# Jeśli chcesz zobaczyć logi debugowania, możesz odkomentować poniższą linię
# logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())
Obsługa błędu unable to get local issuer certificate
Jeśli podczas łączenia się z Pythona pojawi się błąd:
aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)
najczęściej oznacza to brak certyfikatu CA Let's Encrypt w systemie. Aby to naprawić, zainstaluj pakiet certyfikatów CA odpowiedni dla Twojego systemu operacyjnego:
-
Na Ubuntu/Debian uruchom:
sudo apt install ca-certificates -
Na Alpine Linux uruchom:
apk add ca-certificates
Jeśli nie możesz użyć tych poleceń, możesz również zainstalować certyfikat CA ręcznie:
-
Pobierz certyfikat CA Let's Encrypt ze strony https://letsencrypt.org/certs/isrgrootx1.pem.
-
Następnie połącz się z RabbitMQ w swoim kodzie Python, wskazując plik certyfikatu CA:
import ssl ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(cafile='isrgrootx1.pem') connection = await aio_pika.connect_robust( "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", ssl_context=ssl_context )
Poniżej znajduje się pełny przykład użycia certyfikatu CA Let's Encrypt do bezpiecznego połączenia:
import asyncio
import logging
import ssl
import aio_pika
async def main() -> None:
# Aby włączyć logi debugowania, możesz odkomentować tę linię
# logging.basicConfig(level=logging.DEBUG)
ssl_context = ssl.create_default_context()
# Załaduj pobrany certyfikat CA Let's Encrypt
# Na przykład: wget https://letsencrypt.org/certs/isrgrootx1.pem
ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
connection = await aio_pika.connect_robust(
"amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
ssl_context=ssl_context
)
async with connection:
print("The connection worked!")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("test_queue", auto_delete=True)
if __name__ == "__main__":
asyncio.run(main())