Files
life-echo/api/tests/test_redis_urls.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

47 lines
1.5 KiB
Python

from app.core.redis_urls import (
derive_celery_redis_url,
inject_redis_password,
resolve_redis_urls,
)
def test_inject_redis_password_when_missing() -> None:
url = inject_redis_password("redis://localhost:6379/0", "secret")
assert url == "redis://:secret@localhost:6379/0"
def test_inject_redis_password_skips_when_present() -> None:
url = inject_redis_password("redis://:existing@localhost:6379/0", "secret")
assert url == "redis://:existing@localhost:6379/0"
def test_derive_celery_redis_url_increments_db() -> None:
url = derive_celery_redis_url("redis://localhost:6379/0")
assert url == "redis://localhost:6379/1"
def test_resolve_redis_urls_applies_password_to_both() -> None:
business, celery = resolve_redis_urls(
"redis://localhost:6379/0",
redis_password="secret",
)
assert business == "redis://:secret@localhost:6379/0"
assert celery == "redis://:secret@localhost:6379/1"
def test_celery_redis_url_override() -> None:
business, celery = resolve_redis_urls(
"redis://localhost:6379/0",
redis_password="secret",
celery_redis_url_override="redis://broker:6380/2",
)
assert business == "redis://:secret@localhost:6379/0"
assert celery == "redis://broker:6380/2"
def test_derive_celery_redis_url_rejects_db_15() -> None:
import pytest
with pytest.raises(ValueError, match="CELERY_REDIS_URL"):
derive_celery_redis_url("redis://localhost:6379/15")