Files
operating-room-monitor-server/backend/tests/test_session_manager_unit.py
op 3979e13ca9 Prevent concurrent RTSP recording on the same camera.
Reserve camera ownership under registry lock before spawning ffmpeg, return 409 for conflicts without retry, and transcode slices to 1080p H.264 for more stable playback.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-26 16:32:55 +08:00

454 lines
15 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.baked import pipeline as bp
from app.config import Settings
from app.domain.consumption import SurgeryConsumptionStored
from app.surgery_errors import SurgeryPipelineError
from app.services.video.session_manager import (
CameraSessionManager,
PendingConsumableConfirmation,
RunningSurgery,
SurgerySessionState,
)
def test_live_consumption_requires_non_empty_details() -> None:
    """Expect no live consumption for a ready session to which no details were added."""
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    running = RunningSurgery(stop_event=asyncio.Event(), state=state, tasks=[])

    # Register the session directly in the registry, then flag it as ready.
    manager._registry._active["123456"] = running
    state.ready.set()

    live = manager.live_consumption_if_active("123456")
    assert live is None
@pytest.mark.asyncio
async def test_resolve_voice_accepts_label_on_surgery_list_not_in_topk_options() -> None:
    """Resolve a pending confirmation with a label outside its listed options.

    The pending entry offers only the first candidate consumable, while the
    chosen label is the second candidate of the session. The test expects
    exactly one recorded detail carrying the mapped code "P2" and source "voice".
    """
    surgery_id = "123456"
    confirmation_id = "test-confirm-id"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(
        candidate_consumables=["纱布", "止血钳"],
        name_to_code={"纱布": "P1", "止血钳": "P2"},
    )
    pending = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="请确认",
        created_at=datetime.now(timezone.utc),
        model_top1_label="unknown",
        model_top1_confidence=0.41,
    )
    state.pending_by_id[confirmation_id] = pending
    state.pending_fifo.append(confirmation_id)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        surgery_id,
        confirmation_id,
        chosen_label="止血钳",
        rejected=False,
    )

    assert len(state.details) == 1
    recorded = state.details[0]
    assert recorded.item_id == "P2"
    assert recorded.item_name == "止血钳"
    assert recorded.source == "voice"
@pytest.mark.asyncio
async def test_resolve_pending_appends_voice_detail() -> None:
    """Confirm a pending entry with one of its options and check the bookkeeping.

    After resolving, the test expects a single detail with source "voice" and
    the chosen name, and the confirmation to be gone from both the lookup map
    and the FIFO queue.
    """
    surgery_id = "123456"
    confirmation_id = "test-confirm-id"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布", "缝线"])
    pending = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4), ("缝线", 0.3)],
        prompt_text="请确认",
        created_at=datetime.now(timezone.utc),
        model_top1_label="unknown",
        model_top1_confidence=0.41,
    )
    state.pending_by_id[confirmation_id] = pending
    state.pending_fifo.append(confirmation_id)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        surgery_id,
        confirmation_id,
        chosen_label="纱布",
        rejected=False,
    )

    assert len(state.details) == 1
    recorded = state.details[0]
    assert recorded.item_name == "纱布"
    assert recorded.source == "voice"
    # The resolved confirmation must no longer be tracked anywhere.
    assert confirmation_id not in state.pending_by_id
    assert state.pending_fifo == []
@pytest.mark.asyncio
async def test_resolve_reject_keeps_pending_detail_and_queue() -> None:
    """Reject a pending confirmation and verify nothing is removed.

    The session is seeded with one pending confirmation plus its placeholder
    detail. After resolving with ``rejected=True`` the test expects the
    placeholder detail, the lookup entry and the FIFO entry to all remain.
    """
    surgery_id = "123456"
    confirmation_id = "r1"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="纱布",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)

    # Placeholder detail that refers back to the pending confirmation.
    placeholder = SurgeryConsumptionStored(
        item_id=f"pending:{confirmation_id}",
        item_name="纱布(待确认)",
        qty=1,
        doctor_id="x",
        timestamp=datetime.now(timezone.utc),
        source="pending_confirmation",
        pending_confirmation_id=confirmation_id,
    )
    state.details.append(placeholder)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    await manager.resolve_pending_confirmation(
        surgery_id,
        confirmation_id,
        chosen_label=None,
        rejected=True,
    )

    assert len(state.details) == 1
    kept = state.details[0]
    assert kept.item_id == f"pending:{confirmation_id}"
    assert kept.item_name == "纱布(待确认)"
    assert confirmation_id in state.pending_by_id
    assert state.pending_fifo == [confirmation_id]
