#!/usr/bin/env python3 """ Point cloud filtering utilities. Provides methods to filter and downsample point clouds to remove outliers. """ import numpy as np from scipy.spatial import cKDTree try: from sklearn.cluster import DBSCAN SKLEARN_AVAILABLE = True except ImportError: SKLEARN_AVAILABLE = False print("Warning: sklearn not available. Clustering filter will use simple connected components method.") def downsample_point_cloud(points, colors, voxel_size=5.0): """Downsample point cloud using voxel grid to remove sparse points. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) voxel_size: Voxel size in mm, default 5.0mm Returns: tuple: (downsampled points, downsampled colors) """ if len(points) == 0: return points, colors # Calculate voxel indices for each point voxel_indices = np.floor(points / voxel_size).astype(np.int32) # Use dictionary to store first point in each voxel voxel_dict = {} for i in range(len(points)): voxel_key = tuple(voxel_indices[i]) if voxel_key not in voxel_dict: voxel_dict[voxel_key] = i # Get indices of kept points kept_indices = list(voxel_dict.values()) return points[kept_indices], colors[kept_indices] def filter_point_cloud_by_centroid(points, colors, distance_threshold_factor=2.0): """Filter point cloud by removing points far from the centroid. Calculates the centroid of the point cloud and removes points that are more than distance_threshold_factor * std_dev away from the centroid. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) distance_threshold_factor: Multiplier for standard deviation threshold (default: 2.0) Points beyond centroid + factor * std_dev are removed Returns: tuple: (filtered points, filtered colors) """ if len(points) == 0: return points, colors # Calculate centroid (mean of all points) centroid = np.mean(points, axis=0) # Calculate distances from each point to centroid distances = np.linalg.norm(points - centroid, axis=1) # Calculate mean and standard deviation of distances mean_distance = np.mean(distances) std_distance = np.std(distances) # Threshold: keep points within mean + factor * std_dev threshold = mean_distance + distance_threshold_factor * std_distance # Keep points within threshold valid_mask = distances <= threshold filtered_points = points[valid_mask] filtered_colors = colors[valid_mask] return filtered_points, filtered_colors def filter_point_cloud_kdtree(points, colors, radius=5.0, min_neighbors=10): """Filter point cloud using KDTree to remove sparse outliers. Points with fewer than min_neighbors within radius are removed. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) radius: Search radius in mm, default 5.0mm min_neighbors: Minimum number of neighbors required, default 10 Returns: tuple: (filtered points, filtered colors) """ if len(points) == 0: return points, colors # Build KDTree tree = cKDTree(points) # Count neighbors for each point neighbor_counts = np.zeros(len(points), dtype=np.int32) # Query neighbors for each point for i in range(len(points)): neighbors = tree.query_ball_point(points[i], radius) neighbor_counts[i] = len(neighbors) # Keep points with sufficient neighbors valid_mask = neighbor_counts >= min_neighbors filtered_points = points[valid_mask] filtered_colors = colors[valid_mask] return filtered_points, filtered_colors def filter_point_cloud_statistical_outlier(points, colors, k_neighbors=20, std_ratio=2.0): """Filter point cloud using statistical outlier removal based on distance to k-nearest neighbors. For each point, calculates the mean distance to its k nearest neighbors. Points with mean distance beyond mean + std_ratio * std_dev are considered outliers. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) k_neighbors: Number of nearest neighbors to consider (default: 20) std_ratio: Standard deviation multiplier for outlier threshold (default: 2.0) Points beyond mean + std_ratio * std_dev are removed Returns: tuple: (filtered points, filtered colors) """ if len(points) == 0: return points, colors if len(points) < k_neighbors + 1: # Not enough points for statistical filtering return points, colors # Build KDTree for efficient nearest neighbor queries tree = cKDTree(points) # Calculate mean distance to k nearest neighbors for each point mean_distances = np.zeros(len(points)) for i in range(len(points)): # Query k+1 nearest neighbors (including the point itself) distances, _ = tree.query(points[i], k=k_neighbors + 1) # Exclude the point itself (distance = 0) and calculate mean mean_distances[i] = np.mean(distances[1:]) # Skip first (self) # Calculate statistics mean_dist = np.mean(mean_distances) std_dist = np.std(mean_distances) # Threshold: keep points within mean + std_ratio * std_dev threshold = mean_dist + std_ratio * std_dist # Keep points within threshold valid_mask = mean_distances <= threshold filtered_points = points[valid_mask] filtered_colors = colors[valid_mask] return filtered_points, filtered_colors def filter_point_cloud_by_density(points, colors, radius=100.0, min_points_in_radius=200): """Filter point cloud by density: keep only points with sufficient neighbors within radius. For each point, checks if there are at least min_points_in_radius points within the specified radius. Points with insufficient density are removed. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) radius: Search radius in mm, default 100.0mm min_points_in_radius: Minimum number of points required within radius, default 200 Returns: tuple: (filtered points, filtered colors) """ if len(points) == 0: return points, colors # Build KDTree for efficient neighbor queries tree = cKDTree(points) # Count neighbors for each point within radius neighbor_counts = np.zeros(len(points), dtype=np.int32) # Query neighbors for each point for i in range(len(points)): neighbors = tree.query_ball_point(points[i], radius) neighbor_counts[i] = len(neighbors) # Keep points with sufficient density (at least min_points_in_radius neighbors) valid_mask = neighbor_counts >= min_points_in_radius filtered_points = points[valid_mask] filtered_colors = colors[valid_mask] return filtered_points, filtered_colors def filter_point_cloud_by_largest_cluster(points, colors, eps=10.0, min_samples=20, use_dbscan=True): """Filter point cloud by keeping only the largest cluster. Uses clustering algorithm (DBSCAN or connected components) to identify clusters and keeps only the largest one, removing all outliers and smaller clusters. Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) eps: Maximum distance between points in the same cluster (mm), default 10.0mm min_samples: Minimum number of points to form a cluster, default 20 use_dbscan: If True, use DBSCAN (requires sklearn), else use simple connected components Returns: tuple: (filtered points, filtered colors) """ if len(points) == 0: return points, colors if use_dbscan and SKLEARN_AVAILABLE: # Use DBSCAN for clustering clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(points) labels = clustering.labels_ # Find the largest cluster (excluding noise points labeled as -1) unique_labels, counts = np.unique(labels[labels >= 0], return_counts=True) if len(unique_labels) == 0: # All points are noise, return empty return np.array([]).reshape(0, 3), np.array([]).reshape(0, 3) # Get the label of the largest cluster largest_cluster_label = unique_labels[np.argmax(counts)] # Keep only points in the largest cluster mask = labels == largest_cluster_label filtered_points = points[mask] filtered_colors = colors[mask] else: # Use simple connected components method with KDTree tree = cKDTree(points) visited = np.zeros(len(points), dtype=bool) clusters = [] for i in range(len(points)): if visited[i]: continue # Start a new cluster cluster = [] stack = [i] visited[i] = True while stack: current_idx = stack.pop() cluster.append(current_idx) # Find neighbors within eps distance neighbors = tree.query_ball_point(points[current_idx], eps) for neighbor_idx in neighbors: if not visited[neighbor_idx]: visited[neighbor_idx] = True stack.append(neighbor_idx) # Only keep clusters with at least min_samples points if len(cluster) >= min_samples: clusters.append(cluster) if len(clusters) == 0: # No valid clusters found, return empty return np.array([]).reshape(0, 3), np.array([]).reshape(0, 3) # Find the largest cluster largest_cluster = max(clusters, key=len) # Keep only points in the largest cluster filtered_points = points[largest_cluster] filtered_colors = colors[largest_cluster] return filtered_points, filtered_colors def filter_point_cloud(points, colors, use_centroid_filter=True, use_kdtree_filter=True, use_downsample=True, use_clustering_filter=False, use_density_filter=False, distance_threshold_factor=2.0, voxel_size_initial=1.0, kdtree_radius=5.0, kdtree_min_neighbors=5, voxel_size_final=5.0, clustering_eps=10.0, clustering_min_samples=20, density_radius=10.0, density_min_points=200): """Apply filtering pipeline to remove outliers from point cloud. The pipeline consists of: 1. (Optional) Initial voxel downsampling 2. (Optional) KDTree filtering - removes sparse outliers 3. (Optional) Density filtering - removes points with insufficient neighbors in radius 4. (Optional) Final voxel downsampling Args: points: Point cloud coordinates array (N, 3) in mm colors: Color array (N, 3) use_centroid_filter: If True, apply centroid-based filtering first (default: True) - currently disabled use_kdtree_filter: If True, apply KDTree filtering (default: True) use_downsample: If True, apply voxel downsampling (default: True) use_clustering_filter: If True, apply clustering filter to keep only largest cluster (default: False) - currently disabled use_density_filter: If True, apply density filtering (default: False) distance_threshold_factor: Multiplier for centroid filter threshold (default: 2.0) voxel_size_initial: Initial voxel size for downsampling in mm (default: 3.0) kdtree_radius: KDTree search radius in mm (default: 5.0) kdtree_min_neighbors: Minimum neighbors for KDTree filter (default: 5) voxel_size_final: Final voxel size for downsampling in mm (default: 5.0) clustering_eps: Maximum distance for clustering in mm (default: 10.0) clustering_min_samples: Minimum samples for clustering (default: 20) density_radius: Radius for density filtering in mm (default: 100.0) density_min_points: Minimum points required within density_radius (default: 200) Returns: tuple: (filtered points, filtered colors) """ if points is None or len(points) == 0: return points, colors # Step 1: Initial voxel downsampling (only if we have enough points) if use_downsample and len(points) > 1000: points, colors = downsample_point_cloud(points, colors, voxel_size=voxel_size_initial) if len(points) == 0: return points, colors # # Step 2: KDTree filtering to remove sparse outliers # if use_kdtree_filter: # points, colors = filter_point_cloud_kdtree(points, colors, radius=kdtree_radius, # min_neighbors=kdtree_min_neighbors) # if len(points) == 0: # return points, colors # Step 3: Density filtering - remove points with insufficient neighbors in radius if use_density_filter: points, colors = filter_point_cloud_by_density(points, colors, radius=density_radius, min_points_in_radius=density_min_points) if len(points) == 0: return points, colors # # Step 4: Final voxel downsampling (only if we have enough points) # if use_downsample and len(points) > 1000: # points, colors = downsample_point_cloud(points, colors, voxel_size=voxel_size_final) return points, colors