Files

257 lines
8.7 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from pathlib import Path
2026-05-13 09:19:31 +08:00
from typing import List, Optional, Tuple
2026-05-13 09:19:31 +08:00
from app.logging_config import format_json_pretty, new_run_id, stage
2026-04-13 17:13:02 +08:00
from app.services.video_slice import get_video_duration, slice_video
from app.settings import Settings
from app.state import HealthSnapshot
from app.subprocess_run import run_subprocess_with_log
from loguru import logger
2026-05-13 09:19:31 +08:00
def _action_subprocess_dump_dir(settings: Settings) -> Path:
"""``{LOG_SUBPROCESS_DIR}/action/subprocess`` —— 行为识别子进程整段输出目录。"""
return Path(settings.log_subprocess_dir) / "action" / "subprocess"
2026-04-13 17:13:02 +08:00
# Video slicing configuration (values in seconds).
# Length of each slice cut from a long video.
DEFAULT_SLICE_DURATION = 10.0
# Only videos strictly longer than this are sliced; shorter ones are classified whole.
DEFAULT_MIN_DURATION_FOR_SLICE = 15.0
# Model class name (``pred_3class``) -> Chinese behavior label.
BEHAVIOR_EN_TO_ZH = dict(
    feeding="吃饵",
    normal="正常游行",
    scared="惊吓",
)
def _py_exe(settings: Settings) -> str:
return settings.python_fish_action or sys.executable
def behavior_to_health(behavior_en: str) -> str:
    """Map an English behavior class to a Chinese health verdict.

    Only ``"scared"`` is considered unhealthy; every other value is healthy.
    """
    return "不健康" if behavior_en == "scared" else "健康"
2026-05-13 09:19:31 +08:00
def run_action_subprocess(
    mp4_path: Path, settings: Settings, *, run_id: Optional[str] = None,
) -> str:
    """Run the FishAction X3D 3-class predictor on one video and return its class.

    A one-row CSV and an output JSON path are created with ``tempfile``.
    ``predict_video_x3d_3class.py`` is launched in ``settings.fish_action_root``,
    and ``pred_3class`` of the first result row is returned, stripped and
    lower-cased. Both temp files are always removed.

    Args:
        mp4_path: Video (or slice) to classify; resolved to an absolute path.
        settings: Supplies the script root, checkpoint, interpreter and
            subprocess-log settings.
        run_id: Correlation id for logs; a new ``"action"`` id is generated
            when omitted.

    Returns:
        A key of ``BEHAVIOR_EN_TO_ZH``.

    Raises:
        FileNotFoundError: The prediction script does not exist.
        RuntimeError: The subprocess exits non-zero (message carries the last
            4000 chars of its stdout), writes no JSON, writes an empty or
            malformed JSON, or predicts an unknown class.
    """
    script = settings.fish_action_root / "predict_video_x3d_3class.py"
    if not script.is_file():
        raise FileNotFoundError(f"Missing FishAction script: {script}")
    mp4_path = mp4_path.resolve()
    path_prefix = str(mp4_path.parent)
    rel_name = mp4_path.name
    rid = run_id or new_run_id("action")

    # Temp paths start as None and are created *inside* the try, so a failure
    # while creating/writing either file still removes whatever already exists.
    csv_path: Optional[str] = None
    out_json: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".csv", delete=False, encoding="utf-8"
        ) as tmp:
            csv_path = tmp.name
            # One space-separated row: the video name (relative to --path_prefix)
            # followed by "0" -- presumably a placeholder label column required
            # by the script's CSV format; confirm in the script.
            tmp.write(f"{rel_name} 0\n")
        out_fd, out_json = tempfile.mkstemp(suffix="_pred.json")
        os.close(out_fd)

        # The script's own defaults are used (clips_per_video=10, batch_size=8,
        # num_workers=4); to change them, edit the defaults in
        # FishAction/predict_video_x3d_3class.py.
        cmd = [
            _py_exe(settings),
            str(script),
            "--checkpoint",
            settings.action_checkpoint,
            "--csv",
            csv_path,
            "--path_prefix",
            path_prefix,
            "--output_json",
            out_json,
            "--log_interval",
            "0",
        ]

        with stage(
            f"行为识别子进程 X3D{mp4_path.name}",
            pipeline="action",
            step="action_subprocess",
            run_id=rid,
            source=mp4_path.name,
            metrics={"checkpoint": settings.action_checkpoint},
        ):
            proc = run_subprocess_with_log(
                cmd,
                cwd=str(settings.fish_action_root),
                env=os.environ.copy(),
                log_name="FishAction",
                stream_to_logger=False,
                dump_dir=_action_subprocess_dump_dir(settings),
                run_id=rid,
                source_label=mp4_path.stem,
                kind="predict_video_x3d_3class",
                pipeline="action",
                step="action_subprocess",
                summary_tail_lines=settings.log_subprocess_tail_lines,
            )

        if proc.returncode != 0:
            err = proc.stdout or ""
            # Keep only the tail so the exception message stays bounded.
            raise RuntimeError(
                f"predict_video_x3d_3class.py failed ({proc.returncode}): {err[-4000:]}"
            )
        if not Path(out_json).is_file():
            raise RuntimeError(f"No output_json written: {out_json}")
        with open(out_json, encoding="utf-8") as f:
            rows = json.load(f)
        if not rows:
            raise RuntimeError("Empty prediction JSON")
        # Check the shape before indexing so a malformed file yields a clear
        # RuntimeError instead of a KeyError/AttributeError.
        if not isinstance(rows, list) or not isinstance(rows[0], dict):
            raise RuntimeError(
                f"Unexpected prediction JSON structure: {type(rows).__name__}"
            )
        first = rows[0]
        pred_en = str(first.get("pred_3class", "")).strip().lower()
        logger.bind(pipeline="action", run_id=rid).debug(
            "[FishAction] 预测原始行:\n{}",
            format_json_pretty(first),
        )
        if pred_en not in BEHAVIOR_EN_TO_ZH:
            raise RuntimeError(f"Unexpected pred_3class: {pred_en!r}")
        return pred_en
    finally:
        for tmp_path in (csv_path, out_json):
            if tmp_path is not None:
                Path(tmp_path).unlink(missing_ok=True)
