配置 SSOT(TOML + .env) 统一错误契约 Auth 与事务边界 Redis / Celery 可靠性:业务 Redis(DB/0)与 Celery broker/backend(DB/1)显式拆分;连接池、sync client 可观测性(OpenTelemetry + LGTM)
209 lines
6.4 KiB
Python
209 lines
6.4 KiB
Python
"""BackgroundTaskRunner:字数门闸、超时、flush(纯函数 + 异步 mock)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
from unittest.mock import AsyncMock, patch
|
||
|
||
import pytest
|
||
|
||
from app.features.memoir import background_runner as br
|
||
from app.features.memoir.constants import memoir
|
||
|
||
|
||
def test_batch_ready_for_submit_min_chars_zero() -> None:
|
||
assert br._batch_ready_for_submit(
|
||
min_chars=0,
|
||
max_wait_seconds=60.0,
|
||
total_text_chars=0,
|
||
elapsed_seconds=0.0,
|
||
)
|
||
|
||
|
||
def test_batch_ready_for_submit_chars_met() -> None:
|
||
assert br._batch_ready_for_submit(
|
||
min_chars=50,
|
||
max_wait_seconds=60.0,
|
||
total_text_chars=50,
|
||
elapsed_seconds=1.0,
|
||
)
|
||
|
||
|
||
def test_batch_ready_for_submit_not_ready() -> None:
|
||
assert not br._batch_ready_for_submit(
|
||
min_chars=50,
|
||
max_wait_seconds=60.0,
|
||
total_text_chars=10,
|
||
elapsed_seconds=5.0,
|
||
)
|
||
|
||
|
||
def test_batch_ready_for_submit_max_wait_elapsed() -> None:
|
||
assert br._batch_ready_for_submit(
|
||
min_chars=50,
|
||
max_wait_seconds=60.0,
|
||
total_text_chars=10,
|
||
elapsed_seconds=60.0,
|
||
)
|
||
|
||
|
||
def test_next_retry_sleep_seconds() -> None:
|
||
assert br._next_retry_sleep_seconds(5.0, 60.0, 1.0) == 5.0
|
||
assert br._next_retry_sleep_seconds(5.0, 60.0, 58.0) == 2.0
|
||
assert br._next_retry_sleep_seconds(5.0, 60.0, 60.0) == 0.0
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_flush_pending_submits_without_gate(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 9999)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 9999.0)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=30)
|
||
uid = "u1"
|
||
runner._batch[uid] = br._MemoirBatchState(
|
||
segment_ids=["s1", "s2"],
|
||
total_text_chars=3,
|
||
first_queued_monotonic=0.0,
|
||
)
|
||
|
||
with (
|
||
patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)),
|
||
patch.object(
|
||
runner,
|
||
"_flush_pending_phase2",
|
||
new=AsyncMock(return_value=None),
|
||
),
|
||
):
|
||
await runner.flush_pending(uid)
|
||
|
||
assert submitted == [("u1", ["s1", "s2"])]
|
||
assert uid not in runner._batch
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_flush_pending_merges_batch_and_extra_deduped(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 9999)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 9999.0)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=30)
|
||
uid = "u1"
|
||
runner._batch[uid] = br._MemoirBatchState(
|
||
segment_ids=["s1", "s2"],
|
||
total_text_chars=3,
|
||
first_queued_monotonic=0.0,
|
||
)
|
||
|
||
with (
|
||
patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)),
|
||
patch.object(
|
||
runner,
|
||
"_flush_pending_phase2",
|
||
new=AsyncMock(return_value=None),
|
||
),
|
||
):
|
||
await runner.flush_pending(uid, extra_segment_ids=["s2", "s3", "s1"])
|
||
|
||
assert submitted == [("u1", ["s1", "s2", "s3"])]
|
||
assert uid not in runner._batch
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_queue_message_min_chars_zero_submits_after_debounce(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 0)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=0)
|
||
with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
|
||
await runner.queue_message("u1", "seg-a", text_char_count=0)
|
||
await asyncio.sleep(0.05)
|
||
|
||
assert submitted and submitted[0][1] == ["seg-a"]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_queue_message_not_ready_then_max_wait_submits(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 100)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 0.12)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
# debounce 须 >0,否则 retry sleep 为 0 会误走「立即提交」分支
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=0.02)
|
||
with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
|
||
await runner.queue_message("u1", "seg-a", text_char_count=5)
|
||
await asyncio.sleep(0.2)
|
||
|
||
assert submitted and submitted[0][1] == ["seg-a"]
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_queue_message_not_ready_before_debounce_no_submit(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 100)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=0.5)
|
||
with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
|
||
await runner.queue_message("u1", "seg-a", text_char_count=5)
|
||
await asyncio.sleep(0.05)
|
||
|
||
assert submitted == []
|
||
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_queue_message_chars_met_submits_after_debounce(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(memoir, "segment_batch_min_chars", 10)
|
||
monkeypatch.setattr(memoir, "segment_batch_max_wait_seconds", 60.0)
|
||
|
||
submitted: list[tuple[str, list[str]]] = []
|
||
|
||
async def fake_submit(uid: str, ids: list[str]) -> str:
|
||
submitted.append((uid, ids))
|
||
return "tid"
|
||
|
||
runner = br.BackgroundTaskRunner(debounce_seconds=0)
|
||
with patch.object(runner, "_submit_task", new=AsyncMock(side_effect=fake_submit)):
|
||
await runner.queue_message("u1", "seg-long", text_char_count=50)
|
||
await asyncio.sleep(0.05)
|
||
|
||
assert submitted and submitted[0][1] == ["seg-long"]
|