"""``OnlineActionFormerRunner`` 行为单测:稳定段发布 / dedupe / 段内投票 / 尾段补发。 不加载 YOLO / VideoSwin / ActionFormer:直接构造 runner 后注入 fake bundle 与 fake feature/per-frame state,调用内部 ``_maybe_flush_stable_segments`` 与 ``finalize_split_for_stop`` 验证行为。 """ from __future__ import annotations import time from dataclasses import dataclass from typing import Any from unittest.mock import MagicMock import numpy as np import pytest from app.algorithm_runner.actionformer_gated.runner import ( ActionFormerSegmentRecord, OnlineActionFormerRunner, _Feature, _FrameClsState, _to_segment_records, actionformer_segment_stable_dedupe_key, ) class _FakeAF: """Fake ActionFormer:``forward_features`` 返回预置候选段。""" def __init__(self, segments: list[tuple[float, float, float]]) -> None: self._segments = segments self.feat_stride_frames = 32 self.num_frames = 64 self.calls: list[dict[str, Any]] = [] def forward_features(self, feats: np.ndarray, *, duration_sec: float, fps: float, video_id: str = "online"): self.calls.append({"shape": feats.shape, "duration": duration_sec, "fps": fps}) return list(self._segments) def _new_runner(*, segments: list[tuple[float, float, float]], sink=None) -> OnlineActionFormerRunner: """构造一个跳过 YOLO / VideoSwin 加载的 runner。""" haoc_names = {0: "手套", 1: "纱布"} det = MagicMock() det.names = {0: "hand"} gb = MagicMock() gb.names = {0: "good", 1: "bad"} haoc = MagicMock() haoc.names = haoc_names videoswin = MagicMock() af = _FakeAF(segments) runner = OnlineActionFormerRunner( det_m=det, gb_m=gb, haoc_m=haoc, videoswin=videoswin, action_former=af, name_to_id={"手套": "id-glove", "纱布": "id-gauze"}, whitelist_indices=None, timeline_anchor_wall=None, stable_segments_sink=sink, ) return runner def test_to_segment_records_voting_top1() -> None: per_frame = [ _FrameClsState(t_abs=0.5, label_name="手套", max_prob=0.9), _FrameClsState(t_abs=1.0, label_name="手套", max_prob=0.8), _FrameClsState(t_abs=1.4, label_name="纱布", max_prob=0.7), _FrameClsState(t_abs=2.5, label_name=None, max_prob=0.0), _FrameClsState(t_abs=10.5, label_name="纱布", max_prob=0.99), ] cands = [(0.0, 2.0, 0.95), (10.0, 11.0, 0.7)] recs = _to_segment_records( cands, per_frame=per_frame, name_to_id={"手套": "id-glove", "纱布": "id-gauze"}, ) assert len(recs) == 2 a, b = recs assert a.start_sec == 0.0 and a.end_sec == 2.0 assert a.item_name == "手套" assert a.item_id == "id-glove" assert a.top1_conf == pytest.approx(2 / 3, abs=1e-6) assert a.top2_name == "纱布" assert a.top2_conf == pytest.approx(1 / 3, abs=1e-6) assert b.start_sec == 10.0 and b.end_sec == 11.0 assert b.item_name == "纱布" assert b.item_id == "id-gauze" assert b.top1_conf == pytest.approx(1.0, abs=1e-6) def test_maybe_flush_publishes_only_stable_segments(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( "app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S", 0.75, raising=False, ) received: list[list[ActionFormerSegmentRecord]] = [] def sink(batch: list[ActionFormerSegmentRecord]) -> None: received.append(batch) # 候选段 1:[0,2],候选段 2:[4,6]。now=8 时只有 [0,2] 满足 end+margin=4 不发,所以用 now=3.0 让 [0,2] 段稳定(2+0.75=2.75<3) runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=3.0) assert len(received) == 1 assert len(received[0]) == 1 pub_keys = {actionformer_segment_stable_dedupe_key(r) for r in received[0]} runner.mark_stable_segments_emitted(pub_keys) # 等过一个周期再发:now=10 时 [4,6] 段稳定(6+0.75=6.75<10) runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=10.0) assert len(received) == 2 assert len(received[1]) == 1 rec2 = received[1][0] assert (rec2.start_sec, rec2.end_sec) == (4.0, 6.0) def test_maybe_flush_dedupe_same_key_not_republished(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( "app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S", 0.75, raising=False, ) received: list[list[ActionFormerSegmentRecord]] = [] def sink(batch: list[ActionFormerSegmentRecord]) -> None: received.append(batch) runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink) runner._features = [ _Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20) ] runner._per_frame = [ _FrameClsState(t_abs=t, label_name="手套", max_prob=0.9) for t in np.linspace(0.1, 2.0, 5) ] runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=3.0) assert len(received) == 1 assert len(received[0]) == 1 runner.mark_stable_segments_emitted( [actionformer_segment_stable_dedupe_key(r) for r in received[0]] ) # 第二次再调,AF 给同一段不应再发 runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=4.0) assert len(received) == 1 def test_maybe_flush_throttled_by_period() -> None: received: list[list[ActionFormerSegmentRecord]] = [] def sink(batch: list[ActionFormerSegmentRecord]) -> None: received.append(batch) runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink) runner._features = [ _Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20) ] runner._per_frame = [ _FrameClsState(t_abs=t, label_name="手套", max_prob=0.9) for t in np.linspace(0.1, 2.0, 5) ] # 刚刚调用过 → 节流不触发 runner._last_actionformer_call_wall = time.time() runner._maybe_flush_stable_segments(now_sec=3.0) assert received == [] def test_maybe_flush_respects_freeze_lookahead(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr( "app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S", 12.0, raising=False, ) received: list[list[ActionFormerSegmentRecord]] = [] def sink(batch: list[ActionFormerSegmentRecord]) -> None: received.append(batch) runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink) runner._features = [ _Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20) ] runner._per_frame = [ _FrameClsState(t_abs=t, label_name="手套", max_prob=0.9) for t in np.linspace(0.1, 2.0, 5) ] runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=13.0) assert received == [] runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=15.0) assert len(received) == 1 def test_maybe_flush_overlap_dedupe_suppresses_boundary_jitter( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setattr( "app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S", 0.75, raising=False, ) monkeypatch.setattr( "app.baked.algorithm.ACTIONFORMER_DEDUPE_OVERLAP_RATIO", 0.5, raising=False, ) received: list[list[ActionFormerSegmentRecord]] = [] def sink(batch: list[ActionFormerSegmentRecord]) -> None: received.append(batch) runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink) runner._features = [ _Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20) ] runner._per_frame = [ _FrameClsState(t_abs=t, label_name="手套", max_prob=0.9) for t in np.linspace(0.1, 2.2, 8) ] runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=3.0) assert len(received) == 1 runner.mark_stable_records_emitted(received[0]) runner._af._segments = [(0.12, 2.12, 0.95)] runner._last_actionformer_call_wall = 0.0 runner._maybe_flush_stable_segments(now_sec=4.0) assert len(received) == 1 def test_finalize_split_for_stop_returns_unemitted_tail() -> None: runner = _new_runner(segments=[(0.0, 2.0, 0.9), (4.0, 6.0, 0.8)], sink=None) runner._features = [ _Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20) ] runner._per_frame = [ _FrameClsState(t_abs=t, label_name="手套", max_prob=0.9) for t in np.linspace(0.1, 2.0, 5) ] + [ _FrameClsState(t_abs=t, label_name="纱布", max_prob=0.8) for t in np.linspace(4.1, 6.0, 5) ] full, tail = runner.finalize_split_for_stop() assert len(full) == 2 assert len(tail) == 2 runner.mark_stable_segments_emitted( [actionformer_segment_stable_dedupe_key(full[0])] ) full2, tail2 = runner.finalize_split_for_stop() assert len(full2) == 2 assert len(tail2) == 1 assert tail2[0].start_sec == 4.0 def test_finalize_no_features_returns_empty() -> None: runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=None) full, tail = runner.finalize_split_for_stop() assert full == [] assert tail == []