diff --git a/FishMeasure/fish_video_weight_evaluation.py b/FishMeasure/fish_video_weight_evaluation.py index 435a78a..811a0bb 100644 --- a/FishMeasure/fish_video_weight_evaluation.py +++ b/FishMeasure/fish_video_weight_evaluation.py @@ -10,6 +10,24 @@ import json import numpy as np import torch from pathlib import Path + +if not hasattr(argparse, "BooleanOptionalAction"): + class _BooleanOptionalAction(argparse.Action): + def __init__(self, option_strings, dest, default=None, type=None, + choices=None, required=False, help=None, metavar=None): + _option_strings = [] + for opt in option_strings: + _option_strings.append(opt) + if opt.startswith("--"): + _option_strings.append("--no-" + opt[2:]) + super().__init__(option_strings=_option_strings, dest=dest, nargs=0, + default=default, type=type, choices=choices, + required=required, help=help, metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, not option_string.startswith("--no-")) + + argparse.BooleanOptionalAction = _BooleanOptionalAction from typing import List, Dict, Any, Optional, Tuple from ultralytics import YOLO from seg import init_models @@ -20,6 +38,8 @@ import importlib import tempfile import subprocess +_REPO_ROOT = Path(__file__).resolve().parent + try: import pyzed.sl as sl ZED_AVAILABLE = True @@ -936,7 +956,7 @@ def classify_pointcloud_array(classifier: torch.nn.Module, points: np.ndarray, c possible_paths = [ Path(__file__).parent / "pointcloud_classifier" / "Pointnet_Pointnet2_pytorch" / "test_classification.py", Path(__file__).parent.parent / "pointcloud_classifier" / "Pointnet_Pointnet2_pytorch" / "test_classification.py", - Path("/home/ubuntu/projects/FishMeasure/pointcloud_classifier/Pointnet_Pointnet2_pytorch/test_classification.py"), + _REPO_ROOT / "pointcloud_classifier" / "Pointnet_Pointnet2_pytorch" / "test_classification.py", ] test_classification_path = None @@ -2132,7 +2152,7 @@ def main(): help="With --batch-svo-folder: find *.svo2 in all subfolders (output dirs use path like parent__child__file)", ) parser.add_argument("--yolo-model", - default="/home/ubuntu/projects/FishMeasure/runs/train/fish_detection_20251127_104658/weights/best.pt", + default=str(_REPO_ROOT / "runs/train/fish_detection_20251127_104658/weights/best.pt"), help="YOLO model path") parser.add_argument("--conf", type=float, default=0.25, help="Confidence threshold") parser.add_argument("--imgsz", type=int, default=640, help="Image size") @@ -2176,7 +2196,7 @@ def main(): parser.add_argument("--run-weight-estimation", action="store_true", help="After processing, run DGCNN weight estimation on saved point clouds (test_dgcnn_weight_estimator)") parser.add_argument("--weight-estimator-checkpoint", type=str, - default="/home/ubuntu/projects/FishMeasure/weight_estimator/runs/dgcnn_20260312_171043/best.pt", + default=str(_REPO_ROOT / "weight_estimator/runs/dgcnn_20260312_171043/best.pt"), help="Path to DGCNN weight estimator checkpoint (.pt)") parser.add_argument("--weight-topk-length", type=int, default=3, help="Optional: length-weighted top-K for predict_cloud_folder (default: 3; set 0 to disable)") diff --git a/FishMeasure/generate_video_with_labels.py b/FishMeasure/generate_video_with_labels.py index 539a02c..4b326fa 100644 --- a/FishMeasure/generate_video_with_labels.py +++ b/FishMeasure/generate_video_with_labels.py @@ -167,22 +167,41 @@ def generate_video( imgsz: int = 640, frame_stride: int = 1, show_large: bool = False, + summary_weight_g: Optional[float] = None, + summary_length_mm: Optional[float] = None, + summary_star: bool = False, + output_video_name: Optional[str] = None, + sam_device: str = "cuda", ) -> Optional[Path]: if not ZED_AVAILABLE: print("ERROR: pyzed not available, cannot generate labeled video") return None - per_frame, summary_wg, summary_lmm, is_confident = _parse_weight_json(weight_json) - star_s = " *" if is_confident else "" + per_frame, parsed_summary_wg, parsed_summary_lmm, raw_confident = _parse_weight_json(weight_json) + if summary_weight_g is None: + summary_weight_g = parsed_summary_wg + if summary_length_mm is None: + summary_length_mm = parsed_summary_lmm + star_s = " *" if summary_star else "" print(f" Per-frame predictions: {len(per_frame)} PLYs mapped") - print(f" Summary: weight={summary_wg}g, length={summary_lmm}mm{star_s}") + print( + f" Summary: weight={summary_weight_g}g, length={summary_length_mm}mm{star_s} " + f"(raw_confident={raw_confident})" + ) - if not per_frame and summary_wg is None: + if not per_frame and summary_weight_g is None: print(" WARNING: No weight data in JSON, video will show '--'") from ultralytics import YOLO yolo = YOLO(yolo_model_path) class_names = yolo.names if hasattr(yolo, "names") else {} + from fish_video_weight_evaluation import ( + create_segmentation_overlay, + load_sam_predictor_with_fallback, + segment_with_sam, + ) + sam_predictor, eff_sam_device = load_sam_predictor_with_fallback(sam_device) + sam_torch_device = eff_sam_device from dataset.zed_reader import ZEDReader zed_reader = ZEDReader(svo_path=str(svo_path), camera_mode=False, use_yolo_detector=False) @@ -218,6 +237,7 @@ def generate_video( continue frame_number = idx + 1 + frame_name = f"frame_{frame_number:06d}" if frame_number in per_frame: cur_wg, cur_lmm = per_frame[frame_number] last_wg = cur_wg @@ -230,6 +250,7 @@ def generate_video( num_dets = len(results.boxes) if results.boxes is not None else 0 left_disp = img.copy() + right_disp = img.copy() if num_dets > 0: boxes = results.boxes.xyxy.cpu().numpy() tids = (results.boxes.id.cpu().numpy().astype(int) @@ -245,19 +266,31 @@ def generate_video( cname = class_names.get(cid, "fish") _draw_label_on_box(left_disp, box, tid, cname, cur_wg, cur_lmm) - if show_large or summary_wg is not None: - _draw_large_summary(left_disp, summary_wg, summary_lmm, is_confident) + try: + masks = segment_with_sam(sam_predictor, img, boxes, sam_torch_device) + except Exception as e: + print(f" WARNING: SAM segmentation failed on {frame_name}: {e}") + masks = [] + + if masks: + right_disp = create_segmentation_overlay(img.copy(), masks) + cv2.putText(right_disp, "Segmentation", (10, right_disp.shape[0] - 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA) + else: + cv2.putText(right_disp, "Segmentation (failed)", (10, right_disp.shape[0] - 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA) + else: + cv2.putText(right_disp, "No detections", (10, right_disp.shape[0] - 20), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (128, 128, 128), 2, cv2.LINE_AA) + + if show_large or summary_weight_g is not None: + _draw_large_summary(left_disp, summary_weight_g, summary_length_mm, summary_star) - frame_name = f"frame_{frame_number:06d}" info = f"[{frame_number}] {frame_name} | Detections: {num_dets}" cv2.putText(left_disp, info, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA) cv2.putText(left_disp, "Detection", (10, left_disp.shape[0] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA) - right_disp = img.copy() - cv2.putText(right_disp, "Original", (10, right_disp.shape[0] - 20), - cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA) - combined = np.hstack([left_disp, right_disp]) if num_dets > 0: frames.append(combined) @@ -275,7 +308,7 @@ def generate_video( print(f" WARNING: No detection frames collected from {svo_name}") return None - video_path = images_dir / f"{svo_name}_preview.mp4" + video_path = images_dir / (output_video_name or f"{svo_name}_preview.mp4") h, w = frames[0].shape[:2] writer = cv2.VideoWriter(str(video_path), cv2.VideoWriter_fourcc(*"mp4v"), 10.0, (w, h)) for f in frames: @@ -295,7 +328,17 @@ def main(): parser.add_argument("--conf", type=float, default=0.25) parser.add_argument("--imgsz", type=int, default=640) parser.add_argument("--frame-stride", type=int, default=1) + parser.add_argument("--sam-device", type=str, default="cuda") parser.add_argument("--show-large-labels-at-top-right", action="store_true") + parser.add_argument( + "--summary-star", + action=argparse.BooleanOptionalAction, + default=False, + help="Whether to draw * on the Final summary line; caller/DB is the source of truth.", + ) + parser.add_argument("--summary-weight-g", type=float, default=None) + parser.add_argument("--summary-length-mm", type=float, default=None) + parser.add_argument("--output-video-name", type=str, default=None) args = parser.parse_args() svo = Path(args.svo).expanduser().resolve() @@ -315,7 +358,12 @@ def main(): conf=args.conf, imgsz=args.imgsz, frame_stride=args.frame_stride, + sam_device=args.sam_device, show_large=args.show_large_labels_at_top_right, + summary_weight_g=args.summary_weight_g, + summary_length_mm=args.summary_length_mm, + summary_star=bool(args.summary_star), + output_video_name=args.output_video_name, ) diff --git a/FishMeasure/predict_weigth_from_svo2.py b/FishMeasure/predict_weigth_from_svo2.py index a039cb8..48a178b 100644 --- a/FishMeasure/predict_weigth_from_svo2.py +++ b/FishMeasure/predict_weigth_from_svo2.py @@ -25,6 +25,24 @@ import subprocess import sys import math from pathlib import Path + +if not hasattr(argparse, "BooleanOptionalAction"): + class _BooleanOptionalAction(argparse.Action): + def __init__(self, option_strings, dest, default=None, type=None, + choices=None, required=False, help=None, metavar=None): + _option_strings = [] + for opt in option_strings: + _option_strings.append(opt) + if opt.startswith("--"): + _option_strings.append("--no-" + opt[2:]) + super().__init__(option_strings=_option_strings, dest=dest, nargs=0, + default=default, type=type, choices=choices, + required=required, help=help, metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, not option_string.startswith("--no-")) + + argparse.BooleanOptionalAction = _BooleanOptionalAction from typing import Any, Dict, List, Optional, Tuple import torch @@ -136,11 +154,14 @@ def _run_generate_video_with_labels_subprocess( "--conf", str(args.conf), "--imgsz", str(args.imgsz), "--frame-stride", str(args.frame_stride), + "--sam-device", str(args.sam_device), "--weight-json", str(weight_json.expanduser().resolve()), ] if getattr(args, "show_large_labels_at_top_right", False): cmd.append("--show-large-labels-at-top-right") + if getattr(args, "summary_star", False): + cmd.append("--summary-star") print(f"Invoking generate_video_with_labels.py:\n {' '.join(cmd)}") proc = subprocess.run(cmd, cwd=str(REPO_ROOT)) @@ -505,7 +526,7 @@ def main() -> None: parser.add_argument( "--yolo-model", type=str, - default="/home/ubuntu/projects/FishMeasure/runs/train/fish_detection_20251127_104658/weights/best.pt", + default=str(REPO_ROOT / "runs/train/fish_detection_20251127_104658/weights/best.pt"), ) parser.add_argument( "--conf", @@ -622,6 +643,12 @@ def main() -> None: action="store_true", help="Show large weight/length labels (10x font) at top right corner for real/camera generated videos.", ) + parser.add_argument( + "--summary-star", + action=argparse.BooleanOptionalAction, + default=False, + help="Pass to generate_video_with_labels: whether the summary line should draw *.", + ) args = parser.parse_args() if args.frame_stride < 1: diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/F1_curve.png b/FishMeasure/runs/train/fish_detection_20251127_104658/F1_curve.png deleted file mode 100644 index 4078b2f..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/F1_curve.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/PR_curve.png b/FishMeasure/runs/train/fish_detection_20251127_104658/PR_curve.png deleted file mode 100644 index 608e97f..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/PR_curve.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/P_curve.png b/FishMeasure/runs/train/fish_detection_20251127_104658/P_curve.png deleted file mode 100644 index bd1a3d4..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/P_curve.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/R_curve.png b/FishMeasure/runs/train/fish_detection_20251127_104658/R_curve.png deleted file mode 100644 index 54c5f50..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/R_curve.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/args.yaml b/FishMeasure/runs/train/fish_detection_20251127_104658/args.yaml deleted file mode 100644 index e00bcbf..0000000 --- a/FishMeasure/runs/train/fish_detection_20251127_104658/args.yaml +++ /dev/null @@ -1,106 +0,0 @@ -task: detect -mode: train -model: yolov8n.pt -data: ./yolo_dataset/dataset.yaml -epochs: 100 -time: null -patience: 50 -batch: 96 -imgsz: 640 -save: true -save_period: -1 -cache: false -device: null -workers: 8 -project: runs/train -name: fish_detection_20251127_104658 -exist_ok: false -pretrained: false -optimizer: auto -verbose: true -seed: 0 -deterministic: true -single_cls: false -rect: false -cos_lr: false -close_mosaic: 10 -resume: false -amp: true -fraction: 1.0 -profile: false -freeze: null -multi_scale: false -overlap_mask: true -mask_ratio: 4 -dropout: 0.0 -val: true -split: val -save_json: false -save_hybrid: false -conf: null -iou: 0.7 -max_det: 300 -half: false -dnn: false -plots: true -source: null -vid_stride: 1 -stream_buffer: false -visualize: false -augment: false -agnostic_nms: false -classes: null -retina_masks: false -embed: null -show: false -save_frames: false -save_txt: false -save_conf: false -save_crop: false -show_labels: true -show_conf: true -show_boxes: true -line_width: null -format: torchscript -keras: false -optimize: false -int8: false -dynamic: false -simplify: true -opset: null -workspace: null -nms: false -lr0: 0.01 -lrf: 0.01 -momentum: 0.937 -weight_decay: 0.0005 -warmup_epochs: 3.0 -warmup_momentum: 0.8 -warmup_bias_lr: 0.1 -box: 1.75 -cls: 0.5 -dfl: 1.5 -pose: 12.0 -kobj: 1.0 -nbs: 64 -hsv_h: 0.015 -hsv_s: 0.7 -hsv_v: 0.4 -degrees: 10.0 -translate: 0.2 -scale: 0.5 -shear: 2.0 -perspective: 0.0 -flipud: 0.1 -fliplr: 0.5 -bgr: 0.0 -mosaic: 1.0 -mixup: 0.2 -copy_paste: 0.3 -copy_paste_mode: flip -auto_augment: randaugment -erasing: 0.4 -crop_fraction: 1.0 -cfg: null -tracker: botsort.yaml -save_dir: runs/train/fish_detection_20251127_104658 diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix.png b/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix.png deleted file mode 100644 index 1a15ed8..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix_normalized.png b/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix_normalized.png deleted file mode 100644 index e6fdc6a..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/confusion_matrix_normalized.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/events.out.tfevents.1764211621.svap-master.979906.0 b/FishMeasure/runs/train/fish_detection_20251127_104658/events.out.tfevents.1764211621.svap-master.979906.0 deleted file mode 100644 index 85bdec3..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/events.out.tfevents.1764211621.svap-master.979906.0 and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/labels.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/labels.jpg deleted file mode 100644 index 2fb0236..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/labels.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/labels_correlogram.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/labels_correlogram.jpg deleted file mode 100644 index ce8161b..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/labels_correlogram.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/results.csv b/FishMeasure/runs/train/fish_detection_20251127_104658/results.csv deleted file mode 100644 index 68f34e5..0000000 --- a/FishMeasure/runs/train/fish_detection_20251127_104658/results.csv +++ /dev/null @@ -1,101 +0,0 @@ -epoch,time,train/box_loss,train/cls_loss,train/dfl_loss,metrics/precision(B),metrics/recall(B),metrics/mAP50(B),metrics/mAP50-95(B),val/box_loss,val/cls_loss,val/dfl_loss,lr/pg0,lr/pg1,lr/pg2 -1,10.3904,0.39855,2.68593,1.4848,1,0.29733,0.96537,0.56943,0.33362,2.56495,1.02233,0.00038,0.00038,0.00038 -2,17.5483,0.30028,1.35659,1.12817,1,0.49396,0.95678,0.49421,0.35142,2.66289,1.12329,0.000772278,0.000772278,0.000772278 -3,25.1797,0.29352,1.17269,1.13078,0.63354,0.70588,0.71908,0.35996,0.3706,1.94815,1.15705,0.00115664,0.00115664,0.00115664 -4,31.2751,0.29379,1.05877,1.14652,0.92901,0.76994,0.8997,0.51519,0.34598,1.68774,1.25469,0.00153307,0.00153307,0.00153307 -5,38.2345,0.30129,1.01013,1.16273,1,0.05809,0.11957,0.0573,0.56052,3.3705,2.25768,0.00190159,0.00190159,0.00190159 -6,44.1377,0.29631,0.9286,1.17263,0.77814,0.72549,0.77275,0.49905,0.30463,2.21664,1.22409,0.001901,0.001901,0.001901 -7,51.7031,0.28969,0.82499,1.16157,0.81899,0.82353,0.86643,0.50327,0.30846,1.06156,1.23743,0.0018812,0.0018812,0.0018812 -8,57.6974,0.28756,0.79748,1.16646,0.97759,0.85543,0.93703,0.54512,0.30945,1.3021,1.15419,0.0018614,0.0018614,0.0018614 -9,64.6096,0.28988,0.78404,1.16845,0.87355,0.54195,0.61651,0.44007,0.28541,1.8088,1.19646,0.0018416,0.0018416,0.0018416 -10,70.8722,0.28509,0.74823,1.17001,1,0.97714,0.9879,0.66339,0.27556,0.67389,1.10534,0.0018218,0.0018218,0.0018218 -11,77.6729,0.2811,0.71605,1.16472,0.97131,1,0.99327,0.68686,0.26039,0.53918,1.12841,0.001802,0.001802,0.001802 -12,83.6789,0.27956,0.71366,1.17215,0.94349,0.7451,0.81617,0.36975,0.44564,1.10381,1.5902,0.0017822,0.0017822,0.0017822 -13,90.5196,0.27954,0.70341,1.16105,0.98977,0.98039,0.99395,0.70985,0.26287,0.71862,1.08777,0.0017624,0.0017624,0.0017624 -14,96.2279,0.27389,0.69109,1.16035,0.99776,0.98039,0.99481,0.68656,0.27765,0.65545,1.09127,0.0017426,0.0017426,0.0017426 -15,103.274,0.27484,0.69505,1.16409,0.94274,0.96857,0.98537,0.6647,0.2812,0.56622,1.11215,0.0017228,0.0017228,0.0017228 -16,109.28,0.27709,0.68435,1.17154,0.99813,0.98039,0.99462,0.71965,0.24753,0.50567,1.06086,0.001703,0.001703,0.001703 -17,116.354,0.27127,0.65515,1.15472,0.99011,0.96078,0.99405,0.68376,0.27062,0.59393,1.10243,0.0016832,0.0016832,0.0016832 -18,121.994,0.27291,0.65216,1.1543,0.97318,0.98039,0.99153,0.72137,0.24675,0.73777,1.04403,0.0016634,0.0016634,0.0016634 -19,128.471,0.27609,0.64481,1.1633,0.99856,1,0.995,0.71822,0.2506,0.44838,1.04764,0.0016436,0.0016436,0.0016436 -20,134.908,0.26641,0.63605,1.13566,0.97925,0.98039,0.99423,0.73946,0.22498,0.4882,1.01436,0.0016238,0.0016238,0.0016238 -21,141.659,0.27002,0.63276,1.15338,0.99622,1,0.995,0.73392,0.24781,0.6929,1.06027,0.001604,0.001604,0.001604 -22,147.596,0.27016,0.62133,1.15598,1,0.71878,0.7985,0.38994,0.43809,1.15961,1.45966,0.0015842,0.0015842,0.0015842 -23,154.606,0.26856,0.63903,1.14822,0.99006,0.96078,0.98976,0.73202,0.23704,0.89525,1.02748,0.0015644,0.0015644,0.0015644 -24,160.874,0.26556,0.61805,1.14759,0.52381,0.21569,0.37119,0.23923,0.48324,4.161,1.79371,0.0015446,0.0015446,0.0015446 -25,167.63,0.26283,0.62175,1.14417,0.99771,0.98039,0.99481,0.54043,0.36833,0.62126,1.2961,0.0015248,0.0015248,0.0015248 -26,172.972,0.26277,0.62184,1.14693,0.97588,0.79352,0.838,0.60246,0.26723,1.16324,1.14688,0.001505,0.001505,0.001505 -27,180.886,0.25845,0.58391,1.1274,1,0.97466,0.99444,0.7358,0.23001,0.43927,1.03289,0.0014852,0.0014852,0.0014852 -28,186.475,0.25736,0.59062,1.13027,0.99938,1,0.995,0.77511,0.22035,0.41332,1.01175,0.0014654,0.0014654,0.0014654 -29,193.178,0.25997,0.59153,1.13884,0.99623,1,0.995,0.72041,0.2431,0.43747,1.04154,0.0014456,0.0014456,0.0014456 -30,198.402,0.26318,0.59378,1.14352,0.99588,0.98039,0.99086,0.71822,0.23253,0.59404,1.04641,0.0014258,0.0014258,0.0014258 -31,204.755,0.25794,0.57997,1.12954,0.99758,1,0.995,0.65546,0.2929,0.53569,1.17732,0.001406,0.001406,0.001406 -32,210.705,0.25981,0.59023,1.14197,0.99123,0.96078,0.97752,0.7113,0.24951,0.53056,1.04465,0.0013862,0.0013862,0.0013862 -33,217.738,0.25927,0.58286,1.1443,0.99828,1,0.995,0.76497,0.21878,0.37529,1.00619,0.0013664,0.0013664,0.0013664 -34,223.275,0.25433,0.55825,1.11915,0.98879,0.84314,0.93602,0.58228,0.34063,0.70089,1.23218,0.0013466,0.0013466,0.0013466 -35,230.18,0.25841,0.5804,1.13861,0.99727,0.98039,0.9925,0.75134,0.21602,0.45677,1.00342,0.0013268,0.0013268,0.0013268 -36,236.28,0.2532,0.57095,1.13238,0.9975,0.98039,0.99093,0.74316,0.24353,0.43215,1.05353,0.001307,0.001307,0.001307 -37,243.19,0.25301,0.57535,1.13453,1,0.99155,0.995,0.72838,0.24188,0.67456,1.0605,0.0012872,0.0012872,0.0012872 -38,249.356,0.25509,0.56332,1.13272,1,0.95882,0.99102,0.74599,0.21894,0.44255,1.00044,0.0012674,0.0012674,0.0012674 -39,256.308,0.25554,0.5739,1.13027,0.99595,1,0.995,0.75996,0.22547,0.50187,1.01792,0.0012476,0.0012476,0.0012476 -40,262.306,0.25143,0.55879,1.12012,0.99711,1,0.995,0.76673,0.21535,0.46243,1.00072,0.0012278,0.0012278,0.0012278 -41,268.917,0.2534,0.56009,1.12077,0.99082,1,0.995,0.73951,0.23347,0.66055,1.02986,0.001208,0.001208,0.001208 -42,274.828,0.2489,0.5661,1.11458,1,0.91885,0.98684,0.72466,0.22931,0.53634,1.02747,0.0011882,0.0011882,0.0011882 -43,281.555,0.25301,0.55236,1.13143,0.99738,1,0.995,0.74646,0.2298,0.49582,1.02689,0.0011684,0.0011684,0.0011684 -44,287.74,0.2486,0.53306,1.11907,1,0.97736,0.99364,0.73858,0.23603,0.49003,1.03041,0.0011486,0.0011486,0.0011486 -45,294.836,0.24944,0.54158,1.1268,0.99062,1,0.995,0.75756,0.23048,0.60132,1.04024,0.0011288,0.0011288,0.0011288 -46,301.451,0.24913,0.53379,1.1269,0.99596,0.98039,0.99048,0.77386,0.20906,0.54314,0.99925,0.001109,0.001109,0.001109 -47,308.242,0.24957,0.53985,1.12517,0.99593,1,0.995,0.73996,0.23001,0.52276,1.02912,0.0010892,0.0010892,0.0010892 -48,314.344,0.24351,0.53626,1.10608,1,0.99956,0.995,0.74934,0.23199,0.39499,1.03412,0.0010694,0.0010694,0.0010694 -49,321.241,0.24759,0.53893,1.11816,1,0.99844,0.995,0.74604,0.22845,0.39064,1.02153,0.0010496,0.0010496,0.0010496 -50,327.139,0.2464,0.53694,1.1138,0.99835,0.98039,0.99048,0.71125,0.24541,0.42938,1.07937,0.0010298,0.0010298,0.0010298 -51,333.821,0.24154,0.51898,1.10733,0.99567,0.90196,0.97517,0.62595,0.29596,0.61014,1.14221,0.00101,0.00101,0.00101 -52,339.882,0.24658,0.53043,1.11979,0.99546,0.98039,0.99323,0.65978,0.27159,0.52309,1.13272,0.0009902,0.0009902,0.0009902 -53,346.689,0.24636,0.52582,1.11253,0.99778,0.98039,0.99481,0.69862,0.26351,0.45512,1.08185,0.0009704,0.0009704,0.0009704 -54,352.945,0.24152,0.51738,1.10331,0.99739,1,0.995,0.72452,0.24969,0.40628,1.06735,0.0009506,0.0009506,0.0009506 -55,359.178,0.24263,0.50863,1.10604,0.99683,1,0.995,0.77739,0.21597,0.42763,1.01173,0.0009308,0.0009308,0.0009308 -56,365.965,0.24373,0.51169,1.10391,0.95858,0.90771,0.97651,0.63893,0.29309,0.63256,1.16514,0.000911,0.000911,0.000911 -57,372.475,0.24282,0.51188,1.10565,0.9915,0.98039,0.99054,0.75734,0.21952,0.56379,1.01633,0.0008912,0.0008912,0.0008912 -58,379.602,0.23912,0.50781,1.09877,1,0.95988,0.98778,0.72449,0.24914,0.46787,1.05661,0.0008714,0.0008714,0.0008714 -59,385.13,0.24179,0.51076,1.10568,1,0.95731,0.98289,0.71162,0.23833,0.6395,1.04927,0.0008516,0.0008516,0.0008516 -60,392.571,0.23757,0.50074,1.09636,0.99702,0.98039,0.99261,0.76777,0.20872,0.47718,0.99627,0.0008318,0.0008318,0.0008318 -61,397.864,0.23342,0.49493,1.09485,0.99568,0.98039,0.9931,0.76937,0.20784,0.45567,1.01043,0.000812,0.000812,0.000812 -62,405.506,0.23929,0.49675,1.10508,0.9953,0.96078,0.9827,0.72038,0.24622,0.5664,1.07435,0.0007922,0.0007922,0.0007922 -63,411.243,0.2385,0.49568,1.09486,0.99783,0.98039,0.99086,0.76087,0.22259,0.49003,1.01614,0.0007724,0.0007724,0.0007724 -64,418.044,0.23775,0.49129,1.08891,0.99544,0.98039,0.99054,0.77224,0.21441,0.51803,1.00319,0.0007526,0.0007526,0.0007526 -65,423.941,0.23656,0.49423,1.09174,0.97965,0.94412,0.98937,0.76534,0.21473,0.53841,0.99562,0.0007328,0.0007328,0.0007328 -66,430.931,0.23491,0.49057,1.08267,0.99223,0.98039,0.98955,0.76326,0.21453,0.64708,0.99581,0.000713,0.000713,0.000713 -67,436.831,0.23603,0.4875,1.08431,0.99649,0.98039,0.99427,0.76543,0.21578,0.47111,0.99195,0.0006932,0.0006932,0.0006932 -68,443.698,0.23371,0.48684,1.08189,1,0.97435,0.99379,0.77164,0.215,0.53772,0.99543,0.0006734,0.0006734,0.0006734 -69,449.871,0.23341,0.48312,1.0883,0.99289,0.98039,0.99379,0.78205,0.20299,0.46992,0.97694,0.0006536,0.0006536,0.0006536 -70,456.649,0.2335,0.48756,1.08284,0.99768,0.98039,0.9931,0.76909,0.21567,0.3792,0.99144,0.0006338,0.0006338,0.0006338 -71,462.607,0.22882,0.47871,1.07799,0.99471,0.98039,0.991,0.76463,0.22008,0.44229,1.00736,0.000614,0.000614,0.000614 -72,469.768,0.23333,0.48255,1.08936,0.99842,0.92157,0.95471,0.6835,0.25689,0.5887,1.10088,0.0005942,0.0005942,0.0005942 -73,476.152,0.22814,0.47147,1.07796,0.99695,0.96078,0.99179,0.75486,0.22212,0.48941,1.01634,0.0005744,0.0005744,0.0005744 -74,483.377,0.22914,0.47235,1.08384,1,0.97669,0.9901,0.77219,0.2153,0.39275,0.99558,0.0005546,0.0005546,0.0005546 -75,488.716,0.22821,0.47539,1.08204,0.99868,1,0.995,0.74383,0.23322,0.39742,1.03818,0.0005348,0.0005348,0.0005348 -76,496.052,0.22693,0.47841,1.0774,0.9966,0.98039,0.99427,0.77597,0.22186,0.42881,1.00906,0.000515,0.000515,0.000515 -77,501.774,0.2272,0.46429,1.07859,0.99656,0.98039,0.9925,0.7684,0.21909,0.4223,1.00152,0.0004952,0.0004952,0.0004952 -78,508.605,0.22715,0.46817,1.08007,1,0.99833,0.995,0.77597,0.20663,0.39658,0.98381,0.0004754,0.0004754,0.0004754 -79,514.483,0.22845,0.46984,1.08602,0.99476,0.98039,0.99297,0.76487,0.22367,0.53976,1.01338,0.0004556,0.0004556,0.0004556 -80,521.104,0.22489,0.45308,1.07029,0.99378,0.98039,0.99364,0.76591,0.22644,0.44335,1.01611,0.0004358,0.0004358,0.0004358 -81,526.831,0.22808,0.46696,1.08249,0.9984,0.98039,0.99043,0.76392,0.22464,0.40558,1.00919,0.000416,0.000416,0.000416 -82,534.261,0.22461,0.45758,1.07345,0.99706,0.98039,0.9918,0.76203,0.22683,0.37658,1.02904,0.0003962,0.0003962,0.0003962 -83,539.83,0.2224,0.45574,1.06465,0.9966,0.96078,0.98809,0.77183,0.22791,0.58994,1.00929,0.0003764,0.0003764,0.0003764 -84,547.068,0.22321,0.45489,1.07012,0.99534,0.98039,0.99218,0.77586,0.21211,0.4875,0.99631,0.0003566,0.0003566,0.0003566 -85,553.422,0.22324,0.44994,1.07037,0.99705,0.98039,0.99122,0.77203,0.21439,0.42988,1.00439,0.0003368,0.0003368,0.0003368 -86,560.244,0.22593,0.46254,1.07436,0.99551,0.98039,0.99323,0.7568,0.21436,0.39946,1.00288,0.000317,0.000317,0.000317 -87,566.207,0.22535,0.45462,1.07467,0.99768,0.98039,0.99427,0.77229,0.20736,0.36008,0.99378,0.0002972,0.0002972,0.0002972 -88,573.201,0.22031,0.44716,1.0635,0.99697,0.98039,0.99481,0.76489,0.21791,0.38311,1.01245,0.0002774,0.0002774,0.0002774 -89,579.084,0.22289,0.44359,1.06804,0.99621,0.98039,0.99162,0.77105,0.21459,0.41661,0.99051,0.0002576,0.0002576,0.0002576 -90,585.517,0.2213,0.4519,1.07424,0.99675,0.98039,0.99122,0.76516,0.21041,0.42833,0.99584,0.0002378,0.0002378,0.0002378 -91,597.175,0.19679,0.38958,1.03361,0.99618,0.98039,0.9925,0.76415,0.21165,0.41842,0.98503,0.000218,0.000218,0.000218 -92,603.128,0.19341,0.33723,1.01494,0.99613,0.98039,0.99285,0.76376,0.21456,0.42319,0.99363,0.0001982,0.0001982,0.0001982 -93,609.58,0.19245,0.34007,1.02201,0.99697,0.98039,0.99229,0.76732,0.21373,0.38196,0.99964,0.0001784,0.0001784,0.0001784 -94,614.558,0.18975,0.33377,1.01562,0.99702,0.98039,0.99285,0.76477,0.21757,0.39742,0.9974,0.0001586,0.0001586,0.0001586 -95,620.215,0.18952,0.32777,1.01532,0.9962,0.98039,0.99107,0.7668,0.21387,0.44539,1.0045,0.0001388,0.0001388,0.0001388 -96,625.105,0.1894,0.32969,1.01349,0.99492,0.98039,0.9913,0.76397,0.21124,0.43134,0.99738,0.000119,0.000119,0.000119 -97,630.785,0.18892,0.32829,1.00956,0.99236,0.98039,0.9908,0.7635,0.21104,0.4798,0.99472,9.92e-05,9.92e-05,9.92e-05 -98,636.519,0.18811,0.32653,1.00609,0.99603,0.98039,0.99261,0.77306,0.21321,0.40139,0.99426,7.94e-05,7.94e-05,7.94e-05 -99,642.554,0.1863,0.32067,1.00634,0.99516,0.98039,0.9935,0.75723,0.21665,0.41315,1.00323,5.96e-05,5.96e-05,5.96e-05 -100,647.259,0.18644,0.31964,1.00625,0.99453,0.98039,0.99297,0.76087,0.21668,0.43148,1.00482,3.98e-05,3.98e-05,3.98e-05 diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/results.png b/FishMeasure/runs/train/fish_detection_20251127_104658/results.png deleted file mode 100644 index 7a204f6..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/results.png and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch0.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch0.jpg deleted file mode 100644 index 6e29b85..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch0.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1.jpg deleted file mode 100644 index d47aae3..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1800.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1800.jpg deleted file mode 100644 index 8ac9713..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1800.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1801.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1801.jpg deleted file mode 100644 index 84bdfd9..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1801.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1802.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1802.jpg deleted file mode 100644 index fda43c1..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch1802.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch2.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch2.jpg deleted file mode 100644 index ef21dc9..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/train_batch2.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_labels.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_labels.jpg deleted file mode 100644 index 3ec747a..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_labels.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_pred.jpg b/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_pred.jpg deleted file mode 100644 index be38b81..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/val_batch0_pred.jpg and /dev/null differ diff --git a/FishMeasure/runs/train/fish_detection_20251127_104658/weights/last.pt b/FishMeasure/runs/train/fish_detection_20251127_104658/weights/last.pt deleted file mode 100644 index b132f28..0000000 Binary files a/FishMeasure/runs/train/fish_detection_20251127_104658/weights/last.pt and /dev/null differ diff --git a/FishMeasure/weight_estimator/test_dgcnn_weight_estimator.py b/FishMeasure/weight_estimator/test_dgcnn_weight_estimator.py index 12ff0d5..1e50eee 100644 --- a/FishMeasure/weight_estimator/test_dgcnn_weight_estimator.py +++ b/FishMeasure/weight_estimator/test_dgcnn_weight_estimator.py @@ -30,6 +30,24 @@ import re import sys import zlib from pathlib import Path + +if not hasattr(argparse, "BooleanOptionalAction"): + class _BooleanOptionalAction(argparse.Action): + def __init__(self, option_strings, dest, default=None, type=None, + choices=None, required=False, help=None, metavar=None): + _option_strings = [] + for opt in option_strings: + _option_strings.append(opt) + if opt.startswith("--"): + _option_strings.append("--no-" + opt[2:]) + super().__init__(option_strings=_option_strings, dest=dest, nargs=0, + default=default, type=type, choices=choices, + required=required, help=help, metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, not option_string.startswith("--no-")) + + argparse.BooleanOptionalAction = _BooleanOptionalAction from typing import Any, Dict, List, Optional, Tuple import numpy as np diff --git a/docs/conda-deploy.md b/docs/conda-deploy.md new file mode 100644 index 0000000..26970bf --- /dev/null +++ b/docs/conda-deploy.md @@ -0,0 +1,130 @@ +# 使用 Conda 单环境部署 FishServer(方案 A) + +本文说明在另一台 **Linux x86_64** 机器上,用 **一个 conda 环境** 安装 Fish API、FishMeasure、FishAction 所需依赖并启动服务。子进程与 uvicorn 共用同一 Python 解释器,**无需**在 `fish_api/.env` 中设置 `PYTHON_FISH_MEASURE` / `PYTHON_FISH_ACTION`(保持为空即可)。 + +--- + +## 前置条件 + +- 已克隆本仓库,目录中包含 `fish_api/`、`FishMeasure/`、`FishAction/`。 +- **模型权重**:将原机器上的 `models/` 及 `.env` 中引用的 checkpoint 拷贝到新机器;空目录会导致推理失败。 +- **GPU(可选)**:若使用 `SAM_DEVICE=cuda` 或 PyTorch CUDA,需安装与 PyTorch 版本匹配的 **NVIDIA 驱动与 CUDA**。 +- **ZED SDK(处理 `.svo2` 时)**:在新机器安装 **Stereolabs ZED SDK** 及 Python API,版本尽量与现有开发/生产环境一致。 +- **ffmpeg**:用于视频转码;可用系统包(`apt install ffmpeg` 等),或使用仓库内 `scripts/setup_ffmpeg.sh`(需将脚本中的 `PROJECT_ROOT` 改为本机仓库路径)。 + +--- + +## 1. 创建并激活 conda 环境 + +Fish API 要求 **Python ≥ 3.11**(见 `fish_api/pyproject.toml`)。 + +```bash +conda create -n fishserver python=3.11 -y +conda activate fishserver +``` + +后续所有 `pip`/`python` 命令均在该环境中执行。 + +--- + +## 2. 安装 Fish API(FastAPI 网关) + +```bash +cd /path/to/FishServer/fish_api +pip install -e . +``` + +可选开发依赖(本地冒烟测试等): + +```bash +pip install -e ".[dev]" +``` + +--- + +## 3. 在同一环境中安装 FishMeasure / FishAction 依赖 + +**方案 A 的核心**:测量与行为脚本由 API 以子进程调用,默认使用 **当前 `python`(与 uvicorn 相同)**,因此 PyTorch、OpenCV、ZED、X3D 等必须**全部**装进 **`fishserver`** 这一环境。 + +本仓库未提供单一的 `FishMeasure/requirements.txt`,请按以下方式之一对齐依赖: + +1. **从已有机器导出**(推荐):在已能跑通测量的环境中执行 `pip freeze > requirements-frozen.txt`,在新环境中用 `pip install -r requirements-frozen.txt`(注意需人工剔除/替换与平台、CUDA 不兼容的包)。 +2. **按模块 README 与脚本逐步安装**:参考 `FishMeasure/README.md`、各子目录说明,以及 `FishAction/predict_video_x3d_3class.py` 的实际 import;至少安装 `FishAction/requirements.txt` 中的依赖,并补齐 PyTorch、torchvision、与 ZED 文档匹配的包。 + +安装完成后,在 **`conda activate fishserver`** 下验证: + +```bash +python -c "import torch; print(torch.__version__)" +# 若使用 ZED:按 Stereolabs 文档验证 pyzed 等 +``` + +--- + +## 4. 配置 `fish_api/.env` + +从旧机器复制 `fish_api/.env` 到新机器,**全部路径改为本机绝对路径**,例如: + +- `PUBLIC_BASE_URL`:客户端可访问的 API 根地址,**不要**末尾 `/`。 +- `ACTION_WATCH_DIR`、`MEASURE_WATCH_DIR`:若启用目录监控,填本机目录。 +- `YOLO_MODEL`、`WEIGHT_CHECKPOINT`、`ACTION_CHECKPOINT` 等:指向本机真实文件。 +- `SAM_DEVICE`:`cuda` 或 `cpu`。 + +**方案 A 下不要填写** `PYTHON_FISH_MEASURE`、`PYTHON_FISH_ACTION`(留空),让子进程使用当前环境的 `python`。 + +--- + +## 5. 与 `uv` 的优先级 + +`fish_api/start_fresh.sh` 若检测到系统中有 `uv` 命令,会优先使用 `uv run`。若你希望**强制使用当前 conda 环境**: + +- 启动前执行 **`conda activate fishserver`**,并确认 `which python` / `which uvicorn` 指向 `fishserver`;或 +- 在不含 `uv` 的 PATH 下启动;或临时从 PATH 中移除 `uv`。 + +若未安装 `uv`,脚本会自动使用 `python3` 与 `uvicorn`(依赖已激活的 conda)。 + +--- + +## 6. 启动服务 + +在**仓库根目录**执行: + +```bash +conda activate fishserver +cd /path/to/FishServer +bash scripts/start_fresh.sh +``` + +默认监听 `0.0.0.0:8000`。可覆盖: + +```bash +HOST=0.0.0.0 PORT=8000 bash scripts/start_fresh.sh +``` + +可选环境变量(见 `fish_api/start_fresh.sh` 注释): + +- `CLEAR_SQLITE_DATABASE=1`:清空 SQLite(谨慎)。 +- `CLEAR_MEASURE_OUTPUT=1` / `CLEAR_ACTION_OUTPUT=1`:清空中间输出目录。 + +浏览器访问:`http://<主机>:8000/docs`。 + +--- + +## 7. 自检清单 + +| 检查项 | 说明 | +|--------|------| +| 环境 | `conda activate fishserver` 后 `python -V` 为 3.11+ | +| API 包 | 在 `fish_api` 目录下 `python -c "import app"` 无报错 | +| 可执行文件 | `which uvicorn` 指向 `fishserver` 的 `bin` | +| 权重 | `.env` 中模型路径在本机存在 | +| 测量(SVO) | ZED 与 PyTorch 在同一 conda 环境中可用 | +| ffmpeg | `ffmpeg -version` 成功(若走视频转码路径) | + +--- + +## 8. 参考 + +- 根目录 `README.md`、`fish_api/README.md`:环境变量表与启动说明。 +- `fish_api/start_fresh.sh`:启动前 `prestart_fresh` 与 uvicorn 参数。 + +若需将 FishMeasure / FishAction 拆到**独立 conda 环境**,请在 `fish_api/.env` 中填写 `PYTHON_FISH_MEASURE` 与 `PYTHON_FISH_ACTION`(不在本文「方案 A」范围内)。 diff --git a/fish_api/README.md b/fish_api/README.md index 7d93a11..50324a3 100644 --- a/fish_api/README.md +++ b/fish_api/README.md @@ -1,6 +1,6 @@ # Recalculate Everything: -CLEAR_SQLITE_DATABASE=1 CLEAR_MEASURE_OUTPUT=1 CLEAR_ACTION_OUTPUT=1 CLEAR_MEDIA=1 CLEAR_STREAM_TMP=1 bash scripts/start_fresh.sh +CLEAR_SQLITE_DATABASE=1 CLEAR_MEASURE_OUTPUT=1 CLEAR_ACTION_OUTPUT=1 CLEAR_MEDIA=1 CLEAR_STREAM_TMP=1 LD_PRELOAD=/lib/aarch64-linux-gnu/libGLdispatch.so.0 bash scripts/start_fresh.sh @@ -17,19 +17,62 @@ CLEAR_SQLITE_DATABASE=1 CLEAR_MEASURE_OUTPUT=1 CLEAR_ACTION_OUTPUT=1 CLEAR_MEDIA | `MEASURE_OUTPUT_ROOT` | 传给 `--save-output` 的目录 | `/fish_api/.data/measure_output` | | `YOLO_MODEL` / `WEIGHT_CHECKPOINT` / `ACTION_CHECKPOINT` | 模型路径 | 与仓库内脚本默认一致 | | `SAM_DEVICE` | `cuda` 或 `cpu` | `cuda` | +| `MEASURE_FINAL_AGGREGATE_MODE` | 齐套后对各段 former 体重/体长聚合:`median` / `mean` / `trimmed_mean` | `median` | 可在 `fish_api/.env` 中填写上述变量(`pydantic-settings` 会读取)。 ## 安装与启动 ```bash cd fish_api -uv sync -# 可选:包含 httpx,便于本地用 FastAPI TestClient 做冒烟测试 -# uv sync --group dev +python3 -m pip install -e . # 安装 pyproject 依赖(无 venv 时可用 --user 或系统 site-packages) ./scripts/start_fresh.sh # 默认仅重置 client_id 投递进度,保留 SQLite 历史与快照 # CLEAR_SQLITE_DATABASE=1 bash start_fresh.sh # 需要时才彻底清 SQLite -# 或:uv run uvicorn app.main:app --host 0.0.0.0 --port 8000(需自行 prestart) +# python3 -m uvicorn app.main:app --host 0.0.0.0 --port 8000 # 需自行 prestart ``` + +## SVO 输入与 ``measure_watch``(两种来源,同一套目录逻辑) + +后台 **[``measure_watch``](app/services/measure_watch.py)** 只认 **`MEASURE_WATCH_DIR`** 下一层的 **`fish{N}/`** 子目录及其中的 **``.svo2``**(见 ``iter_svo2_folders``)。与入口无关: + +| 方式 | 说明 | +|------|------| +| **ZED 分段录制** | 每次会话分配 ``fish_id``:取库表(``fish_id`` 与 ``output_dir`` 路径)、父目录下 ``fish``+数字 子目录名、以及 ``fish{N}/`` 下已有 ``.svo2`` 路径 四者编号的最大值再加 1(磁盘有数据而库未记时也不冲突);文件写入 ``{MEASURE_WATCH_DIR}/fish{N}/``(若未配置 ``MEASURE_WATCH_DIR`` 则 ``{STREAM_TMP_DIR}/zed_svo2/fish{N}/``,此时**不会**启用 ``measure_watch``,除非把 ``MEASURE_WATCH_DIR`` 指到 ``…/ingest/zed_svo2``) | +| **手工拷贝** | 将 ``.svo2`` 放入 ``MEASURE_WATCH_DIR/fish{N}/``(自建 ``fish{N}`` 即可) | + +**逐段测量与齐套 final**:对每个 ``.svo2`` 稳定后轮询跑 FishMeasure,写入 SQLite;服务端可在 ``calculation_log`` 中区分 segment/final。``GET /api/v1/biomass/real/camera/`` 的 ``data.result[]`` **仅含** ``id``、``type``、``length``、``weight``、``date``(与历史客户端约定一致);按投递顺序先可能收到多段再收到聚合行。``video_left`` / ``video_right`` 规则不变。 + +**ingest** 的 ``/api/v1/ingest/svo/...`` 为分块上传流程,落盘与上述 ``fish{N}`` 路径独立;``fish{N}`` 目录的测量以 ``measure_watch`` 为准。 + +**ZED 相机**:**fish_api 启动/停止不会自动开关相机录制**;录制由独立服务或仓库根 ``start_recording.sh`` / ``zed_record_cli`` 等自行管理,写入 ``MEASURE_WATCH_DIR/fish{N}/`` 后由 ``measure_watch`` 消费。需要时也可调 ``/api/v1/zed/recording/start|stop``(不经过 uvicorn lifespan)。 + +## ZED 分段录制 CLI + +每次 **开始/停止** 录制会在 SQLite 表 ``zed_recording_sessions`` 中记录一行(``fish_id``、``started_at`` / ``stopped_at``、``output_dir``;分配规则见上表)。 + +在 `fish_api` 目录下执行(与 `app` 包路径一致;依赖已 `pip install -e .`): + +```bash +# 本机直连相机:前台录制,终端不关,Ctrl+C 结束(终端会打印分配的 fish_id) +python3 -m app.zed_record_cli start [--segment-sec SEC] + +# 仅向已运行的 fish_api 发请求(不阻塞) +python3 -m app.zed_record_cli start --remote [--segment-sec SEC] + +# 停止 / 查询状态(HTTP,需 uvicorn 已监听;基址见 FISH_API_BASE_URL 或 PUBLIC_BASE_URL) +python3 -m app.zed_record_cli stop +python3 -m app.zed_record_cli status +``` + +若已通过 `pip install -e .` 安装了控制台脚本,也可使用 `fish-zed-record`(与上表等价)。 + +也可使用仓库根目录下的封装脚本: + +```bash +# 在 FishServer 仓库根目录执行 +./start_recording.sh # 本地前台;可选 --remote / --segment-sec +./stop_recording.sh # HTTP 停止(需 API 已启动) +``` + ## Weight Rule (Current) 最终体重 `pred_weight_g` 由以下规则链决定(按优先级从高到低): @@ -55,15 +98,15 @@ bash scripts/measure_debug.sh --fish-id 14 # 方式 B:先进入 fish_api cd fish_api -uv sync +python3 -m pip install -e . # 若尚未安装依赖 # 默认:MEASURE_WATCH_DIR/fish{N}/ 下所有 .svo2 → 输出到 MEASURE_OUTPUT_ROOT/fish{N}(默认 fish_api/.data/measure_output/fish{N}) -uv run python -m app.measure_debug_cli --fish-id 1 +python3 -m app.measure_debug_cli --fish-id 1 -# 或等价入口(须在 fish_api 目录) -uv run fish-measure-debug --fish-id 1 +# 或等价入口(须在 fish_api 目录,且已 pip install -e .) +fish-measure-debug --fish-id 1 # 指定 SVO 目录或输出目录(在 fish_api 目录下) -uv run python -m app.measure_debug_cli --batch-folder /path/to/fish1 --fish-id 1 --output-root /path/to/out +python3 -m app.measure_debug_cli --batch-folder /path/to/fish1 --fish-id 1 --output-root /path/to/out ``` 结束后会在终端打印 `weight_prediction.json` 中的 `pred_weight_g`、`pred_weight_rule` 等摘要。 diff --git a/fish_api/app/compat.py b/fish_api/app/compat.py new file mode 100644 index 0000000..fdf80f5 --- /dev/null +++ b/fish_api/app/compat.py @@ -0,0 +1,14 @@ +"""Python 3.8 兼容:提供 asyncio.to_thread 的等价实现。""" +from __future__ import annotations + +import asyncio +import functools +from typing import Any, Callable, TypeVar + +T = TypeVar("T") + + +async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T: + """asyncio.to_thread 的 Python 3.8 兼容版本。""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs)) diff --git a/fish_api/app/db.py b/fish_api/app/db.py index a188297..5125096 100644 --- a/fish_api/app/db.py +++ b/fish_api/app/db.py @@ -9,6 +9,7 @@ from __future__ import annotations import json import math +import re import shutil import sqlite3 from datetime import datetime, timezone @@ -23,7 +24,7 @@ DEFAULT_CLIENT_ID = "default" MAX_CLIENT_ID_LEN = 128 # 客户端切片索引起缓存:记录每个 client_id 上次返回的切片索引(用于对齐 water/video 端点) -_client_health_slice_index: dict[str, int] = {} +_client_health_slice_index = {} # type: Dict[str, int] def _parse_slice_index_from_source_path(source_path: Optional[str]) -> int: @@ -117,6 +118,17 @@ def init_db(settings: Settings) -> None: last_delivered_id INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (client_id, kind) ); + + CREATE TABLE IF NOT EXISTS zed_recording_sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + fish_id INTEGER NOT NULL, + started_at TEXT NOT NULL, + stopped_at TEXT, + output_dir TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_zed_sessions_fish_id + ON zed_recording_sessions(fish_id); """ ) _migrate_delivery_cursor_from_legacy(conn) @@ -128,6 +140,147 @@ def init_db(settings: Settings) -> None: conn.close() +_FISH_DIR_RE = re.compile(r"^fish(\d+)$") + + +def _max_numeric_fish_folder_id(parent: Path) -> int: + """``parent`` 下名为 ``fish{N}``(N 为数字)的直接子目录中,最大的 N;无则 0。""" + if not parent.is_dir(): + return 0 + m = 0 + try: + for p in parent.iterdir(): + if not p.is_dir(): + continue + mo = _FISH_DIR_RE.match(p.name) + if mo: + m = max(m, int(mo.group(1))) + except OSError: + return 0 + return m + + +def _max_fish_id_from_svo2_under_parent(parent: Path) -> int: + """扫描 ``parent`` 下 ``fish{{N}}/.../*.svo2``,从相对路径第一段取最大 N(库未记会话时仍能发现已有数据)。""" + if not parent.is_dir(): + return 0 + m = 0 + try: + for p in parent.rglob("*.svo2"): + try: + rel = p.relative_to(parent) + except ValueError: + continue + if not rel.parts: + continue + mo = _FISH_DIR_RE.match(rel.parts[0]) + if mo: + m = max(m, int(mo.group(1))) + except OSError: + return 0 + return m + + +def _max_fish_id_from_zed_sessions(conn: sqlite3.Connection) -> int: + """合并 ``fish_id`` 列与 ``output_dir`` 路径中出现的 ``fish{{N}}`` 段(防库与路径不一致)。""" + m = 0 + for row in conn.execute("SELECT fish_id, output_dir FROM zed_recording_sessions"): + m = max(m, int(row["fish_id"])) + od = row["output_dir"] + if not od: + continue + try: + for part in Path(str(od)).parts: + mo = _FISH_DIR_RE.match(part) + if mo: + m = max(m, int(mo.group(1))) + except (ValueError, OSError): + pass + return m + + +def begin_zed_recording_session(settings: Settings) -> Tuple[int, int, Path]: + """为本次录制分配 ``fish_id`` 并写入 ``zed_recording_sessions``。 + + 编号取库内会话、父目录下 ``fish``+数字 子目录、以及其下 ``.svo2`` 文件路径所反映编号三者的最大值再加 1, + 避免目录里已有数据而库尚未记录时仍复用同一编号;若目标路径仍存在则顺延直至可用。 + + 返回 ``(session_row_id, fish_id, output_dir)``。目录规则: + 若配置了 ``MEASURE_WATCH_DIR`` 则为 ``{MEASURE_WATCH_DIR}/fish{N}``, + 否则为 ``{STREAM_TMP_DIR}/zed_svo2/fish{N}``。 + """ + init_db(settings) + conn = _connect(settings.sqlite_path) + try: + conn.execute("BEGIN IMMEDIATE") + db_max = _max_fish_id_from_zed_sessions(conn) + if settings.measure_watch_dir is not None: + parent = settings.measure_watch_dir.resolve() + else: + parent = (settings.stream_tmp_dir / "zed_svo2").resolve() + fs_dir = _max_numeric_fish_folder_id(parent) + fs_svo = _max_fish_id_from_svo2_under_parent(parent) + fs_max = max(fs_dir, fs_svo) + fish_id = max(db_max, fs_max) + 1 + ts = datetime.now(timezone.utc).isoformat() + output_dir = (parent / f"fish{fish_id}").resolve() + # 极端情况:并发或其它原因导致目录已存在,则顺延编号 + for _ in range(10000): + if not output_dir.exists(): + break + fish_id += 1 + output_dir = (parent / f"fish{fish_id}").resolve() + else: + raise RuntimeError( + f"无法为 ZED 录制分配空闲 fish 目录(父目录 {parent})" + ) + output_dir.mkdir(parents=True, exist_ok=True) + cur = conn.execute( + """ + INSERT INTO zed_recording_sessions (fish_id, started_at, stopped_at, output_dir) + VALUES (?, ?, NULL, ?) + """, + (fish_id, ts, str(output_dir)), + ) + session_id = int(cur.lastrowid) + conn.commit() + return (session_id, fish_id, output_dir) + except Exception: + conn.rollback() + raise + finally: + conn.close() + + +def mark_zed_recording_session_stopped( + settings: Settings, session_row_id: int +) -> Optional[int]: + """将对应会话行的 ``stopped_at`` 置为当前时间;返回 ``fish_id``,未更新则 ``None``。""" + init_db(settings) + conn = _connect(settings.sqlite_path) + try: + ts = datetime.now(timezone.utc).isoformat() + cur = conn.execute( + """ + UPDATE zed_recording_sessions + SET stopped_at = ? + WHERE id = ? AND stopped_at IS NULL + """, + (ts, session_row_id), + ) + if cur.rowcount == 0: + conn.commit() + return None + row = conn.execute( + "SELECT fish_id FROM zed_recording_sessions WHERE id = ?", + (session_row_id,), + ).fetchone() + conn.commit() + return int(row["fish_id"]) if row else None + finally: + conn.close() + + def _migrate_add_client_id_column(conn: sqlite3.Connection) -> None: """为旧数据库添加 client_id 列(如果不存在)。""" row = conn.execute( @@ -376,6 +529,37 @@ def list_all_measure_snapshots(settings: Settings) -> List[Dict[str, Any]]: conn.close() +def list_all_health_snapshots(settings: Settings) -> List[Dict[str, Any]]: + """返回 ``health_snapshots`` 全部行(id 降序,最新在前),供调试接口使用。""" + init_db(settings) + conn = _connect(settings.sqlite_path) + try: + rows = conn.execute( + """ + SELECT id, created_at, behavior_result, health_result, + raw_class_en, error, source_path + FROM health_snapshots + ORDER BY id DESC + """ + ).fetchall() + out: List[Dict[str, Any]] = [] + for row in rows: + out.append( + { + "id": row["id"], + "created_at": row["created_at"], + "behavior_result": row["behavior_result"] or "", + "health_result": row["health_result"] or "", + "raw_class_en": row["raw_class_en"] or "", + "error": row["error"], + "source_path": row["source_path"], + } + ) + return out + finally: + conn.close() + + def get_latest_health(settings: Settings) -> HealthSnapshot: init_db(settings) conn = _connect(settings.sqlite_path) diff --git a/fish_api/app/deps.py b/fish_api/app/deps.py index 8d322d1..ff7e423 100644 --- a/fish_api/app/deps.py +++ b/fish_api/app/deps.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Annotated, Optional +from typing import Optional from fastapi import Depends, Header, HTTPException @@ -8,8 +8,8 @@ from app.settings import Settings, get_settings def require_ingest_auth( - settings: Annotated[Settings, Depends(get_settings)], - x_api_key: Annotated[Optional[str], Header(alias="X-API-Key")] = None, + settings: Settings = Depends(get_settings), + x_api_key: Optional[str] = Header(None, alias="X-API-Key"), ) -> None: expected = settings.ingest_api_key.strip() if not expected: diff --git a/fish_api/app/logging_config.py b/fish_api/app/logging_config.py index fac46f6..4bfb726 100644 --- a/fish_api/app/logging_config.py +++ b/fish_api/app/logging_config.py @@ -6,7 +6,7 @@ import json import logging import os import sys -from typing import Any +from typing import Any, Optional, Union from loguru import logger @@ -17,7 +17,7 @@ def format_json_pretty( data: Any, *, indent: int = 2, - max_chars: int | None = 24_000, + max_chars: Optional[int] = 24_000, ) -> str: """将对象格式化为带缩进、保留中文的 JSON 字符串,供 loguru 多行输出。 @@ -37,7 +37,7 @@ class InterceptHandler(logging.Handler): def emit(self, record: logging.LogRecord) -> None: try: - level: str | int = logger.level(record.levelname).name + level = logger.level(record.levelname).name # type: Union[str, int] except ValueError: level = record.levelno logger.opt(depth=6, exception=record.exc_info).log(level, record.getMessage()) diff --git a/fish_api/app/main.py b/fish_api/app/main.py index 3442c84..2c171d2 100644 --- a/fish_api/app/main.py +++ b/fish_api/app/main.py @@ -2,13 +2,14 @@ from __future__ import annotations import asyncio from contextlib import asynccontextmanager +from typing import List from fastapi import FastAPI from app.logging_config import setup_logging from app.media_static import MediaStaticFiles from app.db import init_db -from app.routers import biomass, debug, ingest +from app.routers import biomass, debug, ingest, zed from app.services.action_watch import run_action_watch_loop from app.services.measure_watch import run_measure_watch_loop from app.settings import get_settings @@ -23,11 +24,12 @@ async def lifespan(app: FastAPI): init_db(s) s.media_root.mkdir(parents=True, exist_ok=True) s.stream_tmp_dir.mkdir(parents=True, exist_ok=True) - tasks: list[asyncio.Task[None]] = [] + tasks = [] # type: List[asyncio.Task] if s.action_watch_dir is not None: tasks.append(asyncio.create_task(run_action_watch_loop(s))) if s.measure_watch_dir is not None: tasks.append(asyncio.create_task(run_measure_watch_loop(s))) + yield for t in tasks: t.cancel() @@ -42,6 +44,7 @@ app = FastAPI(title="Fish API", lifespan=lifespan) app.include_router(ingest.router) app.include_router(biomass.router) app.include_router(debug.router) +app.include_router(zed.router) _settings = get_settings() _settings.media_root.mkdir(parents=True, exist_ok=True) @@ -63,5 +66,6 @@ async def root(): "biomass_water_video": "/api/v1/biomass/water/video/", "biomass_sonar_video": "/api/v1/biomass/sonar/video/", "debug_measure": "/api/v1/debug/meause", - "note": "若配置了 ACTION_WATCH_DIR / MEASURE_WATCH_DIR,启动后会后台监控对应目录。", + "zed_recording": "/api/v1/zed/recording/start|stop|status", + "note": "若配置了 ACTION_WATCH_DIR / MEASURE_WATCH_DIR,启动后会后台监控对应目录。ZED 分段录制由独立进程/脚本负责,不由 fish_api 启停;可选 HTTP /api/v1/zed/recording/start|stop|status。", } diff --git a/fish_api/app/measure_debug_cli.py b/fish_api/app/measure_debug_cli.py index ba703ac..4302333 100644 --- a/fish_api/app/measure_debug_cli.py +++ b/fish_api/app/measure_debug_cli.py @@ -1,6 +1,6 @@ -"""CLI:对单条 fish 跑与 fish_api / measure-watch 相同的 FishMeasure 子进程(不写 SQLite、不发布 /media)。 +"""CLI:对单条 fish 跑 FishMeasure 子进程(不写 SQLite、不发布 /media)。 -使用 ``run_measure_batch_subprocess``,与 ``run_full_measure_batch`` 中的推理步骤一致。 +使用 ``run_measure_batch_subprocess``(合并点云)。线上 ``measure_watch`` 为逐段 ``run_full_measure``,齐套后再 aggregate final。 """ from __future__ import annotations diff --git a/fish_api/app/media_static.py b/fish_api/app/media_static.py index ef11341..3edc6b8 100644 --- a/fish_api/app/media_static.py +++ b/fish_api/app/media_static.py @@ -8,7 +8,7 @@ from __future__ import annotations import os from mimetypes import guess_type from pathlib import Path -from typing import Union +from typing import Dict, List, Optional, Tuple, Union from starlette.datastructures import Headers from starlette.responses import FileResponse, Response, StreamingResponse @@ -32,7 +32,7 @@ def _media_type_for_file(path: PathLike) -> str: return "application/octet-stream" -def _parse_range_header(range_header: str, file_size: int) -> list[tuple[int, int]]: +def _parse_range_header(range_header: str, file_size: int) -> List[Tuple[int, int]]: """解析 HTTP Range 头,返回 (start, end) 列表。""" if not range_header.startswith("bytes="): return [] @@ -72,10 +72,10 @@ class _RangeFileResponse(Response): def __init__( self, path: PathLike, - media_type: str | None = None, - stat_result: os.stat_result | None = None, - headers: dict | None = None, - range_header: str | None = None, + media_type: Optional[str] = None, + stat_result: Optional[os.stat_result] = None, + headers: Optional[Dict] = None, + range_header: Optional[str] = None, ): self.path = path self.stat_result = stat_result or os.stat(path) diff --git a/fish_api/app/prestart_fresh.py b/fish_api/app/prestart_fresh.py index 106d983..07d0165 100644 --- a/fish_api/app/prestart_fresh.py +++ b/fish_api/app/prestart_fresh.py @@ -16,12 +16,13 @@ from __future__ import annotations import os from pathlib import Path +from typing import Optional from app.db import _safe_rm_tree, remove_sqlite_database_files, reset_delivery_client_progress from app.settings import get_settings -def _rm_legacy_json(path: Path | None) -> None: +def _rm_legacy_json(path: Optional[Path]) -> None: if path is None: return try: diff --git a/fish_api/app/routers/biomass.py b/fish_api/app/routers/biomass.py index a59e010..2d44461 100644 --- a/fish_api/app/routers/biomass.py +++ b/fish_api/app/routers/biomass.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Optional +from typing import Any, Dict, List, Optional from fastapi import APIRouter, Depends, Header, Query from starlette.responses import JSONResponse @@ -16,10 +16,27 @@ router = APIRouter(prefix="/api/v1/biomass", tags=["biomass"]) HEADER_BIOMASS_NEW = "X-Fish-Biomass-New" -def _new_headers(has_new: bool) -> dict[str, str]: +def _new_headers(has_new: bool) -> Dict[str, str]: return {HEADER_BIOMASS_NEW: "1" if has_new else "0"} +# GET /real/camera/ 的 data.result[] 仅含:id, type, length, weight, date(与客户端约定一致) +_BIOMASS_CAMERA_RESULT_KEYS = ("id", "type", "length", "weight", "date") + + +def _biomass_camera_result_rows(result: Any) -> List[Dict[str, Any]]: + if not isinstance(result, list): + return [] + out: List[Dict[str, Any]] = [] + for it in result: + if not isinstance(it, dict): + continue + row = {k: it[k] for k in _BIOMASS_CAMERA_RESULT_KEYS if k in it} + if row: + out.append(row) + return out + + def _resolve_client_id( x_fish_client_id: Optional[str] = Header(None, alias="X-Fish-Client-Id"), client_id: Optional[str] = Query( @@ -69,7 +86,7 @@ async def get_real_camera( ) payload: dict = { - "result": m.result, + "result": _biomass_camera_result_rows(m.result), "video_left": m.video_left, "video_right": m.video_right, } diff --git a/fish_api/app/routers/debug.py b/fish_api/app/routers/debug.py index 8d21f4e..2bbb8a4 100644 --- a/fish_api/app/routers/debug.py +++ b/fish_api/app/routers/debug.py @@ -4,7 +4,7 @@ from __future__ import annotations from fastapi import APIRouter, Depends -from app.db import list_all_measure_snapshots +from app.db import list_all_health_snapshots, list_all_measure_snapshots from app.settings import Settings, get_settings router = APIRouter(prefix="/api/v1/debug", tags=["debug"]) @@ -16,3 +16,11 @@ async def debug_list_measure_results(settings: Settings = Depends(get_settings)) """列出 SQLite 中已保存的全部 FishMeasure 计算结果(含每条鱼的 result / pred / star 等)。""" items = list_all_measure_snapshots(settings) return {"count": len(items), "items": items} + + +@router.get("/aciton") +@router.get("/action") +async def debug_list_action_results(settings: Settings = Depends(get_settings)): + """列出 SQLite 中已保存的全部 FishAction 健康结果(含切片 source_path)。""" + items = list_all_health_snapshots(settings) + return {"count": len(items), "items": items} diff --git a/fish_api/app/routers/ingest.py b/fish_api/app/routers/ingest.py index 068b09b..88ae8fd 100644 --- a/fish_api/app/routers/ingest.py +++ b/fish_api/app/routers/ingest.py @@ -5,6 +5,7 @@ from pathlib import Path from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, Response +from app.compat import to_thread from app.db import ( health_snapshot_deliverable, measure_snapshot_deliverable, @@ -30,9 +31,12 @@ async def _measure_job_serial(svo_path: Path, settings: Settings) -> None: async with app_state.measure_lock: app_state.measure_status = "running" try: - snap = await asyncio.to_thread( - measure_svc.run_full_measure, svo_path, settings - ) + + def _run(): + with app_state.measure_thread_lock: + return measure_svc.run_full_measure(svo_path, settings) + + snap = await to_thread(_run) if measure_snapshot_deliverable(snap): save_measure_snapshot( settings, snap, source_path=str(svo_path.resolve()) @@ -47,7 +51,7 @@ async def _action_job_serial(mp4_path: Path, settings: Settings) -> None: app_state.action_status = "running" try: # 返回 (第一个快照, 所有切片快照列表) - first_snap, all_snaps = await asyncio.to_thread( + first_snap, all_snaps = await to_thread( action_svc.run_full_action, mp4_path, settings ) diff --git a/fish_api/app/routers/zed.py b/fish_api/app/routers/zed.py new file mode 100644 index 0000000..f0eba33 --- /dev/null +++ b/fish_api/app/routers/zed.py @@ -0,0 +1,125 @@ +"""ZED 分段录制 HTTP 控制(响应格式对齐 biomass:code/msg/data;不校验 ingest API Key)。""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from fastapi import APIRouter, Body, Depends +from pydantic import BaseModel, Field +from starlette.responses import JSONResponse + +from app.services.zed_recording_control import ( + start_zed_recording, + stop_zed_recording, + zed_recording_is_running, +) +from app.settings import Settings, get_settings +from app.state import app_state + +router = APIRouter(prefix="/api/v1/zed/recording", tags=["zed-recording"]) + + +def _ok(data: Dict[str, Any], msg: str = "成功") -> JSONResponse: + return JSONResponse( + content={ + "code": 200, + "msg": msg, + "data": data, + } + ) + + +def _err( + http_status: int, + *, + code: int, + msg: str, + data: Optional[Dict[str, Any]] = None, +) -> JSONResponse: + body: Dict[str, Any] = {"code": code, "msg": msg} + if data is not None: + body["data"] = data + return JSONResponse(status_code=http_status, content=body) + + +class ZedRecordingStartRequest(BaseModel): + """可选覆盖分段时长;``fish_id`` 由服务端每次启动在库中递增分配。""" + + segment_sec: Optional[float] = Field( + None, + ge=1.0, + description="每段时长(秒);缺省使用 ZED_SVO_SEGMENT_SEC", + ) + + +@router.get("/status") +async def zed_recording_status() -> JSONResponse: + data: Dict[str, Any] = {"running": zed_recording_is_running()} + if app_state.zed_recording_fish_id is not None: + data["fish_id"] = app_state.zed_recording_fish_id + return _ok(data) + + +@router.post("/start") +async def zed_recording_start( + body: ZedRecordingStartRequest = Body(default_factory=ZedRecordingStartRequest), + settings: Settings = Depends(get_settings), +) -> JSONResponse: + ok, msg, fish_id, _ = start_zed_recording( + settings, + segment_sec=body.segment_sec, + ) + if not ok: + if msg == "already_running": + return _err( + 409, + code=409, + msg="已在录制中", + data={"ok": False, "message": "已在录制中"}, + ) + if msg.startswith("session_db_error:"): + return _err( + 500, + code=500, + msg="写入录制会话失败", + data={"ok": False, "message": msg}, + ) + return _err( + 500, + code=500, + msg="启动录制失败", + data={"ok": False, "message": msg}, + ) + + return _ok( + { + "ok": True, + "message": "录制开始", + "fish_id": fish_id, + } + ) + + +@router.post("/stop") +async def zed_recording_stop(settings: Settings = Depends(get_settings)) -> JSONResponse: + ok, msg, fish_id = stop_zed_recording(settings) + if not ok: + if msg == "not_running": + return _ok({"ok": False, "message": "未在录制"}, msg="未在录制") + if msg == "stop_timeout": + return _err( + 504, + code=504, + msg="停止超时", + data={"ok": False, "message": "停止超时"}, + ) + return _err( + 500, + code=500, + msg="停止录制失败", + data={"ok": False, "message": msg}, + ) + out: Dict[str, Any] = {"ok": True, "message": "录制停止"} + if fish_id is not None: + out["fish_id"] = fish_id + return _ok(out) diff --git a/fish_api/app/services/action.py b/fish_api/app/services/action.py index 7718a70..ed3eb7a 100644 --- a/fish_api/app/services/action.py +++ b/fish_api/app/services/action.py @@ -6,7 +6,7 @@ import sys import tempfile from datetime import datetime, timezone from pathlib import Path -from typing import List +from typing import List, Tuple from app.logging_config import format_json_pretty from app.services.video_slice import get_video_duration, slice_video @@ -102,7 +102,7 @@ def run_action_subprocess(mp4_path: Path, settings: Settings) -> str: Path(out_json).unlink(missing_ok=True) -def run_full_action(mp4_path: Path, settings: Settings) -> tuple[HealthSnapshot, list[HealthSnapshot]]: +def run_full_action(mp4_path: Path, settings: Settings) -> Tuple[HealthSnapshot, List[HealthSnapshot]]: """运行 FishAction 健康检测。如果视频较长,会自动切片后分别检测。 每个切片被视为独立的视频,返回所有切片的结果列表。 @@ -112,7 +112,7 @@ def run_full_action(mp4_path: Path, settings: Settings) -> tuple[HealthSnapshot, settings: 应用配置 Returns: - tuple[HealthSnapshot, list[HealthSnapshot]]: (第一个切片/完整视频的快照, 所有切片快照列表) + Tuple[HealthSnapshot, List[HealthSnapshot]]: (第一个切片/完整视频的快照, 所有切片快照列表) - 如果视频被切片:返回 (第一个切片, 所有切片列表) - 如果视频未被切片:返回 (完整视频快照, [完整视频快照]) """ @@ -141,7 +141,7 @@ def run_full_action(mp4_path: Path, settings: Settings) -> tuple[HealthSnapshot, ) # 处理每个切片 - all_snaps: list[HealthSnapshot] = [] + all_snaps = [] # type: List[HealthSnapshot] for i, slice_file in enumerate(slice_files): start_time = i * DEFAULT_SLICE_DURATION end_time = min(start_time + DEFAULT_SLICE_DURATION, duration) diff --git a/fish_api/app/services/action_watch.py b/fish_api/app/services/action_watch.py index a53fc6b..fc42ac5 100644 --- a/fish_api/app/services/action_watch.py +++ b/fish_api/app/services/action_watch.py @@ -2,10 +2,12 @@ from __future__ import annotations import asyncio from pathlib import Path -from typing import Dict, Set +from typing import Dict, List, Set, Tuple from loguru import logger +from app.compat import to_thread + from app.db import ( add_watch_processed, health_snapshot_deliverable, @@ -28,7 +30,7 @@ def _state_path(settings: Settings) -> Path: return settings.action_watch_dir / ".fishaction_watch_processed.json" -def iter_mp4(watch_dir: Path, recursive: bool) -> list[Path]: +def iter_mp4(watch_dir: Path, recursive: bool) -> List[Path]: if recursive: return sorted( p @@ -56,7 +58,7 @@ async def _run_inference_and_state( app_state.action_status = "running" try: # 返回 (第一个快照, 所有切片快照列表) - first_snap, all_snaps = await asyncio.to_thread( + first_snap, all_snaps = await to_thread( action_svc.run_full_action, mp4, settings ) @@ -98,7 +100,7 @@ async def _run_inference_and_state( async def watch_tick( settings: Settings, processed: Set[str], - stability: Dict[str, tuple[int, int]], + stability: Dict[str, Tuple[int, int]], state_file: Path, ) -> bool: """处理一轮目录扫描;若处理了至少一个文件返回 True。""" @@ -149,7 +151,7 @@ async def run_action_watch_loop(settings: Settings) -> None: if settings.action_watch_use_state_file else set() ) - stability: Dict[str, tuple[int, int]] = {} + stability = {} # type: Dict[str, Tuple[int, int]] logger.info( "[action-watch] watching {} (poll={}s, stable_polls={}, state={} {})", diff --git a/fish_api/app/services/measure.py b/fish_api/app/services/measure.py index 52239f1..96e014e 100644 --- a/fish_api/app/services/measure.py +++ b/fish_api/app/services/measure.py @@ -3,6 +3,7 @@ from __future__ import annotations import json import math import os +import statistics import re import shutil import subprocess @@ -942,13 +943,208 @@ def _find_preview_videos(output_dir: Path) -> Tuple[Optional[Path], Optional[Pat return None, None +def _snapshot_length_mm(snap: MeasureSnapshot) -> Optional[float]: + if snap.result and isinstance(snap.result[0], dict): + return _row_finite_field(snap.result[0], "length") + return None + + +def _materialize_weight_json_for_svo( + svo_path: Path, + settings: Settings, + *, + fish_id: str, + temp_dir: Path, +) -> Path: + root = settings.measure_output_root / f"fish{fish_id}" + candidate = root / svo_path.stem / "weight_prediction.json" + if candidate.is_file(): + return candidate + data = _load_weight_json(svo_path, settings, output_root=root) + temp_dir.mkdir(parents=True, exist_ok=True) + materialized = temp_dir / f"{svo_path.stem}_weight_prediction.json" + materialized.write_text( + json.dumps(data, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + return materialized + + +def _run_generate_video_with_labels_cli( + *, + settings: Settings, + svo_path: Path, + output_dir: Path, + weight_json: Path, + summary_weight_g: Optional[float], + summary_length_mm: Optional[float], + summary_star: bool, + output_video_name: str, +) -> Path: + script = settings.fish_measure_root / "generate_video_with_labels.py" + if not script.is_file(): + raise FileNotFoundError(f"Missing FishMeasure preview script: {script}") + + output_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + _py_exe(settings), + str(script), + "--svo", + str(svo_path.resolve()), + "--save-output", + str(output_dir.resolve()), + "--weight-json", + str(weight_json.resolve()), + "--conf", + str(settings.measure_yolo_conf), + "--sam-device", + settings.sam_device, + "--output-video-name", + output_video_name, + ] + if settings.predict_show_large_labels_at_top_right: + cmd.append("--show-large-labels-at-top-right") + if summary_weight_g is not None and math.isfinite(summary_weight_g): + cmd.extend(["--summary-weight-g", str(float(summary_weight_g))]) + if summary_length_mm is not None and math.isfinite(summary_length_mm): + cmd.extend(["--summary-length-mm", str(float(summary_length_mm))]) + if summary_star: + cmd.append("--summary-star") + + proc = run_subprocess_with_log( + cmd, + cwd=str(settings.fish_measure_root), + env=os.environ.copy(), + log_name="FishMeasure", + stream_to_logger=False, + ) + if proc.returncode != 0: + err = proc.stdout or "" + raise RuntimeError( + f"generate_video_with_labels.py failed ({proc.returncode}): {err[-4000:]}" + ) + + video_path = output_dir / "images" / output_video_name + if not video_path.is_file(): + raise FileNotFoundError(f"Expected generated preview video at {video_path}") + return video_path + + +def _concat_preview_videos(inputs: List[Path], dst: Path) -> bool: + if not inputs: + return False + try: + import cv2 + except Exception: + return False + + writer = None + target_size: Optional[Tuple[int, int]] = None + fps = 10.0 + wrote_any = False + try: + dst.parent.mkdir(parents=True, exist_ok=True) + for src in inputs: + if not src.is_file(): + continue + cap = cv2.VideoCapture(str(src)) + if not cap.isOpened(): + continue + cur_fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) + if width <= 0 or height <= 0: + cap.release() + continue + if writer is None: + target_size = (width, height) + fps = cur_fps if cur_fps > 0.0 else 10.0 + writer = cv2.VideoWriter( + str(dst), + cv2.VideoWriter_fourcc(*"mp4v"), + fps, + target_size, + ) + while True: + ok, frame = cap.read() + if not ok: + break + if target_size is not None and ( + frame.shape[1] != target_size[0] or frame.shape[0] != target_size[1] + ): + frame = cv2.resize(frame, target_size) + writer.write(frame) + wrote_any = True + cap.release() + finally: + if writer is not None: + writer.release() + return wrote_any and dst.is_file() and dst.stat().st_size > 0 + + +def generate_aggregate_preview_media( + contributing_svos: List[Path], + snap: MeasureSnapshot, + fish_id: str, + settings: Settings, + *, + final_key: str, +) -> Tuple[str, str]: + """为 final 快照生成并发布串接预览视频,返回 (video_left, video_right)。""" + if not contributing_svos or not snap.result: + return "", "" + + summary_weight_g = snap.pred if snap.pred is not None and math.isfinite(snap.pred) else None + summary_length_mm = _snapshot_length_mm(snap) + root = settings.measure_output_root / f"fish{fish_id}" + temp_root = root / "__aggregate_preview__" / _safe_media_prefix(final_key) + temp_root.mkdir(parents=True, exist_ok=True) + + part_videos: List[Path] = [] + weight_json_cache_dir = temp_root / "__weight_json_cache__" + for idx, svo in enumerate(contributing_svos, start=1): + part_dir = temp_root / f"part_{idx:02d}_{_safe_media_prefix(svo.stem)}" + weight_json = _materialize_weight_json_for_svo( + svo, + settings, + fish_id=fish_id, + temp_dir=weight_json_cache_dir, + ) + part_video = _run_generate_video_with_labels_cli( + settings=settings, + svo_path=svo, + output_dir=part_dir, + weight_json=weight_json, + summary_weight_g=summary_weight_g, + summary_length_mm=summary_length_mm, + summary_star=bool(snap.star), + output_video_name=f"final_part_{idx:02d}.mp4", + ) + part_videos.append(part_video) + + if not part_videos: + return "", "" + + final_preview = temp_root / "final_preview.mp4" + if not _concat_preview_videos(part_videos, final_preview): + raise RuntimeError(f"Failed to concatenate final preview videos into {final_preview}") + + return _publish_media( + final_preview, + final_preview, + settings, + f"{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')}_final_fish{fish_id}_{_safe_media_prefix(final_key)}", + ) + + def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool: """Split a side-by-side stereo video (W x H where W == 2*H_single) into left/right halves. Returns True if split succeeded, False otherwise (caller should fall back to copy). """ ffmpeg_path = _get_ffmpeg_path() - ffprobe_path = str(Path(ffmpeg_path).parent / "ffprobe") + from app.services.video_slice import _get_ffprobe_path + ffprobe_path = _get_ffprobe_path() probe = subprocess.run( [ ffprobe_path, "-v", "quiet", "-print_format", "json", @@ -990,21 +1186,19 @@ def _split_sbs_video(src: Path, left_dst: Path, right_dst: Path) -> bool: def _get_ffmpeg_path() -> str: - """获取可用的 ffmpeg 路径。优先使用项目配置的 ffmpeg。""" - # 优先使用项目目录下的 ffmpeg - project_ffmpeg = Path("/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg") - if project_ffmpeg.is_file(): - return str(project_ffmpeg) - # 尝试系统路径 + """获取可用的 ffmpeg 路径。优先使用 .env 中 FFMPEG_PATH 配置。""" + from app.settings import get_settings + configured = get_settings().ffmpeg_path.strip() + if configured and Path(configured).is_file(): + return configured system_paths = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"] for path in system_paths: if Path(path).is_file(): return path - # 回退到 PATH 中的 ffmpeg return "ffmpeg" -def _get_h264_encoder() -> tuple[str, list[str], str]: +def _get_h264_encoder() -> Tuple[str, List[str], str]: """检测可用的H.264编码器,返回 (encoder_name, options, ffmpeg_path)。 优先使用 libx264(纯软件,最可靠),硬件编码器需要实际测试才能确认可用。 @@ -1062,9 +1256,10 @@ def _transcode_with_x264(src: Path, dst: Path) -> bool: return False # 首先用 ffprobe 获取视频信息 + from app.services.video_slice import _get_ffprobe_path try: probe = subprocess.run( - ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", str(src)], + [_get_ffprobe_path(), "-v", "quiet", "-print_format", "json", "-show_streams", str(src)], capture_output=True, text=True, timeout=10 ) import json as _json @@ -1373,7 +1568,7 @@ def build_measure_snapshot( updated_at=datetime.now(timezone.utc), raw_prediction_path=str(root / svo_path.stem / "weight_prediction.json"), pred=pred_weight, - star=is_confident, + star=False, calculation_log=calc_log, ) @@ -1537,6 +1732,209 @@ def build_measure_snapshot_batch( updated_at=datetime.now(timezone.utc), raw_prediction_path=str(combined_json) if combined_json.is_file() else None, pred=pred_weight, - star=is_confident, + star=False, calculation_log=calc_log, ) + + +def _row_finite_field(d: dict, key: str) -> Optional[float]: + v = d.get(key) + if v is None: + return None + try: + x = float(v) + return x if math.isfinite(x) else None + except (TypeError, ValueError): + return None + + +def tag_measure_snapshot_meta( + snap: MeasureSnapshot, + *, + measurement_phase: str, + fish_folder: str = "", + segment_source: str = "", +) -> MeasureSnapshot: + """仅在 ``calculation_log`` 中记录 segment/final 元信息;不向 ``result`` 行追加字段(保持 biomass 接口约定)。""" + lines = [ + f"# measurement_phase: {measurement_phase}", + f"# fish_folder: {fish_folder}", + ] + if segment_source: + lines.append(f"# segment_source: {segment_source}") + meta = "\n".join(lines) + "\n" + snap.calculation_log = meta + (snap.calculation_log or "") + return snap + + +def _segment_weight_g(snap: MeasureSnapshot) -> Optional[float]: + if snap.pred is not None and math.isfinite(snap.pred): + return float(snap.pred) + if snap.result and isinstance(snap.result[0], dict): + return _row_finite_field(snap.result[0], "weight") + return None + + +def _segment_length_mm(snap: MeasureSnapshot) -> Optional[float]: + if snap.result and isinstance(snap.result[0], dict): + return _row_finite_field(snap.result[0], "length") + return None + + +def _aggregate_numeric(values: List[float], mode: str) -> float: + if not values: + return float("nan") + m = (mode or "median").strip().lower() + if m not in ("median", "mean", "trimmed_mean"): + m = "median" + if m == "mean": + return float(sum(values) / len(values)) + if m == "trimmed_mean" and len(values) >= 3: + s = sorted(values) + core = s[1:-1] + return float(sum(core) / len(core)) + return float(statistics.median(values)) + + +def _final_star_from_contributing_svos( + contributing_svos: List[Path], fish_id: str, settings: Settings +) -> bool: + """与逐段曾用 ``all(s.star)`` 等价:每段对 dgcnn_summary 做 ``_star_confident_like_test_dgcnn``。""" + if not contributing_svos: + return False + root = settings.measure_output_root / f"fish{fish_id}" + for svo in contributing_svos: + try: + data = _load_weight_json(svo, settings, output_root=root) + except (FileNotFoundError, RuntimeError, OSError, ValueError): + return False + summary = data.get("dgcnn_summary") or data.get("weight_summary") or {} + if not _star_confident_like_test_dgcnn(summary): + return False + return True + + +def reload_segment_snapshots_for_aggregate( + svo_list: List[Path], fish_id: str, settings: Settings +) -> List[Tuple[Path, MeasureSnapshot]]: + """从已写入的 FishMeasure 输出目录重载各段快照(用于齐套 final 聚合,无需重跑子进程)。 + + 逐段测量失败或未写出 ``weight_prediction.json`` 的段仍会被 watch 标为已处理; + 此处跳过这些段,仅用成功段聚合,避免 final 整批因单段缺文件而失败。 + + 返回 ``(svo_path, snapshot)`` 列表,与 ``build_measure_snapshot_aggregate(..., contributing_svos=...)`` 对齐。 + """ + root = settings.measure_output_root / f"fish{fish_id}" + out: List[Tuple[Path, MeasureSnapshot]] = [] + for svo in sorted(svo_list, key=lambda x: str(x.resolve())): + try: + out.append((svo, build_measure_snapshot(svo, settings, output_root=root))) + except (FileNotFoundError, RuntimeError) as e: + logger.warning( + "[FishMeasure] final aggregate skip segment (no usable weight JSON) fish_id={} svo={}: {}", + fish_id, + svo.name, + e, + ) + return out + + +def build_measure_snapshot_aggregate( + segments: List[MeasureSnapshot], + fish_id: str, + settings: Settings, + *, + contributing_svos: List[Path], + fish_folder: str = "", + segment_source_paths: str = "", +) -> MeasureSnapshot: + """从逐段 ``MeasureSnapshot`` 聚合为一条 final 快照(无预览视频)。 + + ``contributing_svos`` 须与 ``segments`` 等长且顺序一一对应(成功重载的各段 SVO), + 用于按段 ``dgcnn_summary`` 计算 final ``star``。 + """ + if not segments: + return MeasureSnapshot( + result=[], + video_left="", + video_right="", + error="no_segments", + ) + if len(contributing_svos) != len(segments): + raise ValueError( + f"contributing_svos length {len(contributing_svos)} != segments length {len(segments)}" + ) + + mode = (settings.measure_final_aggregate_mode or "median").strip().lower() + if mode not in ("median", "mean", "trimmed_mean"): + mode = "median" + + ws: List[float] = [w for s in segments if (w := _segment_weight_g(s)) is not None] + ls = [ln for s in segments if (ln := _segment_length_mm(s)) is not None] + star_final = _final_star_from_contributing_svos(contributing_svos, fish_id, settings) + + if not ws: + return MeasureSnapshot( + result=[], + video_left="", + video_right="", + error="aggregate_no_segment_weights", + calculation_log="# measurement_phase: final\n# error: no segment weights\n", + ) + + pred_w = _aggregate_numeric(ws, mode) + pred_len = _aggregate_numeric(ls, mode) if ls else float("nan") + + if not math.isfinite(pred_w): + for s in segments: + w = _segment_weight_g(s) + if w is not None and math.isfinite(w): + pred_w = w + break + if not math.isfinite(pred_len): + for s in segments: + ln = _segment_length_mm(s) + if ln is not None and math.isfinite(ln): + pred_len = ln + break + + fid = int(fish_id) if fish_id.isdigit() else 1 + fish_type = "大黄鱼" + if segments[0].result and isinstance(segments[0].result[0], dict): + t = segments[0].result[0].get("type") + if isinstance(t, str) and t.strip(): + fish_type = t.strip() + + row: Dict[str, Any] = { + "id": fid, + "type": fish_type, + "weight": str(round(pred_w)) if math.isfinite(pred_w) else "", + "length": str(round(pred_len)) if math.isfinite(pred_len) else "", + "date": datetime.now(timezone.utc).strftime("%Y-%m-%d"), + } + result = [row] + + lines = [ + f"# aggregate_mode: {mode}", + f"# segment_count: {len(segments)}", + f"# segment_weights_g: {ws}", + f"# segment_lengths_mm: {ls}", + ] + calc_log = "\n".join(lines) + "\n# Aggregated from per-segment former results (measure_watch).\n" + + snap = MeasureSnapshot( + result=result, + video_left="", + video_right="", + updated_at=datetime.now(timezone.utc), + raw_prediction_path=None, + pred=pred_w if math.isfinite(pred_w) else None, + star=star_final, + calculation_log=calc_log, + ) + return tag_measure_snapshot_meta( + snap, + measurement_phase="final", + fish_folder=fish_folder, + segment_source=segment_source_paths, + ) diff --git a/fish_api/app/services/measure_watch.py b/fish_api/app/services/measure_watch.py index cd08308..0aa0035 100644 --- a/fish_api/app/services/measure_watch.py +++ b/fish_api/app/services/measure_watch.py @@ -1,13 +1,16 @@ -"""后台轮询目录中的 .svo2,跑 FishMeasure,写入 SQLite(与 ingest 共用)。""" +"""后台轮询目录中的 .svo2,逐段跑 FishMeasure,齐套后聚合 final(与 ingest 共用 SQLite)。""" from __future__ import annotations import asyncio +import hashlib from pathlib import Path -from typing import Dict, Set +from typing import Dict, List, Set, Tuple from loguru import logger +from app.compat import to_thread + from app.db import ( add_watch_processed, load_watch_processed, @@ -30,143 +33,256 @@ def _state_path(settings: Settings) -> Path: return settings.measure_watch_dir / ".fishmeasure_watch_processed.json" -def iter_svo2_folders(watch_dir: Path) -> list[tuple[list[Path], str]]: +def iter_svo2_folders(watch_dir: Path) -> List[Tuple[List[Path], str]]: """扫描子文件夹,返回 (svo文件路径列表, fish_id) 列表。 - 文件夹命名格式为 fish{N},如 fish1、fish2 等。 - 每个子文件夹可以包含多个 .svo2 文件(同一条鱼的多段视频), - 这些 SVO 文件会被批量处理,点云合并后进行重量预测。 + 文件夹命名格式为 fish{N}。每个子文件夹内多个 .svo2 先逐段测量,齐套后再聚合 final。 """ - result: list[tuple[list[Path], str]] = [] + result = [] # type: List[Tuple[List[Path], str]] if not watch_dir.is_dir(): return result for entry in sorted(watch_dir.iterdir()): if not entry.is_dir(): continue - # 从文件夹名提取 fish_id,格式为 fish{N} folder_name = entry.name if not folder_name.startswith("fish"): continue try: - fish_id = folder_name[4:] # 去掉 "fish" 前缀 + fish_id = folder_name[4:] if not fish_id.isdigit(): continue except (IndexError, ValueError): continue - # 在子文件夹中查找所有 .svo2 文件 svo_files = sorted([ p for p in entry.iterdir() if p.is_file() and p.suffix.lower() == ".svo2" ]) if svo_files: - # 返回该文件夹中的所有 SVO 文件,它们将被批量处理 result.append((svo_files, fish_id)) return result -async def _run_measure_and_state( - svo_list: list[Path], +def _final_processed_key(fish_id: str, svo_list: List[Path]) -> str: + sig = "|".join(sorted(str(p.resolve()) for p in svo_list)) + h = hashlib.sha256(sig.encode("utf-8")).hexdigest()[:24] + return f"__measure_final__fish{fish_id}:{h}" + + +def _folder_size_tuple(svo_list: List[Path]) -> Tuple[Tuple[str, int], ...]: + out: List[Tuple[str, int]] = [] + for p in sorted(svo_list, key=lambda x: str(x.resolve())): + try: + st = p.stat() + out.append((str(p.resolve()), int(st.st_size))) + except OSError: + return tuple() + return tuple(out) + + +async def _run_single_svo_measure( + svo: Path, fish_id: str, settings: Settings, processed: Set[str], state_file: Path, ) -> None: - """批量处理同一条鱼的多个 SVO:合并点云后一次 DGCNN;解析与 test_dgcnn summary 对齐。 - """ - if not svo_list: - return + key = str(svo.resolve()) + fish_folder = svo.parent.resolve() + fish_output_root = settings.measure_output_root / f"fish{fish_id}" + fish_output_root.mkdir(parents=True, exist_ok=True) - # 生成唯一的 key 列表(用于 processed 标记) - keys = [str(svo.resolve()) for svo in svo_list] - # 检查是否全部已处理 - if all(key in processed for key in keys): - return - - svo_names = ", ".join(svo.name for svo in svo_list) - logger.info("[measure-watch] batch inference for fish_id={}: {} SVO(s): {}", - fish_id, len(svo_list), svo_names) + logger.info( + "[measure-watch] segment inference fish_id={} svo={}", + fish_id, + svo.name, + ) async with app_state.measure_lock: app_state.measure_status = "running" try: - # 使用 batch 模式处理所有 SVO,传入 fish_id 作为结果 id - snap = await asyncio.to_thread( - measure_svc.run_full_measure_batch, svo_list, settings, fish_id + + def _run(): + with app_state.measure_thread_lock: + return measure_svc.run_full_measure( + svo, settings, output_root=fish_output_root + ) + + snap = await to_thread(_run) + + snap = measure_svc.tag_measure_snapshot_meta( + snap, + measurement_phase="segment", + fish_folder=str(fish_folder), + segment_source=str(svo.resolve()), ) if measure_snapshot_deliverable(snap): - # 保存结果,client_id=None 表示对所有客户端可见 - # fish_id 只用于 result 中的 id 字段,不作为 client_id - source_paths = "|".join(keys) # 合并所有 source_path save_measure_snapshot( - settings, snap, source_path=source_paths, client_id=None + settings, + snap, + source_path=str(svo.resolve()), + client_id=None, ) else: logger.warning( - "[measure-watch] no deliverable measure rows for fish_id={}, skip SQLite", + "[measure-watch] no deliverable measure rows for fish_id={} svo={}, skip SQLite", + fish_id, + svo.name, + ) + + app_state.measure_status = "idle" + processed.add(key) + if settings.measure_watch_use_state_file: + add_watch_processed(settings, key, "measure") + + r0 = snap.result[0] if snap.result else {} + logger.info( + "[measure-watch] segment done: fish_id={} svo={} weight={!r}", + fish_id, + svo.name, + r0.get("weight", ""), + ) + + except (RuntimeError, FileNotFoundError) as e: + logger.warning( + "[measure-watch] measure failed fish_id={} svo={}: {}", + fish_id, + svo.name, + e, + ) + app_state.measure_status = "idle" + processed.add(key) + if settings.measure_watch_use_state_file: + add_watch_processed(settings, key, "measure") + + except Exception as e: + logger.exception( + "[measure-watch] error fish_id={} svo={}: {}", + fish_id, + svo.name, + e, + ) + app_state.measure_status = "idle" + processed.add(key) + if settings.measure_watch_use_state_file: + add_watch_processed(settings, key, "measure") + + +async def _run_final_aggregate( + svo_list: List[Path], + fish_id: str, + settings: Settings, + processed: Set[str], + state_file: Path, + final_key: str, +) -> None: + fish_folder = svo_list[0].parent.resolve() + + logger.info( + "[measure-watch] final aggregate fish_id={} {} segment(s)", + fish_id, + len(svo_list), + ) + + async with app_state.measure_lock: + app_state.measure_status = "running" + try: + + def _reload(): + return measure_svc.reload_segment_snapshots_for_aggregate( + svo_list, fish_id, settings + ) + + pairs = await to_thread(_reload) + contributing_svos = [p[0] for p in pairs] + segments = [p[1] for p in pairs] + paths_joined = "|".join(sorted(str(p.resolve()) for p in contributing_svos)) + + snap = measure_svc.build_measure_snapshot_aggregate( + segments, + fish_id, + settings, + contributing_svos=contributing_svos, + fish_folder=str(fish_folder), + segment_source_paths=paths_joined, + ) + + if measure_snapshot_deliverable(snap): + try: + v_left, v_right = await to_thread( + measure_svc.generate_aggregate_preview_media, + contributing_svos, + snap, + fish_id, + settings, + final_key=final_key, + ) + snap.video_left = v_left + snap.video_right = v_right + except Exception as e: + logger.warning( + "[measure-watch] final preview generate failed fish_id={}: {}", + fish_id, + e, + ) + save_measure_snapshot( + settings, + snap, + source_path=f"aggregate:{final_key}", + client_id=None, + ) + else: + logger.warning( + "[measure-watch] final not deliverable for fish_id={}, skip SQLite", fish_id, ) app_state.measure_status = "idle" - - # 标记所有 SVO 为已处理 - for key in keys: - processed.add(key) - if settings.measure_watch_use_state_file: - add_watch_processed(settings, key, "measure") + processed.add(final_key) + if settings.measure_watch_use_state_file: + add_watch_processed(settings, final_key, "measure") r0 = snap.result[0] if snap.result else {} - w = r0.get("weight", "") logger.info( - "[measure-watch] done: fish_id={} SVOs={} weight={!r}", - fish_id, len(svo_list), w + "[measure-watch] final done: fish_id={} weight={!r}", + fish_id, + r0.get("weight", ""), ) - except (RuntimeError, FileNotFoundError) as e: - logger.warning("[measure-watch] measure failed for fish_id={}: {}", fish_id, e) - app_state.measure_status = "idle" - for key in keys: - processed.add(key) - if settings.measure_watch_use_state_file: - add_watch_processed(settings, key, "measure") - except Exception as e: - logger.exception("[measure-watch] error on fish_id={}: {}", fish_id, e) + logger.exception( + "[measure-watch] final aggregate failed fish_id={}: {}", + fish_id, + e, + ) app_state.measure_status = "idle" - for key in keys: - processed.add(key) - if settings.measure_watch_use_state_file: - add_watch_processed(settings, key, "measure") + processed.add(final_key) + if settings.measure_watch_use_state_file: + add_watch_processed(settings, final_key, "measure") async def watch_tick( settings: Settings, processed: Set[str], - stability: Dict[str, tuple[int, int]], + stability: Dict[str, Tuple[int, int]], + final_stability: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]], state_file: Path, ) -> bool: - """处理一轮目录扫描;若处理了至少一个文件返回 True。 - - 使用 batch 模式:同一条鱼(fish{N} 文件夹)下的所有 SVO 文件会被一起处理, - 点云合并后进行重量预测(与 test_dgcnn.sh --batch-root 相同的逻辑)。 - """ + """逐段稳定即测量;同一 fish 目录全部段已处理且整体稳定后写 final。""" assert settings.measure_watch_dir is not None watch_dir = settings.measure_watch_dir did = False seen_keys: Set[str] = set() - # 使用新的子文件夹扫描方式,返回 (svo_list, fish_id) for svo_list, fish_id in iter_svo2_folders(watch_dir): if not svo_list: continue - # 为该 fish 文件夹中的所有 SVO 文件计算稳定性 - # 只有当所有 SVO 都达到稳定轮询次数时才处理 - all_stable = True - any_new = False + fish_folder = svo_list[0].parent + folder_key = str(fish_folder.resolve()) for svo in svo_list: key = str(svo.resolve()) @@ -174,43 +290,57 @@ async def watch_tick( if key in processed: continue - any_new = True try: st = svo.stat() except OSError: - all_stable = False continue size = int(st.st_size) if size <= 0: stability.pop(key, None) - all_stable = False continue last = stability.get(key) if last is None or last[0] != size: stability[key] = (size, 1) - all_stable = False else: _, cnt = last cnt += 1 stability[key] = (size, cnt) - if cnt < settings.measure_watch_stable_polls: - all_stable = False + if cnt >= settings.measure_watch_stable_polls: + await _run_single_svo_measure( + svo, fish_id, settings, processed, state_file + ) + stability.pop(key, None) + did = True - # 如果该文件夹下有新的 SVO 文件且全部达到稳定,则批量处理 - if any_new and all_stable: - await _run_measure_and_state(svo_list, fish_id, settings, processed, state_file) + fk = _final_processed_key(fish_id, svo_list) + if fk in processed: + continue - # 清理已处理的 SVO 文件的稳定性记录 - for svo in svo_list: - key = str(svo.resolve()) - stability.pop(key, None) + if not all(str(p.resolve()) in processed for p in svo_list): + final_stability.pop(folder_key, None) + continue - did = True + tup = _folder_size_tuple(svo_list) + if not tup: + continue + + prev = final_stability.get(folder_key) + if prev is None or prev[0] != tup: + final_stability[folder_key] = (tup, 1) + else: + _, c = prev + c += 1 + final_stability[folder_key] = (tup, c) + if c >= settings.measure_watch_stable_polls: + await _run_final_aggregate( + svo_list, fish_id, settings, processed, state_file, fk + ) + final_stability.pop(folder_key, None) + did = True - # 清理不再看到的文件的稳定性记录 for k in list(stability.keys()): if k not in seen_keys: del stability[k] @@ -231,20 +361,24 @@ async def run_measure_watch_loop(settings: Settings) -> None: if settings.measure_watch_use_state_file else set() ) - stability: Dict[str, tuple[int, int]] = {} + stability = {} # type: Dict[str, Tuple[int, int]] + final_stability = {} # type: Dict[str, Tuple[Tuple[Tuple[str, int], ...], int]] logger.info( - "[measure-watch] watching {} (poll={}s, stable_polls={}, state={} {})", + "[measure-watch] watching {} (poll={}s, stable_polls={}, aggregate={}, state={} {})", wd, settings.measure_watch_poll_interval, settings.measure_watch_stable_polls, + settings.measure_final_aggregate_mode, "on" if settings.measure_watch_use_state_file else "off", state_file if settings.measure_watch_use_state_file else "", ) idle_warn_state = IdleWatchWarnState() while True: - did = await watch_tick(settings, processed, stability, state_file) + did = await watch_tick( + settings, processed, stability, final_stability, state_file + ) maybe_warn_idle_watch( did_work=did, log_tag="measure-watch", diff --git a/fish_api/app/services/sessions.py b/fish_api/app/services/sessions.py index b155d66..c75009c 100644 --- a/fish_api/app/services/sessions.py +++ b/fish_api/app/services/sessions.py @@ -2,11 +2,12 @@ from __future__ import annotations import uuid from pathlib import Path +from typing import Tuple PARTIAL_NAME = "upload.partial" -def new_session_dir(base: Path) -> tuple[str, Path]: +def new_session_dir(base: Path) -> Tuple[str, Path]: session_id = uuid.uuid4().hex d = base / session_id d.mkdir(parents=True, exist_ok=True) diff --git a/fish_api/app/services/sonar_video.py b/fish_api/app/services/sonar_video.py index de04b6e..3512185 100644 --- a/fish_api/app/services/sonar_video.py +++ b/fish_api/app/services/sonar_video.py @@ -10,8 +10,12 @@ import asyncio import shutil from pathlib import Path +from typing import Optional, Tuple + from loguru import logger +from app.compat import to_thread + from app.services.action_watch import iter_mp4 from app.services.measure import transcode_src_to_h264_dst from app.settings import Settings @@ -21,7 +25,7 @@ _publish_lock = asyncio.Lock() DEFAULT_CLIENT_ID = "default" # 源路径 + mtime,用于跳过重复转码 -_last_src_key: tuple[str, float] | None = None +_last_src_key = None # type: Optional[Tuple[str, float]] _cached_public_url: str = "" @@ -37,7 +41,7 @@ def _safe_sonar_media_basename(raw: str) -> str: return Path(n).name or "biomass_sonar.mp4" -def resolve_sonar_video_source(settings: Settings) -> Path | None: +def resolve_sonar_video_source(settings: Settings) -> Optional[Path]: """优先 BIOMASS_SONAR_VIDEO_SOURCE;否则在 BIOMASS_SONAR_VIDEO_DIR 中取 mtime 最新的 .mp4。""" cfg = settings.biomass_sonar_video_source if cfg is not None: @@ -71,13 +75,13 @@ async def _publish_video( tmp.unlink(missing_ok=True) try: - ok = await asyncio.to_thread(transcode_src_to_h264_dst, src, tmp) + ok = await to_thread(transcode_src_to_h264_dst, src, tmp) if ok and tmp.is_file() and tmp.stat().st_size > 0: tmp.replace(dst) logger.info("[sonar-video] published H.264: {} -> {}", src.name, dst.name) else: tmp.unlink(missing_ok=True) - await asyncio.to_thread(shutil.copy2, src, dst) + await to_thread(shutil.copy2, src, dst) logger.warning( "[sonar-video] transcode failed, copied raw: {} -> {}", src.name, diff --git a/fish_api/app/services/video_slice.py b/fish_api/app/services/video_slice.py index d7b8b34..a51596f 100644 --- a/fish_api/app/services/video_slice.py +++ b/fish_api/app/services/video_slice.py @@ -5,16 +5,17 @@ from __future__ import annotations import subprocess import tempfile from pathlib import Path -from typing import List, Tuple +from typing import List, Optional, Tuple from loguru import logger def _get_ffmpeg_path() -> str: - """获取可用的 ffmpeg 路径。""" - project_ffmpeg = Path("/home/ubuntu/projects/FishServer/tools/ffmpeg/bin/ffmpeg") - if project_ffmpeg.is_file(): - return str(project_ffmpeg) + """获取可用的 ffmpeg 路径。优先使用 .env 中 FFMPEG_PATH 配置。""" + from app.settings import get_settings + configured = get_settings().ffmpeg_path.strip() + if configured and Path(configured).is_file(): + return configured system_paths = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg"] for path in system_paths: if Path(path).is_file(): @@ -23,11 +24,15 @@ def _get_ffmpeg_path() -> str: def _get_ffprobe_path() -> str: - """获取可用的 ffprobe 路径。""" + """获取可用的 ffprobe 路径。从 ffmpeg 同目录推导,回退到系统 PATH。""" ffmpeg_path = Path(_get_ffmpeg_path()) - ffprobe = ffmpeg_path.parent / "ffprobe" - if ffprobe.is_file(): - return str(ffprobe) + if ffmpeg_path.parent.name: + sibling = ffmpeg_path.parent / "ffprobe" + if sibling.is_file(): + return str(sibling) + for p in ["/usr/bin/ffprobe", "/usr/local/bin/ffprobe"]: + if Path(p).is_file(): + return p return "ffprobe" @@ -58,7 +63,7 @@ def get_video_duration(video_path: Path) -> float: def slice_video( video_path: Path, slice_duration: float = 10.0, - output_dir: Path | None = None, + output_dir: Optional[Path] = None, ) -> Tuple[List[Path], Path]: """将视频按固定时长切分为多个片段。 diff --git a/fish_api/app/services/water_video.py b/fish_api/app/services/water_video.py index 0c2e55d..e6232fa 100644 --- a/fish_api/app/services/water_video.py +++ b/fish_api/app/services/water_video.py @@ -12,10 +12,12 @@ from __future__ import annotations import asyncio import shutil from pathlib import Path -from typing import Dict, List +from typing import Dict, List, Optional from loguru import logger +from app.compat import to_thread + from app.services.action_watch import iter_mp4 from app.services.measure import transcode_src_to_h264_dst from app.services.video_slice import get_video_duration, slice_video @@ -60,7 +62,7 @@ _global_slice_urls: List[str] = [] _last_source_mtime: float = 0.0 -def resolve_water_video_source(settings: Settings) -> Path | None: +def resolve_water_video_source(settings: Settings) -> Optional[Path]: """优先 BIOMASS_WATER_VIDEO_SOURCE;否则取 ACTION_WATCH_DIR 中 mtime 最新的 .mp4。""" cfg = settings.biomass_water_video_source if cfg is not None: @@ -103,13 +105,13 @@ async def _publish_video( tmp.unlink(missing_ok=True) try: - ok = await asyncio.to_thread(transcode_src_to_h264_dst, src, tmp) + ok = await to_thread(transcode_src_to_h264_dst, src, tmp) if ok and tmp.is_file() and tmp.stat().st_size > 0: tmp.replace(dst) logger.info("[water-video] published H.264: {} -> {}", src.name, dst.name) else: tmp.unlink(missing_ok=True) - await asyncio.to_thread(shutil.copy2, src, dst) + await to_thread(shutil.copy2, src, dst) logger.warning( "[water-video] transcode failed, copied raw: {} -> {}", src.name, diff --git a/fish_api/app/services/zed_recording_control.py b/fish_api/app/services/zed_recording_control.py new file mode 100644 index 0000000..214c9a5 --- /dev/null +++ b/fish_api/app/services/zed_recording_control.py @@ -0,0 +1,132 @@ +"""ZED 分段录制线程的启动/停止(供 lifespan 与 HTTP API 共用)。""" + +from __future__ import annotations + +import threading +from typing import Optional, Tuple + +from loguru import logger + +from app.db import ( + begin_zed_recording_session, + mark_zed_recording_session_stopped, +) +from app.services.zed_svo_record import run_zed_svo_record_loop +from app.settings import Settings +from app.state import app_state + + +def _cleanup_stale_zed_thread() -> None: + t = app_state.zed_recording_thread + if t is not None and not t.is_alive(): + sid = app_state.zed_recording_session_row_id + app_state.zed_recording_thread = None + app_state.zed_recording_stop_event = None + if sid is not None: + try: + from app.settings import get_settings + + mark_zed_recording_session_stopped(get_settings(), sid) + except Exception as e: + logger.warning("[zed-svo] 线程已退出但写库失败: {}", e) + app_state.zed_recording_session_row_id = None + app_state.zed_recording_fish_id = None + + +def start_zed_recording( + settings: Settings, + *, + segment_sec: Optional[float] = None, +) -> Tuple[bool, str, Optional[int], Optional[int]]: + """启动后台录制线程;已在运行则 ``(False, 'already_running', None, None)``。 + + 每次启动在 SQLite ``zed_recording_sessions`` 中分配新 ``fish_id`` 并写入输出目录。 + + 成功时返回 ``(True, 'started', fish_id, session_row_id)``。 + """ + with app_state.zed_recording_lock: + _cleanup_stale_zed_thread() + if ( + app_state.zed_recording_thread is not None + and app_state.zed_recording_thread.is_alive() + ): + return False, "already_running", None, None + + try: + session_row_id, fish_id, resolved = begin_zed_recording_session(settings) + except Exception as e: + logger.exception("[zed-svo] 分配会话失败") + return False, f"session_db_error:{e}", None, None + + stop = threading.Event() + + def _run() -> None: + run_zed_svo_record_loop( + settings, + stop, + output_dir=resolved, + segment_sec=segment_sec, + ) + + th = threading.Thread( + target=_run, + name="zed-svo-record", + daemon=True, + ) + app_state.zed_recording_stop_event = stop + app_state.zed_recording_thread = th + app_state.zed_recording_session_row_id = session_row_id + app_state.zed_recording_fish_id = fish_id + th.start() + logger.info( + "[zed-svo] 录制线程已启动 session_id={} fish_id={} output_dir={}", + session_row_id, + fish_id, + resolved, + ) + return True, "started", fish_id, session_row_id + + +def stop_zed_recording( + settings: Settings, timeout_sec: float = 30.0 +) -> Tuple[bool, str, Optional[int]]: + """请求停止并 ``join`` 录制线程;成功停止后更新数据库并返回 ``fish_id``。""" + session_row_id: Optional[int] = None + with app_state.zed_recording_lock: + _cleanup_stale_zed_thread() + if app_state.zed_recording_thread is None: + return False, "not_running", None + if not app_state.zed_recording_thread.is_alive(): + app_state.zed_recording_thread = None + app_state.zed_recording_stop_event = None + app_state.zed_recording_session_row_id = None + app_state.zed_recording_fish_id = None + return False, "not_running", None + + session_row_id = app_state.zed_recording_session_row_id + ev = app_state.zed_recording_stop_event + th = app_state.zed_recording_thread + if ev is not None: + ev.set() + if th is not None: + th.join(timeout=timeout_sec) + if th.is_alive(): + logger.warning("[zed-svo] stop join 超时,线程仍在运行") + return False, "stop_timeout", None + + app_state.zed_recording_stop_event = None + app_state.zed_recording_thread = None + app_state.zed_recording_session_row_id = None + app_state.zed_recording_fish_id = None + logger.info("[zed-svo] 录制线程已停止") + + fish_id: Optional[int] = None + if session_row_id is not None: + fish_id = mark_zed_recording_session_stopped(settings, session_row_id) + return True, "stopped", fish_id + + +def zed_recording_is_running() -> bool: + _cleanup_stale_zed_thread() + t = app_state.zed_recording_thread + return t is not None and t.is_alive() diff --git a/fish_api/app/services/zed_svo_record.py b/fish_api/app/services/zed_svo_record.py new file mode 100644 index 0000000..b8febd6 --- /dev/null +++ b/fish_api/app/services/zed_svo_record.py @@ -0,0 +1,109 @@ +"""后台线程:单台 ZED 分段录制 .svo2(可选依赖 pyzed)。""" + +from __future__ import annotations + +import time +from datetime import datetime, timezone +from pathlib import Path +from threading import Event +from typing import Optional + +from loguru import logger + +from app.settings import Settings + + +def _try_import_sl(): + try: + import pyzed.sl as sl + + return sl + except ImportError: + return None + + +def run_zed_svo_record_loop( + settings: Settings, + stop_event: Event, + *, + output_dir: Optional[Path] = None, + segment_sec: Optional[float] = None, +) -> None: + """阻塞运行直到 ``stop_event`` 置位;每段按 ``segment_sec`` 或配置轮换输出文件。 + + ``output_dir`` / ``segment_sec`` 非空时覆盖 ``settings`` 中对应逻辑。 + """ + sl = _try_import_sl() + if sl is None: + logger.warning( + "[zed-svo] pyzed 不可用,跳过 ZED 分段录制(请安装 ZED SDK / pyzed)。" + ) + return + + out_dir = output_dir if output_dir is not None else settings.zed_svo_record_dir + if out_dir is None: + logger.error("[zed-svo] zed_svo_record_dir 未解析,跳过录制") + return + + out_dir = out_dir.expanduser().resolve() + out_dir.mkdir(parents=True, exist_ok=True) + + zed = sl.Camera() + init = sl.InitParameters() + init.camera_resolution = sl.RESOLUTION.HD720 + init.camera_fps = 30 + init.depth_mode = sl.DEPTH_MODE.ULTRA + init.coordinate_units = sl.UNIT.MILLIMETER + if settings.zed_serial_number is not None: + init.set_from_serial_number(settings.zed_serial_number) + + err_open = zed.open(init) + if err_open != sl.ERROR_CODE.SUCCESS: + logger.error("[zed-svo] 打开相机失败: {}", repr(err_open)) + return + + runtime = sl.RuntimeParameters() + seg_sec = float( + segment_sec if segment_sec is not None else settings.zed_svo_segment_sec + ) + segment_index = 0 + + try: + while not stop_event.is_set(): + ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + out_path = out_dir / f"zed_{ts}_{segment_index:06d}.svo2" + segment_index += 1 + + rec = sl.RecordingParameters( + str(out_path.resolve()), + sl.SVO_COMPRESSION_MODE.H264, + ) + err_rec = zed.enable_recording(rec) + if err_rec != sl.ERROR_CODE.SUCCESS: + logger.error( + "[zed-svo] enable_recording 失败: {} path={}", + repr(err_rec), + out_path, + ) + return + + logger.info("[zed-svo] 开始写入 {}", out_path.name) + + deadline = time.monotonic() + seg_sec + while time.monotonic() < deadline and not stop_event.is_set(): + err = zed.grab(runtime) + if err != sl.ERROR_CODE.SUCCESS: + if err == sl.ERROR_CODE.END_OF_SVOFILE_REACHED: + break + logger.warning("[zed-svo] grab 非成功: {}", repr(err)) + + zed.disable_recording() + logger.info("[zed-svo] 结束一段 {}", out_path.name) + + finally: + try: + zed.disable_recording() + except Exception as e: + logger.debug("[zed-svo] disable_recording (finally): {}", e) + zed.close() + logger.info("[zed-svo] 相机已关闭") diff --git a/fish_api/app/settings.py b/fish_api/app/settings.py index 696ab28..d133d3a 100644 --- a/fish_api/app/settings.py +++ b/fish_api/app/settings.py @@ -56,6 +56,11 @@ class Settings(BaseSettings): default="http://127.0.0.1:8000", validation_alias=AliasChoices("PUBLIC_BASE_URL", "public_base_url"), ) + #: ZED 录制 CLI(``fish-zed-record``)等访问本机 API 时的基址;未设时与 ``public_base_url`` 相同。**FISH_API_BASE_URL** + fish_api_base_url: str = Field( + default="", + validation_alias=AliasChoices("FISH_API_BASE_URL", "fish_api_base_url"), + ) ingest_api_key: str = "" @@ -84,6 +89,9 @@ class Settings(BaseSettings): python_fish_measure: str = "" python_fish_action: str = "" + #: ffmpeg 可执行文件路径;为空时按顺序尝试 tools/ffmpeg/bin/ffmpeg → 系统 PATH。**FFMPEG_PATH** + ffmpeg_path: str = "" + #: SAM/CUDA 设备(cuda 或 cpu) sam_device: str = "cuda" @@ -261,15 +269,57 @@ class Settings(BaseSettings): measure_watch_recursive: bool = False #: 状态管理:true=持久化到 SQLite(重启后记住),false=内存模式(重启后清空) measure_watch_use_state_file: bool = True + #: 齐套后对各段 former 体重/体长聚合方式:``median``、``mean``、``trimmed_mean``(至少 3 段时去头尾再均值)。**MEASURE_FINAL_AGGREGATE_MODE** + measure_final_aggregate_mode: str = Field( + default="median", + validation_alias=AliasChoices( + "MEASURE_FINAL_AGGREGATE_MODE", "measure_final_aggregate_mode" + ), + ) + + #: 分段 SVO2 输出目录;未设时:有 MEASURE_WATCH_DIR 则为 ``{MEASURE_WATCH_DIR}/fish{N}``(N 见 zed_svo_record_fish_id),否则为 ``{STREAM_TMP_DIR}/zed_svo2``。**ZED_SVO_RECORD_DIR** + zed_svo_record_dir: Optional[Path] = Field( + default=None, + validation_alias=AliasChoices("ZED_SVO_RECORD_DIR", "zed_svo_record_dir"), + ) + #: 每段时长(秒),默认 300(5 分钟)。**ZED_SVO_SEGMENT_SEC** + zed_svo_segment_sec: float = Field( + default=300.0, + ge=1.0, + validation_alias=AliasChoices("ZED_SVO_SEGMENT_SEC", "zed_svo_segment_sec"), + ) + #: 仅连接一台相机时可选;指定序列号打开对应设备。**ZED_SERIAL_NUMBER** + zed_serial_number: Optional[int] = Field( + default=None, + validation_alias=AliasChoices("ZED_SERIAL_NUMBER", "zed_serial_number"), + ) + #: 与 MEASURE_WATCH_DIR 组合为 ``fish{N}``(默认 1)。**ZED_SVO_RECORD_FISH_ID** + zed_svo_record_fish_id: int = Field( + default=1, + ge=1, + validation_alias=AliasChoices( + "ZED_SVO_RECORD_FISH_ID", "zed_svo_record_fish_id" + ), + ) default_fish_species: str = "大黄鱼" + @field_validator("zed_serial_number", mode="before") + @classmethod + def _zed_serial_empty_none(cls, v: object) -> object: + if v is None: + return None + if isinstance(v, str) and not v.strip(): + return None + return v + @field_validator( "action_watch_dir", "biomass_water_video_source", "biomass_sonar_video_source", "biomass_sonar_video_dir", "measure_watch_dir", + "zed_svo_record_dir", mode="before", ) @classmethod @@ -300,6 +350,25 @@ class Settings(BaseSettings): / "checkpoints" / "best_model.pth", ) + if self.zed_svo_record_dir is None: + if self.measure_watch_dir is not None: + object.__setattr__( + self, + "zed_svo_record_dir", + ( + self.measure_watch_dir / f"fish{self.zed_svo_record_fish_id}" + ).resolve(), + ) + else: + object.__setattr__( + self, + "zed_svo_record_dir", + (self.stream_tmp_dir / "zed_svo2").resolve(), + ) + else: + object.__setattr__( + self, "zed_svo_record_dir", self.zed_svo_record_dir.expanduser().resolve() + ) return self diff --git a/fish_api/app/state.py b/fish_api/app/state.py index 46b724b..346ff97 100644 --- a/fish_api/app/state.py +++ b/fish_api/app/state.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import threading from dataclasses import dataclass, field from datetime import datetime from typing import List, Optional @@ -15,7 +16,7 @@ class MeasureSnapshot: error: Optional[str] = None raw_prediction_path: Optional[str] = None pred: Optional[float] = None # 最终预测的体重值 - star: bool = False # 置信度标记,True 表示计算可信 + star: bool = False # DB 是唯一真相源:segment/batch 恒 False;final 行按参与聚合段的 dgcnn_summary 规则判定 #: 与 FishMeasure ``test_dgcnn_weight_estimator.py`` 终端输出一致的体重推算过程文本 calculation_log: Optional[str] = None @@ -32,11 +33,21 @@ class HealthSnapshot: @dataclass class AppState: measure_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + #: 与 ``run_full_measure`` 子进程串行化(跨 ``measure_watch`` / ingest 线程) + measure_thread_lock: threading.Lock = field(default_factory=threading.Lock) action_lock: asyncio.Lock = field(default_factory=asyncio.Lock) # job status for optional polling(业务结果见 SQLite) measure_status: str = "idle" action_status: str = "idle" + #: ZED 分段录制线程与协作停止事件(由 ``zed_recording_control`` 管理) + zed_recording_lock: threading.Lock = field(default_factory=threading.Lock) + zed_recording_stop_event: Optional[threading.Event] = None + zed_recording_thread: Optional[threading.Thread] = None + #: 当前会话(与 ``zed_recording_sessions`` 表对应) + zed_recording_session_row_id: Optional[int] = None + zed_recording_fish_id: Optional[int] = None + app_state = AppState() diff --git a/fish_api/app/subprocess_run.py b/fish_api/app/subprocess_run.py index aa298a3..156ade6 100644 --- a/fish_api/app/subprocess_run.py +++ b/fish_api/app/subprocess_run.py @@ -29,6 +29,8 @@ def run_subprocess_with_log( stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, + encoding="utf-8", + errors="replace", bufsize=1, ) lines: List[str] = [] diff --git a/fish_api/app/zed_record_cli.py b/fish_api/app/zed_record_cli.py new file mode 100644 index 0000000..70676c8 --- /dev/null +++ b/fish_api/app/zed_record_cli.py @@ -0,0 +1,150 @@ +"""命令行:ZED 分段录制。 + +- ``start``(默认):本进程内 ``start_zed_recording``,前台阻塞直到 Ctrl+C 再 ``stop``(不经过 HTTP)。 +- ``start --remote``:``POST`` 已运行的 fish_api,不阻塞。 +- ``stop`` / ``status``:仅 HTTP,用于停/查 **由 uvicorn 进程托管** 的录制。 + +HTTP 基址:环境变量 ``FISH_API_BASE_URL`` 或 ``PUBLIC_BASE_URL``,否则 ``http://127.0.0.1:8000``。 +ZED 路由不校验 ingest API Key,请求无需 ``X-API-Key``。 + +每次启动由服务端分配 ``fish_id``(库表与 ``output_dir``、目标父目录下 ``fish``+数字 及 ``.svo2`` 路径综合)并写入 ``zed_recording_sessions``。 +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +import urllib.error +import urllib.request +from typing import Any, Dict, Optional + + +def _base_url() -> str: + """优先环境变量;否则从 ``fish_api/.env`` 经 ``get_settings()`` 读 ``FISH_API_BASE_URL`` / ``PUBLIC_BASE_URL``。""" + u = os.environ.get("FISH_API_BASE_URL") or os.environ.get("PUBLIC_BASE_URL") + if u: + return str(u).strip().rstrip("/") + from app.settings import get_settings + + s = get_settings() + u2 = (s.fish_api_base_url or "").strip() or (s.public_base_url or "").strip() + if u2: + return u2.rstrip("/") + return "http://127.0.0.1:8000" + + +def _http_request_json(method: str, path: str, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + url = _base_url() + path + if method == "GET": + req = urllib.request.Request(url, method="GET") + elif method == "POST": + payload = json.dumps(body if body is not None else {}).encode("utf-8") + req = urllib.request.Request( + url, + data=payload, + headers={"Content-Type": "application/json; charset=utf-8"}, + method="POST", + ) + else: + raise ValueError(method) + + try: + with urllib.request.urlopen(req, timeout=120) as resp: + raw = resp.read().decode("utf-8") + if not raw.strip(): + return {} + return json.loads(raw) + except urllib.error.HTTPError as e: + err_body = e.read().decode("utf-8", errors="replace") + print(f"HTTP {e.code}: {err_body}", file=sys.stderr) + raise SystemExit(1) from None + except urllib.error.URLError as e: + print(f"请求失败: {e}", file=sys.stderr) + raise SystemExit(1) from None + + +def _cmd_start_remote(args: argparse.Namespace) -> None: + body: Dict[str, Any] = {} + if args.segment_sec is not None: + body["segment_sec"] = args.segment_sec + r = _http_request_json("POST", "/api/v1/zed/recording/start", body) + print(json.dumps(r, ensure_ascii=False, indent=2)) + + +def _cmd_start_local(args: argparse.Namespace) -> None: + from app.services.zed_recording_control import ( + start_zed_recording, + stop_zed_recording, + ) + from app.settings import get_settings + + s = get_settings() + ok, msg, fish_id, _ = start_zed_recording( + s, + segment_sec=args.segment_sec, + ) + if not ok: + print(f"启动失败: {msg}", file=sys.stderr) + raise SystemExit(1) + + print( + f"fish_id={fish_id},录制中,按 Ctrl+C 停止…", + flush=True, + ) + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + print("\n正在停止…", flush=True) + stop_ok, stop_msg, stop_fish = stop_zed_recording(s) + if stop_ok: + print(f"已退出。fish_id={stop_fish}", flush=True) + else: + print(f"停止结果: {stop_msg}", file=sys.stderr) + + +def _cmd_stop(_args: argparse.Namespace) -> None: + r = _http_request_json("POST", "/api/v1/zed/recording/stop", {}) + print(json.dumps(r, ensure_ascii=False, indent=2)) + + +def _cmd_status(_args: argparse.Namespace) -> None: + r = _http_request_json("GET", "/api/v1/zed/recording/status", None) + print(json.dumps(r, ensure_ascii=False, indent=2)) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="ZED 分段录制:本地阻塞 start,或 HTTP 调用 fish_api", + ) + sub = parser.add_subparsers(dest="cmd", required=True) + + p_start = sub.add_parser("start", help="本地启动(阻塞至 Ctrl+C)或 --remote 仅请求 API") + p_start.add_argument( + "--remote", + action="store_true", + help="通过 HTTP 调用 fish_api,不阻塞本进程", + ) + p_start.add_argument("--segment-sec", type=float, default=None, metavar="SEC") + + sub.add_parser("stop", help="HTTP 停止 fish_api 进程中的录制") + sub.add_parser("status", help="HTTP 查询 fish_api 中的录制状态") + + args = parser.parse_args() + + if args.cmd == "start": + if args.remote: + _cmd_start_remote(args) + else: + _cmd_start_local(args) + elif args.cmd == "stop": + _cmd_stop(args) + elif args.cmd == "status": + _cmd_status(args) + + +if __name__ == "__main__": + main() diff --git a/fish_api/pyproject.toml b/fish_api/pyproject.toml index 85d19a2..2de57e5 100644 --- a/fish_api/pyproject.toml +++ b/fish_api/pyproject.toml @@ -17,3 +17,4 @@ dev = ["httpx>=0.28.1"] [project.scripts] fish-action-watch = "app.action_watch_cli:main" fish-measure-debug = "app.measure_debug_cli:main" +fish-zed-record = "app.zed_record_cli:main" diff --git a/fish_api/start_fresh.sh b/fish_api/start_fresh.sh old mode 100755 new mode 100644 diff --git a/start_recording.sh b/start_recording.sh new file mode 100755 index 0000000..6a82f78 --- /dev/null +++ b/start_recording.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# 独立运行:与 fish_api/uvicorn 的启动、停止无关。调用 ZED 录制 CLI:本地前台录制(Ctrl+C 停),或传 --remote 仅请求已运行的 fish_api。 +# 用法(在仓库根目录): +# ./start_recording.sh +# ./start_recording.sh --remote +# ./start_recording.sh --segment-sec 300 +# 依赖:fish_api 已 pip install -e .;直连相机时需 pyzed。 +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT/fish_api" + +if command -v uv >/dev/null 2>&1; then + PY=(uv run python) +else + PY=(python3) +fi + +exec "${PY[@]}" -m app.zed_record_cli start "$@" diff --git a/stop_recording.sh b/stop_recording.sh new file mode 100644 index 0000000..87809f3 --- /dev/null +++ b/stop_recording.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +# 与 fish_api/uvicorn 的启动、停止无关;仅向已运行的 fish_api 发 HTTP stop(适用于曾用 /api/v1/zed/recording/start 或 start_recording.sh --remote 启录的情形)。 +# 基址:环境变量 FISH_API_BASE_URL 或 PUBLIC_BASE_URL(见 fish_api/.env)。 +# 用法(在仓库根目录):./stop_recording.sh +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT/fish_api" + +if command -v uv >/dev/null 2>&1; then + PY=(uv run python) +else + PY=(python3) +fi + +exec "${PY[@]}" -m app.zed_record_cli stop "$@"