将后端迁入 backend/,完善根目录 .gitignore,删除误提交的 .mypy_cache 缓存文件。 Co-authored-by: Cursor <cursoragent@cursor.com>
289 lines
9.9 KiB
Python
289 lines
9.9 KiB
Python
"""``OnlineActionFormerRunner`` 行为单测:稳定段发布 / dedupe / 段内投票 / 尾段补发。
|
||
|
||
不加载 YOLO / VideoSwin / ActionFormer:直接构造 runner 后注入 fake bundle 与
|
||
fake feature/per-frame state,调用内部 ``_maybe_flush_stable_segments`` 与
|
||
``finalize_split_for_stop`` 验证行为。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Any
|
||
from unittest.mock import MagicMock
|
||
|
||
import numpy as np
|
||
import pytest
|
||
|
||
from app.algorithm_runner.actionformer_gated.runner import (
|
||
ActionFormerSegmentRecord,
|
||
OnlineActionFormerRunner,
|
||
_Feature,
|
||
_FrameClsState,
|
||
_to_segment_records,
|
||
actionformer_segment_stable_dedupe_key,
|
||
)
|
||
|
||
|
||
class _FakeAF:
|
||
"""Fake ActionFormer:``forward_features`` 返回预置候选段。"""
|
||
|
||
def __init__(self, segments: list[tuple[float, float, float]]) -> None:
|
||
self._segments = segments
|
||
self.feat_stride_frames = 32
|
||
self.num_frames = 64
|
||
self.calls: list[dict[str, Any]] = []
|
||
|
||
def forward_features(self, feats: np.ndarray, *, duration_sec: float, fps: float, video_id: str = "online"):
|
||
self.calls.append({"shape": feats.shape, "duration": duration_sec, "fps": fps})
|
||
return list(self._segments)
|
||
|
||
|
||
def _new_runner(*, segments: list[tuple[float, float, float]], sink=None) -> OnlineActionFormerRunner:
|
||
"""构造一个跳过 YOLO / VideoSwin 加载的 runner。"""
|
||
haoc_names = {0: "手套", 1: "纱布"}
|
||
det = MagicMock()
|
||
det.names = {0: "hand"}
|
||
gb = MagicMock()
|
||
gb.names = {0: "good", 1: "bad"}
|
||
haoc = MagicMock()
|
||
haoc.names = haoc_names
|
||
videoswin = MagicMock()
|
||
af = _FakeAF(segments)
|
||
runner = OnlineActionFormerRunner(
|
||
det_m=det,
|
||
gb_m=gb,
|
||
haoc_m=haoc,
|
||
videoswin=videoswin,
|
||
action_former=af,
|
||
name_to_id={"手套": "id-glove", "纱布": "id-gauze"},
|
||
whitelist_indices=None,
|
||
timeline_anchor_wall=None,
|
||
stable_segments_sink=sink,
|
||
)
|
||
return runner
|
||
|
||
|
||
def test_to_segment_records_voting_top1() -> None:
|
||
per_frame = [
|
||
_FrameClsState(t_abs=0.5, label_name="手套", max_prob=0.9),
|
||
_FrameClsState(t_abs=1.0, label_name="手套", max_prob=0.8),
|
||
_FrameClsState(t_abs=1.4, label_name="纱布", max_prob=0.7),
|
||
_FrameClsState(t_abs=2.5, label_name=None, max_prob=0.0),
|
||
_FrameClsState(t_abs=10.5, label_name="纱布", max_prob=0.99),
|
||
]
|
||
cands = [(0.0, 2.0, 0.95), (10.0, 11.0, 0.7)]
|
||
recs = _to_segment_records(
|
||
cands,
|
||
per_frame=per_frame,
|
||
name_to_id={"手套": "id-glove", "纱布": "id-gauze"},
|
||
)
|
||
assert len(recs) == 2
|
||
a, b = recs
|
||
assert a.start_sec == 0.0 and a.end_sec == 2.0
|
||
assert a.item_name == "手套"
|
||
assert a.item_id == "id-glove"
|
||
assert a.top1_conf == pytest.approx(2 / 3, abs=1e-6)
|
||
assert a.top2_name == "纱布"
|
||
assert a.top2_conf == pytest.approx(1 / 3, abs=1e-6)
|
||
assert b.start_sec == 10.0 and b.end_sec == 11.0
|
||
assert b.item_name == "纱布"
|
||
assert b.item_id == "id-gauze"
|
||
assert b.top1_conf == pytest.approx(1.0, abs=1e-6)
|
||
|
||
|
||
def test_maybe_flush_publishes_only_stable_segments(monkeypatch: pytest.MonkeyPatch) -> None:
|
||
monkeypatch.setattr(
|
||
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
|
||
0.75,
|
||
raising=False,
|
||
)
|
||
received: list[list[ActionFormerSegmentRecord]] = []
|
||
|
||
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
|
||
received.append(batch)
|
||
|
||
# 候选段 1:[0,2],候选段 2:[4,6]。now=8 时只有 [0,2] 满足 end+margin<now(margin=0.75)。
|
||
# now=10 时两个都满足。
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9), (4.0, 6.0, 0.8)], sink=sink)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.0, 5)
|
||
] + [
|
||
_FrameClsState(t_abs=t, label_name="纱布", max_prob=0.8)
|
||
for t in np.linspace(4.1, 6.0, 5)
|
||
]
|
||
|
||
# 第一次只发 [0,2]:now=4 时 4+0.75>=4 不发,所以用 now=3.0 让 [0,2] 段稳定(2+0.75=2.75<3)
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=3.0)
|
||
assert len(received) == 1
|
||
assert len(received[0]) == 1
|
||
pub_keys = {actionformer_segment_stable_dedupe_key(r) for r in received[0]}
|
||
runner.mark_stable_segments_emitted(pub_keys)
|
||
|
||
# 等过一个周期再发:now=10 时 [4,6] 段稳定(6+0.75=6.75<10)
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=10.0)
|
||
assert len(received) == 2
|
||
assert len(received[1]) == 1
|
||
rec2 = received[1][0]
|
||
assert (rec2.start_sec, rec2.end_sec) == (4.0, 6.0)
|
||
|
||
|
||
def test_maybe_flush_dedupe_same_key_not_republished(monkeypatch: pytest.MonkeyPatch) -> None:
|
||
monkeypatch.setattr(
|
||
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
|
||
0.75,
|
||
raising=False,
|
||
)
|
||
received: list[list[ActionFormerSegmentRecord]] = []
|
||
|
||
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
|
||
received.append(batch)
|
||
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.0, 5)
|
||
]
|
||
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=3.0)
|
||
assert len(received) == 1
|
||
assert len(received[0]) == 1
|
||
runner.mark_stable_segments_emitted(
|
||
[actionformer_segment_stable_dedupe_key(r) for r in received[0]]
|
||
)
|
||
|
||
# 第二次再调,AF 给同一段不应再发
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=4.0)
|
||
assert len(received) == 1
|
||
|
||
|
||
def test_maybe_flush_throttled_by_period() -> None:
|
||
received: list[list[ActionFormerSegmentRecord]] = []
|
||
|
||
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
|
||
received.append(batch)
|
||
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.0, 5)
|
||
]
|
||
|
||
# 刚刚调用过 → 节流不触发
|
||
runner._last_actionformer_call_wall = time.time()
|
||
runner._maybe_flush_stable_segments(now_sec=3.0)
|
||
assert received == []
|
||
|
||
|
||
def test_maybe_flush_respects_freeze_lookahead(monkeypatch: pytest.MonkeyPatch) -> None:
|
||
monkeypatch.setattr(
|
||
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
|
||
12.0,
|
||
raising=False,
|
||
)
|
||
received: list[list[ActionFormerSegmentRecord]] = []
|
||
|
||
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
|
||
received.append(batch)
|
||
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.0, 5)
|
||
]
|
||
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=13.0)
|
||
assert received == []
|
||
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=15.0)
|
||
assert len(received) == 1
|
||
|
||
|
||
def test_maybe_flush_overlap_dedupe_suppresses_boundary_jitter(
|
||
monkeypatch: pytest.MonkeyPatch,
|
||
) -> None:
|
||
monkeypatch.setattr(
|
||
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
|
||
0.75,
|
||
raising=False,
|
||
)
|
||
monkeypatch.setattr(
|
||
"app.baked.algorithm.ACTIONFORMER_DEDUPE_OVERLAP_RATIO",
|
||
0.5,
|
||
raising=False,
|
||
)
|
||
received: list[list[ActionFormerSegmentRecord]] = []
|
||
|
||
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
|
||
received.append(batch)
|
||
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.2, 8)
|
||
]
|
||
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=3.0)
|
||
assert len(received) == 1
|
||
runner.mark_stable_records_emitted(received[0])
|
||
|
||
runner._af._segments = [(0.12, 2.12, 0.95)]
|
||
runner._last_actionformer_call_wall = 0.0
|
||
runner._maybe_flush_stable_segments(now_sec=4.0)
|
||
assert len(received) == 1
|
||
|
||
|
||
def test_finalize_split_for_stop_returns_unemitted_tail() -> None:
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9), (4.0, 6.0, 0.8)], sink=None)
|
||
runner._features = [
|
||
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
|
||
]
|
||
runner._per_frame = [
|
||
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
|
||
for t in np.linspace(0.1, 2.0, 5)
|
||
] + [
|
||
_FrameClsState(t_abs=t, label_name="纱布", max_prob=0.8)
|
||
for t in np.linspace(4.1, 6.0, 5)
|
||
]
|
||
|
||
full, tail = runner.finalize_split_for_stop()
|
||
assert len(full) == 2
|
||
assert len(tail) == 2
|
||
runner.mark_stable_segments_emitted(
|
||
[actionformer_segment_stable_dedupe_key(full[0])]
|
||
)
|
||
full2, tail2 = runner.finalize_split_for_stop()
|
||
assert len(full2) == 2
|
||
assert len(tail2) == 1
|
||
assert tail2[0].start_sec == 4.0
|
||
|
||
|
||
def test_finalize_no_features_returns_empty() -> None:
|
||
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=None)
|
||
full, tail = runner.finalize_split_for_stop()
|
||
assert full == []
|
||
assert tail == []
|