Files
operating-room-monitor-server/backend/tests/test_actionformer_gated_runner.py
Kevin 1af442481e 重组为 backend/clients/docs 三层结构,并清理 git 污染。
将后端迁入 backend/,完善根目录 .gitignore,删除误提交的 .mypy_cache 缓存文件。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-21 16:02:25 +08:00

289 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""``OnlineActionFormerRunner`` 行为单测:稳定段发布 / dedupe / 段内投票 / 尾段补发。
不加载 YOLO / VideoSwin / ActionFormer直接构造 runner 后注入 fake bundle 与
fake feature/per-frame state调用内部 ``_maybe_flush_stable_segments`` 与
``finalize_split_for_stop`` 验证行为。
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Any
from unittest.mock import MagicMock
import numpy as np
import pytest
from app.algorithm_runner.actionformer_gated.runner import (
ActionFormerSegmentRecord,
OnlineActionFormerRunner,
_Feature,
_FrameClsState,
_to_segment_records,
actionformer_segment_stable_dedupe_key,
)
class _FakeAF:
"""Fake ActionFormer``forward_features`` 返回预置候选段。"""
def __init__(self, segments: list[tuple[float, float, float]]) -> None:
self._segments = segments
self.feat_stride_frames = 32
self.num_frames = 64
self.calls: list[dict[str, Any]] = []
def forward_features(self, feats: np.ndarray, *, duration_sec: float, fps: float, video_id: str = "online"):
self.calls.append({"shape": feats.shape, "duration": duration_sec, "fps": fps})
return list(self._segments)
def _new_runner(*, segments: list[tuple[float, float, float]], sink=None) -> OnlineActionFormerRunner:
"""构造一个跳过 YOLO / VideoSwin 加载的 runner。"""
haoc_names = {0: "手套", 1: "纱布"}
det = MagicMock()
det.names = {0: "hand"}
gb = MagicMock()
gb.names = {0: "good", 1: "bad"}
haoc = MagicMock()
haoc.names = haoc_names
videoswin = MagicMock()
af = _FakeAF(segments)
runner = OnlineActionFormerRunner(
det_m=det,
gb_m=gb,
haoc_m=haoc,
videoswin=videoswin,
action_former=af,
name_to_id={"手套": "id-glove", "纱布": "id-gauze"},
whitelist_indices=None,
timeline_anchor_wall=None,
stable_segments_sink=sink,
)
return runner
def test_to_segment_records_voting_top1() -> None:
per_frame = [
_FrameClsState(t_abs=0.5, label_name="手套", max_prob=0.9),
_FrameClsState(t_abs=1.0, label_name="手套", max_prob=0.8),
_FrameClsState(t_abs=1.4, label_name="纱布", max_prob=0.7),
_FrameClsState(t_abs=2.5, label_name=None, max_prob=0.0),
_FrameClsState(t_abs=10.5, label_name="纱布", max_prob=0.99),
]
cands = [(0.0, 2.0, 0.95), (10.0, 11.0, 0.7)]
recs = _to_segment_records(
cands,
per_frame=per_frame,
name_to_id={"手套": "id-glove", "纱布": "id-gauze"},
)
assert len(recs) == 2
a, b = recs
assert a.start_sec == 0.0 and a.end_sec == 2.0
assert a.item_name == "手套"
assert a.item_id == "id-glove"
assert a.top1_conf == pytest.approx(2 / 3, abs=1e-6)
assert a.top2_name == "纱布"
assert a.top2_conf == pytest.approx(1 / 3, abs=1e-6)
assert b.start_sec == 10.0 and b.end_sec == 11.0
assert b.item_name == "纱布"
assert b.item_id == "id-gauze"
assert b.top1_conf == pytest.approx(1.0, abs=1e-6)
def test_maybe_flush_publishes_only_stable_segments(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
0.75,
raising=False,
)
received: list[list[ActionFormerSegmentRecord]] = []
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
received.append(batch)
# 候选段 1[0,2],候选段 2[4,6]。now=8 时只有 [0,2] 满足 end+margin<nowmargin=0.75)。
# now=10 时两个都满足。
runner = _new_runner(segments=[(0.0, 2.0, 0.9), (4.0, 6.0, 0.8)], sink=sink)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.0, 5)
] + [
_FrameClsState(t_abs=t, label_name="纱布", max_prob=0.8)
for t in np.linspace(4.1, 6.0, 5)
]
# 第一次只发 [0,2]now=4 时 4+0.75>=4 不发,所以用 now=3.0 让 [0,2] 段稳定2+0.75=2.75<3
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=3.0)
assert len(received) == 1
assert len(received[0]) == 1
pub_keys = {actionformer_segment_stable_dedupe_key(r) for r in received[0]}
runner.mark_stable_segments_emitted(pub_keys)
# 等过一个周期再发now=10 时 [4,6] 段稳定6+0.75=6.75<10
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=10.0)
assert len(received) == 2
assert len(received[1]) == 1
rec2 = received[1][0]
assert (rec2.start_sec, rec2.end_sec) == (4.0, 6.0)
def test_maybe_flush_dedupe_same_key_not_republished(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
0.75,
raising=False,
)
received: list[list[ActionFormerSegmentRecord]] = []
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
received.append(batch)
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.0, 5)
]
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=3.0)
assert len(received) == 1
assert len(received[0]) == 1
runner.mark_stable_segments_emitted(
[actionformer_segment_stable_dedupe_key(r) for r in received[0]]
)
# 第二次再调AF 给同一段不应再发
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=4.0)
assert len(received) == 1
def test_maybe_flush_throttled_by_period() -> None:
received: list[list[ActionFormerSegmentRecord]] = []
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
received.append(batch)
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.0, 5)
]
# 刚刚调用过 → 节流不触发
runner._last_actionformer_call_wall = time.time()
runner._maybe_flush_stable_segments(now_sec=3.0)
assert received == []
def test_maybe_flush_respects_freeze_lookahead(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
12.0,
raising=False,
)
received: list[list[ActionFormerSegmentRecord]] = []
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
received.append(batch)
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.0, 5)
]
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=13.0)
assert received == []
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=15.0)
assert len(received) == 1
def test_maybe_flush_overlap_dedupe_suppresses_boundary_jitter(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(
"app.baked.algorithm.ACTIONFORMER_FREEZE_LOOKAHEAD_S",
0.75,
raising=False,
)
monkeypatch.setattr(
"app.baked.algorithm.ACTIONFORMER_DEDUPE_OVERLAP_RATIO",
0.5,
raising=False,
)
received: list[list[ActionFormerSegmentRecord]] = []
def sink(batch: list[ActionFormerSegmentRecord]) -> None:
received.append(batch)
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=sink)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.2, 8)
]
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=3.0)
assert len(received) == 1
runner.mark_stable_records_emitted(received[0])
runner._af._segments = [(0.12, 2.12, 0.95)]
runner._last_actionformer_call_wall = 0.0
runner._maybe_flush_stable_segments(now_sec=4.0)
assert len(received) == 1
def test_finalize_split_for_stop_returns_unemitted_tail() -> None:
runner = _new_runner(segments=[(0.0, 2.0, 0.9), (4.0, 6.0, 0.8)], sink=None)
runner._features = [
_Feature(t_center=0.5 + i * 1.0, feat=np.zeros(768, dtype=np.float32)) for i in range(20)
]
runner._per_frame = [
_FrameClsState(t_abs=t, label_name="手套", max_prob=0.9)
for t in np.linspace(0.1, 2.0, 5)
] + [
_FrameClsState(t_abs=t, label_name="纱布", max_prob=0.8)
for t in np.linspace(4.1, 6.0, 5)
]
full, tail = runner.finalize_split_for_stop()
assert len(full) == 2
assert len(tail) == 2
runner.mark_stable_segments_emitted(
[actionformer_segment_stable_dedupe_key(full[0])]
)
full2, tail2 = runner.finalize_split_for_stop()
assert len(full2) == 2
assert len(tail2) == 1
assert tail2[0].start_sec == 4.0
def test_finalize_no_features_returns_empty() -> None:
runner = _new_runner(segments=[(0.0, 2.0, 0.9)], sink=None)
full, tail = runner.finalize_split_for_stop()
assert full == []
assert tail == []