Redis®*: Het vermijden van de "Connection Closed by Server"-fout in Redis en Python
Deze documentatie maakt deel uit van de Gebruik met Python-gids. Bekijk de volledige gids hier: Hoe Redis met Python te verbinden.
👋 Welkom bij de Stackhero-documentatie!
Stackhero biedt een kant-en-klare Redis cloud oplossing die tal van voordelen biedt, waaronder:
Redis Commanderweb UI inbegrepen.- Onbeperkte berichtgrootte en overdrachten.
- Moeiteloze updates met slechts één klik.
- Optimale prestaties en robuuste beveiliging aangedreven door een privé en dedicated VM.
Bespaar tijd en vereenvoudig uw leven: het kost slechts 5 minuten om de Redis cloud hosting oplossing van Stackhero te proberen!
U kunt de fout redis.exceptions.ConnectionError: Connection closed by server tegenkomen, die kan optreden vanwege inactiviteit in uw Python-applicatie, waardoor de verbinding automatisch wordt gesloten. Wanneer uw app probeert opnieuw verbinding te maken, kan dit mislukken, wat resulteert in deze fout.
Om dit te verminderen, overweeg om de parameter health_check_interval in uw Redis-verbinding als volgt in te stellen:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Bij het gebruik van de Pub/Sub-functie van Redis, zorg ervoor dat uw applicatie get_message() of listen() vaker aanroept dan het gespecificeerde health_check_interval (in dit voorbeeld elke 10 seconden). U kunt de officiële documentatie van redis-py raadplegen voor meer details.
Als deze oproepen niet binnen het interval worden gedaan, kunt u nog steeds de "Connection closed by server"-fout tegenkomen. Een praktische oplossing is om regelmatig de functie check_health() te gebruiken.
Hier is hoe u het kunt implementeren:
import redis
import threading
# Verbinden met Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Maak een PubSub-instantie
p = r.pubsub()
# Abonneren op het kanaal "test"
p.subscribe('test')
# Maak een functie die `check_health` elke 5 seconden aanroept
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Roep de functie redis_auto_check aan
redis_auto_check(p)