Files
operating-room-monitor-server/backend/tests/test_session_manager_unit.py
Kevin 1af442481e 重组为 backend/clients/docs 三层结构,并清理 git 污染。
将后端迁入 backend/,完善根目录 .gitignore,删除误提交的 .mypy_cache 缓存文件。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-21 16:02:25 +08:00

350 lines
11 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from app.baked import pipeline as bp
from app.config import Settings
from app.domain.consumption import SurgeryConsumptionStored
from app.surgery_errors import SurgeryPipelineError
from app.services.video.session_manager import (
CameraSessionManager,
PendingConsumableConfirmation,
RunningSurgery,
SurgerySessionState,
)
def test_live_consumption_requires_non_empty_details() -> None:
    """An active, ready session without any details yields no live consumption.

    A surgery is injected directly into the manager's registry with a
    candidate list but no details added, its ``ready`` event is set, and
    ``live_consumption_if_active`` is expected to return ``None``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    # Register the session by hand; the test starts no pipeline tasks.
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )
    state.ready.set()

    assert manager.live_consumption_if_active("123456") is None
@pytest.mark.asyncio
async def test_resolve_voice_accepts_label_on_surgery_list_not_in_topk_options() -> None:
    """A chosen label outside the pending options is accepted when it is a session candidate.

    The pending confirmation only offers "纱布", while the session's candidate
    list also contains "止血钳". Resolving with "止血钳" must append exactly one
    detail whose id is the code mapped in ``name_to_code`` and whose source is
    ``"voice"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(
        candidate_consumables=["纱布", "止血钳"],
        name_to_code={"纱布": "P1", "止血钳": "P2"},
    )
    confirmation_id = "test-confirm-id"
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="请确认",
        created_at=datetime.now(timezone.utc),
        model_top1_label="unknown",
        model_top1_confidence=0.41,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        "123456",
        confirmation_id,
        chosen_label="止血钳",
        rejected=False,
    )

    assert len(state.details) == 1
    recorded = state.details[0]
    assert recorded.item_id == "P2"
    assert recorded.item_name == "止血钳"
    assert recorded.source == "voice"
@pytest.mark.asyncio
async def test_resolve_pending_appends_voice_detail() -> None:
    """Confirming a pending item records a voice detail and clears the pending entry.

    After resolving with one of the offered options, the session must hold a
    single detail with source ``"voice"``, and the confirmation id must be
    gone from both ``pending_by_id`` and ``pending_fifo``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布", "缝线"])
    confirmation_id = "test-confirm-id"
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4), ("缝线", 0.3)],
        prompt_text="请确认",
        created_at=datetime.now(timezone.utc),
        model_top1_label="unknown",
        model_top1_confidence=0.41,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        "123456",
        confirmation_id,
        chosen_label="纱布",
        rejected=False,
    )

    assert len(state.details) == 1
    recorded = state.details[0]
    assert recorded.item_name == "纱布"
    assert recorded.source == "voice"
    # The resolved confirmation must be removed from both bookkeeping structures.
    assert confirmation_id not in state.pending_by_id
    assert state.pending_fifo == []
@pytest.mark.asyncio
async def test_resolve_reject_keeps_pending_detail_and_queue() -> None:
    """Rejecting a confirmation leaves the placeholder detail and the queue untouched.

    The session starts with one pending confirmation and its matching
    ``pending_confirmation`` placeholder detail. After a rejection
    (``rejected=True``, no chosen label) the detail, the ``pending_by_id``
    entry and the FIFO must all still be present.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    confirmation_id = "r1"
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="纱布",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)
    # Placeholder detail tied to the pending confirmation via its id.
    placeholder = SurgeryConsumptionStored(
        item_id=f"pending:{confirmation_id}",
        item_name="纱布(待确认)",
        qty=1,
        doctor_id="x",
        timestamp=datetime.now(timezone.utc),
        source="pending_confirmation",
        pending_confirmation_id=confirmation_id,
    )
    state.details.append(placeholder)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        "123456",
        confirmation_id,
        chosen_label=None,
        rejected=True,
    )

    assert len(state.details) == 1
    kept = state.details[0]
    assert kept.item_id == f"pending:{confirmation_id}"
    assert kept.item_name == "纱布(待确认)"
    assert confirmation_id in state.pending_by_id
    assert state.pending_fifo == [confirmation_id]
