Valkey: Evitare errori "Connection Closed by Server"

Questa documentazione fa parte della guida Utilizzo con Python. Consulta la guida completa qui: Come connettere Valkey con Python.

👋 Benvenuti nella documentazione di Stackhero!

Stackhero offre una soluzione Valkey cloud pronta all'uso che fornisce numerosi vantaggi, tra cui:

  • Interfaccia web UI Redis Commander inclusa.
  • Dimensione e trasferimenti di messaggi illimitati.
  • Aggiornamenti semplificati con un solo clic.
  • Prestazioni ottimali e sicurezza robusta grazie a una VM privata e dedicata.

Risparmia tempo e semplifica la tua vita: bastano 5 minuti per provare la soluzione Valkey cloud hosting di Stackhero!

L'errore redis.exceptions.ConnectionError: Connection closed by server può verificarsi se la tua app non interagisce con Valkey per un periodo, portando a una disconnessione automatica. Per evitare ciò, puoi impostare il parametro health_check_interval come mostrato di seguito:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Quando si utilizza la funzionalità Pub/Sub di Valkey, la libreria redis-py si aspetta che funzioni come get_message() o listen() vengano chiamate più frequentemente dell'intervallo health_check_interval. Nel nostro esempio, abbiamo impostato questo intervallo a 10 secondi, quindi assicurati di chiamare get_message() o listen() almeno una volta ogni 10 secondi (consulta la documentazione ufficiale di redis-py).

Se ciò non viene fatto, potresti incontrare lo stesso errore di connessione. Per evitare ciò, considera di chiamare regolarmente check_health().

Ecco come puoi implementarlo:

import redis
import threading

# Connettersi a Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Creare una funzione che chiamerà `check_health` ogni 5 secondi
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Chiamare la funzione valkey_auto_check
valkey_auto_check(p)