Files
operating-room-monitor-server/backend/app/services/video/session_manager.py
Kevin d6f4590969 Clarify surgery result errors and expose offline batch timing.
Return specific codes when results are unavailable (not started vs in progress vs ended empty), block duplicate starts with SURGERY_ALREADY_RECORDING, and show text/video/total durations in the demo client.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 14:01:25 +08:00

597 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from loguru import logger
from sqlalchemy.ext.asyncio import async_sessionmaker
from app.algorithm_ipc.schema import WhitelistSpec, events_path_for_surgery, parse_event_obj
from app.baked import algorithm as ba
from app.baked import pipeline as bp
from app.config import Settings
from app.consumable_catalog import build_name_mapping, effective_candidate_consumables
from app.database import AsyncSessionLocal
from app.domain.consumption import SurgeryConsumptionStored
from app.domain.vision_prediction import ClsTop3, PredictionCandidate
from app.repositories.surgery_results import SurgeryResultRepository
from app.schemas import SurgeryConsumptionDetail, build_consumption_summary
from app.services.consumption_tsv_log import (
append_consumption_log_summary,
append_consumption_pending_window,
append_consumption_window,
init_consumption_log_file,
print_consumption_summary_markdown,
)
from app.services.video.archive_persister import ArchivePersister
from app.services.video.backend_resolver import BackendResolver
from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime
from app.services.video.session_registry import (
PendingConsumableConfirmation,
RunningSurgery,
SurgerySessionRegistry,
SurgerySessionState,
format_elapsed_mmss_since,
pending_display_item_name_for_confirmation,
)
from app.services.video.types import VideoBackendKind
from app.services.voice_file_log import init_voice_log_file
from app.services.voice_terminal_hub import VoiceTerminalHub
from app.surgery_errors import SurgeryPipelineError
__all__ = [
"CameraSessionManager",
"PendingConsumableConfirmation",
"RunningSurgery",
"SurgerySessionState",
]
LOGS_DIR = Path("logs")
def _safe_log_name(value: str) -> str:
out = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in value.strip())
return out[:96] or "unknown"
class CameraSessionManager:
"""手术会话启动算法子进程RTSP+ JSONL tail 驱动内存明细与待确认队列。"""
def __init__(
self,
*,
settings: Settings,
hikvision_runtime: HikvisionRuntime | None,
result_repository: SurgeryResultRepository | None = None,
session_factory: async_sessionmaker | None = None,
registry: SurgerySessionRegistry | None = None,
archive_persister: ArchivePersister | None = None,
) -> None:
self._s = settings
self._hik = hikvision_runtime
self._session_factory: async_sessionmaker = session_factory or AsyncSessionLocal
self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime)
self._registry = registry or SurgerySessionRegistry()
self._archive = archive_persister or ArchivePersister(
repository=result_repository,
session_factory=self._session_factory,
)
self._voice_hub: VoiceTerminalHub | None = None
def set_voice_terminal_hub(self, hub: VoiceTerminalHub | None) -> None:
self._voice_hub = hub
def get_voice_terminal_id_if_active(self, surgery_id: str) -> str | None:
run = self._registry.get_running(surgery_id)
if run is None:
return None
tid = (run.state.voice_terminal_id or "").strip()
return tid or None
async def start_archive_retry_loop(self) -> None:
await self._archive.recover_from_durable_fallback()
await self._archive.start_retry_loop()
async def shutdown(self) -> None:
await self._archive.shutdown()
ids = self._registry.active_ids()
for sid in ids:
try:
await self.stop_surgery(sid, require_active=False)
except Exception as exc:
logger.warning("shutdown stop_surgery {}: {}", sid, exc)
def _whitelist_path(self, surgery_id: str) -> Path:
return (LOGS_DIR / f"surgery_{surgery_id}_whitelist.json").expanduser()
async def start_surgery(
self,
surgery_id: str,
camera_ids: list[str],
candidate_consumables: list[str],
) -> None:
if self._registry.has_active(surgery_id):
raise SurgeryPipelineError(
"SURGERY_ALREADY_RECORDING",
"该手术已在录制中,请勿重复开始。",
)
stale = await self._archive.take_archived_details(surgery_id)
if stale is not None:
logger.warning(
"surgery_id={} 仍有未落库归档,尝试写入数据库后再开始新会话",
surgery_id,
)
if self._archive.repository is None:
logger.error(
"surgery_id={} 有内存归档但未配置数据库仓库,无法持久化;开始新会话将丢弃该归档(仅开发/无库模式)",
surgery_id,
)
else:
ok = await self._archive.persist_or_archive(surgery_id, stale)
if not ok:
raise SurgeryPipelineError(
"RECORDING_CANNOT_START",
"该手术号存在尚未写入数据库的历史结果,请修复数据库或等待自动重试成功后再开始。",
)
resolved = effective_candidate_consumables(candidate_consumables)
if not resolved:
raise SurgeryPipelineError(
"RECORDING_CANNOT_START",
"耗材候选为空:请在请求中传入 candidate_consumables或提供有效的 consumable_classifier_labels.yaml。",
)
if not any(str(x).strip() for x in candidate_consumables):
logger.info(
"surgery {}: candidate_consumables 未提供,使用默认全量 {}",
surgery_id,
len(resolved),
)
name_to_code = build_name_mapping(resolved)
state = SurgerySessionState(
candidate_consumables=list(resolved),
name_to_code=name_to_code,
surgery_started_wall=time.time(),
)
stop_event = asyncio.Event()
events_path = events_path_for_surgery(surgery_id, base_dir=LOGS_DIR)
events_path.parent.mkdir(parents=True, exist_ok=True)
events_path.write_text("", encoding="utf-8")
wl = WhitelistSpec.from_session(resolved, name_to_code)
wp = self._whitelist_path(surgery_id)
wp.write_text(
json.dumps(wl.to_json_obj(), ensure_ascii=False, indent=2),
encoding="utf-8",
)
primary_cfg = (ba.ACTIONFORMER_PRIMARY_CAMERA_ID or "").strip()
if primary_cfg:
primary_cam = primary_cfg
else:
primary_cam = (camera_ids[0] if camera_ids else "").strip() or "cam01"
kind = self._resolver.backend_for_camera(primary_cam)
url, hik_uid, hik_retained = await self._resolve_rtsp_url(camera_id=primary_cam, kind=kind)
cmd = [
sys.executable,
"-m",
"app.algorithm_runner",
"--source",
url,
"--whitelist-json",
str(wp.resolve()),
"--events-jsonl",
str(events_path.resolve()),
"--wall-anchor",
str(state.surgery_started_wall or time.time()),
"--surgery-id",
surgery_id,
"--camera-id",
primary_cam,
"--source-mode",
"realtime",
]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
tail_task = asyncio.create_task(
self._tail_algo_events(surgery_id, events_path, state, stop_event),
name=f"algo_tail:{surgery_id}",
)
run = RunningSurgery(stop_event=stop_event, state=state, tasks=[tail_task], algo_process=proc)
init_consumption_log_file(surgery_id)
init_voice_log_file(surgery_id)
await self._registry.register(surgery_id, run)
open_timeout = float(self._s.video_open_timeout_sec) + 5.0
try:
await asyncio.wait_for(self._wait_algo_ready(events_path), timeout=open_timeout)
state.ready.set()
except TimeoutError as exc:
logger.error("Surgery {} algorithm not ready within {}s", surgery_id, open_timeout)
await self._force_stop_subprocess(run, surgery_id)
tail_task.cancel()
try:
await tail_task
except asyncio.CancelledError:
pass
await self._registry.unregister(surgery_id)
raise SurgeryPipelineError(
"RECORDING_CANNOT_START",
"开录未能确认:算法子进程在超时内未就绪(未收到 ready 事件)。",
) from exc
except Exception:
await self._force_stop_subprocess(run, surgery_id)
tail_task.cancel()
try:
await tail_task
except asyncio.CancelledError:
pass
await self._registry.unregister(surgery_id)
raise
finally:
if hik_uid is not None and self._hik is not None:
await asyncio.to_thread(self._hik.logout, hik_uid)
if hik_retained and self._hik is not None:
HikvisionInitRefCount.release(self._hik)
async def _wait_algo_ready(self, events_path: Path) -> None:
deadline = time.monotonic() + float(self._s.video_open_timeout_sec) + 5.0
while time.monotonic() < deadline:
if events_path.is_file() and events_path.stat().st_size > 0:
try:
text = events_path.read_text(encoding="utf-8")
except OSError:
text = ""
for line in text.splitlines():
ev = parse_event_obj(line)
if ev and ev.get("type") == "ready":
return
if ev and ev.get("type") == "error":
raise SurgeryPipelineError(
"RECORDING_CANNOT_START",
f"算法子进程错误: {ev.get('message', ev)!s}",
)
await asyncio.sleep(0.05)
raise TimeoutError("algo ready")
async def _tail_algo_events(
self,
surgery_id: str,
path: Path,
state: SurgerySessionState,
stop_event: asyncio.Event,
) -> None:
offset = 0
try:
while not stop_event.is_set():
await asyncio.sleep(0.12)
if not path.is_file():
continue
try:
data = path.read_bytes()
except OSError:
continue
if len(data) < offset:
offset = 0
if len(data) <= offset:
continue
chunk = data[offset:].decode("utf-8", errors="replace")
offset = len(data)
for line in chunk.splitlines():
ev = parse_event_obj(line)
if ev:
await self._apply_algo_event(surgery_id, state, ev)
except asyncio.CancelledError:
pass
finally:
if path.is_file():
try:
data = path.read_bytes()
if len(data) > offset:
chunk = data[offset:].decode("utf-8", errors="replace")
for line in chunk.splitlines():
ev = parse_event_obj(line)
if ev:
await self._apply_algo_event(surgery_id, state, ev)
except Exception:
logger.debug("final algo tail drain skipped")
async def _apply_algo_event(self, surgery_id: str, state: SurgerySessionState, ev: dict) -> None:
t = ev.get("type")
if t == "ready" or t == "done":
return
if t == "error":
logger.warning("algo subprocess surgery={} error={}", surgery_id, ev.get("message"))
return
if t == "segment_confirmed":
if ev.get("frozen", True) is False:
logger.debug("skip provisional segment_confirmed surgery={}", surgery_id)
return
wall_hi = float(ev.get("wall_end_epoch") or time.time())
ts = datetime.fromtimestamp(wall_hi, tz=timezone.utc)
await self._registry.append_confirmed_detail(
state=state,
item_id=str(ev.get("item_id") or "unknown"),
item_name=str(ev.get("item_name") or "unknown"),
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
source="algo_subprocess",
cooldown_key=str(ev.get("cooldown_key") or "") or None,
detail_timestamp=ts,
)
if bp.CONSUMPTION_TSV_LOG_ENABLED or bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL:
snap = ClsTop3(
t1_name=str(ev.get("item_name") or ""),
t1_conf=float(ev.get("top1_conf") or 0.0),
t2_name=str(ev.get("top2_name") or ""),
t2_conf=float(ev.get("top2_conf") or 0.0),
t3_name=str(ev.get("top3_name") or ""),
t3_conf=float(ev.get("top3_conf") or 0.0),
t1_pid="",
t2_pid="",
t3_pid="",
)
append_consumption_window(
surgery_id=surgery_id,
name_to_code=state.name_to_code,
best=snap,
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
camera_id=str(ev.get("camera_id") or ""),
wall_start_epoch=float(ev.get("wall_start_epoch") or wall_hi),
wall_end_epoch=wall_hi,
since_recording_start=format_elapsed_mmss_since(
state.surgery_started_wall,
at_epoch=wall_hi,
),
)
return
if t == "needs_voice_confirm":
if ev.get("frozen", True) is False:
logger.debug("skip provisional needs_voice_confirm surgery={}", surgery_id)
return
opts_raw = ev.get("options") or []
ranked: list[PredictionCandidate] = []
if isinstance(opts_raw, list):
for o in opts_raw:
if isinstance(o, dict):
ranked.append(
PredictionCandidate(
str(o.get("label") or "").strip(),
float(o.get("confidence") or 0.0),
)
)
cid_in = str(ev.get("confirmation_id") or "").strip()
ret = await self._registry.enqueue_pending_confirmation(
state,
ranked,
top_key=str(ev.get("model_top1_label") or ""),
top_confidence=float(ev.get("model_top1_confidence") or 0.0),
confirmation_id=cid_in or None,
)
if ret is None:
return
cls = ev.get("cls_top3") if isinstance(ev.get("cls_top3"), dict) else {}
snap = ClsTop3(
t1_name=str(cls.get("t1_name") or ""),
t1_conf=float(cls.get("t1_conf") or 0.0),
t2_name=str(cls.get("t2_name") or ""),
t2_conf=float(cls.get("t2_conf") or 0.0),
t3_name=str(cls.get("t3_name") or ""),
t3_conf=float(cls.get("t3_conf") or 0.0),
t1_pid=str(cls.get("t1_pid") or ""),
t2_pid=str(cls.get("t2_pid") or ""),
t3_pid=str(cls.get("t3_pid") or ""),
)
wall_lo = float(ev.get("wall_start_epoch") or time.time())
wall_hi = float(ev.get("wall_end_epoch") or time.time())
cam = str(ev.get("camera_id") or "")
await self._registry.append_pending_consumption_detail(
state=state,
confirmation_id=ret,
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
)
pd_name = pending_display_item_name_for_confirmation(state.pending_by_id.get(ret))
append_consumption_pending_window(
surgery_id=surgery_id,
confirmation_id=ret,
model_snap=snap,
doctor_id=bp.VIDEO_RESULT_DOCTOR_ID,
camera_id=cam,
wall_start_epoch=wall_lo,
wall_end_epoch=wall_hi,
item_display_name=pd_name,
since_recording_start=format_elapsed_mmss_since(
state.surgery_started_wall,
at_epoch=wall_hi,
),
)
hub = self._voice_hub
vtid = (state.voice_terminal_id or "").strip()
if hub is not None and vtid:
hub.schedule_notify_pending_head(vtid, surgery_id)
async def _force_stop_subprocess(self, run: RunningSurgery, surgery_id: str) -> None:
run.stop_event.set()
proc = run.algo_process
if proc is None:
return
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=12.0)
except asyncio.TimeoutError:
proc.kill()
except Exception as exc:
logger.warning("terminate algo subprocess surgery={}: {}", surgery_id, exc)
def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None:
run = self._registry.get_running(surgery_id)
if run is None:
return
tid = (terminal_id or "").strip()
run.state.voice_terminal_id = tid or None
async def stop_surgery(self, surgery_id: str, *, require_active: bool = True) -> str | None:
run = await self._registry.unregister(surgery_id)
if run is None:
if require_active:
raise SurgeryPipelineError(
"RECORDING_NOT_STOPPED",
"停录未能完成:当前没有该手术的活跃录制会话。",
)
return None
voice_tid = run.state.voice_terminal_id
run.stop_event.set()
if run.algo_process is not None:
try:
run.algo_process.terminate()
await asyncio.wait_for(run.algo_process.wait(), timeout=20.0)
except asyncio.TimeoutError:
run.algo_process.kill()
except ProcessLookupError:
pass
except Exception as exc:
logger.warning("algo subprocess wait surgery={}: {}", surgery_id, exc)
results = await asyncio.gather(*run.tasks, return_exceptions=True)
for res in results:
if isinstance(res, BaseException):
logger.warning("surgery task finished with error: {}", res)
details = list(run.state.details)
detail_rows = [
SurgeryConsumptionDetail(
item_id=r.item_id,
item_name=r.item_name,
qty=r.qty,
doctor_id=r.doctor_id,
timestamp=r.timestamp,
)
for r in details
]
summ = build_consumption_summary(detail_rows)
totals = {s.item_id: (s.item_name, s.total_quantity) for s in summ}
append_consumption_log_summary(surgery_id, totals)
print_consumption_summary_markdown(totals)
await self._archive.persist_or_archive(surgery_id, details)
return voice_tid
async def _resolve_rtsp_url(
self,
*,
camera_id: str,
kind: VideoBackendKind,
) -> tuple[str, int | None, bool]:
if kind != VideoBackendKind.HIKVISION_SDK:
return self._resolver.rtsp_url_for_camera(camera_id), None, False
if self._hik is None:
if self._s.hikvision_sdk_fallback_to_rtsp:
logger.warning(
"Hikvision SDK not loaded; fallback to RTSP for camera {}",
camera_id,
)
return self._resolver.rtsp_url_for_camera(camera_id), None, False
raise RuntimeError("Hikvision SDK requested but libhcnetsdk.so not loaded")
if not (
self._s.hikvision_device_ip.strip()
and self._s.hikvision_user.strip()
and self._s.hikvision_password.strip()
):
if self._s.hikvision_sdk_fallback_to_rtsp:
logger.warning(
"Hikvision credentials incomplete; fallback to RTSP for camera {}",
camera_id,
)
return self._resolver.rtsp_url_for_camera(camera_id), None, False
raise RuntimeError("Hikvision SDK requires HIKVISION_DEVICE_IP, user, password")
HikvisionInitRefCount.retain(self._hik)
hik = self._hik
assert hik is not None
try:
login = await asyncio.to_thread(
lambda: hik.login_v30(
ip=self._s.hikvision_device_ip.strip(),
port=int(self._s.hikvision_device_port),
username=self._s.hikvision_user.strip(),
password=self._s.hikvision_password.strip(),
)
)
except Exception as exc:
HikvisionInitRefCount.release(self._hik)
if self._s.hikvision_sdk_fallback_to_rtsp:
logger.warning("Hikvision login failed ({}); fallback to RTSP", exc)
return self._resolver.rtsp_url_for_camera(camera_id), None, False
raise
url = self._resolver.rtsp_url_after_hikvision_login(camera_id)
return url, login.user_id, True
def active_recording_phase(self, surgery_id: str) -> Literal["starting", "recording"] | None:
return self._registry.active_recording_phase(surgery_id)
def live_consumption_if_active(self, surgery_id: str) -> list[SurgeryConsumptionStored] | None:
return self._registry.live_consumption_if_active(surgery_id)
async def archived_consumption_fallback(
self,
surgery_id: str,
) -> list[SurgeryConsumptionStored] | None:
return await self._archive.archived_details_async(surgery_id)
def record_voice_trace(
self,
surgery_id: str,
*,
asr_text: str | None,
error: str | None,
) -> None:
self._registry.record_voice_trace(surgery_id, asr_text=asr_text, error=error)
def get_pending_confirmation_by_id(
self,
surgery_id: str,
confirmation_id: str,
) -> PendingConsumableConfirmation | None:
return self._registry.get_pending_confirmation_by_id(surgery_id, confirmation_id)
def get_surgery_candidate_consumables(self, surgery_id: str) -> list[str]:
return self._registry.get_surgery_candidate_consumables(surgery_id)
async def record_voice_parse_failure(self, surgery_id: str, confirmation_id: str) -> tuple[int, int]:
return await self._registry.record_voice_parse_failure(surgery_id, confirmation_id)
def next_pending_confirmation(self, surgery_id: str) -> PendingConsumableConfirmation | None:
return self._registry.next_pending_confirmation(surgery_id)
def pending_queue_pending_count(self, surgery_id: str) -> int:
return self._registry.pending_queue_pending_count(surgery_id)
def pending_queue_position_1based(self, surgery_id: str, confirmation_id: str) -> int | None:
return self._registry.pending_queue_position_1based(surgery_id, confirmation_id)
async def resolve_pending_confirmation(
self,
surgery_id: str,
confirmation_id: str,
*,
chosen_label: str | None,
rejected: bool,
) -> None:
await self._registry.resolve_pending_confirmation(
surgery_id,
confirmation_id,
chosen_label=chosen_label,
rejected=rejected,
)