@pytest.mark.asyncio
async def test_archive_retry_loop_starts() -> None:
    """Check that the archive retry task exists after start and is cleared by shutdown.

    ``shutdown()`` runs in a ``finally`` clause so that a failing "task was
    started" assertion cannot leave the retry task running past the end of
    this test.
    """
    settings = Settings()
    mgr = CameraSessionManager(
        settings=settings,
        hikvision_runtime=None,
        result_repository=None,
    )
    await mgr.start_archive_retry_loop()
    persister = mgr._archive
    try:
        assert persister._retry_task is not None
    finally:
        # Always tear the manager down, even when the assertion above fails.
        await mgr.shutdown()
    assert persister._retry_task is None
@pytest.mark.asyncio
async def test_resolve_invalid_chosen_label() -> None:
    """Expect CONFIRMATION_INVALID when the chosen label is not a session candidate."""
    surgery_id = "123456"
    confirmation_id = "p1"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    # The chosen label appears neither in the candidates nor in the options.
    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            surgery_id,
            confirmation_id,
            chosen_label="止血钳",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_INVALID"
@pytest.mark.asyncio
async def test_resolve_not_active() -> None:
    """Expect CONFIRMATION_NOT_ACTIVE when no surgery is registered under the id."""
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )

    # Nothing was registered, so the surgery id is unknown to the manager.
    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            "999999",
            "p1",
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_NOT_ACTIVE"
@pytest.mark.asyncio
async def test_resolve_second_time_not_found() -> None:
    """Expect CONFIRMATION_NOT_FOUND when the same confirmation is resolved twice."""
    surgery_id = "123456"
    confirmation_id = "p2"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    state.pending_by_id[confirmation_id] = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_fifo.append(confirmation_id)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    # First resolution is expected to succeed.
    await manager.resolve_pending_confirmation(
        surgery_id,
        confirmation_id,
        chosen_label="纱布",
        rejected=False,
    )

    # Repeating the identical call must be rejected.
    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            surgery_id,
            confirmation_id,
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_NOT_FOUND"
@pytest.mark.asyncio
async def test_resolve_already_resolved_status() -> None:
    """Expect CONFIRMATION_ALREADY_RESOLVED for a tracked entry whose status is "confirmed"."""
    surgery_id = "123456"
    confirmation_id = "p3"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])
    entry = PendingConsumableConfirmation(
        id=confirmation_id,
        status="pending",
        options=[("纱布", 0.4)],
        prompt_text="x",
        created_at=datetime.now(timezone.utc),
        model_top1_label="x",
        model_top1_confidence=0.4,
    )
    state.pending_by_id[confirmation_id] = entry
    state.pending_fifo.append(confirmation_id)
    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    # Flip the status while the entry is still tracked in the session state.
    entry.status = "confirmed"

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.resolve_pending_confirmation(
            surgery_id,
            confirmation_id,
            chosen_label="纱布",
            rejected=False,
        )

    assert raised.value.code == "CONFIRMATION_ALREADY_RESOLVED"
def test_pending_queue_pending_count_fifo() -> None:
    """Queue two pending confirmations and expect the manager to report a count of 2."""
    surgery_id = "123456"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    state = SurgerySessionState(candidate_consumables=["纱布"])

    for confirmation_id in ("p1", "p2"):
        entry = PendingConsumableConfirmation(
            id=confirmation_id,
            status="pending",
            options=[("纱布", 0.4)],
            prompt_text="x",
            created_at=datetime.now(timezone.utc),
            model_top1_label="x",
            model_top1_confidence=0.4,
        )
        state.pending_by_id[confirmation_id] = entry
        state.pending_fifo.append(confirmation_id)

    manager._registry._active[surgery_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
    )

    assert manager.pending_queue_pending_count(surgery_id) == 2
