RabbitMQ: Primeros pasos

Cómo usar Stackhero para RabbitMQ

👋 ¡Bienvenido a la documentación de Stackhero!

Stackhero ofrece una solución RabbitMQ cloud lista para usar que proporciona una serie de beneficios, incluyendo:

  • Acceso completo a la interfaz web de RabbitMQ para gestionar usuarios, vhosts y permisos.
  • Colas ilimitadas sin limitaciones de tiempo de retención.
  • Soporta los protocolos AMQP, MQTT, STOMP y WebSocket.
  • Muchos plugins incluidos, como Delayed Message Exchange, Message Deduplication y Consistent-hash Exchange.
  • Actualizaciones sin esfuerzo con solo un clic.
  • Rendimiento óptimo y seguridad robusta impulsados por una VM privada y dedicada.

Ahorre tiempo y simplifique su vida: ¡solo toma 5 minutos probar la solución de alojamiento en la nube RabbitMQ de Stackhero!

Este ejemplo muestra cómo usar la biblioteca Aio Pika para conectar Python a RabbitMQ. En muchos casos, especificar la URL AMQPS es todo lo que se necesita:

connection = await aio_pika.connect_robust(
  "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
)

A continuación, se muestra un ejemplo completo en Python que establece una conexión segura a RabbitMQ. Puede seguir estos pasos para verificar su conexión y configurar una cola básica:

import asyncio
import logging
import aio_pika

async def main() -> None:
    # Descomente la siguiente línea para habilitar los registros de depuración
    # logging.basicConfig(level=logging.DEBUG)

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"
    )

    async with connection:
        print("¡La conexión funcionó!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)

if __name__ == "__main__":
    asyncio.run(main())

Si ve el error

aiormq.exceptions.AMQPConnectionError: [Errno 5] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1006)

al conectarse a RabbitMQ desde Python, generalmente indica que a su sistema le falta el certificado de Let's Encrypt. Para resolver esto, instale los certificados CA comunes:

  1. En Ubuntu/Debian, ejecute:

    sudo apt install ca-certificates
    
  2. En Alpine Linux, ejecute:

    apk add ca-certificates
    

Si no puede usar estos comandos, puede instalar el certificado CA manualmente:

  1. Descargue el certificado CA de Let's Encrypt desde https://letsencrypt.org/certs/isrgrootx1.pem.

  2. Luego, conéctese a RabbitMQ en su código Python pasando el archivo de certificado CA:

    import ssl
    
    ssl_context = ssl.create_default_context()
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')
    
    connection = await aio_pika.connect_robust(
      "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
      ssl_context=ssl_context
    )
    

A continuación, se muestra un ejemplo completo en Python que utiliza el certificado CA de Let's Encrypt para establecer una conexión segura:

import asyncio
import logging
import ssl
import aio_pika

async def main() -> None:
    # Descomente la siguiente línea para habilitar los registros de depuración
    # logging.basicConfig(level=logging.DEBUG)

    ssl_context = ssl.create_default_context()
    # Cargue manualmente el certificado CA de Let's Encrypt
    # Descárguelo usando: wget https://letsencrypt.org/certs/isrgrootx1.pem
    ssl_context.load_verify_locations(cafile='isrgrootx1.pem')

    connection = await aio_pika.connect_robust(
        "amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>",
        ssl_context=ssl_context
    )

    async with connection:
        print("¡La conexión funcionó!")
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)
        queue = await channel.declare_queue("test_queue", auto_delete=True)


if __name__ == "__main__":
    asyncio.run(main())

A continuación, se muestra un ejemplo simple que muestra cómo conectarse a RabbitMQ desde una aplicación GoLang usando la biblioteca oficial Go RabbitMQ Client Library. Siga estos pasos para configurar su proyecto:

  1. Cree un nuevo directorio e inicialice el módulo:
go mod init rabbitmq-example
  1. Agregue la biblioteca RabbitMQ:
go get github.com/rabbitmq/amqp091-go
  1. Cree un nuevo archivo llamado main.go y agregue el siguiente contenido:

    package main
    
    import (
      "fmt"
      amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
      connection, err := amqp.Dial("amqps://<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>")
      if err != nil {
        panic(err)
      }
      defer connection.Close()
    
      fmt.Println("Conectado exitosamente a la instancia de RabbitMQ")
    }
    
  2. Ejecute su código:

go run main.go

Debería ver "Conectado exitosamente a la instancia de RabbitMQ", lo que indica que su código se ha conectado de manera segura utilizando autenticación y cifrado TLS.

Para más ejemplos, puede explorar ejemplos de Go en el repositorio oficial de RabbitMQ: https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go.

A continuación, se muestra un ejemplo que demuestra cómo conectarse a RabbitMQ desde PHP usando la biblioteca php-amqplib. Dado que las instancias de Stackhero usan cifrado TLS (SSL), la conexión debe establecerse con AMQPSSLConnection:

