"""FishAction behavior-recognition helpers.

Runs the ``predict_video_x3d_3class.py`` script in a subprocess, optionally
splits long videos into fixed-length slices first, and converts each
prediction into a ``HealthSnapshot``.
"""

from __future__ import annotations

import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple

from app.logging_config import format_json_pretty, new_run_id, stage
from app.services.video_slice import get_video_duration, slice_video
from app.settings import Settings
from app.state import HealthSnapshot
from app.subprocess_run import run_subprocess_with_log
from loguru import logger


def _action_subprocess_dump_dir(settings: Settings) -> Path:
    """Return ``{LOG_SUBPROCESS_DIR}/action/subprocess``.

    This is the directory that receives the full output of the
    behavior-recognition subprocess.
    """
    return Path(settings.log_subprocess_dir) / "action" / "subprocess"


# Video slicing configuration.
DEFAULT_SLICE_DURATION = 10.0  # length of each slice, in seconds
DEFAULT_MIN_DURATION_FOR_SLICE = 15.0  # only videos longer than this are sliced

# English class label emitted by the model -> Chinese display text.
BEHAVIOR_EN_TO_ZH = {
    "feeding": "吃饵",
    "normal": "正常游行",
    "scared": "惊吓",
}


def _py_exe(settings: Settings) -> str:
    """Return the interpreter used to launch the FishAction script.

    Falls back to the current interpreter when ``settings.python_fish_action``
    is empty or unset.
    """
    return settings.python_fish_action or sys.executable


def behavior_to_health(behavior_en: str) -> str:
    """Map an English behavior label to the Chinese health verdict.

    Only ``"scared"`` is reported as unhealthy; every other label is healthy.
    """
    return "不健康" if behavior_en == "scared" else "健康"


def run_action_subprocess(
    mp4_path: Path,
    settings: Settings,
    *,
    run_id: Optional[str] = None,
) -> str:
    """Run the X3D 3-class prediction script on one video file.

    A one-line CSV manifest and an output JSON path are created as temporary
    files, handed to the script, and removed again before returning.

    Args:
        mp4_path: Video (or slice) to classify.
        settings: Provides the script root, checkpoint, interpreter and
            logging options.
        run_id: Correlation id for logs; a new one is generated when omitted.

    Returns:
        The lower-cased English class label, guaranteed to be a key of
        ``BEHAVIOR_EN_TO_ZH``.

    Raises:
        FileNotFoundError: The prediction script does not exist.
        RuntimeError: The subprocess exited non-zero, wrote no / empty /
            malformed JSON, or predicted an unknown label.
    """
    script = settings.fish_action_root / "predict_video_x3d_3class.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishAction script: {script}")

    mp4_path = mp4_path.resolve()
    path_prefix = str(mp4_path.parent)
    rel_name = mp4_path.name
    rid = run_id or new_run_id("action")

    # Both temp paths are tracked from the moment they exist so the
    # ``finally`` below removes them even if setup fails half-way.
    csv_path: Optional[str] = None
    out_json: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", delete=False, encoding="utf-8"
        ) as tmp:
            csv_path = tmp.name
            # Manifest row: "<file name> <label>"; the label is a dummy 0.
            tmp.write(f"{rel_name} 0\n")

        out_fd, out_json = tempfile.mkstemp(suffix="_pred.json")
        os.close(out_fd)

        # The script's own defaults are used (clips_per_video=10,
        # batch_size=8, num_workers=4). To change them, edit the defaults in
        # FishAction/predict_video_x3d_3class.py.
        cmd = [
            _py_exe(settings),
            str(script),
            "--checkpoint",
            settings.action_checkpoint,
            "--csv",
            csv_path,
            "--path_prefix",
            path_prefix,
            "--output_json",
            out_json,
            "--log_interval",
            "0",
        ]
        with stage(
            f"行为识别子进程 X3D({mp4_path.name})",
            pipeline="action",
            step="action_subprocess",
            run_id=rid,
            source=mp4_path.name,
            metrics={"checkpoint": settings.action_checkpoint},
        ):
            proc = run_subprocess_with_log(
                cmd,
                cwd=str(settings.fish_action_root),
                env=os.environ.copy(),
                log_name="FishAction",
                stream_to_logger=False,
                dump_dir=_action_subprocess_dump_dir(settings),
                run_id=rid,
                source_label=mp4_path.stem,
                kind="predict_video_x3d_3class",
                pipeline="action",
                step="action_subprocess",
                summary_tail_lines=settings.log_subprocess_tail_lines,
            )

        if proc.returncode != 0:
            err = proc.stdout or ""
            # Keep only the tail of the output so the message stays bounded.
            raise RuntimeError(
                f"predict_video_x3d_3class.py failed ({proc.returncode}): {err[-4000:]}"
            )
        if not Path(out_json).is_file():
            raise RuntimeError(f"No output_json written: {out_json}")

        with open(out_json, encoding="utf-8") as f:
            rows = json.load(f)
        if not rows:
            raise RuntimeError("Empty prediction JSON")
        # Fail with a clear message instead of an opaque KeyError /
        # AttributeError when the payload is not a list of objects.
        if not isinstance(rows, list) or not isinstance(rows[0], dict):
            raise RuntimeError(
                f"Unexpected prediction JSON structure: {type(rows).__name__}"
            )

        first_row = rows[0]
        pred_en = str(first_row.get("pred_3class", "")).strip().lower()
        logger.bind(pipeline="action", run_id=rid).debug(
            "[FishAction] 预测原始行:\n{}",
            format_json_pretty(first_row),
        )
        if pred_en not in BEHAVIOR_EN_TO_ZH:
            raise RuntimeError(f"Unexpected pred_3class: {pred_en!r}")
        return pred_en
    finally:
        for tmp_path in (csv_path, out_json):
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)


