Valkey: Eviter les erreurs "Connection Closed by Server"
Cette documentation fait partie du guide Utilisation avec Python. Consultez le guide complet ici : Comment connecter Valkey avec Python.
👋 Bienvenue sur la documentation de Stackhero !
Stackhero propose une solution Valkey cloud prête à l'emploi qui offre de nombreux avantages, notamment :
- Interface web Redis Commander incluse.
- Taille et transferts de messages illimités.
- Mises à jour simplifiées en un clic.
- Performance optimale et sécurité renforcée grâce à une VM privée et dédiée.
Gagnez du temps et simplifiez-vous la vie : il suffit de 5 minutes pour essayer la solution Valkey cloud hosting de Stackhero !
L'erreur redis.exceptions.ConnectionError: Connection closed by server peut survenir si votre application n'interagit pas avec Valkey pendant un certain temps, ce qui entraîne une déconnexion automatique. Pour éviter cela, vous pouvez définir le paramètre health_check_interval comme ci-dessous :
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Lorsque vous utilisez la fonctionnalité Pub/Sub de Valkey, la bibliothèque redis-py attend que des fonctions comme get_message() ou listen() soient appelées plus fréquemment que la valeur de health_check_interval. Dans notre exemple, cet intervalle est fixé à 10 secondes, il est donc important d'appeler get_message() ou listen() au moins une fois toutes les 10 secondes (voir la documentation officielle de redis-py).
Si cela n'est pas fait, vous risquez de rencontrer la même erreur de connexion. Pour l'éviter, pensez à appeler régulièrement check_health().
Voici comment l'implémenter :
import redis
import threading
# Connexion à Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Créer une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Créer une fonction qui appelle `check_health` toutes les 5 secondes
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Appeler la fonction valkey_auto_check
valkey_auto_check(p)