配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
72 lines
2.1 KiB
Python
72 lines
2.1 KiB
Python
"""内部路由在未配密钥时应 503。"""
|
|
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.features.evaluation.internal_auth import get_internal_eval_principal
|
|
from tests.conftest import install_test_error_handlers
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_internal_eval_list_fixtures_requires_config(
    monkeypatch: pytest.MonkeyPatch,
):
    """Expect HTTP 503 from the fixtures listing when the internal key is blank.

    The test sets ``settings.internal_eval_api_key`` to an empty string,
    mounts the evaluation router on a fresh FastAPI app, and issues a GET
    without any credentials.
    """
    from fastapi import FastAPI

    # Blank out the key before the router module is imported below.
    monkeypatch.setattr(
        "app.core.config.settings.internal_eval_api_key",
        "",
        raising=False,
    )

    from app.features.evaluation.router import router

    app = install_test_error_handlers(FastAPI())
    app.include_router(router, prefix="/internal/api/evaluation")

    # Drive the app in-process through the ASGI transport.
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://t",
    ) as http:
        response = await http.get(
            "/internal/api/evaluation/fixtures/user-exports"
        )

    assert response.status_code == 503
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_internal_eval_with_override_lists_fixtures(
    monkeypatch: pytest.MonkeyPatch,
):
    """Expect HTTP 200 and an empty ``items`` list once auth is overridden.

    The test sets ``settings.internal_eval_api_key`` to ``"secret"``, swaps
    ``admin_service.list_user_export_md_filenames`` for a stub returning an
    empty list, and replaces the ``get_internal_eval_principal`` dependency
    with one that returns a bare ``InternalEvalPrincipal``.
    """
    from fastapi import FastAPI

    monkeypatch.setattr(
        "app.core.config.settings.internal_eval_api_key",
        "secret",
        raising=False,
    )

    def _no_fixtures() -> list[str]:
        # Stub for the fixture listing: report that nothing is available.
        return []

    # NOTE(review): presumably the route resolves the listing function through
    # admin_service at call time, so this patch is what yields the empty
    # payload asserted below - confirm against the router.
    monkeypatch.setattr(
        "app.features.evaluation.admin_service.list_user_export_md_filenames",
        _no_fixtures,
        raising=False,
    )

    from app.features.evaluation.router import router

    app = install_test_error_handlers(FastAPI())
    app.include_router(router, prefix="/internal/api/evaluation")

    async def _stub_principal():
        # Stand-in for the real auth dependency: always hand back a principal.
        from app.features.evaluation.internal_auth import InternalEvalPrincipal

        return InternalEvalPrincipal()

    app.dependency_overrides[get_internal_eval_principal] = _stub_principal

    # Drive the app in-process through the ASGI transport.
    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://t",
    ) as http:
        response = await http.get(
            "/internal/api/evaluation/fixtures/user-exports",
            headers={"X-Internal-Eval-Key": "secret"},
        )

    assert response.status_code == 200
    assert response.json() == {"items": []}
|