def prepare_action_slices(
    mp4_path: Path,
    settings: Settings,
    *,
    run_id: Optional[str] = None,
) -> Tuple[List[Path], float]:
    """Check video duration and create slices if needed.

    Args:
        mp4_path: Source video.
        settings: Unused here; kept so the signature matches the other
            pipeline entry points.
        run_id: Correlation id for logs; a new one is generated when omitted.

    Returns:
        ``(slice_files, duration)``. If the video is short enough, or slicing
        did not yield more than one file, returns ``([mp4_path], duration)``
        so the original file is processed as a whole.
    """
    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=mp4_path.name)
    log.info("[FishAction] 开始处理视频 | mp4={}", mp4_path.resolve())

    with stage(
        f"视频切片预处理({mp4_path.name})",
        pipeline="action",
        step="prepare_slices",
        run_id=rid,
        source=mp4_path.name,
        metrics={
            "min_duration_for_slice": DEFAULT_MIN_DURATION_FOR_SLICE,
            "slice_duration": DEFAULT_SLICE_DURATION,
        },
    ):
        duration = get_video_duration(mp4_path)
        if duration > DEFAULT_MIN_DURATION_FOR_SLICE:
            log.info(
                "[FishAction] 视频时长 {:.1f}s > {}s,切分为 {}s 一段",
                duration,
                DEFAULT_MIN_DURATION_FOR_SLICE,
                DEFAULT_SLICE_DURATION,
            )
            # NOTE(review): the slice directory is not cleaned up here;
            # presumably the caller or slice_video owns it - confirm.
            slice_files, _slice_dir = slice_video(mp4_path, DEFAULT_SLICE_DURATION)
            if len(slice_files) > 1:
                log.info(
                    "[FishAction] {} 将处理 {} 段切片",
                    mp4_path.name,
                    len(slice_files),
                )
                return slice_files, duration
        return [mp4_path], duration


def _slice_window(
    slice_index: int, total_slices: int, duration: float
) -> Tuple[float, float]:
    """Return the ``(start, end)`` seconds that a segment covers in the video."""
    start_time = slice_index * DEFAULT_SLICE_DURATION
    if total_slices <= 1:
        # A single segment is the whole, unsliced file, which may be longer
        # than one slice; report its real end instead of clamping.
        return start_time, duration
    return start_time, min(start_time + DEFAULT_SLICE_DURATION, duration)


def run_single_slice_inference(
    slice_file: Path,
    slice_index: int,
    total_slices: int,
    duration: float,
    mp4_name: str,
    settings: Settings,
    *,
    run_id: Optional[str] = None,
) -> HealthSnapshot:
    """Run FishAction inference on a single video file / slice.

    Args:
        slice_file: File to classify (a slice, or the original video).
        slice_index: Zero-based position of this segment.
        total_slices: Number of segments the video was split into; ``1``
            means the video is processed as a whole.
        duration: Duration of the full source video, in seconds.
        mp4_name: Name of the source video, used in log output.
        settings: Forwarded to ``run_action_subprocess``.
        run_id: Correlation id for logs; a new one is generated when omitted.

    Returns:
        A ``HealthSnapshot``. Failures never propagate: on any exception the
        snapshot carries ``raw_class_en="error"`` and the error text.
    """
    start_time, end_time = _slice_window(slice_index, total_slices, duration)
    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=slice_file.name)

    # Shared between the stage record and the result event.
    window_metrics = {
        "slice_index": slice_index,
        "total_slices": total_slices,
        "start_sec": round(start_time, 2),
        "end_sec": round(end_time, 2),
    }

    try:
        with stage(
            f"切片识别 #{slice_index + 1}/{total_slices}({slice_file.name})",
            pipeline="action",
            step="slice_inference",
            run_id=rid,
            source=slice_file.name,
            metrics=dict(window_metrics),
        ):
            pred_en = run_action_subprocess(slice_file, settings, run_id=rid)

        zh = BEHAVIOR_EN_TO_ZH[pred_en]
        health = behavior_to_health(pred_en)
        snap = HealthSnapshot(
            behavior_result=zh,
            health_result=health,
            updated_at=datetime.now(timezone.utc),
            raw_class_en=pred_en,
        )

        # Structured result event with the Chinese labels, for offline analysis.
        logger.bind(
            event=True,
            pipeline="action",
            step="slice_result",
            run_id=rid,
            source=slice_file.name,
            status="info",
            metrics={
                **window_metrics,
                "pred_3class": pred_en,
                "behavior_zh": zh,
                "health_zh": health,
                "mp4_name": mp4_name,
            },
        ).info(
            "[FishAction] 切片识别结果 | {} | {} | 段 {}/{} | {:.1f}~{:.1f}s | 行为={} | 健康={}",
            mp4_name,
            slice_file.name,
            slice_index + 1,
            total_slices,
            start_time,
            end_time,
            zh,
            health,
        )

        if total_slices == 1:
            log.info(
                "[FishAction] 视频识别完成 | mp4={} | 行为={} | 健康={}",
                mp4_name,
                zh,
                health,
            )
        return snap
    except Exception as e:
        # Deliberate catch-all: one failing slice must not abort the caller,
        # so the failure is reported through the returned snapshot instead.
        log.error(
            "[FishAction] 切片处理失败 | slice={}/{} | {}: {}",
            slice_index + 1,
            total_slices,
            type(e).__name__,
            e,
        )
        return HealthSnapshot(
            behavior_result="处理失败",
            health_result="未知",
            updated_at=datetime.now(timezone.utc),
            raw_class_en="error",
            error=str(e),
        )