"""BackgroundTaskRunner:字数门闸、超时、flush(纯函数 + 异步 mock)。""" from __future__ import annotations import asyncio from unittest.mock import AsyncMock, patch import pytest from app.features.memoir import background_runner as br from app.features.memoir.constants import memoir def test_batch_ready_for_submit_min_chars_zero() -> None: assert br._batch_ready_for_submit( min_chars=0, max_wait_seconds=60.0, total_text_chars=0, elapsed_seconds=0.0, ) def test_batch_ready_for_submit_chars_met() -> None: assert br._batch_ready_for_submit( min_chars=50, max_wait_seconds=60.0, total_text_chars=50, elapsed_seconds=1.0, ) def test_batch_ready_for_submit_not_ready() -> None: assert not br._batch_ready_for_submit( min_chars=50, max_wait_seconds=60.0, total_text_chars=10, elapsed_seconds=5.0, ) def test_batch_ready_for_submit_max_wait_elapsed() -> None: assert br._batch_ready_for_submit( min_chars=50, max_wait_seconds=60.0, total_text_chars=10, elapsed_seconds=60.0, ) def test_next_retry_sleep_seconds() -> None: assert br._next_retry_sleep_seconds(5.0, 60.0, 1.0) == 5.0 assert br._next_retry_sleep_seconds(5.0, 60.0, 58.0) == 2.0 assert br._next_retry_sleep_seconds(5.0, 60.0, 60.0) == 0.0 @pytest.mark.asyncio async def test_flush_pending_submits_without_gate( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 9999) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 9999.0) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" runner = br.BackgroundTaskRunner(debounce_seconds=30) uid = "u1" runner._batch[uid] = br._MemoirBatchState( segment_ids=["s1", "s2"], total_text_chars=3, first_queued_monotonic=0.0, ) with ( patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)), patch.object( runner, "_flush_pending_phase2", new=AsyncMock(return_value=None), ), ): await runner.flush_pending(uid) assert submitted == [("u1", ["s1", "s2"])] assert uid not in runner._batch @pytest.mark.asyncio async def test_flush_pending_merges_batch_and_extra_deduped( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 9999) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 9999.0) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" runner = br.BackgroundTaskRunner(debounce_seconds=30) uid = "u1" runner._batch[uid] = br._MemoirBatchState( segment_ids=["s1", "s2"], total_text_chars=3, first_queued_monotonic=0.0, ) with ( patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)), patch.object( runner, "_flush_pending_phase2", new=AsyncMock(return_value=None), ), ): await runner.flush_pending(uid, extra_segment_ids=["s2", "s3", "s1"]) assert submitted == [("u1", ["s1", "s2", "s3"])] assert uid not in runner._batch @pytest.mark.asyncio async def test_queue_message_min_chars_zero_submits_after_debounce( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 0) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" runner = br.BackgroundTaskRunner(debounce_seconds=0) with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)): await runner.queue_message("u1", "seg-a", text_char_count=0) await asyncio.sleep(0.05) assert submitted and submitted[0][1] == ["seg-a"] @pytest.mark.asyncio async def test_queue_message_not_ready_then_max_wait_submits( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 100) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 0.12) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" # debounce 须 >0,否则 retry sleep 为 0 会误走「立即提交」分支 runner = br.BackgroundTaskRunner(debounce_seconds=0.02) with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)): await runner.queue_message("u1", "seg-a", text_char_count=5) await asyncio.sleep(0.2) assert submitted and submitted[0][1] == ["seg-a"] @pytest.mark.asyncio async def test_queue_message_not_ready_before_debounce_no_submit( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 100) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" runner = br.BackgroundTaskRunner(debounce_seconds=0.5) with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)): await runner.queue_message("u1", "seg-a", text_char_count=5) await asyncio.sleep(0.05) assert submitted == [] @pytest.mark.asyncio async def test_queue_message_chars_met_submits_after_debounce( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr(memoir, "segment_batch_min_chars", 10) monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0) submitted: list[tuple[str, list[str]]] = [] async def fake_submit(uid: str, ids: list[str]) -> str: submitted.append((uid, ids)) return "tid" runner = br.BackgroundTaskRunner(debounce_seconds=0) with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)): await runner.queue_message("u1", "seg-long", text_char_count=50) await asyncio.sleep(0.05) assert submitted and submitted[0][1] == ["seg-long"]