from __future__ import annotations

import asyncio
import shutil
from pathlib import Path
from typing import Dict, List, Set, Tuple

from loguru import logger

from app.compat import to_thread
from app.db import (
    add_watch_processed,
    health_snapshot_deliverable,
    load_watch_processed,
    save_health_snapshot,
)
from app.logging_config import new_run_id, stage
from app.services import action as action_svc
from app.services.measure import transcode_src_to_h264_dst
from app.services.video_slice import _get_ffmpeg_path
from app.settings import Settings
from app.state import app_state
from app.watch_idle import (
    IdleWatchWarnState,
    idle_warn_interval_sec,
    maybe_warn_idle_watch,
)

# Interval handed to maybe_warn_idle_watch(); resolved once at import time
# from the named environment/config key by idle_warn_interval_sec().
_ACTION_IDLE_WARN_INTERVAL_SEC = idle_warn_interval_sec(
    "FISH_ACTION_WATCH_IDLE_WARN_INTERVAL_SEC"
)


def _state_path(settings: Settings) -> Path:
    """Return the legacy JSON state-file path (only used for importing into SQLite)."""
    assert settings.action_watch_dir is not None
    return settings.action_watch_dir / ".fishaction_watch_processed.json"


# File extensions (lower-case, with leading dot) treated as watchable videos.
_VIDEO_SUFFIXES = frozenset({".mp4", ".mkv", ".mov"})


def iter_mp4(watch_dir: Path, recursive: bool) -> List[Path]:
    """List video files (.mp4, .mkv, .mov) in *watch_dir*.

    Args:
        watch_dir: Directory to scan.
        recursive: When true, descend into sub-directories as well.

    Returns:
        The matching regular files, sorted by path. Suffix matching is
        case-insensitive.
    """
    candidates = watch_dir.rglob("*") if recursive else watch_dir.iterdir()
    return sorted(
        p for p in candidates if p.is_file() and p.suffix.lower() in _VIDEO_SUFFIXES
    )


def _public_media_url(settings: Settings, basename: str) -> str:
    """Build the public ``/media/`` URL for *basename* from ``public_base_url``."""
    base = settings.public_base_url.rstrip("/")
    return f"{base}/media/{basename}"


def _slice_media_basename(base_name: str, slice_index: int) -> str:
    """Return the media file name for a slice: ``<stem>_slice_<NNN>.mp4``."""
    base = Path(base_name).stem
    return f"{base}_slice_{slice_index:03d}.mp4"


def _publish_slice_video(
    src: Path,
    dst: Path,
    settings: Settings,
) -> str:
    """Transcode a single video slice to H.264 and publish it at *dst*.

    The transcode is written to a sibling ``*_tmp.mp4`` file and moved over
    *dst* only when it produced a non-empty file. If the transcode does not
    succeed, *src* is copied to *dst* unchanged instead.

    Args:
        src: Slice file to publish.
        dst: Final location of the published file.
        settings: Provides ``public_base_url`` for the returned URL.

    Returns:
        The public URL of *dst* after a successful transcode or raw copy.
        If an exception occurs, the URL is still returned when a file exists
        at *dst*; otherwise an empty string is returned.
    """
    tmp = dst.with_name(dst.stem + "_tmp.mp4")
    # Drop any leftover temp file from an earlier interrupted run.
    tmp.unlink(missing_ok=True)
    log = logger.bind(pipeline="action_watch", source=src.name)
    try:
        log.info(
            "[行为监控] 切片 H.264 转码开始 | ffmpeg={} | {} -> {}",
            _get_ffmpeg_path(),
            src.name,
            tmp.name,
        )
        ok = transcode_src_to_h264_dst(src, tmp)
        if ok and tmp.is_file() and tmp.stat().st_size > 0:
            # Replace in one step so dst is never observed half-written.
            tmp.replace(dst)
            log.info("[行为监控] 切片已发布为 H.264:{} -> {}", src.name, dst.name)
        else:
            # Best-effort fallback: publish the untranscoded slice.
            tmp.unlink(missing_ok=True)
            shutil.copy2(src, dst)
            log.warning(
                "[行为监控] 转码失败,已直接拷贝原文件:{} -> {}(详见 measure 管线日志)",
                src.name,
                dst.name,
            )
        return _public_media_url(settings, dst.name)
    except Exception:
        log.exception("[行为监控] 切片发布异常 | {}", src.name)
        tmp.unlink(missing_ok=True)
        # NOTE(review): a file found at dst here may predate this call
        # (e.g. an earlier publish under the same name) - confirm that
        # returning its URL is intended.
        if dst.is_file():
            return _public_media_url(settings, dst.name)
        return ""


