Files
life-echo/api/tests/test_memoir_route_defer.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

439 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Phase2 路由低置信延迟管线deferred 池 / 唤醒 / 重试上限。"""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from unittest.mock import DEFAULT, MagicMock, patch
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker
# Same as alembic/env.py: import every ORM model module so that relationship() targets can be resolved.
from app.agents.memoir.story_route_agent import StoryRouteDecision
from app.agents.state_schema import MemoirStateSchema
from app.core.config import settings
from app.core.db import Base
from app.features.asset import models as _asset_models # noqa: F401
from app.features.auth import models as _auth_models # noqa: F401
from app.features.conversation import models as _conv_models # noqa: F401
from app.features.conversation.models import Conversation, Segment
from app.features.memoir import models as _memoir_models # noqa: F401
from app.features.memoir.story_pipeline_sync import (
StoryPipelineResult,
run_story_pipeline_for_category_batch,
)
from app.features.memory import models as _memory_models # noqa: F401
from app.features.payment import models as _payment_models # noqa: F401
from app.features.story import models as _story_models # noqa: F401
from app.features.user import models as _user_models # noqa: F401
from app.features.user.models import User
from app.features.memoir.constants import memoir
from app.tasks.memoir_tasks import (
_persist_phase2_route_defer,
_wake_deferred_segments_for_category,
)
@pytest.fixture
def sqlite_session_factory():
    """Yield a callable that opens sessions on a fresh in-memory SQLite database.

    Only the ``User``, ``Conversation`` and ``Segment`` tables are created.
    Sessions use ``expire_on_commit=False`` so objects seeded by a test stay
    readable after ``commit()``.

    Every session handed out is closed on teardown before the engine is
    disposed. ``Engine.dispose()`` alone does not close connections that are
    still checked out by an open session (e.g. after a trailing SELECT).
    """
    engine = create_engine("sqlite:///:memory:", future=True)
    opened_sessions = []
    try:
        Base.metadata.create_all(
            engine,
            tables=[
                User.__table__,
                Conversation.__table__,
                Segment.__table__,
            ],
        )
        make_session = sessionmaker(bind=engine, expire_on_commit=False, future=True)

        def _open_session():
            # Remember each session so teardown can release its connection.
            session = make_session()
            opened_sessions.append(session)
            return session

        yield _open_session
    finally:
        for session in opened_sessions:
            session.close()
        engine.dispose()
def _seed_user_segment(
    db,
    *,
    user_id: str,
    conversation_id: str,
    segment_id: str,
    text: str = "我童年的事情很短暂",
    topic_category: str = "childhood",
) -> Segment:
    """Insert one pending segment, creating its owner and conversation on demand, then commit.

    The user and the conversation are added only when ``db.get`` cannot find
    them, so several segments can share one user/conversation. The new
    segment starts with ``narrated=False``, ``skip_narrative=False`` and
    ``narrative_defer_count=0``.

    Returns:
        The committed ``Segment`` instance.
    """
    existing_user = db.get(User, user_id)
    if not existing_user:
        # Placeholder credentials; the phone is derived from the id prefix.
        placeholder_user = User(
            id=user_id,
            nickname="t",
            password_hash="x",
            phone=f"p-{user_id[:8]}",
        )
        db.add(placeholder_user)
    existing_conversation = db.get(Conversation, conversation_id)
    if not existing_conversation:
        db.add(Conversation(user_id=user_id, id=conversation_id))
    new_segment = Segment(
        id=segment_id,
        conversation_id=conversation_id,
        topic_category=topic_category,
        user_input_text=text,
        narrated=False,
        skip_narrative=False,
        narrative_defer_count=0,
    )
    db.add(new_segment)
    db.commit()
    return new_segment
def _patch_pipeline(plan_return, decide_return):
    """Build a patcher that replaces the pipeline's IO and LLM collaborators with mocks.

    Every listed name in ``app.features.memoir.story_pipeline_sync`` is
    patched with ``DEFAULT``, so entering the returned context yields a dict of
    the generated mocks, keyed by name. Tests use it to set return values and
    make assertions.

    Returns:
        ``(context_manager, route_agent_mock)``. ``route_agent_mock.plan_batch``
        returns ``plan_return`` and ``route_agent_mock.decide`` returns
        ``decide_return``.
    """
    patched_names = (
        "list_active_stories_for_user_sync",
        "StoryRouteAgent",
        "NarrativeAgent",
        "normalize_oral_for_memoir",
        "ensure_chapter_story_link_sync",
        "reorder_chapter_story_links_by_life_order_sync",
        "mark_chapter_dirty_sync",
        "chapter_needs_cover_enqueue",
        "MemoirImageSettings",
        "refresh_chapter_evidence_snapshot_with_retry_sync",
        "create_story_with_version_sync",
        "_ensure_chapter_record",
    )
    agent = MagicMock()
    agent.plan_batch.return_value = plan_return
    agent.decide.return_value = decide_return
    patcher = patch.multiple(
        "app.features.memoir.story_pipeline_sync",
        **dict.fromkeys(patched_names, DEFAULT),
    )
    return patcher, agent
def _configure_pipeline_mocks(mocks: dict, route_agent_mock: MagicMock) -> None:
    """Give the patched pipeline collaborators neutral default behaviour.

    After this call:
    - ``list_active_stories_for_user_sync`` returns an empty list;
    - constructing ``StoryRouteAgent`` yields ``route_agent_mock``;
    - ``normalize_oral_for_memoir`` echoes its ``text`` argument unchanged;
    - ``chapter_needs_cover_enqueue`` returns ``False``;
    - ``MemoirImageSettings.from_env()`` returns an object with ``enabled=False``.
    """
    def passthrough(text, **_ignored):
        return text

    mocks["StoryRouteAgent"].return_value = route_agent_mock
    mocks["list_active_stories_for_user_sync"].return_value = []
    mocks["normalize_oral_for_memoir"].side_effect = passthrough
    mocks["chapter_needs_cover_enqueue"].return_value = False
    mocks["MemoirImageSettings"].from_env.return_value = MagicMock(enabled=False)
def _empty_state() -> MemoirStateSchema:
    """Return a minimal memoir state: one ``childhood`` stage, nothing covered, no slots."""
    only_stage = "childhood"
    return MemoirStateSchema(
        current_stage=only_stage,
        stage_order=[only_stage],
        covered_stages=[],
        slots={},
    )
@pytest.mark.parametrize("reason", ["no_llm", "parse_error", "invalid_target"])
def test_pipeline_defers_on_fallback_route_reason(reason: str) -> None:
    """A fallback route reason on a single segment defers instead of writing.

    No chapter or story is written. The pipeline returns a deferred
    ``StoryPipelineResult`` carrying the fallback reason and the segment id.
    """
    segment = SimpleNamespace(id="seg-defer-1", user_input_text="一句简短的口述")
    fallback_decision = StoryRouteDecision(
        decision="new_story",
        new_story_title=None,
        reason=reason,
    )
    patcher, agent = _patch_pipeline(plan_return=None, decide_return=fallback_decision)
    evidence = {
        name: []
        for name in (
            "relevant_chunks",
            "relevant_summaries",
            "relevant_facts",
            "relevant_stories",
        )
    }
    with patcher as mocks:
        _configure_pipeline_mocks(mocks, agent)
        # session.execute(...).unique().scalar_one_or_none() yields None (nothing found).
        fake_session = MagicMock()
        query_result = fake_session.execute.return_value.unique.return_value
        query_result.scalar_one_or_none.return_value = None
        outcome = run_story_pipeline_for_category_batch(
            fake_session,
            user_id="user-defer",
            chapter_category="childhood",
            category_segments=[segment],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence=evidence,
        )
        assert isinstance(outcome, StoryPipelineResult)
        assert outcome.deferred is True
        assert outcome.chapter is None
        assert outcome.dispatch_ids == set()
        assert outcome.defer_reason == reason
        assert outcome.defer_segment_ids == ["seg-defer-1"]
        for untouched in (
            "_ensure_chapter_record",
            "create_story_with_version_sync",
            "mark_chapter_dirty_sync",
        ):
            mocks[untouched].assert_not_called()
        agent.decide.assert_called_once()
def test_pipeline_does_not_defer_when_disabled(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With route deferral switched off, the legacy behaviour applies.

    A fallback (``no_llm``) decision writes ``new_story`` directly instead of
    deferring, so the chapter record is created and returned.
    """
    monkeypatch.setattr(memoir, "route_defer_enabled", False)
    segment = SimpleNamespace(id="seg-no-defer", user_input_text="一句简短的口述")
    fallback_decision = StoryRouteDecision(
        decision="new_story",
        new_story_title=None,
        reason="no_llm",
    )
    patcher, agent = _patch_pipeline(plan_return=None, decide_return=fallback_decision)
    with patcher as mocks:
        _configure_pipeline_mocks(mocks, agent)
        chapter = SimpleNamespace(id="chapter-1")
        mocks["_ensure_chapter_record"].return_value = chapter
        story = MagicMock()
        story.id, story.current_version_id = "story-x", None
        mocks["create_story_with_version_sync"].return_value = story
        # NarrativeAgent.generate_narrative must return valid JSON.
        narrator = mocks["NarrativeAgent"].return_value
        narrator.generate_narrative.return_value = (
            '{"paragraphs": [{"content": "叙事正文段落足够长用于测试合并逻辑避免触发过短回退"}]}'
        )
        # session.execute(...).unique().scalar_one_or_none() yields None (nothing found).
        fake_session = MagicMock()
        query_result = fake_session.execute.return_value.unique.return_value
        query_result.scalar_one_or_none.return_value = None
        outcome = run_story_pipeline_for_category_batch(
            fake_session,
            user_id="user-no-defer",
            chapter_category="childhood",
            category_segments=[segment],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence={
                "relevant_chunks": [],
                "relevant_summaries": [],
                "relevant_facts": [],
                "relevant_stories": [],
            },
        )
        assert isinstance(outcome, StoryPipelineResult)
        assert outcome.deferred is False
        assert outcome.chapter is chapter
        mocks["_ensure_chapter_record"].assert_called_once()
