Files
2026-05-13 09:19:31 +08:00

257 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple
from app.logging_config import format_json_pretty, new_run_id, stage
from app.services.video_slice import get_video_duration, slice_video
from app.settings import Settings
from app.state import HealthSnapshot
from app.subprocess_run import run_subprocess_with_log
from loguru import logger
def _action_subprocess_dump_dir(settings: Settings) -> Path:
    """Return the dump directory for the action-recognition subprocess output.

    The location is ``{settings.log_subprocess_dir}/action/subprocess``; it is
    passed as ``dump_dir`` when the FishAction subprocess is launched.
    """
    dump_root = Path(settings.log_subprocess_dir)
    return dump_root.joinpath("action", "subprocess")
# Video slicing configuration.
# Length of each slice, in seconds.
DEFAULT_SLICE_DURATION: float = 10.0
# Only videos longer than this many seconds are sliced.
DEFAULT_MIN_DURATION_FOR_SLICE: float = 15.0

# English behavior class name -> Chinese label.
BEHAVIOR_EN_TO_ZH: dict[str, str] = {
    "feeding": "吃饵",
    "normal": "正常游行",
    "scared": "惊吓",
}
def _py_exe(settings: Settings) -> str:
    """Return the Python interpreter used to launch the FishAction script.

    ``settings.python_fish_action`` is used when it is truthy; otherwise the
    interpreter running the current process (``sys.executable``) is used.
    """
    configured = settings.python_fish_action
    if configured:
        return configured
    return sys.executable
def behavior_to_health(behavior_en: str) -> str:
    """Map an English behavior label to the Chinese health verdict.

    ``"scared"`` yields ``"不健康"``; every other value yields ``"健康"``.
    """
    return "不健康" if behavior_en == "scared" else "健康"
