Files
operating-room-monitor-server/backend/app/algorithm_runner/cli.py
Kevin 70431ca3f9 重命名 refs 为 algorithm_subprocesses,并清理误提交的权重与缓存文件。
算法子进程目录仅保留源码与配置;权重、样本 I/O、构建产物通过 .gitignore 离线交付。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-21 16:10:08 +08:00

191 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""子进程入口:读 RTSP + 白名单 JSON段级结果追加写入 JSONL。
基于 algorithm_subprocesses/5.15 ``actionformer_gated`` 流水线VideoSwin → ActionFormer + 手检/好坏帧/
耗材分类投票);父进程通过 ``--events-jsonl`` tail JSONL 即可消费段事件。
"""
from __future__ import annotations
import argparse
import signal
from pathlib import Path
import cv2
from loguru import logger
from app.algorithm_ipc.schema import WhitelistSpec, append_event_line
from app.algorithm_runner.actionformer_gated.runner import (
OnlineActionFormerSegmentBundle,
actionformer_segment_stable_dedupe_key,
)
from app.algorithm_runner.segment_policy import events_for_tear_record
from app.baked import algorithm as ba
_FILE_SOURCE_SUFFIXES = frozenset({".mp4", ".mov", ".m4v", ".avi", ".mkv"})
def _source_looks_like_file(source: str) -> bool:
if source.startswith(("rtsp://", "rtmp://", "http://", "https://")):
return False
p = Path(source).expanduser()
return p.is_file() or p.suffix.lower() in _FILE_SOURCE_SUFFIXES
def _resolve_source_mode(source: str, requested: str) -> str:
if requested != "auto":
return requested
return "file" if _source_looks_like_file(source) else "realtime"
def _stream_time_for_frame(
*,
source_mode: str,
cap_pos_msec: float,
frame_idx: int,
fps_hint: float,
) -> float:
if source_mode == "file" and cap_pos_msec > 0:
return float(cap_pos_msec) / 1000.0
fps = fps_hint if fps_hint > 0 else float(ba.ACTIONFORMER_ASSUMED_FPS)
return float(frame_idx) / fps
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="OR 耗材撕段算法子进程RTSP → JSONL")
ap.add_argument("--source", required=True, help="RTSP URL 或视频路径")
ap.add_argument("--whitelist-json", required=True, type=Path)
ap.add_argument("--events-jsonl", required=True, type=Path)
ap.add_argument("--wall-anchor", type=float, required=True, help="开录墙钟 time.time()")
ap.add_argument("--surgery-id", default="", help="用于 cooldown_key 前缀")
ap.add_argument("--camera-id", default="cam01")
ap.add_argument(
"--source-mode",
choices=("auto", "realtime", "file"),
default="auto",
help="auto: 本地视频文件走 file否则走 realtimerealtime 使用墙钟时间轴file 使用视频时间轴。",
)
args_ns = ap.parse_args(argv)
wl = WhitelistSpec.load_path(args_ns.whitelist_json)
events_path: Path = args_ns.events_jsonl
wall_anchor = float(args_ns.wall_anchor)
surgery_id = (args_ns.surgery_id or "").strip()
camera_id = (args_ns.camera_id or "").strip() or "cam01"
source_mode = _resolve_source_mode(args_ns.source, args_ns.source_mode)
stop_flag = {"stop": False}
def _on_sig(*_: object) -> None:
stop_flag["stop"] = True
signal.signal(signal.SIGTERM, _on_sig)
signal.signal(signal.SIGINT, _on_sig)
bundle = OnlineActionFormerSegmentBundle()
try:
bundle.ensure_loaded()
except Exception as exc:
logger.exception("算法模型加载失败: {}", exc)
append_event_line(
events_path,
{"type": "error", "message": f"model_load_failed: {exc!s}"},
)
return 2
cap = cv2.VideoCapture(args_ns.source)
if not cap.isOpened():
logger.error("无法打开视频源: {}", args_ns.source)
append_event_line(
events_path,
{"type": "error", "message": f"cannot_open_source: {args_ns.source!r}"},
)
return 3
fps_hint = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
if fps_hint <= 0:
fps_hint = float(ba.ACTIONFORMER_ASSUMED_FPS)
runner_ref: list = [None]
def stable_segments_sink(batch: list) -> None:
r = runner_ref[0]
if r is None:
return
for rec in batch:
k = actionformer_segment_stable_dedupe_key(rec)
wall_lo = wall_anchor + rec.start_sec
wall_hi = wall_anchor + rec.end_sec
prefix = f"{surgery_id}:" if surgery_id else ""
cooldown_key = f"{prefix}af_seg:{k}"
evs = events_for_tear_record(
rec,
wl=wl,
camera_id=camera_id,
wall_lo=wall_lo,
wall_hi=wall_hi,
cooldown_key=cooldown_key,
)
for ev in evs:
ev["frozen"] = True
append_event_line(events_path, ev)
if evs:
r.mark_stable_records_emitted([rec])
runner = bundle.create_runner(
dict(wl.name_to_id),
candidate_consumables=list(wl.candidate_consumables),
timeline_anchor_wall=wall_anchor,
stable_segments_sink=stable_segments_sink,
)
runner_ref[0] = runner
append_event_line(events_path, {"type": "ready", "camera_id": camera_id})
frame_idx = 0
try:
while not stop_flag["stop"]:
ret, frame = cap.read()
if not ret or frame is None:
break
stream_sec = _stream_time_for_frame(
source_mode=source_mode,
cap_pos_msec=float(cap.get(cv2.CAP_PROP_POS_MSEC) or 0.0),
frame_idx=frame_idx,
fps_hint=fps_hint,
)
runner.process_frame_bgr(frame, stream_time_sec=stream_sec)
frame_idx += 1
finally:
cap.release()
full_tail_runner = runner_ref[0]
if full_tail_runner is not None:
_full, tail = full_tail_runner.finalize_split_for_stop()
for rec in tail:
k = actionformer_segment_stable_dedupe_key(rec)
wall_lo = wall_anchor + rec.start_sec
wall_hi = wall_anchor + rec.end_sec
prefix = f"{surgery_id}:" if surgery_id else ""
cooldown_key = f"{prefix}af_seg:{k}"
evs = events_for_tear_record(
rec,
wl=wl,
camera_id=camera_id,
wall_lo=wall_lo,
wall_hi=wall_hi,
cooldown_key=cooldown_key,
)
for ev in evs:
ev["frozen"] = True
append_event_line(events_path, ev)
if evs:
full_tail_runner.mark_stable_records_emitted([rec])
done_reason = "stop" if stop_flag["stop"] else "eof"
append_event_line(events_path, {"type": "done", "reason": done_reason})
return 0
if __name__ == "__main__":
raise SystemExit(main())