Files
life-echo/api/tests/test_memory_compaction_sweep.py

31 lines
944 B
Python
Raw Permalink Normal View History

from unittest.mock import MagicMock
import pytest
from app.tasks import memory_compaction_tasks as tasks
def test_memory_compaction_sweep_continues_after_schedule_failure(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(tasks.memory, "compaction_enabled", True)
monkeypatch.setattr(tasks.memory, "compaction_sweep_recent_hours", 6)
async def _fake_list(hours: int) -> list[str]:
return ["u1", "u2", "u3"]
monkeypatch.setattr(
tasks, "_list_users_with_recent_chunks_async", _fake_list
)
def fake_schedule(user_id: str, ctx: dict) -> None:
if user_id == "u2":
raise RuntimeError("boom")
monkeypatch.setattr(tasks, "schedule_memory_compaction_run", fake_schedule)
monkeypatch.setattr(tasks, "business_span", lambda *a, **k: MagicMock())
result = tasks.memory_compaction_sweep.run()
assert result == {"scheduled": 2, "failed": 1, "hours": 6}