Files
operating-room-monitor-server/tests/test_archive_persister.py
Kevin 8a4bad99d3 feat: 配置写死与 baked 模块,Alembic 建表,百度仅 BAIDU_*
- 新增 app/baked/algorithm|pipeline,非部署参数不再走 env;Settings 保留 DB/HTTP/RTSP/海康/百度/MinIO/Demo
- 移除 init_db_schema 与 reload 配置;main 仅 check_database;start*.sh 在 uvicorn 前执行 alembic upgrade head
- 依赖 psycopg[binary] 供 Alembic 同步 URL;alembic/env 注释与预发清单更新
- 撕段门控消费管线、各视频/语音/归档调用改为 baked
- 百度环境变量仅 BAIDU_APP_ID、BAIDU_API_KEY、BAIDU_SECRET_KEY 与 BAIDU_* 超时/ASR;人脸脚本与 baidu_speech 文案同步
- 全量单测与 .env.example 更新;.gitignore 忽略 refs/(本地权重/视频不入库)

Made-with: Cursor
2026-04-24 15:33:22 +08:00

102 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""ArchivePersister指数退避、重试上限与 durable fallback 恢复。"""
from __future__ import annotations
import json
from datetime import datetime, timezone
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.baked import pipeline as bp
from app.domain.consumption import SurgeryConsumptionStored
from app.repositories.surgery_results import SurgeryResultRepository
from app.services.video.archive_persister import ArchivePersister
class _AlwaysFailRepo(SurgeryResultRepository):
    """Repository stub whose ``save_final_result`` always raises.

    The ``calls`` attribute counts how many save attempts were made.
    """

    def __init__(self) -> None:
        super().__init__()
        # Number of save_final_result invocations seen so far.
        self.calls = 0

    async def save_final_result(self, session: AsyncSession, **kwargs: object) -> None:
        """Record the attempt, then fail with ``RuntimeError("db down")``."""
        self.calls = self.calls + 1
        failure = RuntimeError("db down")
        raise failure
def _detail(item_id: str = "纱布") -> SurgeryConsumptionStored:
    """Build a consumption record for *item_id* with fixed test values.

    The same value is used for ``item_id`` and ``item_name``; quantity,
    doctor, source and the UTC timestamp are constants.
    """
    recorded_at = datetime(2026, 4, 23, 12, 0, tzinfo=timezone.utc)
    record = SurgeryConsumptionStored(
        item_id=item_id,
        item_name=item_id,
        qty=1,
        doctor_id="vision",
        timestamp=recorded_at,
        source="vision",
    )
    return record
@pytest.mark.asyncio
async def test_persist_or_archive_writes_durable_fallback(
    tmp_path,
    sqlite_session_factory: async_sessionmaker[AsyncSession],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With an always-failing repository, ``persist_or_archive`` returns False,
    writes ``<surgery_id>.json`` into the fallback directory, and keeps the
    details retrievable through ``archived_details``.
    """
    pending_dir = tmp_path / "pending_archive"
    # Point the baked pipeline setting at a per-test directory.
    monkeypatch.setattr(bp, "ARCHIVE_PERSIST_DURABLE_FALLBACK_DIR", str(pending_dir))

    failing_repo = _AlwaysFailRepo()
    persister = ArchivePersister(
        repository=failing_repo,
        session_factory=sqlite_session_factory,
    )

    persisted = await persister.persist_or_archive("abc123", [_detail("纱布")])
    assert persisted is False

    fallback_file = pending_dir / "abc123.json"
    assert fallback_file.exists()

    raw_text = fallback_file.read_text(encoding="utf-8")
    stored = json.loads(raw_text)
    assert stored["surgery_id"] == "abc123"
    first_detail = stored["details"][0]
    assert first_detail["item_id"] == "纱布"

    assert persister.archived_details("abc123") is not None
@pytest.mark.asyncio
async def test_recover_from_durable_fallback_reloads_pending_archive(
    tmp_path,
    sqlite_session_factory: async_sessionmaker[AsyncSession],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A pre-existing fallback JSON file is loaded by
    ``recover_from_durable_fallback``; a following ``try_persist_archive``
    succeeds and removes both the in-memory entry and the file.
    """
    pending_dir = tmp_path / "pending_archive"
    pending_dir.mkdir()

    # Seed the directory with one pending archive, as a prior failed run would.
    suture_detail = {
        "item_id": "缝线",
        "item_name": "缝线",
        "qty": 1,
        "doctor_id": "vision",
        "timestamp": "2026-04-23T08:00:00+00:00",
        "source": "vision",
    }
    seeded = {
        "surgery_id": "recov01",
        "saved_at": "2026-04-23T08:00:00+00:00",
        "details": [suture_detail],
    }
    seeded_file = pending_dir / "recov01.json"
    seeded_file.write_text(json.dumps(seeded, ensure_ascii=False), encoding="utf-8")

    monkeypatch.setattr(bp, "ARCHIVE_PERSIST_DURABLE_FALLBACK_DIR", str(pending_dir))
    persister = ArchivePersister(
        repository=SurgeryResultRepository(),
        session_factory=sqlite_session_factory,
    )

    recovered_count = await persister.recover_from_durable_fallback()
    assert recovered_count == 1

    recovered = persister.archived_details("recov01")
    assert recovered is not None
    assert recovered[0].item_id == "缝线"

    # The next retry should persist successfully and clear both the in-memory
    # entry and the durable file.
    persisted = await persister.try_persist_archive("recov01")
    assert persisted is True
    assert persister.archived_details("recov01") is None
    assert not (pending_dir / "recov01.json").exists()