#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Prepare a YOLOv8-seg dataset from Labelme JSON polygon annotations OR filter an existing prepared dataset.

Mode 1: Convert from Labelme JSONs
  Input (Labelme):
    - one JSON per image
    - JSON contains: imagePath (recommended), imageHeight, imageWidth, shapes[]
    - each shape is a polygon with: label, points[[x,y],...], shape_type="polygon"

  Example:
    python3 segmentation/prepare_yolo_seg_dataset.py \
      --source_dir /data/labelme \
      --out_dir ./datasets/fish_body_seg \
      --classes body \
      --train_ratio 0.8 --val_ratio 0.1 --test_ratio 0.1 \
      --seed 42 --copy

Mode 2: Filter existing prepared dataset (keep only images with labels)
  Input: Existing YOLO-seg dataset with images/ and labels/ folders
    - Only images that have a corresponding .txt (or Labelme .json) label file are kept;
      .json labels are converted to YOLO .txt on the way out
    - Useful when only some images in the dataset are labeled

  Example:
    python3 segmentation/prepare_yolo_seg_dataset.py \
      --prepared_dataset /home/ubuntu/data/fish/fish_measure_intermediates/yolo_seg \
      --out_dir ./datasets/fish_body_seg_filtered \
      --classes body \
      --copy

Output (Ultralytics YOLO segmentation dataset):
  <out_dir>/
    images/{train,val,test}/xxx.jpg
    labels/{train,val,test}/xxx.txt
    dataset.yaml

Label format (YOLOv8-seg):
  <class_id> <x1> <y1> <x2> <y2> ... <xn> <yn>
  where coordinates are normalized to [0,1] by (x/img_w, y/img_h).
"""

from __future__ import annotations

import argparse
import base64
import glob
import json
import math
import os
import random
import shutil
from pathlib import Path, PureWindowsPath
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np


# File suffixes (lower-case, leading dot) treated as images; callers compare
# against ``path.suffix.lower()``.
IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}


def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse ``sys.argv``.

    Exactly one of ``--source_dir`` (Labelme conversion) or
    ``--prepared_dataset`` (filter an existing dataset) is required.
    ``--copy`` and ``--symlink`` are mutually exclusive; when neither is
    given the caller falls back to hard links.

    Returns:
        The parsed namespace. ``skip_non_polygon`` defaults to True and can
        be turned off with ``--no_skip_non_polygon``.
    """
    p = argparse.ArgumentParser(
        description="Prepare YOLOv8-seg dataset from Labelme JSONs or filter existing prepared dataset"
    )

    # operating mode (exactly one is required)
    mode = p.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--source_dir",
        type=str,
        default="",
        help="Folder containing Labelme JSONs (and images) - use this for Labelme conversion mode",
    )
    mode.add_argument(
        "--prepared_dataset",
        type=str,
        default="",
        help="Path to existing prepared YOLO-seg dataset (images/ and labels/ folders) - use this to filter/validate existing dataset",
    )

    p.add_argument("--out_dir", type=str, required=True, help="Output dataset directory")
    p.add_argument("--train_ratio", type=float, default=0.8)
    p.add_argument("--val_ratio", type=float, default=0.1)
    p.add_argument("--test_ratio", type=float, default=0.1)
    p.add_argument("--seed", type=int, default=42)

    # classes
    p.add_argument(
        "--classes",
        type=str,
        default="body",
        help="Comma-separated class names, e.g. 'body' or 'body,fin,tail' (order defines class_id)",
    )
    p.add_argument(
        "--allow_unknown_labels",
        action="store_true",
        help="If set, unknown labels will be ignored (default behavior is also ignore).",
    )

    # image placing
    g = p.add_mutually_exclusive_group()
    g.add_argument("--copy", action="store_true", help="Copy images into output dataset")
    g.add_argument("--symlink", action="store_true", help="Symlink images into output dataset")
    # default: hardlink

    # ``store_true`` combined with ``default=True`` can never yield False, so a
    # companion ``--no_skip_non_polygon`` flag writes False to the same dest.
    p.add_argument(
        "--skip_non_polygon",
        dest="skip_non_polygon",
        action="store_true",
        default=True,
        help="Ignore non-polygon shapes (default: True)",
    )
    p.add_argument(
        "--no_skip_non_polygon",
        dest="skip_non_polygon",
        action="store_false",
        help="Also try to convert non-polygon shapes that have at least 3 points",
    )
    p.add_argument(
        "--drop_empty",
        action="store_true",
        help="Drop images with no valid polygons (default: keep with empty label file)",
    )
    return p.parse_args()


