43 lines
1.4 KiB
Python
43 lines
1.4 KiB
Python
|
|
"""Memoir 链路:correlation id 与 Phase2 派发 task_id 策略。"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from app.core.memoir_pipeline_trace import (
|
|||
|
|
effective_correlation_id,
|
|||
|
|
new_memoir_correlation_id,
|
|||
|
|
)
|
|||
|
|
from app.tasks.memoir_tasks import _phase2_immediate_task_id
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_effective_correlation_id_prefers_explicit() -> None:
|
|||
|
|
assert (
|
|||
|
|
effective_correlation_id(explicit=" cid-1 ", celery_task_id="task-9")
|
|||
|
|
== "cid-1"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_effective_correlation_id_falls_back_to_celery_task_id() -> None:
|
|||
|
|
assert effective_correlation_id(explicit=None, celery_task_id="task-9") == "task-9"
|
|||
|
|
assert effective_correlation_id(explicit="", celery_task_id="task-9") == "task-9"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_effective_correlation_id_generates_when_missing() -> None:
|
|||
|
|
a = effective_correlation_id(explicit=None, celery_task_id=None)
|
|||
|
|
b = effective_correlation_id(explicit=None, celery_task_id=None)
|
|||
|
|
assert len(a) > 20
|
|||
|
|
assert a != b
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_new_memoir_correlation_id_is_uuid_like() -> None:
|
|||
|
|
x = new_memoir_correlation_id()
|
|||
|
|
assert len(x) >= 32
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_phase2_immediate_task_id_stable_per_user_category() -> None:
|
|||
|
|
assert _phase2_immediate_task_id("u1", "childhood") == _phase2_immediate_task_id(
|
|||
|
|
"u1", "childhood"
|
|||
|
|
)
|
|||
|
|
assert _phase2_immediate_task_id("u1", "childhood") != _phase2_immediate_task_id(
|
|||
|
|
"u2", "childhood"
|
|||
|
|
)
|