Files
life-echo/api/tests/test_memoir_ingest_scheduler.py
Kevin ac436b87a2 feat(api): 收敛对话与记忆流程边界,引入 LLM 网关与专用服务
- MemoryService 异步路径委托 MemoryIngestService / MemoryRetrievalService;富化派发经 MemoryEnrichmentScheduler
- WebSocket pipeline 经 ChatTurnService 与显式 DTO 编排单轮对话;回忆录片段入队由 MemoirIngestScheduler 封装
- 新增 LlmGateway(LlmUseCase),各 agent、任务与适配器对齐 ports
- 补充 memory 提示适配、runtime 类型、memory-retrieval 文档、ai-touchpoints 说明与扫描脚本及配套测试

Made-with: Cursor
2026-04-30 09:17:01 +08:00

66 lines
1.7 KiB
Python

from __future__ import annotations
import pytest
from app.features.memoir.ingest_scheduler import MemoirIngestScheduler
class _FakeRunner:
def __init__(self) -> None:
self.queued: list[tuple[str, str, int]] = []
self.flushed: list[tuple[str, list[str]]] = []
async def queue_message(
self,
user_id: str,
segment_id: str,
*,
text_char_count: int = 0,
) -> None:
self.queued.append((user_id, segment_id, text_char_count))
async def flush_pending(
self,
user_id: str,
*,
extra_segment_ids: list[str] | None = None,
) -> str:
self.flushed.append((user_id, list(extra_segment_ids or [])))
return "task-1"
@pytest.mark.asyncio
async def test_queue_segment_returns_visible_phase_plan() -> None:
    """Queueing one segment reaches the runner and yields a matching plan.

    Checks that the runner receives the user id, segment id and character
    count, and that the returned plan carries the same user, a one-element
    segment tuple, and the trigger that was passed in.
    """
    user_id = "user-1"
    segment_id = "seg-1"
    trigger = "evaluation_replay"

    fake_runner = _FakeRunner()
    ingest = MemoirIngestScheduler(runner=fake_runner)

    plan = await ingest.queue_segment(
        user_id,
        segment_id,
        text_char_count=42,
        trigger=trigger,
    )

    # The runner must have seen exactly one queue call with these values.
    assert fake_runner.queued == [(user_id, segment_id, 42)]
    assert plan.user_id == user_id
    assert plan.segment_ids == (segment_id,)
    assert plan.trigger == trigger
@pytest.mark.asyncio
async def test_flush_pending_returns_plan_and_task_id() -> None:
    """Flushing forwards the extra segments and returns (plan, task id).

    Checks that the runner receives the user id and the extra segment ids,
    that the task id returned by the runner comes back to the caller, and
    that the plan carries the user, the segments as a tuple, and the trigger.
    """
    user_id = "user-1"
    trigger = "conversation_end"
    expected_segments = ("seg-1", "seg-2")

    fake_runner = _FakeRunner()
    ingest = MemoirIngestScheduler(runner=fake_runner)

    # Pass a fresh list built from the immutable expectation.
    outcome = await ingest.flush_pending(
        user_id,
        extra_segment_ids=list(expected_segments),
        trigger=trigger,
    )
    plan, task_id = outcome

    assert fake_runner.flushed == [(user_id, list(expected_segments))]
    assert task_id == "task-1"
    assert plan.user_id == user_id
    assert plan.segment_ids == expected_segments
    assert plan.trigger == trigger