def ensure_dirs(root: Path) -> None:
    """Create ``images/<split>`` and ``labels/<split>`` under ``root`` for train, val and test.

    Existing directories are left untouched; missing parents are created.
    """
    for kind in ("images", "labels"):
        for split in ("train", "val", "test"):
            (root / kind / split).mkdir(parents=True, exist_ok=True)


def place_image(src: Path, dst: Path, mode: str) -> None:
    """Materialise ``src`` at ``dst`` by copying, symlinking or hard-linking.

    Args:
        src: Existing image file.
        dst: Target path; its parent directory is created when missing.
        mode: ``"copy"`` or ``"symlink"``; any other value means hard link,
            falling back to a copy when ``os.link`` fails.

    Raises:
        shutil.SameFileError: If ``src`` and ``dst`` are the same path.
        OSError: If the underlying copy/link operation fails.
    """
    dst.parent.mkdir(parents=True, exist_ok=True)

    # Placing a file onto itself would unlink the only copy below.
    if os.path.abspath(src) == os.path.abspath(dst):
        raise shutil.SameFileError(f"{src} and {dst} are the same file")

    # Remove leftovers from earlier runs in every mode. ``is_symlink`` also
    # catches dangling links, which ``exists()`` reports as absent; a stale
    # link to ``src`` would otherwise make ``copy2`` raise SameFileError.
    if dst.is_symlink() or dst.is_file():
        dst.unlink()

    if mode == "copy":
        shutil.copy2(src, dst)
    elif mode == "symlink":
        os.symlink(src, dst)
    else:  # hardlink
        try:
            os.link(src, dst)
        except OSError:
            # e.g. cross-device link or filesystem without hard-link support
            shutil.copy2(src, dst)


def write_label(label_path: Path, lines: List[str]) -> None:
    """Write ``lines`` to ``label_path`` as UTF-8, one per row with a trailing newline.

    The parent directory is created if needed; an empty ``lines`` produces an
    empty file.
    """
    label_path.parent.mkdir(parents=True, exist_ok=True)
    payload = "".join(row + "\n" for row in lines)
    label_path.write_text(payload, encoding="utf-8")


def _load_image_size(image_path: Path) -> Optional[Tuple[int, int]]:
    """Read the image with OpenCV and return ``(width, height)``.

    Returns None when ``cv2.imread`` cannot load the file or either
    dimension is not positive.
    """
    decoded = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
    if decoded is None:
        return None
    height, width = decoded.shape[:2]
    if min(width, height) <= 0:
        return None
    return int(width), int(height)


def _decode_labelme_image_data(image_data_b64: str) -> Optional[np.ndarray]:
|
|
try:
|
|
raw = base64.b64decode(image_data_b64.encode("utf-8"))
|
|
arr = np.frombuffer(raw, dtype=np.uint8)
|
|
img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
|
|
return img
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def resolve_labelme_image_path(source_dir: Path, json_path: Path, meta: Dict) -> Optional[Path]:
    """Locate the image file that belongs to a Labelme JSON.

    Lookup order:
      1. ``meta["imagePath"]`` taken relative to the JSON's folder (or as an
         absolute path), both as written and with backslashes converted to
         forward slashes.
      2. The basename of ``imagePath`` searched recursively under
         ``source_dir``.
      3. A file next to the JSON with the same stem and one of ``IMG_EXTS``.

    Args:
        source_dir: Root folder searched recursively in step 2.
        json_path: Path of the Labelme JSON file.
        meta: Parsed JSON content.

    Returns:
        The image path, or None when nothing matched.
    """
    # 1) preferred: imagePath from JSON
    image_path = meta.get("imagePath", "") or ""
    if not isinstance(image_path, str):
        image_path = ""  # malformed JSON: fall through to the stem-based lookup

    if image_path:
        # Labelme on Windows stores "..\\images\\a.jpg"; on POSIX that is a
        # single file name, so also try the slash-normalised spelling.
        spellings = [image_path]
        if "\\" in image_path:
            spellings.append(image_path.replace("\\", "/"))
        for spelling in spellings:
            p = Path(spelling) if os.path.isabs(spelling) else (json_path.parent / spelling).resolve()
            if p.is_file() and p.suffix.lower() in IMG_EXTS:
                return p

        # sometimes imagePath has only basename but image lives elsewhere under source_dir
        # PureWindowsPath splits on both "/" and "\\", so either style yields the bare name.
        b = PureWindowsPath(image_path).name
        if b:
            # Escape glob metacharacters so "img[1].jpg" is matched literally;
            # sort so the choice among several hits is deterministic.
            for fp in sorted(source_dir.rglob(glob.escape(b))):
                if fp.is_file() and fp.suffix.lower() in IMG_EXTS:
                    return fp

    # 2) fallback: same stem with common image extensions next to json
    for ext in sorted(IMG_EXTS):
        p = json_path.with_suffix(ext)
        if p.exists():
            return p

    return None