def _mark_processed(settings: Settings, processed: Set[str], key: str) -> None:
    """Record *key* as handled in memory and, when enabled, in persisted state."""
    processed.add(key)
    if settings.action_watch_use_state_file:
        add_watch_processed(settings, key, "action")


async def _run_inference_and_state(
    mp4: Path,
    settings: Settings,
    processed: Set[str],
    state_file: Path,
) -> None:
    """Run action recognition on one video and record it as processed.

    The video is split into slices; each slice is inferred in a worker
    thread, and every deliverable snapshot is published (video + DB row).
    The file is marked processed whether the run succeeds or raises an
    ``Exception``, so a failing file is not retried on every poll.

    Args:
        mp4: Video file to process.
        settings: Application settings.
        processed: In-memory set of already handled resolved paths; updated
            in place.
        state_file: Unused here; kept for interface compatibility.
    """
    key = str(mp4.resolve())
    if key in processed:
        return

    rid = new_run_id("action_watch")
    log = logger.bind(pipeline="action_watch", run_id=rid, source=mp4.name)
    try:
        size_mb = mp4.stat().st_size / (1024 * 1024)
    except OSError:
        # Size is only used for logging/metrics; do not fail the run over it.
        size_mb = 0.0
    log.info(
        "[行为监控] 触发行为识别 | mp4={} | 大小={:.1f} MB",
        mp4,
        size_mb,
    )

    async with app_state.action_lock:
        app_state.action_status = "running"
        try:
            with stage(
                f"行为识别 watch 全流程({mp4.name})",
                pipeline="action_watch",
                step="file_total",
                run_id=rid,
                source=mp4.name,
                metrics={"size_mb": round(size_mb, 2)},
            ):
                slice_files, duration = await to_thread(
                    action_svc.prepare_action_slices,
                    mp4,
                    settings,
                    run_id=rid,
                )
                settings.media_root.mkdir(parents=True, exist_ok=True)
                base_name = (
                    settings.biomass_water_video_media_name
                    or "biomass_water_surface.mp4"
                )
                saved_count = 0
                first_snap = None
                total_slices = len(slice_files)
                for i, slice_file in enumerate(slice_files):
                    snap = await to_thread(
                        action_svc.run_single_slice_inference,
                        slice_file,
                        i,
                        total_slices,
                        duration,
                        mp4.name,
                        settings,
                        run_id=rid,
                    )
                    # The first slice's snapshot feeds the summary log below,
                    # even when it turns out not to be deliverable.
                    if first_snap is None:
                        first_snap = snap
                    if not health_snapshot_deliverable(snap):
                        continue
                    media_basename = _slice_media_basename(base_name, i)
                    dst = settings.media_root / media_basename
                    video_url = await to_thread(
                        _publish_slice_video,
                        slice_file,
                        dst,
                        settings,
                    )
                    # Each slice gets its own source key so rows stay distinct.
                    slice_key = f"{key}#slice{i}"
                    save_health_snapshot(
                        settings,
                        snap,
                        source_path=slice_key,
                        video_url=video_url,
                    )
                    saved_count += 1
                if saved_count == 0:
                    log.warning(
                        "[行为监控] 无可投递结果,跳过写库 | mp4={}",
                        mp4.name,
                    )
            _mark_processed(settings, processed, key)
            pred = (first_snap.raw_class_en or "").strip() if first_snap else ""
            zh = first_snap.behavior_result if first_snap else ""
            health = first_snap.health_result if first_snap else ""
            log.info(
                "[行为监控] 完成 | mp4={} | 类别={} ({}) | 健康={} | 投递切片={}",
                mp4.name,
                zh or "-",
                pred or "-",
                health or "-",
                saved_count,
            )
        except Exception as e:
            log.exception("[行为监控] 处理异常 | mp4={} | {}", mp4, e)
            _mark_processed(settings, processed, key)
        finally:
            # Reset on every exit path. asyncio.CancelledError is a
            # BaseException and bypasses the handler above, which would
            # otherwise leave the status stuck at "running".
            app_state.action_status = "idle"