def prepare_action_slices(
    mp4_path: Path, settings: Settings, *, run_id: Optional[str] = None,
) -> Tuple[List[Path], float]:
    """Check video duration and create slices if needed.

    Videos longer than ``DEFAULT_MIN_DURATION_FOR_SLICE`` seconds are split by
    ``slice_video`` into ``DEFAULT_SLICE_DURATION``-second pieces; anything
    shorter is processed whole.

    Args:
        mp4_path: Source video.
        settings: Not read by this function.
        run_id: Correlation id for logs; a new ``"action"`` id is generated
            when omitted.

    Returns:
        ``(slice_files, duration)``. When the video is short enough, or when
        slicing yields fewer than two files, ``slice_files`` is ``[mp4_path]``.
        Callers therefore always receive at least one file to classify.
    """
    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=mp4_path.name)
    log.info("[FishAction] 开始处理视频 | mp4={}", mp4_path.resolve())
    with stage(
        f"视频切片预处理({mp4_path.name})",
        pipeline="action",
        step="prepare_slices",
        run_id=rid,
        source=mp4_path.name,
        metrics={
            "min_duration_for_slice": DEFAULT_MIN_DURATION_FOR_SLICE,
            "slice_duration": DEFAULT_SLICE_DURATION,
        },
    ):
        duration = get_video_duration(mp4_path)
        if duration > DEFAULT_MIN_DURATION_FOR_SLICE:
            log.info(
                "[FishAction] 视频时长 {:.1f}s > {}s切分为 {}s 一段",
                duration, DEFAULT_MIN_DURATION_FOR_SLICE, DEFAULT_SLICE_DURATION,
            )
            # The slice directory returned alongside the files is not used here.
            slice_files, _slice_dir = slice_video(mp4_path, DEFAULT_SLICE_DURATION)
            if len(slice_files) > 1:
                log.info(
                    "[FishAction] {} 将处理 {} 段切片",
                    mp4_path.name, len(slice_files),
                )
                return slice_files, duration
            # Slicing yielded zero or one file for a video that should have been
            # split. Rather than hand back an empty or partial list, classify
            # the original video whole.
            log.warning(
                "[FishAction] 切片数量为 {}, 改为整段识别 | mp4={}",
                len(slice_files), mp4_path.name,
            )
    return [mp4_path], duration
