"""Video slicing utilities: split a long video into fixed-length segments."""
from __future__ import annotations

import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import List, Tuple

from loguru import logger

# A trailing remainder must be longer than this (in seconds) to get its own
# slice; shorter tails are dropped.
_MIN_TAIL_SECONDS = 1.0


def _get_ffmpeg_path() -> str:
    """Return the ffmpeg executable to invoke.

    Candidates are checked in order: the copy bundled with the project, then
    the common system install locations. The first one that exists as a
    regular file wins. If none exists, the bare name ``"ffmpeg"`` is returned
    so that the executable is looked up on ``PATH`` when the command is run.
    """
    candidates = (
        "/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg",
        "/usr/bin/ffmpeg",
        "/usr/local/bin/ffmpeg",
    )
    for candidate in candidates:
        if Path(candidate).is_file():
            return candidate
    return "ffmpeg"


def _get_ffprobe_path() -> str:
    """Return the ffprobe executable to invoke.

    ffprobe is expected to sit next to the ffmpeg binary chosen by
    ``_get_ffmpeg_path``. If no such file exists, the bare name ``"ffprobe"``
    is returned so that it is looked up on ``PATH``.
    """
    ffmpeg_path = Path(_get_ffmpeg_path())
    ffprobe = ffmpeg_path.parent / "ffprobe"
    if ffprobe.is_file():
        return str(ffprobe)
    return "ffprobe"


def get_video_duration(video_path: Path) -> float:
    """Return the duration of a video in seconds, or ``0.0`` on any failure.

    The duration is read from the container metadata with ffprobe. This is a
    best-effort helper: a missing ffprobe binary, a timeout, a non-zero exit
    status, or unparsable output are all logged and reported as ``0.0``
    instead of raising.

    Args:
        video_path: Path of the video file to probe.

    Returns:
        The duration in seconds, or ``0.0`` if it could not be determined.
    """
    ffprobe_path = _get_ffprobe_path()
    try:
        result = subprocess.run(
            [
                ffprobe_path,
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                # Print only the bare value, without section wrappers or keys.
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )
        if result.returncode == 0:
            # float() raises ValueError on empty or non-numeric output; that
            # is handled by the except clause below.
            return float(result.stdout.strip())
        logger.warning(
            "[video_slice] ffprobe exited with code {}: {}",
            result.returncode,
            result.stderr[-200:] if result.stderr else "unknown",
        )
    except Exception as e:
        # Deliberately broad: callers rely on getting 0.0 rather than an error.
        logger.warning("[video_slice] failed to get duration: {}", e)
    return 0.0


def _count_slices(duration: float, slice_duration: float) -> int:
    """Return how many slices to cut for a video of the given duration.

    Every full ``slice_duration`` window counts as one slice. The remainder
    gets an extra slice only when it is longer than ``_MIN_TAIL_SECONDS``.
    """
    num_slices = int(duration / slice_duration)
    if duration % slice_duration > _MIN_TAIL_SECONDS:
        num_slices += 1
    return num_slices


def slice_video(
    video_path: Path,
    slice_duration: float = 10.0,
    output_dir: Path | None = None,
) -> Tuple[List[Path], Path]:
    """Split a video into consecutive segments of a fixed duration.

    Slices are produced with ffmpeg stream copy (no re-encoding) and named
    ``<stem>_slice_NNN.mp4``. The function is best-effort: whenever slicing
    is not possible or not needed, the original video is returned instead.

    Args:
        video_path: Path of the source video.
        slice_duration: Length of each slice in seconds (default 10). A value
            that is not a positive number makes the function fall back to the
            original video.
        output_dir: Directory to write slices into. When ``None``, a fresh
            temporary directory is created.

    Returns:
        A ``(slice_files, directory)`` tuple. On success ``slice_files`` lists
        the slices that were created (failed slices are skipped) and
        ``directory`` is the output directory. If ``slice_duration`` is
        invalid, the video is not longer than one slice, the duration cannot
        be determined, or every slice fails, the result is
        ``([video_path], video_path.parent)``.
    """
    # ``not x > 0`` also rejects NaN, which ``x <= 0`` would let through.
    # Without this guard a zero duration divides by zero further down.
    if not slice_duration > 0:
        logger.warning(
            "[video_slice] invalid slice_duration {}; returning original video",
            slice_duration,
        )
        return [video_path], video_path.parent

    duration = get_video_duration(video_path)
    if duration <= slice_duration:
        # Too short to slice (this also covers a failed probe, which is 0.0).
        return [video_path], video_path.parent

    # Remember whether the directory is ours, so it can be removed again if
    # nothing is produced; the caller never learns its path in that case.
    created_temp_dir = output_dir is None
    if output_dir is None:
        output_dir = Path(tempfile.mkdtemp(prefix="video_slices_"))
    output_dir.mkdir(parents=True, exist_ok=True)

    ffmpeg_path = _get_ffmpeg_path()
    base_name = video_path.stem
    num_slices = _count_slices(duration, slice_duration)

    logger.info(
        "[video_slice] slicing {} ({}s) into {} slices of {}s each",
        video_path.name,
        duration,
        num_slices,
        slice_duration,
    )

    slice_files: List[Path] = []
    for i in range(num_slices):
        start_time = i * slice_duration
        slice_file = output_dir / f"{base_name}_slice_{i:03d}.mp4"

        # "-c copy" copies the streams as-is, which avoids re-encoding.
        cmd = [
            ffmpeg_path,
            "-y",
            "-ss",
            str(start_time),
            "-t",
            str(slice_duration),
            "-i",
            str(video_path),
            "-c",
            "copy",
            "-avoid_negative_ts",
            "make_zero",
            str(slice_file),
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=60,
            )
            if result.returncode == 0 and slice_file.is_file():
                slice_files.append(slice_file)
                logger.debug(
                    "[video_slice] created slice {}: {} (start={}s)",
                    i,
                    slice_file.name,
                    start_time,
                )
            else:
                logger.warning(
                    "[video_slice] failed to create slice {}: {}",
                    i,
                    result.stderr[-200:] if result.stderr else "unknown",
                )
        except Exception as e:
            # One failed slice (timeout, missing binary, ...) must not abort
            # the remaining ones.
            logger.warning("[video_slice] exception creating slice {}: {}", i, e)

    if not slice_files:
        # Slicing failed entirely: fall back to the original video and do not
        # leak the temporary directory created above.
        if created_temp_dir:
            shutil.rmtree(output_dir, ignore_errors=True)
        return [video_path], video_path.parent

    logger.info(
        "[video_slice] created {} slices for {}",
        len(slice_files),
        video_path.name,
    )
    return slice_files, output_dir