def run_action_subprocess(
    mp4_path: Path, settings: Settings, *, run_id: Optional[str] = None,
) -> str:
    """Classify one video with the FishAction X3D script and return its label.

    Writes a one-row CSV naming the video, runs
    ``predict_video_x3d_3class.py`` (located in ``settings.fish_action_root``)
    as a subprocess, and reads the prediction from a temporary JSON file.
    Both temporary files are removed before returning, whether the call
    succeeds or fails.

    Args:
        mp4_path: Video file to classify; it is resolved to an absolute path.
        settings: Supplies ``fish_action_root``, ``action_checkpoint`` and
            ``log_subprocess_tail_lines`` (plus whatever the interpreter and
            dump-directory helpers read).
        run_id: Correlation id for logging; a new one is generated if omitted.

    Returns:
        The stripped, lower-cased ``pred_3class`` value of the first output
        row. It is always a key of ``BEHAVIOR_EN_TO_ZH``.

    Raises:
        FileNotFoundError: The prediction script does not exist.
        RuntimeError: The subprocess exited non-zero, wrote no output, or
            produced an empty, wrongly shaped or unexpected prediction.
        json.JSONDecodeError: The output file is non-empty but not valid JSON.
    """
    script = settings.fish_action_root / "predict_video_x3d_3class.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishAction script: {script}")

    mp4_path = mp4_path.resolve()
    rid = run_id or new_run_id("action")

    # Track both temp paths from the start so ``finally`` removes whatever was
    # actually created, even when setup fails half-way through.
    csv_path: Optional[str] = None
    out_json: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", delete=False, encoding="utf-8"
        ) as tmp:
            csv_path = tmp.name
            # One "<file name> 0" row. NOTE(review): the name is presumably
            # joined with --path_prefix by the script and the trailing 0 looks
            # like a placeholder label -- confirm against the script.
            tmp.write(f"{mp4_path.name} 0\n")
        out_fd, out_json = tempfile.mkstemp(suffix="_pred.json")
        os.close(out_fd)  # only the path is needed; it is handed to the script

        # Relies on the script's default parameters: clips_per_video=10,
        # batch_size=8, num_workers=4. To change them, edit the defaults in
        # FishAction/predict_video_x3d_3class.py.
        cmd = [
            _py_exe(settings),
            str(script),
            "--checkpoint",
            settings.action_checkpoint,
            "--csv",
            csv_path,
            "--path_prefix",
            str(mp4_path.parent),
            "--output_json",
            out_json,
            "--log_interval",
            "0",
        ]
        with stage(
            f"行为识别子进程 X3D({mp4_path.name})",
            pipeline="action",
            step="action_subprocess",
            run_id=rid,
            source=mp4_path.name,
            metrics={"checkpoint": settings.action_checkpoint},
        ):
            proc = run_subprocess_with_log(
                cmd,
                cwd=str(settings.fish_action_root),
                env=os.environ.copy(),
                log_name="FishAction",
                stream_to_logger=False,
                dump_dir=_action_subprocess_dump_dir(settings),
                run_id=rid,
                source_label=mp4_path.stem,
                kind="predict_video_x3d_3class",
                pipeline="action",
                step="action_subprocess",
                summary_tail_lines=settings.log_subprocess_tail_lines,
            )
            # Raised inside the stage so the failure is attributed to it.
            if proc.returncode != 0:
                err = proc.stdout or ""
                # Keep only the tail of the output to bound the message size.
                raise RuntimeError(
                    f"predict_video_x3d_3class.py failed ({proc.returncode}): {err[-4000:]}"
                )

        out_path = Path(out_json)
        # mkstemp() already created the file, so "the script wrote nothing"
        # shows up as a zero-byte file rather than a missing one.
        if not out_path.is_file() or out_path.stat().st_size == 0:
            raise RuntimeError(f"No output_json written: {out_json}")
        with open(out_json, encoding="utf-8") as f:
            rows = json.load(f)
        if not rows:
            raise RuntimeError("Empty prediction JSON")
        if not isinstance(rows, list) or not isinstance(rows[0], dict):
            raise RuntimeError(
                f"Unexpected prediction JSON shape: {type(rows).__name__}"
            )

        first_row = rows[0]
        pred_en = str(first_row.get("pred_3class", "")).strip().lower()
        logger.bind(pipeline="action", run_id=rid).debug(
            "[FishAction] 预测原始行:\n{}",
            format_json_pretty(first_row),
        )
        if pred_en not in BEHAVIOR_EN_TO_ZH:
            raise RuntimeError(f"Unexpected pred_3class: {pred_en!r}")
        return pred_en
    finally:
        for leftover in (csv_path, out_json):
            if leftover is not None:
                Path(leftover).unlink(missing_ok=True)
def prepare_action_slices(
    mp4_path: Path, settings: Settings, *, run_id: Optional[str] = None,
) -> Tuple[List[Path], float]:
    """Measure a video and slice it when it is long enough.

    The video is sliced into ``DEFAULT_SLICE_DURATION``-second pieces only when
    its duration exceeds ``DEFAULT_MIN_DURATION_FOR_SLICE``. If slicing is not
    needed, or yields at most one file, the original video is used instead.

    Args:
        mp4_path: Video to inspect.
        settings: Not read by this function; kept for interface compatibility.
        run_id: Correlation id for logging; a new one is generated if omitted.

    Returns:
        ``(files, duration)`` where ``files`` is either the list of slice files
        or ``[mp4_path]``, and ``duration`` is the value reported by
        ``get_video_duration``.
    """
    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=mp4_path.name)
    log.info("[FishAction] 开始处理视频 | mp4={}", mp4_path.resolve())
    with stage(
        f"视频切片预处理({mp4_path.name})",
        pipeline="action",
        step="prepare_slices",
        run_id=rid,
        source=mp4_path.name,
        metrics={
            "min_duration_for_slice": DEFAULT_MIN_DURATION_FOR_SLICE,
            "slice_duration": DEFAULT_SLICE_DURATION,
        },
    ):
        duration = get_video_duration(mp4_path)
        if duration > DEFAULT_MIN_DURATION_FOR_SLICE:
            log.info(
                "[FishAction] 视频时长 {:.1f}s > {}s, 切分为 {}s 一段",
                duration, DEFAULT_MIN_DURATION_FOR_SLICE, DEFAULT_SLICE_DURATION,
            )
            # The slice directory is not needed here; only the files are used.
            slice_files, _slice_dir = slice_video(mp4_path, DEFAULT_SLICE_DURATION)
            if len(slice_files) > 1:
                log.info(
                    "[FishAction] {} 将处理 {} 段切片",
                    mp4_path.name, len(slice_files),
                )
                return slice_files, duration
        # Short video, or slicing produced at most one file: use the original.
        return [mp4_path], duration
