Files
life-echo/api/app/core/redis_urls.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

97 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Redis URL resolution: password injection and Celery DB separation."""
from __future__ import annotations
from urllib.parse import quote, urlparse, urlunparse
def _url_has_password(parsed) -> bool:
return bool(parsed.password or parsed.username)
def inject_redis_password(redis_url: str, password: str | None) -> str:
"""Inject REDIS_PASSWORD into URL when the URL has no credentials."""
if not password:
return redis_url
parsed = urlparse(redis_url)
if _url_has_password(parsed):
return redis_url
host = parsed.hostname or "localhost"
port = parsed.port
netloc = f":{quote(password, safe='')}@{host}"
if port is not None:
netloc = f":{quote(password, safe='')}@{host}:{port}"
return urlunparse(
(
parsed.scheme or "redis",
netloc,
parsed.path or "",
parsed.params,
parsed.query,
parsed.fragment,
)
)
def _parse_db_index(path: str) -> int:
segment = (path or "").strip("/")
if not segment:
return 0
try:
return int(segment)
except ValueError:
return 0
def derive_celery_redis_url(
redis_url: str,
*,
celery_redis_url_override: str | None = None,
) -> str:
"""Resolve Celery broker/backend URL (override or same host with DB+1).
Business keys use ``REDIS_URL`` (typically DB/0); Celery broker/backend use
the next logical DB. After upgrading to DB separation, unconsumed Celery
keys on the business DB are abandoned (one-time cutover).
When ``REDIS_URL`` uses DB/15, set ``CELERY_REDIS_URL`` explicitly — Redis
only supports logical DBs 015.
"""
if celery_redis_url_override:
return celery_redis_url_override
parsed = urlparse(redis_url)
db = _parse_db_index(parsed.path)
if db >= 15:
raise ValueError(
"REDIS_URL uses DB/15; Celery cannot auto-derive DB+1. "
"Set CELERY_REDIS_URL explicitly."
)
new_path = f"/{db + 1}"
return urlunparse(
(
parsed.scheme or "redis",
parsed.netloc,
new_path,
parsed.params,
parsed.query,
parsed.fragment,
)
)
def resolve_redis_urls(
redis_url: str,
*,
redis_password: str | None = None,
celery_redis_url_override: str | None = None,
) -> tuple[str, str]:
"""Return (business_redis_url, celery_redis_url)."""
business = inject_redis_password(redis_url, redis_password)
celery = derive_celery_redis_url(
business,
celery_redis_url_override=celery_redis_url_override,
)
if celery_redis_url_override is None:
celery = inject_redis_password(celery, redis_password)
return business, celery