Valkey: Evitare errori "Connection Closed by Server"
Questa documentazione fa parte della guida Utilizzo con Python. Consulta la guida completa qui: Come connettere Valkey con Python.
👋 Benvenuti nella documentazione di Stackhero!
Stackhero offre una soluzione Valkey cloud pronta all'uso che fornisce numerosi vantaggi, tra cui:
- Interfaccia web UI
Redis Commanderinclusa.- Dimensione e trasferimenti di messaggi illimitati.
- Aggiornamenti semplificati con un solo clic.
- Prestazioni ottimali e sicurezza robusta grazie a una VM privata e dedicata.
Risparmia tempo e semplifica la tua vita: bastano 5 minuti per provare la soluzione Valkey cloud hosting di Stackhero!
L'errore redis.exceptions.ConnectionError: Connection closed by server può verificarsi se la tua app non interagisce con Valkey per un periodo, portando a una disconnessione automatica. Per evitare ciò, puoi impostare il parametro health_check_interval come mostrato di seguito:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Quando si utilizza la funzionalità Pub/Sub di Valkey, la libreria redis-py si aspetta che funzioni come get_message() o listen() vengano chiamate più frequentemente dell'intervallo health_check_interval. Nel nostro esempio, abbiamo impostato questo intervallo a 10 secondi, quindi assicurati di chiamare get_message() o listen() almeno una volta ogni 10 secondi (consulta la documentazione ufficiale di redis-py).
Se ciò non viene fatto, potresti incontrare lo stesso errore di connessione. Per evitare ciò, considera di chiamare regolarmente check_health().
Ecco come puoi implementarlo:
import redis
import threading
# Connettersi a Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Creare un'istanza PubSub
p = r.pubsub()
# Iscriversi al canale "test"
p.subscribe('test')
# Creare una funzione che chiamerà `check_health` ogni 5 secondi
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Chiamare la funzione valkey_auto_check
valkey_auto_check(p)