Files
life-echo/api/tests/test_background_runner.py

208 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""BackgroundTaskRunner字数门闸、超时、flush纯函数 + 异步 mock"""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from app.features.memoir import background_runner as br
def test_batch_ready_for_submit_min_chars_zero() -> None:
    """A ``min_chars`` of zero is expected to report ready with no text and no elapsed time."""
    gate_args = {
        "min_chars": 0,
        "max_wait_seconds": 60.0,
        "total_text_chars": 0,
        "elapsed_seconds": 0.0,
    }
    is_ready = br._batch_ready_for_submit(**gate_args)
    assert is_ready
def test_batch_ready_for_submit_chars_met() -> None:
    """The batch is expected to be ready once the character total equals ``min_chars``."""
    threshold = 50
    ready = br._batch_ready_for_submit(
        min_chars=threshold,
        max_wait_seconds=60.0,
        total_text_chars=threshold,
        elapsed_seconds=1.0,
    )
    assert ready
def test_batch_ready_for_submit_not_ready() -> None:
    """Below ``min_chars`` and before ``max_wait_seconds`` the batch must not be ready."""
    outcome = br._batch_ready_for_submit(
        total_text_chars=10,
        min_chars=50,
        elapsed_seconds=5.0,
        max_wait_seconds=60.0,
    )
    assert not outcome
def test_batch_ready_for_submit_max_wait_elapsed() -> None:
    """Reaching ``max_wait_seconds`` is expected to report ready despite too few characters."""
    max_wait = 60.0
    params = dict(
        min_chars=50,
        max_wait_seconds=max_wait,
        total_text_chars=10,
        elapsed_seconds=max_wait,
    )
    assert br._batch_ready_for_submit(**params)
def test_next_retry_sleep_seconds() -> None:
    """Check ``_next_retry_sleep_seconds`` against a table of (positional args, expected) cases.

    NOTE(review): the three positional arguments look like
    (debounce, max_wait, elapsed) -- confirm against the helper's signature.
    """
    cases = (
        ((5.0, 60.0, 1.0), 5.0),
        ((5.0, 60.0, 58.0), 2.0),
        ((5.0, 60.0, 60.0), 0.0),
    )
    for args, expected in cases:
        assert br._next_retry_sleep_seconds(*args) == expected
@pytest.mark.asyncio
async def test_flush_pending_submits_without_gate(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``flush_pending`` must submit the queued batch although both gate settings are far out of reach."""
    # Set both thresholds to values the tiny batch below cannot satisfy.
    for setting_name, value in (
        ("memoir_segment_batch_min_chars", 9999),
        ("memoir_segment_batch_max_wait_seconds", 9999.0),
    ):
        monkeypatch.setattr(br.settings, setting_name, value)

    calls: list[tuple[str, list[str]]] = []

    async def fake_submit(uid: str, ids: list[str]) -> str:
        # Record every submission so the test can inspect it afterwards.
        calls.append((uid, ids))
        return "tid"

    user_id = "u1"
    runner = br.BackgroundTaskRunner(debounce_seconds=30)
    # Seed the pending batch directly rather than going through queue_message.
    runner._batch[user_id] = br._MemoirBatchState(
        segment_ids=["s1", "s2"],
        total_text_chars=3,
        first_queued_monotonic=0.0,
    )

    submit_patch = patch.object(
        runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)
    )
    phase2_patch = patch.object(
        runner, "_flush_pending_phase2", new=AsyncMock(return_value=None)
    )
    with submit_patch, phase2_patch:
        await runner.flush_pending(user_id)

    assert calls == [("u1", ["s1", "s2"])]
    assert user_id not in runner._batch
