Valkey: Vermijden van "Connection Closed by Server" fouten

Deze documentatie maakt deel uit van de Gebruik met Python-gids. Bekijk de volledige gids hier: Hoe Valkey met Python te verbinden.

👋 Welkom bij de Stackhero-documentatie!

Stackhero biedt een kant-en-klare Valkey cloud oplossing die tal van voordelen biedt, waaronder:

  • Redis Commander web UI inbegrepen.
  • Onbeperkte berichtgrootte en overdrachten.
  • Moeiteloze updates met slechts één klik.
  • Optimale prestaties en robuuste beveiliging aangedreven door een privé en dedicated VM.

Bespaar tijd en vereenvoudig uw leven: het kost slechts 5 minuten om de Valkey cloud hosting oplossing van Stackhero te proberen!

De fout redis.exceptions.ConnectionError: Connection closed by server kan optreden als uw app niet gedurende een bepaalde periode met Valkey communiceert, wat leidt tot een automatische ontkoppeling. Om dit te voorkomen, kunt u de parameter health_check_interval instellen zoals hieronder weergegeven:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Bij het gebruik van de Pub/Sub-functie van Valkey verwacht de redis-py-bibliotheek dat functies zoals get_message() of listen() vaker worden aangeroepen dan het health_check_interval. In ons voorbeeld hebben we dit interval ingesteld op 10 seconden, dus zorg ervoor dat u get_message() of listen() ten minste één keer per 10 seconden aanroept (raadpleeg de officiële documentatie van redis-py).

Als dit niet wordt gedaan, kunt u dezelfde verbindingsfout tegenkomen. Om dit te vermijden, overweeg regelmatig check_health() aan te roepen.

Hier is hoe u het kunt implementeren:

import redis
import threading

# Verbinden met Valkey
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Maak een PubSub-instantie
p = r.pubsub()

# Abonneer op het kanaal "test"
p.subscribe('test')

# Maak een functie die `check_health` elke 5 seconden aanroept
def valkey_auto_check(p):
  t = threading.Timer(5, valkey_auto_check, [p])
  t.start()
  p.check_health()

# Roep de functie valkey_auto_check aan
valkey_auto_check(p)