Source code for pykal.algorithm_library.estimators.pf

import numpy as np
from numpy.typing import NDArray
from typing import Callable, Dict, Tuple
from scipy.stats import multivariate_normal


[docs] class ParticleFilter: """ Particle Filter for nonlinear/non-Gaussian state estimation. Based on Gordon, Salmond, & Smith (1993), "Novel approach to nonlinear/non-Gaussian Bayesian state estimation", IEE Proceedings F, Vol. 140, No. 2, pp. 107-113. The Particle Filter represents the posterior distribution p(x_k | y_{1:k}) using weighted particles, enabling estimation for arbitrary nonlinear dynamics and non-Gaussian noise. """
[docs] @staticmethod def f( *, particles_weights: Tuple[NDArray, NDArray], yk: NDArray, f: Callable, f_params: Dict, h: Callable = None, h_params: Dict = None, likelihood: Callable = None, likelihood_params: Dict = None, Q: NDArray = None, R: NDArray = None, resample_threshold: float = 0.5, resample_method: str = 'systematic' ) -> Tuple[NDArray, NDArray]: """ Perform one full predict-update-resample cycle of the Particle Filter. Parameters ---------- particles_weights : Tuple[NDArray, NDArray] A tuple ``(particles, weights)`` containing: - ``particles`` : particle states, shape (N_p, n) - ``weights`` : normalized particle weights, shape (N_p,) yk : NDArray Measurement at time k, shape (m,) f : Callable Nonlinear dynamics function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument f_params : Dict Additional parameters for dynamics function h : Callable, optional Nonlinear measurement function: y_k = h(x_k, **h_params) Required if likelihood is not provided Should accept state as first argument h_params : Dict, optional Additional parameters for measurement function likelihood : Callable, optional Custom likelihood function: p(y_k | x_k) = likelihood(yk, x_k, **likelihood_params) If provided, used instead of automatic Gaussian likelihood from h Should accept measurement and state as first two arguments likelihood_params : Dict, optional Additional parameters for likelihood function Q : NDArray, optional Process noise covariance, shape (n, n) Can be None if process noise is handled within the dynamics function f R : NDArray, optional Measurement noise covariance, shape (m, m) Required if h is provided (for Gaussian likelihood computation) Not needed if using custom likelihood function resample_threshold : float Effective sample size threshold (0-1) for triggering resampling ESS < threshold * N_p triggers resampling resample_method : str Resampling strategy: 'systematic', 'stratified', or 'residual' Default: 'systematic' (recommended for low variance) Returns ------- particles_new : NDArray Updated particles, shape (N_p, n) weights_new : NDArray Updated normalized weights, shape (N_p,) Notes ----- The Particle Filter approximates the posterior distribution using weighted samples: **Representation:** ``p(x_k | y_{1:k}) ≈ ∑_{i=1}^{N_p} w_k^(i) δ(x_k - x_k^(i))`` **Algorithm Steps:** 1. **Predict:** Propagate particles through dynamics with process noise ``x_k^(i) ~ p(x_k | x_{k-1}^(i))`` → ``f(x_{k-1}^(i)) + w_k`` 2. **Update:** Compute likelihood weights ``w_k^(i) ∝ p(y_k | x_k^(i))`` → evaluated via measurement model h() 3. **Normalize:** Ensure weights sum to 1 ``w_k^(i) = w_k^(i) / ∑_j w_k^(j)`` 4. **Resample:** If effective sample size (ESS) is low, resample to prevent degeneracy ``ESS = 1 / ∑_i (w_k^(i))²`` **Numerical Stability:** - Weights computed in log-space to prevent underflow - Log-sum-exp trick for normalization - Small ridge term added to prevent zero weights **References:** Gordon, N. J., Salmond, D. J., & Smith, A. F. M. (1993). Novel approach to nonlinear/non-Gaussian Bayesian state estimation. IEE Proceedings F, 140(2), 107-113. """ particles, weights = particles_weights N_p, n = particles.shape m = len(yk) # ===== PREDICT STEP ===== # Propagate particles through dynamics with process noise particles_pred = np.zeros_like(particles) for i in range(N_p): # Propagate through dynamics x_pred = f(particles[i], **f_params) # Add process noise if Q is provided (otherwise assume f handles it) if Q is not None: process_noise = np.random.multivariate_normal(np.zeros(n), Q) particles_pred[i] = x_pred + process_noise else: particles_pred[i] = x_pred # ===== UPDATE STEP ===== # Compute likelihood weights p(y_k | x_k^(i)) log_weights = np.zeros(N_p) # Use custom likelihood if provided, otherwise compute Gaussian likelihood from h if likelihood is not None: # Custom likelihood function if likelihood_params is None: likelihood_params = {} for i in range(N_p): try: weight = likelihood(yk, particles_pred[i], **likelihood_params) log_weights[i] = np.log(weight + 1e-300) # Prevent log(0) except: log_weights[i] = -np.inf else: # Automatic Gaussian likelihood from measurement function h if h is None: raise ValueError("Either h or likelihood must be provided") if R is None: raise ValueError("R (measurement noise covariance) is required when using h (not using custom likelihood)") if h_params is None: h_params = {} for i in range(N_p): # Predicted measurement y_pred = h(particles_pred[i], **h_params) # Innovation (measurement residual) innovation = yk - y_pred # Log-likelihood: log p(y | x) = -0.5 * (y-h(x))^T R^{-1} (y-h(x)) + const try: log_likelihood = multivariate_normal.logpdf(yk, mean=y_pred, cov=R) except: # Fallback for numerical issues try: R_inv = np.linalg.inv(R) log_likelihood = -0.5 * (innovation.T @ R_inv @ innovation) except: log_likelihood = -np.inf log_weights[i] = log_likelihood # Normalize weights (log-sum-exp trick for numerical stability) max_log_weight = np.max(log_weights) if not np.isfinite(max_log_weight): # All weights are -inf, reinitialize uniformly weights_new = np.ones(N_p) / N_p else: weights_new = np.exp(log_weights - max_log_weight) weights_new += 1e-300 # Prevent exact zeros weights_new /= np.sum(weights_new) # ===== RESAMPLE STEP ===== # Check effective sample size ess = ParticleFilter.effective_sample_size(weights_new) ess_ratio = ess / N_p if ess_ratio < resample_threshold: # Resample particles to prevent degeneracy if resample_method == 'systematic': particles_new, weights_new = ParticleFilter._resample_systematic( particles_pred, weights_new ) elif resample_method == 'stratified': particles_new, weights_new = ParticleFilter._resample_stratified( particles_pred, weights_new ) elif resample_method == 'residual': particles_new, weights_new = ParticleFilter._resample_residual( particles_pred, weights_new ) else: raise ValueError(f"Unknown resampling method: {resample_method}") else: # No resampling needed particles_new = particles_pred return particles_new, weights_new
[docs] @staticmethod def h(particles_weights: Tuple[NDArray, NDArray]) -> NDArray: """ Compute the weighted mean state estimate from particles. Parameters ---------- particles_weights : Tuple[NDArray, NDArray] Tuple (particles, weights) Returns ------- xhat : NDArray Weighted mean state estimate, shape (n,) Notes ----- The weighted mean is computed as: .. math:: \\hat{x}_k = \\sum_{i=1}^{N_p} w_k^{(i)} x_k^{(i)} For multimodal distributions, the weighted mean may not be the best point estimate. Alternative estimators: - **Maximum a posteriori (MAP):** ``particles[np.argmax(weights)]`` - **Median:** Component-wise weighted median Examples -------- >>> import numpy as np >>> particles = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) >>> weights = np.array([0.5, 0.3, 0.2]) >>> particles_weights = (particles, weights) >>> xhat = ParticleFilter.standard_h(particles_weights) >>> xhat array([2.4, 3.4]) """ particles, weights = particles_weights # Weighted mean estimate xhat = np.sum(weights[:, np.newaxis] * particles, axis=0) return xhat
[docs] @staticmethod def effective_sample_size(weights: NDArray) -> float: """ Compute the effective sample size (ESS) of the particle set. Parameters ---------- weights : NDArray Normalized particle weights, shape (N_p,) Returns ------- ess : float Effective sample size, range [1, N_p] Notes ----- The effective sample size quantifies particle diversity: .. math:: \\text{ESS} = \\frac{1}{\\sum_{i=1}^{N_p} (w_i)^2} **Interpretation:** - ``ESS ≈ N_p``: Weights are uniform (good diversity) - ``ESS ≈ 1``: One particle has all weight (degeneracy) **Resampling criterion:** Resample when ``ESS < threshold * N_p`` (typically threshold = 0.5) Examples -------- >>> import numpy as np >>> # Uniform weights (maximum diversity) >>> weights = np.ones(100) / 100 >>> ess = ParticleFilter.effective_sample_size(weights) >>> ess 100.0 >>> # Degenerate case (one particle has all weight) >>> weights = np.zeros(100) >>> weights[0] = 1.0 >>> ess = ParticleFilter.effective_sample_size(weights) >>> ess 1.0 """ return 1.0 / np.sum(weights**2)
@staticmethod def _resample_systematic( particles: NDArray, weights: NDArray ) -> Tuple[NDArray, NDArray]: """ Systematic resampling (low variance, recommended). Parameters ---------- particles : NDArray Particle states, shape (N_p, n) weights : NDArray Normalized weights, shape (N_p,) Returns ------- particles_new : NDArray Resampled particles, shape (N_p, n) weights_new : NDArray Uniform weights, shape (N_p,) Notes ----- Systematic resampling uses deterministic spacing with a single random offset, resulting in low variance and computational efficiency O(N_p). """ N_p = len(weights) # Single random offset for all samples positions = (np.arange(N_p) + np.random.random()) / N_p # Cumulative sum of weights cumsum = np.cumsum(weights) # Find indices via inverse CDF indices = np.searchsorted(cumsum, positions) # Resample particles particles_new = particles[indices] # Uniform weights after resampling weights_new = np.ones(N_p) / N_p return particles_new, weights_new @staticmethod def _resample_stratified( particles: NDArray, weights: NDArray ) -> Tuple[NDArray, NDArray]: """ Stratified resampling (low variance). Parameters ---------- particles : NDArray Particle states, shape (N_p, n) weights : NDArray Normalized weights, shape (N_p,) Returns ------- particles_new : NDArray Resampled particles, shape (N_p, n) weights_new : NDArray Uniform weights, shape (N_p,) Notes ----- Stratified resampling uses independent random offsets for each stratum, providing low variance similar to systematic resampling. """ N_p = len(weights) # Independent random offset for each sample positions = (np.arange(N_p) + np.random.random(N_p)) / N_p # Cumulative sum of weights cumsum = np.cumsum(weights) # Find indices indices = np.searchsorted(cumsum, positions) # Resample particles particles_new = particles[indices] weights_new = np.ones(N_p) / N_p return particles_new, weights_new @staticmethod def _resample_residual( particles: NDArray, weights: NDArray ) -> Tuple[NDArray, NDArray]: """ Residual resampling (minimum variance). Parameters ---------- particles : NDArray Particle states, shape (N_p, n) weights : NDArray Normalized weights, shape (N_p,) Returns ------- particles_new : NDArray Resampled particles, shape (N_p, n) weights_new : NDArray Uniform weights, shape (N_p,) Notes ----- Residual resampling combines deterministic and stochastic sampling, achieving minimum variance among standard resampling methods. """ N_p = len(weights) # Deterministic part: take floor(N_p * w_i) copies counts = np.floor(N_p * weights).astype(int) # Stochastic part: sample remainder residual = N_p * weights - counts residual_normalized = residual / np.sum(residual) # Multinomial sampling for remainder residual_counts = np.random.multinomial(N_p - np.sum(counts), residual_normalized) counts += residual_counts # Generate particle indices indices = np.repeat(np.arange(N_p), counts) # Resample particles particles_new = particles[indices] weights_new = np.ones(N_p) / N_p return particles_new, weights_new
# Alias for consistency with other algorithms (KF, UKF, etc.) PF = ParticleFilter # Module-level aliases for convenience # Allows usage like: from pykal.algorithm_library.estimators import pf; pf.f(...) f = ParticleFilter.f h = ParticleFilter.h