def _normalize_polygon(points_xy: List[List[float]], w: int, h: int) -> Optional[List[Tuple[float, float]]]:
|
|
if w <= 0 or h <= 0:
|
|
return None
|
|
if not points_xy or len(points_xy) < 3:
|
|
return None
|
|
|
|
pts: List[Tuple[float, float]] = []
|
|
for p in points_xy:
|
|
if not isinstance(p, (list, tuple)) or len(p) != 2:
|
|
continue
|
|
x, y = float(p[0]), float(p[1])
|
|
xn = x / float(w)
|
|
yn = y / float(h)
|
|
# clip (labelme can slightly exceed bounds)
|
|
xn = 0.0 if xn < 0.0 else (1.0 if xn > 1.0 else xn)
|
|
yn = 0.0 if yn < 0.0 else (1.0 if yn > 1.0 else yn)
|
|
pts.append((xn, yn))
|
|
|
|
# remove duplicated last==first (optional)
|
|
if len(pts) >= 4 and pts[0] == pts[-1]:
|
|
pts = pts[:-1]
|
|
|
|
# ensure at least 3 unique points
|
|
uniq = list(dict.fromkeys(pts))
|
|
if len(uniq) < 3:
|
|
return None
|
|
|
|
return pts
|
|
|
|
|
|
def generate_yaml(out_dir: Path, names: List[str]) -> None:
    """Write ``<out_dir>/dataset.yaml`` listing the dataset root, split folders and class names.

    ``names`` is emitted using its Python list representation. The written
    path is reported on stdout.
    """
    yaml_path = out_dir / "dataset.yaml"
    rows = [
        f"path: {out_dir.resolve()}",
        "train: images/train",
        "val: images/val",
        "test: images/test",
        f"names: {names}",
    ]
    yaml_path.write_text("\n".join(rows) + "\n", encoding="utf-8")
    print(f"[OK] wrote: {yaml_path}")


def is_labelme_json(meta: Dict) -> bool:
    """Return True when ``meta`` is a JSON object whose ``"shapes"`` entry is a list.

    Any other top-level JSON value (list, string, number, None) returns False
    rather than raising, so unrelated ``.json`` files can simply be skipped.
    """
    return isinstance(meta, dict) and isinstance(meta.get("shapes"), list)


def find_labeled_images_in_prepared_dataset(prepared_dir: Path) -> Dict[str, List[Tuple[Path, Path]]]:
    """
    Collect, per split, the images of a prepared dataset that have a label file.

    For every image under ``images/<split>`` a label with the same stem is
    looked up under ``labels/<split>``: ``.txt`` first, then ``.json``.
    A split whose images or labels folder is missing is reported and left empty.

    Returns: {"train": [(img_path, label_path), ...], "val": [...], "test": [...]}
    Raises: SystemExit when ``prepared_dir`` does not exist.
    """
    prepared_dir = prepared_dir.expanduser().resolve()
    if not prepared_dir.exists():
        raise SystemExit(f"prepared_dataset not found: {prepared_dir}")

    splits = ("train", "val", "test")
    result: Dict[str, List[Tuple[Path, Path]]] = {name: [] for name in splits}

    for split in splits:
        img_dir = prepared_dir / "images" / split
        lbl_dir = prepared_dir / "labels" / split

        if not img_dir.exists():
            print(f"[info] {split}: images directory not found: {img_dir}")
            continue
        if not lbl_dir.exists():
            print(f"[info] {split}: labels directory not found: {lbl_dir}")
            continue

        images = [entry for entry in img_dir.iterdir() if entry.suffix.lower() in IMG_EXTS]
        pairs = result[split]
        for img_path in images:
            # first existing candidate wins; images without any label are skipped
            for ext in (".txt", ".json"):
                candidate = lbl_dir / f"{img_path.stem}{ext}"
                if candidate.exists():
                    pairs.append((img_path, candidate))
                    break

        print(f"[info] {split}: found {len(images)} images, {len(pairs)} with labels")

    return result


