"""Structured-logging helpers.

Provides a uniform single-line ``event=...`` log format and extraction of the
context fields (``user_id``, ``correlation_id``, ...) that a Celery
``task_prerun`` handler can pull out of a task's args/kwargs.
"""

from __future__ import annotations

from typing import Any


def _clean_text(value: Any) -> str:
    """Return ``str(value)`` stripped of surrounding whitespace, or ``""`` for ``None``.

    Callers treat an empty result as "field absent", so ``None``, ``""`` and
    whitespace-only values are all handled the same way.
    """
    if value is None:
        return ""
    return str(value).strip()


def format_log_event(event: str, **fields: Any) -> str:
    """Build the body of a single ``event=...`` log line.

    ``event`` always comes first and ``msg`` (when supplied) always comes
    last; every other key is emitted in alphabetical order as ``key=value``,
    with the pairs separated by single spaces.

    Value rendering:
        * ``None`` and empty / whitespace-only values are skipped.
        * ``float`` values are rendered with one decimal place (suited to
          fields such as ``duration_ms``).
        * ``bool`` values are rendered lowercase (``true`` / ``false``).
        * Anything else is rendered as ``str(value).strip()``.

    Args:
        event: Event name placed in the leading ``event=`` pair.
        **fields: Additional key/value pairs to append.

    Returns:
        The formatted log line body.
    """
    # ``msg`` is free text, so it is pinned to the end of the line.
    ordered = sorted(key for key in fields if key != "msg")
    if "msg" in fields:
        ordered.append("msg")

    parts: list[str] = [f"event={event}"]
    for key in ordered:
        value = fields[key]
        if value is None:
            continue
        if isinstance(value, bool):
            text = str(value).lower()
        elif isinstance(value, float):
            text = f"{value:.1f}"
        else:
            text = _clean_text(value)
            if not text:
                continue
        parts.append(f"{key}={text}")
    return " ".join(parts)


def correlation_bind_kwargs(
    *,
    user_id: str | None = None,
    memoir_correlation_id: str | None = None,
    correlation_id: str | None = None,
    **more: str | None,
) -> dict[str, str]:
    """Build keyword arguments for ``logger.bind(**...)``.

    ``memoir_correlation_id`` is written under the ``correlation_id`` key so
    that a single key can be used for lookup. An explicit ``correlation_id``
    takes precedence; if it is ``None`` or blank, ``memoir_correlation_id`` is
    used instead.

    Args:
        user_id: User identifier; omitted from the result when blank.
        memoir_correlation_id: Fallback value for ``correlation_id``.
        correlation_id: Preferred correlation identifier.
        **more: Extra fields to bind; ``None`` and blank values are dropped.

    Returns:
        A dict holding only the non-empty fields, with every value converted
        to ``str`` and stripped of surrounding whitespace.
    """
    out: dict[str, str] = {}

    uid = _clean_text(user_id)
    if uid:
        out["user_id"] = uid

    # Normalise each candidate *before* choosing between them, so that a
    # whitespace-only ``correlation_id`` cannot mask a usable
    # ``memoir_correlation_id``.
    cid = _clean_text(correlation_id) or _clean_text(memoir_correlation_id)
    if cid:
        out["correlation_id"] = cid

    for key, value in more.items():
        text = _clean_text(value)
        if text:
            out[str(key)] = text
    return out


# Positional-argument -> field-name mapping for bind=True tasks
# (kwargs take priority; positionals only fill in fields that are still missing).
_TASK_POSITIONAL_FIELDS: dict[str, tuple[str, ...]] = {
    "app.tasks.memory_enrichment_tasks.embed_memory_source": ("user_id", "source_id"),
    "app.tasks.memory_enrichment_tasks.enrich_memory_source": ("user_id", "source_id"),
    "app.tasks.memory_compaction_tasks.memory_compaction_run": ("user_id",),
    "app.tasks.chapter_compose_tasks.recompose_chapter": ("chapter_id",),
    "app.tasks.memoir_quality_pass_tasks.memoir_quality_pass": ("user_id",),
    "app.tasks.memoir_tasks.process_memoir_phase2": ("user_id", "chapter_category"),
    "app.tasks.memoir_tasks.process_memoir_phase1": ("user_id",),
    "app.tasks.memoir_tasks.generate_chapter_content": ("user_id", "stage"),
    "app.tasks.chapter_cover_tasks.generate_chapter_cover": ("chapter_id",),
    "app.tasks.story_image_tasks.generate_story_image": ("story_id",),
    "app.tasks.story_title_tasks.generate_story_title_after_create": (
        "story_id",
        "chapter_category",
        "oral_scope",
        "user_id",
    ),
}

# Keyword-argument names copied verbatim into the extracted context.
_KW_KEYS_COPY: tuple[str, ...] = (
    "user_id",
    "source_id",
    "chapter_id",
    "story_id",
    "chapter_category",
    "stage",
    "oral_scope",
)


def celery_prerun_extras(
    task_name: str | None,
    args: tuple[Any, ...],
    kwargs: dict[str, Any] | None,
) -> dict[str, str]:
    """Extract log-context fields from a Celery ``task_prerun`` args/kwargs pair.

    The result is meant to be handed to ``set_celery_log_extras`` so that
    loguru records emitted inside the task body can be correlated.

    Extraction order:
        1. ``kwargs["memoir_correlation_id"]`` is stored as ``correlation_id``.
        2. Every key listed in ``_KW_KEYS_COPY`` is copied from ``kwargs``.
        3. For task names present in ``_TASK_POSITIONAL_FIELDS``, positional
           ``args`` fill in any field not already provided by ``kwargs``.

    ``None`` and blank values are ignored at every step.

    Args:
        task_name: Task name used to look up the positional mapping.
        args: Positional arguments the task was invoked with.
        kwargs: Keyword arguments the task was invoked with (may be ``None``).

    Returns:
        A dict of non-empty, stripped string values keyed by field name.
    """
    kw = kwargs or {}
    out: dict[str, str] = {}

    cid = _clean_text(kw.get("memoir_correlation_id"))
    if cid:
        out["correlation_id"] = cid

    for key in _KW_KEYS_COPY:
        text = _clean_text(kw.get(key))
        if text:
            out[key] = text

    fields = _TASK_POSITIONAL_FIELDS.get((task_name or "").strip(), ())
    # zip() stops at the shorter of the two sequences, so missing trailing
    # positionals are simply not filled in.
    for field, value in zip(fields, args or ()):
        if field in out:
            # A usable keyword value wins over the positional one.
            continue
        text = _clean_text(value)
        if text:
            out[field] = text
    return out