use PhpAmqpLib\Connection\AMQPSSLConnection;

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  array()
);

/**
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($connection)
{
  $connection->close();
}

register_shutdown_function('shutdown', $connection);

La conexión TLS puede requerir un certificado de Autoridad de Certificación (CA). Aunque muchos servidores ya lo tienen instalado, es posible que necesite descargarlo manualmente. Siga estos pasos:

  1. Descargue el certificado desde https://letsencrypt.org/certs/isrgrootx1.pem y guárdelo en su servidor.
  2. Use el siguiente código PHP para conectarse usando el certificado descargado:
$sslOptions = array(
  'cafile' => realpath(__DIR__ . '/isrgrootx1.pem'),
);

$connection = new AMQPSSLConnection(
  '<XXXXXX>.stackhero-network.com',
  <AMQP_PORT_TLS>,
  'admin',
  '<PASSWORD>',
  '/',
  $sslOptions
);

Symfony puede usar RabbitMQ como un intermediario de mensajes configurando la variable de entorno MESSENGER_TRANSPORT_DSN. Para configurar esto, edite el archivo .env y establezca la variable de la siguiente manera:

MESSENGER_TRANSPORT_DSN=amqps://<USER>:<PASSWORD>@<HOST>:<PORT>/%2f/messages?cacert=%2Fetc%2Fssl%2Fcerts%2Fca-certificates.crt

Reemplace <USER>, <PASSWORD>, <HOST> y <PORT> con sus detalles de RabbitMQ.

Además, asegúrese de que el archivo config/packages/messenger.yaml use la variable MESSENGER_TRANSPORT_DSN. La configuración debería verse así:

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'

A continuación, se muestra un ejemplo de cómo puede configurar Spring Boot para conectarse de manera segura a una instancia de RabbitMQ de Stackhero. Actualice las propiedades de su aplicación con estas configuraciones:

spring.rabbitmq.host=<XXXXXX>.stackhero-network.com
spring.rabbitmq.port=<AMQP_PORT_TLS>
spring.rabbitmq.username=admin
spring.rabbitmq.password=<PASSWORD>
spring.rabbitmq.ssl.enabled=true
spring.rabbitmq.ssl.algorithm=TLSv1.2

Aquí hay un ejemplo que muestra cómo conectarse a Stackhero RabbitMQ usando .NET y MassTransit. Este ejemplo configura el host con los ajustes necesarios para el cifrado TLS:

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

public class Program
{
  public static void Main(string[] args)
  {
    var host = Host.CreateDefaultBuilder(args)
      .ConfigureServices((context, services) =>
      {
        services.AddMassTransit(x =>
        {
          x.UsingRabbitMq((context, cfg) =>
          {
            cfg.Host(new Uri("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>"), h =>
            {
              h.UseSsl(s =>
              {
                s.Protocol = System.Security.Authentication.SslProtocols.Tls12;
              });
            });
          });
        });

        services.AddMassTransitHostedService(true);
      })
      .Build();

    host.Run();
  }
}

Con el plugin "Mensajes retrasados", puede retrasar o programar mensajes estableciendo un retraso en milisegundos. El plugin se puede activar directamente desde el panel de Stackhero. Después de activar el plugin, cree un intercambio retrasado ya sea a través del panel de administración de RabbitMQ o directamente en su código.

Para más detalles, consulte el repositorio oficial: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

Si desactiva el plugin, cualquier mensaje retrasado que aún no se haya entregado se perderá.

Después de activar el plugin en el panel de Stackhero, navegue a su panel de administración de RabbitMQ y cree un intercambio con el tipo "x-delayed-message". Luego agregue un argumento con la clave x-delayed-type y el valor "direct". Esta configuración se ilustra en la captura de pantalla a continuación.

Creación de intercambioCreación de intercambio

Si encuentra el error "Invalid argument, 'x-delayed-type' must be an existing exchange type", asegúrese de haber configurado correctamente el argumento x-delayed-type.

Al usar Elixir para conectarse a RabbitMQ, puede encontrar el error

CLIENT ALERT: Fatal - Handshake Failure

Este problema está relacionado con un error con el soporte de TLS 1.3 en la biblioteca AMQP para Elixir. Una solución práctica es forzar el uso de TLS 1.2. Puede lograr esto incluyendo la siguiente opción al abrir la conexión:

AMQP.Connection.open("amqps://admin:<PASSWORD>@<XXXXXX>.stackhero-network.com:<AMQP_PORT_TLS>", :undefined, ssl_options: [ versions: [ :"tlsv1.2" ] ])

El error Error: Socket closed abruptly during opening handshake puede ocurrir al usar una versión de la biblioteca amqplib de Node.js anterior a la 0.10.7 al conectarse a RabbitMQ 4.1.0 o posterior. Este problema está relacionado con un cambio en la configuración frame_max introducido en RabbitMQ 4.1.0.

Para resolver el error, actualice su biblioteca amqplib a la versión 0.10.7 o posterior.