"""时间窗聚合:按 ``consumable_vision_window_sec`` 桶内众数投票,产出 ``WindowInferenceReady``。 从 ``CameraSessionManager._camera_worker`` 的时间窗计票逻辑独立出来,便于单测。 消耗 TSV / 终端 Markdown 在 ``VisionClassificationHandler`` 中按「自动确认 / 待确认」分支写入, 避免待确认事件在日志中先记成具体耗材名。 """ from __future__ import annotations import time from dataclasses import dataclass from app.baked import algorithm as ba from app.services.consumable_vision_algorithm import ( ClsTop3, PredictionResult, cls_top3_to_prediction_result, window_bucket_to_best_snap, ) from app.services.video.session_registry import ( CameraStreamInferState, SurgerySessionState, ) @dataclass(frozen=True) class WindowInferenceReady: """单个已完成时间窗:原始 top3 快照 + 分类结果 + 墙钟区间(与 monotonic 窗对齐)。""" best: ClsTop3 prediction: PredictionResult wall_lo: float wall_hi: float class WindowInferenceAggregator: """负责把单路相机的推理快照按时间窗分桶,并产出「桶内最佳」结果。 本类无状态:状态保存在 ``SurgerySessionState.camera_infer`` 中, 便于与原逻辑保持一致;调用方在持有 ``state.lock`` 时调用下面的方法。 """ def __init__(self) -> None: pass def ingest_snapshot_and_collect_ready( self, *, surgery_id: str, camera_id: str, snap: ClsTop3, state: SurgerySessionState, ) -> list[WindowInferenceReady]: """摄入一条推理快照,返回本次因桶满而产出的窗口列表。 调用方必须已持有 ``state.lock``。 """ _ = surgery_id _ = camera_id wsec = ba.CONSUMABLE_VISION_WINDOW_SEC ready: list[WindowInferenceReady] = [] cis = state.camera_infer.setdefault(camera_id, CameraStreamInferState()) if cis.stream_t0 is None: cis.stream_t0 = time.monotonic() cis.stream_wall_start = time.time() t_rel = time.monotonic() - cis.stream_t0 cis.votes.append((t_rel, snap.t1_name, snap)) current_b = int(t_rel // wsec) while cis.next_bucket < current_b: b = cis.next_bucket cis.next_bucket += 1 lo, hi = b * wsec, (b + 1) * wsec bucket_pts = [(p, sn) for (t, p, sn) in cis.votes if lo <= t < hi] cis.votes = [ (t, p, sn) for (t, p, sn) in cis.votes if not (lo <= t < hi) ] if not bucket_pts: continue best = window_bucket_to_best_snap(bucket_pts) if best is None or cis.stream_wall_start is None: continue wall_lo = cis.stream_wall_start + lo wall_hi = cis.stream_wall_start + hi pred = cls_top3_to_prediction_result(best) ready.append( WindowInferenceReady( best=best, prediction=pred, wall_lo=wall_lo, wall_hi=wall_hi, ) ) return ready