Files
life-echo/api/tests/evaluation/test_replay_router.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

76 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""回放 / 评审路由参数校验(最小 HTTP"""
import pytest
from tests.conftest import install_test_error_handlers
from httpx import ASGITransport, AsyncClient
from app.features.evaluation.internal_auth import get_internal_eval_principal
@pytest.mark.asyncio
async def test_replay_conversation_requires_fixture_or_utterances(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from fastapi import FastAPI
monkeypatch.setattr(
"app.core.config.settings.internal_eval_api_key",
"secret",
raising=False,
)
from app.features.evaluation.router import router
app = install_test_error_handlers(FastAPI())
app.include_router(router, prefix="/internal/api/evaluation")
async def _override_auth():
from app.features.evaluation.internal_auth import InternalEvalPrincipal
return InternalEvalPrincipal()
app.dependency_overrides[get_internal_eval_principal] = _override_auth
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://t") as client:
r = await client.post(
"/internal/api/evaluation/replay/conversation",
headers={"X-Internal-Eval-Key": "secret"},
json={"conversation_id": "00000000-0000-0000-0000-000000000001"},
)
assert r.status_code == 400
@pytest.mark.asyncio
async def test_replay_conversation_rejects_both_fixture_and_utterances(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from fastapi import FastAPI
monkeypatch.setattr(
"app.core.config.settings.internal_eval_api_key",
"secret",
raising=False,
)
from app.features.evaluation.router import router
app = install_test_error_handlers(FastAPI())
app.include_router(router, prefix="/internal/api/evaluation")
async def _override_auth():
from app.features.evaluation.internal_auth import InternalEvalPrincipal
return InternalEvalPrincipal()
app.dependency_overrides[get_internal_eval_principal] = _override_auth
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://t") as client:
r = await client.post(
"/internal/api/evaluation/replay/conversation",
headers={"X-Internal-Eval-Key": "secret"},
json={
"conversation_id": "00000000-0000-0000-0000-000000000001",
"fixture_filename": "x.md",
"user_utterances": ["a"],
},
)
assert r.status_code == 400