Valkey: Evitar erros "Connection Closed by Server"
Esta documentação faz parte do guia Utilizar com Python. Consulte o guia completo aqui: Como conectar Valkey com Python.
👋 Bem-vindo à documentação do Stackhero!
A Stackhero oferece uma solução Valkey cloud pronta a usar que proporciona uma série de benefícios, incluindo:
- Interface web UI
Redis Commanderincluída.- Tamanho e transferências de mensagens ilimitados.
- Atualizações simplificadas com apenas um clique.
- Desempenho ótimo e segurança robusta alimentados por uma VM privada e dedicada.
Poupe tempo e simplifique a sua vida: são necessários apenas 5 minutos para experimentar a solução Valkey cloud hosting da Stackhero!
O erro redis.exceptions.ConnectionError: Connection closed by server pode ocorrer se a sua aplicação não interagir com o Valkey por um período, levando a uma desconexão automática. Para evitar isso, pode definir o parâmetro health_check_interval como mostrado abaixo:
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
Ao usar a funcionalidade Pub/Sub do Valkey, a biblioteca redis-py espera que funções como get_message() ou listen() sejam chamadas mais frequentemente do que o intervalo health_check_interval. No nosso exemplo, definimos este intervalo para 10 segundos, por isso, certifique-se de chamar get_message() ou listen() pelo menos uma vez a cada 10 segundos (consulte a documentação oficial do redis-py).
Se isso não for feito, pode encontrar o mesmo erro de conexão. Para evitar isso, considere chamar regularmente check_health().
Aqui está como pode implementá-lo:
import redis
import threading
# Conectar ao Valkey
r = redis.from_url(
'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
health_check_interval=10,
socket_connect_timeout=5,
retry_on_timeout=True,
socket_keepalive=True
)
# Criar uma instância PubSub
p = r.pubsub()
# Subscrever ao canal "test"
p.subscribe('test')
# Criar uma função que chamará `check_health` a cada 5 segundos
def valkey_auto_check(p):
t = threading.Timer(5, valkey_auto_check, [p])
t.start()
p.check_health()
# Chamar a função valkey_auto_check
valkey_auto_check(p)