diff --git a/.gitignore b/.gitignore index c4f4b5a..9521eb5 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ build/ .mypy_cache/ .ruff_cache/ *.egg +/tmp # Large model weights — 统一放仓库根目录 models/(见 fish_api 默认 settings) models/ @@ -43,4 +44,7 @@ mockdata/** .DS_Store .cursor/ -frp/ \ No newline at end of file +frp/ + +# 项目专属 ffmpeg 静态构建(二进制太大,通过 scripts/setup_ffmpeg.sh 下载) +tools/ffmpeg/ \ No newline at end of file diff --git a/FishMeasure/fish_video_weight_evaluation.py b/FishMeasure/fish_video_weight_evaluation.py index fd9f37b..ac30104 100755 --- a/FishMeasure/fish_video_weight_evaluation.py +++ b/FishMeasure/fish_video_weight_evaluation.py @@ -41,6 +41,37 @@ from utils.keep_largest_cluster import keep_largest_cluster_with_colors from utils.correct_tail_rotation import correct_tail_rotation_array +def get_h264_fourcc(): + """返回最佳的 H.264 FourCC 编码器代码。 + + 尝试顺序:avc1 (最兼容), X264, H264 + 如果都不可用,回退到 mp4v (MPEG-4,兼容性较差但通用) + """ + # 尝试的 H.264 FourCC 代码(按浏览器兼容性排序) + h264_candidates = ["avc1", "X264", "H264"] + for codec in h264_candidates: + fourcc = cv2.VideoWriter_fourcc(*codec) + # 测试是否可用(创建临时视频) + try: + import tempfile + import os + with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as f: + tmp_path = f.name + test_writer = cv2.VideoWriter(tmp_path, fourcc, 10.0, (64, 64)) + if test_writer.isOpened(): + test_writer.release() + os.unlink(tmp_path) + print(f"[VideoEncoder] Using H.264 codec: {codec}") + return fourcc + test_writer.release() + os.unlink(tmp_path) + except Exception: + pass + # 回退到 mp4v(MPEG-4 Part 2) + print("[VideoEncoder] H.264 not available, falling back to mp4v (MPEG-4)") + return cv2.VideoWriter_fourcc(*"mp4v") + + def estimate_pointcloud_length_pca(points: np.ndarray) -> float: """ Estimate point cloud "length" as the extent along the 1st PCA axis. @@ -642,7 +673,7 @@ def finalize_preview_video_with_weights( out_frames.append(combined) video_path = output_images_folder / f"{svo_name}_preview.mp4" h, w = out_frames[0].shape[:2] - fourcc = cv2.VideoWriter_fourcc(*"mp4v") + fourcc = get_h264_fourcc() vw = cv2.VideoWriter(str(video_path), fourcc, float(fps_video), (w, h)) for fr in out_frames: vw.write(fr) @@ -1297,41 +1328,6 @@ def process_single_svo2(svo_path, output_base, yolo_model, sam_predictor, sam_de print(f"Reading from SVO2 file: {svo_path.name}") print(f"Output folder: {output_base.resolve()}") - # Check if output folder already exists and contains point clouds - # If so, skip data generation and directly run weight estimation - if output_base.exists() and output_cloud_folder.exists(): - # Check if there are point cloud files - point_cloud_files = list(output_cloud_folder.glob("*.ply")) - if point_cloud_files and do_weight_estimation: - print(f"\n{'='*60}") - print(f"Output folder already exists with {len(point_cloud_files)} point cloud files") - print(f"Skipping data generation, directly running weight estimation...") - print(f"{'='*60}") - - # Run weight estimation directly - weight_output_dir = output_base / "weight_estimation" - results = run_weight_estimation( - cloud_folder=output_cloud_folder, - output_dir=weight_output_dir, - topk_length=weight_topk_length, - remove_outliers=weight_remove_outliers, - outlier_method=weight_outlier_method, - max_cv_length=max_cv_length, - verbose=True, - top_k=weight_top_k, - top_by_length=weight_top_by_length, - length_switch_to_weight_mm=weight_length_switch_mm, - ) - return results is not None - elif point_cloud_files and not do_weight_estimation: - print(f"\n{'='*60}") - print(f"Output folder already exists with {len(point_cloud_files)} point cloud files") - print(f"Weight estimation not requested (--run-weight-estimation not set)") - print(f"Skipping processing...") - print(f"{'='*60}") - return True - # If folder exists but no point clouds, continue with normal processing - # Initialize ZED reader zed_reader = ZEDReader(svo_path=str(svo_path), camera_mode=False, use_yolo_detector=False) if not zed_reader.open(): @@ -1820,7 +1816,7 @@ def process_single_svo2(svo_path, output_base, yolo_model, sam_predictor, sam_de video_path = output_images_folder / f"{svo_name}_preview.mp4" h, w = video_frames[0].shape[:2] fps_v = 10.0 - fourcc = cv2.VideoWriter_fourcc(*"mp4v") + fourcc = get_h264_fourcc() video_writer = cv2.VideoWriter(str(video_path), fourcc, fps_v, (w, h)) for frame in video_frames: video_writer.write(frame) @@ -2975,7 +2971,7 @@ def main(): fps = 10.0 # Frames per second # Create video writer - fourcc = cv2.VideoWriter_fourcc(*'mp4v') + fourcc = get_h264_fourcc() video_writer = cv2.VideoWriter(str(video_path), fourcc, fps, (w, h)) for frame in video_frames: diff --git a/fish_api/README.md b/fish_api/README.md index eada28f..3fef7c3 100644 --- a/fish_api/README.md +++ b/fish_api/README.md @@ -45,9 +45,11 @@ OpenAPI:`http://127.0.0.1:8000/docs` ## 对外 GET(由其它系统轮询) -- `GET /api/v1/biomass/real/camera/` — 每次 GET 消费一条未投递的称重快照(SQLite 游标);无新数据时 `data.result` 为空,响应头 `X-Fish-Biomass-New: 0` +- `GET /api/v1/biomass/real/camera/` — 每次 GET 消费**该客户端**下一条未投递的称重快照(SQLite **按客户端独立游标**);无新数据时 `data.result` 为空,响应头 `X-Fish-Biomass-New: 0` - `GET /api/v1/biomass/health/result/` — 同上,行为 / 健康快照队列 +**客户端区分**:请求头 `X-Fish-Client-Id: <字符串>` 或查询参数 `client_id=<字符串>`(**优先头**)。未携带时等价于 `default`,与旧版「全局一条游标」行为一致。不同 `client_id` 各自从当前队列消费,互不影响。 + `result` 中每条鱼含算法字段:`id`(跟踪 ID)、`weight`(g)、`length`(mm)。不可交付或失败的推理不会进入客户端队列。 ## 流式输入(分块上传) diff --git a/fish_api/app/db.py b/fish_api/app/db.py index eacc7fa..ea44e34 100644 --- a/fish_api/app/db.py +++ b/fish_api/app/db.py @@ -18,6 +18,22 @@ from typing import Any, Optional, Set, Tuple from app.settings import Settings from app.state import HealthSnapshot, MeasureSnapshot +# 未带客户端标识时与旧行为兼容:共享同一条投递游标 +DEFAULT_CLIENT_ID = "default" +MAX_CLIENT_ID_LEN = 128 + + +def normalize_client_id(raw: Optional[str]) -> str: + """供轮询接口使用;过长截断,空值回退为 DEFAULT_CLIENT_ID。""" + if raw is None: + return DEFAULT_CLIENT_ID + s = str(raw).strip() + if not s: + return DEFAULT_CLIENT_ID + if len(s) > MAX_CLIENT_ID_LEN: + s = s[:MAX_CLIENT_ID_LEN] + return s + def _connect(path: Path) -> sqlite3.Connection: path.parent.mkdir(parents=True, exist_ok=True) @@ -60,33 +76,64 @@ def init_db(settings: Settings) -> None: PRIMARY KEY (path, kind) ); - CREATE TABLE IF NOT EXISTS delivery_cursor ( - kind TEXT PRIMARY KEY CHECK (kind IN ('measure', 'health')), - last_delivered_id INTEGER NOT NULL DEFAULT 0 + CREATE TABLE IF NOT EXISTS delivery_client_cursor ( + client_id TEXT NOT NULL, + kind TEXT NOT NULL CHECK (kind IN ('measure', 'health')), + last_delivered_id INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (client_id, kind) ); """ ) + _migrate_delivery_cursor_from_legacy(conn) _ensure_delivery_cursors(conn) finally: conn.close() +def _migrate_delivery_cursor_from_legacy(conn: sqlite3.Connection) -> None: + """旧表 delivery_cursor(kind) → delivery_client_cursor(default, kind)。""" + row = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='delivery_cursor'" + ).fetchone() + if row is None: + return + for kind in ("measure", "health"): + r = conn.execute( + "SELECT last_delivered_id FROM delivery_cursor WHERE kind = ?", (kind,) + ).fetchone() + if r is not None: + conn.execute( + """ + INSERT OR REPLACE INTO delivery_client_cursor + (client_id, kind, last_delivered_id) + VALUES (?, ?, ?) + """, + (DEFAULT_CLIENT_ID, kind, int(r["last_delivered_id"])), + ) + conn.execute("DROP TABLE delivery_cursor") + conn.commit() + + def _ensure_delivery_cursors(conn: sqlite3.Connection) -> None: - """为每条流插入一行游标;首次插入时 last_delivered_id=当前 MAX(id),避免升级后逐条投递历史快照。""" + """为默认客户端插入一行游标;首次插入时 last_delivered_id=当前 MAX(id),避免升级后逐条投递历史快照。""" for kind, table in ( ("measure", "measure_snapshots"), ("health", "health_snapshots"), ): row = conn.execute( - "SELECT 1 FROM delivery_cursor WHERE kind = ?", (kind,) + "SELECT 1 FROM delivery_client_cursor WHERE client_id = ? AND kind = ?", + (DEFAULT_CLIENT_ID, kind), ).fetchone() if row is None: mid = conn.execute( f"SELECT COALESCE(MAX(id), 0) FROM {table}" ).fetchone()[0] conn.execute( - "INSERT INTO delivery_cursor (kind, last_delivered_id) VALUES (?, ?)", - (kind, int(mid)), + """ + INSERT INTO delivery_client_cursor (client_id, kind, last_delivered_id) + VALUES (?, ?, ?) + """, + (DEFAULT_CLIENT_ID, kind, int(mid)), ) conn.commit() @@ -304,10 +351,17 @@ def _health_row_deliverable( def _last_delivered_id( - conn: sqlite3.Connection, kind: str, snapshots_table: str + conn: sqlite3.Connection, + kind: str, + snapshots_table: str, + client_id: str, ) -> int: row = conn.execute( - "SELECT last_delivered_id FROM delivery_cursor WHERE kind = ?", (kind,) + """ + SELECT last_delivered_id FROM delivery_client_cursor + WHERE kind = ? AND client_id = ? + """, + (kind, client_id), ).fetchone() if row is not None: return int(row["last_delivered_id"]) @@ -315,21 +369,26 @@ def _last_delivered_id( f"SELECT COALESCE(MAX(id), 0) FROM {snapshots_table}" ).fetchone()[0] conn.execute( - "INSERT INTO delivery_cursor (kind, last_delivered_id) VALUES (?, ?)", - (kind, int(mid)), + """ + INSERT INTO delivery_client_cursor (client_id, kind, last_delivered_id) + VALUES (?, ?, ?) + """, + (client_id, kind, int(mid)), ) return int(mid) def pop_next_measure( settings: Settings, + client_id: str = DEFAULT_CLIENT_ID, ) -> Tuple[MeasureSnapshot, bool, Optional[int]]: - """取队首未投递且可交付的 measure 快照并推进游标;跳过不可交付行仅推进游标。""" + """取该客户端队首未投递且可交付的 measure 快照并推进其游标;跳过不可交付行仅推进游标。""" + cid = normalize_client_id(client_id) init_db(settings) conn = _connect(settings.sqlite_path) try: conn.execute("BEGIN IMMEDIATE") - last_id = _last_delivered_id(conn, "measure", "measure_snapshots") + last_id = _last_delivered_id(conn, "measure", "measure_snapshots", cid) while True: next_row = conn.execute( @@ -355,8 +414,11 @@ def pop_next_measure( data = [] conn.execute( - "UPDATE delivery_cursor SET last_delivered_id = ? WHERE kind = ?", - (nid, "measure"), + """ + UPDATE delivery_client_cursor SET last_delivered_id = ? + WHERE kind = ? AND client_id = ? + """, + (nid, "measure", cid), ) if not measure_result_deliverable(data, err): @@ -380,13 +442,17 @@ def pop_next_measure( conn.close() -def pop_next_health(settings: Settings) -> Tuple[HealthSnapshot, bool, Optional[int]]: - """取队首未投递且可交付的 health 快照并推进游标;跳过不可交付行仅推进游标。""" +def pop_next_health( + settings: Settings, + client_id: str = DEFAULT_CLIENT_ID, +) -> Tuple[HealthSnapshot, bool, Optional[int]]: + """取该客户端队首未投递且可交付的 health 快照并推进其游标;跳过不可交付行仅推进游标。""" + cid = normalize_client_id(client_id) init_db(settings) conn = _connect(settings.sqlite_path) try: conn.execute("BEGIN IMMEDIATE") - last_id = _last_delivered_id(conn, "health", "health_snapshots") + last_id = _last_delivered_id(conn, "health", "health_snapshots", cid) while True: next_row = conn.execute( @@ -412,8 +478,11 @@ def pop_next_health(settings: Settings) -> Tuple[HealthSnapshot, bool, Optional[ err: Optional[str] = next_row["error"] conn.execute( - "UPDATE delivery_cursor SET last_delivered_id = ? WHERE kind = ?", - (nid, "health"), + """ + UPDATE delivery_client_cursor SET last_delivered_id = ? + WHERE kind = ? AND client_id = ? + """, + (nid, "health", cid), ) if not _health_row_deliverable(beh, hlth, raw_en, err): @@ -529,14 +598,14 @@ def clear_watch_cache_and_snapshots(settings: Settings) -> None: conn.execute("DELETE FROM watch_processed WHERE kind = ?", ("measure",)) conn.execute("DELETE FROM measure_snapshots") conn.execute( - "UPDATE delivery_cursor SET last_delivered_id = 0 WHERE kind = ?", + "UPDATE delivery_client_cursor SET last_delivered_id = 0 WHERE kind = ?", ("measure",), ) if settings.action_watch_use_state_file: conn.execute("DELETE FROM watch_processed WHERE kind = ?", ("action",)) conn.execute("DELETE FROM health_snapshots") conn.execute( - "UPDATE delivery_cursor SET last_delivered_id = 0 WHERE kind = ?", + "UPDATE delivery_client_cursor SET last_delivered_id = 0 WHERE kind = ?", ("health",), ) conn.commit() diff --git a/fish_api/app/main.py b/fish_api/app/main.py index dded7e2..29bebc5 100644 --- a/fish_api/app/main.py +++ b/fish_api/app/main.py @@ -4,9 +4,9 @@ import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI -from fastapi.staticfiles import StaticFiles from app.logging_config import setup_logging +from app.media_static import MediaStaticFiles from app.db import init_db from app.routers import biomass, ingest from app.services.action_watch import run_action_watch_loop @@ -46,7 +46,7 @@ _settings = get_settings() _settings.media_root.mkdir(parents=True, exist_ok=True) app.mount( "/media", - StaticFiles(directory=str(_settings.media_root)), + MediaStaticFiles(directory=str(_settings.media_root)), name="media", ) diff --git a/fish_api/app/media_static.py b/fish_api/app/media_static.py new file mode 100644 index 0000000..ef11341 --- /dev/null +++ b/fish_api/app/media_static.py @@ -0,0 +1,180 @@ +"""托管预览视频:Linux 上 mimetypes 常无法识别 .mp4,Starlette 会退回 text/plain,导致浏览器表现异常。 + +同时提供 Range 请求支持,使视频可以在浏览器中流式播放、跳转进度。 +""" + +from __future__ import annotations + +import os +from mimetypes import guess_type +from pathlib import Path +from typing import Union + +from starlette.datastructures import Headers +from starlette.responses import FileResponse, Response, StreamingResponse +from starlette.staticfiles import NotModifiedResponse, StaticFiles +from starlette.types import Scope + +PathLike = Union[str, "os.PathLike[str]"] + + +def _media_type_for_file(path: PathLike) -> str: + ext = Path(path).suffix.lower() + if ext in (".mp4", ".m4v"): + return "video/mp4" + if ext == ".webm": + return "video/webm" + if ext == ".mkv": + return "video/x-matroska" + guessed = guess_type(str(path))[0] + if guessed: + return guessed + return "application/octet-stream" + + +def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]]: + """解析 HTTP Range 头,返回 (start, end) 列表。""" + if not range_header.startswith("bytes="): + return [] + ranges = [] + for part in range_header[len("bytes="):].split(","): + part = part.strip() + if part.startswith("-"): + # 后缀范围:最后 N 字节 + try: + suffix = int(part[1:]) + if suffix > 0: + start = max(0, file_size - suffix) + ranges.append((start, file_size - 1)) + except ValueError: + continue + else: + if "-" in part: + start_str, end_str = part.split("-", 1) + try: + start = int(start_str) if start_str else 0 + except ValueError: + continue + try: + end = int(end_str) if end_str else file_size - 1 + except ValueError: + continue + if start > end: + continue + end = min(end, file_size - 1) + ranges.append((start, end)) + return ranges + + +class _RangeFileResponse(Response): + """支持 HTTP Range 请求的文件响应,用于视频流播放。""" + + def __init__( + self, + path: PathLike, + media_type: str | None = None, + stat_result: os.stat_result | None = None, + headers: dict | None = None, + range_header: str | None = None, + ): + self.path = path + self.stat_result = stat_result or os.stat(path) + self.file_size = self.stat_result.st_size + self.range_header = range_header + + headers = headers or {} + headers.setdefault("accept-ranges", "bytes") + headers.setdefault("last-modified", str(self.stat_result.st_mtime)) + + # 解析 Range 请求 + ranges = [] + if range_header: + ranges = _parse_range_header(range_header, self.file_size) + + if len(ranges) == 1: + # 单范围请求 + start, end = ranges[0] + self.status_code = 206 + self.start = start + self.end = end + headers["content-length"] = str(end - start + 1) + headers["content-range"] = f"bytes {start}-{end}/{self.file_size}" + elif len(ranges) > 1: + # 多范围请求太复杂,返回整个文件 + self.status_code = 200 + self.start = 0 + self.end = self.file_size - 1 + headers["content-length"] = str(self.file_size) + else: + # 无 Range 请求,返回整个文件 + self.status_code = 200 + self.start = 0 + self.end = self.file_size - 1 + headers["content-length"] = str(self.file_size) + + super().__init__( + content=None, + status_code=self.status_code, + headers=headers, + media_type=media_type, + ) + + async def __call__(self, scope: Scope, receive, send): + if self.status_code == 416: + await send({"type": "http.response.start", "status": 416, "headers": self.raw_headers}) + await send({"type": "http.response.body", "body": b""}) + return + + await send({"type": "http.response.start", "status": self.status_code, "headers": self.raw_headers}) + + chunk_size = 64 * 1024 # 64KB chunks + async def file_stream(): + with open(self.path, "rb") as f: + f.seek(self.start) + remaining = self.end - self.start + 1 + while remaining > 0: + to_read = min(chunk_size, remaining) + chunk = f.read(to_read) + if not chunk: + break + yield chunk + remaining -= len(chunk) + + async for chunk in file_stream(): + await send({"type": "http.response.body", "body": chunk, "more_body": True}) + await send({"type": "http.response.body", "body": b"", "more_body": False}) + + +class MediaStaticFiles(StaticFiles): + """与 StaticFiles 相同,但对常见视频后缀显式设置 Content-Type,并支持 Range 请求。""" + + def file_response( + self, + full_path: PathLike, + stat_result: os.stat_result, + scope: Scope, + status_code: int = 200, + ) -> Response: + request_headers = Headers(scope=scope) + range_header = request_headers.get("range") + media_type = _media_type_for_file(full_path) + + # 视频文件支持 Range 请求 + if media_type.startswith("video/") and range_header: + response = _RangeFileResponse( + full_path, + media_type=media_type, + stat_result=stat_result, + range_header=range_header, + ) + else: + response = FileResponse( + full_path, + status_code=status_code, + stat_result=stat_result, + media_type=media_type, + ) + + if self.is_not_modified(response.headers, request_headers): + return NotModifiedResponse(response.headers) + return response diff --git a/fish_api/app/prestart_fresh.py b/fish_api/app/prestart_fresh.py index 704da12..6fead9d 100644 --- a/fish_api/app/prestart_fresh.py +++ b/fish_api/app/prestart_fresh.py @@ -1,10 +1,12 @@ """启动前清空状态:SQLite、watch 旧 JSON、测量/行为运行时目录。 由 start_fresh.sh 在 uvicorn 之前调用,使 FishMeasure 与 FishAction 均在无缓存下重新推理。 +可通过环境变量 KEEP_MEASURE_OUTPUT=1 保留测量输出目录以复用点云。 """ from __future__ import annotations +import os from pathlib import Path from app.db import clear_runtime_compute_dirs, remove_sqlite_database_files @@ -31,14 +33,30 @@ def run_prestart_fresh() -> None: flush=True, ) - clear_runtime_compute_dirs(s) - print( - "[prestart-fresh] cleared compute dirs: " - f"measure_output={s.measure_output_root}, " - f"action_output={s.action_output_root}, " - f"media={s.media_root}, stream_tmp={s.stream_tmp_dir}", - flush=True, - ) + # 检查是否保留测量输出目录(用于复用点云) + keep_measure_output = os.environ.get("KEEP_MEASURE_OUTPUT", "").strip() in ("1", "true", "yes") + + if keep_measure_output: + # 只清理其他目录,保留 measure_output_root + from app.db import _safe_rm_tree + _safe_rm_tree(s.action_output_root) + _safe_rm_tree(s.media_root) + _safe_rm_tree(s.stream_tmp_dir) + print( + "[prestart-fresh] cleared compute dirs (kept measure_output for cloud reuse): " + f"action_output={s.action_output_root}, " + f"media={s.media_root}, stream_tmp={s.stream_tmp_dir}", + flush=True, + ) + else: + clear_runtime_compute_dirs(s) + print( + "[prestart-fresh] cleared compute dirs: " + f"measure_output={s.measure_output_root}, " + f"action_output={s.action_output_root}, " + f"media={s.media_root}, stream_tmp={s.stream_tmp_dir}", + flush=True, + ) if s.measure_watch_use_state_file: if s.measure_watch_state_file is not None: diff --git a/fish_api/app/routers/biomass.py b/fish_api/app/routers/biomass.py index 20260b4..cc39f9c 100644 --- a/fish_api/app/routers/biomass.py +++ b/fish_api/app/routers/biomass.py @@ -1,9 +1,11 @@ from __future__ import annotations -from fastapi import APIRouter, Depends +from typing import Optional + +from fastapi import APIRouter, Depends, Header, Query from starlette.responses import JSONResponse -from app.db import pop_next_health, pop_next_measure +from app.db import normalize_client_id, pop_next_health, pop_next_measure from app.settings import Settings, get_settings router = APIRouter(prefix="/api/v1/biomass", tags=["biomass"]) @@ -16,10 +18,27 @@ def _new_headers(has_new: bool) -> dict[str, str]: return {HEADER_BIOMASS_NEW: "1" if has_new else "0"} +def _resolve_client_id( + x_fish_client_id: Optional[str] = Header(None, alias="X-Fish-Client-Id"), + client_id: Optional[str] = Query( + None, + description="客户端标识;与请求头 X-Fish-Client-Id 二选一(优先头)。未带时共用 default 游标", + ), +) -> str: + if x_fish_client_id is not None and str(x_fish_client_id).strip(): + return normalize_client_id(x_fish_client_id) + if client_id is not None and str(client_id).strip(): + return normalize_client_id(client_id) + return normalize_client_id(None) + + @router.get("/real/camera/") -async def get_real_camera(settings: Settings = Depends(get_settings)): - """双目实时结果:每次 GET 投递下一条未消费的 FishMeasure 快照(SQLite 游标)。""" - m, has_new, _ = pop_next_measure(settings) +async def get_real_camera( + settings: Settings = Depends(get_settings), + client_id: str = Depends(_resolve_client_id), +): + """双目实时结果:每次 GET 投递该客户端下一条未消费的 FishMeasure 快照(按 client_id 独立游标)。""" + m, has_new, _ = pop_next_measure(settings, client_id) if not has_new: return JSONResponse( content={ @@ -61,9 +80,12 @@ async def get_real_camera(settings: Settings = Depends(get_settings)): @router.get("/health/result/") -async def get_health_result(settings: Settings = Depends(get_settings)): - """行为 / 健康结果:每次 GET 投递下一条未消费的 FishAction 快照(SQLite 游标)。""" - h, has_new, _ = pop_next_health(settings) +async def get_health_result( + settings: Settings = Depends(get_settings), + client_id: str = Depends(_resolve_client_id), +): + """行为 / 健康结果:每次 GET 投递该客户端下一条未消费的 FishAction 快照(按 client_id 独立游标)。""" + h, has_new, _ = pop_next_health(settings, client_id) if not has_new: return JSONResponse( content={ diff --git a/fish_api/app/services/measure.py b/fish_api/app/services/measure.py index c012625..7f21dda 100644 --- a/fish_api/app/services/measure.py +++ b/fish_api/app/services/measure.py @@ -241,9 +241,11 @@ def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool: Returns True if split succeeded, False otherwise (caller should fall back to copy). """ + ffmpeg_path = _get_ffmpeg_path() + ffprobe_path = str(Path(ffmpeg_path).parent / "ffprobe") probe = subprocess.run( [ - "ffprobe", "-v", "quiet", "-print_format", "json", + ffprobe_path, "-v", "quiet", "-print_format", "json", "-show_streams", str(src), ], capture_output=True, text=True, @@ -263,19 +265,282 @@ def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool: if half_w < 1 or w < h: return False + encoder, encoder_options, _ = _get_h264_encoder() for crop, dst in [ (f"crop={half_w}:{h}:{half_w}:0", left_dst), (f"crop={half_w}:{h}:0:0", right_dst), ]: - r = subprocess.run( - ["ffmpeg", "-y", "-i", str(src), "-vf", crop, "-an", "-q:v", "5", str(dst)], - capture_output=True, text=True, - ) + cmd = [ffmpeg_path, "-y", "-i", str(src), "-vf", crop, "-an"] + if encoder: + cmd.extend(["-c:v", encoder, "-pix_fmt", "yuv420p", "-movflags", "+faststart"]) + cmd.extend(encoder_options) + else: + cmd.extend(["-q:v", "5"]) + cmd.append(str(dst)) + r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode != 0: return False return True +def _get_ffmpeg_path() -> str: + """获取可用的 ffmpeg 路径。优先使用项目配置的 ffmpeg。""" + # 优先使用项目目录下的 ffmpeg + project_ffmpeg = Path("/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg") + if project_ffmpeg.is_file(): + return str(project_ffmpeg) + # 尝试系统路径 + system_paths = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"] + for path in system_paths: + if Path(path).is_file(): + return path + # 回退到 PATH 中的 ffmpeg + return "ffmpeg" + + +def _get_h264_encoder() -> tuple[str, list[str], str]: + """检测可用的H.264编码器,返回 (encoder_name, options, ffmpeg_path)。 + + 优先使用 libx264(纯软件,最可靠),硬件编码器需要实际测试才能确认可用。 + """ + encoders_to_try = [ + ("libx264", ["-preset", "fast", "-crf", "23"]), + ("h264_nvenc", ["-preset", "fast"]), + ("libopenh264", []), + ] + + ffmpeg_path = _get_ffmpeg_path() + try: + result = subprocess.run( + [ffmpeg_path, "-encoders"], + capture_output=True, text=True, timeout=10 + ) + encoders_output = result.stdout + for encoder, options in encoders_to_try: + if encoder in encoders_output: + return encoder, options, ffmpeg_path + except Exception: + pass + return "", [], ffmpeg_path + + +def _get_x264_path() -> Optional[str]: + """检测系统上是否有可用的 x264 命令行工具。""" + for path in ["/usr/bin/x264", "/usr/local/bin/x264", "x264"]: + if path == "x264": + try: + result = subprocess.run(["which", "x264"], capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except Exception: + pass + elif Path(path).is_file(): + return path + return None + + +def _transcode_with_x264(src: Path, dst: Path) -> bool: + """使用 x264 命令行工具将视频转码为 H.264。 + + 这是当 ffmpeg 的 H.264 编码器都不可用时(如 libopenh264 版本不匹配)的最后备选方案。 + 通过 ffmpeg 提取原始 YUV 帧,然后用 x264 编码。 + """ + import tempfile + import shutil + + x264_path = _get_x264_path() + ffmpeg_path = _get_ffmpeg_path() + + if not x264_path: + logger.debug("[FishMeasure] x264 not available") + return False + + # 首先用 ffprobe 获取视频信息 + try: + probe = subprocess.run( + ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", str(src)], + capture_output=True, text=True, timeout=10 + ) + import json as _json + streams = _json.loads(probe.stdout).get("streams", []) + vstream = next((s for s in streams if s.get("codec_type") == "video"), None) + if not vstream: + return False + width = int(vstream["width"]) + height = int(vstream["height"]) + fps_str = vstream.get("r_frame_rate", "25/1") + # 解析 fps (可能是 "30/1" 或 "30000/1001" 格式) + if "/" in fps_str: + num, den = map(int, fps_str.split("/")) + fps = num / den if den != 0 else 25.0 + else: + fps = float(fps_str) + except Exception as e: + logger.debug("[FishMeasure] x264 probe failed: {}", str(e)) + return False + + tmp_yuv = None + try: + # 创建临时 YUV 文件 + with tempfile.NamedTemporaryFile(suffix=".yuv", delete=False) as f: + tmp_yuv = Path(f.name) + + # 步骤1: 用 ffmpeg 提取 YUV 原始帧 + extract_cmd = [ + ffmpeg_path, "-y", "-i", str(src), + "-f", "rawvideo", + "-pix_fmt", "yuv420p", + str(tmp_yuv) + ] + result = subprocess.run(extract_cmd, capture_output=True, text=True, timeout=300) + if result.returncode != 0: + logger.debug("[FishMeasure] x264: YUV extraction failed: {}", result.stderr[-200:] if result.stderr else "unknown") + return False + + # 步骤2: 用 x264 编码 + # x264 需要特定格式的输入参数 + encode_cmd = [ + x264_path, + "--input-res", f"{width}x{height}", + "--fps", str(fps), + "--preset", "fast", + "--crf", "23", + "--output-csp", "i420", + "-o", str(dst), + str(tmp_yuv) + ] + result = subprocess.run(encode_cmd, capture_output=True, text=True, timeout=600) + + if result.returncode == 0 and dst.is_file(): + logger.info("[FishMeasure] x264 transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size) + return True + else: + stderr = result.stderr[-300:] if result.stderr else "Unknown error" + logger.warning("[FishMeasure] x264 transcoding FAILED: {}", stderr) + if dst.exists(): + dst.unlink() + return False + except Exception as e: + logger.warning("[FishMeasure] x264 transcoding exception: {}", str(e)) + if dst.exists(): + dst.unlink() + return False + finally: + if tmp_yuv and tmp_yuv.exists(): + tmp_yuv.unlink() + + +def _transcode_fallback(src: Path, dst: Path) -> bool: + """备选转码方案:提取帧为图像序列,然后用ffmpeg编码为H.264。 + + 这种方法避免编码器直接读取 mp4v 文件的兼容性问题。 + """ + import tempfile + import shutil + + encoder, encoder_options, ffmpeg_path = _get_h264_encoder() + if not encoder: + return False + + tmp_dir = tempfile.mkdtemp() + try: + # 步骤1: 提取帧为 jpg 序列 + frames_pattern = f"{tmp_dir}/frame_%04d.jpg" + extract_cmd = [ + ffmpeg_path, "-y", "-i", str(src), + "-q:v", "2", # 高质量 + frames_pattern + ] + result = subprocess.run(extract_cmd, capture_output=True, text=True, timeout=60) + if result.returncode != 0: + logger.debug("[FishMeasure] Fallback: frame extraction failed: {}", result.stderr[-200:] if result.stderr else "unknown") + return False + + # 步骤2: 从帧编码为 H.264 MP4 + encode_cmd = [ + ffmpeg_path, "-y", + "-i", frames_pattern, + "-c:v", encoder, + "-pix_fmt", "yuv420p", + "-movflags", "+faststart", + "-an", + ] + encode_cmd.extend(encoder_options) + encode_cmd.append(str(dst)) + + result = subprocess.run(encode_cmd, capture_output=True, text=True, timeout=120) + + if result.returncode == 0 and dst.is_file(): + logger.info("[FishMeasure] Fallback transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size) + return True + else: + stderr = result.stderr[-300:] if result.stderr else "Unknown error" + logger.warning("[FishMeasure] Fallback transcoding FAILED: {}", stderr) + if dst.exists(): + dst.unlink() + return False + except Exception as e: + logger.warning("[FishMeasure] Fallback transcoding exception: {}", str(e)) + if dst.exists(): + dst.unlink() + return False + finally: + # 清理临时目录 + shutil.rmtree(tmp_dir, ignore_errors=True) + + +def _transcode_to_h264(src: Path, dst: Path) -> bool: + """使用 ffmpeg 将视频转码为 H.264 (浏览器兼容格式)。 + + 尝试多种H.264编码器,包括软件编码和硬件加速编码。 + 如果直接转码失败,依次尝试备选方案: + 1. 提取帧重新编码 + 2. 使用 x264 命令行工具(当 ffmpeg 的 H.264 编码器都不可用时) + """ + encoder, encoder_options, ffmpeg_path = _get_h264_encoder() + + # 如果有可用的 ffmpeg H.264 编码器,先尝试直接转码 + if encoder: + try: + # 基础参数 + cmd = [ + ffmpeg_path, "-y", "-i", str(src), + "-c:v", encoder, + "-pix_fmt", "yuv420p", # 确保兼容性 + "-movflags", "+faststart", # 优化网络播放(moov前置) + "-an", # 去除音频 + ] + cmd.extend(encoder_options) + cmd.append(str(dst)) + + logger.info("[FishMeasure] Transcoding with {} using {}: {} -> {}", encoder, ffmpeg_path, src.name, dst.name) + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=300 + ) + + if result.returncode == 0 and dst.is_file(): + logger.info("[FishMeasure] Transcoding SUCCESS: {} ({} bytes)", dst.name, dst.stat().st_size) + return True + else: + stderr = result.stderr[-500:] if result.stderr else "Unknown error" + logger.warning("[FishMeasure] Direct transcoding FAILED, trying fallback: {}", stderr) + # 尝试备选方案1: 提取帧重新编码 + if _transcode_fallback(src, dst): + return True + # 备选方案1失败,尝试 x264 + logger.info("[FishMeasure] Fallback failed, trying x264...") + return _transcode_with_x264(src, dst) + except Exception as e: + logger.warning("[FishMeasure] Transcoding exception: {}", str(e)) + if _transcode_fallback(src, dst): + return True + return _transcode_with_x264(src, dst) + else: + # 没有可用的 ffmpeg H.264 编码器,直接尝试 x264 + logger.warning("[FishMeasure] No H.264 encoder available in ffmpeg, trying x264...") + return _transcode_with_x264(src, dst) + + def _publish_media( left: Optional[Path], right: Optional[Path], @@ -298,7 +563,13 @@ def _publish_media( def publish(src: Optional[Path], dst: Path) -> str: if src is None or not src.is_file(): return "" - shutil.copy2(src, dst) + # 尝试转码为 H.264,如果失败则直接复制原文件 + if _transcode_to_h264(src, dst): + logger.info("[FishMeasure] transcoded to H.264: {} -> {}", src.name, dst.name) + else: + # 转码失败,直接复制原文件 + shutil.copy2(src, dst) + logger.warning("[FishMeasure] copied without transcoding: {} -> {}", src.name, dst.name) return f"{base}/media/{dst.name}" vl = publish(left, left_dst) diff --git a/fish_api/app/settings.py b/fish_api/app/settings.py index cb186c0..057260e 100644 --- a/fish_api/app/settings.py +++ b/fish_api/app/settings.py @@ -4,10 +4,15 @@ from functools import lru_cache from pathlib import Path from typing import Optional -from pydantic import Field, field_validator, model_validator +from pydantic import AliasChoices, Field, field_validator, model_validator from pydantic_settings import BaseSettings, SettingsConfigDict +def _fish_api_env_file() -> Path: + """fish_api/.env — 与启动 cwd 无关,避免从仓库根跑 uvicorn 时读不到 .env。""" + return Path(__file__).resolve().parents[1] / ".env" + + def fish_repo_root() -> Path: # fish_api/app/settings.py -> parent[2] = repo root (contains FishMeasure/, fish_api/) return Path(__file__).resolve().parents[2] @@ -36,12 +41,16 @@ def _default_action_output_root() -> Path: class Settings(BaseSettings): model_config = SettingsConfigDict( - env_file=".env", + env_file=_fish_api_env_file(), env_file_encoding="utf-8", extra="ignore", ) - public_base_url: str = "http://127.0.0.1:8000" + #: 对外可访问的 API 基址(无末尾 `/`),用于 biomass 等 JSON 里 `video_left` / `video_right` 的绝对 URL。环境变量:**PUBLIC_BASE_URL** + public_base_url: str = Field( + default="http://127.0.0.1:8000", + validation_alias=AliasChoices("PUBLIC_BASE_URL", "public_base_url"), + ) ingest_api_key: str = "" diff --git a/fish_api/start_fresh.sh b/fish_api/start_fresh.sh index a6fc8b6..5f3c2d1 100755 --- a/fish_api/start_fresh.sh +++ b/fish_api/start_fresh.sh @@ -4,6 +4,9 @@ # bash fish_api/start_fresh.sh # PORT=8001 HOST=0.0.0.0 bash fish_api/start_fresh.sh # +# 保留测量输出目录以复用点云(配合 .env 中 MEASURE_REUSE_EXISTING_CLOUDS=true): +# KEEP_MEASURE_OUTPUT=1 bash fish_api/start_fresh.sh +# # 首次使用请先:cd fish_api && uv sync # set -euo pipefail @@ -11,7 +14,8 @@ set -euo pipefail DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$DIR" -export PUBLIC_BASE_URL="${PUBLIC_BASE_URL:-http://127.0.0.1:8000}" +# 勿在此默认导出 PUBLIC_BASE_URL:shell 环境会覆盖 fish_api/.env(pydantic 优先级)。 +# 未设置时由 app/settings.py 的默认值或 .env 中的 PUBLIC_BASE_URL 决定。 unset PYTHON_FISH_MEASURE PYTHON_FISH_ACTION 2>/dev/null || true if command -v uv >/dev/null 2>&1; then diff --git a/fish_api/start_no_fresh.sh b/fish_api/start_no_fresh.sh index 988885d..9cdb509 100755 --- a/fish_api/start_no_fresh.sh +++ b/fish_api/start_no_fresh.sh @@ -11,7 +11,8 @@ set -euo pipefail DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" cd "$DIR" -export PUBLIC_BASE_URL="${PUBLIC_BASE_URL:-http://127.0.0.1:8000}" +# 勿在此默认导出 PUBLIC_BASE_URL:shell 环境会覆盖 fish_api/.env(pydantic 优先级)。 +# 未设置时由 app/settings.py 的默认值或 .env 中的 PUBLIC_BASE_URL 决定。 unset PYTHON_FISH_MEASURE PYTHON_FISH_ACTION 2>/dev/null || true PORT="${PORT:-8000}" diff --git a/scripts/biomass_poller.py b/scripts/biomass_poller.py index 0b75e40..dd68fcc 100755 --- a/scripts/biomass_poller.py +++ b/scripts/biomass_poller.py @@ -40,12 +40,14 @@ def _should_log(resp: httpx.Response, body: Any) -> bool: return resp.headers.get("X-Fish-Biomass-New", "").strip() == "1" -async def poll_once(client: httpx.AsyncClient, base: str) -> None: +async def poll_once( + client: httpx.AsyncClient, base: str, extra_headers: dict[str, str] +) -> None: base = base.rstrip("/") camera_url = f"{base}/api/v1/biomass/real/camera/" health_url = f"{base}/api/v1/biomass/health/result/" - r1 = await client.get(camera_url) - r2 = await client.get(health_url) + r1 = await client.get(camera_url, headers=extra_headers) + r2 = await client.get(health_url, headers=extra_headers) b1 = _fmt_body(r1) b2 = _fmt_body(r2) if _should_log(r1, b1): @@ -54,11 +56,13 @@ async def poll_once(client: httpx.AsyncClient, base: str) -> None: logger.info("[health/result/] HTTP {} | {}", r2.status_code, json.dumps(b2, ensure_ascii=False)) -async def poll_loop(base: str, interval: float) -> None: +async def poll_loop( + base: str, interval: float, extra_headers: dict[str, str] +) -> None: async with httpx.AsyncClient(timeout=30.0) as client: while True: try: - await poll_once(client, base) + await poll_once(client, base, extra_headers) except Exception: logger.exception("poll round failed") await asyncio.sleep(max(interval, 0.5)) @@ -77,6 +81,11 @@ def main() -> None: default=float(os.environ.get("POLL_INTERVAL", "5")), help="轮询间隔秒数(默认 POLL_INTERVAL 或 5)", ) + parser.add_argument( + "--client-id", + default=os.environ.get("BIOMASS_CLIENT_ID", "").strip() or None, + help="传给 X-Fish-Client-Id(默认 BIOMASS_CLIENT_ID;不设则与网关 default 游标一致)", + ) args = parser.parse_args() logger.remove() @@ -87,12 +96,16 @@ def main() -> None: "{level: <8} | " "{message}", ) + extra: dict[str, str] = {} + if args.client_id: + extra["X-Fish-Client-Id"] = args.client_id logger.info( - "biomass_poller 启动 | base={} | interval={}s | GET /api/v1/biomass/real/camera/ 与 /health/result/", + "biomass_poller 启动 | base={} | interval={}s | client_id={} | GET biomass real/camera + health/result", args.base_url.rstrip("/"), args.interval, + args.client_id or "(default)", ) - asyncio.run(poll_loop(args.base_url, args.interval)) + asyncio.run(poll_loop(args.base_url, args.interval, extra)) if __name__ == "__main__": diff --git a/scripts/setup_ffmpeg.sh b/scripts/setup_ffmpeg.sh new file mode 100755 index 0000000..b7e5e8c --- /dev/null +++ b/scripts/setup_ffmpeg.sh @@ -0,0 +1,58 @@ +#!/bin/bash +# 下载并设置项目专属的 ffmpeg 静态构建版本 +# 这确保项目使用一致的 ffmpeg 版本,避免环境问题 + +set -e + +PROJECT_ROOT="/home/ubuntu/projects/FishServer" +TOOLS_DIR="$PROJECT_ROOT/tools" +FFMPEG_DIR="$TOOLS_DIR/ffmpeg" +FFMPEG_VERSION="6.1.1" +FFMPEG_ARCH="amd64" + +echo "[setup-ffmpeg] Setting up project-specific ffmpeg..." + +# 创建工具目录 +mkdir -p "$TOOLS_DIR" + +# 检查是否已存在 +if [ -f "$FFMPEG_DIR/bin/ffmpeg" ]; then + echo "[setup-ffmpeg] ffmpeg already exists at $FFMPEG_DIR/bin/ffmpeg" + "$FFMPEG_DIR/bin/ffmpeg" -version | head -1 + exit 0 +fi + +# 下载 ffmpeg 静态构建版本 +echo "[setup-ffmpeg] Downloading ffmpeg $FFMPEG_VERSION..." +cd "$TOOLS_DIR" + +# 使用 johnvansickle 的静态构建版本 +DOWNLOAD_URL="https://johnvansickle.com/ffmpeg/releases/ffmpeg-${FFMPEG_VERSION}-${FFMPEG_ARCH}-static.tar.xz" +TAR_FILE="ffmpeg-${FFMPEG_VERSION}-${FFMPEG_ARCH}-static.tar.xz" + +if command -v wget &> /dev/null; then + wget -q --show-progress "$DOWNLOAD_URL" -O "$TAR_FILE" +elif command -v curl &> /dev/null; then + curl -L --progress-bar "$DOWNLOAD_URL" -o "$TAR_FILE" +else + echo "[setup-ffmpeg] Error: wget or curl is required" + exit 1 +fi + +# 解压 +echo "[setup-ffmpeg] Extracting..." +tar -xf "$TAR_FILE" + +# 移动到新位置 +EXTRACTED_DIR=$(tar -tf "$TAR_FILE" | head -1 | cut -f1 -d"/") +mv "$EXTRACTED_DIR" "$FFMPEG_DIR" + +# 清理 +rm "$TAR_FILE" + +# 验证 +echo "[setup-ffmpeg] Verifying installation..." +"$FFMPEG_DIR/bin/ffmpeg" -version | head -1 +"$FFMPEG_DIR/bin/ffmpeg" -encoders | grep -i "h264\|x264" | head -5 + +echo "[setup-ffmpeg] Done! ffmpeg is installed at $FFMPEG_DIR/bin/ffmpeg" diff --git a/scripts/start_fresh.sh b/scripts/start_fresh.sh index deab4f8..e2dade4 100755 --- a/scripts/start_fresh.sh +++ b/scripts/start_fresh.sh @@ -4,6 +4,9 @@ # bash scripts/start_fresh.sh # PORT=8001 bash scripts/start_fresh.sh # +# 保留测量输出目录以复用点云(配合 .env 中 MEASURE_REUSE_EXISTING_CLOUDS=true): +# KEEP_MEASURE_OUTPUT=1 bash scripts/start_fresh.sh +# set -euo pipefail ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" exec bash "$ROOT/fish_api/start_fresh.sh"