# 配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
# 439 lines · 16 KiB · Python
"""Phase2 路由低置信延迟管线:deferred 池 / 唤醒 / 重试上限。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import uuid
|
||
from datetime import datetime, timedelta, timezone
|
||
from types import SimpleNamespace
|
||
from unittest.mock import DEFAULT, MagicMock, patch
|
||
|
||
import pytest
|
||
from sqlalchemy import create_engine, select
|
||
from sqlalchemy.orm import sessionmaker
|
||
|
||
# 与 alembic/env.py 一致:注册全部 ORM,避免 relationship 解析失败
|
||
from app.agents.memoir.story_route_agent import StoryRouteDecision
|
||
from app.agents.state_schema import MemoirStateSchema
|
||
from app.core.config import settings
|
||
from app.core.db import Base
|
||
from app.features.asset import models as _asset_models # noqa: F401
|
||
from app.features.auth import models as _auth_models # noqa: F401
|
||
from app.features.conversation import models as _conv_models # noqa: F401
|
||
from app.features.conversation.models import Conversation, Segment
|
||
from app.features.memoir import models as _memoir_models # noqa: F401
|
||
from app.features.memoir.story_pipeline_sync import (
|
||
StoryPipelineResult,
|
||
run_story_pipeline_for_category_batch,
|
||
)
|
||
from app.features.memory import models as _memory_models # noqa: F401
|
||
from app.features.payment import models as _payment_models # noqa: F401
|
||
from app.features.story import models as _story_models # noqa: F401
|
||
from app.features.user import models as _user_models # noqa: F401
|
||
from app.features.user.models import User
|
||
from app.features.memoir.constants import memoir
|
||
from app.tasks.memoir_tasks import (
|
||
_persist_phase2_route_defer,
|
||
_wake_deferred_segments_for_category,
|
||
)
|
||
|
||
|
||
@pytest.fixture
def sqlite_session_factory():
    """Yield a session factory bound to a fresh in-memory SQLite database.

    Only the ``User``, ``Conversation`` and ``Segment`` tables are created.
    The engine is disposed once the fixture is torn down.
    """
    engine = create_engine("sqlite:///:memory:", future=True)
    Base.metadata.create_all(
        engine,
        tables=[
            User.__table__,
            Conversation.__table__,
            Segment.__table__,
        ],
    )
    try:
        yield sessionmaker(bind=engine, expire_on_commit=False, future=True)
    finally:
        # Dispose even if the generator is closed early.
        engine.dispose()
|
||
|
||
|
||
def _seed_user_segment(
    db,
    *,
    user_id: str,
    conversation_id: str,
    segment_id: str,
    text: str = "我童年的事情很短暂",
    topic_category: str = "childhood",
) -> Segment:
    """Insert a segment, creating its user and conversation when missing.

    Args:
        db: Session used for the lookups, inserts and the final commit.
        user_id: Primary key of the owning user; created if absent.
        conversation_id: Primary key of the conversation; created if absent.
        segment_id: Primary key given to the new segment.
        text: Stored as the segment's ``user_input_text``.
        topic_category: Stored as the segment's ``topic_category``.

    Returns:
        The newly added ``Segment``, after ``db.commit()``. It starts out
        un-narrated, not skipped, with a defer count of zero.
    """
    if db.get(User, user_id) is None:
        db.add(
            User(
                id=user_id,
                phone=f"p-{user_id[:8]}",
                password_hash="x",
                nickname="t",
            )
        )
    if db.get(Conversation, conversation_id) is None:
        db.add(Conversation(id=conversation_id, user_id=user_id))

    seg = Segment(
        id=segment_id,
        conversation_id=conversation_id,
        user_input_text=text,
        topic_category=topic_category,
        narrated=False,
        skip_narrative=False,
        narrative_defer_count=0,
    )
    db.add(seg)
    db.commit()
    return seg
|
||
|
||
|
||
def _patch_pipeline(plan_return, decide_return):
|
||
"""统一 mock pipeline 内的 IO 与 LLM 依赖,便于聚焦路由分支。
|
||
|
||
返回 ``(context_manager, route_agent_mock)``;进入 context 后由 ``patch.multiple``
|
||
生成的 mock dict 作为 ``mocks`` 提供给测试用例配置返回值与断言。
|
||
"""
|
||
route_agent_mock = MagicMock()
|
||
route_agent_mock.plan_batch.return_value = plan_return
|
||
route_agent_mock.decide.return_value = decide_return
|
||
|
||
return (
|
||
patch.multiple(
|
||
"app.features.memoir.story_pipeline_sync",
|
||
list_active_stories_for_user_sync=DEFAULT,
|
||
StoryRouteAgent=DEFAULT,
|
||
NarrativeAgent=DEFAULT,
|
||
normalize_oral_for_memoir=DEFAULT,
|
||
ensure_chapter_story_link_sync=DEFAULT,
|
||
reorder_chapter_story_links_by_life_order_sync=DEFAULT,
|
||
mark_chapter_dirty_sync=DEFAULT,
|
||
chapter_needs_cover_enqueue=DEFAULT,
|
||
MemoirImageSettings=DEFAULT,
|
||
refresh_chapter_evidence_snapshot_with_retry_sync=DEFAULT,
|
||
create_story_with_version_sync=DEFAULT,
|
||
_ensure_chapter_record=DEFAULT,
|
||
),
|
||
route_agent_mock,
|
||
)
|
||
|
||
|
||
def _configure_pipeline_mocks(mocks: dict, route_agent_mock: MagicMock) -> None:
|
||
mocks["list_active_stories_for_user_sync"].return_value = []
|
||
mocks["StoryRouteAgent"].return_value = route_agent_mock
|
||
mocks["normalize_oral_for_memoir"].side_effect = lambda text, **_: text
|
||
mocks["chapter_needs_cover_enqueue"].return_value = False
|
||
mocks["MemoirImageSettings"].from_env.return_value = MagicMock(enabled=False)
|
||
|
||
|
||
def _empty_state() -> MemoirStateSchema:
    """Return a minimal memoir state: one "childhood" stage, nothing covered."""
    return MemoirStateSchema(
        stage_order=["childhood"],
        current_stage="childhood",
        covered_stages=[],
        slots={},
    )
|
||
|
||
|
||
@pytest.mark.parametrize("reason", ["no_llm", "parse_error", "invalid_target"])
def test_pipeline_defers_on_fallback_route_reason(
    reason: str,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A single-segment fallback route writes no chapter/story and defers."""
    # Pin the switch on so the test does not depend on the ambient config;
    # the disabled case is covered by a separate test.
    monkeypatch.setattr(memoir, "route_defer_enabled", True)

    seg = SimpleNamespace(id="seg-defer-1", user_input_text="一句简短的口述")
    decide_return = StoryRouteDecision(
        decision="new_story",
        new_story_title=None,
        reason=reason,
    )
    cm, route_agent_mock = _patch_pipeline(
        plan_return=None,
        decide_return=decide_return,
    )
    with cm as mocks:
        _configure_pipeline_mocks(mocks, route_agent_mock)
        # Every query resolves to "no row found".
        session = MagicMock()
        exec_result = session.execute.return_value
        exec_result.unique.return_value.scalar_one_or_none.return_value = None

        result = run_story_pipeline_for_category_batch(
            session,
            user_id="user-defer",
            chapter_category="childhood",
            category_segments=[seg],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence={
                "relevant_chunks": [],
                "relevant_summaries": [],
                "relevant_facts": [],
                "relevant_stories": [],
            },
        )

    assert isinstance(result, StoryPipelineResult)
    assert result.deferred is True
    assert result.chapter is None
    assert result.dispatch_ids == set()
    assert result.defer_reason == reason
    assert result.defer_segment_ids == ["seg-defer-1"]
    # None of the write-side helpers may have run.
    mocks["_ensure_chapter_record"].assert_not_called()
    mocks["create_story_with_version_sync"].assert_not_called()
    mocks["mark_chapter_dirty_sync"].assert_not_called()
    route_agent_mock.decide.assert_called_once()
