Redis®*: Evitar o erro "Connection Closed by Server" no Redis e Python

Esta documentação faz parte do guia Utilizar com Python. Consulte o guia completo aqui: Como conectar o Redis com Python.

👋 Bem-vindo à documentação da Stackhero!

A Stackhero oferece uma solução Redis cloud pronta a usar que proporciona uma série de benefícios, incluindo:

  • Interface web Redis Commander incluída.
  • Tamanho e transferências de mensagens ilimitados.
  • Atualizações simplificadas com apenas um clique.
  • Desempenho ótimo e segurança robusta garantidos por uma VM privada e dedicada.

Poupe tempo e simplifique a sua vida: são necessários apenas 5 minutos para experimentar a solução de hospedagem Redis cloud da Stackhero!

Pode encontrar o erro redis.exceptions.ConnectionError: Connection closed by server, que pode ocorrer devido à inatividade na sua aplicação Python, causando o encerramento automático da conexão. Quando a sua aplicação tenta reconectar, pode falhar, resultando neste erro.

Para mitigar isso, considere definir o parâmetro health_check_interval na sua conexão Redis da seguinte forma:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Ao usar a funcionalidade Pub/Sub do Redis, certifique-se de que a sua aplicação chama get_message() ou listen() mais frequentemente do que o intervalo especificado por health_check_interval (neste exemplo, a cada 10 segundos). Pode consultar a documentação oficial do redis-py para mais detalhes.

Se estas chamadas não forem feitas dentro do intervalo, ainda pode encontrar o erro "Connection closed by server". Uma solução prática é usar regularmente a função check_health().

Aqui está como pode implementá-la:

import redis
import threading

# Conectar ao Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Criar uma instância PubSub
p = r.pubsub()

# Subscrever ao canal "test"
p.subscribe('test')

# Criar uma função que chamará `check_health` a cada 5 segundos
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Chamar a função redis_auto_check
redis_auto_check(p)