feat(fish_api): SQLite 快照投递、日志与 watch 空闲告警

- 新增 SQLite:measure/health 快照、delivery_cursor 单消费者 pop;clear/start_fresh 可清空库
- biomass GET 仅返回约定 data 字段,X-Fish-Biomass-New 表示是否有新快照;poller 读响应头
- loguru 桥接 uvicorn,子进程 stdout 流式输出;format_json_pretty 与算法摘要日志
- measure/action watch 无新任务时限流 WARNING;watch_idle 共用逻辑
- 依赖 loguru;新增 db、logging_config、subprocess_run、watch_idle、启动脚本

FishMeasure: 更新 fish_video_weight_evaluation 与 predict_weigth_from_svo2;移除未用 refbox/segmentation 脚本
Made-with: Cursor
This commit is contained in:
zaiun xu
2026-04-09 11:54:30 +08:00
parent db181d4f84
commit 5e1b2117c1
29 changed files with 1464 additions and 1714 deletions

View File

@@ -1,99 +0,0 @@
## Fish Body Segmentation (YOLOv8-seg)
This folder provides a quick pipeline to train a **body-only** fish segmentation model using **Labelme polygon annotations**.
The goal is to produce a mask that **excludes fins and tail** (as much as possible), so the depth->pointcloud becomes cleaner for weight estimation.
### 1) Labeling in Labelme
- Use `Labelme` polygon tool.
- Recommended class name: `body` (you can use other names; see `--classes` below).
- Each image produces a `.json` annotation file.
### 2) Convert Labelme JSON -> YOLOv8-seg dataset
This will create a YOLO dataset folder:
```
<out_dir>/
images/train, images/val, images/test
labels/train, labels/val, labels/test
dataset.yaml
```
Example:
```bash
python3 segmentation/prepare_yolo_seg_dataset.py \
--source_dir /path/to/labelme_export \
--out_dir ./datasets/fish_body_seg \
--classes body \
--train_ratio 0.8 --val_ratio 0.1 --test_ratio 0.1 \
--seed 42 \
--copy
```
Notes:
- YOLOv8-seg label format is: `<class_id> x1 y1 x2 y2 ... xn yn` (all normalized to [0,1]).
- If an image has no valid polygons, an empty label file will be written (you can change this later if desired).
### 2b) Filter existing prepared dataset (if only some images are labeled)
If you already have a prepared YOLO-seg dataset but only some images have labels, use this mode to filter and keep only labeled images:
```bash
python3 segmentation/prepare_yolo_seg_dataset.py \
--prepared_dataset /home/ubuntu/data/fish/fish_measure_intermediates/yolo_seg \
--out_dir ./datasets/fish_body_seg_filtered \
--classes body \
--copy
```
This will:
- Scan `images/train/`, `images/val/`, `images/test/` for images
- Check for corresponding `.txt` label files in `labels/train/`, `labels/val/`, `labels/test/`
- Only copy/symlink images that have labels
- Generate a clean `dataset.yaml` for training
### 3) Visualize labels (optional - verify conversion correctness)
Before training, you can visualize the converted labels to verify they're correct:
```bash
python3 segmentation/visualize_yolo_seg_labels.py \
--dataset ./datasets/fish_body_seg_filtered \
--output ./visualizations/yolo_seg_labels \
--split train \
--max_images 50 \
--classes fishbody \
--alpha 0.5
```
This will:
- Load images and their corresponding `.txt` label files
- Draw polygon masks (semi-transparent overlay) on images
- Save visualized images to the output directory
- Useful for checking that Labelme → YOLO conversion preserved polygon shapes correctly
### 4) Train YOLOv8 segmentation
```bash
python3 segmentation/train_yolo_seg.py \
--data ./datasets/fish_body_seg/dataset.yaml \
--model yolov8s-seg.pt \
--epochs 200 \
--batch 16 \
--imgsz 640 \
--project runs/seg \
--name fish_body_seg_$(date +%Y%m%d_%H%M%S)
```
Outputs:
- `runs/seg/<name>/weights/best.pt`
### 5) Next step (pipeline integration)
After training, you can run segmentation on:
- full image, or
- **cropped image from detector bbox** (often better when fish is small in the frame).

View File

@@ -1,6 +0,0 @@
"""
Segmentation training/inference utilities.
Currently focused on YOLOv8-seg (Ultralytics) and Labelme polygon annotations.
"""