def test_pipeline_returns_result_object_for_normal_path() -> None:
    """A non-fallback decision follows the normal write path and returns a StoryPipelineResult."""
    segment = SimpleNamespace(
        id="seg-ok",
        user_input_text="一段足够长的童年口述用于测试正常写入路径",
    )
    confident_decision = StoryRouteDecision(
        decision="new_story",
        new_story_title="一个童年故事的新标题",
        reason="ok",
    )
    patcher, agent = _patch_pipeline(plan_return=None, decide_return=confident_decision)
    with patcher as mocks:
        _configure_pipeline_mocks(mocks, agent)
        chapter = SimpleNamespace(id="chapter-ok")
        mocks["_ensure_chapter_record"].return_value = chapter
        story = MagicMock()
        story.id, story.current_version_id = "story-ok", None
        mocks["create_story_with_version_sync"].return_value = story
        narrator = mocks["NarrativeAgent"].return_value
        narrator.generate_narrative.return_value = (
            '{"paragraphs": [{"content": "叙事正文段落足够长用于测试合并逻辑避免触发过短回退"}]}'
        )
        # session.execute(...).unique().scalar_one_or_none() yields None (nothing found).
        fake_session = MagicMock()
        query_result = fake_session.execute.return_value.unique.return_value
        query_result.scalar_one_or_none.return_value = None
        evidence = dict.fromkeys(
            (
                "relevant_chunks",
                "relevant_summaries",
                "relevant_facts",
                "relevant_stories",
            ),
        )
        for key in evidence:
            evidence[key] = []
        outcome = run_story_pipeline_for_category_batch(
            fake_session,
            user_id="user-ok",
            chapter_category="childhood",
            category_segments=[segment],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence=evidence,
        )
        assert isinstance(outcome, StoryPipelineResult)
        assert outcome.deferred is False
        assert outcome.chapter is chapter