def run_single_slice_inference(
    slice_file: Path,
    slice_index: int,
    total_slices: int,
    duration: float,
    mp4_name: str,
    settings: Settings,
    *,
    run_id: Optional[str] = None,
) -> HealthSnapshot:
    """Run FishAction inference on a single video file / slice.

    Args:
        slice_file: Video or slice handed to ``run_action_subprocess``.
        slice_index: Zero-based position of this slice in the source video.
        total_slices: Number of slices of the source video (1 when the video
            is processed whole).
        duration: Source video duration in seconds; bounds the reported end
            time of the slice.
        mp4_name: Source video name, used in the result event and logs.
        settings: Forwarded to ``run_action_subprocess``.
        run_id: Correlation id for logs; a new ``"action"`` id is generated
            when omitted.

    Returns:
        A ``HealthSnapshot`` with the Chinese behavior and health labels.
        Exceptions raised during inference are logged with their traceback and
        reported as a snapshot with ``raw_class_en="error"`` and ``error`` set
        to the message, instead of being propagated.
    """
    # Time window of this slice in the source video; used only in logs/metrics.
    start_time = slice_index * DEFAULT_SLICE_DURATION
    if total_slices == 1:
        # A video processed whole spans [0, duration]. Capping it at one slice
        # length would misreport e.g. a 14 s clip as 0~10 s.
        end_time = duration
    else:
        end_time = min(start_time + DEFAULT_SLICE_DURATION, duration)

    rid = run_id or new_run_id("action")
    log = logger.bind(pipeline="action", run_id=rid, source=slice_file.name)
    try:
        with stage(
            f"切片识别 #{slice_index + 1}/{total_slices}{slice_file.name}",
            pipeline="action",
            step="slice_inference",
            run_id=rid,
            source=slice_file.name,
            metrics={
                "slice_index": slice_index,
                "total_slices": total_slices,
                "start_sec": round(start_time, 2),
                "end_sec": round(end_time, 2),
            },
        ):
            pred_en = run_action_subprocess(slice_file, settings, run_id=rid)
        zh = BEHAVIOR_EN_TO_ZH[pred_en]
        health = behavior_to_health(pred_en)
        snap = HealthSnapshot(
            behavior_result=zh,
            health_result=health,
            updated_at=datetime.now(timezone.utc),
            raw_class_en=pred_en,
        )

        # Structured result event (with Chinese labels) for offline analysis.
        logger.bind(
            event=True,
            pipeline="action",
            step="slice_result",
            run_id=rid,
            source=slice_file.name,
            status="info",
            metrics={
                "slice_index": slice_index,
                "total_slices": total_slices,
                "start_sec": round(start_time, 2),
                "end_sec": round(end_time, 2),
                "pred_3class": pred_en,
                "behavior_zh": zh,
                "health_zh": health,
                "mp4_name": mp4_name,
            },
        ).info(
            "[FishAction] 切片识别结果 | {} | {} | 段 {}/{} | {:.1f}~{:.1f}s | 行为={} | 健康={}",
            mp4_name,
            slice_file.name,
            slice_index + 1,
            total_slices,
            start_time,
            end_time,
            zh,
            health,
        )
        if total_slices == 1:
            log.info(
                "[FishAction] 视频识别完成 | mp4={} | 行为={} | 健康={}",
                mp4_name, zh, health,
            )
        return snap
    except Exception as e:
        # Deliberate best-effort boundary: one failed slice must not abort the
        # others. Log with traceback (log.exception) so the cause is not lost.
        log.exception(
            "[FishAction] 切片处理失败 | slice={}/{} | {}: {}",
            slice_index + 1, total_slices, type(e).__name__, e,
        )
        return HealthSnapshot(
            behavior_result="处理失败",
            health_result="未知",
            updated_at=datetime.now(timezone.utc),
            raw_class_en="error",
            error=str(e),
        )