Files
FishServer/fish_api/app/zed_record_cli.py

151 lines
4.9 KiB
Python
Raw Permalink Normal View History

"""命令行ZED 分段录制。
- ``start``默认本进程内 ``start_zed_recording``前台阻塞直到 Ctrl+C ``stop``不经过 HTTP
- ``start --remote````POST`` 已运行的 fish_api不阻塞
- ``stop`` / ``status`` HTTP用于停/ ** uvicorn 进程托管** 的录制
HTTP 基址环境变量 ``FISH_API_BASE_URL`` ``PUBLIC_BASE_URL``否则 ``http://127.0.0.1:8000``
ZED 路由不校验 ingest API Key请求无需 ``X-API-Key``
每次启动由服务端分配 ``fish_id``库表与 ``output_dir``目标父目录下 ``fish``+数字 ``.svo2`` 路径综合并写入 ``zed_recording_sessions``
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
def _base_url() -> str:
"""优先环境变量;否则从 ``fish_api/.env`` 经 ``get_settings()`` 读 ``FISH_API_BASE_URL`` / ``PUBLIC_BASE_URL``。"""
u = os.environ.get("FISH_API_BASE_URL") or os.environ.get("PUBLIC_BASE_URL")
if u:
return str(u).strip().rstrip("/")
from app.settings import get_settings
s = get_settings()
u2 = (s.fish_api_base_url or "").strip() or (s.public_base_url or "").strip()
if u2:
return u2.rstrip("/")
return "http://127.0.0.1:8000"
def _http_request_json(method: str, path: str, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = _base_url() + path
if method == "GET":
req = urllib.request.Request(url, method="GET")
elif method == "POST":
payload = json.dumps(body if body is not None else {}).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={"Content-Type": "application/json; charset=utf-8"},
method="POST",
)
else:
raise ValueError(method)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
raw = resp.read().decode("utf-8")
if not raw.strip():
return {}
return json.loads(raw)
except urllib.error.HTTPError as e:
err_body = e.read().decode("utf-8", errors="replace")
print(f"HTTP {e.code}: {err_body}", file=sys.stderr)
raise SystemExit(1) from None
except urllib.error.URLError as e:
print(f"请求失败: {e}", file=sys.stderr)
raise SystemExit(1) from None
def _cmd_start_remote(args: argparse.Namespace) -> None:
body: Dict[str, Any] = {}
if args.segment_sec is not None:
body["segment_sec"] = args.segment_sec
r = _http_request_json("POST", "/api/v1/zed/recording/start", body)
print(json.dumps(r, ensure_ascii=False, indent=2))
def _cmd_start_local(args: argparse.Namespace) -> None:
from app.services.zed_recording_control import (
start_zed_recording,
stop_zed_recording,
)
from app.settings import get_settings
s = get_settings()
ok, msg, fish_id, _ = start_zed_recording(
s,
segment_sec=args.segment_sec,
)
if not ok:
print(f"启动失败: {msg}", file=sys.stderr)
raise SystemExit(1)
print(
f"fish_id={fish_id},录制中,按 Ctrl+C 停止…",
flush=True,
)
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
print("\n正在停止…", flush=True)
stop_ok, stop_msg, stop_fish = stop_zed_recording(s)
if stop_ok:
print(f"已退出。fish_id={stop_fish}", flush=True)
else:
print(f"停止结果: {stop_msg}", file=sys.stderr)
def _cmd_stop(_args: argparse.Namespace) -> None:
r = _http_request_json("POST", "/api/v1/zed/recording/stop", {})
print(json.dumps(r, ensure_ascii=False, indent=2))
def _cmd_status(_args: argparse.Namespace) -> None:
r = _http_request_json("GET", "/api/v1/zed/recording/status", None)
print(json.dumps(r, ensure_ascii=False, indent=2))
def main() -> None:
parser = argparse.ArgumentParser(
description="ZED 分段录制:本地阻塞 start或 HTTP 调用 fish_api",
)
sub = parser.add_subparsers(dest="cmd", required=True)
p_start = sub.add_parser("start", help="本地启动(阻塞至 Ctrl+C或 --remote 仅请求 API")
p_start.add_argument(
"--remote",
action="store_true",
help="通过 HTTP 调用 fish_api不阻塞本进程",
)
p_start.add_argument("--segment-sec", type=float, default=None, metavar="SEC")
sub.add_parser("stop", help="HTTP 停止 fish_api 进程中的录制")
sub.add_parser("status", help="HTTP 查询 fish_api 中的录制状态")
args = parser.parse_args()
if args.cmd == "start":
if args.remote:
_cmd_start_remote(args)
else:
_cmd_start_local(args)
elif args.cmd == "stop":
_cmd_stop(args)
elif args.cmd == "status":
_cmd_status(args)
if __name__ == "__main__":
main()