配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
98 lines
3.0 KiB
Python
98 lines
3.0 KiB
Python
"""memoir_pipeline_progress:合并与读取逻辑(假 Redis 客户端)。"""
|
||
|
||
import json
|
||
|
||
import pytest
|
||
|
||
import app.core.memoir_pipeline_progress as mpp
|
||
|
||
|
||
class _FakeRedis:
|
||
def __init__(self) -> None:
|
||
self.store: dict[str, str] = {}
|
||
|
||
def get(self, key: str) -> str | None:
|
||
return self.store.get(key)
|
||
|
||
def setex(self, key: str, _ttl: int, value: str) -> None:
|
||
self.store[key] = value
|
||
|
||
|
||
@pytest.fixture
|
||
def fake_redis(monkeypatch: pytest.MonkeyPatch) -> _FakeRedis:
|
||
fr = _FakeRedis()
|
||
monkeypatch.setattr(mpp, "_redis", lambda: fr)
|
||
return fr
|
||
|
||
|
||
def test_merge_pipeline_run_creates_doc(fake_redis: _FakeRedis) -> None:
|
||
mpp.merge_pipeline_run("cid-1", {"phase1": {"step": "memory_ingest"}})
|
||
raw = fake_redis.store.get("memoir_pipeline_run:cid-1")
|
||
assert raw
|
||
doc = json.loads(raw)
|
||
assert doc["memoir_correlation_id"] == "cid-1"
|
||
assert doc["phase1"]["step"] == "memory_ingest"
|
||
|
||
|
||
def test_merge_phase2_merges_by_task_id(fake_redis: _FakeRedis) -> None:
|
||
mpp.merge_pipeline_run(
|
||
"cid-2",
|
||
{
|
||
"phase2": [
|
||
{"chapter_category": "a", "task_id": "t1", "status": "enqueued"},
|
||
],
|
||
},
|
||
)
|
||
mpp.merge_pipeline_run(
|
||
"cid-2",
|
||
{"phase2": [{"task_id": "t1", "status": "running"}]},
|
||
)
|
||
raw = fake_redis.store["memoir_pipeline_run:cid-2"]
|
||
doc = json.loads(raw)
|
||
assert len(doc["phase2"]) == 1
|
||
assert doc["phase2"][0]["task_id"] == "t1"
|
||
assert doc["phase2"][0]["status"] == "running"
|
||
assert doc["phase2"][0]["chapter_category"] == "a"
|
||
|
||
|
||
def test_merge_fanout_lists_merge_by_id(fake_redis: _FakeRedis) -> None:
|
||
mpp.merge_pipeline_run(
|
||
"cid-3",
|
||
{
|
||
"fanout": {
|
||
"story_images": [
|
||
{"story_id": "s1", "task_id": "img1", "status": "enqueued"},
|
||
],
|
||
},
|
||
},
|
||
)
|
||
mpp.merge_pipeline_run(
|
||
"cid-3",
|
||
{
|
||
"fanout": {
|
||
"story_images": [
|
||
{"story_id": "s1", "status": "success"},
|
||
],
|
||
},
|
||
},
|
||
)
|
||
doc = json.loads(fake_redis.store["memoir_pipeline_run:cid-3"])
|
||
assert len(doc["fanout"]["story_images"]) == 1
|
||
assert doc["fanout"]["story_images"][0]["task_id"] == "img1"
|
||
assert doc["fanout"]["story_images"][0]["status"] == "success"
|
||
|
||
|
||
def test_init_and_index_resolve(fake_redis: _FakeRedis) -> None:
|
||
mpp.init_pipeline_run_from_phase1("user-a", "cid-4", "p1tid", segment_count=3)
|
||
cid = mpp.resolve_correlation_id_for_phase1_task("p1tid")
|
||
assert cid == "cid-4"
|
||
snap = mpp.get_pipeline_run_for_eval("user-a", phase1_task_id="p1tid")
|
||
assert snap is not None
|
||
assert snap["user_id"] == "user-a"
|
||
assert snap["phase1"]["task_id"] == "p1tid"
|
||
|
||
|
||
def test_get_pipeline_run_for_eval_user_mismatch(fake_redis: _FakeRedis) -> None:
|
||
mpp.init_pipeline_run_from_phase1("user-a", "cid-5", "p1b", segment_count=1)
|
||
assert mpp.get_pipeline_run_for_eval("other", phase1_task_id="p1b") is None
|