Redis®*: Unikanie błędu "Connection Closed by Server" w Redis i Python

Ta dokumentacja jest częścią przewodnika Używanie z Pythonem. Pełny przewodnik znajdziesz tutaj: Jak połączyć Redis z Pythonem.

👋 Witamy w dokumentacji Stackhero!

Stackhero oferuje gotowe do użycia rozwiązanie Redis cloud, które zapewnia wiele korzyści, w tym:

  • Włączony web UI Redis Commander.
  • Nieograniczona wielkość i transfer wiadomości.
  • Bezproblemowe aktualizacje za pomocą jednego kliknięcia.
  • Optymalna wydajność i solidne zabezpieczenia dzięki prywatnej i dedykowanej VM.

Oszczędzaj czas i upraszczaj sobie życie: wystarczy 5 minut, aby wypróbować rozwiązanie hostingu Redis cloud Stackhero!

Możesz napotkać błąd redis.exceptions.ConnectionError: Connection closed by server, który może wystąpić z powodu bezczynności w Twojej aplikacji Python, powodując automatyczne zamknięcie połączenia. Gdy Twoja aplikacja próbuje się ponownie połączyć, może się nie udać, co skutkuje tym błędem.

Aby to złagodzić, rozważ ustawienie parametru health_check_interval w swoim połączeniu Redis w następujący sposób:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Podczas korzystania z funkcji Pub/Sub Redis, upewnij się, że Twoja aplikacja wywołuje get_message() lub listen() częściej niż określony health_check_interval (w tym przykładzie co 10 sekund). Możesz odnieść się do oficjalnej dokumentacji redis-py po więcej szczegółów.

Jeśli te wywołania nie są wykonywane w tym przedziale, nadal możesz napotkać błąd "Connection closed by server". Praktycznym rozwiązaniem jest regularne używanie funkcji check_health().

Oto jak można to zaimplementować:

import redis
import threading

# Połącz z Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Utwórz instancję PubSub
p = r.pubsub()

# Subskrybuj kanał "test"
p.subscribe('test')

# Utwórz funkcję, która będzie wywoływać `check_health` co 5 sekund
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Wywołaj funkcję redis_auto_check
redis_auto_check(p)