"""ArchivePersister:指数退避、重试上限与 durable fallback 恢复。""" from __future__ import annotations import json from datetime import datetime, timezone import pytest from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from app.baked import pipeline as bp from app.domain.consumption import SurgeryConsumptionStored from app.repositories.surgery_results import SurgeryResultRepository from app.services.video.archive_persister import ArchivePersister class _AlwaysFailRepo(SurgeryResultRepository): def __init__(self) -> None: super().__init__() self.calls = 0 async def save_final_result(self, session: AsyncSession, **kwargs: object) -> None: self.calls += 1 raise RuntimeError("db down") def _detail(item_id: str = "纱布") -> SurgeryConsumptionStored: return SurgeryConsumptionStored( item_id=item_id, item_name=item_id, qty=1, doctor_id="vision", timestamp=datetime(2026, 4, 23, 12, 0, tzinfo=timezone.utc), source="vision", ) @pytest.mark.asyncio async def test_persist_or_archive_writes_durable_fallback( tmp_path, sqlite_session_factory: async_sessionmaker[AsyncSession], monkeypatch: pytest.MonkeyPatch, ) -> None: fallback_dir = tmp_path / "pending_archive" monkeypatch.setattr(bp, "ARCHIVE_PERSIST_DURABLE_FALLBACK_DIR", str(fallback_dir)) repo = _AlwaysFailRepo() persister = ArchivePersister( repository=repo, session_factory=sqlite_session_factory, ) ok = await persister.persist_or_archive("abc123", [_detail("纱布")]) assert ok is False path = fallback_dir / "abc123.json" assert path.exists() payload = json.loads(path.read_text(encoding="utf-8")) assert payload["surgery_id"] == "abc123" assert payload["details"][0]["item_id"] == "纱布" assert persister.archived_details("abc123") is not None @pytest.mark.asyncio async def test_recover_from_durable_fallback_reloads_pending_archive( tmp_path, sqlite_session_factory: async_sessionmaker[AsyncSession], monkeypatch: pytest.MonkeyPatch, ) -> None: fallback_dir = tmp_path / "pending_archive" fallback_dir.mkdir() payload = { "surgery_id": "recov01", "saved_at": "2026-04-23T08:00:00+00:00", "details": [ { "item_id": "缝线", "item_name": "缝线", "qty": 1, "doctor_id": "vision", "timestamp": "2026-04-23T08:00:00+00:00", "source": "vision", } ], } (fallback_dir / "recov01.json").write_text( json.dumps(payload, ensure_ascii=False), encoding="utf-8" ) monkeypatch.setattr(bp, "ARCHIVE_PERSIST_DURABLE_FALLBACK_DIR", str(fallback_dir)) persister = ArchivePersister( repository=SurgeryResultRepository(), session_factory=sqlite_session_factory, ) loaded = await persister.recover_from_durable_fallback() assert loaded == 1 details = persister.archived_details("recov01") assert details is not None assert details[0].item_id == "缝线" # 下一次 retry 应成功落库并清理内存 + durable 文件。 ok = await persister.try_persist_archive("recov01") assert ok is True assert persister.archived_details("recov01") is None assert not (fallback_dir / "recov01.json").exists()