View File

@@ -1,490 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prepare a YOLOv8-seg dataset from Labelme JSON polygon annotations OR filter an existing prepared dataset.
Mode 1: Convert from Labelme JSONs
Input (Labelme):
- one JSON per image
- JSON contains: imagePath (recommended), imageHeight, imageWidth, shapes[]
- each shape is a polygon with: label, points[[x,y],...], shape_type="polygon"
Example:
python3 segmentation/prepare_yolo_seg_dataset.py \
--source_dir /data/labelme \
--out_dir ./datasets/fish_body_seg \
--classes body \
--train_ratio 0.8 --val_ratio 0.1 --test_ratio 0.1 \
--seed 42 --copy
Mode 2: Filter existing prepared dataset (keep only images with labels)
Input: Existing YOLO-seg dataset with images/ and labels/ folders
- Only images that have corresponding .txt label files are kept
- Useful when only some images in the dataset are labeled
Example:
python3 segmentation/prepare_yolo_seg_dataset.py \
--prepared_dataset /home/ubuntu/data/fish/fish_measure_intermediates/yolo_seg \
--out_dir ./datasets/fish_body_seg_filtered \
--classes body \
--copy
Output (Ultralytics YOLO segmentation dataset):
<out_dir>/
images/{train,val,test}/xxx.jpg
labels/{train,val,test}/xxx.txt
dataset.yaml
Label format (YOLOv8-seg):
<class_id> <x1> <y1> <x2> <y2> ... <xn> <yn>
where coordinates are normalized to [0,1] by (x/img_w, y/img_h).
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import random
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Prepare YOLOv8-seg dataset from Labelme JSONs or filter existing prepared dataset")
mode = p.add_mutually_exclusive_group(required=True)
mode.add_argument(
"--source_dir",
type=str,
default="",
help="Folder containing Labelme JSONs (and images) - use this for Labelme conversion mode",
)
mode.add_argument(
"--prepared_dataset",
type=str,
default="",
help="Path to existing prepared YOLO-seg dataset (images/ and labels/ folders) - use this to filter/validate existing dataset",
)
p.add_argument("--out_dir", type=str, required=True, help="Output dataset directory")
p.add_argument("--train_ratio", type=float, default=0.8)
p.add_argument("--val_ratio", type=float, default=0.1)
p.add_argument("--test_ratio", type=float, default=0.1)
p.add_argument("--seed", type=int, default=42)
# classes
p.add_argument(
"--classes",
type=str,
default="body",
help="Comma-separated class names, e.g. 'body' or 'body,fin,tail' (order defines class_id)",
)
p.add_argument(
"--allow_unknown_labels",
action="store_true",
help="If set, unknown labels will be ignored (default behavior is also ignore).",
)
# image placing
g = p.add_mutually_exclusive_group()
g.add_argument("--copy", action="store_true", help="Copy images into output dataset")
g.add_argument("--symlink", action="store_true", help="Symlink images into output dataset")
# default: hardlink
p.add_argument(
"--skip_non_polygon",
action="store_true",
default=True,
help="Ignore non-polygon shapes (default: True)",
)
p.add_argument(
"--drop_empty",
action="store_true",
help="Drop images with no valid polygons (default: keep with empty label file)",
)
return p.parse_args()
def ensure_dirs(root: Path) -> None:
for sub in [
"images/train",
"images/val",
"images/test",
"labels/train",
"labels/val",
"labels/test",
]:
(root / sub).mkdir(parents=True, exist_ok=True)
def place_image(src: Path, dst: Path, mode: str) -> None:
dst.parent.mkdir(parents=True, exist_ok=True)
if mode == "copy":
shutil.copy2(src, dst)
elif mode == "symlink":
if dst.exists():
dst.unlink()
os.symlink(src, dst)
else: # hardlink
if dst.exists():
dst.unlink()
try:
os.link(src, dst)
except OSError:
shutil.copy2(src, dst)
def write_label(label_path: Path, lines: List[str]) -> None:
label_path.parent.mkdir(parents=True, exist_ok=True)
label_path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")
def _load_image_size(image_path: Path) -> Optional[Tuple[int, int]]:
img = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
if img is None:
return None
h, w = img.shape[:2]
if w <= 0 or h <= 0:
return None
return int(w), int(h)
def _decode_labelme_image_data(image_data_b64: str) -> Optional[np.ndarray]:
try:
raw = base64.b64decode(image_data_b64.encode("utf-8"))
arr = np.frombuffer(raw, dtype=np.uint8)
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
return img
except Exception:
return None
def resolve_labelme_image_path(source_dir: Path, json_path: Path, meta: Dict) -> Optional[Path]:
# 1) preferred: imagePath from JSON
image_path = meta.get("imagePath", "") or ""
if image_path:
p = (json_path.parent / image_path).resolve() if not os.path.isabs(image_path) else Path(image_path)
if p.exists() and p.suffix.lower() in IMG_EXTS:
return p
# sometimes imagePath has only basename but image lives elsewhere under source_dir
b = Path(image_path).name
found = list(source_dir.rglob(b))
for fp in found:
if fp.exists() and fp.suffix.lower() in IMG_EXTS:
return fp
# 2) fallback: same stem with common image extensions next to json
for ext in sorted(IMG_EXTS):
p = json_path.with_suffix(ext)
if p.exists():
return p
return None
def _normalize_polygon(points_xy: List[List[float]], w: int, h: int) -> Optional[List[Tuple[float, float]]]:
if w <= 0 or h <= 0:
return None
if not points_xy or len(points_xy) < 3:
return None
pts: List[Tuple[float, float]] = []
for p in points_xy:
if not isinstance(p, (list, tuple)) or len(p) != 2:
continue
x, y = float(p[0]), float(p[1])
xn = x / float(w)
yn = y / float(h)
# clip (labelme can slightly exceed bounds)
xn = 0.0 if xn < 0.0 else (1.0 if xn > 1.0 else xn)
yn = 0.0 if yn < 0.0 else (1.0 if yn > 1.0 else yn)
pts.append((xn, yn))
# remove duplicated last==first (optional)
if len(pts) >= 4 and pts[0] == pts[-1]:
pts = pts[:-1]
# ensure at least 3 unique points
uniq = list(dict.fromkeys(pts))
if len(uniq) < 3:
return None
return pts
def generate_yaml(out_dir: Path, names: List[str]) -> None:
yaml_path = out_dir / "dataset.yaml"
content = (
f"path: {out_dir.resolve()}\n"
f"train: images/train\n"
f"val: images/val\n"
f"test: images/test\n"
f"names: {names}\n"
)
yaml_path.write_text(content, encoding="utf-8")
print(f"[OK] wrote: {yaml_path}")
def is_labelme_json(meta: Dict) -> bool:
return isinstance(meta.get("shapes", None), list)
def find_labeled_images_in_prepared_dataset(prepared_dir: Path) -> Dict[str, List[Tuple[Path, Path]]]:
"""
Scan a prepared YOLO-seg dataset and return only images that have corresponding label files.
Supports both .txt (YOLO format) and .json (Labelme format) label files.
Returns: {"train": [(img_path, label_path), ...], "val": [...], "test": [...]}
"""
prepared_dir = prepared_dir.expanduser().resolve()
if not prepared_dir.exists():
raise SystemExit(f"prepared_dataset not found: {prepared_dir}")
result: Dict[str, List[Tuple[Path, Path]]] = {"train": [], "val": [], "test": []}
for split in ["train", "val", "test"]:
img_dir = prepared_dir / "images" / split
lbl_dir = prepared_dir / "labels" / split
if not img_dir.exists():
print(f"[info] {split}: images directory not found: {img_dir}")
continue
if not lbl_dir.exists():
print(f"[info] {split}: labels directory not found: {lbl_dir}")
continue
# find all images
img_count = 0
lbl_count = 0
for img_path in img_dir.iterdir():
if img_path.suffix.lower() not in IMG_EXTS:
continue
img_count += 1
# check for corresponding label (.txt or .json)
lbl_path_txt = lbl_dir / f"{img_path.stem}.txt"
lbl_path_json = lbl_dir / f"{img_path.stem}.json"
if lbl_path_txt.exists():
result[split].append((img_path, lbl_path_txt))
lbl_count += 1
elif lbl_path_json.exists():
result[split].append((img_path, lbl_path_json))
lbl_count += 1
# else: image has no label, skip it
print(f"[info] {split}: found {img_count} images, {lbl_count} with labels")
return result
def process_prepared_dataset(prepared_dir: Path, out_dir: Path, place_mode: str, classes: List[str]) -> None:
"""Filter and copy/symlink only labeled images from a prepared dataset."""
labeled = find_labeled_images_in_prepared_dataset(prepared_dir)
ensure_dirs(out_dir)
total_kept = 0
for split in ["train", "val", "test"]:
items = labeled.get(split, [])
print(f"{split}: {len(items)} images with labels")
for img_src, lbl_src in items:
dst_img = out_dir / f"images/{split}/{img_src.name}"
# If source label is .json, convert to .txt format; otherwise keep as-is
if lbl_src.suffix.lower() == ".json":
# Convert Labelme JSON to YOLO .txt format
try:
meta = json.loads(lbl_src.read_text(encoding="utf-8"))
img_w = int(meta.get("imageWidth", 0) or 0)
img_h = int(meta.get("imageHeight", 0) or 0)
if img_w <= 0 or img_h <= 0:
# Try to load from image
wh = _load_image_size(img_src)
if wh is None:
print(f"[warn] cannot determine size for {img_src.name}, skipping")
continue
img_w, img_h = wh
lines: List[str] = []
name2id = {n: i for i, n in enumerate(classes)}
for sh in meta.get("shapes", []):
label = (sh.get("label", "") or "").strip()
if label not in name2id:
continue
shape_type = (sh.get("shape_type", "polygon") or "polygon").lower()
if shape_type != "polygon":
continue
pts = _normalize_polygon(sh.get("points", []), w=img_w, h=img_h)
if pts is None:
continue
cls_id = name2id[label]
flat = " ".join([f"{x:.6f} {y:.6f}" for x, y in pts])
lines.append(f"{cls_id} {flat}")
dst_lbl = out_dir / f"labels/{split}/{img_src.stem}.txt"
write_label(dst_lbl, lines)
except Exception as e:
print(f"[warn] failed to convert {lbl_src.name}: {e}")
continue
else:
# Already .txt format, just copy
dst_lbl = out_dir / f"labels/{split}/{lbl_src.name}"
try:
shutil.copy2(lbl_src, dst_lbl)
except Exception as e:
print(f"[warn] failed to copy label {lbl_src.name}: {e}")
continue
try:
place_image(img_src, dst_img, place_mode)
total_kept += 1
except Exception as e:
print(f"[warn] failed to place {img_src.name}: {e}")
generate_yaml(out_dir, classes)
print(f"[done] kept={total_kept} labeled images out={out_dir}")
def main() -> None:
args = parse_args()
random.seed(args.seed)
out_dir = Path(args.out_dir).expanduser().resolve()
classes = [c.strip() for c in (args.classes or "").split(",") if c.strip()]
if not classes:
raise SystemExit("No classes provided. Example: --classes body")
# Mode 1: Process prepared dataset (filter to only labeled images)
if args.prepared_dataset:
prepared_dir = Path(args.prepared_dataset).expanduser().resolve()
place_mode = "copy" if args.copy else ("symlink" if args.symlink else "hardlink")
process_prepared_dataset(prepared_dir, out_dir, place_mode, classes)
return
# Mode 2: Convert from Labelme JSONs (original behavior)
source_dir = Path(args.source_dir).expanduser().resolve()
if not source_dir.exists():
raise SystemExit(f"source_dir not found: {source_dir}")
name2id = {n: i for i, n in enumerate(classes)}
json_files = sorted(source_dir.rglob("*.json"))
if not json_files:
raise SystemExit(f"No .json found under: {source_dir}")
items: List[Tuple[Path, Path, Dict]] = []
bad = 0
for jp in json_files:
try:
meta = json.loads(jp.read_text(encoding="utf-8"))
except Exception:
bad += 1
continue
if not is_labelme_json(meta):
continue
img_path = resolve_labelme_image_path(source_dir, jp, meta)
if img_path is None:
# allow imageData-only workflows: decode and write next to json
if meta.get("imageData", None):
img = _decode_labelme_image_data(meta["imageData"])
if img is not None:
# choose png
img_path = jp.with_suffix(".png")
cv2.imwrite(str(img_path), img)
else:
bad += 1
continue
else:
bad += 1
continue
items.append((jp, img_path, meta))
if not items:
raise SystemExit(f"No valid Labelme JSON found under: {source_dir} (bad_json={bad})")
# split
idx = list(range(len(items)))
random.shuffle(idx)
n = len(idx)
n_train = int(n * args.train_ratio)
n_val = int(n * args.val_ratio)
n_test = n - n_train - n_val
train_set = set(idx[:n_train])
val_set = set(idx[n_train : n_train + n_val])
test_set = set(idx[n_train + n_val :])
print(f"total={n} train={len(train_set)} val={len(val_set)} test={len(test_set)} bad_json={bad}")
ensure_dirs(out_dir)
place_mode = "copy" if args.copy else ("symlink" if args.symlink else "hardlink")
kept = 0
dropped_empty = 0
for i, (json_path, img_path, meta) in enumerate(items):
if i in train_set:
split = "train"
elif i in val_set:
split = "val"
else:
split = "test"
# size
w = int(meta.get("imageWidth", 0) or 0)
h = int(meta.get("imageHeight", 0) or 0)
if w <= 0 or h <= 0:
wh = _load_image_size(img_path)
if wh is None:
continue
w, h = wh
# shapes -> yolo seg lines
lines: List[str] = []
for sh in meta.get("shapes", []):
label = (sh.get("label", "") or "").strip()
if label not in name2id:
# ignore unknown labels
continue
shape_type = (sh.get("shape_type", "polygon") or "polygon").lower()
if args.skip_non_polygon and shape_type != "polygon":
continue
pts = _normalize_polygon(sh.get("points", []), w=w, h=h)
if pts is None:
continue
cls_id = name2id[label]
flat = " ".join([f"{x:.6f} {y:.6f}" for x, y in pts])
lines.append(f"{cls_id} {flat}")
if args.drop_empty and not lines:
dropped_empty += 1
continue
dst_img = out_dir / f"images/{split}/{img_path.name}"
dst_lbl = out_dir / f"labels/{split}/{img_path.with_suffix('.txt').name}"
try:
place_image(img_path, dst_img, place_mode)
except Exception:
continue
write_label(dst_lbl, lines)
kept += 1
generate_yaml(out_dir, classes)
print(f"[done] kept={kept} dropped_empty={dropped_empty} out={out_dir}")
if __name__ == "__main__":
main()