def process_prepared_dataset(prepared_dir: Path, out_dir: Path, place_mode: str, classes: List[str]) -> None:
    """Copy the labeled subset of a prepared dataset into ``out_dir``.

    ``.txt`` labels are copied unchanged; Labelme ``.json`` labels are
    converted to YOLO-seg ``.txt`` using ``classes`` for the class ids. The
    label is written first, then the image is placed with ``place_mode``.
    Per-item failures are reported with a warning and the item is skipped.
    Finally ``dataset.yaml`` is written.
    """
    labeled = find_labeled_images_in_prepared_dataset(prepared_dir)
    ensure_dirs(out_dir)

    name2id = {name: idx for idx, name in enumerate(classes)}

    def _json_to_rows(json_path: Path, image_path: Path) -> Optional[List[str]]:
        # Returns the YOLO rows, or None when the image size cannot be determined.
        meta = json.loads(json_path.read_text(encoding="utf-8"))
        width = int(meta.get("imageWidth", 0) or 0)
        height = int(meta.get("imageHeight", 0) or 0)
        if width <= 0 or height <= 0:
            # size missing from the JSON: read it from the image itself
            size = _load_image_size(image_path)
            if size is None:
                return None
            width, height = size

        rows: List[str] = []
        for shape in meta.get("shapes", []):
            label = (shape.get("label", "") or "").strip()
            if label not in name2id:
                continue
            kind = (shape.get("shape_type", "polygon") or "polygon").lower()
            if kind != "polygon":
                continue
            poly = _normalize_polygon(shape.get("points", []), w=width, h=height)
            if poly is None:
                continue
            coords = " ".join(f"{x:.6f} {y:.6f}" for x, y in poly)
            rows.append(f"{name2id[label]} {coords}")
        return rows

    total_kept = 0
    for split in ("train", "val", "test"):
        items = labeled.get(split, [])
        print(f"{split}: {len(items)} images with labels")

        for img_src, lbl_src in items:
            dst_img = out_dir / f"images/{split}/{img_src.name}"

            if lbl_src.suffix.lower() == ".json":
                # Labelme JSON label: convert to YOLO .txt
                try:
                    rows = _json_to_rows(lbl_src, img_src)
                    if rows is not None:
                        write_label(out_dir / f"labels/{split}/{img_src.stem}.txt", rows)
                except Exception as e:
                    print(f"[warn] failed to convert {lbl_src.name}: {e}")
                    continue
                if rows is None:
                    print(f"[warn] cannot determine size for {img_src.name}, skipping")
                    continue
            else:
                # already YOLO .txt: copy verbatim
                try:
                    shutil.copy2(lbl_src, out_dir / f"labels/{split}/{lbl_src.name}")
                except Exception as e:
                    print(f"[warn] failed to copy label {lbl_src.name}: {e}")
                    continue

            try:
                place_image(img_src, dst_img, place_mode)
            except Exception as e:
                print(f"[warn] failed to place {img_src.name}: {e}")
            else:
                total_kept += 1

    generate_yaml(out_dir, classes)
    print(f"[done] kept={total_kept} labeled images out={out_dir}")


