配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
47 lines
1.5 KiB
Python
47 lines
1.5 KiB
Python
from app.core.redis_urls import (
|
|
derive_celery_redis_url,
|
|
inject_redis_password,
|
|
resolve_redis_urls,
|
|
)
|
|
|
|
|
|
def test_inject_redis_password_when_missing() -> None:
    """A credential-free URL comes back with the password embedded."""
    expected = "redis://:secret@localhost:6379/0"
    result = inject_redis_password("redis://localhost:6379/0", "secret")
    assert result == expected
|
|
|
|
|
|
def test_inject_redis_password_skips_when_present() -> None:
    """A URL that already carries a password is returned unchanged."""
    already_authenticated = "redis://:existing@localhost:6379/0"
    result = inject_redis_password(already_authenticated, "secret")
    # The pre-existing password must win over the supplied one.
    assert result == already_authenticated
|
|
|
|
|
|
def test_derive_celery_redis_url_increments_db() -> None:
    """Deriving from a DB 0 URL yields the same URL pointing at DB 1."""
    derived = derive_celery_redis_url("redis://localhost:6379/0")
    assert derived == "redis://localhost:6379/1"
|
|
|
|
|
|
def test_resolve_redis_urls_applies_password_to_both() -> None:
    """Both the business URL and the derived Celery URL receive the password."""
    base_url = "redis://localhost:6379/0"
    business_url, celery_url = resolve_redis_urls(base_url, redis_password="secret")

    assert business_url == "redis://:secret@localhost:6379/0"
    # The Celery URL is expected on DB 1 with the same credentials.
    assert celery_url == "redis://:secret@localhost:6379/1"
|
|
|
|
|
|
def test_celery_redis_url_override() -> None:
    """An explicit Celery URL override is returned verbatim."""
    override = "redis://broker:6380/2"
    business_url, celery_url = resolve_redis_urls(
        "redis://localhost:6379/0",
        redis_password="secret",
        celery_redis_url_override=override,
    )

    # The business URL still gets the password; the override is left as given.
    assert business_url == "redis://:secret@localhost:6379/0"
    assert celery_url == override
|
|
|
|
|
|
def test_derive_celery_redis_url_rejects_db_15() -> None:
    """Deriving from DB 15 raises ValueError mentioning CELERY_REDIS_URL."""
    import pytest

    last_db_url = "redis://localhost:6379/15"
    expectation = pytest.raises(ValueError, match="CELERY_REDIS_URL")
    with expectation:
        derive_celery_redis_url(last_db_url)
|