"""Celery worker:任务生命周期内通过 ContextVar 注入 loguru extra(user_id、correlation_id 等)。""" from __future__ import annotations from contextvars import ContextVar from typing import Mapping _ctx: ContextVar[dict[str, str] | None] = ContextVar("celery_log_extras", default=None) def set_celery_log_extras(extras: Mapping[str, str] | None) -> None: """在 ``task_prerun`` 中调用;值会并入后续 loguru 记录的 ``extra``(不覆盖已有非空 bind)。""" if not extras: _ctx.set(None) return cleaned: dict[str, str] = {} for k, v in extras.items(): if v is None: continue s = str(v).strip() if s: cleaned[str(k).strip()] = s _ctx.set(cleaned or None) def clear_celery_log_extras() -> None: """在 ``task_postrun`` 中调用,避免泄漏到同进程下一任务。""" _ctx.set(None) def get_celery_log_extras() -> dict[str, str]: v = _ctx.get() return dict(v) if v else {}