Files
FishServer/FishMeasure/segmentation/prepare_yolo_seg_dataset.py

491 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Prepare a YOLOv8-seg dataset from Labelme JSON polygon annotations OR filter an existing prepared dataset.
Mode 1: Convert from Labelme JSONs
Input (Labelme):
- one JSON per image
- JSON contains: imagePath (recommended), imageHeight, imageWidth, shapes[]
- each shape is a polygon with: label, points[[x,y],...], shape_type="polygon"
Example:
python3 segmentation/prepare_yolo_seg_dataset.py \
--source_dir /data/labelme \
--out_dir ./datasets/fish_body_seg \
--classes body \
--train_ratio 0.8 --val_ratio 0.1 --test_ratio 0.1 \
--seed 42 --copy
Mode 2: Filter existing prepared dataset (keep only images with labels)
Input: Existing YOLO-seg dataset with images/ and labels/ folders
- Only images that have a corresponding label file (.txt in YOLO format, or .json in Labelme format, which is converted to .txt) are kept
- Useful when only some images in the dataset are labeled
Example:
python3 segmentation/prepare_yolo_seg_dataset.py \
--prepared_dataset /home/ubuntu/data/fish/fish_measure_intermediates/yolo_seg \
--out_dir ./datasets/fish_body_seg_filtered \
--classes body \
--copy
Output (Ultralytics YOLO segmentation dataset):
<out_dir>/
images/{train,val,test}/xxx.jpg
labels/{train,val,test}/xxx.txt
dataset.yaml
Label format (YOLOv8-seg):
<class_id> <x1> <y1> <x2> <y2> ... <xn> <yn>
where coordinates are normalized to [0,1] by (x/img_w, y/img_h).
"""
from __future__ import annotations

import argparse
import base64
import glob
import json
import math
import os
import random
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np
# Lower-case file suffixes (including the leading dot) that are accepted as images.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the dataset preparation script.

    Exactly one of ``--source_dir`` (Labelme conversion) or
    ``--prepared_dataset`` (filtering an existing dataset) must be given.
    ``--copy`` and ``--symlink`` are mutually exclusive; when neither is
    passed the caller falls back to hard links.

    Returns:
        The populated ``argparse.Namespace``.
    """
    p = argparse.ArgumentParser(
        description="Prepare YOLOv8-seg dataset from Labelme JSONs or filter existing prepared dataset"
    )

    mode = p.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--source_dir",
        type=str,
        default="",
        help="Folder containing Labelme JSONs (and images) - use this for Labelme conversion mode",
    )
    mode.add_argument(
        "--prepared_dataset",
        type=str,
        default="",
        help="Path to existing prepared YOLO-seg dataset (images/ and labels/ folders) - use this to filter/validate existing dataset",
    )

    p.add_argument("--out_dir", type=str, required=True, help="Output dataset directory")
    p.add_argument("--train_ratio", type=float, default=0.8)
    p.add_argument("--val_ratio", type=float, default=0.1)
    p.add_argument("--test_ratio", type=float, default=0.1)
    p.add_argument("--seed", type=int, default=42)

    # classes
    p.add_argument(
        "--classes",
        type=str,
        default="body",
        help="Comma-separated class names, e.g. 'body' or 'body,fin,tail' (order defines class_id)",
    )
    p.add_argument(
        "--allow_unknown_labels",
        action="store_true",
        help="If set, unknown labels will be ignored (default behavior is also ignore).",
    )

    # image placing (default when neither flag is given: hardlink)
    g = p.add_mutually_exclusive_group()
    g.add_argument("--copy", action="store_true", help="Copy images into output dataset")
    g.add_argument("--symlink", action="store_true", help="Symlink images into output dataset")

    # ``--skip_non_polygon`` is a store_true flag whose default is already True,
    # so on its own it could never be switched off. The negated flag writes to
    # the same destination and makes the option controllable.
    p.add_argument(
        "--skip_non_polygon",
        dest="skip_non_polygon",
        action="store_true",
        default=True,
        help="Ignore non-polygon shapes (default: True)",
    )
    p.add_argument(
        "--no_skip_non_polygon",
        dest="skip_non_polygon",
        action="store_false",
        default=True,
        help="Do not ignore non-polygon shapes when converting Labelme JSONs (--source_dir mode)",
    )
    p.add_argument(
        "--drop_empty",
        action="store_true",
        help="Drop images with no valid polygons (default: keep with empty label file)",
    )
    return p.parse_args()
