Redis®*: Kaip išvengti "Connection Closed by Server" klaidos Redis ir Python

Ši dokumentacija yra Naudojimas su Python vadovo dalis. Visą vadovą rasite čia: Kaip sujungti Redis su Python.

👋 Sveiki atvykę į Stackhero dokumentaciją!

Stackhero siūlo paruoštą naudoti Redis cloud sprendimą, kuris suteikia daugybę privalumų, įskaitant:

  • Įtraukta Redis Commander web UI.
  • Neribotas žinučių dydis ir perdavimai.
  • Paprasti atnaujinimai vienu spustelėjimu.
  • Optimali veikla ir stiprus saugumas, užtikrinamas privačiu ir dedikuotu VM.

Taupykite laiką ir supaprastinkite savo gyvenimą: tereikia 5 minučių, kad išbandytumėte Stackhero Redis cloud hosting sprendimą!

Galite susidurti su redis.exceptions.ConnectionError: Connection closed by server klaida, kuri gali atsirasti dėl jūsų Python aplikacijos neveiklumo, dėl ko ryšys automatiškai uždaromas. Kai jūsų programa bando prisijungti iš naujo, ji gali nepavykti, sukeldama šią klaidą.

Norėdami tai sumažinti, apsvarstykite galimybę nustatyti health_check_interval parametrą jūsų Redis ryšyje taip:

r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

Naudojant Redis Pub/Sub funkciją, įsitikinkite, kad jūsų programa kviečia get_message() arba listen() dažniau nei nurodytas health_check_interval (šiame pavyzdyje, kas 10 sekundžių). Galite kreiptis į redis-py oficialią dokumentaciją dėl daugiau detalių.

Jei šie kvietimai nėra atliekami per intervalą, vis tiek galite susidurti su "Connection closed by server" klaida. Praktinis sprendimas yra reguliariai naudoti check_health() funkciją.

Štai kaip galite tai įgyvendinti:

import redis
import threading

# Prisijungti prie Redis
r = redis.from_url(
  'rediss://default:<password>@<XXXXXX>.stackhero-network.com:<PORT_TLS>',
  health_check_interval=10,
  socket_connect_timeout=5,
  retry_on_timeout=True,
  socket_keepalive=True
)

# Sukurti PubSub instanciją
p = r.pubsub()

# Prenumeruoti kanalą "test"
p.subscribe('test')

# Sukurti funkciją, kuri kvies `check_health` kas 5 sekundes
def redis_auto_check(p):
  t = threading.Timer(5, redis_auto_check, [p])
  t.start()
  p.check_health()

# Kviesti redis_auto_check funkciją
redis_auto_check(p)