Redis®*: Avoiding the "Connection Closed by Server" error in Redis and Python

This documentation is part of the Using with Python guide. You can view the complete guide here: How to connect Redis with Python.

You might encounter the error redis.exceptions.ConnectionError: Connection closed by server. It typically appears after a period of inactivity in your Python application: the idle connection is closed automatically, and the next command your app sends over it fails with this error.

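To mitigate this, set the health_check_interval parameter (in seconds) when creating your Redis connection. redis-py then checks any connection that has been idle for longer than this interval before using it again: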

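import redis

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,  # check connections idle for more than 10 seconds
  socket_connect_timeout=5,  # give up connecting after 5 seconds
  retry_on_timeout=True,     # retry a command that times out
  socket_keepalive=True      # enable TCP keepalive on the socket
)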
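Even with health checks enabled, a single command can still hit a connection that was just closed. As a minimal sketch, the hypothetical helper below (call_with_retry is not part of redis-py) retries a callable a few times when it raises one of the exception types you pass in. The attempt count and delay are illustrative assumptions, and recent redis-py versions also ship their own retry options that may suit you better.

```python
import time


def call_with_retry(func, retry_on, attempts=3, delay=0.5):
    """Call func() and retry when it raises an exception listed in retry_on.

    Hypothetical helper for illustration; it is not part of redis-py.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(delay)  # short pause before trying again
```

With the connection above, a call could look like call_with_retry(lambda: r.get('my-key'), retry_on=(redis.exceptions.ConnectionError,)), where 'my-key' is a placeholder key name.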

When using Redis Pub/Sub, make sure your application calls get_message() or listen() more often than the health_check_interval (every 10 seconds in this example). See the official redis-py documentation for more details.
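One way to satisfy this timing requirement is a polling loop whose timeout is well below health_check_interval. The sketch below assumes a redis-py PubSub object (such as the one returned by r.pubsub()) that is already subscribed. The names poll_messages, handle, and stop_event are hypothetical, and the 1-second timeout is an arbitrary value chosen to stay under the 10-second interval.

```python
import threading


def poll_messages(pubsub, handle, stop_event: threading.Event, poll_timeout=1.0):
    """Poll pubsub until stop_event is set, passing each message to handle.

    Hypothetical helper: pubsub is expected to expose get_message(timeout=...),
    as redis-py PubSub objects do.
    """
    while not stop_event.is_set():
        # get_message() waits up to poll_timeout seconds and returns None
        # when nothing arrived, so it is called at least once per timeout.
        message = pubsub.get_message(timeout=poll_timeout)
        if message is not None:
            handle(message)
```

A typical call would be poll_messages(p, print, threading.Event()), which prints every message, including subscription confirmations, until the event is set from another thread.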

If your application cannot call get_message() or listen() within the interval, you may still encounter the "Connection closed by server" error. A practical solution is to call the PubSub object's check_health() method at regular intervals.

Here is how you can implement it:

import redis
import threading

# Connect to Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Create a PubSub instance
p = r.pubsub()

# Subscribe to the channel "test"
p.subscribe('test')

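# Call `check_health` every 5 seconds, which is more often than
# health_check_interval (10 seconds)
def redis_auto_check(p):
  # Schedule the next call first so the checks keep running even if this one raises
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Start the periodic health checks
redis_auto_check(p)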
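The timer-based function above runs until the process is killed, because each timer schedules the next one. If you need to stop the checks cleanly, one possible variant is sketched below. It assumes the same PubSub object with a check_health() method. The name start_health_checks is hypothetical, and unlike the timers above it uses a daemon thread, so it does not keep the program alive on its own.

```python
import threading


def start_health_checks(pubsub, interval=5.0):
    """Call pubsub.check_health() every `interval` seconds in a daemon thread.

    Hypothetical helper. Returns (stop_event, thread): set stop_event to end
    the loop, then join the thread if you want to wait for it to finish.
    """
    stop_event = threading.Event()

    def run():
        # Event.wait() returns False on timeout and True once stop_event is
        # set, so the loop sleeps `interval` seconds between checks.
        while not stop_event.wait(interval):
            pubsub.check_health()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return stop_event, thread
```

For example, stop, thread = start_health_checks(p) starts the checks, and stop.set() followed by thread.join() ends them. Note that an exception raised by check_health() ends the loop in this sketch, so you may want to catch and log it inside run().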