from __future__ import annotations

import asyncio
import time

from loguru import logger

from app.config import Settings
from sqlalchemy.ext.asyncio import async_sessionmaker
from app.database import AsyncSessionLocal
from app.domain.consumption import SurgeryConsumptionStored
from app.repositories.surgery_results import SurgeryResultRepository
from app.services.consumable_vision_algorithm import (
    ConsumableVisionAlgorithmService,
    PredictionResult,
)
from app.services.video.archive_persister import ArchivePersister
from app.services.video.backend_resolver import BackendResolver
from app.services.video.classification_handler import VisionClassificationHandler
from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime
from app.services.video.inference_aggregator import WindowInferenceAggregator
from app.services.video.session_registry import (
    CameraStreamInferState,
    PendingConsumableConfirmation,
    RunningSurgery,
    SurgerySessionRegistry,
    SurgerySessionState,
)
from app.services.video.stream_worker import CameraStreamWorker, redact_rtsp_url
from app.services.video.types import VideoBackendKind
from app.services.consumption_tsv_log import (
    append_consumption_log_summary,
    init_consumption_log_file,
    print_consumption_summary_markdown,
)
from app.services.voice_file_log import init_voice_log_file
from app.surgery_errors import SurgeryPipelineError

__all__ = [
    "CameraSessionManager",
    "CameraStreamInferState",
    "PendingConsumableConfirmation",
    "RunningSurgery",
    "SurgerySessionState",
]


