from __future__ import annotations import asyncio import time from datetime import datetime, timezone from pathlib import Path from loguru import logger from app.baked import algorithm as ba from app.baked import pipeline as bp from app.config import Settings from sqlalchemy.ext.asyncio import async_sessionmaker from app.database import AsyncSessionLocal from app.domain.consumption import SurgeryConsumptionStored from app.repositories.surgery_results import SurgeryResultRepository from app.services.consumable_vision_algorithm import ( ConsumableVisionAlgorithmService, PredictionResult, ) from app.services.video.archive_persister import ArchivePersister from app.services.video.backend_resolver import BackendResolver from app.services.video.classification_handler import VisionClassificationHandler from app.services.video.hikvision_runtime import HikvisionInitRefCount, HikvisionRuntime from app.services.video.inference_aggregator import WindowInferenceAggregator from app.services.video.session_registry import ( CameraStreamInferState, PendingConsumableConfirmation, RunningSurgery, SurgerySessionRegistry, SurgerySessionState, ) from app.services.tear_gated_segment_consumption.product_map import ( load_tear_segment_name_to_id, resolve_tear_segment_labels_yaml_path, ) from app.services.tear_gated_segment_consumption.report import write_tear_segment_txt from app.services.tear_gated_segment_consumption.runner import ( TearGatedSegmentModelBundle, TearGatedSegmentRunner, ) from app.services.video.stream_worker import CameraStreamWorker, redact_rtsp_url from app.services.video.types import VideoBackendKind from app.schemas import SurgeryConsumptionDetail, build_consumption_summary from app.services.consumption_tsv_log import ( append_consumption_log_summary, init_consumption_log_file, print_consumption_summary_markdown, ) from app.services.voice_file_log import init_voice_log_file from app.surgery_errors import SurgeryPipelineError __all__ = [ "CameraSessionManager", "CameraStreamInferState", "PendingConsumableConfirmation", "RunningSurgery", "SurgerySessionState", ] class CameraSessionManager: """Per-surgery camera orchestration. 本类负责: 1. 开始/停止手术:创建 `SurgerySessionState`、拉起相机 worker、停录时收尾。 2. 把「语音确认所需的内存态」委托给 ``SurgerySessionRegistry``(实现 `PendingConfirmationStore`)。 3. 把「结果写库 + 失败重试 + durable fallback」委托给 ``ArchivePersister``。 对外接口保持不变,上游(``SurgeryPipeline`` / ``VoiceConfirmationService``)无需感知拆分。 """ def __init__( self, *, settings: Settings, vision_algorithm: ConsumableVisionAlgorithmService, hikvision_runtime: HikvisionRuntime | None, result_repository: SurgeryResultRepository | None = None, session_factory: async_sessionmaker | None = None, registry: SurgerySessionRegistry | None = None, archive_persister: ArchivePersister | None = None, tear_segment_models: TearGatedSegmentModelBundle | None = None, ) -> None: self._s = settings self._vision = vision_algorithm self._hik = hikvision_runtime self._tear_models = tear_segment_models self._session_factory: async_sessionmaker = session_factory or AsyncSessionLocal self._resolver = BackendResolver(settings, hikvision_runtime=hikvision_runtime) self._registry = registry or SurgerySessionRegistry() self._archive = archive_persister or ArchivePersister( repository=result_repository, session_factory=self._session_factory, ) self._aggregator = WindowInferenceAggregator() self._classifier_handler = VisionClassificationHandler( registry=self._registry, ) # ------------------------------------------------------------------ # 生命周期 # ------------------------------------------------------------------ async def start_archive_retry_loop(self) -> None: await self._archive.recover_from_durable_fallback() await self._archive.start_retry_loop() async def shutdown(self) -> None: await self._archive.shutdown() ids = self._registry.active_ids() for sid in ids: try: await self.stop_surgery(sid, require_active=False) except Exception as exc: logger.warning("shutdown stop_surgery {}: {}", sid, exc) # ------------------------------------------------------------------ # Surgery start / stop # ------------------------------------------------------------------ async def start_surgery( self, surgery_id: str, camera_ids: list[str], candidate_consumables: list[str], ) -> None: if self._registry.has_active(surgery_id): raise SurgeryPipelineError( "RECORDING_CANNOT_START", "该手术已在录制中,请勿重复开始。", ) stale = await self._archive.take_archived_details(surgery_id) if stale is not None: logger.warning( "surgery_id={} 仍有未落库归档,尝试写入数据库后再开始新会话", surgery_id, ) if self._archive.repository is None: logger.error( "surgery_id={} 有内存归档但未配置数据库仓库,无法持久化;" "开始新会话将丢弃该归档(仅开发/无库模式)", surgery_id, ) else: ok = await self._archive.persist_or_archive(surgery_id, stale) if not ok: raise SurgeryPipelineError( "RECORDING_CANNOT_START", "该手术号存在尚未写入数据库的历史结果,请修复数据库或等待自动重试成功后再开始。", ) resolved = self._vision.effective_candidate_consumables(candidate_consumables) if not resolved: raise SurgeryPipelineError( "RECORDING_CANNOT_START", "耗材候选为空:请在请求中传入 candidate_consumables,或提供有效的 consumable_classifier_labels.yaml / 分类模型。", ) if not any(str(x).strip() for x in candidate_consumables): logger.info( "surgery {}: candidate_consumables 未提供,使用默认全量 {} 项", surgery_id, len(resolved), ) name_to_code = self._vision.build_name_mapping(resolved) state = SurgerySessionState( candidate_consumables=list(resolved), name_to_code=name_to_code, ) stop_event = asyncio.Event() readies = [asyncio.Event() for _ in camera_ids] tasks: list[asyncio.Task[None]] = [] open_timeout = bp.VIDEO_OPEN_TIMEOUT_SEC + 5.0 for cam_id, ready in zip(camera_ids, readies, strict=True): tasks.append( asyncio.create_task( self._camera_worker( surgery_id=surgery_id, camera_id=cam_id, stream_ready=ready, stop_event=stop_event, state=state, ), name=f"camera:{surgery_id}:{cam_id}", ) ) run = RunningSurgery(stop_event=stop_event, state=state, tasks=tasks) init_consumption_log_file(surgery_id) init_voice_log_file(surgery_id) await self._registry.register(surgery_id, run) if ba.TEAR_SEGMENT_ENABLED: primary = (ba.TEAR_SEGMENT_PRIMARY_CAMERA_ID or "").strip() if primary and primary not in camera_ids: logger.warning( "撕段算法已开启但主摄 id={!r} 不在本台开录 camera_ids={} 中,该路不会跑撕段流水线", primary, camera_ids, ) try: await asyncio.wait_for( asyncio.gather(*(r.wait() for r in readies)), timeout=open_timeout, ) state.ready.set() except TimeoutError as exc: logger.error( "Surgery {} cameras not all ready within {}s", surgery_id, open_timeout, ) await self.stop_surgery(surgery_id, require_active=True) raise SurgeryPipelineError( "RECORDING_CANNOT_START", "开录未能确认:部分摄像头在超时内未成功拉到首帧。", ) from exc except Exception: await self.stop_surgery(surgery_id, require_active=True) raise async def stop_surgery(self, surgery_id: str, *, require_active: bool = True) -> None: run = await self._registry.unregister(surgery_id) if run is None: if require_active: raise SurgeryPipelineError( "RECORDING_NOT_STOPPED", "停录未能完成:当前没有该手术的活跃录制会话。", ) return run.stop_event.set() results = await asyncio.gather(*run.tasks, return_exceptions=True) for res in results: if isinstance(res, BaseException): logger.warning("surgery task finished with error: {}", res) details = list(run.state.details) detail_rows = [ SurgeryConsumptionDetail( item_id=r.item_id, item_name=r.item_name, qty=r.qty, doctor_id=r.doctor_id, timestamp=r.timestamp, ) for r in details ] summ = build_consumption_summary(detail_rows) totals = {s.item_id: (s.item_name, s.total_quantity) for s in summ} append_consumption_log_summary(surgery_id, totals) print_consumption_summary_markdown(totals) await self._archive.persist_or_archive(surgery_id, details) # ------------------------------------------------------------------ # PendingConfirmationStore 协议委托 # ------------------------------------------------------------------ def live_consumption_if_active( self, surgery_id: str ) -> list[SurgeryConsumptionStored] | None: return self._registry.live_consumption_if_active(surgery_id) def archived_consumption_fallback( self, surgery_id: str ) -> list[SurgeryConsumptionStored] | None: return self._archive.archived_details(surgery_id) def record_voice_trace( self, surgery_id: str, *, asr_text: str | None, error: str | None, ) -> None: self._registry.record_voice_trace(surgery_id, asr_text=asr_text, error=error) def get_pending_confirmation_by_id( self, surgery_id: str, confirmation_id: str, ) -> PendingConsumableConfirmation | None: return self._registry.get_pending_confirmation_by_id(surgery_id, confirmation_id) def get_surgery_candidate_consumables(self, surgery_id: str) -> list[str]: return self._registry.get_surgery_candidate_consumables(surgery_id) async def record_voice_parse_failure( self, surgery_id: str, confirmation_id: str ) -> tuple[int, int]: return await self._registry.record_voice_parse_failure(surgery_id, confirmation_id) def next_pending_confirmation( self, surgery_id: str ) -> PendingConsumableConfirmation | None: return self._registry.next_pending_confirmation(surgery_id) async def resolve_pending_confirmation( self, surgery_id: str, confirmation_id: str, *, chosen_label: str | None, rejected: bool, ) -> None: await self._registry.resolve_pending_confirmation( surgery_id, confirmation_id, chosen_label=chosen_label, rejected=rejected, ) # ------------------------------------------------------------------ # Camera worker(拉流 + 推理节流 + 时间窗分桶 + 分类结果处理) # ------------------------------------------------------------------ async def _finalize_tear_segment_runner( self, *, surgery_id: str, camera_id: str, state: SurgerySessionState, runner: TearGatedSegmentRunner, ) -> None: recs = runner.finalize() for rec in recs: wall_ts = runner.wall_time_for_record(rec) detail_ts = datetime.fromtimestamp(wall_ts, tz=timezone.utc) await self._registry.append_confirmed_detail( state=state, item_id=rec.item_id, item_name=rec.item_name, doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, source="tear_segment", cooldown_key=f"{surgery_id}:tear_seg:{rec.segment_index}", detail_timestamp=detail_ts, ) if ba.TEAR_SEGMENT_LOG_TXT and recs: raw_tpl = (ba.TEAR_SEGMENT_LOG_TXT_PATH or "").strip() if raw_tpl and "{surgery_id}" in raw_tpl: p = Path(raw_tpl.format(surgery_id=surgery_id)).expanduser() elif raw_tpl: p = Path(raw_tpl).expanduser() else: p = Path(f"logs/tear_segment_{surgery_id}.txt") labels_src = str(resolve_tear_segment_labels_yaml_path()) write_tear_segment_txt( path=p, surgery_id=surgery_id, camera_id=camera_id, labels_source=labels_src, records=recs, ) logger.info("撕段报告已写: {}", p) async def _camera_worker( self, *, surgery_id: str, camera_id: str, stream_ready: asyncio.Event, stop_event: asyncio.Event, state: SurgerySessionState, ) -> None: kind = self._resolver.backend_for_camera(camera_id) hik_user_id: int | None = None hik_init_retained = False try: url, hik_user_id, hik_init_retained = await self._resolve_rtsp_url( camera_id=camera_id, kind=kind, ) assert url is not None primary = (ba.TEAR_SEGMENT_PRIMARY_CAMERA_ID or "").strip() use_tear_req = ( ba.TEAR_SEGMENT_ENABLED and self._tear_models is not None and primary and camera_id == primary ) runner: TearGatedSegmentRunner | None = None if use_tear_req: name_to_id = load_tear_segment_name_to_id() try: self._tear_models.ensure_loaded() runner = self._tear_models.create_runner(name_to_id) except Exception as exc: logger.exception( "撕段模型未就绪,本路回退为原时间窗算法 camera={} surgery={}: {}", camera_id, surgery_id, exc, ) runner = None if runner is not None: async def _frame_handler_tear(frame: object) -> None: await asyncio.to_thread(runner.process_frame_bgr, frame) w_tear = CameraStreamWorker( surgery_id=surgery_id, camera_id=camera_id, url=url, ) try: await w_tear.run( stream_ready=stream_ready, stop_event=stop_event, frame_handler=_frame_handler_tear, ) finally: await self._finalize_tear_segment_runner( surgery_id=surgery_id, camera_id=camera_id, state=state, runner=runner, ) else: last_infer = 0.0 async def _frame_handler(frame: object) -> None: nonlocal last_infer now = time.monotonic() if now - last_infer < bp.VIDEO_INFERENCE_INTERVAL_SEC: await asyncio.sleep(0.01) return last_infer = now try: snap = await asyncio.to_thread( self._vision.infer_frame_bgr, frame, state.name_to_code, ) except Exception as exc: logger.debug( "Inference skip camera={} surgery={}: {}", camera_id, surgery_id, exc, ) return if snap is None: return if bp.VIDEO_LOG_INFERENCE_RESULTS: logger.info( "Vision result surgery={} camera={} top1={}({:.3f}) top2={}({:.3f}) top3={}({:.3f})", surgery_id, camera_id, snap.t1_name, snap.t1_conf, snap.t2_name, snap.t2_conf, snap.t3_name, snap.t3_conf, ) async with state.lock: ready_windows = self._aggregator.ingest_snapshot_and_collect_ready( surgery_id=surgery_id, camera_id=camera_id, snap=snap, state=state, ) for win in ready_windows: await self._classifier_handler.handle( state=state, cls_res=win.prediction, ready=win, surgery_id=surgery_id, camera_id=camera_id, ) worker = CameraStreamWorker( surgery_id=surgery_id, camera_id=camera_id, url=url, ) await worker.run( stream_ready=stream_ready, stop_event=stop_event, frame_handler=_frame_handler, ) finally: if hik_user_id is not None and self._hik is not None: await asyncio.to_thread(self._hik.logout, hik_user_id) if hik_init_retained and self._hik is not None: HikvisionInitRefCount.release(self._hik) async def _handle_classification_result( self, *, state: SurgerySessionState, cls_res: PredictionResult, ) -> None: """Deprecated test-shim:沿用旧签名,转发给 ``VisionClassificationHandler``。 保留此方法是因为单元测试直接调用了它。新代码应使用 ``self._classifier_handler.handle``。 """ await self._classifier_handler.handle(state=state, cls_res=cls_res) async def _resolve_rtsp_url( self, *, camera_id: str, kind: VideoBackendKind, ) -> tuple[str, int | None, bool]: """Returns (url, hikvision_user_id, whether NET_DVR_Init refcount was retained).""" if kind != VideoBackendKind.HIKVISION_SDK: return self._resolver.rtsp_url_for_camera(camera_id), None, False if self._hik is None: if self._s.hikvision_sdk_fallback_to_rtsp: logger.warning( "Hikvision SDK not loaded; fallback to RTSP for camera {}", camera_id, ) return self._resolver.rtsp_url_for_camera(camera_id), None, False raise RuntimeError("Hikvision SDK requested but libhcnetsdk.so not loaded") if not ( self._s.hikvision_device_ip.strip() and self._s.hikvision_user.strip() and self._s.hikvision_password.strip() ): if self._s.hikvision_sdk_fallback_to_rtsp: logger.warning( "Hikvision credentials incomplete; fallback to RTSP for camera {}", camera_id, ) return self._resolver.rtsp_url_for_camera(camera_id), None, False raise RuntimeError("Hikvision SDK requires HIKVISION_DEVICE_IP, user, password") HikvisionInitRefCount.retain(self._hik) try: login = await asyncio.to_thread( lambda: self._hik.login_v30( ip=self._s.hikvision_device_ip.strip(), port=int(self._s.hikvision_device_port), username=self._s.hikvision_user.strip(), password=self._s.hikvision_password.strip(), ) ) except Exception as exc: HikvisionInitRefCount.release(self._hik) if self._s.hikvision_sdk_fallback_to_rtsp: logger.warning("Hikvision login failed ({}); fallback to RTSP", exc) return self._resolver.rtsp_url_for_camera(camera_id), None, False raise url = self._resolver.rtsp_url_after_hikvision_login(camera_id) return url, login.user_id, True