Files
life-echo/api/tests/test_redis_urls.py

47 lines
1.5 KiB
Python
Raw Permalink Normal View History

import pytest

from app.core.redis_urls import (
    derive_celery_redis_url,
    inject_redis_password,
    resolve_redis_urls,
)
def test_inject_redis_password_when_missing() -> None:
    """A URL without credentials comes back with the password injected."""
    bare_url = "redis://localhost:6379/0"
    expected = "redis://:secret@localhost:6379/0"

    assert inject_redis_password(bare_url, "secret") == expected
def test_inject_redis_password_skips_when_present() -> None:
    """A URL that already carries a password is returned unchanged."""
    already_authenticated = "redis://:existing@localhost:6379/0"

    result = inject_redis_password(already_authenticated, "secret")

    # The pre-existing password must win over the one passed in.
    assert result == already_authenticated
def test_derive_celery_redis_url_increments_db() -> None:
    """Deriving from database 0 yields the same host and port on database 1."""
    source_url = "redis://localhost:6379/0"

    derived = derive_celery_redis_url(source_url)

    assert derived == "redis://localhost:6379/1"
def test_resolve_redis_urls_applies_password_to_both() -> None:
    """Both resolved URLs carry the password when no Celery override is given."""
    resolved = resolve_redis_urls("redis://localhost:6379/0", redis_password="secret")
    business_url, celery_url = resolved

    assert business_url == "redis://:secret@localhost:6379/0"
    # The Celery URL is expected on the next database index, with the password.
    assert celery_url == "redis://:secret@localhost:6379/1"
def test_celery_redis_url_override() -> None:
    """An explicit Celery override is returned exactly as supplied."""
    override = "redis://broker:6380/2"

    business_url, celery_url = resolve_redis_urls(
        "redis://localhost:6379/0",
        redis_password="secret",
        celery_redis_url_override=override,
    )

    # The business URL still gets the password; the override is left untouched.
    assert business_url == "redis://:secret@localhost:6379/0"
    assert celery_url == override
def test_derive_celery_redis_url_rejects_db_15() -> None:
    """Deriving from database 15 raises ``ValueError`` naming CELERY_REDIS_URL.

    The error message must match ``CELERY_REDIS_URL``.
    NOTE(review): presumably 15 is rejected because the derived index (16)
    exceeds the default Redis database count; confirm against the helper.
    """
    # pytest is imported at module level (PEP 8) rather than inside the test.
    with pytest.raises(ValueError, match="CELERY_REDIS_URL"):
        derive_celery_redis_url("redis://localhost:6379/15")