from unittest.mock import MagicMock

import pytest

from app.tasks import memory_compaction_tasks as tasks


def test_memory_compaction_sweep_continues_after_schedule_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check the sweep summary when scheduling fails for one of three users.

    The user listing is stubbed to return three ids and the scheduler stub
    raises for ``"u2"`` only. The sweep is then expected to report two
    scheduled users, one failure, and the configured look-back of 6 hours.
    """
    sweep_hours = 6

    # Override the memory settings the sweep reads.
    for setting, value in (
        ("compaction_enabled", True),
        ("compaction_sweep_recent_hours", sweep_hours),
    ):
        monkeypatch.setattr(tasks.memory, setting, value)

    async def _recent_users(hours: int) -> list[str]:
        # Fixed listing; the requested window is ignored by this stub.
        return ["u1", "u2", "u3"]

    def _schedule_or_fail(user_id: str, ctx: dict) -> None:
        # Succeeds silently for everyone except the designated failing user.
        if user_id != "u2":
            return None
        raise RuntimeError("boom")

    def _stub_span(*args, **kwargs):
        # Hand back a fresh MagicMock on every call, whatever the arguments.
        return MagicMock()

    monkeypatch.setattr(
        tasks, "_list_users_with_recent_chunks_async", _recent_users
    )
    monkeypatch.setattr(
        tasks, "schedule_memory_compaction_run", _schedule_or_fail
    )
    monkeypatch.setattr(tasks, "business_span", _stub_span)

    # Invoke the task body directly through its ``run()`` method.
    result = tasks.memory_compaction_sweep.run()

    expected_summary = {"scheduled": 2, "failed": 1, "hours": sweep_hours}
    assert result == expected_summary