diff --git a/Localization/bayes_filter.py b/Localization/bayes_filter.py new file mode 100644 index 0000000000..1329dc7378 --- /dev/null +++ b/Localization/bayes_filter.py @@ -0,0 +1,90 @@ +"""Base Bayes Filter class for localization algorithms. + +This module provides an abstract base class for Bayesian filtering algorithms +used in robot localization. It defines the common interface and methods that +should be implemented by specific filter types such as Extended Kalman Filter, +Particle Filter, Unscented Kalman Filter, etc. + +Issue #1277: Code improvement in Localization dir +Reference: https://github.com/AtsushiSakai/PythonRobotics/issues/1277 +""" + +from abc import ABC, abstractmethod +import numpy as np + + +class BayesFilter(ABC): + """Abstract base class for Bayesian filters. + + This class defines the common interface for all Bayesian filtering algorithms + used in localization. All specific filter implementations (EKF, UKF, PF, etc.) + should inherit from this class and implement the required abstract methods. + + Attributes: + state_dim (int): Dimension of the state vector + observation_dim (int): Dimension of the observation vector + """ + + def __init__(self, state_dim, observation_dim): + """Initialize the Bayes filter. + + Args: + state_dim (int): Dimension of the state vector + observation_dim (int): Dimension of the observation vector + """ + self.state_dim = state_dim + self.observation_dim = observation_dim + + @abstractmethod + def predict(self, control_input, dt): + """Prediction step of the Bayesian filter. + + Propagate the belief through the motion model. + + Args: + control_input: Control input (e.g., velocity commands) + dt (float): Time step + + Returns: + Updated belief after prediction + """ + pass + + @abstractmethod + def update(self, observation): + """Update step of the Bayesian filter. + + Update the belief based on the observation. + + Args: + observation: Sensor observation + + Returns: + Updated belief after measurement update + """ + pass + + @abstractmethod + def get_state(self): + """Get the current state estimate. + + Returns: + Current state estimate + """ + pass + + @abstractmethod + def get_covariance(self): + """Get the current uncertainty/covariance. + + Returns: + Current covariance matrix or uncertainty measure + """ + pass + + def reset(self): + """Reset the filter to initial state. + + This method can be overridden by subclasses if needed. + """ + pass diff --git a/Localization/ekf_localization.py b/Localization/ekf_localization.py new file mode 100644 index 0000000000..62e4be039b --- /dev/null +++ b/Localization/ekf_localization.py @@ -0,0 +1,218 @@ +"""Extended Kalman Filter Localization - Refactored to use BayesFilter base class. + +This module provides an Extended Kalman Filter implementation that inherits from +the BayesFilter base class, creating a consistent API across all localization filters. + +Issue #1277: Code improvement in Localization dir +Reference: https://github.com/AtsushiSakai/PythonRobotics/issues/1277 +""" + +import numpy as np +from bayes_filter import BayesFilter + + +class ExtendedKalmanFilter(BayesFilter): + """Extended Kalman Filter for robot localization. + + This class implements the Extended Kalman Filter algorithm as a subclass + of the BayesFilter base class. It maintains a Gaussian belief represented + by a mean (state) and covariance matrix. + + Attributes: + x (np.ndarray): State estimate (mean of Gaussian belief) + P (np.ndarray): Covariance matrix (uncertainty) + Q (np.ndarray): Process noise covariance + R (np.ndarray): Measurement noise covariance + """ + + def __init__(self, state_dim, observation_dim, initial_state=None, initial_cov=None): + """Initialize the Extended Kalman Filter. + + Args: + state_dim (int): Dimension of the state vector + observation_dim (int): Dimension of the observation vector + initial_state (np.ndarray, optional): Initial state estimate + initial_cov (np.ndarray, optional): Initial covariance matrix + """ + super().__init__(state_dim, observation_dim) + + # Initialize state and covariance + if initial_state is not None: + self.x = initial_state.copy() + else: + self.x = np.zeros(state_dim) + + if initial_cov is not None: + self.P = initial_cov.copy() + else: + self.P = np.eye(state_dim) + + # Initialize process and measurement noise covariances + self.Q = np.eye(state_dim) * 0.1 # Default process noise + self.R = np.eye(observation_dim) * 0.1 # Default measurement noise + + def set_noise_covariances(self, Q, R): + """Set the process and measurement noise covariances. + + Args: + Q (np.ndarray): Process noise covariance + R (np.ndarray): Measurement noise covariance + """ + self.Q = Q.copy() + self.R = R.copy() + + def predict(self, control_input, dt): + """Prediction step of the EKF. + + Propagates the state estimate and covariance forward using the motion model. + + Args: + control_input (np.ndarray): Control input (e.g., velocity commands) + dt (float): Time step + + Returns: + tuple: (predicted_state, predicted_covariance) + """ + # Motion model: x_t = f(x_{t-1}, u_t) + # This is a simple motion model - can be overridden for specific robots + F = self.jacobian_motion_model(self.x, control_input, dt) + + # Predict state + self.x = self.motion_model(self.x, control_input, dt) + + # Predict covariance + self.P = F @ self.P @ F.T + self.Q + + return self.x, self.P + + def update(self, observation, landmark_pos=None): + """Update step of the EKF. + + Updates the state estimate and covariance using the observation. + + Args: + observation (np.ndarray): Sensor observation + landmark_pos (np.ndarray, optional): Known landmark positions for observation model + + Returns: + tuple: (updated_state, updated_covariance) + """ + # Observation model: z_t = h(x_t) + H = self.jacobian_observation_model(self.x, landmark_pos) + + # Predicted observation + z_pred = self.observation_model(self.x, landmark_pos) + + # Innovation (measurement residual) + y = observation - z_pred + + # Innovation covariance + S = H @ self.P @ H.T + self.R + + # Kalman gain + K = self.P @ H.T @ np.linalg.inv(S) + + # Update state + self.x = self.x + K @ y + + # Update covariance + I = np.eye(self.state_dim) + self.P = (I - K @ H) @ self.P + + return self.x, self.P + + def get_state(self): + """Get the current state estimate. + + Returns: + np.ndarray: Current state estimate + """ + return self.x.copy() + + def get_covariance(self): + """Get the current covariance matrix. + + Returns: + np.ndarray: Current covariance matrix + """ + return self.P.copy() + + def reset(self): + """Reset the filter to initial state.""" + self.x = np.zeros(self.state_dim) + self.P = np.eye(self.state_dim) + + # Motion and observation models - to be customized for specific applications + + def motion_model(self, x, u, dt): + """Motion model function. + + Default implementation: simple kinematic model. + Override this for specific robot models. + + Args: + x (np.ndarray): Current state + u (np.ndarray): Control input + dt (float): Time step + + Returns: + np.ndarray: Predicted next state + """ + # Simple kinematic model for 2D robot [x, y, theta] + # u = [v, omega] (linear velocity, angular velocity) + if len(x) >= 3 and len(u) >= 2: + x_next = x.copy() + x_next[0] = x[0] + u[0] * np.cos(x[2]) * dt # x position + x_next[1] = x[1] + u[0] * np.sin(x[2]) * dt # y position + x_next[2] = x[2] + u[1] * dt # orientation + return x_next + return x + + def jacobian_motion_model(self, x, u, dt): + """Jacobian of the motion model. + + Args: + x (np.ndarray): Current state + u (np.ndarray): Control input + dt (float): Time step + + Returns: + np.ndarray: Jacobian matrix F + """ + # Jacobian for simple kinematic model + F = np.eye(self.state_dim) + if len(x) >= 3 and len(u) >= 2: + F[0, 2] = -u[0] * np.sin(x[2]) * dt + F[1, 2] = u[0] * np.cos(x[2]) * dt + return F + + def observation_model(self, x, landmark_pos=None): + """Observation model function. + + Default implementation: return state directly. + Override this for specific sensor models. + + Args: + x (np.ndarray): Current state + landmark_pos (np.ndarray, optional): Known landmark positions + + Returns: + np.ndarray: Predicted observation + """ + # Simple identity observation model + return x[:self.observation_dim] + + def jacobian_observation_model(self, x, landmark_pos=None): + """Jacobian of the observation model. + + Args: + x (np.ndarray): Current state + landmark_pos (np.ndarray, optional): Known landmark positions + + Returns: + np.ndarray: Jacobian matrix H + """ + # Jacobian for identity observation model + H = np.zeros((self.observation_dim, self.state_dim)) + H[:self.observation_dim, :self.observation_dim] = np.eye(self.observation_dim) + return H diff --git a/Localization/pf_localization.py b/Localization/pf_localization.py new file mode 100644 index 0000000000..3cb3745fe4 --- /dev/null +++ b/Localization/pf_localization.py @@ -0,0 +1,258 @@ +"""Particle Filter Localization - Refactored to use BayesFilter base class. + +This module provides a Particle Filter implementation that inherits from +the BayesFilter base class, creating a consistent API across all localization filters. + +Issue #1277: Code improvement in Localization dir +Reference: https://github.com/AtsushiSakai/PythonRobotics/issues/1277 +""" + +import numpy as np +from bayes_filter import BayesFilter + + +class ParticleFilter(BayesFilter): + """Particle Filter for robot localization. + + This class implements the Particle Filter algorithm (also known as Sequential + Monte Carlo) as a subclass of the BayesFilter base class. It represents the + belief as a set of weighted particles, making it suitable for non-Gaussian + and multimodal distributions. + + Attributes: + num_particles (int): Number of particles to use + particles (np.ndarray): Array of particle states [num_particles x state_dim] + weights (np.ndarray): Array of particle weights [num_particles] + """ + + def __init__(self, state_dim, observation_dim, num_particles=100): + """Initialize the Particle Filter. + + Args: + state_dim (int): Dimension of the state vector + observation_dim (int): Dimension of the observation vector + num_particles (int): Number of particles to use + """ + super().__init__(state_dim, observation_dim) + + self.num_particles = num_particles + + # Initialize particles uniformly + self.particles = np.zeros((num_particles, state_dim)) + + # Initialize weights uniformly + self.weights = np.ones(num_particles) / num_particles + + # Process and measurement noise parameters + self.process_noise_std = 0.1 + self.measurement_noise_std = 0.1 + + def initialize_particles(self, initial_state, initial_cov): + """Initialize particles around an initial state with covariance. + + Args: + initial_state (np.ndarray): Initial state estimate + initial_cov (np.ndarray): Initial covariance matrix + """ + for i in range(self.num_particles): + self.particles[i] = np.random.multivariate_normal(initial_state, initial_cov) + + # Reset weights uniformly + self.weights = np.ones(self.num_particles) / self.num_particles + + def set_noise_parameters(self, process_noise_std, measurement_noise_std): + """Set the process and measurement noise standard deviations. + + Args: + process_noise_std (float): Process noise standard deviation + measurement_noise_std (float): Measurement noise standard deviation + """ + self.process_noise_std = process_noise_std + self.measurement_noise_std = measurement_noise_std + + def predict(self, control_input, dt): + """Prediction step of the Particle Filter. + + Propagates each particle forward according to the motion model with noise. + + Args: + control_input (np.ndarray): Control input (e.g., velocity commands) + dt (float): Time step + + Returns: + np.ndarray: Particles after prediction + """ + # Propagate each particle + for i in range(self.num_particles): + # Apply motion model + self.particles[i] = self.motion_model(self.particles[i], control_input, dt) + + # Add process noise + noise = np.random.randn(self.state_dim) * self.process_noise_std + self.particles[i] += noise + + return self.particles + + def update(self, observation, landmark_pos=None): + """Update step of the Particle Filter. + + Updates particle weights based on observation likelihood and performs resampling. + + Args: + observation (np.ndarray): Sensor observation + landmark_pos (np.ndarray, optional): Known landmark positions + + Returns: + np.ndarray: Particles after update and resampling + """ + # Update weights based on observation likelihood + for i in range(self.num_particles): + # Predicted observation for this particle + z_pred = self.observation_model(self.particles[i], landmark_pos) + + # Compute likelihood using Gaussian + diff = observation - z_pred + likelihood = self._gaussian_likelihood(diff, self.measurement_noise_std) + + # Update weight + self.weights[i] *= likelihood + + # Normalize weights + weight_sum = np.sum(self.weights) + if weight_sum > 0: + self.weights /= weight_sum + else: + # If all weights are zero, reset uniformly + self.weights = np.ones(self.num_particles) / self.num_particles + + # Resample particles + self._resample() + + return self.particles + + def _gaussian_likelihood(self, diff, std): + """Compute Gaussian likelihood. + + Args: + diff (np.ndarray): Difference between observation and prediction + std (float): Standard deviation + + Returns: + float: Likelihood value + """ + # Multivariate Gaussian likelihood + exponent = -0.5 * np.dot(diff, diff) / (std ** 2) + return np.exp(exponent) + + def _resample(self): + """Resample particles based on weights (low variance resampling).""" + # Low variance resampling + new_particles = np.zeros_like(self.particles) + + # Cumulative sum of weights + cumsum = np.cumsum(self.weights) + + # Random starting point + r = np.random.uniform(0, 1.0 / self.num_particles) + + i = 0 + for j in range(self.num_particles): + u = r + j / self.num_particles + + # Find corresponding particle + while u > cumsum[i]: + i += 1 + if i >= self.num_particles: + i = self.num_particles - 1 + break + + new_particles[j] = self.particles[i] + + self.particles = new_particles + + # Reset weights uniformly after resampling + self.weights = np.ones(self.num_particles) / self.num_particles + + def get_state(self): + """Get the current state estimate (weighted mean of particles). + + Returns: + np.ndarray: Current state estimate + """ + # Weighted mean of particles + return np.average(self.particles, weights=self.weights, axis=0) + + def get_covariance(self): + """Get the current covariance (weighted covariance of particles). + + Returns: + np.ndarray: Current covariance matrix + """ + # Weighted mean + mean = self.get_state() + + # Weighted covariance + cov = np.zeros((self.state_dim, self.state_dim)) + for i in range(self.num_particles): + diff = self.particles[i] - mean + cov += self.weights[i] * np.outer(diff, diff) + + return cov + + def reset(self): + """Reset the filter to initial state.""" + # Reset particles to origin + self.particles = np.zeros((self.num_particles, self.state_dim)) + + # Reset weights uniformly + self.weights = np.ones(self.num_particles) / self.num_particles + + def get_effective_particles(self): + """Get the effective number of particles (useful for adaptive resampling). + + Returns: + float: Effective number of particles + """ + return 1.0 / np.sum(self.weights ** 2) + + # Motion and observation models - to be customized for specific applications + + def motion_model(self, x, u, dt): + """Motion model function. + + Default implementation: simple kinematic model. + Override this for specific robot models. + + Args: + x (np.ndarray): Current state + u (np.ndarray): Control input + dt (float): Time step + + Returns: + np.ndarray: Predicted next state + """ + # Simple kinematic model for 2D robot [x, y, theta] + # u = [v, omega] (linear velocity, angular velocity) + if len(x) >= 3 and len(u) >= 2: + x_next = x.copy() + x_next[0] = x[0] + u[0] * np.cos(x[2]) * dt # x position + x_next[1] = x[1] + u[0] * np.sin(x[2]) * dt # y position + x_next[2] = x[2] + u[1] * dt # orientation + return x_next + return x + + def observation_model(self, x, landmark_pos=None): + """Observation model function. + + Default implementation: return state directly. + Override this for specific sensor models. + + Args: + x (np.ndarray): Current state + landmark_pos (np.ndarray, optional): Known landmark positions + + Returns: + np.ndarray: Predicted observation + """ + # Simple identity observation model + return x[:self.observation_dim]