266 lines
9.9 KiB
Python
266 lines
9.9 KiB
Python
|
|
"""
|
||
|
|
Up-Down Alignment for Fish Point Cloud
|
||
|
|
|
||
|
|
This module implements an algorithm to determine the correct up/down direction
|
||
|
|
of a fish point cloud by fitting parabolas to the top and bottom points.
|
||
|
|
|
||
|
|
Algorithm:
|
||
|
|
1. Divide points into 100 bins along X-axis
|
||
|
|
2. For each bin, find the lowest and highest Y values
|
||
|
|
3. Project these points onto XY plane
|
||
|
|
4. Fit parabolas to top points and bottom points separately
|
||
|
|
5. The side with lower fitting error is the actual top of the fish
|
||
|
|
"""
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
from typing import Tuple, Optional
|
||
|
|
|
||
|
|
|
||
|
|
def fit_parabola_xy(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, float]:
|
||
|
|
"""
|
||
|
|
Fit a parabola y = ax² + bx + c to 2D points (x, y).
|
||
|
|
|
||
|
|
Args:
|
||
|
|
x: (N,) array of x coordinates
|
||
|
|
y: (N,) array of y coordinates
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
coefficients: (3,) array [a, b, c] for parabola y = ax² + bx + c
|
||
|
|
error: Mean squared error of the fit
|
||
|
|
"""
|
||
|
|
if len(x) < 3:
|
||
|
|
# Need at least 3 points to fit a parabola
|
||
|
|
return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf')
|
||
|
|
|
||
|
|
# Build design matrix for parabola: y = ax² + bx + c
|
||
|
|
# [x² x 1] [a] [y]
|
||
|
|
# [x² x 1] [b] = [y]
|
||
|
|
# [x² x 1] [c] [y]
|
||
|
|
A = np.column_stack([x**2, x, np.ones(len(x))])
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Solve using least squares
|
||
|
|
coefficients, residuals, _, _ = np.linalg.lstsq(A, y, rcond=None)
|
||
|
|
mse = np.mean(residuals) if len(residuals) > 0 else float('inf')
|
||
|
|
return coefficients, mse
|
||
|
|
except np.linalg.LinAlgError:
|
||
|
|
# If fitting fails, return default
|
||
|
|
return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf')
|
||
|
|
|
||
|
|
|
||
|
|
def determine_up_down_direction(points: np.ndarray, bins: int = 100) -> Tuple[bool, dict]:
|
||
|
|
"""
|
||
|
|
Determine the correct up/down direction of a fish point cloud.
|
||
|
|
|
||
|
|
Algorithm:
|
||
|
|
1. Divide points into bins along X-axis
|
||
|
|
2. For each bin, find lowest and highest Y values
|
||
|
|
3. Fit parabolas to top points and bottom points
|
||
|
|
4. The side with higher curvature (larger |a| coefficient) is the actual top
|
||
|
|
(more curved = top, more flat = bottom)
|
||
|
|
|
||
|
|
Args:
|
||
|
|
points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0)
|
||
|
|
bins: Number of bins along X-axis (default: 100)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
is_flipped: True if points need to be flipped (current top is actually bottom)
|
||
|
|
info: Dictionary with diagnostic information:
|
||
|
|
- 'top_curvature': Curvature (|a|) of top parabola
|
||
|
|
- 'bottom_curvature': Curvature (|a|) of bottom parabola
|
||
|
|
- 'top_error': Fitting error for top parabola
|
||
|
|
- 'bottom_error': Fitting error for bottom parabola
|
||
|
|
- 'top_coeffs': Parabola coefficients for top [a, b, c]
|
||
|
|
- 'bottom_coeffs': Parabola coefficients for bottom [a, b, c]
|
||
|
|
- 'top_points': (M, 2) array of top points [x, y]
|
||
|
|
- 'bottom_points': (M, 2) array of bottom points [x, y]
|
||
|
|
"""
|
||
|
|
if points is None or len(points) == 0:
|
||
|
|
return False, {'error': 'Empty point cloud'}
|
||
|
|
|
||
|
|
pts = np.asarray(points)
|
||
|
|
if pts.shape[1] < 2:
|
||
|
|
return False, {'error': 'Points must have at least 2 dimensions'}
|
||
|
|
|
||
|
|
# Project to XY plane (use x and y coordinates)
|
||
|
|
x_vals = pts[:, 0]
|
||
|
|
y_vals = pts[:, 1]
|
||
|
|
|
||
|
|
x_min, x_max = x_vals.min(), x_vals.max()
|
||
|
|
if np.isclose(x_max, x_min):
|
||
|
|
# Degenerate along X; cannot determine
|
||
|
|
return False, {'error': 'Degenerate along X-axis'}
|
||
|
|
|
||
|
|
# Divide into bins along X-axis
|
||
|
|
bins = max(10, int(bins)) # Ensure reasonable bin count
|
||
|
|
bin_edges = np.linspace(x_min, x_max, bins + 1)
|
||
|
|
|
||
|
|
top_points = [] # Highest Y in each bin
|
||
|
|
bottom_points = [] # Lowest Y in each bin
|
||
|
|
|
||
|
|
for i in range(bins):
|
||
|
|
left_edge = bin_edges[i]
|
||
|
|
right_edge = bin_edges[i + 1]
|
||
|
|
|
||
|
|
# Include right edge on final bin
|
||
|
|
if i == bins - 1:
|
||
|
|
mask = (x_vals >= left_edge) & (x_vals <= right_edge)
|
||
|
|
else:
|
||
|
|
mask = (x_vals >= left_edge) & (x_vals < right_edge)
|
||
|
|
|
||
|
|
if not np.any(mask):
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Get Y values in this bin
|
||
|
|
y_bin = y_vals[mask]
|
||
|
|
x_bin = x_vals[mask]
|
||
|
|
|
||
|
|
if len(y_bin) == 0:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Find highest and lowest Y values
|
||
|
|
top_idx = np.argmax(y_bin)
|
||
|
|
bottom_idx = np.argmin(y_bin)
|
||
|
|
|
||
|
|
# Get corresponding X values
|
||
|
|
top_x = x_bin[top_idx]
|
||
|
|
top_y = y_bin[top_idx]
|
||
|
|
bottom_x = x_bin[bottom_idx]
|
||
|
|
bottom_y = y_bin[bottom_idx]
|
||
|
|
|
||
|
|
top_points.append([top_x, top_y])
|
||
|
|
bottom_points.append([bottom_x, bottom_y])
|
||
|
|
|
||
|
|
if len(top_points) < 3 or len(bottom_points) < 3:
|
||
|
|
# Need at least 3 points to fit a parabola
|
||
|
|
return False, {
|
||
|
|
'error': f'Insufficient points for fitting (top: {len(top_points)}, bottom: {len(bottom_points)})',
|
||
|
|
'top_points': len(top_points),
|
||
|
|
'bottom_points': len(bottom_points)
|
||
|
|
}
|
||
|
|
|
||
|
|
top_points = np.array(top_points)
|
||
|
|
bottom_points = np.array(bottom_points)
|
||
|
|
|
||
|
|
# Fit parabolas to top and bottom points
|
||
|
|
top_coeffs, top_error = fit_parabola_xy(top_points[:, 0], top_points[:, 1])
|
||
|
|
bottom_coeffs, bottom_error = fit_parabola_xy(bottom_points[:, 0], bottom_points[:, 1])
|
||
|
|
|
||
|
|
# Calculate curvature: for parabola y = ax² + bx + c, curvature is |a|
|
||
|
|
# The more curved side (larger |a|) is the top
|
||
|
|
# The more flat side (smaller |a|) is the bottom
|
||
|
|
top_curvature = abs(top_coeffs[0]) # |a| for top parabola
|
||
|
|
bottom_curvature = abs(bottom_coeffs[0]) # |a| for bottom parabola
|
||
|
|
|
||
|
|
# If bottom has higher curvature than top, it means current orientation is flipped
|
||
|
|
# (The actual top should be more curved)
|
||
|
|
is_flipped = bottom_curvature > top_curvature
|
||
|
|
|
||
|
|
info = {
|
||
|
|
'top_curvature': float(top_curvature),
|
||
|
|
'bottom_curvature': float(bottom_curvature),
|
||
|
|
'top_error': float(top_error),
|
||
|
|
'bottom_error': float(bottom_error),
|
||
|
|
'top_coeffs': top_coeffs,
|
||
|
|
'bottom_coeffs': bottom_coeffs,
|
||
|
|
'top_points': top_points,
|
||
|
|
'bottom_points': bottom_points,
|
||
|
|
'num_bins_used': len(top_points)
|
||
|
|
}
|
||
|
|
|
||
|
|
return is_flipped, info
|
||
|
|
|
||
|
|
|
||
|
|
def align_up_down(points: np.ndarray, bins: int = 100) -> Tuple[np.ndarray, bool, dict]:
|
||
|
|
"""
|
||
|
|
Align point cloud to ensure correct up/down direction.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0)
|
||
|
|
bins: Number of bins along X-axis (default: 100)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
aligned_points: (N, 3) array of aligned points
|
||
|
|
was_flipped: True if points were flipped
|
||
|
|
info: Dictionary with diagnostic information
|
||
|
|
"""
|
||
|
|
if points is None or len(points) == 0:
|
||
|
|
return points, False, {'error': 'Empty point cloud'}
|
||
|
|
|
||
|
|
pts = np.asarray(points).copy()
|
||
|
|
|
||
|
|
# Determine if flipping is needed
|
||
|
|
is_flipped, info = determine_up_down_direction(pts, bins=bins)
|
||
|
|
|
||
|
|
if is_flipped:
|
||
|
|
# Flip along Y-axis (rotate 180° around X-axis)
|
||
|
|
# This swaps top and bottom
|
||
|
|
pts[:, 1] = -pts[:, 1]
|
||
|
|
# Also flip Z if present (for 3D points)
|
||
|
|
if pts.shape[1] >= 3:
|
||
|
|
pts[:, 2] = -pts[:, 2]
|
||
|
|
|
||
|
|
return pts, is_flipped, info
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
import argparse
|
||
|
|
import open3d as o3d
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
parser = argparse.ArgumentParser(description="Test up-down alignment on a PLY file")
|
||
|
|
parser.add_argument("ply_path", type=str, help="Path to PLY file to test")
|
||
|
|
parser.add_argument("--bins", type=int, default=100, help="Number of bins along X-axis (default: 100)")
|
||
|
|
args = parser.parse_args()
|
||
|
|
|
||
|
|
# Load PLY file
|
||
|
|
ply_path = Path(args.ply_path)
|
||
|
|
if not ply_path.exists():
|
||
|
|
print(f"Error: PLY file not found: {ply_path}")
|
||
|
|
exit(1)
|
||
|
|
|
||
|
|
print(f"Loading PLY file: {ply_path}")
|
||
|
|
pcd = o3d.io.read_point_cloud(str(ply_path))
|
||
|
|
|
||
|
|
if len(pcd.points) == 0:
|
||
|
|
print(f"Error: Empty point cloud in {ply_path}")
|
||
|
|
exit(1)
|
||
|
|
|
||
|
|
points = np.asarray(pcd.points)
|
||
|
|
print(f"Loaded {len(points)} points")
|
||
|
|
|
||
|
|
# Run the algorithm
|
||
|
|
print(f"\nRunning up-down alignment check (bins={args.bins})...")
|
||
|
|
is_flipped, info = determine_up_down_direction(points, bins=args.bins)
|
||
|
|
|
||
|
|
# Print results
|
||
|
|
print(f"\nResults:")
|
||
|
|
print(f" File: {ply_path.name}")
|
||
|
|
print(f" Is flipped: {is_flipped}")
|
||
|
|
print(f" Top curvature (|a|): {info.get('top_curvature', 'N/A'):.6f}")
|
||
|
|
print(f" Bottom curvature (|a|): {info.get('bottom_curvature', 'N/A'):.6f}")
|
||
|
|
print(f" Top parabola error: {info.get('top_error', 'N/A'):.6f}")
|
||
|
|
print(f" Bottom parabola error: {info.get('bottom_error', 'N/A'):.6f}")
|
||
|
|
print(f" Number of bins used: {info.get('num_bins_used', 'N/A')}")
|
||
|
|
|
||
|
|
if 'top_coeffs' in info and 'bottom_coeffs' in info:
|
||
|
|
top_coeffs = info['top_coeffs']
|
||
|
|
bottom_coeffs = info['bottom_coeffs']
|
||
|
|
print(f"\n Top parabola: y = {top_coeffs[0]:.6f}x² + {top_coeffs[1]:.6f}x + {top_coeffs[2]:.6f}")
|
||
|
|
print(f" Bottom parabola: y = {bottom_coeffs[0]:.6f}x² + {bottom_coeffs[1]:.6f}x + {bottom_coeffs[2]:.6f}")
|
||
|
|
|
||
|
|
if is_flipped:
|
||
|
|
print(f"\n Recommendation: Point cloud should be flipped")
|
||
|
|
print(f" (Current 'top' curvature {info.get('top_curvature', 0):.6f} < 'bottom' curvature {info.get('bottom_curvature', 0):.6f})")
|
||
|
|
print(f" (More curved side should be on top)")
|
||
|
|
else:
|
||
|
|
print(f"\n Recommendation: Point cloud orientation is correct")
|
||
|
|
print(f" (Top curvature {info.get('top_curvature', 0):.6f} >= Bottom curvature {info.get('bottom_curvature', 0):.6f})")
|
||
|
|
|
||
|
|
# Test alignment function
|
||
|
|
print(f"\nTesting alignment function...")
|
||
|
|
aligned_points, was_flipped, align_info = align_up_down(points, bins=args.bins)
|
||
|
|
print(f" Was flipped during alignment: {was_flipped}")
|
||
|
|
if was_flipped:
|
||
|
|
print(f" Aligned point cloud saved (Y and Z axes flipped)")
|