live sonar feed, and incremental action feed

This commit is contained in:
kevin
2026-04-16 14:53:01 +08:00
parent cc6cef0f73
commit 34ecc33ee5
28 changed files with 1555 additions and 1227 deletions

View File

@@ -1,265 +1,25 @@
"""水上视频:从 FishAction 输入目录或显式路径发布 H.264 MP4 到 MEDIA_ROOT
"""水上视频:返回与最近投递的 health 快照对齐的视频 URL
支持长视频切片如果视频较长会切分为多个10秒片段并分别转码发布。
每个切片被视为独立的视频。
对齐机制:使用 client_id 区分不同客户端的轮询进度,确保 health/result 和
water/video 两个端点对齐返回同一切片。
视频由 action_watch 在推理完成后统一发布到 MEDIA_ROOTURL 存储在
health_snapshots.video_url 列中,确保 /health/result/ 与 /water/video/
两个端点始终返回同一切片的结果。
"""
from __future__ import annotations
import asyncio
import shutil
from pathlib import Path
from typing import Dict, List, Optional
from loguru import logger
from app.compat import to_thread
from app.services.action_watch import iter_mp4
from app.services.measure import transcode_src_to_h264_dst
from app.services.video_slice import get_video_duration, slice_video
from app.db import peek_last_delivered_health_video_url
from app.settings import Settings
_publish_lock = asyncio.Lock()
# 视频切片配置
SLICE_DURATION = 10.0 # 每个切片的时长(秒)
MIN_DURATION_FOR_SLICE = 15.0 # 超过此时长才切片
# 默认客户端ID与 db.py 保持一致)
DEFAULT_CLIENT_ID = "default"
def _public_media_url(settings: Settings, basename: str) -> str:
base = settings.public_base_url.rstrip("/")
return f"{base}/media/{basename}"
def _safe_water_media_basename(raw: str) -> str:
n = (raw or "").strip()
if not n:
return "biomass_water_surface.mp4"
return Path(n).name or "biomass_water_surface.mp4"
def _slice_media_basename(base_name: str, slice_index: int) -> str:
"""生成切片视频的媒体文件名。"""
base = Path(base_name).stem
return f"{base}_slice_{slice_index:03d}.mp4"
# 客户端独立的状态:每个 client_id 有自己的切片队列和索引
# _client_slice_queues[client_id] = [url0, url1, url2, ...]
# _client_slice_indices[client_id] = 当前应该返回的索引
_client_slice_queues: Dict[str, List[str]] = {}
_client_slice_indices: Dict[str, int] = {}
# 全局缓存的切片列表(用于检测源文件变化)
_global_slice_urls: List[str] = []
_last_source_mtime: float = 0.0
def resolve_water_video_source(settings: Settings) -> Optional[Path]:
"""优先 BIOMASS_WATER_VIDEO_SOURCE否则取 ACTION_WATCH_DIR 中 mtime 最新的 .mp4。"""
cfg = settings.biomass_water_video_source
if cfg is not None:
if cfg.is_file():
return cfg
logger.warning(
"[water-video] BIOMASS_WATER_VIDEO_SOURCE is not a file: {}",
cfg,
)
return None
aw = settings.action_watch_dir
if aw is None or not aw.is_dir():
return None
mp4s = iter_mp4(aw, settings.action_watch_recursive)
if not mp4s:
return None
try:
return max(mp4s, key=lambda p: p.stat().st_mtime)
except OSError as e:
logger.warning("[water-video] could not pick latest mp4: {}", e)
return None
async def _publish_video(
src: Path,
dst: Path,
settings: Settings,
async def get_water_video_public_url(
settings: Settings, client_id: str = DEFAULT_CLIENT_ID,
) -> str:
"""发布视频到 MEDIA_ROOT
"""返回该客户端上一次 ``/health/result/`` 投递的切片视频 URL
Args:
src: 源视频路径
dst: 目标路径
settings: 应用配置
Returns:
发布的视频URL失败返回空串
视频文件在 ``action_watch`` 推理完成后即发布到 ``MEDIA_ROOT`` 并将 URL
写入 ``health_snapshots.video_url``;本函数仅做一次 DB 查询,不再独立
解析源文件或切片。
"""
tmp = dst.with_name(dst.stem + "_tmp.mp4")
tmp.unlink(missing_ok=True)
try:
ok = await to_thread(transcode_src_to_h264_dst, src, tmp)
if ok and tmp.is_file() and tmp.stat().st_size > 0:
tmp.replace(dst)
logger.info("[water-video] published H.264: {} -> {}", src.name, dst.name)
else:
tmp.unlink(missing_ok=True)
await to_thread(shutil.copy2, src, dst)
logger.warning(
"[water-video] transcode failed, copied raw: {} -> {}",
src.name,
dst.name,
)
return _public_media_url(settings, dst.name)
except Exception:
logger.exception("[water-video] publish failed")
tmp.unlink(missing_ok=True)
if dst.is_file():
return _public_media_url(settings, dst.name)
return ""
async def _prepare_slices(settings: Settings) -> List[str]:
"""预处理:如果视频较长,切分为多个片段并发布到 MEDIA_ROOT。
返回切片URL列表供各客户端使用
"""
global _global_slice_urls, _last_source_mtime
base_basename = _safe_water_media_basename(settings.biomass_water_video_media_name)
src = resolve_water_video_source(settings)
if src is None:
return []
# 检查是否需要重新切片
try:
current_mtime = src.stat().st_mtime
except OSError:
current_mtime = 0.0
# 如果源文件未变化且已有缓存,直接返回缓存
if current_mtime == _last_source_mtime and _global_slice_urls:
return _global_slice_urls
# 检查视频时长
duration = get_video_duration(src)
should_slice = duration > MIN_DURATION_FOR_SLICE
new_urls: List[str] = []
if should_slice:
# 视频较长,切片处理
logger.info(
"[water-video] video duration {}s > {}s, slicing into {}s segments",
duration,
MIN_DURATION_FOR_SLICE,
SLICE_DURATION,
)
slice_files, slice_dir = slice_video(src, SLICE_DURATION)
if len(slice_files) > 1:
# 发布每个切片
for i, slice_file in enumerate(slice_files):
slice_basename = _slice_media_basename(base_basename, i)
dst = settings.media_root / slice_basename
# 检查是否需要重新发布
need_publish = True
if dst.is_file():
try:
if dst.stat().st_mtime >= slice_file.stat().st_mtime:
need_publish = False
except OSError:
pass
if need_publish:
url = await _publish_video(slice_file, dst, settings)
else:
url = _public_media_url(settings, dst.name)
if url:
new_urls.append(url)
logger.info(
"[water-video] prepared {} slices for {}",
len(new_urls),
src.name,
)
# 更新全局缓存
_global_slice_urls = new_urls
_last_source_mtime = current_mtime
return new_urls
# 视频较短,只保留完整视频
dst = settings.media_root / base_basename
url = await _publish_video(src, dst, settings)
if url:
new_urls = [url]
_global_slice_urls = new_urls
_last_source_mtime = current_mtime
return new_urls
return []
async def get_water_video_public_url(settings: Settings, client_id: str = DEFAULT_CLIENT_ID) -> str:
"""转码并发布到 MEDIA_ROOT 后返回绝对 URL无可用源且无已发布文件时返回空串。
如果视频较长被切片,会根据 health/result 端点的状态返回对应的切片URL。
对齐机制:查询 health 数据库记录的切片索引,确保与 health/result 端点对齐。
只有当 health/result 返回了第 N 个切片的行为结果后,本端点才会返回第 N 个切片的视频。
Args:
settings: 应用配置
client_id: 客户端标识,默认为 "default"
Returns:
视频URL失败返回空串
"""
from app.db import get_last_health_slice_index
settings.media_root.mkdir(parents=True, exist_ok=True)
async with _publish_lock:
# 确保切片已准备好
slice_urls = await _prepare_slices(settings)
if not slice_urls:
# 没有切片,尝试返回已发布的文件
basename = _safe_water_media_basename(settings.biomass_water_video_media_name)
dst = settings.media_root / basename
if dst.is_file():
return _public_media_url(settings, dst.name)
return ""
# 查询 health 端点上次返回的切片索引
target_slice_idx = get_last_health_slice_index(client_id)
if target_slice_idx >= 0 and target_slice_idx < len(slice_urls):
# 返回与 health 结果对齐的切片
logger.debug(
"[water-video] client_id={} aligned to health slice {}/{}",
client_id,
target_slice_idx,
len(slice_urls),
)
return slice_urls[target_slice_idx]
else:
# 没有对齐的 health 结果,返回空(等待 health/result 先被调用)
logger.debug(
"[water-video] client_id={} no health index yet, returning empty",
client_id,
)
return ""
return peek_last_delivered_health_video_url(settings, client_id)