"""Redis URL resolution: password injection and Celery DB separation."""

from __future__ import annotations

from urllib.parse import quote, urlparse, urlunparse


def _url_has_password(parsed) -> bool:
    """Return True when the parsed URL already carries a username or password."""
    return bool(parsed.password or parsed.username)


def inject_redis_password(redis_url: str, password: str | None) -> str:
    """Inject REDIS_PASSWORD into URL when the URL has no credentials.

    Args:
        redis_url: Redis connection URL, e.g. ``redis://host:6379/0``.
        password: Password to inject; ``None`` or empty leaves the URL as is.

    Returns:
        ``redis_url`` unchanged when no password is given or the URL already
        has a username/password; otherwise the URL rebuilt with
        ``:<percent-encoded password>@`` in front of the host. A missing
        scheme defaults to ``redis`` and a missing host to ``localhost``.
    """
    if not password:
        return redis_url

    parsed = urlparse(redis_url)
    if _url_has_password(parsed):
        return redis_url

    host = parsed.hostname or "localhost"
    # ``hostname`` strips the square brackets from IPv6 literals; they must be
    # restored, otherwise the host's colons are indistinguishable from the
    # port separator in the rebuilt netloc.
    if ":" in host:
        host = f"[{host}]"

    # safe='' so that characters such as '/', '@' and ':' in the password are
    # percent-encoded and cannot be mistaken for URL delimiters.
    netloc = f":{quote(password, safe='')}@{host}"
    port = parsed.port
    if port is not None:
        netloc = f"{netloc}:{port}"

    return urlunparse(
        (
            parsed.scheme or "redis",
            netloc,
            parsed.path or "",
            parsed.params,
            parsed.query,
            parsed.fragment,
        )
    )


def _parse_db_index(path: str) -> int:
    """Return the logical DB index encoded in a URL path such as ``/3``.

    An empty path or a path that is not a plain integer yields 0.
    """
    segment = (path or "").strip("/")
    if not segment:
        return 0
    try:
        return int(segment)
    except ValueError:
        # Best-effort: anything that is not a bare integer is treated as DB 0.
        return 0


def derive_celery_redis_url(
    redis_url: str,
    *,
    celery_redis_url_override: str | None = None,
) -> str:
    """Resolve Celery broker/backend URL (override or same host with DB+1).

    Business keys use ``REDIS_URL`` (typically DB/0); Celery broker/backend
    use the next logical DB. After upgrading to DB separation, unconsumed
    Celery keys on the business DB are abandoned (one-time cutover).

    When ``REDIS_URL`` uses DB/15, set ``CELERY_REDIS_URL`` explicitly —
    Redis exposes logical DBs 0–15 by default.

    Args:
        redis_url: Business Redis URL whose path holds the DB index.
        celery_redis_url_override: When non-empty, returned verbatim.

    Returns:
        The override, or ``redis_url`` with its path replaced by ``/<db + 1>``.

    Raises:
        ValueError: If no override is given and the DB index is 15 or higher.
    """
    if celery_redis_url_override:
        return celery_redis_url_override

    parsed = urlparse(redis_url)
    db = _parse_db_index(parsed.path)
    if db >= 15:
        raise ValueError(
            "REDIS_URL uses DB/15; Celery cannot auto-derive DB+1. "
            "Set CELERY_REDIS_URL explicitly."
        )

    return urlunparse(
        (
            parsed.scheme or "redis",
            parsed.netloc,
            f"/{db + 1}",
            parsed.params,
            parsed.query,
            parsed.fragment,
        )
    )


def resolve_redis_urls(
    redis_url: str,
    *,
    redis_password: str | None = None,
    celery_redis_url_override: str | None = None,
) -> tuple[str, str]:
    """Return (business_redis_url, celery_redis_url).

    Args:
        redis_url: Base Redis URL for business keys.
        redis_password: Password injected into URLs that lack credentials.
        celery_redis_url_override: Explicit Celery URL; when non-empty it is
            returned as given, without password injection.

    Raises:
        ValueError: Propagated from ``derive_celery_redis_url`` when DB+1
            cannot be derived.
    """
    business = inject_redis_password(redis_url, redis_password)
    celery = derive_celery_redis_url(
        business,
        celery_redis_url_override=celery_redis_url_override,
    )
    # Same truthiness test as derive_celery_redis_url, so an empty override is
    # handled like a missing one. The derived URL reuses ``business``'s netloc,
    # so this only adds a password if that netloc carries no credentials.
    if not celery_redis_url_override:
        celery = inject_redis_password(celery, redis_password)
    return business, celery