From c1aafc69bf7b1e49b3269422636f8a26d32122c2 Mon Sep 17 00:00:00 2001 From: zaiun xu Date: Mon, 13 Apr 2026 17:13:02 +0800 Subject: [PATCH] =?UTF-8?q?=E9=AA=8C=E6=94=B61?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fish_api/app/db.py | 40 ++++- fish_api/app/routers/biomass.py | 21 ++- fish_api/app/routers/ingest.py | 15 +- fish_api/app/services/action.py | 98 ++++++++++- fish_api/app/services/action_watch.py | 30 +++- fish_api/app/services/measure.py | 54 +++++- fish_api/app/services/video_slice.py | 148 ++++++++++++++++ fish_api/app/services/water_video.py | 240 +++++++++++++++++++++----- 8 files changed, 580 insertions(+), 66 deletions(-) create mode 100644 fish_api/app/services/video_slice.py diff --git a/fish_api/app/db.py b/fish_api/app/db.py index ea44e34..d914ee0 100644 --- a/fish_api/app/db.py +++ b/fish_api/app/db.py @@ -22,6 +22,36 @@ from app.state import HealthSnapshot, MeasureSnapshot DEFAULT_CLIENT_ID = "default" MAX_CLIENT_ID_LEN = 128 +# 客户端切片索引起缓存:记录每个 client_id 上次返回的切片索引(用于对齐 water/video 端点) +_client_health_slice_index: dict[str, int] = {} + + +def _parse_slice_index_from_source_path(source_path: Optional[str]) -> int: + """从 source_path 解析切片索引,格式为 video.mp4#slice{N}。 + + Returns: + 切片序号(>=0),如果不是切片则返回 -1 + """ + if not source_path: + return -1 + if "#slice" not in source_path: + return -1 + try: + idx_part = source_path.split("#slice")[-1] + return int(idx_part) + except (ValueError, IndexError): + return -1 + + +def get_last_health_slice_index(client_id: str) -> int: + """获取指定 client_id 上次返回的切片索引(用于 water/video 端点对齐)。 + + Returns: + 切片序号(>=0),如果没有记录则返回 -1 + """ + cid = normalize_client_id(client_id) + return _client_health_slice_index.get(cid, -1) + def normalize_client_id(raw: Optional[str]) -> str: """供轮询接口使用;过长截断,空值回退为 DEFAULT_CLIENT_ID。""" @@ -447,6 +477,8 @@ def pop_next_health( client_id: str = DEFAULT_CLIENT_ID, ) -> Tuple[HealthSnapshot, bool, Optional[int]]: """取该客户端队首未投递且可交付的 health 快照并推进其游标;跳过不可交付行仅推进游标。""" + global _client_health_slice_index + cid = normalize_client_id(client_id) init_db(settings) conn = _connect(settings.sqlite_path) @@ -458,7 +490,7 @@ def pop_next_health( next_row = conn.execute( """ SELECT id, created_at, behavior_result, health_result, - raw_class_en, error + raw_class_en, error, source_path FROM health_snapshots WHERE id > ? ORDER BY id ASC @@ -476,6 +508,7 @@ def pop_next_health( hlth = next_row["health_result"] or "" raw_en = next_row["raw_class_en"] or "" err: Optional[str] = next_row["error"] + source_path: Optional[str] = next_row["source_path"] conn.execute( """ @@ -490,6 +523,11 @@ def pop_next_health( continue conn.commit() + + # 解析并记录切片索引(用于与 water/video 端点对齐) + slice_idx = _parse_slice_index_from_source_path(source_path) + _client_health_slice_index[cid] = slice_idx + snap = HealthSnapshot( behavior_result=beh, health_result=hlth, diff --git a/fish_api/app/routers/biomass.py b/fish_api/app/routers/biomass.py index 821ebd6..553dffe 100644 --- a/fish_api/app/routers/biomass.py +++ b/fish_api/app/routers/biomass.py @@ -85,7 +85,10 @@ async def get_health_result( settings: Settings = Depends(get_settings), client_id: str = Depends(_resolve_client_id), ): - """行为 / 健康结果:每次 GET 投递该客户端下一条未消费的 FishAction 快照(按 client_id 独立游标)。""" + """行为 / 健康结果:每次 GET 投递该客户端下一条未消费的 FishAction 快照(按 client_id 独立游标)。 + + 每个视频切片被视为独立的视频,会分别投递。 + """ h, has_new, _ = pop_next_health(settings, client_id) if not has_new: return JSONResponse( @@ -125,9 +128,19 @@ async def get_health_result( @router.get("/water/video/") -async def get_water_video(settings: Settings = Depends(get_settings)): - """水上视频:FishAction 输入 mp4 经 H.264 转码后托管在 /media/,返回 `video_path` 绝对 URL。""" - video_path = await get_water_video_public_url(settings) +async def get_water_video( + settings: Settings = Depends(get_settings), + client_id: str = Depends(_resolve_client_id), +): + """水上视频:FishAction 输入 mp4 经 H.264 转码后托管在 /media/,返回 `video_path` 绝对 URL。 + + 如果视频较长(超过15秒),会自动切分为多个10秒的片段。 + 每个切片被视为独立的视频,每次调用返回一个切片的URL(按顺序轮流返回)。 + + 对齐机制:使用 client_id 参数(请求头 X-Fish-Client-Id 或查询参数 client_id) + 确保与 /health/result/ 端点对齐返回同一切片。 + """ + video_path = await get_water_video_public_url(settings, client_id) return JSONResponse( content={ "code": 200, diff --git a/fish_api/app/routers/ingest.py b/fish_api/app/routers/ingest.py index bd8ba79..068b09b 100644 --- a/fish_api/app/routers/ingest.py +++ b/fish_api/app/routers/ingest.py @@ -46,13 +46,18 @@ async def _action_job_serial(mp4_path: Path, settings: Settings) -> None: async with app_state.action_lock: app_state.action_status = "running" try: - snap = await asyncio.to_thread( + # 返回 (第一个快照, 所有切片快照列表) + first_snap, all_snaps = await asyncio.to_thread( action_svc.run_full_action, mp4_path, settings ) - if health_snapshot_deliverable(snap): - save_health_snapshot( - settings, snap, source_path=str(mp4_path.resolve()) - ) + + # 将所有切片作为独立视频保存到数据库 + for i, snap in enumerate(all_snaps): + if health_snapshot_deliverable(snap): + # 为每个切片生成独立的 source_path + slice_source = f"{mp4_path.resolve()}#slice{i}" + save_health_snapshot(settings, snap, source_path=slice_source) + app_state.action_status = "idle" except Exception: app_state.action_status = "error" diff --git a/fish_api/app/services/action.py b/fish_api/app/services/action.py index 39ada07..8d067ca 100644 --- a/fish_api/app/services/action.py +++ b/fish_api/app/services/action.py @@ -6,13 +6,19 @@ import sys import tempfile from datetime import datetime, timezone from pathlib import Path +from typing import List from app.logging_config import format_json_pretty +from app.services.video_slice import get_video_duration, slice_video from app.settings import Settings from app.state import HealthSnapshot from app.subprocess_run import run_subprocess_with_log from loguru import logger +# 视频切片配置 +DEFAULT_SLICE_DURATION = 10.0 # 每10秒切一个片段 +DEFAULT_MIN_DURATION_FOR_SLICE = 15.0 # 视频超过15秒才切片 + BEHAVIOR_EN_TO_ZH = { "feeding": "吃饵", "normal": "正常游行", @@ -100,8 +106,95 @@ def run_action_subprocess(mp4_path: Path, settings: Settings) -> str: Path(out_json).unlink(missing_ok=True) -def run_full_action(mp4_path: Path, settings: Settings) -> HealthSnapshot: +def run_full_action(mp4_path: Path, settings: Settings) -> tuple[HealthSnapshot, list[HealthSnapshot]]: + """运行 FishAction 健康检测。如果视频较长,会自动切片后分别检测。 + + 每个切片被视为独立的视频,返回所有切片的结果列表。 + + Args: + mp4_path: 输入视频路径 + settings: 应用配置 + + Returns: + tuple[HealthSnapshot, list[HealthSnapshot]]: (第一个切片/完整视频的快照, 所有切片快照列表) + - 如果视频被切片:返回 (第一个切片, 所有切片列表) + - 如果视频未被切片:返回 (完整视频快照, [完整视频快照]) + """ logger.info("[FishAction] start mp4={}", mp4_path.resolve()) + + # 检查视频时长 + duration = get_video_duration(mp4_path) + should_slice = duration > DEFAULT_MIN_DURATION_FOR_SLICE + + if should_slice: + # 视频较长,需要切片处理 + logger.info( + "[FishAction] video duration {}s > {}s, slicing into {}s segments", + duration, + DEFAULT_MIN_DURATION_FOR_SLICE, + DEFAULT_SLICE_DURATION, + ) + + slice_files, slice_dir = slice_video(mp4_path, DEFAULT_SLICE_DURATION) + + if len(slice_files) > 1: + logger.info( + "[FishAction] processing {} slices for {}", + len(slice_files), + mp4_path.name, + ) + + # 处理每个切片 + all_snaps: list[HealthSnapshot] = [] + for i, slice_file in enumerate(slice_files): + start_time = i * DEFAULT_SLICE_DURATION + end_time = min(start_time + DEFAULT_SLICE_DURATION, duration) + + try: + pred_en = run_action_subprocess(slice_file, settings) + zh = BEHAVIOR_EN_TO_ZH[pred_en] + health = behavior_to_health(pred_en) + + snap = HealthSnapshot( + behavior_result=zh, + health_result=health, + updated_at=datetime.now(timezone.utc), + raw_class_en=pred_en, + ) + + logger.info( + "[FishAction] slice {} ({}s-{}s): pred={} behavior={} health={}", + i, start_time, end_time, pred_en, zh, health, + ) + + all_snaps.append(snap) + except Exception as e: + logger.error("[FishAction] failed to process slice {}: {}", i, e) + # 创建一个表示失败的快照 + error_snap = HealthSnapshot( + behavior_result="处理失败", + health_result="未知", + updated_at=datetime.now(timezone.utc), + raw_class_en="error", + error=str(e), + ) + all_snaps.append(error_snap) + + logger.info( + "[FishAction] done mp4={} total_slices={}", + mp4_path.name, + len(slice_files), + ) + + # 返回第一个切片的结果和所有切片列表 + first_snap = all_snaps[0] if all_snaps else HealthSnapshot( + behavior_result="", + health_result="", + updated_at=datetime.now(timezone.utc), + ) + return first_snap, all_snaps + + # 视频较短,直接处理(原有逻辑) pred_en = run_action_subprocess(mp4_path, settings) zh = BEHAVIOR_EN_TO_ZH[pred_en] health = behavior_to_health(pred_en) @@ -112,9 +205,10 @@ def run_full_action(mp4_path: Path, settings: Settings) -> HealthSnapshot: zh, health, ) - return HealthSnapshot( + snap = HealthSnapshot( behavior_result=zh, health_result=health, updated_at=datetime.now(timezone.utc), raw_class_en=pred_en, ) + return snap, [snap] diff --git a/fish_api/app/services/action_watch.py b/fish_api/app/services/action_watch.py index 78b6b8f..73c645b 100644 --- a/fish_api/app/services/action_watch.py +++ b/fish_api/app/services/action_watch.py @@ -56,20 +56,38 @@ async def _run_inference_and_state( async with app_state.action_lock: app_state.action_status = "running" try: - snap = await asyncio.to_thread(action_svc.run_full_action, mp4, settings) - if health_snapshot_deliverable(snap): - save_health_snapshot(settings, snap, source_path=key) - else: + # 返回 (第一个快照, 所有切片快照列表) + first_snap, all_snaps = await asyncio.to_thread( + action_svc.run_full_action, mp4, settings + ) + + # 将所有切片作为独立视频保存到数据库 + saved_count = 0 + for i, snap in enumerate(all_snaps): + if health_snapshot_deliverable(snap): + # 为每个切片生成独立的 source_path + slice_key = f"{key}#slice{i}" + save_health_snapshot(settings, snap, source_path=slice_key) + saved_count += 1 + + if saved_count == 0: logger.warning( "[action-watch] no deliverable health snapshot for {}, skip SQLite", mp4.name, ) + app_state.action_status = "idle" processed.add(key) if settings.action_watch_use_state_file: add_watch_processed(settings, key, "action") - pred = (snap.raw_class_en or "").strip() - logger.info("[action-watch] done: {} -> {}", mp4.name, pred) + + pred = (first_snap.raw_class_en or "").strip() + logger.info( + "[action-watch] done: {} -> {} (saved {} slices)", + mp4.name, + pred, + saved_count, + ) except Exception as e: logger.exception("[action-watch] error on {}: {}", mp4, e) app_state.action_status = "idle" diff --git a/fish_api/app/services/measure.py b/fish_api/app/services/measure.py index 29505c2..b07005d 100644 --- a/fish_api/app/services/measure.py +++ b/fish_api/app/services/measure.py @@ -11,6 +11,8 @@ from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Tuple +import numpy as np + from app.logging_config import format_json_pretty from app.settings import Settings from app.state import MeasureSnapshot @@ -187,12 +189,15 @@ def _safe_media_prefix(stem: str) -> str: def _result_from_weight_prediction(data: Dict[str, Any]) -> List[Dict[str, Any]]: - """按 track_id 聚合:体重取 max(predicted_weight_g),体长取达到 max 的那条 PLY 的 length_input (mm)。""" + """按 track_id 聚合:对每个 track_id 的点云,取 predicted_weight_g 最大的 top5 平均值作为体重, + 体长取这 top5 点云的平均 length_input (mm)。只使用通过点云分类器的好点云(used_for_prediction=True)。""" items = data.get("per_cloud") or data.get("per_file") or [] if not isinstance(items, list): return [] - # tid -> (max_weight_g, length_mm at max weight) - best: Dict[int, Tuple[float, float]] = {} + + # tid -> list of (weight_g, length_mm) for this track + track_predictions: Dict[int, List[Tuple[float, float]]] = {} + for it in items: if not isinstance(it, dict): continue @@ -202,24 +207,55 @@ def _result_from_weight_prediction(data: Dict[str, Any]) -> List[Dict[str, Any]] tid = _parse_tid_from_ply_name(Path(str(ply)).name) if tid is None: continue + + # 只使用通过点云分类器的好点云 + if not it.get("used_for_prediction", True): + continue + try: wg = float(it.get("predicted_weight_g", float("nan"))) except (TypeError, ValueError): continue if not math.isfinite(wg): continue + try: ln = float(it.get("length_input", float("nan"))) except (TypeError, ValueError): ln = float("nan") - if tid not in best or wg > best[tid][0]: - best[tid] = (wg, ln) + + if tid not in track_predictions: + track_predictions[tid] = [] + track_predictions[tid].append((wg, ln)) + out: List[Dict[str, Any]] = [] - for tid in sorted(best.keys()): - wg, ln = best[tid] - if not math.isfinite(ln): + TOP_K = 5 + + for tid in sorted(track_predictions.keys()): + predictions = track_predictions[tid] + if not predictions: continue - out.append({"id": tid, "weight": wg, "length": ln}) + + # 按重量降序排序,取 top5 + predictions_sorted = sorted(predictions, key=lambda x: x[0], reverse=True) + top5 = predictions_sorted[:min(TOP_K, len(predictions_sorted))] + + # 计算 top5 平均重量和平均长度 + avg_weight = float(np.mean([p[0] for p in top5])) + valid_lengths = [p[1] for p in top5 if math.isfinite(p[1])] + avg_length = float(np.mean(valid_lengths)) if valid_lengths else float("nan") + + if not math.isfinite(avg_length): + continue + + out.append({ + "id": tid, + "type": "大黄鱼", + "weight": str(round(avg_weight)), + "length": str(round(avg_length)), + "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"), + }) + return out diff --git a/fish_api/app/services/video_slice.py b/fish_api/app/services/video_slice.py new file mode 100644 index 0000000..d7b8b34 --- /dev/null +++ b/fish_api/app/services/video_slice.py @@ -0,0 +1,148 @@ +"""视频切片工具:将长视频按固定时长切分为多个片段。""" + +from __future__ import annotations + +import subprocess +import tempfile +from pathlib import Path +from typing import List, Tuple + +from loguru import logger + + +def _get_ffmpeg_path() -> str: + """获取可用的 ffmpeg 路径。""" + project_ffmpeg = Path("/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg") + if project_ffmpeg.is_file(): + return str(project_ffmpeg) + system_paths = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"] + for path in system_paths: + if Path(path).is_file(): + return path + return "ffmpeg" + + +def _get_ffprobe_path() -> str: + """获取可用的 ffprobe 路径。""" + ffmpeg_path = Path(_get_ffmpeg_path()) + ffprobe = ffmpeg_path.parent / "ffprobe" + if ffprobe.is_file(): + return str(ffprobe) + return "ffprobe" + + +def get_video_duration(video_path: Path) -> float: + """获取视频时长(秒)。""" + ffprobe_path = _get_ffprobe_path() + try: + result = subprocess.run( + [ + ffprobe_path, + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + str(video_path), + ], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode == 0: + duration = float(result.stdout.strip()) + return duration + except Exception as e: + logger.warning("[video_slice] failed to get duration: {}", e) + return 0.0 + + +def slice_video( + video_path: Path, + slice_duration: float = 10.0, + output_dir: Path | None = None, +) -> Tuple[List[Path], Path]: + """将视频按固定时长切分为多个片段。 + + Args: + video_path: 源视频路径 + slice_duration: 每个切片的时长(秒),默认10秒 + output_dir: 输出目录,默认使用临时目录 + + Returns: + (切片文件列表, 输出目录路径) + """ + duration = get_video_duration(video_path) + if duration <= slice_duration: + # 视频太短,无需切片 + return [video_path], video_path.parent + + if output_dir is None: + output_dir = Path(tempfile.mkdtemp(prefix="video_slices_")) + output_dir.mkdir(parents=True, exist_ok=True) + + ffmpeg_path = _get_ffmpeg_path() + base_name = video_path.stem + + # 计算需要切多少片 + num_slices = int(duration / slice_duration) + if duration % slice_duration > 1.0: # 剩余超过1秒才多切一片 + num_slices += 1 + + logger.info( + "[video_slice] slicing {} ({}s) into {} slices of {}s each", + video_path.name, + duration, + num_slices, + slice_duration, + ) + + slice_files: List[Path] = [] + for i in range(num_slices): + start_time = i * slice_duration + slice_file = output_dir / f"{base_name}_slice_{i:03d}.mp4" + + # 使用 ffmpeg 切片,-c copy 快速复制,避免重新编码 + cmd = [ + ffmpeg_path, + "-y", + "-ss", str(start_time), + "-t", str(slice_duration), + "-i", str(video_path), + "-c", "copy", + "-avoid_negative_ts", "make_zero", + str(slice_file), + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=60, + ) + if result.returncode == 0 and slice_file.is_file(): + slice_files.append(slice_file) + logger.debug( + "[video_slice] created slice {}: {} (start={}s)", + i, + slice_file.name, + start_time, + ) + else: + logger.warning( + "[video_slice] failed to create slice {}: {}", + i, + result.stderr[-200:] if result.stderr else "unknown", + ) + except Exception as e: + logger.warning("[video_slice] exception creating slice {}: {}", i, e) + + if not slice_files: + # 切片失败,返回原视频 + return [video_path], video_path.parent + + logger.info( + "[video_slice] created {} slices for {}", + len(slice_files), + video_path.name, + ) + return slice_files, output_dir diff --git a/fish_api/app/services/water_video.py b/fish_api/app/services/water_video.py index 551669a..0c2e55d 100644 --- a/fish_api/app/services/water_video.py +++ b/fish_api/app/services/water_video.py @@ -1,19 +1,35 @@ -"""水上视频:从 FishAction 输入目录或显式路径发布 H.264 MP4 到 MEDIA_ROOT。""" +"""水上视频:从 FishAction 输入目录或显式路径发布 H.264 MP4 到 MEDIA_ROOT。 + +支持长视频切片:如果视频较长,会切分为多个10秒片段并分别转码发布。 +每个切片被视为独立的视频。 + +对齐机制:使用 client_id 区分不同客户端的轮询进度,确保 health/result 和 +water/video 两个端点对齐返回同一切片。 +""" from __future__ import annotations import asyncio import shutil from pathlib import Path +from typing import Dict, List from loguru import logger from app.services.action_watch import iter_mp4 from app.services.measure import transcode_src_to_h264_dst +from app.services.video_slice import get_video_duration, slice_video from app.settings import Settings _publish_lock = asyncio.Lock() +# 视频切片配置 +SLICE_DURATION = 10.0 # 每个切片的时长(秒) +MIN_DURATION_FOR_SLICE = 15.0 # 超过此时长才切片 + +# 默认客户端ID(与 db.py 保持一致) +DEFAULT_CLIENT_ID = "default" + def _public_media_url(settings: Settings, basename: str) -> str: base = settings.public_base_url.rstrip("/") @@ -27,6 +43,23 @@ def _safe_water_media_basename(raw: str) -> str: return Path(n).name or "biomass_water_surface.mp4" +def _slice_media_basename(base_name: str, slice_index: int) -> str: + """生成切片视频的媒体文件名。""" + base = Path(base_name).stem + return f"{base}_slice_{slice_index:03d}.mp4" + + +# 客户端独立的状态:每个 client_id 有自己的切片队列和索引 +# _client_slice_queues[client_id] = [url0, url1, url2, ...] +# _client_slice_indices[client_id] = 当前应该返回的索引 +_client_slice_queues: Dict[str, List[str]] = {} +_client_slice_indices: Dict[str, int] = {} + +# 全局缓存的切片列表(用于检测源文件变化) +_global_slice_urls: List[str] = [] +_last_source_mtime: float = 0.0 + + def resolve_water_video_source(settings: Settings) -> Path | None: """优先 BIOMASS_WATER_VIDEO_SOURCE;否则取 ACTION_WATCH_DIR 中 mtime 最新的 .mp4。""" cfg = settings.biomass_water_video_source @@ -51,51 +84,180 @@ def resolve_water_video_source(settings: Settings) -> Path | None: return None -async def get_water_video_public_url(settings: Settings) -> str: - """转码并发布到 MEDIA_ROOT 后返回绝对 URL;无可用源且无已发布文件时返回空串。""" - settings.media_root.mkdir(parents=True, exist_ok=True) - basename = _safe_water_media_basename(settings.biomass_water_video_media_name) - dst = settings.media_root / basename +async def _publish_video( + src: Path, + dst: Path, + settings: Settings, +) -> str: + """发布视频到 MEDIA_ROOT。 - src = resolve_water_video_source(settings) - if src is None: + Args: + src: 源视频路径 + dst: 目标路径 + settings: 应用配置 + + Returns: + 发布的视频URL,失败返回空串 + """ + tmp = dst.with_name(dst.stem + "_tmp.mp4") + tmp.unlink(missing_ok=True) + + try: + ok = await asyncio.to_thread(transcode_src_to_h264_dst, src, tmp) + if ok and tmp.is_file() and tmp.stat().st_size > 0: + tmp.replace(dst) + logger.info("[water-video] published H.264: {} -> {}", src.name, dst.name) + else: + tmp.unlink(missing_ok=True) + await asyncio.to_thread(shutil.copy2, src, dst) + logger.warning( + "[water-video] transcode failed, copied raw: {} -> {}", + src.name, + dst.name, + ) + return _public_media_url(settings, dst.name) + except Exception: + logger.exception("[water-video] publish failed") + tmp.unlink(missing_ok=True) if dst.is_file(): return _public_media_url(settings, dst.name) return "" - async with _publish_lock: - need_publish = True - if dst.is_file(): - try: - if dst.stat().st_mtime >= src.stat().st_mtime: - need_publish = False - except OSError: - pass - if not need_publish: - return _public_media_url(settings, dst.name) - tmp = dst.with_name(dst.stem + "_tmp.mp4") - tmp.unlink(missing_ok=True) - try: - ok = await asyncio.to_thread(transcode_src_to_h264_dst, src, tmp) - if ok and tmp.is_file() and tmp.stat().st_size > 0: - tmp.replace(dst) - logger.info("[water-video] published H.264: {} -> {}", src.name, dst.name) - else: - tmp.unlink(missing_ok=True) - await asyncio.to_thread(shutil.copy2, src, dst) - logger.warning( - "[water-video] transcode failed, copied raw: {} -> {}", - src.name, - dst.name, - ) - except Exception: - logger.exception("[water-video] publish failed") - tmp.unlink(missing_ok=True) +async def _prepare_slices(settings: Settings) -> List[str]: + """预处理:如果视频较长,切分为多个片段并发布到 MEDIA_ROOT。 + + 返回切片URL列表(供各客户端使用)。 + """ + global _global_slice_urls, _last_source_mtime + + base_basename = _safe_water_media_basename(settings.biomass_water_video_media_name) + src = resolve_water_video_source(settings) + + if src is None: + return [] + + # 检查是否需要重新切片 + try: + current_mtime = src.stat().st_mtime + except OSError: + current_mtime = 0.0 + + # 如果源文件未变化且已有缓存,直接返回缓存 + if current_mtime == _last_source_mtime and _global_slice_urls: + return _global_slice_urls + + # 检查视频时长 + duration = get_video_duration(src) + should_slice = duration > MIN_DURATION_FOR_SLICE + + new_urls: List[str] = [] + + if should_slice: + # 视频较长,切片处理 + logger.info( + "[water-video] video duration {}s > {}s, slicing into {}s segments", + duration, + MIN_DURATION_FOR_SLICE, + SLICE_DURATION, + ) + + slice_files, slice_dir = slice_video(src, SLICE_DURATION) + + if len(slice_files) > 1: + # 发布每个切片 + for i, slice_file in enumerate(slice_files): + slice_basename = _slice_media_basename(base_basename, i) + dst = settings.media_root / slice_basename + + # 检查是否需要重新发布 + need_publish = True + if dst.is_file(): + try: + if dst.stat().st_mtime >= slice_file.stat().st_mtime: + need_publish = False + except OSError: + pass + + if need_publish: + url = await _publish_video(slice_file, dst, settings) + else: + url = _public_media_url(settings, dst.name) + + if url: + new_urls.append(url) + + logger.info( + "[water-video] prepared {} slices for {}", + len(new_urls), + src.name, + ) + + # 更新全局缓存 + _global_slice_urls = new_urls + _last_source_mtime = current_mtime + return new_urls + + # 视频较短,只保留完整视频 + dst = settings.media_root / base_basename + url = await _publish_video(src, dst, settings) + + if url: + new_urls = [url] + _global_slice_urls = new_urls + _last_source_mtime = current_mtime + return new_urls + + return [] + + +async def get_water_video_public_url(settings: Settings, client_id: str = DEFAULT_CLIENT_ID) -> str: + """转码并发布到 MEDIA_ROOT 后返回绝对 URL;无可用源且无已发布文件时返回空串。 + + 如果视频较长被切片,会根据 health/result 端点的状态返回对应的切片URL。 + + 对齐机制:查询 health 数据库记录的切片索引,确保与 health/result 端点对齐。 + 只有当 health/result 返回了第 N 个切片的行为结果后,本端点才会返回第 N 个切片的视频。 + + Args: + settings: 应用配置 + client_id: 客户端标识,默认为 "default" + + Returns: + 视频URL,失败返回空串 + """ + from app.db import get_last_health_slice_index + + settings.media_root.mkdir(parents=True, exist_ok=True) + + async with _publish_lock: + # 确保切片已准备好 + slice_urls = await _prepare_slices(settings) + + if not slice_urls: + # 没有切片,尝试返回已发布的文件 + basename = _safe_water_media_basename(settings.biomass_water_video_media_name) + dst = settings.media_root / basename if dst.is_file(): return _public_media_url(settings, dst.name) return "" - if dst.is_file(): - return _public_media_url(settings, dst.name) - return "" + # 查询 health 端点上次返回的切片索引 + target_slice_idx = get_last_health_slice_index(client_id) + + if target_slice_idx >= 0 and target_slice_idx < len(slice_urls): + # 返回与 health 结果对齐的切片 + logger.debug( + "[water-video] client_id={} aligned to health slice {}/{}", + client_id, + target_slice_idx, + len(slice_urls), + ) + return slice_urls[target_slice_idx] + else: + # 没有对齐的 health 结果,返回空(等待 health/result 先被调用) + logger.debug( + "[water-video] client_id={} no health index yet, returning empty", + client_id, + ) + return ""