def main() -> None:
    """Run the tool in filter mode or Labelme conversion mode.

    With ``--prepared_dataset`` the work is delegated to
    ``process_prepared_dataset``. Otherwise every ``*.json`` under
    ``--source_dir`` is loaded, matched to its image, randomly assigned to
    train/val/test (seeded by ``--seed``), converted to YOLO-seg label lines
    and written to ``--out_dir`` together with ``dataset.yaml``.

    The test split always receives whatever train and val leave over; a
    warning is printed when ``--test_ratio`` disagrees with that remainder.

    Raises:
        SystemExit: On missing classes, invalid split ratios, a missing
            source directory, or when no usable Labelme JSON is found.
    """
    args = parse_args()
    random.seed(args.seed)

    out_dir = Path(args.out_dir).expanduser().resolve()

    classes = [c.strip() for c in (args.classes or "").split(",") if c.strip()]
    if not classes:
        raise SystemExit("No classes provided. Example: --classes body")

    place_mode = "copy" if args.copy else ("symlink" if args.symlink else "hardlink")

    # Mode 2 (module docstring): filter an existing prepared dataset
    if args.prepared_dataset:
        prepared_dir = Path(args.prepared_dataset).expanduser().resolve()
        process_prepared_dataset(prepared_dir, out_dir, place_mode, classes)
        return

    # Mode 1 (module docstring): convert from Labelme JSONs
    if min(args.train_ratio, args.val_ratio, args.test_ratio) < 0:
        raise SystemExit("--train_ratio, --val_ratio and --test_ratio must be non-negative")
    if args.train_ratio + args.val_ratio > 1.0 + 1e-9:
        raise SystemExit("--train_ratio + --val_ratio must not exceed 1.0")
    remainder = 1.0 - args.train_ratio - args.val_ratio
    if abs(remainder - args.test_ratio) > 1e-6:
        print(
            f"[warn] split ratios do not sum to 1; the test split receives the remainder "
            f"({remainder:.3f}) instead of --test_ratio={args.test_ratio}"
        )

    source_dir = Path(args.source_dir).expanduser().resolve()
    if not source_dir.exists():
        raise SystemExit(f"source_dir not found: {source_dir}")

    name2id = {n: i for i, n in enumerate(classes)}

    json_files = sorted(source_dir.rglob("*.json"))
    if not json_files:
        raise SystemExit(f"No .json found under: {source_dir}")

    items: List[Tuple[Path, Path, Dict]] = []
    bad = 0
    for jp in json_files:
        try:
            meta = json.loads(jp.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # unreadable file, bad encoding or invalid JSON
            bad += 1
            continue
        # rglob picks up every .json; skip anything that is not a Labelme object
        if not isinstance(meta, dict) or not is_labelme_json(meta):
            continue

        img_path = resolve_labelme_image_path(source_dir, jp, meta)
        if img_path is None:
            # allow imageData-only workflows: decode and write next to json
            img = _decode_labelme_image_data(meta["imageData"]) if meta.get("imageData") else None
            if img is None:
                bad += 1
                continue
            # choose png
            img_path = jp.with_suffix(".png")
            if not cv2.imwrite(str(img_path), img):
                bad += 1
                continue

        items.append((jp, img_path, meta))

    if not items:
        raise SystemExit(f"No valid Labelme JSON found under: {source_dir} (bad_json={bad})")

    # split: shuffle indices, first n_train -> train, next n_val -> val, rest -> test
    n = len(items)
    idx = list(range(n))
    random.shuffle(idx)
    n_train = int(n * args.train_ratio)
    n_val = int(n * args.val_ratio)
    n_test = n - n_train - n_val
    split_of = ["test"] * n
    for i in idx[:n_train]:
        split_of[i] = "train"
    for i in idx[n_train : n_train + n_val]:
        split_of[i] = "val"

    print(f"total={n} train={n_train} val={n_val} test={n_test} bad_json={bad}")

    ensure_dirs(out_dir)

    kept = 0
    dropped_empty = 0
    skipped = 0
    used_names = set()  # (split, stem) pairs already written in this run
    for i, (json_path, img_path, meta) in enumerate(items):
        split = split_of[i]

        # size: prefer the JSON fields, fall back to reading the image
        try:
            w = int(meta.get("imageWidth", 0) or 0)
            h = int(meta.get("imageHeight", 0) or 0)
        except (TypeError, ValueError):
            w = h = 0
        if w <= 0 or h <= 0:
            wh = _load_image_size(img_path)
            if wh is None:
                print(f"[warn] cannot determine size for {img_path.name}, skipping")
                skipped += 1
                continue
            w, h = wh

        # shapes -> yolo seg lines
        lines: List[str] = []
        for sh in meta.get("shapes", []):
            if not isinstance(sh, dict):
                continue
            label = str(sh.get("label", "") or "").strip()
            if label not in name2id:
                # ignore unknown labels
                continue
            shape_type = str(sh.get("shape_type", "polygon") or "polygon").lower()
            if args.skip_non_polygon and shape_type != "polygon":
                continue
            pts = _normalize_polygon(sh.get("points", []), w=w, h=h)
            if pts is None:
                continue
            flat = " ".join(f"{x:.6f} {y:.6f}" for x, y in pts)
            lines.append(f"{name2id[label]} {flat}")

        if args.drop_empty and not lines:
            dropped_empty += 1
            continue

        # Images from different sub-folders (or with different extensions) can
        # share a stem; writing both would overwrite the image and/or label.
        name_key = (split, img_path.stem)
        if name_key in used_names:
            print(f"[warn] duplicate output name {img_path.name} in split {split} (from {img_path}), skipping")
            skipped += 1
            continue

        dst_img = out_dir / f"images/{split}/{img_path.name}"
        dst_lbl = out_dir / f"labels/{split}/{img_path.with_suffix('.txt').name}"

        try:
            place_image(img_path, dst_img, place_mode)
        except Exception as e:
            print(f"[warn] failed to place {img_path.name}: {e}")
            skipped += 1
            continue

        write_label(dst_lbl, lines)
        used_names.add(name_key)
        kept += 1

    generate_yaml(out_dir, classes)
    print(f"[done] kept={kept} dropped_empty={dropped_empty} skipped={skipped} out={out_dir}")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()