RabbitMQ: Using Python to connect to RabbitMQ

This documentation is part of the Getting started guide. You can view the complete guide here: How to use Stackhero for RabbitMQ.

Let's go through how to connect your Python application to RabbitMQ using the Aio Pika library. In most cases, you simply need to provide the AMQPS URL to establish a secure connection:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

Below is a complete example demonstrating how to establish a secure connection to RabbitMQ, create a channel, and declare a basic queue. This is an excellent way to verify your configuration:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # If you wish to see debug logs, you can uncomment the following line
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

If you encounter an error like this when connecting from Python:

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

this usually means your system is missing the Let's Encrypt CA certificate. To resolve this, install the appropriate CA certificates package for your operating system:

  1. On Ubuntu/Debian, run:

    sudo apt install ca-certificates
    
  2. On Alpine Linux, run:

    apk add ca-certificates
    

If you are unable to use these commands, you can also install the CA certificate manually:

  1. Download the Let's Encrypt CA certificate from https://letsencrypt.org/certs/isrgrootx1.pem.

  2. Then, connect to RabbitMQ in your Python code by specifying the CA certificate file:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

Here is a full example using the Let's Encrypt CA certificate for secure connections:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # To enable debug logs, you can uncomment this line
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # Load the Let's Encrypt CA certificate you downloaded
    # For example: wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("The connection worked!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())