View File

@@ -1,151 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Ultralytics YOLOv8 segmentation training script.
Example (using filtered dataset):
python3 segmentation/train_yolo_seg.py \
--data ./datasets/fish_body_seg_filtered/dataset.yaml \
--model yolo26s-seg.pt \
--epochs 100 \
--batch 16 \
--imgsz 640 \
--project runs/seg \
--name fish_body_seg_$(date +%Y%m%d_%H%M%S)
Example (with more options):
python3 segmentation/train_yolo_seg.py \
--data ./datasets/fish_body_seg_filtered/dataset.yaml \
--model yolov8s-seg.pt \
--epochs 300 \
--batch 32 \
--imgsz 640 \
--device 0 \
--workers 8 \
--patience 50 \
--pretrained \
--cache \
--project runs/seg \
--name fish_body_seg_yolov8s_$(date +%Y%m%d_%H%M%S)
Dependency:
pip install ultralytics
"""
from __future__ import annotations
import argparse
import os
import sys
from datetime import datetime
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Ultralytics YOLOv8-seg training")
p.add_argument(
"--data",
type=str,
default="./datasets/fish_body_seg_filtered/dataset.yaml",
help="dataset.yaml path (default: ./datasets/fish_body_seg_filtered/dataset.yaml)",
)
p.add_argument(
"--model",
type=str,
default="yolo26l-seg.pt",
help="model weights/arch, e.g. yolov8n-seg.pt/yolov8s-seg.pt or your .pt",
)
p.add_argument("--epochs", type=int, default=100)
p.add_argument("--batch", type=int, default=16)
p.add_argument("--imgsz", type=int, default=640)
p.add_argument("--device", type=str, default="", help="CUDA device like '0' or '0,1'. Empty=auto")
p.add_argument("--project", type=str, default="runs/seg", help="output project dir")
p.add_argument("--name", type=str, default="", help="run name (default: model + timestamp)")
p.add_argument("--workers", type=int, default=8)
p.add_argument("--patience", type=int, default=50)
p.add_argument("--lr0", type=float, default=0.01)
p.add_argument("--pretrained", action="store_true", help="use pretrained weights")
p.add_argument("--cache", action="store_true")
p.add_argument("--seed", type=int, default=0)
p.add_argument("--exist-ok", action="store_true")
p.add_argument("--resume", action="store_true")
p.add_argument("--export", action="store_true", help="export ONNX/TorchScript after training")
return p.parse_args()
def main() -> None:
args = parse_args()
try:
from ultralytics import YOLO
except Exception as e:
print("[error] ultralytics not found. Install with: pip install ultralytics")
print(f"details: {e}")
sys.exit(1)
if not os.path.exists(args.data):
print(f"[error] dataset yaml not found: {args.data}")
sys.exit(1)
if not args.name:
model_stem = os.path.splitext(os.path.basename(args.model))[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
args.name = f"{model_stem}_{timestamp}"
os.makedirs(args.project, exist_ok=True)
print("======== YOLOv8-seg Train ========")
print(f"data : {args.data}")
print(f"model : {args.model}")
print(f"epochs : {args.epochs}")
print(f"batch : {args.batch}")
print(f"imgsz : {args.imgsz}")
print(f"device : {args.device or 'auto'}")
print(f"project : {args.project}")
print(f"name : {args.name}")
print("=================================")
model = YOLO(args.model)
model.train(
data=args.data,
epochs=args.epochs,
imgsz=args.imgsz,
batch=args.batch,
device=args.device if args.device else None,
project=args.project,
name=args.name,
pretrained=args.pretrained,
cache=args.cache,
workers=args.workers,
patience=args.patience,
lr0=args.lr0,
seed=args.seed,
exist_ok=args.exist_ok,
resume=args.resume,
verbose=True,
)
save_dir = os.path.join(args.project, args.name)
best_pt = os.path.join(save_dir, "weights", "best.pt")
last_pt = os.path.join(save_dir, "weights", "last.pt")
print("\n======== Train done ========")
print(f"save_dir : {save_dir}")
if os.path.exists(best_pt):
print(f"best.pt : {best_pt}")
if os.path.exists(last_pt):
print(f"last.pt : {last_pt}")
if args.export and os.path.exists(best_pt):
try:
exp = YOLO(best_pt)
onnx_path = exp.export(format="onnx", imgsz=args.imgsz)
ts_path = exp.export(format="torchscript", imgsz=args.imgsz)
print(f"export onnx : {onnx_path}")
print(f"export torchscript: {ts_path}")
except Exception as e:
print(f"[warn] export failed: {e}")
if __name__ == "__main__":
main()

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Visualize YOLOv8-seg labels by drawing polygon masks on images.
This helps verify that label conversion (e.g., from Labelme JSON to YOLO .txt) is correct.
Example:
python3 segmentation/visualize_yolo_seg_labels.py \
--dataset ./datasets/fish_body_seg_filtered \
--output ./visualizations/yolo_seg_labels \
--max_images 50 \
--split train
"""
from __future__ import annotations
import argparse
import random
from pathlib import Path
from typing import List, Tuple
import cv2
import numpy as np
def parse_yolo_seg_label(label_path: Path, img_w: int, img_h: int) -> List[np.ndarray]:
"""
Parse YOLO segmentation label file.
Returns list of polygons (each as Nx2 numpy array in pixel coordinates).
"""
if not label_path.exists():
return []
polygons: List[np.ndarray] = []
lines = label_path.read_text(encoding="utf-8").strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 7: # class_id + at least 3 points (x,y pairs)
continue
try:
_class_id = int(parts[0])
coords = [float(x) for x in parts[1:]]
if len(coords) % 2 != 0:
continue
# Convert normalized [0,1] to pixel coordinates
points = []
for i in range(0, len(coords), 2):
x_norm = coords[i]
y_norm = coords[i + 1]
x_px = int(x_norm * img_w)
y_px = int(y_norm * img_h)
points.append([x_px, y_px])
if len(points) >= 3:
polygons.append(np.array(points, dtype=np.int32))
except (ValueError, IndexError):
continue
return polygons
def draw_polygons_on_image(
img: np.ndarray, polygons: List[np.ndarray], class_colors: List[Tuple[int, int, int]], alpha: float = 0.5
) -> np.ndarray:
"""
Draw polygons as semi-transparent masks on image.
Returns a new image with overlays.
"""
overlay = img.copy()
mask = np.zeros(img.shape[:2], dtype=np.uint8)
for i, poly in enumerate(polygons):
color_idx = i % len(class_colors)
color = class_colors[color_idx]
cv2.fillPoly(mask, [poly], 255)
cv2.fillPoly(overlay, [poly], color)
# Also draw outline
cv2.polylines(overlay, [poly], isClosed=True, color=color, thickness=2)
# Blend overlay with original
result = cv2.addWeighted(overlay, alpha, img, 1.0 - alpha, 0)
return result
def visualize_dataset(
dataset_dir: Path,
output_dir: Path,
split: str = "train",
max_images: int = 50,
class_names: List[str] = None,
alpha: float = 0.5,
) -> None:
"""
Visualize YOLO segmentation labels on images.
"""
dataset_dir = dataset_dir.expanduser().resolve()
if not dataset_dir.exists():
raise SystemExit(f"dataset_dir not found: {dataset_dir}")
img_dir = dataset_dir / "images" / split
lbl_dir = dataset_dir / "labels" / split
if not img_dir.exists():
raise SystemExit(f"images directory not found: {img_dir}")
if not lbl_dir.exists():
raise SystemExit(f"labels directory not found: {lbl_dir}")
# Collect image-label pairs
pairs: List[Tuple[Path, Path]] = []
for img_path in sorted(img_dir.iterdir()):
if img_path.suffix.lower() not in {".jpg", ".jpeg", ".png", ".bmp"}:
continue
lbl_path = lbl_dir / f"{img_path.stem}.txt"
if lbl_path.exists():
pairs.append((img_path, lbl_path))
if not pairs:
raise SystemExit(f"No image-label pairs found in {split} split")
# Limit number of images
if max_images > 0 and len(pairs) > max_images:
random.seed(42)
pairs = random.sample(pairs, max_images)
print(f"Visualizing {len(pairs)} images from {split} split...")
# Generate colors for classes (BGR format for OpenCV)
if class_names is None:
class_names = ["class0", "class1", "class2"]
colors = [
(0, 255, 0), # green
(255, 0, 0), # blue
(0, 0, 255), # red
(255, 255, 0), # cyan
(255, 0, 255), # magenta
(0, 255, 255), # yellow
]
class_colors = colors[: len(class_names)]
output_dir = output_dir.expanduser().resolve()
output_dir.mkdir(parents=True, exist_ok=True)
for img_path, lbl_path in pairs:
# Load image
img = cv2.imread(str(img_path))
if img is None:
print(f"[warn] failed to load: {img_path}")
continue
h, w = img.shape[:2]
# Parse labels
polygons = parse_yolo_seg_label(lbl_path, w, h)
if not polygons:
print(f"[warn] no polygons found in: {lbl_path}")
# Still save the original image for reference
out_path = output_dir / f"{img_path.stem}_no_labels{img_path.suffix}"
cv2.imwrite(str(out_path), img)
continue
# Draw polygons
vis_img = draw_polygons_on_image(img, polygons, class_colors, alpha=alpha)
# Save visualization
out_path = output_dir / f"{img_path.stem}_vis{img_path.suffix}"
cv2.imwrite(str(out_path), vis_img)
print(f"[done] saved {len(pairs)} visualizations to: {output_dir}")
def main() -> None:
parser = argparse.ArgumentParser(description="Visualize YOLOv8-seg labels on images")
parser.add_argument("--dataset", type=str, required=True, help="Path to YOLO-seg dataset directory")
parser.add_argument("--output", type=str, required=True, help="Output directory for visualizations")
parser.add_argument(
"--split", type=str, default="train", choices=["train", "val", "test"], help="Dataset split to visualize"
)
parser.add_argument("--max_images", type=int, default=50, help="Maximum number of images to visualize (0=all)")
parser.add_argument(
"--classes",
type=str,
default="fishbody",
help="Comma-separated class names (for color assignment, e.g. 'fishbody' or 'body,fin,tail')",
)
parser.add_argument(
"--alpha", type=float, default=0.5, help="Transparency of mask overlay (0.0=transparent, 1.0=opaque)"
)
args = parser.parse_args()
class_names = [c.strip() for c in (args.classes or "").split(",") if c.strip()]
if not class_names:
class_names = ["class0"]
visualize_dataset(
dataset_dir=Path(args.dataset),
output_dir=Path(args.output),
split=args.split,
max_images=args.max_images,
class_names=class_names,
alpha=args.alpha,
)
if __name__ == "__main__":
main()