async def watch_tick(
    settings: Settings,
    processed: Set[str],
    stability: Dict[str, Tuple[int, int]],
    state_file: Path,
) -> bool:
    """Process one directory scan; return True if at least one file was handled.

    A file is handed to inference only after its size has been observed
    unchanged for ``settings.action_watch_stable_polls`` consecutive scans,
    so files still being written are left alone.

    Args:
        settings: Application settings; ``action_watch_dir`` must be set.
        processed: Resolved paths already handled; updated in place.
        stability: Maps resolved path -> ``(last_size, consecutive_polls)``;
            updated in place and pruned of paths no longer present.
        state_file: Passed through to the per-file handler.
    """
    assert settings.action_watch_dir is not None
    watch_dir = settings.action_watch_dir
    did = False
    seen_keys: Set[str] = set()

    for mp4 in iter_mp4(watch_dir, settings.action_watch_recursive):
        key = str(mp4.resolve())
        seen_keys.add(key)
        if key in processed:
            continue
        try:
            st = mp4.stat()
        except OSError:
            # The file vanished or is unreadable right now; retry next scan.
            continue
        size = int(st.st_size)
        if size <= 0:
            # Empty file: restart the stability count once it gets content.
            stability.pop(key, None)
            continue

        last = stability.get(key)
        if last is None or last[0] != size:
            # First sighting or size changed: restart the counter.
            stability[key] = (size, 1)
        else:
            stability[key] = (size, last[1] + 1)

        _, cnt = stability[key]
        if cnt >= settings.action_watch_stable_polls:
            await _run_inference_and_state(mp4, settings, processed, state_file)
            stability.pop(key, None)
            did = True

    # Forget files that disappeared from the directory since the last scan.
    for stale in [k for k in stability if k not in seen_keys]:
        del stability[stale]
    return did


async def run_action_watch_loop(settings: Settings) -> None:
    """Poll the action watch directory forever, processing stable new videos.

    Returns immediately (after logging a warning) when the configured watch
    path is not a directory. Otherwise loops until the task is cancelled,
    sleeping ``settings.action_watch_poll_interval`` seconds (at least 0.1)
    between scans.
    """
    assert settings.action_watch_dir is not None
    wd = settings.action_watch_dir
    if not wd.is_dir():
        logger.bind(pipeline="action_watch").warning(
            "[行为监控] 启动跳过:路径不是目录 {}", wd
        )
        return

    state_file = _state_path(settings)
    # Without persisted state, every file is treated as new after a restart.
    processed: Set[str] = (
        load_watch_processed(settings, state_file, "action")
        if settings.action_watch_use_state_file
        else set()
    )
    stability: Dict[str, Tuple[int, int]] = {}
    logger.bind(pipeline="action_watch").info(
        "[行为监控] 监控目录已启动 | dir={} | poll={}s | stable_polls={} | state={} {}",
        wd,
        settings.action_watch_poll_interval,
        settings.action_watch_stable_polls,
        "on" if settings.action_watch_use_state_file else "off",
        state_file if settings.action_watch_use_state_file else "",
    )

    idle_warn_state = IdleWatchWarnState()
    while True:
        did = await watch_tick(settings, processed, stability, state_file)
        maybe_warn_idle_watch(
            did_work=did,
            log_tag="action-watch",
            algo_name="FishAction",
            idle_hint="目录内无 .mp4、已全部处理完毕,或文件尚未达到稳定轮询次数",
            watch_dir=wd,
            state=idle_warn_state,
            interval_sec=_ACTION_IDLE_WARN_INTERVAL_SEC,
        )
        # Clamp the sleep so a zero/negative interval cannot busy-spin.
        await asyncio.sleep(max(settings.action_watch_poll_interval, 0.1))