Redis®*: Evitar el error "Connection Closed by Server" en Redis y Python
Esta documentación forma parte de la guía Uso con Python. Consulte la guía completa aquí: Cómo conectar Redis con Python.
👋 ¡Bienvenido a la documentación de Stackhero!
Stackhero ofrece una solución Redis cloud lista para usar que proporciona una serie de beneficios, incluyendo:
- Interfaz web
Redis Commanderincluida.- Tamaño y transferencias de mensajes ilimitados.
- Actualizaciones sin esfuerzo con solo un clic.
- Rendimiento óptimo y seguridad robusta gracias a una VM privada y dedicada.
Ahorra tiempo y simplifica tu vida: ¡solo toma 5 minutos probar la solución de alojamiento Redis cloud de Stackhero!
Puede encontrarse con el error redis.exceptions.ConnectionError: Connection closed by server, que puede ocurrir debido a la inactividad en su aplicación de Python, causando que la conexión se cierre automáticamente. Cuando su aplicación intenta reconectarse, puede fallar, resultando en este error.
Para mitigar esto, considere establecer el parámetro health_check_interval en su conexión Redis de la siguiente manera:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Al usar la funcionalidad Pub/Sub de Redis, asegúrese de que su aplicación llame a get_message() o listen() con más frecuencia que el health_check_interval especificado (en este ejemplo, cada 10 segundos). Puede consultar la documentación oficial de redis-py para más detalles.
Si estas llamadas no se realizan dentro del intervalo, aún puede encontrarse con el error "Connection closed by server". Una solución práctica es usar la función check_health() regularmente.
Aquí le mostramos cómo puede implementarlo:
import redis
import threading
# Conectar a Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Crear una instancia de PubSub
p = r.pubsub()
# Suscribirse al canal "test"
p.subscribe('test')
# Crear una función que llame a `check_health` cada 5 segundos
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Llamar a la función redis_auto_check
redis_auto_check(p)