"""Memoir 链路:correlation id 与 Phase2 派发 task_id 策略。""" from __future__ import annotations from app.core.memoir_pipeline_trace import ( effective_correlation_id, new_memoir_correlation_id, ) from app.tasks.memoir_tasks import _phase2_immediate_task_id def test_effective_correlation_id_prefers_explicit() -> None: assert ( effective_correlation_id(explicit=" cid-1 ", celery_task_id="task-9") == "cid-1" ) def test_effective_correlation_id_falls_back_to_celery_task_id() -> None: assert effective_correlation_id(explicit=None, celery_task_id="task-9") == "task-9" assert effective_correlation_id(explicit="", celery_task_id="task-9") == "task-9" def test_effective_correlation_id_generates_when_missing() -> None: a = effective_correlation_id(explicit=None, celery_task_id=None) b = effective_correlation_id(explicit=None, celery_task_id=None) assert len(a) > 20 assert a != b def test_new_memoir_correlation_id_is_uuid_like() -> None: x = new_memoir_correlation_id() assert len(x) >= 32 def test_phase2_immediate_task_id_stable_per_user_category() -> None: assert _phase2_immediate_task_id("u1", "childhood") == _phase2_immediate_task_id( "u1", "childhood" ) assert _phase2_immediate_task_id("u1", "childhood") != _phase2_immediate_task_id( "u2", "childhood" )