Files
life-echo/api/tests/test_memory_compaction_sweep.py
Sully 53e0065e3e refactor(api): TOML 配置 SSOT、统一错误契约、Auth/事务加固与可观测性 (#33)
配置 SSOT(TOML + .env)
统一错误契约
Auth 与事务边界
Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client
可观测性(OpenTelemetry + LGTM)
2026-05-22 13:44:50 +08:00

31 lines
944 B
Python

from unittest.mock import MagicMock
import pytest
from app.tasks import memory_compaction_tasks as tasks
def test_memory_compaction_sweep_continues_after_schedule_failure(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(tasks.memory, "compaction_enabled", True)
monkeypatch.setattr(tasks.memory, "compaction_sweep_recent_hours", 6)
async def _fake_list(hours: int) -> list[str]:
return ["u1", "u2", "u3"]
monkeypatch.setattr(
tasks, "_list_users_with_recent_chunks_async", _fake_list
)
def fake_schedule(user_id: str, ctx: dict) -> None:
if user_id == "u2":
raise RuntimeError("boom")
monkeypatch.setattr(tasks, "schedule_memory_compaction_run", fake_schedule)
monkeypatch.setattr(tasks, "business_span", lambda *a, **k: MagicMock())
result = tasks.memory_compaction_sweep.run()
assert result == {"scheduled": 2, "failed": 1, "hours": 6}