Files
operating-room-monitor-server/backend/app/services/video/session_manager.py
Kevin 5bbc3903cb Fix Docker log permissions and harden live surgery operations.
Map bind-mounted logs to host UID/GID via entrypoint, expose RTSP prewarm in compose, suppress health-check access noise, and return 409 when another surgery is active with orphan auto-end sweep.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-26 15:36:09 +08:00

571 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import json
import time
from pathlib import Path
from typing import Literal
from loguru import logger
from sqlalchemy.ext.asyncio import async_sessionmaker
from app.algorithm_ipc.schema import WhitelistSpec
from app.algo_host.batch_service import BatchAlgorithmService
from app.baked import pipeline as bp
from app.config import Settings
from app.consumable_catalog import build_name_mapping, effective_candidate_consumables
from app.database import AsyncSessionLocal
from app.domain.consumption import SurgeryConsumptionStored
from app.repositories.surgery_results import SurgeryResultRepository
from app.schemas import SurgeryConsumptionDetail, build_consumption_summary
from app.services.consumption_tsv_log import (
append_consumption_log_summary,
init_consumption_log_file,
print_consumption_summary_markdown,
)
from app.services.video.archive_persister import ArchivePersister
from app.services.video.backend_resolver import BackendResolver
from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime
from app.services.video.recording_camera_policy import resolve_recording_cameras
from app.services.video.rtsp_segment_cleanup import (
default_rtsp_segments_root,
purge_expired_rtsp_segments,
)
from app.services.video.rtsp_prewarm import RtspPrewarmService
from app.services.video.rtsp_segment_recorder import (
RtspSegmentRecorder,
SegmentCompleteEvent,
rtsp_segments_dir,
)
from app.services.video.session_registry import (
PendingConsumableConfirmation,
RunningSurgery,
SurgerySessionRegistry,
SurgerySessionState,
)
from app.services.video.slice_batch_processor import SliceBatchProcessor
from app.services.video.types import VideoBackendKind
from app.services.voice_file_log import init_voice_log_file
from app.services.voice_terminal_hub import VoiceTerminalHub
from app.surgery_errors import SurgeryPipelineError
# Public API of this module. Besides the manager class defined below, the
# session-state types imported from app.services.video.session_registry are
# re-exported here.
__all__ = [
"CameraSessionManager",
"PendingConsumableConfirmation",
"RunningSurgery",
"SurgerySessionState",
]
# Relative directory (resolved against the process working directory) used in
# this module for the per-surgery whitelist JSON files and as the base dir
# passed to default_rtsp_segments_root().
LOGS_DIR = Path("logs")
def _safe_log_name(value: str) -> str:
out = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in value.strip())
return out[:96] or "unknown"
class CameraSessionManager:
    """Manage the lifecycle of live surgery recording sessions.

    For every camera of a surgery an RTSP segment recorder task is started;
    each completed segment is submitted to the slice batch processor (batch
    subprocess, ``5.15/main.py`` per the original note), which drives the
    session's consumption details and pending-confirmation queue held in the
    session registry. When a surgery is stopped the collected details are
    summarised, logged and handed to the archive persister.

    The manager also owns two background loops started by
    :meth:`start_archive_retry_loop`: an RTSP segment TTL sweep and an orphan
    surgery sweep that auto-stops sessions exceeding the baked maximum
    duration.
    """

    # Drain timeout used when a session is torn down forcibly (failed start).
    _FORCE_STOP_DRAIN_TIMEOUT_SEC = 30.0

    def __init__(
        self,
        *,
        settings: Settings,
        hikvision_runtime: HikvisionRuntime | None,
        slice_batch_processor: SliceBatchProcessor | None = None,
        result_repository: SurgeryResultRepository | None = None,
        session_factory: async_sessionmaker | None = None,
        registry: SurgerySessionRegistry | None = None,
        archive_persister: ArchivePersister | None = None,
    ) -> None:
        """Wire up the collaborators, building defaults for those not supplied.

        Args:
            settings: Application settings read throughout the manager.
            hikvision_runtime: Hikvision SDK runtime, or ``None`` when the SDK
                is not loaded.
            slice_batch_processor: Processor receiving completed segments; a
                default one bound to the registry is created when omitted.
            result_repository: Repository passed to the default archive
                persister.
            session_factory: DB session factory; defaults to
                ``AsyncSessionLocal``.
            registry: Session registry; a fresh one is created when omitted.
            archive_persister: Persister for finished surgeries; a default one
                is created when omitted.
        """
        self._s = settings
        self._hik = hikvision_runtime
        self._session_factory: async_sessionmaker = session_factory or AsyncSessionLocal
        self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime)
        self._registry = registry or SurgerySessionRegistry()
        self._slice_batch = slice_batch_processor or SliceBatchProcessor(
            batch_service=BatchAlgorithmService(),
            registry=self._registry,
            max_concurrent=int(settings.rtsp_slice_batch_max_concurrent),
            drain_timeout_sec=float(settings.rtsp_slice_batch_drain_timeout_sec),
        )
        self._archive = archive_persister or ArchivePersister(
            repository=result_repository,
            session_factory=self._session_factory,
        )
        self._voice_hub: VoiceTerminalHub | None = None
        self._rtsp_prewarm: RtspPrewarmService | None = None
        self._rtsp_ttl_stop = asyncio.Event()
        self._rtsp_ttl_task: asyncio.Task[None] | None = None
        self._orphan_sweep_stop = asyncio.Event()
        self._orphan_sweep_task: asyncio.Task[None] | None = None

    # ------------------------------------------------------------------
    # Background maintenance loops
    # ------------------------------------------------------------------

    def _rtsp_segment_ttl_hours(self) -> float:
        """Return the effective segment TTL: the setting, else the baked default."""
        return float(self._s.rtsp_segment_ttl_hours or bp.RTSP_SEGMENT_TTL_HOURS)

    @staticmethod
    async def _wait_for_stop(stop: asyncio.Event, timeout: float) -> bool:
        """Wait up to *timeout* seconds for *stop*; return True if it was set.

        ``asyncio.TimeoutError`` is caught explicitly: it is the exception
        ``asyncio.wait_for`` raises on every supported Python version, whereas
        the builtin ``TimeoutError`` only matches it from Python 3.11 on.
        """
        try:
            await asyncio.wait_for(stop.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return False
        return True

    @staticmethod
    async def _cancel_background_task(task: asyncio.Task[None] | None) -> None:
        """Cancel *task* (if any) and wait until it has finished."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _purge_expired_rtsp_segments(self) -> None:
        """Delete expired RTSP segments under ``LOGS_DIR`` in a worker thread."""
        root = default_rtsp_segments_root(base_dir=LOGS_DIR)
        await asyncio.to_thread(
            purge_expired_rtsp_segments,
            root,
            ttl_hours=self._rtsp_segment_ttl_hours(),
        )

    async def _rtsp_segment_ttl_loop(self) -> None:
        """Purge expired segments periodically until the stop event is set.

        The period is a quarter of the effective TTL (same value the purge
        itself uses), but never less than one hour.
        """
        interval = max(3600.0, self._rtsp_segment_ttl_hours() * 3600.0 / 4.0)
        while not self._rtsp_ttl_stop.is_set():
            if await self._wait_for_stop(self._rtsp_ttl_stop, interval):
                break
            try:
                await self._purge_expired_rtsp_segments()
            except Exception as exc:
                # Best effort: one failed sweep must not end the loop.
                logger.warning("RTSP segment TTL sweep failed: {}", exc)

    async def _sweep_orphan_surgeries_once(self) -> None:
        """Stop every surgery that has been recording longer than the limit.

        After a successful auto-stop, the bound voice terminal (if any) is
        notified; failures for one surgery never prevent handling the others.
        """
        max_h = float(bp.ORPHAN_SURGERY_AUTO_END_HOURS)
        # Snapshot: stop_surgery() unregisters sessions while we iterate.
        for sid in list(self._registry.surgeries_exceeding_duration_hours(max_h)):
            logger.warning(
                "孤儿手术 {} 开录已超过 {:.0f} 小时,自动停录",
                sid,
                max_h,
            )
            try:
                voice_tid = await self.stop_surgery(sid, require_active=False)
            except Exception as exc:
                logger.error("孤儿手术 {} 自动停录失败: {}", sid, exc)
                continue
            if voice_tid and self._voice_hub is not None:
                try:
                    await self._voice_hub.notify_end(voice_tid, sid)
                except Exception as exc:
                    logger.warning(
                        "孤儿手术 {} 自动停录后语音终端 end 推送失败: {}",
                        sid,
                        exc,
                    )

    async def _orphan_surgery_sweep_loop(self) -> None:
        """Run the orphan sweep periodically (at least 60 s apart) until stopped."""
        interval = max(60.0, float(bp.ORPHAN_SURGERY_SWEEP_INTERVAL_SEC))
        while not self._orphan_sweep_stop.is_set():
            if await self._wait_for_stop(self._orphan_sweep_stop, interval):
                break
            try:
                await self._sweep_orphan_surgeries_once()
            except Exception as exc:
                logger.warning("孤儿手术巡检失败: {}", exc)

    # ------------------------------------------------------------------
    # Wiring
    # ------------------------------------------------------------------

    def set_voice_terminal_hub(self, hub: VoiceTerminalHub | None) -> None:
        """Store the voice terminal hub and forward it to the slice batch processor."""
        self._voice_hub = hub
        self._slice_batch.set_voice_terminal_hub(hub)

    def set_rtsp_prewarm_service(self, prewarm: RtspPrewarmService | None) -> None:
        """Store the RTSP prewarm service that is paused/resumed around recordings."""
        self._rtsp_prewarm = prewarm

    def get_voice_terminal_id_if_active(self, surgery_id: str) -> str | None:
        """Return the stripped voice terminal id of a running surgery.

        Returns ``None`` when the surgery is not running or has no (non-blank)
        terminal id.
        """
        run = self._registry.get_running(surgery_id)
        if run is None:
            return None
        tid = (run.state.voice_terminal_id or "").strip()
        return tid or None

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start_archive_retry_loop(self) -> None:
        """Recover archived results, start the retry loop and the sweep tasks.

        An immediate TTL purge is performed, then the TTL and orphan sweep
        tasks are (re)created unless they are already running.
        """
        await self._archive.recover_from_durable_fallback()
        await self._archive.start_retry_loop()
        await self._purge_expired_rtsp_segments()
        if self._rtsp_ttl_task is None or self._rtsp_ttl_task.done():
            self._rtsp_ttl_stop.clear()
            self._rtsp_ttl_task = asyncio.create_task(
                self._rtsp_segment_ttl_loop(),
                name="rtsp_segment_ttl",
            )
        if self._orphan_sweep_task is None or self._orphan_sweep_task.done():
            self._orphan_sweep_stop.clear()
            self._orphan_sweep_task = asyncio.create_task(
                self._orphan_surgery_sweep_loop(),
                name="orphan_surgery_sweep",
            )

    async def shutdown(self) -> None:
        """Stop the background loops, the archive persister and all sessions.

        Failures while stopping an individual surgery are logged and do not
        abort the shutdown.
        """
        self._rtsp_ttl_stop.set()
        await self._cancel_background_task(self._rtsp_ttl_task)
        self._rtsp_ttl_task = None
        self._orphan_sweep_stop.set()
        await self._cancel_background_task(self._orphan_sweep_task)
        self._orphan_sweep_task = None
        # NOTE(review): the archive is shut down before the remaining sessions
        # are stopped, although stop_surgery() still persists through it.
        # Order kept as in the original; confirm against ArchivePersister.
        await self._archive.shutdown()
        # Snapshot: stop_surgery() unregisters sessions while we iterate.
        for sid in list(self._registry.active_ids()):
            try:
                await self.stop_surgery(sid, require_active=False)
            except Exception as exc:
                logger.warning("shutdown stop_surgery {}: {}", sid, exc)

    def _whitelist_path(self, surgery_id: str) -> Path:
        """Return the path of the whitelist JSON file written for *surgery_id*."""
        return (LOGS_DIR / f"surgery_{surgery_id}_whitelist.json").expanduser()

    async def _persist_stale_archive(self, surgery_id: str) -> None:
        """Flush a left-over in-memory archive for *surgery_id* before a new start.

        Raises:
            SurgeryPipelineError: A repository is configured but the stale
                result could not be written.
        """
        stale = await self._archive.take_archived_details(surgery_id)
        if stale is None:
            return
        logger.warning(
            "surgery_id={} 仍有未落库归档,尝试写入数据库后再开始新会话",
            surgery_id,
        )
        if self._archive.repository is None:
            logger.error(
                "surgery_id={} 有内存归档但未配置数据库仓库,无法持久化;开始新会话将丢弃该归档(仅开发/无库模式)",
                surgery_id,
            )
            return
        if not await self._archive.persist_or_archive(surgery_id, stale):
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                "该手术号存在尚未写入数据库的历史结果,请修复数据库或等待自动重试成功后再开始。",
            )

    async def _release_hikvision_login(self, user_id: int, retained: bool) -> None:
        """Log out a Hikvision session and release its init ref count.

        Logout is best effort (a failure is logged); the ref count taken for a
        *retained* login is released regardless.
        """
        hik = self._hik
        if hik is None:
            return
        try:
            await asyncio.to_thread(hik.logout, user_id)
        except Exception as exc:
            logger.warning("Hikvision logout failed for user_id {}: {}", user_id, exc)
        finally:
            if retained:
                HikvisionInitRefCount.release(hik)

    async def start_surgery(
        self,
        surgery_id: str,
        camera_ids: list[str],
        candidate_consumables: list[str],
    ) -> None:
        """Start recording *surgery_id* and wait until the primary camera is ready.

        Steps: reject duplicates / concurrent surgeries, flush any stale
        archive, resolve the candidate consumables and write the whitelist
        file, pause RTSP prewarm for the recording cameras, spawn one recorder
        task per camera, register the session and wait for the primary
        camera's ready event.

        Any failure after prewarm was paused tears down whatever was already
        created (recorder tasks, registry entry), logs out Hikvision sessions
        and resumes prewarm for the cameras that had been paused.

        Args:
            surgery_id: Identifier of the surgery to start.
            camera_ids: Requested cameras; the actual recording cameras are
                chosen by ``resolve_recording_cameras``.
            candidate_consumables: Candidate consumable names; when blank the
                default full set is used.

        Raises:
            SurgeryPipelineError: The surgery (or another one) is already
                active, a stale result cannot be persisted, no candidates are
                available, camera resolution fails, or the recorder is not
                ready within the timeout.
        """
        if self._registry.has_active(surgery_id):
            raise SurgeryPipelineError(
                "SURGERY_ALREADY_RECORDING",
                "该手术已在录制中,请勿重复开始。",
            )
        other_active = self._registry.other_active_surgery_id(surgery_id)
        if other_active is not None:
            raise SurgeryPipelineError(
                "SURGERY_IN_PROGRESS",
                f"手术ID {other_active} 正在进行中,请先结束后再开始新手术。",
                extra={"active_surgery_id": other_active},
            )
        await self._persist_stale_archive(surgery_id)

        resolved = effective_candidate_consumables(candidate_consumables)
        if not resolved:
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                "耗材候选为空:请在请求中传入 candidate_consumables或提供有效的 consumable_classifier_labels.yaml。",
            )
        if not any(str(x).strip() for x in candidate_consumables):
            logger.info(
                "surgery {}: candidate_consumables 未提供,使用默认全量 {}",
                surgery_id,
                len(resolved),
            )
        name_to_code = build_name_mapping(resolved)
        state = SurgerySessionState(
            candidate_consumables=list(resolved),
            name_to_code=name_to_code,
            surgery_started_wall=time.time(),
        )
        stop_event = asyncio.Event()

        wl = WhitelistSpec.from_session(resolved, name_to_code)
        wp = self._whitelist_path(surgery_id)
        wp.parent.mkdir(parents=True, exist_ok=True)
        wp.write_text(
            json.dumps(wl.to_json_obj(), ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        try:
            record_cameras = resolve_recording_cameras(
                camera_ids,
                self._s,
                resolver=self._resolver,
            )
        except ValueError as exc:
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                str(exc),
            ) from exc

        primary_cam = record_cameras[0]
        ready_event = asyncio.Event()
        hik_logouts: list[tuple[int, bool]] = []
        recorder_tasks: list[asyncio.Task[None]] = []
        paused_cams: list[str] = []
        run: RunningSurgery | None = None
        registered = False
        started_ok = False

        async def on_segment(event: SegmentCompleteEvent) -> None:
            await self._slice_batch.submit_slice(
                surgery_id=surgery_id,
                event=event,
                candidate_consumables=list(resolved),
                state=state,
            )

        t_start = time.monotonic()
        try:
            if self._rtsp_prewarm is not None:
                for cam in record_cameras:
                    await self._rtsp_prewarm.pause(cam)
                    paused_cams.append(cam)

            t_resolve = time.monotonic()
            for cam in record_cameras:
                kind = self._resolver.backend_for_camera(cam)
                url, hik_uid, hik_retained = await self._resolve_rtsp_url(
                    camera_id=cam,
                    kind=kind,
                )
                if hik_uid is not None:
                    hik_logouts.append((hik_uid, hik_retained))
                recorder = RtspSegmentRecorder(
                    surgery_id=surgery_id,
                    camera_id=cam,
                    rtsp_url=url,
                    output_dir=rtsp_segments_dir(surgery_id) / _safe_log_name(cam),
                    segment_duration_sec=self._s.rtsp_segment_duration_sec,
                    segment_min_sec=self._s.rtsp_segment_min_sec,
                    on_segment_complete=on_segment,
                    # Only the primary camera signals readiness.
                    ready_event=ready_event if cam == primary_cam else None,
                )
                recorder_tasks.append(
                    asyncio.create_task(
                        recorder.run(stop_event),
                        name=f"rtsp_recorder:{surgery_id}:{cam}",
                    )
                )
            resolve_ms = (time.monotonic() - t_resolve) * 1000.0

            t_spawn = time.monotonic()
            self._slice_batch.ensure_worker(surgery_id)
            run = RunningSurgery(
                stop_event=stop_event,
                state=state,
                tasks=recorder_tasks,
                record_camera_ids=list(record_cameras),
                algo_process=None,
            )
            init_consumption_log_file(surgery_id)
            init_voice_log_file(surgery_id)
            await self._registry.register(surgery_id, run)
            registered = True
            spawn_ms = (time.monotonic() - t_spawn) * 1000.0

            t_ready = time.monotonic()
            prewarm_was_warm = (
                self._rtsp_prewarm.was_warm(primary_cam)
                if self._rtsp_prewarm is not None
                else False
            )
            open_timeout = float(self._s.video_open_timeout_sec) + 5.0
            try:
                await asyncio.wait_for(ready_event.wait(), timeout=open_timeout)
            except asyncio.TimeoutError as exc:
                logger.error(
                    "Surgery {} RTSP recorder not ready within {}s",
                    surgery_id,
                    open_timeout,
                )
                # Converted here; the outer handler performs the teardown.
                raise SurgeryPipelineError(
                    "RECORDING_CANNOT_START",
                    "开录未能确认RTSP 录像在超时内未就绪。",
                ) from exc
            state.ready.set()
            started_ok = True
            ready_ms = (time.monotonic() - t_ready) * 1000.0
            total_ms = (time.monotonic() - t_start) * 1000.0
            logger.info(
                "RTSP start_surgery timing surgery={} camera={} prewarm_was_warm={} "
                "resolve_ms={:.0f} spawn_ms={:.0f} ready_ms={:.0f} total_ms={:.0f}",
                surgery_id,
                primary_cam,
                prewarm_was_warm,
                resolve_ms,
                spawn_ms,
                ready_ms,
                total_ms,
            )
        except Exception:
            try:
                if run is not None:
                    await self._force_stop_run(run, surgery_id)
                else:
                    # Failed before the session object existed: only the
                    # recorder tasks spawned so far need to be torn down.
                    stop_event.set()
                    for task in recorder_tasks:
                        task.cancel()
                    if recorder_tasks:
                        await asyncio.gather(*recorder_tasks, return_exceptions=True)
            finally:
                if registered:
                    await self._registry.unregister(surgery_id)
            raise
        finally:
            try:
                for hik_uid, hik_retained in hik_logouts:
                    await self._release_hikvision_login(hik_uid, hik_retained)
            finally:
                if not started_ok and self._rtsp_prewarm is not None:
                    for cam in paused_cams:
                        await self._rtsp_prewarm.resume(cam)

    async def _force_stop_run(self, run: RunningSurgery, surgery_id: str) -> None:
        """Tear down a session's workers: signal stop, drain, cancel recorders.

        The recorder tasks are cancelled and awaited even when draining the
        slice batch worker raises.
        """
        run.stop_event.set()
        try:
            await self._slice_batch.drain(
                surgery_id,
                timeout=self._FORCE_STOP_DRAIN_TIMEOUT_SEC,
            )
        finally:
            for task in run.tasks:
                task.cancel()
            if run.tasks:
                await asyncio.gather(*run.tasks, return_exceptions=True)

    def set_voice_terminal_id(self, surgery_id: str, terminal_id: str | None) -> None:
        """Bind a voice terminal to a running surgery; blank ids are stored as ``None``.

        Does nothing when the surgery is not running.
        """
        run = self._registry.get_running(surgery_id)
        if run is None:
            return
        tid = (terminal_id or "").strip()
        run.state.voice_terminal_id = tid or None

    async def stop_surgery(self, surgery_id: str, *, require_active: bool = True) -> str | None:
        """Stop recording *surgery_id*, summarise and persist its results.

        The session is unregistered, recorders are asked to stop and awaited,
        pending slices are drained, the consumption summary is logged and the
        details are handed to the archive persister. RTSP prewarm is resumed
        for the session's cameras even if one of those steps fails.

        Args:
            surgery_id: Surgery to stop.
            require_active: When true, a missing session is an error; when
                false, ``None`` is returned instead.

        Returns:
            The voice terminal id stored on the session (may be ``None``).

        Raises:
            SurgeryPipelineError: No active session exists and
                ``require_active`` is true.
        """
        run = await self._registry.unregister(surgery_id)
        if run is None:
            if require_active:
                raise SurgeryPipelineError(
                    "RECORDING_NOT_STOPPED",
                    "停录未能完成:当前没有该手术的活跃录制会话。",
                )
            return None

        voice_tid = run.state.voice_terminal_id
        try:
            run.stop_event.set()
            results = await asyncio.gather(*run.tasks, return_exceptions=True)
            for res in results:
                if isinstance(res, BaseException):
                    logger.warning("surgery recorder task finished with error: {}", res)
            await self._slice_batch.drain(
                surgery_id,
                timeout=self._s.rtsp_slice_batch_drain_timeout_sec,
            )
            details = list(run.state.details)
            detail_rows = [
                SurgeryConsumptionDetail(
                    item_id=r.item_id,
                    item_name=r.item_name,
                    qty=r.qty,
                    doctor_id=r.doctor_id,
                    timestamp=r.timestamp,
                )
                for r in details
            ]
            summ = build_consumption_summary(detail_rows)
            totals = {s.item_id: (s.item_name, s.total_quantity) for s in summ}
            append_consumption_log_summary(surgery_id, totals)
            print_consumption_summary_markdown(totals)
            await self._archive.persist_or_archive(surgery_id, details)
        finally:
            # The session is already unregistered, so nobody else would ever
            # resume prewarm for these cameras if we bailed out above.
            if self._rtsp_prewarm is not None:
                for cam in run.record_camera_ids:
                    await self._rtsp_prewarm.resume(cam)
        return voice_tid

    async def _resolve_rtsp_url(
        self,
        *,
        camera_id: str,
        kind: VideoBackendKind,
    ) -> tuple[str, int | None, bool]:
        """Resolve the RTSP URL for *camera_id*, logging in to Hikvision if needed.

        Returns:
            ``(url, hik_user_id, retained)``. ``hik_user_id`` is ``None`` and
            ``retained`` is false unless a Hikvision login succeeded; in that
            case the caller must log the user out and release the init ref
            count.

        Raises:
            RuntimeError: The Hikvision SDK is required but not loaded, or its
                credentials are incomplete, and RTSP fallback is disabled.
            Exception: Whatever the Hikvision login raised, when fallback is
                disabled.
        """
        if kind != VideoBackendKind.HIKVISION_SDK:
            return self._resolver.rtsp_url_for_camera(camera_id), None, False

        hik = self._hik
        if hik is None:
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning(
                    "Hikvision SDK not loaded; fallback to RTSP for camera {}",
                    camera_id,
                )
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise RuntimeError("Hikvision SDK requested but libhcnetsdk.so not loaded")

        credentials_complete = (
            self._s.hikvision_device_ip.strip()
            and self._s.hikvision_user.strip()
            and self._s.hikvision_password.strip()
        )
        if not credentials_complete:
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning(
                    "Hikvision credentials incomplete; fallback to RTSP for camera {}",
                    camera_id,
                )
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise RuntimeError("Hikvision SDK requires HIKVISION_DEVICE_IP, user, password")

        HikvisionInitRefCount.retain(hik)
        try:
            login = await asyncio.to_thread(
                lambda: hik.login_v30(
                    ip=self._s.hikvision_device_ip.strip(),
                    port=int(self._s.hikvision_device_port),
                    username=self._s.hikvision_user.strip(),
                    password=self._s.hikvision_password.strip(),
                )
            )
        except Exception as exc:
            HikvisionInitRefCount.release(hik)
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning("Hikvision login failed ({}); fallback to RTSP", exc)
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise

        try:
            url = self._resolver.rtsp_url_after_hikvision_login(camera_id)
        except Exception:
            # The caller never learns about this login, so undo it here.
            await self._release_hikvision_login(login.user_id, True)
            raise
        return url, login.user_id, True

    # ------------------------------------------------------------------
    # Thin delegations to the registry / archive
    # ------------------------------------------------------------------

    def active_recording_phase(self, surgery_id: str) -> Literal["starting", "recording"] | None:
        """Return the registry's recording phase for *surgery_id* (``None`` if none)."""
        return self._registry.active_recording_phase(surgery_id)

    def live_consumption_if_active(self, surgery_id: str) -> list[SurgeryConsumptionStored] | None:
        """Return the registry's live consumption rows for *surgery_id*, if any."""
        return self._registry.live_consumption_if_active(surgery_id)

    async def archived_consumption_fallback(
        self,
        surgery_id: str,
    ) -> list[SurgeryConsumptionStored] | None:
        """Return the archive persister's stored details for *surgery_id*, if any."""
        return await self._archive.archived_details_async(surgery_id)

    def record_voice_trace(
        self,
        surgery_id: str,
        *,
        asr_text: str | None,
        error: str | None,
    ) -> None:
        """Forward a voice trace (ASR text and/or error) to the registry."""
        self._registry.record_voice_trace(surgery_id, asr_text=asr_text, error=error)

    def get_pending_confirmation_by_id(
        self,
        surgery_id: str,
        confirmation_id: str,
    ) -> PendingConsumableConfirmation | None:
        """Look up a pending confirmation by id in the registry."""
        return self._registry.get_pending_confirmation_by_id(surgery_id, confirmation_id)

    def get_surgery_candidate_consumables(self, surgery_id: str) -> list[str]:
        """Return the candidate consumables the registry holds for *surgery_id*."""
        return self._registry.get_surgery_candidate_consumables(surgery_id)

    async def record_voice_parse_failure(
        self,
        surgery_id: str,
        confirmation_id: str,
    ) -> tuple[int, int]:
        """Record a voice parse failure in the registry and return its result pair.

        The meaning of the two integers is defined by the registry.
        """
        return await self._registry.record_voice_parse_failure(surgery_id, confirmation_id)

    def next_pending_confirmation(self, surgery_id: str) -> PendingConsumableConfirmation | None:
        """Return the registry's next pending confirmation for *surgery_id*."""
        return self._registry.next_pending_confirmation(surgery_id)

    def pending_queue_pending_count(self, surgery_id: str) -> int:
        """Return the registry's pending-confirmation count for *surgery_id*."""
        return self._registry.pending_queue_pending_count(surgery_id)

    def pending_queue_position_1based(self, surgery_id: str, confirmation_id: str) -> int | None:
        """Return the registry's 1-based queue position of a confirmation, if any."""
        return self._registry.pending_queue_position_1based(surgery_id, confirmation_id)

    async def resolve_pending_confirmation(
        self,
        surgery_id: str,
        confirmation_id: str,
        *,
        chosen_label: str | None,
        rejected: bool,
    ) -> None:
        """Resolve a pending confirmation in the registry (chosen label or rejection)."""
        await self._registry.resolve_pending_confirmation(
            surgery_id,
            confirmation_id,
            chosen_label=chosen_label,
            rejected=rejected,
        )