@pytest.mark.asyncio
async def test_start_surgery_pauses_prewarm_and_resumes_on_failure(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Expect prewarm to be paused and then resumed when ``start_surgery`` fails.

    The recorder's ``run`` is replaced by a coroutine that only sleeps, camera
    resolution and RTSP URL lookup are stubbed, and ``start_surgery`` is
    expected to raise ``SurgeryPipelineError``. Afterwards the prewarm mock
    must have been paused and resumed exactly once for the camera.
    """
    camera_id = "or-cam-03"
    manager = CameraSessionManager(
        settings=Settings(video_open_timeout_sec=5.0),
        hikvision_runtime=None,
        result_repository=None,
    )
    prewarm_service = AsyncMock()
    prewarm_service.was_warm = MagicMock(return_value=True)
    manager.set_rtsp_prewarm_service(prewarm_service)

    async def idle_recorder_run(_self: object, stop_event: asyncio.Event) -> None:
        # Stand-in for the real recorder: just sleeps and ignores stop_event.
        await asyncio.sleep(3600)

    monkeypatch.setattr(
        "app.services.video.session_manager.RtspSegmentRecorder.run",
        idle_recorder_run,
    )
    monkeypatch.setattr(
        "app.services.video.session_manager.resolve_recording_cameras",
        lambda *_a, **_k: ["or-cam-03"],
    )
    rtsp_stub = AsyncMock(return_value=("rtsp://example/stream", None, False))
    monkeypatch.setattr(manager, "_resolve_rtsp_url", rtsp_stub)

    with pytest.raises(SurgeryPipelineError):
        await manager.start_surgery("123456", [camera_id], ["纱布"])

    prewarm_service.pause.assert_awaited_once_with(camera_id)
    prewarm_service.resume.assert_awaited_once_with(camera_id)
@pytest.mark.asyncio
async def test_start_surgery_rejects_when_camera_already_recording() -> None:
    """Expect CAMERA_ALREADY_RECORDING when the camera is owned by another surgery.

    Surgery "100155" is seeded as the owner of the camera. Starting "100003"
    on the same camera must raise, mention the owner in the error message, and
    leave "100003" out of the active registry.
    """
    owner_id = "100155"
    newcomer_id = "100003"
    camera_id = "or-cam-03"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )

    owner_state = SurgerySessionState(candidate_consumables=["纱布"])
    owner_state.ready.set()
    manager._registry._active[owner_id] = RunningSurgery(
        stop_event=asyncio.Event(),
        state=owner_state,
        tasks=[],
        record_camera_ids=[camera_id],
    )
    manager._registry._camera_owners[camera_id] = owner_id

    with pytest.raises(SurgeryPipelineError) as raised:
        await manager.start_surgery(newcomer_id, [camera_id], ["纱布"])

    error = raised.value
    assert error.code == "CAMERA_ALREADY_RECORDING"
    assert owner_id in error.message
    assert newcomer_id not in manager._registry._active
@pytest.mark.asyncio
async def test_registry_register_rejects_duplicate_camera_owner() -> None:
    """Exercise camera ownership in ``SurgerySessionRegistry``.

    Registering a second surgery on a camera that is already owned must raise
    CAMERA_ALREADY_RECORDING. Once the owner is unregistered, no conflicts are
    reported and the second surgery can be registered and looked up.
    """
    from app.services.video.session_registry import SurgerySessionRegistry

    camera_id = "or-cam-03"
    registry = SurgerySessionRegistry()

    owner_run = RunningSurgery(
        stop_event=asyncio.Event(),
        state=SurgerySessionState(candidate_consumables=["纱布"]),
        tasks=[],
        record_camera_ids=[camera_id],
    )
    await registry.register("100155", owner_run)

    contender_run = RunningSurgery(
        stop_event=asyncio.Event(),
        state=SurgerySessionState(candidate_consumables=["纱布"]),
        tasks=[],
        record_camera_ids=[camera_id],
    )
    with pytest.raises(SurgeryPipelineError) as raised:
        await registry.register("100003", contender_run)
    assert raised.value.code == "CAMERA_ALREADY_RECORDING"

    # After the owner is unregistered the contender can be registered.
    await registry.unregister("100155")
    conflicts = registry.camera_recording_conflicts("100003", [camera_id])
    assert conflicts == []

    await registry.register("100003", contender_run)
    assert registry.get_running("100003") is contender_run
@pytest.mark.asyncio
async def test_start_surgery_concurrent_camera_recording_spawns_single_ffmpeg(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Start two surgeries concurrently on one camera; only one may record.

    The recorder's ``run`` is replaced by a fake that counts invocations and
    holds back readiness until ``ready_gate`` is set. Exactly one start must
    succeed, the other must fail with CAMERA_ALREADY_RECORDING, and the fake
    recorder must have been entered exactly once.

    Cleanup runs in a ``finally`` clause so that a failing assertion does not
    leave a registered surgery (and its fake recorder task) running.
    """
    settings = Settings(video_open_timeout_sec=5.0)
    mgr = CameraSessionManager(
        settings=settings,
        hikvision_runtime=None,
        result_repository=None,
    )
    ffmpeg_spawn_count = 0
    ready_gate = asyncio.Event()

    async def fake_recorder_run(_self: object, stop_event: asyncio.Event) -> None:
        # Count each recorder start; every call stands in for one ffmpeg spawn.
        nonlocal ffmpeg_spawn_count
        ffmpeg_spawn_count += 1
        ready = getattr(_self, "_ready_event", None)
        if ready is not None and not ready.is_set():
            # Hold back readiness until the test has launched both starts.
            await ready_gate.wait()
            ready.set()
        await stop_event.wait()

    monkeypatch.setattr(
        "app.services.video.session_manager.RtspSegmentRecorder.run",
        fake_recorder_run,
    )
    monkeypatch.setattr(
        "app.services.video.session_manager.resolve_recording_cameras",
        lambda *_a, **_k: ["or-cam-03"],
    )
    monkeypatch.setattr(
        mgr,
        "_resolve_rtsp_url",
        AsyncMock(return_value=("rtsp://example/stream", None, False)),
    )

    async def start_one(surgery_id: str) -> SurgeryPipelineError | None:
        # Return the pipeline error instead of raising so gather() sees both outcomes.
        try:
            await mgr.start_surgery(surgery_id, ["or-cam-03"], ["纱布"])
        except SurgeryPipelineError as exc:
            return exc
        return None

    first = asyncio.create_task(start_one("100155"))
    second = asyncio.create_task(start_one("100003"))
    try:
        # Give both tasks a chance to run before releasing the recorder.
        await asyncio.sleep(0.05)
        ready_gate.set()
        outcomes = await asyncio.gather(first, second)
        codes = ["ok" if err is None else err.code for err in outcomes]
        assert codes.count("ok") == 1
        assert codes.count("CAMERA_ALREADY_RECORDING") == 1
        assert ffmpeg_spawn_count == 1
        active = mgr._registry.active_ids()
        assert len(active) == 1
        assert active[0] in {"100155", "100003"}
    finally:
        # Stop whatever is still registered, whether or not the assertions passed.
        for surgery_id in list(mgr._registry.active_ids()):
            await mgr.stop_surgery(surgery_id)
@pytest.mark.asyncio
async def test_stop_surgery_resumes_prewarm_for_recorded_cameras() -> None:
    """Expect ``stop_surgery`` to resume prewarm once for the surgery's recorded camera."""
    surgery_id = "123456"
    camera_id = "or-cam-03"
    manager = CameraSessionManager(
        settings=Settings(),
        hikvision_runtime=None,
        result_repository=None,
    )
    prewarm_service = AsyncMock()
    manager.set_rtsp_prewarm_service(prewarm_service)

    state = SurgerySessionState(candidate_consumables=["纱布"])
    state.ready.set()
    running = RunningSurgery(
        stop_event=asyncio.Event(),
        state=state,
        tasks=[],
        record_camera_ids=[camera_id],
    )
    # Seed both the active map and the camera ownership map by hand.
    manager._registry._active[surgery_id] = running
    manager._registry._camera_owners[camera_id] = surgery_id

    await manager.stop_surgery(surgery_id)

    prewarm_service.resume.assert_awaited_once_with(camera_id)