Redis®*: Vermeidung des Fehlers "Connection Closed by Server" in Redis und Python

Diese Dokumentation ist Teil des Verwendung mit Python-Leitfadens. Den vollständigen Leitfaden finden Sie hier: Wie man Redis mit Python verbindet.

👋 Willkommen in der Stackhero-Dokumentation!

Stackhero bietet eine einsatzbereite Redis Cloud Lösung mit zahlreichen Vorteilen, darunter:

  • Redis Commander Web-UI inklusive.
  • Unbegrenzte Nachrichtengröße und Übertragungen.
  • Mühelose Updates mit nur einem Klick.
  • Optimale Performance und robuste Sicherheit durch eine private und dedizierte VM.

Sparen Sie Zeit und vereinfachen Sie Ihr Leben: Es dauert nur 5 Minuten, um die Redis Cloud Hosting Lösung von Stackhero auszuprobieren!

Sie könnten auf den Fehler redis.exceptions.ConnectionError: Connection closed by server stoßen, der aufgrund von Inaktivität in Ihrer Python-Anwendung auftreten kann, was dazu führt, dass die Verbindung automatisch geschlossen wird. Wenn Ihre App versucht, sich erneut zu verbinden, kann dies fehlschlagen und zu diesem Fehler führen.

Um dies zu mildern, sollten Sie den Parameter health_check_interval in Ihrer Redis-Verbindung wie folgt festlegen:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Bei der Verwendung der Pub/Sub-Funktion von Redis stellen Sie sicher, dass Ihre Anwendung get_message() oder listen() häufiger aufruft als das angegebene health_check_interval (in diesem Beispiel alle 10 Sekunden). Sie können die offizielle redis-py-Dokumentation für weitere Details konsultieren.

Wenn diese Aufrufe nicht innerhalb des Intervalls erfolgen, könnten Sie weiterhin auf den Fehler "Connection closed by server" stoßen. Eine praktische Lösung ist die regelmäßige Verwendung der Funktion check_health().

Hier ist, wie Sie es implementieren können:

import redis
import threading

# Verbindung zu Redis herstellen
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Erstellen einer PubSub-Instanz
p = r.pubsub()

# Abonnieren des Kanals "test"
p.subscribe('test')

# Erstellen einer Funktion, die `check_health` alle 5 Sekunden aufruft
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Die Funktion redis_auto_check aufrufen
redis_auto_check(p)