def run_single_slice_inference(
    slice_file: Path,
    slice_index: int,
    total_slices: int,
    duration: float,
    mp4_name: str,
    settings: Settings,
    *,
    run_id: Optional[str] = None,
) -> HealthSnapshot:
    """Run FishAction inference on one video file or slice.

    Any exception raised during inference is logged and converted into an
    error ``HealthSnapshot`` instead of propagating.

    Args:
        slice_file: The file to classify (a slice, or the whole video).
        slice_index: Zero-based position of this slice.
        total_slices: Number of files being processed for the video; ``1``
            means ``slice_file`` is the whole video.
        duration: Duration of the full source video, in seconds.
        mp4_name: Name of the source video, used in logs and metrics.
        settings: Passed through to ``run_action_subprocess``.
        run_id: Correlation id for logging; a new one is generated if omitted.

    Returns:
        A ``HealthSnapshot`` with the Chinese behavior and health labels, or
        one with ``raw_class_en="error"`` and ``error`` set when inference
        fails.
    """
    start_time = slice_index * DEFAULT_SLICE_DURATION
    if total_slices == 1:
        # An unsliced video is analysed in full, so its range ends at the
        # video's duration rather than at one slice length.
        end_time = duration
    else:
        end_time = min(start_time + DEFAULT_SLICE_DURATION, duration)
    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=slice_file.name)
    try:
        # Shared by the stage metrics and the result event below.
        span_metrics = {
            "slice_index": slice_index,
            "total_slices": total_slices,
            "start_sec": round(start_time, 2),
            "end_sec": round(end_time, 2),
        }
        with stage(
            f"切片识别 #{slice_index + 1}/{total_slices}({slice_file.name})",
            pipeline="action",
            step="slice_inference",
            run_id=rid,
            source=slice_file.name,
            # Pass a copy so the event metrics cannot be affected by the stage.
            metrics=dict(span_metrics),
        ):
            pred_en = run_action_subprocess(slice_file, settings, run_id=rid)
        zh = BEHAVIOR_EN_TO_ZH[pred_en]
        health = behavior_to_health(pred_en)
        snap = HealthSnapshot(
            behavior_result=zh,
            health_result=health,
            updated_at=datetime.now(timezone.utc),
            raw_class_en=pred_en,
        )
        # Structured result event with the Chinese labels (for offline analysis).
        logger.bind(
            event=True,
            pipeline="action",
            step="slice_result",
            run_id=rid,
            source=slice_file.name,
            status="info",
            metrics={
                **span_metrics,
                "pred_3class": pred_en,
                "behavior_zh": zh,
                "health_zh": health,
                "mp4_name": mp4_name,
            },
        ).info(
            "[FishAction] 切片识别结果 | {} | {} | 段 {}/{} | {:.1f}~{:.1f}s | 行为={} | 健康={}",
            mp4_name,
            slice_file.name,
            slice_index + 1,
            total_slices,
            start_time,
            end_time,
            zh,
            health,
        )
        if total_slices == 1:
            # A single file is the whole video, so this result is final.
            log.info(
                "[FishAction] 视频识别完成 | mp4={} | 行为={} | 健康={}",
                mp4_name, zh, health,
            )
        return snap
    except Exception as e:
        # Deliberate boundary: a failed slice becomes an error snapshot.
        log.error(
            "[FishAction] 切片处理失败 | slice={}/{} | {}: {}",
            slice_index + 1, total_slices, type(e).__name__, e,
        )
        return HealthSnapshot(
            behavior_result="处理失败",
            health_result="未知",
            updated_at=datetime.now(timezone.utc),
            raw_class_en="error",
            error=str(e),
        )