#!/usr/bin/env python3
"""Standalone process: poll the two Fish API result endpoints and log the responses with loguru.

Each GET "consumes" one undelivered snapshot, so a response is logged only when
the ``X-Fish-Biomass-New: 1`` response header is present (or the JSON ``code``
is not 200).

Usage::

    cd <repository root>
    BIOMASS_API_BASE=http://127.0.0.1:8000 POLL_INTERVAL=5 python3 scripts/biomass_poller.py

Dependencies (same as the fish_api dev group): httpx, loguru::

    cd fish_api && pip install -e ".[dev]"
"""

from __future__ import annotations

import argparse
import asyncio
import json
import os
import sys
from typing import Any

import httpx
from loguru import logger


def _fmt_body(resp: httpx.Response) -> Any:
    """Return the decoded JSON body of *resp*, or its raw text if decoding fails."""
    try:
        return resp.json()
    except Exception:
        # Deliberate best-effort: a non-JSON body is still worth logging as text.
        return resp.text


def _should_log(resp: httpx.Response, body: Any) -> bool:
    """Decide whether a response is worth logging.

    Returns True when:
    - the body is not a JSON object and is non-empty (truthy), or
    - the JSON object's ``code`` field is not 200 (business error), or
    - the ``X-Fish-Biomass-New`` response header equals ``1`` (a newly
      delivered snapshot).
    """
    if not isinstance(body, dict):
        return bool(body)
    if body.get("code") != 200:
        return True
    return resp.headers.get("X-Fish-Biomass-New", "").strip() == "1"


async def _poll_endpoint(
    client: httpx.AsyncClient, label: str, url: str, headers: dict[str, str]
) -> None:
    """GET one endpoint and log its response immediately if it is noteworthy."""
    resp = await client.get(url, headers=headers)
    body = _fmt_body(resp)
    if _should_log(resp, body):
        logger.info(
            "[{}] HTTP {} | {}",
            label,
            resp.status_code,
            json.dumps(body, ensure_ascii=False),
        )


async def poll_once(
    client: httpx.AsyncClient, base: str, extra_headers: dict[str, str]
) -> None:
    """Poll the camera and health result endpoints once, in that order.

    Each response is logged before the next request is issued. A GET consumes
    the snapshot it returns, so deferring the log until both requests finish
    would silently drop the first endpoint's snapshot whenever the second
    request raises.

    Args:
        client: Shared async HTTP client.
        base: API root URL; a trailing slash is tolerated.
        extra_headers: Headers sent with every request.

    Raises:
        Exception: Whatever ``client.get`` raises (e.g. transport errors) is
            propagated to the caller; the remaining endpoint is skipped for
            this round.
    """
    base = base.rstrip("/")
    await _poll_endpoint(
        client, "real/camera/", f"{base}/api/v1/biomass/real/camera/", extra_headers
    )
    await _poll_endpoint(
        client, "health/result/", f"{base}/api/v1/biomass/health/result/", extra_headers
    )


async def poll_loop(
    base: str, interval: float, extra_headers: dict[str, str]
) -> None:
    """Run ``poll_once`` forever, sleeping between rounds.

    Args:
        base: API root URL.
        interval: Seconds to sleep between rounds; values below 0.5 are raised
            to 0.5.
        extra_headers: Headers sent with every request.
    """
    # Enforce the 0.5 s floor once, outside the loop. Written as a comparison
    # (not max()) so a NaN interval also falls back to the floor.
    delay = interval if interval > 0.5 else 0.5
    async with httpx.AsyncClient(timeout=30.0) as client:
        while True:
            try:
                await poll_once(client, base, extra_headers)
            except Exception:
                # Top-level boundary: one failed round must not stop the poller.
                logger.exception("poll round failed")
            await asyncio.sleep(delay)


def main() -> None:
    """Parse CLI arguments / environment, configure logging, and start polling."""
    parser = argparse.ArgumentParser(description="Poll Fish API biomass GET endpoints.")
    parser.add_argument(
        "--base-url",
        default=os.environ.get("BIOMASS_API_BASE", "http://127.0.0.1:8000"),
        help="Fish API 根 URL(默认 BIOMASS_API_BASE 或 http://127.0.0.1:8000)",
    )
    parser.add_argument(
        "--interval",
        type=float,
        # Keep the default as a string: argparse applies ``type`` to string
        # defaults only when the flag is absent, so a malformed POLL_INTERVAL
        # yields a normal usage error and cannot break an explicit --interval.
        default=os.environ.get("POLL_INTERVAL", "5"),
        help="轮询间隔秒数(默认 POLL_INTERVAL 或 5)",
    )
    parser.add_argument(
        "--client-id",
        default=os.environ.get("BIOMASS_CLIENT_ID", "").strip() or None,
        help="传给 X-Fish-Client-Id(默认 BIOMASS_CLIENT_ID;不设则与网关 default 游标一致)",
    )
    args = parser.parse_args()

    # Replace loguru's default sink with a stderr sink using a compact format.
    logger.remove()
    logger.add(
        sys.stderr,
        level=os.environ.get("LOG_LEVEL", "INFO"),
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | " "{level: <8} | " "{message}",
    )

    # Only send the client-id header when one was configured.
    extra: dict[str, str] = {}
    if args.client_id:
        extra["X-Fish-Client-Id"] = args.client_id

    logger.info(
        "biomass_poller 启动 | base={} | interval={}s | client_id={} | GET biomass real/camera + health/result",
        args.base_url.rstrip("/"),
        args.interval,
        args.client_id or "(default)",
    )
    asyncio.run(poll_loop(args.base_url, args.interval, extra))


if __name__ == "__main__":
    main()