350 lines
11 KiB
Python
350 lines
11 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from unittest.mock import MagicMock
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from app.baked import pipeline as bp
|
||
|
|
from app.config import Settings
|
||
|
|
from app.domain.consumption import SurgeryConsumptionStored
|
||
|
|
from app.surgery_errors import SurgeryPipelineError
|
||
|
|
from app.services.video.session_manager import (
|
||
|
|
CameraSessionManager,
|
||
|
|
PendingConsumableConfirmation,
|
||
|
|
RunningSurgery,
|
||
|
|
SurgerySessionState,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def test_live_consumption_requires_non_empty_details() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
run = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
mgr._registry._active["123456"] = run
|
||
|
|
st.ready.set()
|
||
|
|
assert mgr.live_consumption_if_active("123456") is None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_voice_accepts_label_on_surgery_list_not_in_topk_options() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(
|
||
|
|
candidate_consumables=["纱布", "止血钳"],
|
||
|
|
name_to_code={"纱布": "P1", "止血钳": "P2"},
|
||
|
|
)
|
||
|
|
pid = "test-confirm-id"
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="请确认",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="unknown",
|
||
|
|
model_top1_confidence=0.41,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="止血钳", rejected=False)
|
||
|
|
|
||
|
|
assert len(st.details) == 1
|
||
|
|
assert st.details[0].item_id == "P2"
|
||
|
|
assert st.details[0].item_name == "止血钳"
|
||
|
|
assert st.details[0].source == "voice"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_pending_appends_voice_detail() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布", "缝线"])
|
||
|
|
pid = "test-confirm-id"
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4), ("缝线", 0.3)],
|
||
|
|
prompt_text="请确认",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="unknown",
|
||
|
|
model_top1_confidence=0.41,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
||
|
|
|
||
|
|
assert len(st.details) == 1
|
||
|
|
assert st.details[0].item_name == "纱布"
|
||
|
|
assert st.details[0].source == "voice"
|
||
|
|
assert pid not in st.pending_by_id
|
||
|
|
assert st.pending_fifo == []
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_reject_keeps_pending_detail_and_queue() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
pid = "r1"
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="x",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="纱布",
|
||
|
|
model_top1_confidence=0.4,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
st.details.append(
|
||
|
|
SurgeryConsumptionStored(
|
||
|
|
item_id=f"pending:{pid}",
|
||
|
|
item_name="纱布(待确认)",
|
||
|
|
qty=1,
|
||
|
|
doctor_id="x",
|
||
|
|
timestamp=datetime.now(timezone.utc),
|
||
|
|
source="pending_confirmation",
|
||
|
|
pending_confirmation_id=pid,
|
||
|
|
),
|
||
|
|
)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label=None, rejected=True)
|
||
|
|
|
||
|
|
assert len(st.details) == 1
|
||
|
|
assert st.details[0].item_id == f"pending:{pid}"
|
||
|
|
assert st.details[0].item_name == "纱布(待确认)"
|
||
|
|
assert pid in st.pending_by_id
|
||
|
|
assert st.pending_fifo == [pid]
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_apply_segment_confirmed_event() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(
|
||
|
|
candidate_consumables=["纱布"],
|
||
|
|
name_to_code={"纱布": "HC1"},
|
||
|
|
)
|
||
|
|
wall = 1_700_000_000.0
|
||
|
|
ev = {
|
||
|
|
"type": "segment_confirmed",
|
||
|
|
"cooldown_key": "k1",
|
||
|
|
"item_id": "HC1",
|
||
|
|
"item_name": "纱布",
|
||
|
|
"qty": 1,
|
||
|
|
"wall_start_epoch": wall - 1,
|
||
|
|
"wall_end_epoch": wall,
|
||
|
|
"camera_id": "cam01",
|
||
|
|
"top1_conf": 0.95,
|
||
|
|
"top2_name": "",
|
||
|
|
"top2_conf": 0.0,
|
||
|
|
"top3_name": "",
|
||
|
|
"top3_conf": 0.0,
|
||
|
|
}
|
||
|
|
await mgr._apply_algo_event("123456", st, ev)
|
||
|
|
assert len(st.details) == 1
|
||
|
|
assert st.details[0].item_name == "纱布"
|
||
|
|
assert st.details[0].source == "algo_subprocess"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_apply_needs_voice_confirm_event(monkeypatch: pytest.MonkeyPatch) -> None:
|
||
|
|
monkeypatch.setattr(bp, "VIDEO_AUTO_CONFIRM_CONFIDENCE", 0.99)
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
hub = MagicMock()
|
||
|
|
mgr.set_voice_terminal_hub(hub)
|
||
|
|
st = SurgerySessionState(
|
||
|
|
candidate_consumables=["缝线"],
|
||
|
|
name_to_code={},
|
||
|
|
voice_terminal_id="T1",
|
||
|
|
)
|
||
|
|
ev = {
|
||
|
|
"type": "needs_voice_confirm",
|
||
|
|
"confirmation_id": "cid-voice-1",
|
||
|
|
"model_top1_label": "纱布",
|
||
|
|
"model_top1_confidence": 0.91,
|
||
|
|
"options": [{"label": "纱布", "confidence": 0.91}, {"label": "缝线", "confidence": 0.05}],
|
||
|
|
"wall_start_epoch": 100.0,
|
||
|
|
"wall_end_epoch": 110.0,
|
||
|
|
"camera_id": "cam01",
|
||
|
|
"cls_top3": {
|
||
|
|
"t1_name": "纱布",
|
||
|
|
"t1_conf": 0.91,
|
||
|
|
"t2_name": "缝线",
|
||
|
|
"t2_conf": 0.05,
|
||
|
|
"t3_name": "",
|
||
|
|
"t3_conf": 0.0,
|
||
|
|
"t1_pid": "",
|
||
|
|
"t2_pid": "",
|
||
|
|
"t3_pid": "",
|
||
|
|
},
|
||
|
|
}
|
||
|
|
await mgr._apply_algo_event("123456", st, ev)
|
||
|
|
assert len(st.pending_fifo) == 1
|
||
|
|
assert st.pending_fifo[0] == "cid-voice-1"
|
||
|
|
assert len(st.details) == 1
|
||
|
|
assert st.details[0].item_name == "纱布(待确认)"
|
||
|
|
hub.schedule_notify_pending_head.assert_called_once()
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_archive_retry_loop_starts() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
await mgr.start_archive_retry_loop()
|
||
|
|
persister = mgr._archive
|
||
|
|
assert persister._retry_task is not None
|
||
|
|
await mgr.shutdown()
|
||
|
|
assert persister._retry_task is None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_invalid_chosen_label() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
pid = "p1"
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="x",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="x",
|
||
|
|
model_top1_confidence=0.4,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="止血钳", rejected=False)
|
||
|
|
assert excinfo.value.code == "CONFIRMATION_INVALID"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_not_active() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
||
|
|
await mgr.resolve_pending_confirmation("999999", "p1", chosen_label="纱布", rejected=False)
|
||
|
|
assert excinfo.value.code == "CONFIRMATION_NOT_ACTIVE"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_second_time_not_found() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
pid = "p2"
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="x",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="x",
|
||
|
|
model_top1_confidence=0.4,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
||
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
||
|
|
assert excinfo.value.code == "CONFIRMATION_NOT_FOUND"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
|
||
|
|
async def test_resolve_already_resolved_status() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
pid = "p3"
|
||
|
|
pending = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="x",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="x",
|
||
|
|
model_top1_confidence=0.4,
|
||
|
|
)
|
||
|
|
st.pending_by_id[pid] = pending
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
pending.status = "confirmed"
|
||
|
|
with pytest.raises(SurgeryPipelineError) as excinfo:
|
||
|
|
await mgr.resolve_pending_confirmation("123456", pid, chosen_label="纱布", rejected=False)
|
||
|
|
assert excinfo.value.code == "CONFIRMATION_ALREADY_RESOLVED"
|
||
|
|
|
||
|
|
|
||
|
|
def test_pending_queue_pending_count_fifo() -> None:
|
||
|
|
settings = Settings()
|
||
|
|
mgr = CameraSessionManager(
|
||
|
|
settings=settings,
|
||
|
|
hikvision_runtime=None,
|
||
|
|
result_repository=None,
|
||
|
|
)
|
||
|
|
st = SurgerySessionState(candidate_consumables=["纱布"])
|
||
|
|
for pid in ("p1", "p2"):
|
||
|
|
st.pending_by_id[pid] = PendingConsumableConfirmation(
|
||
|
|
id=pid,
|
||
|
|
status="pending",
|
||
|
|
options=[("纱布", 0.4)],
|
||
|
|
prompt_text="x",
|
||
|
|
created_at=datetime.now(timezone.utc),
|
||
|
|
model_top1_label="x",
|
||
|
|
model_top1_confidence=0.4,
|
||
|
|
)
|
||
|
|
st.pending_fifo.append(pid)
|
||
|
|
mgr._registry._active["123456"] = RunningSurgery(stop_event=asyncio.Event(), state=st, tasks=[])
|
||
|
|
assert mgr.pending_queue_pending_count("123456") == 2
|