Redis®*: Utilisation avec Python
Comment connecter Redis avec Python
👋 Bienvenue sur la documentation de Stackhero !
Stackhero propose une solution Redis cloud prête à l'emploi qui offre de nombreux avantages, notamment :
- Interface web Redis Commander incluse.
- Taille des messages et transferts illimités.
- Mises à jour simplifiées en un clic.
- Performance optimale et sécurité renforcée grâce à une VM privée et dédiée.
Gagnez du temps et simplifiez-vous la vie : il suffit de 5 minutes pour essayer la solution Redis cloud hosting de Stackhero !
Choisir la bonne bibliothèque Python pour se connecter à Redis
Pour connecter facilement votre application Python à une instance Redis, la bibliothèque redis est un choix populaire et performant. Vous pouvez commencer en l'installant avec la commande suivante :
pip install redis
pip freeze > requirements.txt
Connecter Python à Redis
Voici un exemple simple utilisant les valeurs par défaut, adapté à la plupart des besoins :
import redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Pour renforcer la sécurité, il est recommandé d'utiliser des variables d'environnement pour vos identifiants. Voici comment procéder :
import os
import redis
r = redis.from_url(
os.environ.get("STACKHERO_REDIS_URL_TLS"),
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Assurez-vous que vos variables d'environnement incluent une définition comme : STACKHERO_REDIS_URL_TLS=rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>
Utiliser Pub/Sub avec Redis et Python
La fonctionnalité Publish/Subscribe de Redis s'utilise très simplement avec Python. Voici un exemple pour vous guider :
import redis
# Connexion à Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
socket_keepalive=True,
retry_on_timeout=True
)
# Création d'une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Publier un message sur le canal "test"
r.publish('test', 'This is a test message')
# Récupérer le premier message disponible du canal "test"
p.get_message()
# Se désabonner du canal "test"
p.unsubscribe('test')
Exemples avancés de Pub/Sub Redis avec Python
Vous pouvez explorer des techniques Pub/Sub plus avancées avec les exemples suivants :
# Créer une instance PubSub en ignorant les messages d'abonnement
p = r.pubsub(ignore_subscribe_messages=True)
# S'abonner à plusieurs canaux
p.subscribe('test-1', 'test-2', ...)
# Se désabonner de plusieurs canaux
p.unsubscribe('test-1', 'test-2', ...)
# Vous pouvez aussi utiliser "unsubscribe" sans argument pour se désabonner de tous les canaux
p.unsubscribe()
# S'abonner à des canaux via un pattern
p.psubscribe('my-*')
# Se désabonner de canaux via un pattern
p.punsubscribe('my-*')
Eviter l'erreur "Connection Closed by Server" avec Redis et Python
Vous pouvez rencontrer l'erreur redis.exceptions.ConnectionError: Connection closed by server, qui peut survenir en cas d'inactivité de votre application Python, entraînant la fermeture automatique de la connexion. Lorsque votre application tente de se reconnecter, cela peut échouer et générer cette erreur.
Pour éviter cela, il est conseillé de définir le paramètre health_check_interval dans votre connexion Redis comme ceci :
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Lorsque vous utilisez la fonctionnalité Pub/Sub de Redis, assurez-vous que votre application appelle get_message() ou listen() plus fréquemment que l'intervalle défini par health_check_interval (dans cet exemple, toutes les 10 secondes). Vous pouvez consulter la documentation officielle de redis-py pour plus d'informations.
Si ces appels ne sont pas effectués dans l'intervalle, vous risquez toujours de rencontrer l'erreur "Connection closed by server". Une solution efficace consiste à utiliser régulièrement la fonction check_health().
Voici comment l'implémenter :
import redis
import threading
# Connexion à Redis
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Création d'une instance PubSub
p = r.pubsub()
# S'abonner au canal "test"
p.subscribe('test')
# Créer une fonction qui appelle `check_health` toutes les 5 secondes
def redis_auto_check(p):
t = threading.Timer(5, redis_auto_check, [p])
t.start()
p.check_health()
# Appeler la fonction redis_auto_check
redis_auto_check(p)