"""Tests for the internal memoir-phase1-ready route (dependency-injection stand-ins)."""

from datetime import datetime, timezone

import pytest
from tests.conftest import install_test_error_handlers
from httpx import ASGITransport, AsyncClient

from app.features.evaluation.internal_auth import get_internal_eval_principal
from app.features.evaluation.schemas import MemoirPhase1ReadyOut

# Prefix under which the evaluation router is mounted in these tests.
_ROUTER_PREFIX = "/internal/api/evaluation"
# Value used both for the patched setting and for the request header.
_API_KEY = "secret"


def _set_internal_api_key(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch the internal-eval API key setting to the value the tests send."""
    monkeypatch.setattr(
        "app.core.config.settings.internal_eval_api_key",
        _API_KEY,
        raising=False,
    )


async def _allow_internal_principal():
    """Dependency override that returns a default ``InternalEvalPrincipal``."""
    from app.features.evaluation.internal_auth import InternalEvalPrincipal

    return InternalEvalPrincipal()


def _mount_router(api, router, readiness_dependency, service_cls):
    """Wire *router* into *api* with the auth and readiness dependencies stubbed.

    Args:
        api: A freshly created FastAPI application.
        router: The evaluation router under test.
        readiness_dependency: The dependency callable to override.
        service_cls: Zero-argument factory for the stand-in readiness service;
            it is called each time the overridden dependency is resolved.

    Returns:
        The application object returned by ``install_test_error_handlers``.
    """
    wired = install_test_error_handlers(api)
    wired.include_router(router, prefix=_ROUTER_PREFIX)
    wired.dependency_overrides[get_internal_eval_principal] = _allow_internal_principal
    wired.dependency_overrides[readiness_dependency] = lambda: service_cls()
    return wired


async def _request_readiness(api, conversation_id, segment_ids):
    """GET the memoir-phase1-ready endpoint in-process and return the response.

    Each entry of *segment_ids* is sent as a repeated ``segment_ids`` query
    parameter.
    """
    url = f"{_ROUTER_PREFIX}/sessions/{conversation_id}/memoir-phase1-ready"
    query = [("segment_ids", segment_id) for segment_id in segment_ids]
    async with AsyncClient(
        transport=ASGITransport(app=api),
        base_url="http://t",
    ) as client:
        return await client.get(
            url,
            headers={"X-Internal-Eval-Key": _API_KEY},
            params=query,
        )


@pytest.mark.asyncio
async def test_memoir_phase1_ready_returns_bundle(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ready result from the stand-in service is returned as a 200 JSON body."""
    from fastapi import FastAPI

    _set_internal_api_key(monkeypatch)

    # Imported only after the setting has been patched, as in the original test.
    from app.features.evaluation.deps import get_memoir_readiness_service
    from app.features.evaluation.router import router

    class _AlwaysReady:
        """Stand-in service that reports every requested segment as checked."""

        async def memoir_phase1_ready_for_segments(
            self, *, conversation_id: str, segment_ids: list[str]
        ) -> MemoirPhase1ReadyOut:
            return MemoirPhase1ReadyOut(
                ready=True,
                checked_segment_ids=list(segment_ids),
                pending_segment_ids=[],
            )

    api = _mount_router(
        FastAPI(), router, get_memoir_readiness_service, _AlwaysReady
    )
    response = await _request_readiness(api, "cid-a", ["s1", "s2"])

    assert response.status_code == 200
    payload = response.json()
    assert payload["ready"] is True
    assert payload["checked_segment_ids"] == ["s1", "s2"]
    assert payload["pending_segment_ids"] == []


@pytest.mark.asyncio
async def test_memoir_phase1_ready_404_propagates(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An ``EvaluationNotFoundError`` from the service yields a 404 response."""
    from fastapi import FastAPI

    _set_internal_api_key(monkeypatch)

    # Imported only after the setting has been patched, as in the original test.
    from app.features.evaluation.deps import get_memoir_readiness_service
    from app.features.evaluation.errors import EvaluationNotFoundError
    from app.features.evaluation.router import router

    class _ConversationMissing:
        """Stand-in service that always fails with a not-found error."""

        async def memoir_phase1_ready_for_segments(
            self, *, conversation_id: str, segment_ids: list[str]
        ) -> MemoirPhase1ReadyOut:
            raise EvaluationNotFoundError("conversation not found")

    api = _mount_router(
        FastAPI(), router, get_memoir_readiness_service, _ConversationMissing
    )
    response = await _request_readiness(api, "missing", ["s1"])

    assert response.status_code == 404