def ensure_dirs(root: Path) -> None:
    """Create the ``images/`` and ``labels/`` folders for train, val and test under *root*.

    Missing parents are created and already existing folders are left alone.
    """
    for kind in ("images", "labels"):
        for split in ("train", "val", "test"):
            (root / kind / split).mkdir(parents=True, exist_ok=True)
def place_image(src: Path, dst: Path, mode: str) -> None:
    """Make the image *src* available at *dst*.

    Args:
        src: Source image file.
        dst: Destination path. Missing parent directories are created and an
            existing file or symlink at this path is replaced.
        mode: ``"copy"`` copies the file, ``"symlink"`` creates a symbolic
            link, and any other value creates a hard link, falling back to a
            copy when the link cannot be created.

    If *src* and *dst* name the same directory entry nothing is done, because
    clearing the destination would delete the source.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)

    # Compare the entries themselves (resolved folder + name) without following
    # a symlink in the last component.
    src_entry = os.path.join(os.path.realpath(src.parent), src.name)
    dst_entry = os.path.join(os.path.realpath(dst.parent), dst.name)
    if src_entry == dst_entry:
        return

    # Always clear the destination first. is_symlink() is required because
    # exists() is False for a dangling link, and copying onto a link left by an
    # earlier --symlink run would write through it into the file it points to.
    if dst.is_symlink() or dst.exists():
        dst.unlink()

    if mode == "copy":
        shutil.copy2(src, dst)
    elif mode == "symlink":
        # A relative target would be interpreted relative to dst's folder.
        os.symlink(os.path.abspath(src), dst)
    else:  # hardlink
        try:
            os.link(src, dst)
        except OSError:
            shutil.copy2(src, dst)
def write_label(label_path: Path, lines: List[str]) -> None:
    """Write *lines* to *label_path* as UTF-8, one entry per line.

    Parent directories are created on demand. An empty *lines* list yields an
    empty file; otherwise every entry, including the last, ends with a newline.
    """
    label_path.parent.mkdir(parents=True, exist_ok=True)
    text = "".join(f"{entry}\n" for entry in lines)
    label_path.write_text(text, encoding="utf-8")
def _load_image_size(image_path: Path) -> Optional[Tuple[int, int]]:
    """Return ``(width, height)`` of the image at *image_path*, or None.

    None is returned when ``cv2.imread`` yields no image or when either
    dimension is not positive.
    """
    decoded = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
    if decoded is None:
        return None
    height, width = decoded.shape[:2]
    if min(width, height) <= 0:
        return None
    return int(width), int(height)
def _decode_labelme_image_data(image_data_b64: str) -> Optional[np.ndarray]:
    """Decode a base64 string into an image via ``cv2.imdecode`` (``IMREAD_COLOR``).

    Returns whatever ``cv2.imdecode`` produces, or None when any step raises.
    """
    try:
        payload = base64.b64decode(image_data_b64.encode("utf-8"))
        return cv2.imdecode(np.frombuffer(payload, dtype=np.uint8), cv2.IMREAD_COLOR)
    except Exception:
        return None
def resolve_labelme_image_path(source_dir: Path, json_path: Path, meta: Dict) -> Optional[Path]:
    """Locate the image file that a Labelme JSON annotates.

    Lookup order:
      1. ``meta["imagePath"]`` as written (absolute, or relative to the JSON's
         folder), then the same value with backslashes turned into ``/``;
      2. a file with the same basename anywhere under *source_dir*;
      3. a file next to the JSON that has the JSON's stem and one of the
         ``IMG_EXTS`` suffixes.

    Args:
        source_dir: Root folder searched in step 2.
        json_path: Path of the Labelme JSON file.
        meta: Parsed JSON content.

    Returns:
        The image path, or None when nothing was found.
    """
    raw = meta.get("imagePath", "") or ""
    # A non-string imagePath cannot be a path; treat it as missing.
    image_path = raw if isinstance(raw, str) else ""

    if image_path:
        # imagePath written on Windows may use backslashes, which POSIX treats
        # as part of the file name; try the value as-is first, then normalized.
        variants = [image_path]
        if "\\" in image_path:
            variants.append(image_path.replace("\\", "/"))

        for variant in variants:
            if os.path.isabs(variant):
                candidate = Path(variant)
            else:
                candidate = (json_path.parent / variant).resolve()
            if candidate.is_file() and candidate.suffix.lower() in IMG_EXTS:
                return candidate

        # sometimes imagePath has only basename but image lives elsewhere under source_dir
        basename = Path(variants[-1]).name
        if basename:
            # Escape glob metacharacters so names like "img[1].jpg" match
            # literally; sort so the pick is deterministic.
            for hit in sorted(source_dir.rglob(glob.escape(basename))):
                if hit.is_file() and hit.suffix.lower() in IMG_EXTS:
                    return hit

    # fallback: same stem with common image extensions next to json
    for ext in sorted(IMG_EXTS):
        sibling = json_path.with_suffix(ext)
        if sibling.exists():
            return sibling
    return None
def _normalize_polygon(points_xy: List[List[float]], w: int, h: int) -> Optional[List[Tuple[float, float]]]:
    """Convert pixel polygon points into coordinates normalized to [0, 1].

    Args:
        points_xy: Sequence of ``[x, y]`` pairs in pixels.
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        The normalized ``(x, y)`` tuples, clipped to [0, 1], with a closing
        point equal to the first one removed. None when the image size is not
        positive or fewer than 3 distinct usable points remain.

    Entries that are not 2-element lists/tuples, that cannot be converted to
    float, or that are NaN/infinite are skipped.
    """
    if w <= 0 or h <= 0:
        return None
    if not isinstance(points_xy, (list, tuple)) or len(points_xy) < 3:
        return None

    pts: List[Tuple[float, float]] = []
    for p in points_xy:
        if not isinstance(p, (list, tuple)) or len(p) != 2:
            continue
        try:
            x, y = float(p[0]), float(p[1])
        except (TypeError, ValueError):
            continue
        # NaN slips through both clip comparisons and would be written as "nan".
        if not (math.isfinite(x) and math.isfinite(y)):
            continue
        xn = x / float(w)
        yn = y / float(h)
        # clip (labelme can slightly exceed bounds)
        xn = 0.0 if xn < 0.0 else (1.0 if xn > 1.0 else xn)
        yn = 0.0 if yn < 0.0 else (1.0 if yn > 1.0 else yn)
        pts.append((xn, yn))

    # remove duplicated last==first
    if len(pts) >= 4 and pts[0] == pts[-1]:
        pts = pts[:-1]

    # ensure at least 3 unique points
    if len(set(pts)) < 3:
        return None
    return pts
def generate_yaml(out_dir: Path, names: List[str]) -> None:
    """Write ``dataset.yaml`` into *out_dir* and print where it was written.

    The file lists the resolved dataset root, the three split folders and the
    class names (rendered with Python's list formatting).
    """
    yaml_path = out_dir / "dataset.yaml"
    rows = [
        f"path: {out_dir.resolve()}",
        "train: images/train",
        "val: images/val",
        "test: images/test",
        f"names: {names}",
    ]
    yaml_path.write_text("\n".join(rows) + "\n", encoding="utf-8")
    print(f"[OK] wrote: {yaml_path}")
def is_labelme_json(meta: Dict) -> bool:
    """Return True when *meta* looks like a Labelme annotation.

    The check is that *meta* is a dict whose ``"shapes"`` entry is a list.
    Any other parsed JSON value (for example a top-level array) yields False
    instead of raising.
    """
    return isinstance(meta, dict) and isinstance(meta.get("shapes"), list)
def find_labeled_images_in_prepared_dataset(prepared_dir: Path) -> Dict[str, List[Tuple[Path, Path]]]:
    """
    Pair every image of a prepared dataset with its label file.

    For each split, files in ``images/<split>`` whose suffix is in IMG_EXTS are
    matched against ``labels/<split>/<stem>.txt`` and, failing that,
    ``labels/<split>/<stem>.json``. Images without either file are left out.
    A split whose images or labels folder is missing is reported and skipped.

    Returns: {"train": [(img_path, label_path), ...], "val": [...], "test": [...]}
    Raises: SystemExit when prepared_dir does not exist.
    """
    prepared_dir = prepared_dir.expanduser().resolve()
    if not prepared_dir.exists():
        raise SystemExit(f"prepared_dataset not found: {prepared_dir}")

    splits = ("train", "val", "test")
    result: Dict[str, List[Tuple[Path, Path]]] = {name: [] for name in splits}

    for split in splits:
        img_dir = prepared_dir / "images" / split
        lbl_dir = prepared_dir / "labels" / split
        if not img_dir.exists():
            print(f"[info] {split}: images directory not found: {img_dir}")
            continue
        if not lbl_dir.exists():
            print(f"[info] {split}: labels directory not found: {lbl_dir}")
            continue

        images = [entry for entry in img_dir.iterdir() if entry.suffix.lower() in IMG_EXTS]
        pairs = result[split]
        for img_path in images:
            # .txt takes precedence over .json when both exist
            for ext in (".txt", ".json"):
                candidate = lbl_dir / f"{img_path.stem}{ext}"
                if candidate.exists():
                    pairs.append((img_path, candidate))
                    break
        print(f"[info] {split}: found {len(images)} images, {len(pairs)} with labels")

    return result
def _labelme_label_to_yolo_rows(lbl_src: Path, img_src: Path, class_ids: Dict[str, int]) -> Optional[List[str]]:
    """Convert one Labelme JSON label file into YOLO-seg text rows.

    Returns None when the image size is neither in the JSON nor readable from
    *img_src*; otherwise one row per polygon shape whose label is in *class_ids*.
    """
    meta = json.loads(lbl_src.read_text(encoding="utf-8"))
    img_w = int(meta.get("imageWidth", 0) or 0)
    img_h = int(meta.get("imageHeight", 0) or 0)
    if img_w <= 0 or img_h <= 0:
        # Size missing from the JSON: fall back to reading the image itself.
        size = _load_image_size(img_src)
        if size is None:
            return None
        img_w, img_h = size

    rows: List[str] = []
    for shape in meta.get("shapes", []):
        name = (shape.get("label", "") or "").strip()
        if name not in class_ids:
            continue
        if (shape.get("shape_type", "polygon") or "polygon").lower() != "polygon":
            continue
        polygon = _normalize_polygon(shape.get("points", []), w=img_w, h=img_h)
        if polygon is None:
            continue
        coords = " ".join([f"{x:.6f} {y:.6f}" for x, y in polygon])
        rows.append(f"{class_ids[name]} {coords}")
    return rows


def process_prepared_dataset(prepared_dir: Path, out_dir: Path, place_mode: str, classes: List[str]) -> None:
    """Build *out_dir* from the labeled images of an existing prepared dataset.

    For every (image, label) pair found in *prepared_dir* the label is written
    first (.json labels are converted to YOLO .txt, .txt labels are copied) and
    the image is then placed using *place_mode*. Pairs whose label step fails
    are skipped with a warning. ``dataset.yaml`` is written at the end.
    """
    labeled = find_labeled_images_in_prepared_dataset(prepared_dir)
    ensure_dirs(out_dir)
    class_ids = {name: idx for idx, name in enumerate(classes)}

    kept_total = 0
    for split in ("train", "val", "test"):
        pairs = labeled.get(split, [])
        print(f"{split}: {len(pairs)} images with labels")

        for img_src, lbl_src in pairs:
            if lbl_src.suffix.lower() != ".json":
                # Already .txt format, just copy
                try:
                    shutil.copy2(lbl_src, out_dir / f"labels/{split}/{lbl_src.name}")
                except Exception as e:
                    print(f"[warn] failed to copy label {lbl_src.name}: {e}")
                    continue
            else:
                try:
                    rows = _labelme_label_to_yolo_rows(lbl_src, img_src, class_ids)
                    if rows is None:
                        print(f"[warn] cannot determine size for {img_src.name}, skipping")
                        continue
                    write_label(out_dir / f"labels/{split}/{img_src.stem}.txt", rows)
                except Exception as e:
                    print(f"[warn] failed to convert {lbl_src.name}: {e}")
                    continue

            try:
                place_image(img_src, out_dir / f"images/{split}/{img_src.name}", place_mode)
            except Exception as e:
                print(f"[warn] failed to place {img_src.name}: {e}")
            else:
                kept_total += 1

    generate_yaml(out_dir, classes)
    print(f"[done] kept={kept_total} labeled images out={out_dir}")
def _collect_labelme_items(source_dir: Path) -> Tuple[List[Tuple[Path, Path, Dict]], int]:
    """Gather ``(json_path, image_path, meta)`` for every usable Labelme JSON.

    Returns the items plus the number of JSON files that could not be used
    (unparsable JSON, or no image could be found, decoded or written).

    Raises:
        SystemExit: when *source_dir* contains no ``.json`` file at all.
    """
    json_files = sorted(source_dir.rglob("*.json"))
    if not json_files:
        raise SystemExit(f"No .json found under: {source_dir}")

    items: List[Tuple[Path, Path, Dict]] = []
    bad = 0
    for jp in json_files:
        try:
            meta = json.loads(jp.read_text(encoding="utf-8"))
        except Exception:
            bad += 1
            continue
        # Valid JSON that is not an object (e.g. a top-level array) is not a
        # Labelme file; check before anything calls .get() on it.
        if not isinstance(meta, dict) or not is_labelme_json(meta):
            continue

        img_path = resolve_labelme_image_path(source_dir, jp, meta)
        if img_path is None:
            # allow imageData-only workflows: decode and write next to json
            image_data = meta.get("imageData", None)
            img = _decode_labelme_image_data(image_data) if image_data else None
            if img is None:
                bad += 1
                continue
            img_path = jp.with_suffix(".png")
            try:
                written = cv2.imwrite(str(img_path), img)
            except cv2.error:
                written = False
            if not written:
                # Without the file on disk the image could not be placed later.
                bad += 1
                continue

        # The embedded base64 image is not needed any more; dropping it keeps
        # the collected metadata small for large datasets.
        meta.pop("imageData", None)
        items.append((jp, img_path, meta))
    return items, bad


def _shapes_to_yolo_lines(meta: Dict, name2id: Dict[str, int], w: int, h: int, skip_non_polygon: bool) -> List[str]:
    """Turn the shapes of one Labelme JSON into YOLO-seg label lines.

    Shapes with an unknown label, malformed shapes, and (when
    *skip_non_polygon* is set) shapes that are not polygons are ignored.
    """
    lines: List[str] = []
    for sh in meta.get("shapes") or []:
        if not isinstance(sh, dict):
            continue
        label = sh.get("label")
        if not isinstance(label, str) or label.strip() not in name2id:
            # ignore unknown labels
            continue
        shape_type = str(sh.get("shape_type") or "polygon").lower()
        if skip_non_polygon and shape_type != "polygon":
            continue
        points = sh.get("points")
        if not isinstance(points, (list, tuple)):
            continue
        try:
            pts = _normalize_polygon(points, w=w, h=h)
        except (TypeError, ValueError):
            # non-numeric coordinates: treat the shape as invalid
            continue
        if pts is None:
            continue
        flat = " ".join(f"{x:.6f} {y:.6f}" for x, y in pts)
        lines.append(f"{name2id[label.strip()]} {flat}")
    return lines


def main() -> None:
    """Entry point: run one of the two modes described in the module docstring.

    With ``--prepared_dataset`` the existing dataset is filtered into
    ``--out_dir``. Otherwise the Labelme JSONs under ``--source_dir`` are
    shuffled with ``--seed``, split into train/val (by ratio) and test (the
    remainder), converted to YOLO-seg labels and written to ``--out_dir``
    together with ``dataset.yaml``.

    Raises:
        SystemExit: on missing classes, missing source folder, invalid split
            ratios, or when no usable Labelme JSON is found.
    """
    args = parse_args()
    random.seed(args.seed)
    out_dir = Path(args.out_dir).expanduser().resolve()

    classes = [c.strip() for c in (args.classes or "").split(",") if c.strip()]
    if not classes:
        raise SystemExit("No classes provided. Example: --classes body")

    place_mode = "copy" if args.copy else ("symlink" if args.symlink else "hardlink")

    # Mode 2 of the module docstring: filter an existing prepared dataset
    if args.prepared_dataset:
        prepared_dir = Path(args.prepared_dataset).expanduser().resolve()
        process_prepared_dataset(prepared_dir, out_dir, place_mode, classes)
        return

    # Mode 1 of the module docstring: convert from Labelme JSONs
    source_dir = Path(args.source_dir).expanduser().resolve()
    if not source_dir.exists():
        raise SystemExit(f"source_dir not found: {source_dir}")

    # Negative ratios or train+val above 1 would silently yield truncated or
    # oddly sliced splits, so reject them up front.
    ratios = (args.train_ratio, args.val_ratio, args.test_ratio)
    if min(ratios) < 0 or args.train_ratio + args.val_ratio > 1.0 + 1e-9:
        raise SystemExit(
            "Invalid split ratios: each ratio must be >= 0 and train_ratio + val_ratio must not exceed 1"
        )

    name2id = {n: i for i, n in enumerate(classes)}
    items, bad = _collect_labelme_items(source_dir)
    if not items:
        raise SystemExit(f"No valid Labelme JSON found under: {source_dir} (bad_json={bad})")

    # split: train and val take their share, test receives everything left over
    idx = list(range(len(items)))
    random.shuffle(idx)
    n = len(idx)
    n_train = int(n * args.train_ratio)
    n_val = int(n * args.val_ratio)
    train_set = set(idx[:n_train])
    val_set = set(idx[n_train : n_train + n_val])
    test_set = set(idx[n_train + n_val :])
    print(f"total={n} train={len(train_set)} val={len(val_set)} test={len(test_set)} bad_json={bad}")

    ensure_dirs(out_dir)

    kept = 0
    dropped_empty = 0
    skipped = 0
    for i, (_, img_path, meta) in enumerate(items):
        if i in train_set:
            split = "train"
        elif i in val_set:
            split = "val"
        else:
            split = "test"

        # size: prefer the JSON fields, fall back to reading the image
        try:
            w = int(meta.get("imageWidth", 0) or 0)
            h = int(meta.get("imageHeight", 0) or 0)
        except (TypeError, ValueError):
            w = h = 0
        if w <= 0 or h <= 0:
            wh = _load_image_size(img_path)
            if wh is None:
                print(f"[warn] cannot determine size for {img_path.name}, skipping")
                skipped += 1
                continue
            w, h = wh

        # shapes -> yolo seg lines
        lines = _shapes_to_yolo_lines(meta, name2id, w, h, args.skip_non_polygon)
        if args.drop_empty and not lines:
            dropped_empty += 1
            continue

        dst_img = out_dir / f"images/{split}/{img_path.name}"
        dst_lbl = out_dir / f"labels/{split}/{img_path.with_suffix('.txt').name}"
        try:
            place_image(img_path, dst_img, place_mode)
        except Exception as e:
            print(f"[warn] failed to place {img_path.name}: {e}")
            skipped += 1
            continue
        write_label(dst_lbl, lines)
        kept += 1

    generate_yaml(out_dir, classes)
    if skipped:
        print(f"[warn] skipped={skipped} images (unknown size or placement failure)")
    print(f"[done] kept={kept} dropped_empty={dropped_empty} out={out_dir}")
# Run the command-line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()