Files
FishServer/FishMeasure/utils/up_down_alignment.py
2026-04-08 19:32:23 +08:00

266 lines
9.9 KiB
Python
Executable File

"""
Up-Down Alignment for Fish Point Cloud
This module implements an algorithm to determine the correct up/down direction
of a fish point cloud by fitting parabolas to the top and bottom points.
Algorithm:
1. Divide points into 100 bins along X-axis
2. For each bin, find the lowest and highest Y values
3. Project these points onto XY plane
4. Fit parabolas to top points and bottom points separately
5. The side with lower fitting error is the actual top of the fish
"""
import numpy as np
from typing import Tuple, Optional
def fit_parabola_xy(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, float]:
"""
Fit a parabola y = ax² + bx + c to 2D points (x, y).
Args:
x: (N,) array of x coordinates
y: (N,) array of y coordinates
Returns:
coefficients: (3,) array [a, b, c] for parabola y = ax² + bx + c
error: Mean squared error of the fit
"""
if len(x) < 3:
# Need at least 3 points to fit a parabola
return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf')
# Build design matrix for parabola: y = ax² + bx + c
# [x² x 1] [a] [y]
# [x² x 1] [b] = [y]
# [x² x 1] [c] [y]
A = np.column_stack([x**2, x, np.ones(len(x))])
try:
# Solve using least squares
coefficients, residuals, _, _ = np.linalg.lstsq(A, y, rcond=None)
mse = np.mean(residuals) if len(residuals) > 0 else float('inf')
return coefficients, mse
except np.linalg.LinAlgError:
# If fitting fails, return default
return np.array([0.0, 0.0, np.mean(y) if len(y) > 0 else 0.0]), float('inf')
def determine_up_down_direction(points: np.ndarray, bins: int = 100) -> Tuple[bool, dict]:
"""
Determine the correct up/down direction of a fish point cloud.
Algorithm:
1. Divide points into bins along X-axis
2. For each bin, find lowest and highest Y values
3. Fit parabolas to top points and bottom points
4. The side with higher curvature (larger |a| coefficient) is the actual top
(more curved = top, more flat = bottom)
Args:
points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0)
bins: Number of bins along X-axis (default: 100)
Returns:
is_flipped: True if points need to be flipped (current top is actually bottom)
info: Dictionary with diagnostic information:
- 'top_curvature': Curvature (|a|) of top parabola
- 'bottom_curvature': Curvature (|a|) of bottom parabola
- 'top_error': Fitting error for top parabola
- 'bottom_error': Fitting error for bottom parabola
- 'top_coeffs': Parabola coefficients for top [a, b, c]
- 'bottom_coeffs': Parabola coefficients for bottom [a, b, c]
- 'top_points': (M, 2) array of top points [x, y]
- 'bottom_points': (M, 2) array of bottom points [x, y]
"""
if points is None or len(points) == 0:
return False, {'error': 'Empty point cloud'}
pts = np.asarray(points)
if pts.shape[1] < 2:
return False, {'error': 'Points must have at least 2 dimensions'}
# Project to XY plane (use x and y coordinates)
x_vals = pts[:, 0]
y_vals = pts[:, 1]
x_min, x_max = x_vals.min(), x_vals.max()
if np.isclose(x_max, x_min):
# Degenerate along X; cannot determine
return False, {'error': 'Degenerate along X-axis'}
# Divide into bins along X-axis
bins = max(10, int(bins)) # Ensure reasonable bin count
bin_edges = np.linspace(x_min, x_max, bins + 1)
top_points = [] # Highest Y in each bin
bottom_points = [] # Lowest Y in each bin
for i in range(bins):
left_edge = bin_edges[i]
right_edge = bin_edges[i + 1]
# Include right edge on final bin
if i == bins - 1:
mask = (x_vals >= left_edge) & (x_vals <= right_edge)
else:
mask = (x_vals >= left_edge) & (x_vals < right_edge)
if not np.any(mask):
continue
# Get Y values in this bin
y_bin = y_vals[mask]
x_bin = x_vals[mask]
if len(y_bin) == 0:
continue
# Find highest and lowest Y values
top_idx = np.argmax(y_bin)
bottom_idx = np.argmin(y_bin)
# Get corresponding X values
top_x = x_bin[top_idx]
top_y = y_bin[top_idx]
bottom_x = x_bin[bottom_idx]
bottom_y = y_bin[bottom_idx]
top_points.append([top_x, top_y])
bottom_points.append([bottom_x, bottom_y])
if len(top_points) < 3 or len(bottom_points) < 3:
# Need at least 3 points to fit a parabola
return False, {
'error': f'Insufficient points for fitting (top: {len(top_points)}, bottom: {len(bottom_points)})',
'top_points': len(top_points),
'bottom_points': len(bottom_points)
}
top_points = np.array(top_points)
bottom_points = np.array(bottom_points)
# Fit parabolas to top and bottom points
top_coeffs, top_error = fit_parabola_xy(top_points[:, 0], top_points[:, 1])
bottom_coeffs, bottom_error = fit_parabola_xy(bottom_points[:, 0], bottom_points[:, 1])
# Calculate curvature: for parabola y = ax² + bx + c, curvature is |a|
# The more curved side (larger |a|) is the top
# The more flat side (smaller |a|) is the bottom
top_curvature = abs(top_coeffs[0]) # |a| for top parabola
bottom_curvature = abs(bottom_coeffs[0]) # |a| for bottom parabola
# If bottom has higher curvature than top, it means current orientation is flipped
# (The actual top should be more curved)
is_flipped = bottom_curvature > top_curvature
info = {
'top_curvature': float(top_curvature),
'bottom_curvature': float(bottom_curvature),
'top_error': float(top_error),
'bottom_error': float(bottom_error),
'top_coeffs': top_coeffs,
'bottom_coeffs': bottom_coeffs,
'top_points': top_points,
'bottom_points': bottom_points,
'num_bins_used': len(top_points)
}
return is_flipped, info
def align_up_down(points: np.ndarray, bins: int = 100) -> Tuple[np.ndarray, bool, dict]:
"""
Align point cloud to ensure correct up/down direction.
Args:
points: (N, 3) array of point coordinates (assumed to be in XY plane, z ≈ 0)
bins: Number of bins along X-axis (default: 100)
Returns:
aligned_points: (N, 3) array of aligned points
was_flipped: True if points were flipped
info: Dictionary with diagnostic information
"""
if points is None or len(points) == 0:
return points, False, {'error': 'Empty point cloud'}
pts = np.asarray(points).copy()
# Determine if flipping is needed
is_flipped, info = determine_up_down_direction(pts, bins=bins)
if is_flipped:
# Flip along Y-axis (rotate 180° around X-axis)
# This swaps top and bottom
pts[:, 1] = -pts[:, 1]
# Also flip Z if present (for 3D points)
if pts.shape[1] >= 3:
pts[:, 2] = -pts[:, 2]
return pts, is_flipped, info
if __name__ == "__main__":
import argparse
import open3d as o3d
from pathlib import Path
parser = argparse.ArgumentParser(description="Test up-down alignment on a PLY file")
parser.add_argument("ply_path", type=str, help="Path to PLY file to test")
parser.add_argument("--bins", type=int, default=100, help="Number of bins along X-axis (default: 100)")
args = parser.parse_args()
# Load PLY file
ply_path = Path(args.ply_path)
if not ply_path.exists():
print(f"Error: PLY file not found: {ply_path}")
exit(1)
print(f"Loading PLY file: {ply_path}")
pcd = o3d.io.read_point_cloud(str(ply_path))
if len(pcd.points) == 0:
print(f"Error: Empty point cloud in {ply_path}")
exit(1)
points = np.asarray(pcd.points)
print(f"Loaded {len(points)} points")
# Run the algorithm
print(f"\nRunning up-down alignment check (bins={args.bins})...")
is_flipped, info = determine_up_down_direction(points, bins=args.bins)
# Print results
print(f"\nResults:")
print(f" File: {ply_path.name}")
print(f" Is flipped: {is_flipped}")
print(f" Top curvature (|a|): {info.get('top_curvature', 'N/A'):.6f}")
print(f" Bottom curvature (|a|): {info.get('bottom_curvature', 'N/A'):.6f}")
print(f" Top parabola error: {info.get('top_error', 'N/A'):.6f}")
print(f" Bottom parabola error: {info.get('bottom_error', 'N/A'):.6f}")
print(f" Number of bins used: {info.get('num_bins_used', 'N/A')}")
if 'top_coeffs' in info and 'bottom_coeffs' in info:
top_coeffs = info['top_coeffs']
bottom_coeffs = info['bottom_coeffs']
print(f"\n Top parabola: y = {top_coeffs[0]:.6f}x² + {top_coeffs[1]:.6f}x + {top_coeffs[2]:.6f}")
print(f" Bottom parabola: y = {bottom_coeffs[0]:.6f}x² + {bottom_coeffs[1]:.6f}x + {bottom_coeffs[2]:.6f}")
if is_flipped:
print(f"\n Recommendation: Point cloud should be flipped")
print(f" (Current 'top' curvature {info.get('top_curvature', 0):.6f} < 'bottom' curvature {info.get('bottom_curvature', 0):.6f})")
print(f" (More curved side should be on top)")
else:
print(f"\n Recommendation: Point cloud orientation is correct")
print(f" (Top curvature {info.get('top_curvature', 0):.6f} >= Bottom curvature {info.get('bottom_curvature', 0):.6f})")
# Test alignment function
print(f"\nTesting alignment function...")
aligned_points, was_flipped, align_info = align_up_down(points, bins=args.bins)
print(f" Was flipped during alignment: {was_flipped}")
if was_flipped:
print(f" Aligned point cloud saved (Y and Z axes flipped)")