Reserve camera ownership under registry lock before spawning ffmpeg, return 409 for conflicts without retry, and transcode slices to 1080p H.264 for more stable playback. Co-authored-by: Cursor <cursoragent@cursor.com>
454 lines
15 KiB
Python
454 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from app.baked import pipeline as bp
|
|
from app.config import Settings
|
|
from app.domain.consumption import SurgeryConsumptionStored
|
|
from app.surgery_errors import SurgeryPipelineError
|
|
from app.services.video.session_manager import (
|
|
CameraSessionManager,
|
|
PendingConsumableConfirmation,
|
|
RunningSurgery,
|
|
SurgerySessionState,
|
|
)
|
|
|
|
|
|
def test_live_consumption_requires_non_empty_details() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
run = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
mgr._registry._active["123456"] = run
|
|
st.ready.set()
|
|
assert mgr.live_consumption_if_active("123456") is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_voice_accepts_label_on_surgery_list_not_in_topk_options() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(
|
|
candidate_consumables=["纱布", "止血钳"],
|
|
name_to_code={"纱布": "P1", "止血钳": "P2"},
|
|
)
|
|
pid = "test-confirm-id"
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="请确认",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="unknown",
|
|
model_top1_confidence=0.41,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="止血钳", rejected=False)
|
|
|
|
assert len(st.details) == 1
|
|
assert st.details[0].item_id == "P2"
|
|
assert st.details[0].item_name == "止血钳"
|
|
assert st.details[0].source == "voice"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_pending_appends_voice_detail() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布", "缝线"])
|
|
pid = "test-confirm-id"
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4), ("缝线", 0.3)],
|
|
prompt_text="请确认",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="unknown",
|
|
model_top1_confidence=0.41,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
|
|
|
assert len(st.details) == 1
|
|
assert st.details[0].item_name == "纱布"
|
|
assert st.details[0].source == "voice"
|
|
assert pid not in st.pending_by_id
|
|
assert st.pending_fifo == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_reject_keeps_pending_detail_and_queue() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
pid = "r1"
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="x",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="纱布",
|
|
model_top1_confidence=0.4,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
st.details.append(
|
|
SurgeryConsumptionStored(
|
|
item_id=f"pending:{pid}",
|
|
item_name="纱布(待确认)",
|
|
qty=1,
|
|
doctor_id="x",
|
|
timestamp=datetime.now(timezone.utc),
|
|
source="pending_confirmation",
|
|
pending_confirmation_id=pid,
|
|
),
|
|
)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label=None, rejected=True)
|
|
|
|
assert len(st.details) == 1
|
|
assert st.details[0].item_id == f"pending:{pid}"
|
|
assert st.details[0].item_name == "纱布(待确认)"
|
|
assert pid in st.pending_by_id
|
|
assert st.pending_fifo == [pid]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_archive_retry_loop_starts() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
await mgr.start_archive_retry_loop()
|
|
persister = mgr._archive
|
|
assert persister._retry_task is not None
|
|
await mgr.shutdown()
|
|
assert persister._retry_task is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_invalid_chosen_label() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
pid = "p1"
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="x",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="x",
|
|
model_top1_confidence=0.4,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="止血钳", rejected=False)
|
|
assert excinfo.value.code == "CONFIRMATION_INVALID"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_not_active() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
|
await mgr.resolve_pending_confirmation("999999", "p1", chosen_label="纱布", rejected=False)
|
|
assert excinfo.value.code == "CONFIRMATION_NOT_ACTIVE"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_second_time_not_found() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
pid = "p2"
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="x",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="x",
|
|
model_top1_confidence=0.4,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
|
assert excinfo.value.code == "CONFIRMATION_NOT_FOUND"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_resolve_already_resolved_status() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
pid = "p3"
|
|
pending = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="x",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="x",
|
|
model_top1_confidence=0.4,
|
|
)
|
|
st.pending_by_id[pid] = pending
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
pending.status = "confirmed"
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
|
assert excinfo.value.code == "CONFIRMATION_ALREADY_RESOLVED"
|
|
|
|
|
|
def test_pending_queue_pending_count_fifo() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
for pid in ("p1", "p2"):
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
|
id=pid,
|
|
status="pending",
|
|
options=[("纱布", 0.4)],
|
|
prompt_text="x",
|
|
created_at=datetime.now(timezone.utc),
|
|
model_top1_label="x",
|
|
model_top1_confidence=0.4,
|
|
)
|
|
st.pending_fifo.append(pid)
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
|
assert mgr.pending_queue_pending_count("123456") == 2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_surgery_pauses_prewarm_and_resumes_on_failure(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
settings = Settings(video_open_timeout_sec=5.0)
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
prewarm = AsyncMock()
|
|
prewarm.was_warm = MagicMock(return_value=True)
|
|
mgr.set_rtsp_prewarm_service(prewarm)
|
|
|
|
async def fake_recorder_run(_self: object, stop_event: asyncio.Event) -> None:
|
|
await asyncio.sleep(3600)
|
|
|
|
monkeypatch.setattr(
|
|
"app.services.video.session_manager.RtspSegmentRecorder.run",
|
|
fake_recorder_run,
|
|
)
|
|
monkeypatch.setattr(
|
|
"app.services.video.session_manager.resolve_recording_cameras",
|
|
lambda *_a, **_k: ["or-cam-03"],
|
|
)
|
|
monkeypatch.setattr(
|
|
mgr,
|
|
"_resolve_rtsp_url",
|
|
AsyncMock(return_value=("rtsp://example/stream", None, False)),
|
|
)
|
|
|
|
with pytest.raises(SurgeryPipelineError):
|
|
await mgr.start_surgery("123456", ["or-cam-03"], ["纱布"])
|
|
|
|
prewarm.pause.assert_awaited_once_with("or-cam-03")
|
|
prewarm.resume.assert_awaited_once_with("or-cam-03")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_surgery_rejects_when_camera_already_recording() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
busy = SurgerySessionState(candidate_consumables=["纱布"])
|
|
busy.ready.set()
|
|
mgr._registry._active["100155"] = RunningSurgery(
|
|
stop_event=asyncio.Event(),
|
|
state=busy,
|
|
tasks=[],
|
|
record_camera_ids=["or-cam-03"],
|
|
)
|
|
mgr._registry._camera_owners["or-cam-03"] = "100155"
|
|
|
|
with pytest.raises(SurgeryPipelineError) as exc:
|
|
await mgr.start_surgery("100003", ["or-cam-03"], ["纱布"])
|
|
|
|
assert exc.value.code == "CAMERA_ALREADY_RECORDING"
|
|
assert "100155" in exc.value.message
|
|
assert "100003" not in mgr._registry._active
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_registry_register_rejects_duplicate_camera_owner() -> None:
|
|
from app.services.video.session_registry import SurgerySessionRegistry
|
|
|
|
registry = SurgerySessionRegistry()
|
|
first = RunningSurgery(
|
|
stop_event=asyncio.Event(),
|
|
state=SurgerySessionState(candidate_consumables=["纱布"]),
|
|
tasks=[],
|
|
record_camera_ids=["or-cam-03"],
|
|
)
|
|
await registry.register("100155", first)
|
|
|
|
second = RunningSurgery(
|
|
stop_event=asyncio.Event(),
|
|
state=SurgerySessionState(candidate_consumables=["纱布"]),
|
|
tasks=[],
|
|
record_camera_ids=["or-cam-03"],
|
|
)
|
|
with pytest.raises(SurgeryPipelineError) as exc:
|
|
await registry.register("100003", second)
|
|
|
|
assert exc.value.code == "CAMERA_ALREADY_RECORDING"
|
|
|
|
await registry.unregister("100155")
|
|
assert registry.camera_recording_conflicts("100003", ["or-cam-03"]) == []
|
|
await registry.register("100003", second)
|
|
assert registry.get_running("100003") is second
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_surgery_concurrent_camera_recording_spawns_single_ffmpeg(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
settings = Settings(video_open_timeout_sec=5.0)
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
ffmpeg_spawn_count = 0
|
|
ready_gate = asyncio.Event()
|
|
|
|
async def fake_recorder_run(_self: object, stop_event: asyncio.Event) -> None:
|
|
nonlocal ffmpeg_spawn_count
|
|
ffmpeg_spawn_count += 1
|
|
ready = getattr(_self, "_ready_event", None)
|
|
if ready is not None and not ready.is_set():
|
|
await ready_gate.wait()
|
|
ready.set()
|
|
await stop_event.wait()
|
|
|
|
monkeypatch.setattr(
|
|
"app.services.video.session_manager.RtspSegmentRecorder.run",
|
|
fake_recorder_run,
|
|
)
|
|
monkeypatch.setattr(
|
|
"app.services.video.session_manager.resolve_recording_cameras",
|
|
lambda *_a, **_k: ["or-cam-03"],
|
|
)
|
|
monkeypatch.setattr(
|
|
mgr,
|
|
"_resolve_rtsp_url",
|
|
AsyncMock(return_value=("rtsp://example/stream", None, False)),
|
|
)
|
|
|
|
async def start_one(surgery_id: str) -> SurgeryPipelineError | None:
|
|
try:
|
|
await mgr.start_surgery(surgery_id, ["or-cam-03"], ["纱布"])
|
|
except SurgeryPipelineError as exc:
|
|
return exc
|
|
return None
|
|
|
|
first = asyncio.create_task(start_one("100155"))
|
|
second = asyncio.create_task(start_one("100003"))
|
|
await asyncio.sleep(0.05)
|
|
ready_gate.set()
|
|
outcomes = await asyncio.gather(first, second)
|
|
|
|
codes = ["ok" if err is None else err.code for err in outcomes]
|
|
assert codes.count("ok") == 1
|
|
assert codes.count("CAMERA_ALREADY_RECORDING") == 1
|
|
assert ffmpeg_spawn_count == 1
|
|
active = mgr._registry.active_ids()
|
|
assert len(active) == 1
|
|
assert active[0] in {"100155", "100003"}
|
|
|
|
owner = active[0]
|
|
await mgr.stop_surgery(owner)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_surgery_resumes_prewarm_for_recorded_cameras() -> None:
|
|
settings = Settings()
|
|
mgr = CameraSessionManager(
|
|
settings=settings,
|
|
hikvision_runtime=None,
|
|
result_repository=None,
|
|
)
|
|
prewarm = AsyncMock()
|
|
mgr.set_rtsp_prewarm_service(prewarm)
|
|
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
|
st.ready.set()
|
|
run = RunningSurgery(
|
|
stop_event=asyncio.Event(),
|
|
state=st,
|
|
tasks=[],
|
|
record_camera_ids=["or-cam-03"],
|
|
)
|
|
mgr._registry._active["123456"] = run
|
|
mgr._registry._camera_owners["or-cam-03"] = "123456"
|
|
|
|
await mgr.stop_surgery("123456")
|
|
|
|
prewarm.resume.assert_awaited_once_with("or-cam-03")
|
|
|