"""命令行:ZED 分段录制。 - ``start``(默认):本进程内 ``start_zed_recording``,前台阻塞直到 Ctrl+C 再 ``stop``(不经过 HTTP)。 - ``start --remote``:``POST`` 已运行的 fish_api,不阻塞。 - ``stop`` / ``status``:仅 HTTP,用于停/查 **由 uvicorn 进程托管** 的录制。 HTTP 基址:环境变量 ``FISH_API_BASE_URL`` 或 ``PUBLIC_BASE_URL``,否则 ``http://127.0.0.1:8000``。 ZED 路由不校验 ingest API Key,请求无需 ``X-API-Key``。 每次启动由服务端分配 ``fish_id``(库表与 ``output_dir``、目标父目录下 ``fish``+数字 及 ``.svo2`` 路径综合)并写入 ``zed_recording_sessions``。 """ from __future__ import annotations import argparse import json import os import sys import time import urllib.error import urllib.request from typing import Any, Dict, Optional def _base_url() -> str: """优先环境变量;否则从 ``fish_api/.env`` 经 ``get_settings()`` 读 ``FISH_API_BASE_URL`` / ``PUBLIC_BASE_URL``。""" u = os.environ.get("FISH_API_BASE_URL") or os.environ.get("PUBLIC_BASE_URL") if u: return str(u).strip().rstrip("/") from app.settings import get_settings s = get_settings() u2 = (s.fish_api_base_url or "").strip() or (s.public_base_url or "").strip() if u2: return u2.rstrip("/") return "http://127.0.0.1:8000" def _http_request_json(method: str, path: str, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: url = _base_url() + path if method == "GET": req = urllib.request.Request(url, method="GET") elif method == "POST": payload = json.dumps(body if body is not None else {}).encode("utf-8") req = urllib.request.Request( url, data=payload, headers={"Content-Type": "application/json; charset=utf-8"}, method="POST", ) else: raise ValueError(method) try: with urllib.request.urlopen(req, timeout=120) as resp: raw = resp.read().decode("utf-8") if not raw.strip(): return {} return json.loads(raw) except urllib.error.HTTPError as e: err_body = e.read().decode("utf-8", errors="replace") print(f"HTTP {e.code}: {err_body}", file=sys.stderr) raise SystemExit(1) from None except urllib.error.URLError as e: print(f"请求失败: {e}", file=sys.stderr) raise SystemExit(1) from None def _cmd_start_remote(args: argparse.Namespace) -> None: body: Dict[str, Any] = {} if args.segment_sec is not None: body["segment_sec"] = args.segment_sec r = _http_request_json("POST", "/api/v1/zed/recording/start", body) print(json.dumps(r, ensure_ascii=False, indent=2)) def _cmd_start_local(args: argparse.Namespace) -> None: from app.services.zed_recording_control import ( start_zed_recording, stop_zed_recording, ) from app.settings import get_settings s = get_settings() ok, msg, fish_id, _ = start_zed_recording( s, segment_sec=args.segment_sec, ) if not ok: print(f"启动失败: {msg}", file=sys.stderr) raise SystemExit(1) print( f"fish_id={fish_id},录制中,按 Ctrl+C 停止…", flush=True, ) try: while True: time.sleep(1.0) except KeyboardInterrupt: print("\n正在停止…", flush=True) stop_ok, stop_msg, stop_fish = stop_zed_recording(s) if stop_ok: print(f"已退出。fish_id={stop_fish}", flush=True) else: print(f"停止结果: {stop_msg}", file=sys.stderr) def _cmd_stop(_args: argparse.Namespace) -> None: r = _http_request_json("POST", "/api/v1/zed/recording/stop", {}) print(json.dumps(r, ensure_ascii=False, indent=2)) def _cmd_status(_args: argparse.Namespace) -> None: r = _http_request_json("GET", "/api/v1/zed/recording/status", None) print(json.dumps(r, ensure_ascii=False, indent=2)) def main() -> None: parser = argparse.ArgumentParser( description="ZED 分段录制:本地阻塞 start,或 HTTP 调用 fish_api", ) sub = parser.add_subparsers(dest="cmd", required=True) p_start = sub.add_parser("start", help="本地启动(阻塞至 Ctrl+C)或 --remote 仅请求 API") p_start.add_argument( "--remote", action="store_true", help="通过 HTTP 调用 fish_api,不阻塞本进程", ) p_start.add_argument("--segment-sec", type=float, default=None, metavar="SEC") sub.add_parser("stop", help="HTTP 停止 fish_api 进程中的录制") sub.add_parser("status", help="HTTP 查询 fish_api 中的录制状态") args = parser.parse_args() if args.cmd == "start": if args.remote: _cmd_start_remote(args) else: _cmd_start_local(args) elif args.cmd == "stop": _cmd_stop(args) elif args.cmd == "status": _cmd_status(args) if __name__ == "__main__": main()