|
||
|
||
|
||
def test_pipeline_does_not_defer_when_disabled(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With the switch off, legacy behavior applies: new_story is written, not deferred."""
    monkeypatch.setattr(memoir, "route_defer_enabled", False)

    seg = SimpleNamespace(id="seg-no-defer", user_input_text="一句简短的口述")
    decide_return = StoryRouteDecision(
        decision="new_story",
        new_story_title=None,
        reason="no_llm",
    )
    cm, route_agent_mock = _patch_pipeline(
        plan_return=None,
        decide_return=decide_return,
    )
    with cm as mocks:
        _configure_pipeline_mocks(mocks, route_agent_mock)
        chapter_stub = SimpleNamespace(id="chapter-1")
        mocks["_ensure_chapter_record"].return_value = chapter_stub
        story_stub = MagicMock()
        story_stub.id = "story-x"
        story_stub.current_version_id = None
        mocks["create_story_with_version_sync"].return_value = story_stub

        # NarrativeAgent.generate_narrative must return valid JSON.
        nac_instance = mocks["NarrativeAgent"].return_value
        nac_instance.generate_narrative.return_value = (
            '{"paragraphs": [{"content": "叙事正文段落足够长用于测试合并逻辑避免触发过短回退"}]}'
        )

        # Every query resolves to "no row found".
        session = MagicMock()
        exec_result = session.execute.return_value
        exec_result.unique.return_value.scalar_one_or_none.return_value = None

        result = run_story_pipeline_for_category_batch(
            session,
            user_id="user-no-defer",
            chapter_category="childhood",
            category_segments=[seg],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence={
                "relevant_chunks": [],
                "relevant_summaries": [],
                "relevant_facts": [],
                "relevant_stories": [],
            },
        )

    assert isinstance(result, StoryPipelineResult)
    assert result.deferred is False
    assert result.chapter is chapter_stub
    mocks["_ensure_chapter_record"].assert_called_once()
|
||
|
||
|
||
def test_pipeline_returns_result_object_for_normal_path() -> None:
    """A non-fallback decision runs the normal path and returns a StoryPipelineResult."""
    seg = SimpleNamespace(id="seg-ok", user_input_text="一段足够长的童年口述用于测试正常写入路径")
    decide_return = StoryRouteDecision(
        decision="new_story",
        new_story_title="一个童年故事的新标题",
        reason="ok",
    )
    cm, route_agent_mock = _patch_pipeline(
        plan_return=None,
        decide_return=decide_return,
    )
    with cm as mocks:
        _configure_pipeline_mocks(mocks, route_agent_mock)
        chapter_stub = SimpleNamespace(id="chapter-ok")
        mocks["_ensure_chapter_record"].return_value = chapter_stub
        story_stub = MagicMock()
        story_stub.id = "story-ok"
        story_stub.current_version_id = None
        mocks["create_story_with_version_sync"].return_value = story_stub

        # The narrative agent's output is a JSON string.
        nac_instance = mocks["NarrativeAgent"].return_value
        nac_instance.generate_narrative.return_value = (
            '{"paragraphs": [{"content": "叙事正文段落足够长用于测试合并逻辑避免触发过短回退"}]}'
        )

        # Every query resolves to "no row found".
        session = MagicMock()
        exec_result = session.execute.return_value
        exec_result.unique.return_value.scalar_one_or_none.return_value = None

        result = run_story_pipeline_for_category_batch(
            session,
            user_id="user-ok",
            chapter_category="childhood",
            category_segments=[seg],
            state=_empty_state(),
            user_profile="",
            user_birth_year=None,
            llm=object(),
            memory_evidence={
                "relevant_chunks": [],
                "relevant_summaries": [],
                "relevant_facts": [],
                "relevant_stories": [],
            },
        )

    assert isinstance(result, StoryPipelineResult)
    assert result.deferred is False
    assert result.chapter is chapter_stub
|
||
|
||
|
||
def test_persist_phase2_route_defer_marks_segment_and_schedules_next(
    sqlite_session_factory,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """First deferral: defer metadata is written and the next timeout is scheduled."""
    monkeypatch.setattr(memoir, "route_defer_seconds", 30.0)
    monkeypatch.setattr(memoir, "route_defer_max_attempts", 3)

    # The context manager closes the session before the fixture disposes
    # the engine.
    with sqlite_session_factory() as db:
        seg = _seed_user_segment(
            db,
            user_id="u-defer-1",
            conversation_id=str(uuid.uuid4()),
            segment_id="seg-defer-x1",
        )

        with patch(
            "app.tasks.memoir_tasks._schedule_phase2_timeout",
            return_value="task-id-next",
        ) as schedule_mock:
            out = _persist_phase2_route_defer(
                db,
                user_id="u-defer-1",
                chapter_category="childhood",
                task_id="task-id-current",
                memoir_correlation_id="cid-1",
                defer_segment_ids=[seg.id],
                defer_reason="no_llm",
                phase2_started=0.0,
                pipeline_elapsed=0.0,
                lock_elapsed=0.0,
            )

        assert out["status"] == "deferred"
        assert out["segments"] == 1
        assert out["saturated_count"] == 0
        schedule_mock.assert_called_once_with("u-defer-1", "childhood", "cid-1")

        refreshed = db.execute(
            select(Segment).where(Segment.id == seg.id)
        ).scalar_one()
        assert refreshed.narrative_defer_count == 1
        assert refreshed.narrative_defer_reason == "no_llm"
        assert refreshed.narrative_deferred_until is not None
        assert refreshed.narrative_last_attempt_at is not None
        assert refreshed.narrated is False
        assert refreshed.processed is False
|
||
|
||
|
||
def test_persist_phase2_route_defer_stops_scheduling_at_max_attempts(
    sqlite_session_factory,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """At max_attempts no further timeout is dispatched; defer metadata is kept."""
    monkeypatch.setattr(memoir, "route_defer_seconds", 30.0)
    monkeypatch.setattr(memoir, "route_defer_max_attempts", 2)

    # The context manager closes the session before the fixture disposes
    # the engine.
    with sqlite_session_factory() as db:
        seg = _seed_user_segment(
            db,
            user_id="u-defer-max",
            conversation_id=str(uuid.uuid4()),
            segment_id="seg-defer-max-1",
        )
        # One deferral already recorded, so this attempt reaches the cap of 2.
        seg.narrative_defer_count = 1
        db.commit()

        with patch(
            "app.tasks.memoir_tasks._schedule_phase2_timeout",
            return_value="should-not-be-called",
        ) as schedule_mock:
            out = _persist_phase2_route_defer(
                db,
                user_id="u-defer-max",
                chapter_category="childhood",
                task_id="task-id-current",
                memoir_correlation_id="cid-2",
                defer_segment_ids=[seg.id],
                defer_reason="parse_error",
                phase2_started=0.0,
                pipeline_elapsed=0.0,
                lock_elapsed=0.0,
            )

        assert out["status"] == "deferred"
        assert out["saturated_count"] == 1
        schedule_mock.assert_not_called()

        refreshed = db.execute(
            select(Segment).where(Segment.id == seg.id)
        ).scalar_one()
        assert refreshed.narrative_defer_count == 2
        # Once the cap is reached deferred_until is not set; the segment waits
        # for new material to wake it, and the next Phase2 run can still
        # consume it.
        assert refreshed.narrative_deferred_until is None
        assert refreshed.narrative_defer_reason == "parse_error"
|
||
|
||
|
||
def test_wake_deferred_segments_clears_defer_metadata(
    sqlite_session_factory,
) -> None:
    """Waking a category clears its defer metadata and leaves other categories alone."""
    user_id = "u-wake"
    conv_id = str(uuid.uuid4())

    # The context manager closes the session before the fixture disposes
    # the engine.
    with sqlite_session_factory() as db:
        seg_a = _seed_user_segment(
            db,
            user_id=user_id,
            conversation_id=conv_id,
            segment_id="seg-wake-1",
            topic_category="childhood",
        )
        seg_other = _seed_user_segment(
            db,
            user_id=user_id,
            conversation_id=conv_id,
            segment_id="seg-other",
            topic_category="education",
        )

        # Both segments start deferred, with a deadline five minutes ahead.
        deferred_until = datetime.now(timezone.utc) + timedelta(minutes=5)
        seg_a.narrative_defer_count = 2
        seg_a.narrative_defer_reason = "parse_error"
        seg_a.narrative_deferred_until = deferred_until
        seg_other.narrative_defer_count = 1
        seg_other.narrative_defer_reason = "no_llm"
        seg_other.narrative_deferred_until = deferred_until
        db.commit()

        woke = _wake_deferred_segments_for_category(db, user_id, "childhood")
        db.commit()

        refreshed_a = db.execute(
            select(Segment).where(Segment.id == seg_a.id)
        ).scalar_one()
        refreshed_other = db.execute(
            select(Segment).where(Segment.id == seg_other.id)
        ).scalar_one()

        assert woke == 1
        assert refreshed_a.narrative_deferred_until is None
        assert refreshed_a.narrative_defer_count == 0
        assert refreshed_a.narrative_defer_reason is None
        # The other category must be unaffected.
        assert refreshed_other.narrative_deferred_until is not None
        assert refreshed_other.narrative_defer_count == 1
        assert refreshed_other.narrative_defer_reason == "no_llm"
|