配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
45 lines
1.4 KiB
Python
45 lines
1.4 KiB
Python
"""Process-wide synchronous Redis client factory with connection pooling."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
|
|
import redis
|
|
|
|
from app.core.config import settings
|
|
from app.core.runtime_constants import redis_defaults
|
|
|
|
# Cache of shared clients, keyed by the decode_responses flag each was built with.
_clients: dict[bool, redis.Redis] = {}
|
|
# Held while a client is created and stored in, or closed and removed from, _clients.
_init_lock = threading.Lock()
|
|
|
|
|
|
def get_sync_redis(*, decode_responses: bool = True) -> redis.Redis:
    """Return the shared sync Redis client for the requested decode mode.

    A client is built at most once per ``decode_responses`` value and then
    reused by every later call with that value.

    Args:
        decode_responses: Forwarded to ``redis.from_url``; also the key under
            which the resulting client is cached.

    Returns:
        The cached client for this mode, created on first use.
    """
    # Lock-free fast path once the client for this mode exists.
    cached = _clients.get(decode_responses)
    if cached is not None:
        return cached

    with _init_lock:
        # Look again while holding the lock: another thread may have stored
        # the client between the first lookup and acquiring the lock.
        cached = _clients.get(decode_responses)
        if cached is not None:
            return cached

        url = settings.redis_url_resolved
        options = {
            "decode_responses": decode_responses,
            "socket_timeout": redis_defaults.socket_timeout_seconds,
            "socket_connect_timeout": redis_defaults.socket_connect_timeout_seconds,
            "health_check_interval": redis_defaults.health_check_interval_seconds,
            "retry_on_timeout": True,
        }
        created = redis.from_url(url, **options)
        _clients[decode_responses] = created
        return created
|
|
|
|
|
|
def reset_sync_redis_clients_for_tests() -> None:
    """Close every cached sync Redis client and empty the cache (tests only).

    Closing is best-effort: if a client's ``close()`` raises, the error is
    logged at DEBUG level and the loop moves on, so the remaining clients are
    still closed and the cache is still cleared.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    with _init_lock:
        for client in _clients.values():
            try:
                client.close()
            except Exception:
                # Teardown must not fail the test run, but leave a trace of
                # the error instead of discarding it silently.
                logging.getLogger(__name__).debug(
                    "Ignoring error while closing cached sync Redis client",
                    exc_info=True,
                )
        _clients.clear()
|