Files
life-echo/api/tests/test_memoir_pipeline_progress.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

98 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""memoir_pipeline_progress合并与读取逻辑假 Redis 客户端)。"""
import json
import pytest
import app.core.memoir_pipeline_progress as mpp
class _FakeRedis:
def __init__(self) -> None:
self.store: dict[str, str] = {}
def get(self, key: str) -> str | None:
return self.store.get(key)
def setex(self, key: str, _ttl: int, value: str) -> None:
self.store[key] = value
@pytest.fixture
def fake_redis(monkeypatch: pytest.MonkeyPatch) -> _FakeRedis:
fr = _FakeRedis()
monkeypatch.setattr(mpp, "_redis", lambda: fr)
return fr
def test_merge_pipeline_run_creates_doc(fake_redis: _FakeRedis) -> None:
mpp.merge_pipeline_run("cid-1", {"phase1": {"step": "memory_ingest"}})
raw = fake_redis.store.get("memoir_pipeline_run:cid-1")
assert raw
doc = json.loads(raw)
assert doc["memoir_correlation_id"] == "cid-1"
assert doc["phase1"]["step"] == "memory_ingest"
def test_merge_phase2_merges_by_task_id(fake_redis: _FakeRedis) -> None:
mpp.merge_pipeline_run(
"cid-2",
{
"phase2": [
{"chapter_category": "a", "task_id": "t1", "status": "enqueued"},
],
},
)
mpp.merge_pipeline_run(
"cid-2",
{"phase2": [{"task_id": "t1", "status": "running"}]},
)
raw = fake_redis.store["memoir_pipeline_run:cid-2"]
doc = json.loads(raw)
assert len(doc["phase2"]) == 1
assert doc["phase2"][0]["task_id"] == "t1"
assert doc["phase2"][0]["status"] == "running"
assert doc["phase2"][0]["chapter_category"] == "a"
def test_merge_fanout_lists_merge_by_id(fake_redis: _FakeRedis) -> None:
mpp.merge_pipeline_run(
"cid-3",
{
"fanout": {
"story_images": [
{"story_id": "s1", "task_id": "img1", "status": "enqueued"},
],
},
},
)
mpp.merge_pipeline_run(
"cid-3",
{
"fanout": {
"story_images": [
{"story_id": "s1", "status": "success"},
],
},
},
)
doc = json.loads(fake_redis.store["memoir_pipeline_run:cid-3"])
assert len(doc["fanout"]["story_images"]) == 1
assert doc["fanout"]["story_images"][0]["task_id"] == "img1"
assert doc["fanout"]["story_images"][0]["status"] == "success"
def test_init_and_index_resolve(fake_redis: _FakeRedis) -> None:
mpp.init_pipeline_run_from_phase1("user-a", "cid-4", "p1tid", segment_count=3)
cid = mpp.resolve_correlation_id_for_phase1_task("p1tid")
assert cid == "cid-4"
snap = mpp.get_pipeline_run_for_eval("user-a", phase1_task_id="p1tid")
assert snap is not None
assert snap["user_id"] == "user-a"
assert snap["phase1"]["task_id"] == "p1tid"
def test_get_pipeline_run_for_eval_user_mismatch(fake_redis: _FakeRedis) -> None:
mpp.init_pipeline_run_from_phase1("user-a", "cid-5", "p1b", segment_count=1)
assert mpp.get_pipeline_run_for_eval("other", phase1_task_id="p1b") is None