""" Evaluate the flatness/curvature of a point cloud using RANSAC plane fitting. The score is the percentage of points that lie on the fitted plane. Higher percentage means the point cloud is flatter (better for flat objects like fish). """ import numpy as np from pathlib import Path from typing import Tuple, Optional import open3d as o3d def evaluate_flatness_ransac(points: np.ndarray, distance_threshold: float = 5.0, ransac_n: int = 3, num_iterations: int = 1000) -> Tuple[float, dict]: """ Evaluate flatness of a point cloud using RANSAC plane fitting. Args: points: Point cloud array (N, 3) distance_threshold: Maximum distance from a point to the plane to be considered inlier (in mm) ransac_n: Number of points to sample for plane fitting num_iterations: Number of RANSAC iterations Returns: tuple: (flatness_score: float, info_dict: dict) - flatness_score: Percentage of points on the plane (0-100, higher is flatter) - info_dict: Contains plane equation, inlier count, total points, etc. """ if len(points) < 3: return 0.0, {"error": "Not enough points (need at least 3)"} if len(points) < ransac_n: return 0.0, {"error": f"Not enough points for RANSAC (need at least {ransac_n})"} # Convert to Open3D point cloud pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points.astype(np.float64)) # Fit plane using RANSAC plane_model, inliers = pcd.segment_plane( distance_threshold=distance_threshold, ransac_n=ransac_n, num_iterations=num_iterations ) # Calculate flatness score (percentage of inliers) num_inliers = len(inliers) num_total = len(points) flatness_score = (num_inliers / num_total) * 100.0 # Extract plane equation: ax + by + cz + d = 0 [a, b, c, d] = plane_model # Calculate plane normal and distance normal = np.array([a, b, c]) normal_norm = np.linalg.norm(normal) if normal_norm > 0: normal = normal / normal_norm plane_distance = d / normal_norm else: normal = np.array([0, 0, 1]) plane_distance = 0.0 info = { "flatness_score": flatness_score, "num_inliers": num_inliers, "num_total": num_total, "num_outliers": num_total - num_inliers, "plane_equation": [a, b, c, d], "plane_normal": normal.tolist(), "plane_distance": float(plane_distance), "distance_threshold": distance_threshold, "inlier_indices": inliers } return flatness_score, info def evaluate_flatness_from_ply(ply_path: str, distance_threshold: float = 5.0, ransac_n: int = 3, num_iterations: int = 1000) -> Tuple[float, dict]: """ Evaluate flatness of a point cloud from a PLY file. Args: ply_path: Path to PLY file distance_threshold: Maximum distance from a point to the plane to be considered inlier (in mm) ransac_n: Number of points to sample for plane fitting num_iterations: Number of RANSAC iterations Returns: tuple: (flatness_score: float, info_dict: dict) """ ply_path = Path(ply_path).expanduser().resolve() if not ply_path.exists(): return 0.0, {"error": f"PLY file not found: {ply_path}"} # Load point cloud pcd = o3d.io.read_point_cloud(str(ply_path)) if len(pcd.points) == 0: return 0.0, {"error": "Empty point cloud"} # Convert to numpy array points = np.asarray(pcd.points) return evaluate_flatness_ransac(points, distance_threshold, ransac_n, num_iterations) def batch_evaluate_flatness(ply_files: list, distance_threshold: float = 5.0, ransac_n: int = 3, num_iterations: int = 1000) -> list: """ Evaluate flatness for multiple PLY files. Args: ply_files: List of paths to PLY files distance_threshold: Maximum distance from a point to the plane to be considered inlier (in mm) ransac_n: Number of points to sample for plane fitting num_iterations: Number of RANSAC iterations Returns: list: List of dicts, each containing file path and flatness evaluation results """ results = [] for ply_file in ply_files: ply_path = Path(ply_file) score, info = evaluate_flatness_from_ply( str(ply_path), distance_threshold=distance_threshold, ransac_n=ransac_n, num_iterations=num_iterations ) result = { "file": str(ply_path), "flatness_score": score, **info } results.append(result) return results if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Evaluate flatness of point clouds using RANSAC plane fitting") parser.add_argument("--ply", type=str, help="Path to single PLY file") parser.add_argument("--folder", type=str, help="Path to folder containing PLY files") parser.add_argument("--distance-threshold", type=float, default=5.0, help="Maximum distance from point to plane to be considered inlier (mm, default: 5.0)") parser.add_argument("--ransac-n", type=int, default=3, help="Number of points to sample for plane fitting (default: 3)") parser.add_argument("--num-iterations", type=int, default=1000, help="Number of RANSAC iterations (default: 1000)") parser.add_argument("--output", type=str, help="Output JSON file to save results") args = parser.parse_args() import json if args.ply: # Single file score, info = evaluate_flatness_from_ply( args.ply, distance_threshold=args.distance_threshold, ransac_n=args.ransac_n, num_iterations=args.num_iterations ) result = { "file": args.ply, "flatness_score": score, **info } print(f"File: {args.ply}") print(f"Flatness Score: {score:.2f}%") print(f"Inliers: {info.get('num_inliers', 0)}/{info.get('num_total', 0)}") print(f"Plane Normal: {info.get('plane_normal', [0,0,0])}") if args.output: with open(args.output, 'w') as f: json.dump(result, f, indent=2) print(f"\nResults saved to: {args.output}") elif args.folder: # Folder of files folder = Path(args.folder) ply_files = list(folder.glob("*.ply")) if not ply_files: print(f"No PLY files found in {folder}") exit(1) print(f"Evaluating {len(ply_files)} PLY files...") results = batch_evaluate_flatness( ply_files, distance_threshold=args.distance_threshold, ransac_n=args.ransac_n, num_iterations=args.num_iterations ) # Print summary scores = [r["flatness_score"] for r in results if "error" not in r] if scores: print(f"\nSummary:") print(f" Average flatness score: {np.mean(scores):.2f}%") print(f" Min flatness score: {np.min(scores):.2f}%") print(f" Max flatness score: {np.max(scores):.2f}%") print(f" Std flatness score: {np.std(scores):.2f}%") if args.output: output_data = { "summary": { "num_files": len(results), "average_score": float(np.mean(scores)) if scores else 0.0, "min_score": float(np.min(scores)) if scores else 0.0, "max_score": float(np.max(scores)) if scores else 0.0, "std_score": float(np.std(scores)) if scores else 0.0 }, "results": results } with open(args.output, 'w') as f: json.dump(output_data, f, indent=2) print(f"\nResults saved to: {args.output}") else: # Print all results print("\nDetailed Results:") for r in results: if "error" in r: print(f" {r['file']}: ERROR - {r['error']}") else: print(f" {r['file']}: {r['flatness_score']:.2f}% ({r['num_inliers']}/{r['num_total']} inliers)") else: parser.print_help()