"""视觉分类结果处理:把 ``PredictionResult`` 转成自动记账 or 待人工确认。 从 ``CameraSessionManager`` 抽出,保持原先行为: - 置信度低于 ``video_voice_confirm_min_confidence`` → 丢弃。 - 会话状态中候选清单为空 → 丢弃(开录时通常会由空请求解析为全量目录/模型类名)。 - 置信度 ≥ ``video_auto_confirm_confidence`` 且 Top1 在候选内 → 自动追加 vision 明细,并写消耗 TSV(记具体耗材)。 - 置信度 ≥ 自动阈值但 Top1 不在候选内 → 视 voice_confirmation_enabled 入 pending。 - 中等置信度 → 入 pending(若有可展示候选项)。 需医生确认时:消耗 TSV / 内存明细记「待确认」(不写模型 top1 商品名);语音确认后**替换**该条 TSV 为最终耗材。停录时 TSV/终端汇总与查结果 API 同口径,由 details 经 ``build_consumption_summary`` 生成。 """ from __future__ import annotations import time from app.baked import pipeline as bp from app.services.consumable_vision_algorithm import ( PredictionCandidate, PredictionResult, ) from app.services.consumption_tsv_log import ( append_consumption_pending_window, append_consumption_window, resolve_consumption_item_id, ) from app.services.video.inference_aggregator import WindowInferenceReady from app.services.video.session_registry import ( SurgerySessionRegistry, SurgerySessionState, format_elapsed_mmss_since, ) def rank_topk_for_candidates( topk: list[PredictionCandidate], ordered_candidates: list[str], *, limit: int = 5, ) -> list[PredictionCandidate]: if not topk: return [] stripped_order = [c.strip() for c in ordered_candidates if c.strip()] if not stripped_order: return topk[:limit] order_index = {name: i for i, name in enumerate(stripped_order)} picked = [c for c in topk if c.label.strip() in order_index] picked.sort(key=lambda c: order_index[c.label.strip()]) return picked[:limit] class VisionClassificationHandler: """把分类结果转化为 registry 上的状态变更(追加明细 / 入队待确认)。""" def __init__( self, *, registry: SurgerySessionRegistry, ) -> None: self._registry = registry def _append_vision_consumption_window_if_ready( self, state: SurgerySessionState, ready: WindowInferenceReady | None, surgery_id: str, camera_id: str, ) -> None: if ( ready is None or not surgery_id or not camera_id or ( not bp.CONSUMPTION_TSV_LOG_ENABLED and not bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL ) ): return append_consumption_window( surgery_id=surgery_id, name_to_code=state.name_to_code, best=ready.best, doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, camera_id=camera_id, wall_start_epoch=ready.wall_lo, wall_end_epoch=ready.wall_hi, since_recording_start=format_elapsed_mmss_since( state.surgery_started_wall, at_epoch=ready.wall_hi, ), ) async def handle( self, *, state: SurgerySessionState, cls_res: PredictionResult, ready: WindowInferenceReady | None = None, surgery_id: str = "", camera_id: str = "", ) -> None: conf = cls_res.confidence label = (cls_res.label or "").strip() t1_pid = (ready.best.t1_pid if ready is not None else "") item_id = resolve_consumption_item_id(label, t1_pid, state.name_to_code) voice_floor = bp.VIDEO_VOICE_CONFIRM_MIN_CONFIDENCE if conf < voice_floor: return cand_order = [c.strip() for c in state.candidate_consumables if c.strip()] if not cand_order: return cand_set = set(cand_order) ranked = rank_topk_for_candidates(cls_res.topk, cand_order) auto_th = bp.VIDEO_AUTO_CONFIRM_CONFIDENCE def in_allowed(name: str) -> bool: return name in cand_set if conf >= auto_th and in_allowed(label): self._append_vision_consumption_window_if_ready( state, ready, surgery_id, camera_id ) await self._registry.append_confirmed_detail( state=state, item_id=item_id or "unknown", item_name=label or "unknown", doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, source="vision", ) return if conf >= auto_th and not in_allowed(label): if ranked and bp.VOICE_CONFIRMATION_ENABLED: await self._enqueue( state, ranked, label, conf, ready=ready, surgery_id=surgery_id, camera_id=camera_id, ) return if not bp.VOICE_CONFIRMATION_ENABLED: return if ranked: await self._enqueue( state, ranked, label, conf, ready=ready, surgery_id=surgery_id, camera_id=camera_id, ) elif in_allowed(label): await self._enqueue( state, [PredictionCandidate(label=label, confidence=conf)], label, conf, ready=ready, surgery_id=surgery_id, camera_id=camera_id, ) async def _enqueue( self, state: SurgerySessionState, ranked: list[PredictionCandidate], top_key: str, top_confidence: float, *, ready: WindowInferenceReady | None = None, surgery_id: str = "", camera_id: str = "", ) -> None: cid = await self._registry.enqueue_pending_confirmation( state, ranked, top_key=top_key, top_confidence=top_confidence, ) if cid is None: return at_ep = ready.wall_hi if ready is not None else time.time() if ready is not None and surgery_id and camera_id and ( bp.CONSUMPTION_TSV_LOG_ENABLED or bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL ): append_consumption_pending_window( surgery_id=surgery_id, confirmation_id=cid, model_snap=ready.best, doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, camera_id=camera_id, wall_start_epoch=ready.wall_lo, wall_end_epoch=ready.wall_hi, tsv_enabled=bp.CONSUMPTION_TSV_LOG_ENABLED, markdown_terminal=bp.CONSUMPTION_LOG_MARKDOWN_TERMINAL, since_recording_start=format_elapsed_mmss_since( state.surgery_started_wall, at_epoch=at_ep, ), ) await self._registry.append_pending_consumption_detail( state=state, confirmation_id=cid, doctor_id=bp.VIDEO_RESULT_DOCTOR_ID, )