Valkey: Evitar errores "Connection Closed by Server"
Esta documentación forma parte de la guía Uso con Python. Consulte la guía completa aquí: Cómo conectar Valkey con Python.
👋 ¡Bienvenido a la documentación de Stackhero!
Stackhero ofrece una solución Valkey cloud lista para usar que proporciona una serie de beneficios, incluyendo:
- Interfaz web UI
Redis Commanderincluida.- Tamaño y transferencias de mensajes ilimitados.
- Actualizaciones sin esfuerzo con solo un clic.
- Rendimiento óptimo y seguridad robusta gracias a una VM privada y dedicada.
Ahorra tiempo y simplifica tu vida: ¡solo toma 5 minutos probar la solución de Valkey cloud hosting de Stackhero!
El error redis.exceptions.ConnectionError: Connection closed by server puede ocurrir si su aplicación no interactúa con Valkey durante un período, lo que lleva a una desconexión automática. Para evitar esto, puede establecer el parámetro health_check_interval como se muestra a continuación:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Al usar la función Pub/Sub de Valkey, la biblioteca redis-py espera que funciones como get_message() o listen() se llamen con más frecuencia que el health_check_interval. En nuestro ejemplo, hemos establecido este intervalo en 10 segundos, así que asegúrese de llamar a get_message() o listen() al menos una vez cada 10 segundos (consulte la documentación oficial de redis-py).
Si esto no se hace, podría encontrar el mismo error de conexión. Para evitar esto, considere llamar regularmente a check_health().
Aquí le mostramos cómo puede implementarlo:
import redis
import threading
# Conectar a Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Crear una instancia de PubSub
p = r.pubsub()
# Suscribirse al canal "test"
p.subscribe('test')
# Crear una función que llame a `check_health` cada 5 segundos
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Llamar a la función valkey_auto_check
valkey_auto_check(p)