Redis®*: Evitare l'errore "Connection Closed by Server" in Redis e Python

Questa documentazione fa parte della guida Utilizzo con Python. Consulta la guida completa qui: Come connettere Redis con Python.

👋 Benvenuti nella documentazione di Stackhero!

Stackhero offre una soluzione Redis cloud pronta all'uso che fornisce numerosi vantaggi, tra cui:

  • Interfaccia web Redis Commander inclusa.
  • Dimensione e trasferimenti di messaggi illimitati.
  • Aggiornamenti senza sforzo con un solo clic.
  • Prestazioni ottimali e sicurezza robusta grazie a una VM privata e dedicata.

Risparmia tempo e semplifica la tua vita: bastano solo 5 minuti per provare la soluzione di hosting Redis cloud di Stackhero!

Potresti incontrare l'errore redis.exceptions.ConnectionError: Connection closed by server, che può verificarsi a causa dell'inattività della tua applicazione Python, causando la chiusura automatica della connessione. Quando la tua app tenta di riconnettersi, potrebbe fallire, risultando in questo errore.

Per mitigare questo, considera di impostare il parametro health_check_interval nella tua connessione Redis come segue:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Quando utilizzi la funzionalità Pub/Sub di Redis, assicurati che la tua applicazione chiami get_message() o listen() più frequentemente dell'intervallo specificato da health_check_interval (in questo esempio, ogni 10 secondi). Puoi fare riferimento alla documentazione ufficiale di redis-py per maggiori dettagli.

Se queste chiamate non vengono effettuate entro l'intervallo, potresti ancora incontrare l'errore "Connection closed by server". Una soluzione pratica è utilizzare regolarmente la funzione check_health().

Ecco come puoi implementarla:

import redis
import threading

# Connettersi a Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Creare un'istanza PubSub
p = r.pubsub()

# Iscriversi al canale "test"
p.subscribe('test')

# Creare una funzione che chiamerà `check_health` ogni 5 secondi
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Chiamare la funzione redis_auto_check
redis_auto_check(p)