class CameraSessionManager:
    """Per-surgery camera orchestration.

    Responsibilities of this class:

    1. Start/stop a surgery: create the ``SurgerySessionState``, spawn the
       camera workers, and finalize when recording stops.
    2. Delegate the in-memory state needed for voice confirmation to
       ``SurgerySessionRegistry`` (which implements ``PendingConfirmationStore``).
    3. Delegate "write results to the database + retry on failure + durable
       fallback" to ``ArchivePersister``.

    The public interface is unchanged by this split, so upstream callers
    (``SurgeryPipeline`` / ``VoiceConfirmationService``) do not need to know
    about it.
    """

    def __init__(
        self,
        *,
        settings: Settings,
        vision_algorithm: ConsumableVisionAlgorithmService,
        hikvision_runtime: HikvisionRuntime | None,
        result_repository: SurgeryResultRepository | None = None,
        session_factory: async_sessionmaker | None = None,
        registry: SurgerySessionRegistry | None = None,
        archive_persister: ArchivePersister | None = None,
    ) -> None:
        """Wire up the collaborators.

        Args:
            settings: Application settings shared with every collaborator.
            vision_algorithm: Service used for per-frame inference and for
                resolving candidate consumables.
            hikvision_runtime: Loaded Hikvision SDK runtime, or ``None`` when
                the SDK is not available.
            result_repository: Repository handed to the default
                ``ArchivePersister``; ignored when ``archive_persister`` is given.
            session_factory: Async session factory; defaults to
                ``AsyncSessionLocal``.
            registry: Optional pre-built session registry (a default one is
                created from ``settings`` otherwise).
            archive_persister: Optional pre-built persister (a default one is
                created otherwise).
        """
        self._s = settings
        self._vision = vision_algorithm
        self._hik = hikvision_runtime
        self._session_factory: async_sessionmaker = session_factory or AsyncSessionLocal
        self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime)
        self._registry = registry or SurgerySessionRegistry(settings=settings)
        self._archive = archive_persister or ArchivePersister(
            settings=settings,
            repository=result_repository,
            session_factory=self._session_factory,
        )
        self._aggregator = WindowInferenceAggregator(settings=settings)
        self._classifier_handler = VisionClassificationHandler(
            settings=settings,
            registry=self._registry,
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start_archive_retry_loop(self) -> None:
        """Recover archives from the durable fallback, then start the retry loop."""
        await self._archive.recover_from_durable_fallback()
        await self._archive.start_retry_loop()

    async def shutdown(self) -> None:
        """Shut down the archive persister and stop every active surgery.

        Failures while stopping an individual surgery are logged and do not
        prevent the remaining surgeries from being stopped.
        """
        # NOTE(review): the persister is shut down *before* the surgeries are
        # stopped, yet ``stop_surgery`` still calls ``persist_or_archive``.
        # Presumably the persister tolerates that — confirm against
        # ``ArchivePersister.shutdown``.
        await self._archive.shutdown()
        ids = self._registry.active_ids()
        for sid in ids:
            try:
                await self.stop_surgery(sid, require_active=False)
            except Exception as exc:
                logger.warning("shutdown stop_surgery {}: {}", sid, exc)

    # ------------------------------------------------------------------
    # Surgery start / stop
    # ------------------------------------------------------------------

    async def start_surgery(
        self,
        surgery_id: str,
        camera_ids: list[str],
        candidate_consumables: list[str],
    ) -> None:
        """Start recording a surgery and wait until every camera is ready.

        Args:
            surgery_id: Identifier of the surgery to start.
            camera_ids: Cameras to spawn a worker task for.
            candidate_consumables: Requested candidate consumables; resolved
                through ``effective_candidate_consumables`` of the vision service.

        Raises:
            SurgeryPipelineError: With code ``RECORDING_CANNOT_START`` when the
                surgery is already active, a stale archive for the same id
                cannot be persisted, the resolved candidate list is empty, or
                not all cameras signal readiness within the open timeout.
        """
        if self._registry.has_active(surgery_id):
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                "该手术已在录制中,请勿重复开始。",
            )

        # A previous session with the same id may still hold results that were
        # never written to the database; try to flush them before starting.
        stale = await self._archive.take_archived_details(surgery_id)
        if stale is not None:
            logger.warning(
                "surgery_id={} 仍有未落库归档,尝试写入数据库后再开始新会话",
                surgery_id,
            )
            if self._archive.repository is None:
                logger.error(
                    "surgery_id={} 有内存归档但未配置数据库仓库,无法持久化;"
                    "开始新会话将丢弃该归档(仅开发/无库模式)",
                    surgery_id,
                )
            else:
                ok = await self._archive.persist_or_archive(surgery_id, stale)
                if not ok:
                    raise SurgeryPipelineError(
                        "RECORDING_CANNOT_START",
                        "该手术号存在尚未写入数据库的历史结果,请修复数据库或等待自动重试成功后再开始。",
                    )

        resolved = self._vision.effective_candidate_consumables(candidate_consumables)
        if not resolved:
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                "耗材候选为空:请在请求中传入 candidate_consumables,或提供有效的 consumable_classifier_labels.yaml / 分类模型。",
            )
        if not any(str(x).strip() for x in candidate_consumables):
            logger.info(
                "surgery {}: candidate_consumables 未提供,使用默认全量 {} 项",
                surgery_id,
                len(resolved),
            )

        name_to_code = self._vision.build_name_mapping(resolved)
        state = SurgerySessionState(
            candidate_consumables=list(resolved),
            name_to_code=name_to_code,
        )
        stop_event = asyncio.Event()
        readies = [asyncio.Event() for _ in camera_ids]
        tasks: list[asyncio.Task[None]] = []
        open_timeout = self._s.video_open_timeout_sec + 5.0

        for cam_id, ready in zip(camera_ids, readies, strict=True):
            tasks.append(
                asyncio.create_task(
                    self._camera_worker(
                        surgery_id=surgery_id,
                        camera_id=cam_id,
                        stream_ready=ready,
                        stop_event=stop_event,
                        state=state,
                    ),
                    name=f"camera:{surgery_id}:{cam_id}",
                )
            )

        run = RunningSurgery(stop_event=stop_event, state=state, tasks=tasks)
        try:
            init_consumption_log_file(surgery_id)
            init_voice_log_file(surgery_id, self._s)
            await self._registry.register(surgery_id, run)
        except BaseException:
            # The run may never have been registered, so ``stop_surgery`` may
            # not be able to reach these tasks: tear them down here instead of
            # leaking them.
            stop_event.set()
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

        try:
            await asyncio.wait_for(
                asyncio.gather(*(r.wait() for r in readies)),
                timeout=open_timeout,
            )
            state.ready.set()
        except (TimeoutError, asyncio.TimeoutError) as exc:
            # ``asyncio.TimeoutError`` is only an alias of the builtin
            # ``TimeoutError`` from Python 3.11 on; catch both so the timeout
            # is translated into a pipeline error on older interpreters too.
            logger.error(
                "Surgery {} cameras not all ready within {}s",
                surgery_id,
                open_timeout,
            )
            await self.stop_surgery(surgery_id, require_active=True)
            raise SurgeryPipelineError(
                "RECORDING_CANNOT_START",
                "开录未能确认:部分摄像头在超时内未成功拉到首帧。",
            ) from exc
        except Exception:
            await self.stop_surgery(surgery_id, require_active=True)
            raise

    async def stop_surgery(self, surgery_id: str, *, require_active: bool = True) -> None:
        """Stop a surgery, wait for its workers, and persist its results.

        Args:
            surgery_id: Identifier of the surgery to stop.
            require_active: When true, a missing session is an error; when
                false, a missing session is silently ignored.

        Raises:
            SurgeryPipelineError: With code ``RECORDING_NOT_STOPPED`` when
                ``require_active`` is true and no session is registered.
        """
        run = await self._registry.unregister(surgery_id)
        if run is None:
            if require_active:
                raise SurgeryPipelineError(
                    "RECORDING_NOT_STOPPED",
                    "停录未能完成:当前没有该手术的活跃录制会话。",
                )
            return

        run.stop_event.set()
        results = await asyncio.gather(*run.tasks, return_exceptions=True)
        for res in results:
            if isinstance(res, BaseException):
                logger.warning("surgery task finished with error: {}", res)

        totals = dict(run.state.consumption_log_totals)
        try:
            append_consumption_log_summary(surgery_id, totals)
            print_consumption_summary_markdown(totals)
        except Exception as exc:
            # The session is already unregistered at this point; a failure in
            # the summary log must not prevent the results below from being
            # persisted.
            logger.warning(
                "surgery {} consumption summary log failed: {}",
                surgery_id,
                exc,
            )

        details = list(run.state.details)
        await self._archive.persist_or_archive(surgery_id, details)

    # ------------------------------------------------------------------
    # PendingConfirmationStore protocol delegation
    # ------------------------------------------------------------------

    def live_consumption_if_active(
        self, surgery_id: str
    ) -> list[SurgeryConsumptionStored] | None:
        """Delegate to ``SurgerySessionRegistry.live_consumption_if_active``."""
        return self._registry.live_consumption_if_active(surgery_id)

    def archived_consumption_fallback(
        self, surgery_id: str
    ) -> list[SurgeryConsumptionStored] | None:
        """Delegate to ``ArchivePersister.archived_details``."""
        return self._archive.archived_details(surgery_id)

    def record_voice_trace(
        self,
        surgery_id: str,
        *,
        asr_text: str | None,
        error: str | None,
    ) -> None:
        """Delegate to ``SurgerySessionRegistry.record_voice_trace``."""
        self._registry.record_voice_trace(surgery_id, asr_text=asr_text, error=error)

    def get_pending_confirmation_by_id(
        self,
        surgery_id: str,
        confirmation_id: str,
    ) -> PendingConsumableConfirmation | None:
        """Delegate to ``SurgerySessionRegistry.get_pending_confirmation_by_id``."""
        return self._registry.get_pending_confirmation_by_id(surgery_id, confirmation_id)

    def get_surgery_candidate_consumables(self, surgery_id: str) -> list[str]:
        """Delegate to ``SurgerySessionRegistry.get_surgery_candidate_consumables``."""
        return self._registry.get_surgery_candidate_consumables(surgery_id)

    async def record_voice_parse_failure(
        self, surgery_id: str, confirmation_id: str
    ) -> tuple[int, int]:
        """Delegate to ``SurgerySessionRegistry.record_voice_parse_failure``."""
        return await self._registry.record_voice_parse_failure(surgery_id, confirmation_id)

    def next_pending_confirmation(
        self, surgery_id: str
    ) -> PendingConsumableConfirmation | None:
        """Delegate to ``SurgerySessionRegistry.next_pending_confirmation``."""
        return self._registry.next_pending_confirmation(surgery_id)

    async def resolve_pending_confirmation(
        self,
        surgery_id: str,
        confirmation_id: str,
        *,
        chosen_label: str | None,
        rejected: bool,
    ) -> None:
        """Delegate to ``SurgerySessionRegistry.resolve_pending_confirmation``."""
        await self._registry.resolve_pending_confirmation(
            surgery_id,
            confirmation_id,
            chosen_label=chosen_label,
            rejected=rejected,
        )

    # ------------------------------------------------------------------
    # Camera worker (stream pulling + inference throttling + time-window
    # bucketing + classification result handling)
    # ------------------------------------------------------------------

    async def _camera_worker(
        self,
        *,
        surgery_id: str,
        camera_id: str,
        stream_ready: asyncio.Event,
        stop_event: asyncio.Event,
        state: SurgerySessionState,
    ) -> None:
        """Run one camera's stream worker and feed its frames to inference.

        Resolves the stream URL (logging in to the Hikvision SDK when that
        backend is selected), runs ``CameraStreamWorker`` until it returns,
        and always releases the Hikvision login / init refcount afterwards.

        Args:
            surgery_id: Surgery this worker belongs to.
            camera_id: Camera to pull from.
            stream_ready: Passed to the stream worker as ``stream_ready``.
            stop_event: Passed to the stream worker as ``stop_event``.
            state: Shared per-surgery session state.
        """
        kind = self._resolver.backend_for_camera(camera_id)
        hik_user_id: int | None = None
        hik_init_retained = False
        try:
            url, hik_user_id, hik_init_retained = await self._resolve_rtsp_url(
                camera_id=camera_id,
                kind=kind,
            )
            assert url is not None
            last_infer = 0.0

            async def _frame_handler(frame: object) -> None:
                nonlocal last_infer
                now = time.monotonic()
                # Throttle: skip frames that arrive before the configured
                # inference interval has elapsed.
                if now - last_infer < self._s.video_inference_interval_sec:
                    await asyncio.sleep(0.01)
                    return
                last_infer = now
                try:
                    snap = await asyncio.to_thread(
                        self._vision.infer_frame_bgr,
                        frame,
                        state.name_to_code,
                    )
                except Exception as exc:
                    # Best effort: a failed inference only skips this frame.
                    logger.debug(
                        "Inference skip camera={} surgery={}: {}",
                        camera_id,
                        surgery_id,
                        exc,
                    )
                    return
                if snap is None:
                    return
                if self._s.video_log_inference_results:
                    logger.info(
                        "Vision result surgery={} camera={} top1={}({:.3f}) top2={}({:.3f}) top3={}({:.3f})",
                        surgery_id,
                        camera_id,
                        snap.t1_name,
                        snap.t1_conf,
                        snap.t2_name,
                        snap.t2_conf,
                        snap.t3_name,
                        snap.t3_conf,
                    )
                # Aggregation and result handling both run under the session lock.
                async with state.lock:
                    ready_windows = self._aggregator.ingest_snapshot_and_collect_ready(
                        surgery_id=surgery_id,
                        camera_id=camera_id,
                        snap=snap,
                        state=state,
                    )
                    for win in ready_windows:
                        await self._classifier_handler.handle(
                            state=state,
                            cls_res=win.prediction,
                            ready=win,
                            surgery_id=surgery_id,
                            camera_id=camera_id,
                        )

            worker = CameraStreamWorker(
                settings=self._s,
                surgery_id=surgery_id,
                camera_id=camera_id,
                url=url,
            )
            await worker.run(
                stream_ready=stream_ready,
                stop_event=stop_event,
                frame_handler=_frame_handler,
            )
        finally:
            # Nested try/finally so a failing logout cannot skip the release
            # of the SDK init refcount.
            try:
                if hik_user_id is not None and self._hik is not None:
                    await asyncio.to_thread(self._hik.logout, hik_user_id)
            finally:
                if hik_init_retained and self._hik is not None:
                    HikvisionInitRefCount.release(self._hik)

    async def _handle_classification_result(
        self,
        *,
        state: SurgerySessionState,
        cls_res: PredictionResult,
    ) -> None:
        """Deprecated test shim: keeps the old signature and forwards to
        ``VisionClassificationHandler``.

        Kept because unit tests call it directly. New code should use
        ``self._classifier_handler.handle``.
        """
        await self._classifier_handler.handle(state=state, cls_res=cls_res)

    async def _resolve_rtsp_url(
        self,
        *,
        camera_id: str,
        kind: VideoBackendKind,
    ) -> tuple[str, int | None, bool]:
        """Returns (url, hikvision_user_id, whether NET_DVR_Init refcount was retained).

        For any backend other than ``HIKVISION_SDK`` the plain RTSP URL is
        returned. For the SDK backend a login is attempted; when the SDK is
        not loaded, the credentials are incomplete, or the login fails, the
        plain RTSP URL is used if ``hikvision_sdk_fallback_to_rtsp`` is set.

        Raises:
            RuntimeError: SDK backend requested without a loaded SDK or with
                incomplete credentials, and the RTSP fallback is disabled.
            Exception: Whatever the SDK login or the post-login URL lookup
                raised, when no fallback applies.
        """
        if kind != VideoBackendKind.HIKVISION_SDK:
            return self._resolver.rtsp_url_for_camera(camera_id), None, False

        hik = self._hik
        if hik is None:
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning(
                    "Hikvision SDK not loaded; fallback to RTSP for camera {}",
                    camera_id,
                )
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise RuntimeError("Hikvision SDK requested but libhcnetsdk.so not loaded")

        if not (
            self._s.hikvision_device_ip.strip()
            and self._s.hikvision_user.strip()
            and self._s.hikvision_password.strip()
        ):
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning(
                    "Hikvision credentials incomplete; fallback to RTSP for camera {}",
                    camera_id,
                )
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise RuntimeError("Hikvision SDK requires HIKVISION_DEVICE_IP, user, password")

        HikvisionInitRefCount.retain(hik)
        try:
            login = await asyncio.to_thread(
                lambda: hik.login_v30(
                    ip=self._s.hikvision_device_ip.strip(),
                    port=int(self._s.hikvision_device_port),
                    username=self._s.hikvision_user.strip(),
                    password=self._s.hikvision_password.strip(),
                )
            )
        except Exception as exc:
            HikvisionInitRefCount.release(hik)
            if self._s.hikvision_sdk_fallback_to_rtsp:
                logger.warning("Hikvision login failed ({}); fallback to RTSP", exc)
                return self._resolver.rtsp_url_for_camera(camera_id), None, False
            raise

        try:
            url = self._resolver.rtsp_url_after_hikvision_login(camera_id)
        except Exception:
            # The caller never receives the user id on this path, so undo the
            # login and the retained refcount here before propagating.
            try:
                await asyncio.to_thread(hik.logout, login.user_id)
            except Exception as logout_exc:
                logger.warning(
                    "Hikvision logout after URL resolution failure failed: {}",
                    logout_exc,
                )
            finally:
                HikvisionInitRefCount.release(hik)
            raise
        return url, login.user_id, True