@pytest.mark.asyncio
async def test_flush_pending_merges_batch_and_extra_deduped(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``flush_pending`` must merge the queued ids with ``extra_segment_ids`` and drop duplicates."""
    # Set both thresholds to values the tiny batch below cannot satisfy.
    settings = br.settings
    monkeypatch.setattr(settings, "memoir_segment_batch_min_chars", 9999)
    monkeypatch.setattr(settings, "memoir_segment_batch_max_wait_seconds", 9999.0)

    recorded: list[tuple[str, list[str]]] = []

    async def fake_submit(uid: str, ids: list[str]) -> str:
        # Record every submission so the test can inspect it afterwards.
        recorded.append((uid, ids))
        return "tid"

    user_id = "u1"
    runner = br.BackgroundTaskRunner(debounce_seconds=30)
    pending_state = br._MemoirBatchState(
        segment_ids=["s1", "s2"],
        total_text_chars=3,
        first_queued_monotonic=0.0,
    )
    runner._batch[user_id] = pending_state

    submit_patch = patch.object(
        runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)
    )
    phase2_patch = patch.object(
        runner, "_flush_pending_phase2", new=AsyncMock(return_value=None)
    )
    with submit_patch, phase2_patch:
        # The extras overlap the queued ids ("s1", "s2") and add one new id ("s3").
        await runner.flush_pending(user_id, extra_segment_ids=["s2", "s3", "s1"])

    assert recorded == [("u1", ["s1", "s2", "s3"])]
    assert user_id not in runner._batch
@pytest.mark.asyncio
async def test_queue_message_min_chars_zero_submits_after_debounce(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With ``min_chars`` 0 and a zero debounce, a queued segment must get submitted.

    The test waits on an event set by the fake submit, bounded by a generous
    timeout, instead of sleeping for a fixed interval. A slow scheduler
    therefore cannot cause a spurious failure, and the happy path returns as
    soon as the submission happens.
    """
    monkeypatch.setattr(br.settings, "memoir_segment_batch_min_chars", 0)
    monkeypatch.setattr(br.settings, "memoir_segment_batch_max_wait_seconds", 60.0)
    submitted: list[tuple[str, list[str]]] = []
    submit_seen = asyncio.Event()

    async def fake_submit(uid: str, ids: list[str]) -> str:
        submitted.append((uid, ids))
        submit_seen.set()
        return "tid"

    runner = br.BackgroundTaskRunner(debounce_seconds=0)
    with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
        await runner.queue_message("u1", "seg-a", text_char_count=0)
        # Raises a timeout error (failing the test) if nothing is ever submitted.
        await asyncio.wait_for(submit_seen.wait(), timeout=2.0)
    assert submitted and submitted[0][1] == ["seg-a"]
@pytest.mark.asyncio
async def test_queue_message_not_ready_then_max_wait_submits(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A batch under ``min_chars`` must still be submitted once the max wait has passed.

    The original fixed 0.2 s sleep left only a small margin over the 0.12 s
    max wait. The test now waits on an event set by the fake submit, bounded
    by a generous timeout, so a loaded machine cannot cause a spurious
    failure.
    """
    monkeypatch.setattr(br.settings, "memoir_segment_batch_min_chars", 100)
    monkeypatch.setattr(br.settings, "memoir_segment_batch_max_wait_seconds", 0.12)
    submitted: list[tuple[str, list[str]]] = []
    submit_seen = asyncio.Event()

    async def fake_submit(uid: str, ids: list[str]) -> str:
        submitted.append((uid, ids))
        submit_seen.set()
        return "tid"

    # The debounce must be > 0; otherwise the retry sleep would be 0 and the
    # runner would wrongly take the "submit immediately" branch.
    runner = br.BackgroundTaskRunner(debounce_seconds=0.02)
    with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
        await runner.queue_message("u1", "seg-a", text_char_count=5)
        # Raises a timeout error (failing the test) if nothing is ever submitted.
        await asyncio.wait_for(submit_seen.wait(), timeout=2.0)
    assert submitted and submitted[0][1] == ["seg-a"]
@pytest.mark.asyncio
async def test_queue_message_not_ready_before_debounce_no_submit(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """No submission may happen 0.05 s after queueing when the debounce is 0.5 s."""
    settings = br.settings
    monkeypatch.setattr(settings, "memoir_segment_batch_min_chars", 100)
    monkeypatch.setattr(settings, "memoir_segment_batch_max_wait_seconds", 60.0)

    calls: list[tuple[str, list[str]]] = []

    async def fake_submit(uid: str, ids: list[str]) -> str:
        # Any call recorded here would make the final assertion fail.
        calls.append((uid, ids))
        return "tid"

    runner = br.BackgroundTaskRunner(debounce_seconds=0.5)
    submit_mock = AsyncMock(side_effect=fake_submit)
    with patch.object(runner, "_submit_task", new=submit_mock):
        await runner.queue_message("u1", "seg-a", text_char_count=5)
        # Observe for a window much shorter than the 0.5 s debounce.
        await asyncio.sleep(0.05)

    assert calls == []
@pytest.mark.asyncio
async def test_queue_message_chars_met_submits_after_debounce(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A segment whose character count exceeds ``min_chars`` must be submitted with a zero debounce.

    The test waits on an event set by the fake submit, bounded by a generous
    timeout, instead of sleeping for a fixed interval. A slow scheduler
    therefore cannot cause a spurious failure.
    """
    monkeypatch.setattr(br.settings, "memoir_segment_batch_min_chars", 10)
    monkeypatch.setattr(br.settings, "memoir_segment_batch_max_wait_seconds", 60.0)
    submitted: list[tuple[str, list[str]]] = []
    submit_seen = asyncio.Event()

    async def fake_submit(uid: str, ids: list[str]) -> str:
        submitted.append((uid, ids))
        submit_seen.set()
        return "tid"

    runner = br.BackgroundTaskRunner(debounce_seconds=0)
    with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
        await runner.queue_message("u1", "seg-long", text_char_count=50)
        # Raises a timeout error (failing the test) if nothing is ever submitted.
        await asyncio.wait_for(submit_seen.wait(), timeout=2.0)
    assert submitted and submitted[0][1] == ["seg-long"]