@pytest.mark.asyncio
async def test_apply_segment_confirmed_event() -> None:
    """A ``segment_confirmed`` event is turned into one ``algo_subprocess`` detail.

    The event is fed directly to ``_apply_algo_event``; afterwards the session
    must contain exactly one detail named "纱布" whose source is
    ``"algo_subprocess"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(
        candidate_consumables=["纱布"],
        name_to_code={"纱布": "HC1"},
    )
    segment_end = 1_700_000_000.0
    event = {
        "type": "segment_confirmed",
        "cooldown_key": "k1",
        "item_id": "HC1",
        "item_name": "纱布",
        "qty": 1,
        # One-second segment ending at ``segment_end``.
        "wall_start_epoch": segment_end - 1,
        "wall_end_epoch": segment_end,
        "camera_id": "cam01",
        "top1_conf": 0.95,
        "top2_name": "",
        "top2_conf": 0.0,
        "top3_name": "",
        "top3_conf": 0.0,
    }

    await manager._apply_algo_event("123456", state, event)

    assert len(state.details) == 1
    recorded = state.details[0]
    assert recorded.item_name == "纱布"
    assert recorded.source == "algo_subprocess"
@pytest.mark.asyncio
async def test_apply_needs_voice_confirm_event(monkeypatch: pytest.MonkeyPatch) -> None:
    """A ``needs_voice_confirm`` event queues a confirmation and notifies the voice hub.

    After ``_apply_algo_event`` handles the event, the FIFO must hold the
    event's confirmation id, a single placeholder detail named "纱布(待确认)"
    must exist, and the hub's ``schedule_notify_pending_head`` must have been
    called exactly once.

    Args:
        monkeypatch: pytest fixture used to override
            ``bp.VIDEO_AUTO_CONFIRM_CONFIDENCE`` for this test.
    """
    # NOTE(review): presumably 0.99 keeps the 0.91 top-1 score below the
    # auto-confirm threshold so the voice path is taken — confirm in pipeline.
    monkeypatch.setattr(bp, "VIDEO_AUTO_CONFIRM_CONFIDENCE", 0.99)
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    voice_hub = MagicMock()
    manager.set_voice_terminal_hub(voice_hub)
    state = SurgerySessionState(
        candidate_consumables=["缝线"],
        name_to_code={},
        voice_terminal_id="T1",
    )
    top3 = {
        "t1_name": "纱布",
        "t1_conf": 0.91,
        "t2_name": "缝线",
        "t2_conf": 0.05,
        "t3_name": "",
        "t3_conf": 0.0,
        "t1_pid": "",
        "t2_pid": "",
        "t3_pid": "",
    }
    event = {
        "type": "needs_voice_confirm",
        "confirmation_id": "cid-voice-1",
        "model_top1_label": "纱布",
        "model_top1_confidence": 0.91,
        "options": [
            {"label": "纱布", "confidence": 0.91},
            {"label": "缝线", "confidence": 0.05},
        ],
        "wall_start_epoch": 100.0,
        "wall_end_epoch": 110.0,
        "camera_id": "cam01",
        "cls_top3": top3,
    }

    await manager._apply_algo_event("123456", state, event)

    assert len(state.pending_fifo) == 1
    assert state.pending_fifo[0] == "cid-voice-1"
    assert len(state.details) == 1
    assert state.details[0].item_name == "纱布(待确认)"
    voice_hub.schedule_notify_pending_head.assert_called_once()
@pytest.mark.asyncio
async def test_archive_retry_loop_starts() -> None:
    """The archive retry task exists after start and is cleared by shutdown.

    ``start_archive_retry_loop`` must leave a non-``None`` ``_retry_task`` on
    the manager's ``_archive`` object, and ``shutdown`` must reset it to
    ``None``. ``shutdown`` runs in a ``finally`` clause so the manager is
    always stopped, even when the first assertion fails.
    """
    settings = Settings()
    mgr = CameraSessionManager(
        settings=settings,
        hikvision_runtime=None,
        result_repository=None,
    )
    await mgr.start_archive_retry_loop()
    try:
        persister = mgr._archive
        assert persister._retry_task is not None
    finally:
        # Always shut down so a failing assertion cannot leave the started
        # retry loop running for the remainder of the test session.
        await mgr.shutdown()
    assert persister._retry_task is None
@pytest.mark.asyncio
async def test_resolve_invalid_chosen_label() -> None:
    """A label absent from both the options and the candidate list is rejected.

    The session only knows "纱布"; resolving with "止血钳" must raise
    ``SurgeryPipelineError`` with code ``"CONFIRMATION_INVALID"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    confirmation_id = "p1"
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            "123456",
            confirmation_id,
            chosen_label="止血钳",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_INVALID"
@pytest.mark.asyncio
async def test_resolve_not_active() -> None:
    """Resolving against a surgery id that was never registered fails.

    No session is added to the manager, so the call must raise
    ``SurgeryPipelineError`` with code ``"CONFIRMATION_NOT_ACTIVE"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            "999999",
            "p1",
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_NOT_ACTIVE"
@pytest.mark.asyncio
async def test_resolve_second_time_not_found() -> None:
    """Resolving the same confirmation twice fails on the second attempt.

    The first resolve succeeds; repeating the identical call must raise
    ``SurgeryPipelineError`` with code ``"CONFIRMATION_NOT_FOUND"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    confirmation_id = "p2"
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    # First resolution is expected to go through without raising.
    await manager.resolve_pending_confirmation(
        "123456",
        confirmation_id,
        chosen_label="纱布",
        rejected=False,
    )

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            "123456",
            confirmation_id,
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_NOT_FOUND"
@pytest.mark.asyncio
async def test_resolve_already_resolved_status() -> None:
    """A confirmation whose status is no longer ``"pending"`` cannot be resolved.

    The entry is registered as pending and then its status is flipped to
    ``"confirmed"`` by hand; resolving it must raise ``SurgeryPipelineError``
    with code ``"CONFIRMATION_ALREADY_RESOLVED"``.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    confirmation_id = "p3"
    confirmation = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_by_id[confirmation_id] = confirmation
    state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )
    # The entry stays queued but its status says it was already handled.
    confirmation.status = "confirmed"

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            "123456",
            confirmation_id,
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_ALREADY_RESOLVED"
def test_pending_queue_pending_count_fifo() -> None:
    """The pending count reflects every queued confirmation of the surgery.

    Two pending confirmations ("p1", "p2") are stored and queued, and
    ``pending_queue_pending_count`` is expected to report 2.
    """
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    for confirmation_id in ("p1", "p2"):
        confirmation = PendingConsumableConfirmation(
            id=confirmation_id,
            status="pending",
            options=[("纱布", 0.4)],
            prompt_text="x",
            created_at=datetime.now(timezone.utc),
            model_top1_label="x",
            model_top1_confidence=0.4,
        )
        state.pending_by_id[confirmation_id] = confirmation
        state.pending_fifo.append(confirmation_id)
    manager._registry._active["123456"] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    assert manager.pending_queue_pending_count("123456") == 2