def test_persist_phase2_route_defer_marks_segment_and_schedules_next(
    sqlite_session_factory,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """First deferral writes defer metadata and schedules the next timeout.

    ``route_defer_max_attempts`` is 3, so a single deferral stays below the cap
    and ``_schedule_phase2_timeout`` must be called exactly once.
    """
    monkeypatch.setattr(memoir, "route_defer_seconds", 30.0)
    monkeypatch.setattr(memoir, "route_defer_max_attempts", 3)
    # Context-managed so the session, and any connection it still holds after
    # the final SELECT, is released when the test ends.
    with sqlite_session_factory() as db:
        seg = _seed_user_segment(
            db,
            user_id="u-defer-1",
            conversation_id=str(uuid.uuid4()),
            segment_id="seg-defer-x1",
        )
        with patch(
            "app.tasks.memoir_tasks._schedule_phase2_timeout",
            return_value="task-id-next",
        ) as schedule_mock:
            out = _persist_phase2_route_defer(
                db,
                user_id="u-defer-1",
                chapter_category="childhood",
                task_id="task-id-current",
                memoir_correlation_id="cid-1",
                defer_segment_ids=[seg.id],
                defer_reason="no_llm",
                phase2_started=0.0,
                pipeline_elapsed=0.0,
                lock_elapsed=0.0,
            )
        assert out["status"] == "deferred"
        assert out["segments"] == 1
        assert out["saturated_count"] == 0
        schedule_mock.assert_called_once_with("u-defer-1", "childhood", "cid-1")
        # With expire_on_commit=False, the seeded instance keeps its loaded
        # attributes, and a plain SELECT returns that identity-map instance
        # without overwriting them. populate_existing forces a reload, so the
        # assertions check the stored row rather than stale in-memory values.
        refreshed = db.execute(
            select(Segment)
            .where(Segment.id == seg.id)
            .execution_options(populate_existing=True)
        ).scalar_one()
        assert refreshed.narrative_defer_count == 1
        assert refreshed.narrative_defer_reason == "no_llm"
        assert refreshed.narrative_deferred_until is not None
        assert refreshed.narrative_last_attempt_at is not None
        assert refreshed.narrated is False
        assert refreshed.processed is False
def test_persist_phase2_route_defer_stops_scheduling_at_max_attempts(
    sqlite_session_factory,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Once max_attempts is reached no further timeout is dispatched.

    The segment starts with one prior deferral and the cap is 2. After this
    deferral it is saturated: scheduling is skipped, but its defer metadata
    (count and reason) is still recorded.
    """
    monkeypatch.setattr(memoir, "route_defer_seconds", 30.0)
    monkeypatch.setattr(memoir, "route_defer_max_attempts", 2)
    # Context-managed so the session is closed when the test ends.
    with sqlite_session_factory() as db:
        seg = _seed_user_segment(
            db,
            user_id="u-defer-max",
            conversation_id=str(uuid.uuid4()),
            segment_id="seg-defer-max-1",
        )
        seg.narrative_defer_count = 1
        db.commit()
        with patch(
            "app.tasks.memoir_tasks._schedule_phase2_timeout",
            return_value="should-not-be-called",
        ) as schedule_mock:
            out = _persist_phase2_route_defer(
                db,
                user_id="u-defer-max",
                chapter_category="childhood",
                task_id="task-id-current",
                memoir_correlation_id="cid-2",
                defer_segment_ids=[seg.id],
                defer_reason="parse_error",
                phase2_started=0.0,
                pipeline_elapsed=0.0,
                lock_elapsed=0.0,
            )
        assert out["status"] == "deferred"
        assert out["saturated_count"] == 1
        schedule_mock.assert_not_called()
        # Reload from the database. With expire_on_commit=False, a plain
        # SELECT would return the cached instance without refreshing it.
        refreshed = db.execute(
            select(Segment)
            .where(Segment.id == seg.id)
            .execution_options(populate_existing=True)
        ).scalar_one()
        assert refreshed.narrative_defer_count == 2
        # At the cap, deferred_until is left unset. The segment waits to be woken
        # by new material, and meanwhile the next Phase 2 run can still consume it.
        assert refreshed.narrative_deferred_until is None
        assert refreshed.narrative_defer_reason == "parse_error"
def test_wake_deferred_segments_clears_defer_metadata(
    sqlite_session_factory,
) -> None:
    """New material clears existing defer metadata in its own category only.

    Two deferred segments share one user and conversation: one in
    ``childhood`` and one in ``education``. Waking ``childhood`` must reset
    only the first segment's defer fields and return 1.
    """
    user_id = "u-wake"
    conv_id = str(uuid.uuid4())
    # Context-managed so the session is closed when the test ends.
    with sqlite_session_factory() as db:
        seg_a = _seed_user_segment(
            db,
            user_id=user_id,
            conversation_id=conv_id,
            segment_id="seg-wake-1",
            topic_category="childhood",
        )
        seg_other = _seed_user_segment(
            db,
            user_id=user_id,
            conversation_id=conv_id,
            segment_id="seg-other",
            topic_category="education",
        )
        seg_a.narrative_defer_count = 2
        seg_a.narrative_defer_reason = "parse_error"
        seg_a.narrative_deferred_until = datetime.now(timezone.utc) + timedelta(minutes=5)
        seg_other.narrative_defer_count = 1
        seg_other.narrative_defer_reason = "no_llm"
        seg_other.narrative_deferred_until = datetime.now(timezone.utc) + timedelta(
            minutes=5
        )
        db.commit()
        woke = _wake_deferred_segments_for_category(db, user_id, "childhood")
        db.commit()
        # Reload both rows from the database. With expire_on_commit=False, a
        # plain SELECT would return the cached instances without refreshing
        # their already-loaded attributes.
        refreshed_a = db.execute(
            select(Segment)
            .where(Segment.id == seg_a.id)
            .execution_options(populate_existing=True)
        ).scalar_one()
        refreshed_other = db.execute(
            select(Segment)
            .where(Segment.id == seg_other.id)
            .execution_options(populate_existing=True)
        ).scalar_one()
        assert woke == 1
        assert refreshed_a.narrative_deferred_until is None
        assert refreshed_a.narrative_defer_count == 0
        assert refreshed_a.narrative_defer_reason is None
        # Segments in other categories must not be affected.
        assert refreshed_other.narrative_deferred_until is not None
        assert refreshed_other.narrative_defer_count == 1
        assert refreshed_other.narrative_defer_reason == "no_llm"