"""仅 FastAPI 进程使用 SQLite:落库测量/健康结果与 watch 已处理路径。 FishMeasure / FishAction 子进程不连接、不依赖本库;它们只读写各自文件(如 measure_output 下 weight_prediction.json、临时 pred.json 等),由 fish_api 在子进程结束后读文件并写入本表。 预览视频在 media_root;start_fresh 默认仅重置投递游标,设置 CLEAR_* 环境变量可按需清空各目录。 """ from __future__ import annotations import json import math import re import shutil import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple from app.settings import Settings from app.state import HealthSnapshot, MeasureSnapshot # 未带客户端标识时与旧行为兼容:共享同一条投递游标 DEFAULT_CLIENT_ID = "default" MAX_CLIENT_ID_LEN = 128 def normalize_client_id(raw: Optional[str]) -> str: """供轮询接口使用;过长截断,空值回退为 DEFAULT_CLIENT_ID。""" if raw is None: return DEFAULT_CLIENT_ID s = str(raw).strip() if not s: return DEFAULT_CLIENT_ID if len(s) > MAX_CLIENT_ID_LEN: s = s[:MAX_CLIENT_ID_LEN] return s def _connect(path: Path) -> sqlite3.Connection: path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(path), check_same_thread=False, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(settings: Settings) -> None: conn = _connect(settings.sqlite_path) try: conn.executescript( """ CREATE TABLE IF NOT EXISTS measure_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, result_json TEXT NOT NULL, video_left TEXT NOT NULL DEFAULT '', video_right TEXT NOT NULL DEFAULT '', error TEXT, raw_prediction_path TEXT, source_path TEXT, client_id TEXT DEFAULT NULL, pred REAL, star INTEGER DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_measure_client_id ON measure_snapshots(client_id); CREATE TABLE IF NOT EXISTS health_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, behavior_result TEXT NOT NULL DEFAULT '', health_result TEXT NOT NULL DEFAULT '', raw_class_en TEXT NOT NULL DEFAULT '', error TEXT, source_path TEXT, video_url TEXT 
NOT NULL DEFAULT '' ); CREATE TABLE IF NOT EXISTS watch_processed ( path TEXT NOT NULL, kind TEXT NOT NULL CHECK (kind IN ('measure', 'action')), PRIMARY KEY (path, kind) ); CREATE TABLE IF NOT EXISTS delivery_client_cursor ( client_id TEXT NOT NULL, kind TEXT NOT NULL CHECK (kind IN ('measure', 'health')), last_delivered_id INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (client_id, kind) ); CREATE TABLE IF NOT EXISTS zed_recording_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, fish_id INTEGER NOT NULL, started_at TEXT NOT NULL, stopped_at TEXT, output_dir TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_zed_sessions_fish_id ON zed_recording_sessions(fish_id); """ ) _migrate_delivery_cursor_from_legacy(conn) _ensure_delivery_cursors(conn) _migrate_add_client_id_column(conn) _migrate_add_pred_star_columns(conn) _migrate_add_calculation_log_column(conn) _migrate_add_health_video_url_column(conn) finally: conn.close() _FISH_DIR_RE = re.compile(r"^fish(\d+)$") def _max_numeric_fish_folder_id(parent: Path) -> int: """``parent`` 下名为 ``fish{N}``(N 为数字)的直接子目录中,最大的 N;无则 0。""" if not parent.is_dir(): return 0 m = 0 try: for p in parent.iterdir(): if not p.is_dir(): continue mo = _FISH_DIR_RE.match(p.name) if mo: m = max(m, int(mo.group(1))) except OSError: return 0 return m def _max_fish_id_from_svo2_under_parent(parent: Path) -> int: """扫描 ``parent`` 下 ``fish{{N}}/.../*.svo2``,从相对路径第一段取最大 N(库未记会话时仍能发现已有数据)。""" if not parent.is_dir(): return 0 m = 0 try: for p in parent.rglob("*.svo2"): try: rel = p.relative_to(parent) except ValueError: continue if not rel.parts: continue mo = _FISH_DIR_RE.match(rel.parts[0]) if mo: m = max(m, int(mo.group(1))) except OSError: return 0 return m def _max_fish_id_from_zed_sessions(conn: sqlite3.Connection) -> int: """合并 ``fish_id`` 列与 ``output_dir`` 路径中出现的 ``fish{{N}}`` 段(防库与路径不一致)。""" m = 0 for row in conn.execute("SELECT fish_id, output_dir FROM zed_recording_sessions"): m = max(m, int(row["fish_id"])) od = row["output_dir"] if not od: continue 
try: for part in Path(str(od)).parts: mo = _FISH_DIR_RE.match(part) if mo: m = max(m, int(mo.group(1))) except (ValueError, OSError): pass return m def begin_zed_recording_session(settings: Settings) -> Tuple[int, int, Path]: """为本次录制分配 ``fish_id`` 并写入 ``zed_recording_sessions``。 编号取库内会话、父目录下 ``fish``+数字 子目录、以及其下 ``.svo2`` 文件路径所反映编号三者的最大值再加 1, 避免目录里已有数据而库尚未记录时仍复用同一编号;若目标路径仍存在则顺延直至可用。 返回 ``(session_row_id, fish_id, output_dir)``。目录规则: 若配置了 ``MEASURE_WATCH_DIR`` 则为 ``{MEASURE_WATCH_DIR}/fish{N}``, 否则为 ``{STREAM_TMP_DIR}/zed_svo2/fish{N}``。 """ init_db(settings) conn = _connect(settings.sqlite_path) try: conn.execute("BEGIN IMMEDIATE") db_max = _max_fish_id_from_zed_sessions(conn) if settings.measure_watch_dir is not None: parent = settings.measure_watch_dir.resolve() else: parent = (settings.stream_tmp_dir / "zed_svo2").resolve() fs_dir = _max_numeric_fish_folder_id(parent) fs_svo = _max_fish_id_from_svo2_under_parent(parent) fs_max = max(fs_dir, fs_svo) fish_id = max(db_max, fs_max, 99) + 1 ts = datetime.now(timezone.utc).isoformat() output_dir = (parent / f"fish{fish_id}").resolve() # 极端情况:并发或其它原因导致目录已存在,则顺延编号 for _ in range(10000): if not output_dir.exists(): break fish_id += 1 output_dir = (parent / f"fish{fish_id}").resolve() else: raise RuntimeError( f"无法为 ZED 录制分配空闲 fish 目录(父目录 {parent})" ) output_dir.mkdir(parents=True, exist_ok=True) cur = conn.execute( """ INSERT INTO zed_recording_sessions (fish_id, started_at, stopped_at, output_dir) VALUES (?, ?, NULL, ?) 
""", (fish_id, ts, str(output_dir)), ) session_id = int(cur.lastrowid) conn.commit() return (session_id, fish_id, output_dir) except Exception: conn.rollback() raise finally: conn.close() def mark_zed_recording_session_stopped( settings: Settings, session_row_id: int ) -> Optional[int]: """将对应会话行的 ``stopped_at`` 置为当前时间;返回 ``fish_id``,未更新则 ``None``。""" init_db(settings) conn = _connect(settings.sqlite_path) try: ts = datetime.now(timezone.utc).isoformat() cur = conn.execute( """ UPDATE zed_recording_sessions SET stopped_at = ? WHERE id = ? AND stopped_at IS NULL """, (ts, session_row_id), ) if cur.rowcount == 0: conn.commit() return None row = conn.execute( "SELECT fish_id FROM zed_recording_sessions WHERE id = ?", (session_row_id,), ).fetchone() conn.commit() return int(row["fish_id"]) if row else None finally: conn.close() def _migrate_add_client_id_column(conn: sqlite3.Connection) -> None: """为旧数据库添加 client_id 列(如果不存在)。""" row = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='measure_snapshots'" ).fetchone() if row is None: return # 检查 client_id 列是否存在 cols = conn.execute("PRAGMA table_info(measure_snapshots)").fetchall() has_client_id = any(col[1] == "client_id" for col in cols) if not has_client_id: conn.execute("ALTER TABLE measure_snapshots ADD COLUMN client_id TEXT DEFAULT NULL") conn.execute("CREATE INDEX idx_measure_client_id ON measure_snapshots(client_id)") conn.commit() def _migrate_add_pred_star_columns(conn: sqlite3.Connection) -> None: """为旧数据库添加 pred 和 star 列(如果不存在)。""" row = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='measure_snapshots'" ).fetchone() if row is None: return cols = conn.execute("PRAGMA table_info(measure_snapshots)").fetchall() col_names = {col[1] for col in cols} if "pred" not in col_names: conn.execute("ALTER TABLE measure_snapshots ADD COLUMN pred REAL") if "star" not in col_names: conn.execute("ALTER TABLE measure_snapshots ADD COLUMN star INTEGER DEFAULT 0") conn.commit() 
def _migrate_add_calculation_log_column(conn: sqlite3.Connection) -> None:
    """Add the ``calculation_log`` column to a legacy ``measure_snapshots``.

    The column holds the textual trace of the weight estimation (mirroring
    the terminal output of ``test_dgcnn``).
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='measure_snapshots'"
    ).fetchone()
    if row is None:
        return
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    cols = conn.execute("PRAGMA table_info(measure_snapshots)").fetchall()
    col_names = {col[1] for col in cols}
    if "calculation_log" not in col_names:
        conn.execute("ALTER TABLE measure_snapshots ADD COLUMN calculation_log TEXT")
        conn.commit()


def _migrate_add_health_video_url_column(conn: sqlite3.Connection) -> None:
    """Add the ``video_url`` column to a legacy ``health_snapshots`` table."""
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='health_snapshots'"
    ).fetchone()
    if row is None:
        return
    cols = conn.execute("PRAGMA table_info(health_snapshots)").fetchall()
    col_names = {col[1] for col in cols}
    if "video_url" not in col_names:
        conn.execute(
            "ALTER TABLE health_snapshots ADD COLUMN video_url TEXT NOT NULL DEFAULT ''"
        )
        conn.commit()


def _migrate_delivery_cursor_from_legacy(conn: sqlite3.Connection) -> None:
    """Move rows of the old ``delivery_cursor(kind)`` table to the per-client table.

    Each legacy cursor becomes the ``(DEFAULT_CLIENT_ID, kind)`` row of
    ``delivery_client_cursor``; the legacy table is dropped afterwards.
    """
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='delivery_cursor'"
    ).fetchone()
    if row is None:
        return
    for kind in ("measure", "health"):
        r = conn.execute(
            "SELECT last_delivered_id FROM delivery_cursor WHERE kind = ?", (kind,)
        ).fetchone()
        if r is not None:
            conn.execute(
                """
                INSERT OR REPLACE INTO delivery_client_cursor (client_id, kind, last_delivered_id)
                VALUES (?, ?, ?)
                """,
                (DEFAULT_CLIENT_ID, kind, int(r["last_delivered_id"])),
            )
    conn.execute("DROP TABLE delivery_cursor")
    conn.commit()


def _ensure_delivery_cursors(conn: sqlite3.Connection) -> None:
    """Make sure the default client has a cursor row for every kind.

    A newly inserted row starts at the table's current ``MAX(id)`` so that an
    upgraded installation does not replay its whole snapshot history.
    """
    for kind, table in (
        ("measure", "measure_snapshots"),
        ("health", "health_snapshots"),
    ):
        row = conn.execute(
            "SELECT 1 FROM delivery_client_cursor WHERE client_id = ? AND kind = ?",
            (DEFAULT_CLIENT_ID, kind),
        ).fetchone()
        if row is None:
            # ``table`` comes from the fixed tuple above, never from input.
            mid = conn.execute(
                f"SELECT COALESCE(MAX(id), 0) FROM {table}"
            ).fetchone()[0]
            conn.execute(
                """
                INSERT INTO delivery_client_cursor (client_id, kind, last_delivered_id)
                VALUES (?, ?, ?)
                """,
                (DEFAULT_CLIENT_ID, kind, int(mid)),
            )
    conn.commit()


def save_measure_snapshot(
    settings: Settings,
    snap: MeasureSnapshot,
    source_path: Optional[str] = None,
    client_id: Optional[str] = None,
) -> None:
    """Insert one measure snapshot row.

    Args:
        settings: supplies the database path.
        snap: snapshot to persist; ``snap.updated_at`` is used as
            ``created_at`` when set, otherwise the current UTC time.
        source_path: stored as-is in the ``source_path`` column.
        client_id: owner of the row. ``None`` is stored as NULL, which
            ``pop_next_measure`` treats as visible to every client. Any other
            value is normalized with ``normalize_client_id`` so that it
            matches the normalized id the polling side filters on.
    """
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        ts = (
            snap.updated_at.isoformat()
            if snap.updated_at
            else datetime.now(timezone.utc).isoformat()
        )
        stored_client_id = None if client_id is None else normalize_client_id(client_id)
        # No explicit commit: the statement is not inside a BEGIN block.
        conn.execute(
            """
            INSERT INTO measure_snapshots (
                created_at, result_json, video_left, video_right, error,
                raw_prediction_path, source_path, client_id, pred, star, calculation_log
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                ts,
                json.dumps(snap.result, ensure_ascii=False),
                snap.video_left,
                snap.video_right,
                snap.error,
                snap.raw_prediction_path,
                source_path,
                stored_client_id,
                snap.pred,
                1 if snap.star else 0,
                snap.calculation_log,
            ),
        )
    finally:
        conn.close()


def save_health_snapshot(
    settings: Settings,
    snap: HealthSnapshot,
    source_path: Optional[str] = None,
    video_url: str = "",
) -> None:
    """Insert one health snapshot row.

    ``snap.updated_at`` is used as ``created_at`` when set, otherwise the
    current UTC time. ``source_path`` and ``video_url`` are stored as given.
    """
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        ts = (
            snap.updated_at.isoformat()
            if snap.updated_at
            else datetime.now(timezone.utc).isoformat()
        )
        conn.execute(
            """
            INSERT INTO health_snapshots (
                created_at, behavior_result, health_result, raw_class_en,
                error, source_path, video_url
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            (
                ts,
                snap.behavior_result,
                snap.health_result,
                snap.raw_class_en,
                snap.error,
                source_path,
                video_url,
            ),
        )
    finally:
        conn.close()


def _parse_dt(s: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp (a ``Z`` suffix is accepted); ``None`` on failure."""
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.replace("Z", "+00:00"))
    except ValueError:
        return None


def _decode_result_list(raw: Any) -> List[Any]:
    """Decode a stored ``result_json`` value into a list.

    Anything that is not valid JSON, or that decodes to a non-list value,
    yields an empty list, so a single bad row cannot make readers raise.
    """
    try:
        data: Any = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass.
        return []
    return data if isinstance(data, list) else []


def get_latest_measure(settings: Settings) -> MeasureSnapshot:
    """Return the newest measure snapshot, or an empty snapshot when the table is empty."""
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        row = conn.execute(
            """
            SELECT created_at, result_json, video_left, video_right, error,
                   raw_prediction_path, calculation_log
            FROM measure_snapshots
            ORDER BY id DESC
            LIMIT 1
            """
        ).fetchone()
        if row is None:
            return MeasureSnapshot(result=[], video_left="", video_right="")
        return MeasureSnapshot(
            result=_decode_result_list(row["result_json"]),
            video_left=row["video_left"] or "",
            video_right=row["video_right"] or "",
            updated_at=_parse_dt(row["created_at"]),
            error=row["error"],
            raw_prediction_path=row["raw_prediction_path"],
            calculation_log=row["calculation_log"],
        )
    finally:
        conn.close()


def list_all_measure_snapshots(settings: Settings) -> List[Dict[str, Any]]:
    """Return every ``measure_snapshots`` row, newest first (for the debug endpoint)."""
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        rows = conn.execute(
            """
            SELECT id, created_at, result_json, video_left, video_right, error,
                   raw_prediction_path, source_path, client_id, pred, star, calculation_log
            FROM measure_snapshots
            ORDER BY id DESC
            """
        ).fetchall()
        out: List[Dict[str, Any]] = []
        for row in rows:
            st = row["star"]
            out.append(
                {
                    "id": row["id"],
                    "created_at": row["created_at"],
                    "result": _decode_result_list(row["result_json"]),
                    "video_left": row["video_left"] or "",
                    "video_right": row["video_right"] or "",
                    "error": row["error"],
                    "raw_prediction_path": row["raw_prediction_path"],
                    "source_path": row["source_path"],
                    "client_id": row["client_id"],
                    "pred": row["pred"],
                    "star": bool(st) if st is not None else False,
                    "calculation_log": row["calculation_log"],
                }
            )
        return out
    finally:
        conn.close()


def list_all_health_snapshots(settings: Settings) -> List[Dict[str, Any]]:
    """Return every ``health_snapshots`` row, newest first (for the debug endpoint).

    The ``video_url`` column is exposed under the key ``video_path``.
    """
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        rows = conn.execute(
            """
            SELECT id, created_at, behavior_result, health_result, raw_class_en,
                   error, source_path, video_url
            FROM health_snapshots
            ORDER BY id DESC
            """
        ).fetchall()
        return [
            {
                "id": row["id"],
                "created_at": row["created_at"],
                "behavior_result": row["behavior_result"] or "",
                "health_result": row["health_result"] or "",
                "raw_class_en": row["raw_class_en"] or "",
                "error": row["error"],
                "source_path": row["source_path"],
                "video_path": row["video_url"] or "",
            }
            for row in rows
        ]
    finally:
        conn.close()


def get_latest_health(settings: Settings) -> HealthSnapshot:
    """Return the newest health snapshot, or an empty snapshot when the table is empty."""
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        row = conn.execute(
            """
            SELECT created_at, behavior_result, health_result, raw_class_en, error, video_url
            FROM health_snapshots
            ORDER BY id DESC
            LIMIT 1
            """
        ).fetchone()
        if row is None:
            return HealthSnapshot(behavior_result="", health_result="")
        return HealthSnapshot(
            behavior_result=row["behavior_result"] or "",
            health_result=row["health_result"] or "",
            updated_at=_parse_dt(row["created_at"]),
            error=row["error"],
            raw_class_en=row["raw_class_en"] or "",
            video_path=row["video_url"] or "",
        )
    finally:
        conn.close()


def _coerce_finite_number(v: Any) -> Optional[float]:
    """Return *v* as a finite float, or ``None`` when that is not possible.

    Accepts ints, floats and numeric strings; rejects ``None``, booleans,
    blank strings, NaN and infinities.
    """
    if v is None:
        return None
    # bool is a subclass of int and must not count as a number here.
    if isinstance(v, bool):
        return None
    if isinstance(v, (int, float)):
        x = float(v)
        return x if math.isfinite(x) else None
    if isinstance(v, str):
        s = v.strip()
        if not s:
            return None
        try:
            x = float(s)
        except ValueError:
            return None
        return x if math.isfinite(x) else None
    return None


def _coerce_track_id(v: Any) -> Optional[int]:
    """Return *v* as a non-negative int track id, or ``None`` when invalid.

    Accepts ints and base-10 integer strings; booleans are rejected.
    """
    # bool is a subclass of int in Python
    if isinstance(v, bool):
        return None
    if isinstance(v, int):
        return v if v >= 0 else None
    if isinstance(v, str):
        try:
            i = int(v.strip(), 10)
        except ValueError:
            return None
        return i if i >= 0 else None
    return None


def measure_result_deliverable(result: Any, error: Optional[str]) -> bool:
    """Tell whether a measure result is worth delivering to a client.

    True when there is no error and at least one record is a dict carrying a
    valid track ``id`` plus finite ``weight`` (g) and ``length`` (mm) values.
    """
    if error:
        return False
    if not isinstance(result, list) or not result:
        return False
    for it in result:
        if not isinstance(it, dict):
            continue
        tid = _coerce_track_id(it.get("id"))
        w = _coerce_finite_number(it.get("weight"))
        ln = _coerce_finite_number(it.get("length"))
        if tid is not None and w is not None and ln is not None:
            return True
    return False


def measure_snapshot_deliverable(snap: MeasureSnapshot) -> bool:
    """Apply ``measure_result_deliverable`` to a snapshot object."""
    return measure_result_deliverable(snap.result, snap.error)


def health_snapshot_deliverable(snap: HealthSnapshot) -> bool:
    """True when the snapshot has no error and at least one non-blank result field."""
    if snap.error:
        return False
    b = (snap.behavior_result or "").strip()
    h = (snap.health_result or "").strip()
    r = (snap.raw_class_en or "").strip()
    return bool(b or h or r)


def _health_row_deliverable(
    behavior_result: str,
    health_result: str,
    raw_class_en: str,
    error: Optional[str],
) -> bool:
    """Apply ``health_snapshot_deliverable`` to raw column values."""
    snap = HealthSnapshot(
        behavior_result=behavior_result or "",
        health_result=health_result or "",
        raw_class_en=raw_class_en or "",
        error=error,
    )
    return health_snapshot_deliverable(snap)


def _last_delivered_id(
    conn: sqlite3.Connection,
    kind: str,
    snapshots_table: str,
    client_id: str,
) -> int:
    """Return the client's cursor for *kind*, creating it on first use.

    A new cursor starts at the current ``MAX(id)`` of *snapshots_table*, so a
    client seen for the first time only receives rows inserted afterwards.
    *snapshots_table* is interpolated into the SQL text and must therefore be
    a trusted, hard-coded table name.
    """
    row = conn.execute(
        """
        SELECT last_delivered_id FROM delivery_client_cursor
        WHERE kind = ? AND client_id = ?
        """,
        (kind, client_id),
    ).fetchone()
    if row is not None:
        return int(row["last_delivered_id"])
    mid = conn.execute(
        f"SELECT COALESCE(MAX(id), 0) FROM {snapshots_table}"
    ).fetchone()[0]
    conn.execute(
        """
        INSERT INTO delivery_client_cursor (client_id, kind, last_delivered_id)
        VALUES (?, ?, ?)
        """,
        (client_id, kind, int(mid)),
    )
    return int(mid)


def pop_next_measure(
    settings: Settings,
    client_id: str = DEFAULT_CLIENT_ID,
) -> Tuple[MeasureSnapshot, bool, Optional[int]]:
    """Deliver the client's next undelivered, deliverable measure snapshot.

    Rows that are not deliverable (including rows whose ``result_json``
    cannot be decoded) are skipped; skipping still advances the cursor. Only
    rows whose ``client_id`` equals the normalized *client_id*, or is NULL
    (visible to every client, for backward compatibility), are considered.

    Returns:
        ``(snapshot, True, row_id)`` when a row was delivered, otherwise
        ``(empty snapshot, False, None)``.
    """
    cid = normalize_client_id(client_id)
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        # One write transaction covers reading and advancing the cursor.
        conn.execute("BEGIN IMMEDIATE")
        last_id = _last_delivered_id(conn, "measure", "measure_snapshots", cid)
        while True:
            # Only rows addressed to this client or to nobody in particular.
            next_row = conn.execute(
                """
                SELECT id, created_at, result_json, video_left, video_right, error,
                       raw_prediction_path, pred, star, calculation_log
                FROM measure_snapshots
                WHERE id > ? AND (client_id = ? OR client_id IS NULL)
                ORDER BY id ASC
                LIMIT 1
                """,
                (last_id, cid),
            ).fetchone()
            if next_row is None:
                conn.commit()
                return MeasureSnapshot(result=[], video_left="", video_right=""), False, None
            nid = int(next_row["id"])
            err: Optional[str] = next_row["error"]
            # A malformed payload decodes to [] and is skipped below instead
            # of raising, which would roll back and block the queue forever.
            data = _decode_result_list(next_row["result_json"])
            conn.execute(
                """
                UPDATE delivery_client_cursor
                SET last_delivered_id = ?
                WHERE kind = ? AND client_id = ?
                """,
                (nid, "measure", cid),
            )
            if not measure_result_deliverable(data, err):
                last_id = nid
                continue
            conn.commit()
            snap = MeasureSnapshot(
                result=data,
                video_left=next_row["video_left"] or "",
                video_right=next_row["video_right"] or "",
                updated_at=_parse_dt(next_row["created_at"]),
                error=err,
                raw_prediction_path=next_row["raw_prediction_path"],
                pred=next_row["pred"],
                star=bool(next_row["star"]) if next_row["star"] is not None else False,
                calculation_log=next_row["calculation_log"],
            )
            return snap, True, nid
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def pop_next_health(
    settings: Settings,
    client_id: str = DEFAULT_CLIENT_ID,
) -> Tuple[HealthSnapshot, bool, Optional[int]]:
    """Deliver the client's next undelivered, deliverable health snapshot.

    Non-deliverable rows are skipped; skipping still advances the cursor.

    Returns:
        ``(snapshot, True, row_id)`` when a row was delivered, otherwise
        ``(empty snapshot, False, None)``.
    """
    cid = normalize_client_id(client_id)
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        conn.execute("BEGIN IMMEDIATE")
        last_id = _last_delivered_id(conn, "health", "health_snapshots", cid)
        while True:
            next_row = conn.execute(
                """
                SELECT id, created_at, behavior_result, health_result, raw_class_en,
                       error, source_path, video_url
                FROM health_snapshots
                WHERE id > ?
                ORDER BY id ASC
                LIMIT 1
                """,
                (last_id,),
            ).fetchone()
            if next_row is None:
                conn.commit()
                return HealthSnapshot(behavior_result="", health_result=""), False, None
            nid = int(next_row["id"])
            beh = next_row["behavior_result"] or ""
            hlth = next_row["health_result"] or ""
            raw_en = next_row["raw_class_en"] or ""
            err: Optional[str] = next_row["error"]
            conn.execute(
                """
                UPDATE delivery_client_cursor
                SET last_delivered_id = ?
                WHERE kind = ? AND client_id = ?
                """,
                (nid, "health", cid),
            )
            if not _health_row_deliverable(beh, hlth, raw_en, err):
                last_id = nid
                continue
            conn.commit()
            snap = HealthSnapshot(
                behavior_result=beh,
                health_result=hlth,
                updated_at=_parse_dt(next_row["created_at"]),
                error=err,
                raw_class_en=raw_en,
                video_path=next_row["video_url"] or "",
            )
            return snap, True, nid
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def peek_last_delivered_health_video_url(
    settings: Settings,
    client_id: str = DEFAULT_CLIENT_ID,
) -> str:
    """Return the ``video_url`` of the row the client's health cursor points at.

    The cursor is not advanced. Used by the ``/water/video/`` endpoint so it
    serves the same video URL as the most recently delivered health snapshot.
    Returns ``""`` when the client has no cursor, the cursor is at 0, or the
    referenced row does not exist.
    """
    cid = normalize_client_id(client_id)
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        cursor_row = conn.execute(
            """
            SELECT last_delivered_id FROM delivery_client_cursor
            WHERE kind = ? AND client_id = ?
            """,
            ("health", cid),
        ).fetchone()
        if cursor_row is None:
            return ""
        last_id = int(cursor_row["last_delivered_id"])
        if last_id <= 0:
            return ""
        row = conn.execute(
            "SELECT video_url FROM health_snapshots WHERE id = ?",
            (last_id,),
        ).fetchone()
        if row is None:
            return ""
        return row["video_url"] or ""
    finally:
        conn.close()


def _load_json_processed_set(path: Path) -> Set[str]:
    """Read a legacy JSON state file into a set of path strings.

    Accepts either a bare JSON list or an object with a ``"processed"`` key.
    A missing, unreadable, badly encoded or malformed file yields an empty
    set, as this is a best-effort import.
    """
    if not path.is_file():
        return set()
    try:
        with open(path, encoding="utf-8") as f:
            data: Any = json.load(f)
        if isinstance(data, list):
            return {str(x) for x in data}
        if isinstance(data, dict) and "processed" in data:
            return {str(x) for x in data["processed"]}
    except (ValueError, TypeError, OSError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError; TypeError
        # covers a "processed" value that cannot be iterated.
        pass
    return set()


def load_watch_processed(settings: Settings, state_file: Path, kind: str) -> Set[str]:
    """Return the processed paths of *kind* stored in SQLite.

    Entries of a legacy JSON state file, if present, are merged into the
    table first; the import is idempotent (``INSERT OR IGNORE``).
    """
    assert kind in ("measure", "action")
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        for p in _load_json_processed_set(state_file):
            conn.execute(
                "INSERT OR IGNORE INTO watch_processed (path, kind) VALUES (?, ?)",
                (p, kind),
            )
        conn.commit()
        cur = conn.execute(
            "SELECT path FROM watch_processed WHERE kind = ?", (kind,)
        )
        return {r[0] for r in cur}
    finally:
        conn.close()


def add_watch_processed(settings: Settings, path: str, kind: str) -> None:
    """Record *path* as processed for *kind*; an existing entry is left untouched."""
    assert kind in ("measure", "action")
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        conn.execute(
            "INSERT OR IGNORE INTO watch_processed (path, kind) VALUES (?, ?)",
            (path, kind),
        )
        conn.commit()
    finally:
        conn.close()


def _safe_rm_tree(path: Path) -> None:
    """Delete a file or a whole directory tree (including the directory itself).

    A missing path is ignored; an ``OSError`` is reported on stdout and
    otherwise swallowed.
    """
    p = Path(path).resolve()
    if not p.exists():
        return
    try:
        if p.is_dir():
            shutil.rmtree(p)
        else:
            p.unlink()
    except OSError as e:
        print(f"[prestart-fresh] skip remove {p}: {e}", flush=True)


def clear_runtime_compute_dirs(settings: Settings) -> None:
    """Empty the runtime directories while keeping the directories themselves.

    Covers the FishMeasure / FishAction output roots, the hosted preview
    media root and the ingest temporary directory. Entries that cannot be
    removed are reported on stdout and skipped.

    Note: ``start_fresh`` no longer calls this by default; it is meant for
    explicit manual or scripted use only.
    """
    for base in (
        settings.measure_output_root,
        settings.action_output_root,
        settings.media_root,
        settings.stream_tmp_dir,
    ):
        p = Path(base).resolve()
        if not p.is_dir():
            continue
        for child in p.iterdir():
            try:
                if child.is_dir():
                    shutil.rmtree(child)
                else:
                    child.unlink()
            except OSError as e:
                print(f"[prestart-fresh] skip remove {child}: {e}", flush=True)


def remove_sqlite_database_files(settings: Settings) -> None:
    """Delete the SQLite main file and its ``-wal`` / ``-shm`` side files.

    Missing files and ``OSError`` are ignored. The next ``init_db`` call
    recreates an empty database.
    """
    base = settings.sqlite_path.resolve()
    for p in (base, Path(str(base) + "-wal"), Path(str(base) + "-shm")):
        try:
            if p.is_file():
                p.unlink()
        except OSError:
            pass


def reset_delivery_client_progress(settings: Settings) -> None:
    """Reset every client's delivery cursor to 0.

    Snapshot history and the watch cache are kept.
    """
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        # Rewind all cursors so no client keeps its old consumption progress.
        conn.execute("UPDATE delivery_client_cursor SET last_delivered_id = 0")
        # Make sure the default client's rows exist (upgrade from an old database).
        for kind in ("measure", "health"):
            conn.execute(
                """
                INSERT INTO delivery_client_cursor (client_id, kind, last_delivered_id)
                VALUES (?, ?, 0)
                ON CONFLICT(client_id, kind) DO NOTHING
                """,
                (DEFAULT_CLIENT_ID, kind),
            )
        conn.commit()
    finally:
        conn.close()


def clear_watch_cache_and_snapshots(settings: Settings) -> None:
    """Clear processed watch paths and their snapshots so inference can be re-run.

    Each kind is cleared only when its ``*_watch_use_state_file`` setting is
    truthy: ``measure_watch_use_state_file`` governs the ``measure`` watch
    entries, ``measure_snapshots`` and the measure cursors;
    ``action_watch_use_state_file`` governs the ``action`` watch entries,
    ``health_snapshots`` and the health cursors.
    """
    init_db(settings)
    conn = _connect(settings.sqlite_path)
    try:
        if settings.measure_watch_use_state_file:
            conn.execute("DELETE FROM watch_processed WHERE kind = ?", ("measure",))
            conn.execute("DELETE FROM measure_snapshots")
            conn.execute(
                "UPDATE delivery_client_cursor SET last_delivered_id = 0 WHERE kind = ?",
                ("measure",),
            )
        if settings.action_watch_use_state_file:
            conn.execute("DELETE FROM watch_processed WHERE kind = ?", ("action",))
            conn.execute("DELETE FROM health_snapshots")
            conn.execute(
                "UPDATE delivery_client_cursor SET last_delivered_id = 0 WHERE kind = ?",
                ("health",),
            )
        conn.commit()
    finally:
        conn.close()