Redis®*: Éviter l'erreur "Connection Closed by Server" dans Redis et Python
Cette documentation fait partie du guide Utilisation avec Python. Consultez le guide complet ici : Comment connecter Redis avec Python.
👋 Bienvenue sur la documentation de Stackhero !
Stackhero propose une solution Redis cloud prête à l'emploi offrant de nombreux avantages, notamment :
- Interface web
Redis Commanderincluse.- Taille et transferts de messages illimités.
- Mises à jour simplifiées en un clic.
- Performance optimale et sécurité renforcée grâce à une VM privée et dédiée.
Gagnez du temps et simplifiez-vous la vie : il suffit de 5 minutes pour essayer la solution d'hébergement Redis cloud de Stackhero !
Vous pourriez rencontrer l'erreur redis.exceptions.ConnectionError: Connection closed by server, qui peut survenir en raison de l'inactivité de votre application Python, entraînant la fermeture automatique de la connexion. Lorsque votre application tente de se reconnecter, elle peut échouer, entraînant cette erreur.
Pour atténuer cela, envisagez de définir le paramètre health_check_interval dans votre connexion Redis comme suit :
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Lors de l'utilisation de la fonctionnalité Pub/Sub de Redis, assurez-vous que votre application appelle get_message() ou listen() plus fréquemment que l'intervalle spécifié par health_check_interval (dans cet exemple, toutes les 10 secondes). Vous pouvez vous référer à la documentation officielle de redis-py pour plus de détails.
Si ces appels ne sont pas effectués dans l'intervalle, vous pourriez toujours rencontrer l'erreur "Connection closed by server". Une solution pratique est d'utiliser régulièrement la fonction check_health().
Voici comment vous pouvez l'implémenter :
import redis
import threading
# Connecter à Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Créer une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Créer une fonction qui appellera `check_health` toutes les 5 secondes
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Appeler la fonction redis_auto_check
redis_auto_check(p)