""" Up-Down Alignment for Fish Point Cloud This module implements an algorithm to determine the correct up/down direction of a fish point cloud by fitting parabolas to the top and bottom points. Algorithm: 1. Divide points into 100 bins along X-axis 2. For each bin, find the lowest and highest Y values 3. Project these points onto XY plane 4. Fit parabolas to top points and bottom points separately 5. The side with lower fitting error is the actual top of the fish """ import numpy as np from typing import Tuple, Optional def fit_parabola_xy(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, float]: """ Fit a parabola y = ax² + bx + c to 2D points (x, y). Args: x: (N,) array of x coordinates y: (N,) array of y coordinates Returns: coefficients: (3,) array [a, b, c] for parabola y = ax² + bx + c error: Mean squared error of the fit """ if len(x) < 3: # Need at least 3 points to fit a parabola return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf') # Build design matrix for parabola: y = ax² + bx + c # [x² x 1] [a] [y] # [x² x 1] [b] = [y] # [x² x 1] [c] [y] A = np.column_stack([x**2, x, np.ones(len(x))]) try: # Solve using least squares coefficients, residuals, _, _ = np.linalg.lstsq(A, y, rcond=None) mse = np.mean(residuals) if len(residuals) > 0 else float('inf') return coefficients, mse except np.linalg.LinAlgError: # If fitting fails, return default return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf') def determine_up_down_direction(points: np.ndarray, bins: int = 100) -> Tuple[bool, dict]: """ Determine the correct up/down direction of a fish point cloud. Algorithm: 1. Divide points into bins along X-axis 2. For each bin, find lowest and highest Y values 3. Fit parabolas to top points and bottom points 4. The side with higher curvature (larger |a| coefficient) is the actual top (more curved = top, more flat = bottom) Args: points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0) bins: Number of bins along X-axis (default: 100) Returns: is_flipped: True if points need to be flipped (current top is actually bottom) info: Dictionary with diagnostic information: - 'top_curvature': Curvature (|a|) of top parabola - 'bottom_curvature': Curvature (|a|) of bottom parabola - 'top_error': Fitting error for top parabola - 'bottom_error': Fitting error for bottom parabola - 'top_coeffs': Parabola coefficients for top [a, b, c] - 'bottom_coeffs': Parabola coefficients for bottom [a, b, c] - 'top_points': (M, 2) array of top points [x, y] - 'bottom_points': (M, 2) array of bottom points [x, y] """ if points is None or len(points) == 0: return False, {'error': 'Empty point cloud'} pts = np.asarray(points) if pts.shape[1] < 2: return False, {'error': 'Points must have at least 2 dimensions'} # Project to XY plane (use x and y coordinates) x_vals = pts[:, 0] y_vals = pts[:, 1] x_min, x_max = x_vals.min(), x_vals.max() if np.isclose(x_max, x_min): # Degenerate along X; cannot determine return False, {'error': 'Degenerate along X-axis'} # Divide into bins along X-axis bins = max(10, int(bins)) # Ensure reasonable bin count bin_edges = np.linspace(x_min, x_max, bins + 1) top_points = [] # Highest Y in each bin bottom_points = [] # Lowest Y in each bin for i in range(bins): left_edge = bin_edges[i] right_edge = bin_edges[i + 1] # Include right edge on final bin if i == bins - 1: mask = (x_vals >= left_edge) & (x_vals <= right_edge) else: mask = (x_vals >= left_edge) & (x_vals < right_edge) if not np.any(mask): continue # Get Y values in this bin y_bin = y_vals[mask] x_bin = x_vals[mask] if len(y_bin) == 0: continue # Find highest and lowest Y values top_idx = np.argmax(y_bin) bottom_idx = np.argmin(y_bin) # Get corresponding X values top_x = x_bin[top_idx] top_y = y_bin[top_idx] bottom_x = x_bin[bottom_idx] bottom_y = y_bin[bottom_idx] top_points.append([top_x, top_y]) bottom_points.append([bottom_x, bottom_y]) if len(top_points) < 3 or len(bottom_points) < 3: # Need at least 3 points to fit a parabola return False, { 'error': f'Insufficient points for fitting (top: {len(top_points)}, bottom: {len(bottom_points)})', 'top_points': len(top_points), 'bottom_points': len(bottom_points) } top_points = np.array(top_points) bottom_points = np.array(bottom_points) # Fit parabolas to top and bottom points top_coeffs, top_error = fit_parabola_xy(top_points[:, 0], top_points[:, 1]) bottom_coeffs, bottom_error = fit_parabola_xy(bottom_points[:, 0], bottom_points[:, 1]) # Calculate curvature: for parabola y = ax² + bx + c, curvature is |a| # The more curved side (larger |a|) is the top # The more flat side (smaller |a|) is the bottom top_curvature = abs(top_coeffs[0]) # |a| for top parabola bottom_curvature = abs(bottom_coeffs[0]) # |a| for bottom parabola # If bottom has higher curvature than top, it means current orientation is flipped # (The actual top should be more curved) is_flipped = bottom_curvature > top_curvature info = { 'top_curvature': float(top_curvature), 'bottom_curvature': float(bottom_curvature), 'top_error': float(top_error), 'bottom_error': float(bottom_error), 'top_coeffs': top_coeffs, 'bottom_coeffs': bottom_coeffs, 'top_points': top_points, 'bottom_points': bottom_points, 'num_bins_used': len(top_points) } return is_flipped, info def align_up_down(points: np.ndarray, bins: int = 100) -> Tuple[np.ndarray, bool, dict]: """ Align point cloud to ensure correct up/down direction. Args: points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0) bins: Number of bins along X-axis (default: 100) Returns: aligned_points: (N, 3) array of aligned points was_flipped: True if points were flipped info: Dictionary with diagnostic information """ if points is None or len(points) == 0: return points, False, {'error': 'Empty point cloud'} pts = np.asarray(points).copy() # Determine if flipping is needed is_flipped, info = determine_up_down_direction(pts, bins=bins) if is_flipped: # Flip along Y-axis (rotate 180° around X-axis) # This swaps top and bottom pts[:, 1] = -pts[:, 1] # Also flip Z if present (for 3D points) if pts.shape[1] >= 3: pts[:, 2] = -pts[:, 2] return pts, is_flipped, info if __name__ == "__main__": import argparse import open3d as o3d from pathlib import Path parser = argparse.ArgumentParser(description="Test up-down alignment on a PLY file") parser.add_argument("ply_path", type=str, help="Path to PLY file to test") parser.add_argument("--bins", type=int, default=100, help="Number of bins along X-axis (default: 100)") args = parser.parse_args() # Load PLY file ply_path = Path(args.ply_path) if not ply_path.exists(): print(f"Error: PLY file not found: {ply_path}") exit(1) print(f"Loading PLY file: {ply_path}") pcd = o3d.io.read_point_cloud(str(ply_path)) if len(pcd.points) == 0: print(f"Error: Empty point cloud in {ply_path}") exit(1) points = np.asarray(pcd.points) print(f"Loaded {len(points)} points") # Run the algorithm print(f"\nRunning up-down alignment check (bins={args.bins})...") is_flipped, info = determine_up_down_direction(points, bins=args.bins) # Print results print(f"\nResults:") print(f" File: {ply_path.name}") print(f" Is flipped: {is_flipped}") print(f" Top curvature (|a|): {info.get('top_curvature', 'N/A'):.6f}") print(f" Bottom curvature (|a|): {info.get('bottom_curvature', 'N/A'):.6f}") print(f" Top parabola error: {info.get('top_error', 'N/A'):.6f}") print(f" Bottom parabola error: {info.get('bottom_error', 'N/A'):.6f}") print(f" Number of bins used: {info.get('num_bins_used', 'N/A')}") if 'top_coeffs' in info and 'bottom_coeffs' in info: top_coeffs = info['top_coeffs'] bottom_coeffs = info['bottom_coeffs'] print(f"\n Top parabola: y = {top_coeffs[0]:.6f}x² + {top_coeffs[1]:.6f}x + {top_coeffs[2]:.6f}") print(f" Bottom parabola: y = {bottom_coeffs[0]:.6f}x² + {bottom_coeffs[1]:.6f}x + {bottom_coeffs[2]:.6f}") if is_flipped: print(f"\n Recommendation: Point cloud should be flipped") print(f" (Current 'top' curvature {info.get('top_curvature', 0):.6f} < 'bottom' curvature {info.get('bottom_curvature', 0):.6f})") print(f" (More curved side should be on top)") else: print(f"\n Recommendation: Point cloud orientation is correct") print(f" (Top curvature {info.get('top_curvature', 0):.6f} >= Bottom curvature {info.get('bottom_curvature', 0):.6f})") # Test alignment function print(f"\nTesting alignment function...") aligned_points, was_flipped, align_info = align_up_down(points, bins=args.bins) print(f" Was flipped during alignment: {was_flipped}") if was_flipped: print(f" Aligned point cloud saved (Y and Z axes flipped)")