Estimators
pykal.algorithm_library.estimators
State estimation algorithms from the robotics and control theory literature.
- Available Modules:
kf: Kalman filter and Extended Kalman Filter (EKF) ukf: Unscented Kalman Filter (UKF) pf: Particle Filter for nonlinear/non-Gaussian estimation ai_ukf: Augmented Information UKF for hybrid physics-data-driven estimation
State estimation algorithms from the robotics and control theory literature.
Kalman Filter
- class pykal.algorithm_library.estimators.kf.KF[source]
Bases:
object- static f(*, zk, yk, f, f_h_params, h, Fk, Qk, Hk, Rk)[source]
Perform one full predict–update step of the discrete-time Kalman Filter.
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Parameters:
- pykal.algorithm_library.estimators.kf.f(*, zk, yk, f, f_h_params, h, Fk, Qk, Hk, Rk)
Perform one full predict–update step of the discrete-time Kalman Filter.
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Parameters:
Unscented Kalman Filter
- class pykal.algorithm_library.estimators.ukf.UKF[source]
Bases:
objectUnscented Kalman Filter (UKF) for nonlinear state estimation.
Based on Julier & Uhlmann (2004), “Unscented Filtering and Nonlinear Estimation”, Proceedings of the IEEE, Vol. 92, No. 3, pp. 401-422.
The UKF uses the unscented transformation to propagate mean and covariance through nonlinear functions without requiring Jacobian calculations.
- static f(*, xhat_P, yk, f, f_params, h, h_params, Qk, Rk, alpha=0.001, beta=2.0, kappa=0.0)[source]
Perform one full predict-update step of the Unscented Kalman Filter.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) –
- Tuple (x_hat_k, P_k) containing:
x_hat_k : current state estimate, shape (n,)
P_k : current state covariance, shape (n, n)
yk (NDArray) – Measurement at time k, shape (m,)
f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for state evolution function
h (Callable) – Nonlinear measurement function: y_k = h(x_k, **h_params) Should accept state as first argument
h_params (Dict) – Additional parameters for measurement function
Qk (NDArray) – Process noise covariance, shape (n, n)
Rk (NDArray) – Measurement noise covariance, shape (m, m)
alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1
beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian
kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
x_upd (NDArray) – Updated state estimate, shape (n,)
P_upd (NDArray) – Updated state covariance, shape (n, n)
Notes
The UKF uses the unscented transformation to propagate the state estimate through nonlinear dynamics and measurement functions. Unlike the EKF, it does not require Jacobian calculations.
Predict Step: 1. Generate sigma points from current estimate 2. Propagate sigma points through dynamics f() 3. Compute predicted mean and covariance
Update Step: 1. Propagate predicted sigma points through measurement h() 2. Compute predicted measurement mean and covariance 3. Compute cross-covariance between state and measurement 4. Compute Kalman gain and update estimate
References: Julier, S. J., & Uhlmann, J. K. (2004). Unscented filtering and nonlinear estimation. Proceedings of the IEEE, 92(3), 401-422.
- pykal.algorithm_library.estimators.ukf.f(*, xhat_P, yk, f, f_params, h, h_params, Qk, Rk, alpha=0.001, beta=2.0, kappa=0.0)
Perform one full predict-update step of the Unscented Kalman Filter.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) –
- Tuple (x_hat_k, P_k) containing:
x_hat_k : current state estimate, shape (n,)
P_k : current state covariance, shape (n, n)
yk (NDArray) – Measurement at time k, shape (m,)
f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for state evolution function
h (Callable) – Nonlinear measurement function: y_k = h(x_k, **h_params) Should accept state as first argument
h_params (Dict) – Additional parameters for measurement function
Qk (NDArray) – Process noise covariance, shape (n, n)
Rk (NDArray) – Measurement noise covariance, shape (m, m)
alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1
beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian
kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
x_upd (NDArray) – Updated state estimate, shape (n,)
P_upd (NDArray) – Updated state covariance, shape (n, n)
Notes
The UKF uses the unscented transformation to propagate the state estimate through nonlinear dynamics and measurement functions. Unlike the EKF, it does not require Jacobian calculations.
Predict Step: 1. Generate sigma points from current estimate 2. Propagate sigma points through dynamics f() 3. Compute predicted mean and covariance
Update Step: 1. Propagate predicted sigma points through measurement h() 2. Compute predicted measurement mean and covariance 3. Compute cross-covariance between state and measurement 4. Compute Kalman gain and update estimate
References: Julier, S. J., & Uhlmann, J. K. (2004). Unscented filtering and nonlinear estimation. Proceedings of the IEEE, 92(3), 401-422.
- pykal.algorithm_library.estimators.ukf.h(xhat_P)
Extract current state estimate.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple
- Returns:
x_hat – Current state estimate
- Return type:
NDArray
Particle Filter
- pykal.algorithm_library.estimators.pf.PF
alias of
ParticleFilter
- class pykal.algorithm_library.estimators.pf.ParticleFilter[source]
Bases:
objectParticle Filter for nonlinear/non-Gaussian state estimation.
Based on Gordon, Salmond, & Smith (1993), “Novel approach to nonlinear/non-Gaussian Bayesian state estimation”, IEE Proceedings F, Vol. 140, No. 2, pp. 107-113.
The Particle Filter represents the posterior distribution p(x_k | y_{1:k}) using weighted particles, enabling estimation for arbitrary nonlinear dynamics and non-Gaussian noise.
- static effective_sample_size(weights)[source]
Compute the effective sample size (ESS) of the particle set.
- Parameters:
weights (NDArray) – Normalized particle weights, shape (N_p,)
- Returns:
ess – Effective sample size, range [1, N_p]
- Return type:
Notes
The effective sample size quantifies particle diversity:
\[\text{ESS} = \frac{1}{\sum_{i=1}^{N_p} (w_i)^2}\]Interpretation: -
ESS ≈ N_p: Weights are uniform (good diversity) -ESS ≈ 1: One particle has all weight (degeneracy)Resampling criterion: Resample when
ESS < threshold * N_p(typically threshold = 0.5)Examples
>>> import numpy as np >>> # Uniform weights (maximum diversity) >>> weights = np.ones(100) / 100 >>> ess = ParticleFilter.effective_sample_size(weights) >>> ess 100.0 >>> # Degenerate case (one particle has all weight) >>> weights = np.zeros(100) >>> weights[0] = 1.0 >>> ess = ParticleFilter.effective_sample_size(weights) >>> ess 1.0
- static f(*, particles_weights, yk, f, f_params, h=None, h_params=None, likelihood=None, likelihood_params=None, Q=None, R=None, resample_threshold=0.5, resample_method='systematic')[source]
Perform one full predict-update-resample cycle of the Particle Filter.
- Parameters:
particles_weights (Tuple[NDArray, NDArray]) –
- A tuple
(particles, weights)containing: particles: particle states, shape (N_p, n)weights: normalized particle weights, shape (N_p,)
- A tuple
yk (NDArray) – Measurement at time k, shape (m,)
f (Callable) – Nonlinear dynamics function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for dynamics function
h (Callable, optional) – Nonlinear measurement function: y_k = h(x_k, **h_params) Required if likelihood is not provided Should accept state as first argument
h_params (Dict, optional) – Additional parameters for measurement function
likelihood (Callable, optional) – Custom likelihood function: p(y_k | x_k) = likelihood(yk, x_k, **likelihood_params) If provided, used instead of automatic Gaussian likelihood from h Should accept measurement and state as first two arguments
likelihood_params (Dict, optional) – Additional parameters for likelihood function
Q (NDArray, optional) – Process noise covariance, shape (n, n) Can be None if process noise is handled within the dynamics function f
R (NDArray, optional) – Measurement noise covariance, shape (m, m) Required if h is provided (for Gaussian likelihood computation) Not needed if using custom likelihood function
resample_threshold (float) – Effective sample size threshold (0-1) for triggering resampling ESS < threshold * N_p triggers resampling
resample_method (str) – Resampling strategy: ‘systematic’, ‘stratified’, or ‘residual’ Default: ‘systematic’ (recommended for low variance)
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
particles_new (NDArray) – Updated particles, shape (N_p, n)
weights_new (NDArray) – Updated normalized weights, shape (N_p,)
Notes
The Particle Filter approximates the posterior distribution using weighted samples:
- Representation:
p(x_k | y_{1:k}) ≈ ∑_{i=1}^{N_p} w_k^(i) δ(x_k - x_k^(i))
Algorithm Steps:
- Predict: Propagate particles through dynamics with process noise
x_k^(i) ~ p(x_k | x_{k-1}^(i))→f(x_{k-1}^(i)) + w_k
- Update: Compute likelihood weights
w_k^(i) ∝ p(y_k | x_k^(i))→ evaluated via measurement model h()
- Normalize: Ensure weights sum to 1
w_k^(i) = w_k^(i) / ∑_j w_k^(j)
- Resample: If effective sample size (ESS) is low, resample to prevent degeneracy
ESS = 1 / ∑_i (w_k^(i))²
Numerical Stability: - Weights computed in log-space to prevent underflow - Log-sum-exp trick for normalization - Small ridge term added to prevent zero weights
References: Gordon, N. J., Salmond, D. J., & Smith, A. F. M. (1993). Novel approach to nonlinear/non-Gaussian Bayesian state estimation. IEE Proceedings F, 140(2), 107-113.
- static h(particles_weights)[source]
Compute the weighted mean state estimate from particles.
- Parameters:
particles_weights (Tuple[NDArray, NDArray]) – Tuple (particles, weights)
- Returns:
xhat – Weighted mean state estimate, shape (n,)
- Return type:
NDArray
Notes
The weighted mean is computed as:
\[\hat{x}_k = \sum_{i=1}^{N_p} w_k^{(i)} x_k^{(i)}\]For multimodal distributions, the weighted mean may not be the best point estimate. Alternative estimators: - Maximum a posteriori (MAP):
particles[np.argmax(weights)]- Median: Component-wise weighted medianExamples
>>> import numpy as np >>> particles = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) >>> weights = np.array([0.5, 0.3, 0.2]) >>> particles_weights = (particles, weights) >>> xhat = ParticleFilter.standard_h(particles_weights) >>> xhat array([2.4, 3.4])
- pykal.algorithm_library.estimators.pf.f(*, particles_weights, yk, f, f_params, h=None, h_params=None, likelihood=None, likelihood_params=None, Q=None, R=None, resample_threshold=0.5, resample_method='systematic')
Perform one full predict-update-resample cycle of the Particle Filter.
- Parameters:
particles_weights (Tuple[NDArray, NDArray]) –
- A tuple
(particles, weights)containing: particles: particle states, shape (N_p, n)weights: normalized particle weights, shape (N_p,)
- A tuple
yk (NDArray) – Measurement at time k, shape (m,)
f (Callable) – Nonlinear dynamics function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for dynamics function
h (Callable, optional) – Nonlinear measurement function: y_k = h(x_k, **h_params) Required if likelihood is not provided Should accept state as first argument
h_params (Dict, optional) – Additional parameters for measurement function
likelihood (Callable, optional) – Custom likelihood function: p(y_k | x_k) = likelihood(yk, x_k, **likelihood_params) If provided, used instead of automatic Gaussian likelihood from h Should accept measurement and state as first two arguments
likelihood_params (Dict, optional) – Additional parameters for likelihood function
Q (NDArray, optional) – Process noise covariance, shape (n, n) Can be None if process noise is handled within the dynamics function f
R (NDArray, optional) – Measurement noise covariance, shape (m, m) Required if h is provided (for Gaussian likelihood computation) Not needed if using custom likelihood function
resample_threshold (float) – Effective sample size threshold (0-1) for triggering resampling ESS < threshold * N_p triggers resampling
resample_method (str) – Resampling strategy: ‘systematic’, ‘stratified’, or ‘residual’ Default: ‘systematic’ (recommended for low variance)
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
particles_new (NDArray) – Updated particles, shape (N_p, n)
weights_new (NDArray) – Updated normalized weights, shape (N_p,)
Notes
The Particle Filter approximates the posterior distribution using weighted samples:
- Representation:
p(x_k | y_{1:k}) ≈ ∑_{i=1}^{N_p} w_k^(i) δ(x_k - x_k^(i))
Algorithm Steps:
- Predict: Propagate particles through dynamics with process noise
x_k^(i) ~ p(x_k | x_{k-1}^(i))→f(x_{k-1}^(i)) + w_k
- Update: Compute likelihood weights
w_k^(i) ∝ p(y_k | x_k^(i))→ evaluated via measurement model h()
- Normalize: Ensure weights sum to 1
w_k^(i) = w_k^(i) / ∑_j w_k^(j)
- Resample: If effective sample size (ESS) is low, resample to prevent degeneracy
ESS = 1 / ∑_i (w_k^(i))²
Numerical Stability: - Weights computed in log-space to prevent underflow - Log-sum-exp trick for normalization - Small ridge term added to prevent zero weights
References: Gordon, N. J., Salmond, D. J., & Smith, A. F. M. (1993). Novel approach to nonlinear/non-Gaussian Bayesian state estimation. IEE Proceedings F, 140(2), 107-113.
- pykal.algorithm_library.estimators.pf.h(particles_weights)
Compute the weighted mean state estimate from particles.
- Parameters:
particles_weights (Tuple[NDArray, NDArray]) – Tuple (particles, weights)
- Returns:
xhat – Weighted mean state estimate, shape (n,)
- Return type:
NDArray
Notes
The weighted mean is computed as:
\[\hat{x}_k = \sum_{i=1}^{N_p} w_k^{(i)} x_k^{(i)}\]For multimodal distributions, the weighted mean may not be the best point estimate. Alternative estimators: - Maximum a posteriori (MAP):
particles[np.argmax(weights)]- Median: Component-wise weighted medianExamples
>>> import numpy as np >>> particles = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]) >>> weights = np.array([0.5, 0.3, 0.2]) >>> particles_weights = (particles, weights) >>> xhat = ParticleFilter.standard_h(particles_weights) >>> xhat array([2.4, 3.4])
AI-UKF (Augmented Information UKF)
- class pykal.algorithm_library.estimators.ai_ukf.AIUKF[source]
Bases:
objectAugmented Information Unscented Kalman Filter (AI-UKF) for hybrid physics-data-driven state estimation.
Based on Cellini et al. (2025), “Discovering and exploiting active sensing motifs for estimation”, arXiv:2511.08766.
The AI-UKF extends the standard UKF by augmenting measurements with data-driven predictions (e.g., from Artificial Neural Networks) and using observability-aware time-varying measurement covariance to dynamically adjust trust in these predictions.
Key Innovation: Time-varying R that adapts based on instantaneous observability, enabling robust estimation even with poor initial conditions or weak observability.
- static compute_observability_variance(observability_indicators, window_size, base_variance=1.0)[source]
Compute time-varying R for ANN measurements based on observability.
- Parameters:
observability_indicators (NDArray) – Time series of observability indicators, shape (T,) Higher values indicate better observability Examples: |acceleration|, |velocity|, |angular_rate|
window_size (int) – Number of past timesteps to consider for minimum
base_variance (float) – Base measurement variance (scales the output)
- Returns:
R_ann – Time-varying ANN measurement variance, shape (T,)
- Return type:
NDArray
Notes
The observability-based variance is computed as:
\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]where: - \(c\) is the base_variance parameter - \(w\) is the window_size - \(\sigma(i)\) is the observability indicator at time \(i\)
Intuition: - High observability (large \(\sigma\)) → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability (small \(\sigma\)) → Large \(\check{R}_{ANN}\) → Don’t trust ANN
Example: Altitude from Optic Flow When horizontal acceleration is high, altitude is observable from optic flow. When hovering (low acceleration), altitude is unobservable.
accel_x = np.abs(acceleration_history) # Observability indicator R_ann = AIUKF.compute_observability_variance( accel_x, window_size=10, base_variance=1.0 )
Numerical Stability: To prevent division by zero or very small numbers: - A small epsilon is added to the minimum observability - Very small observability indicators result in very large R (effectively ignoring ANN)
Examples
>>> import numpy as np >>> # Simulate varying acceleration (observability indicator) >>> accel = np.array([0.1, 0.5, 1.0, 0.2, 0.8, 0.05, 0.3]) >>> R_ann = AIUKF.compute_observability_variance(accel, window_size=3) >>> R_ann.shape (7,) >>> # When acceleration is high (good observability), R is low >>> R_ann[2] # accel=1.0 is high 1.0 >>> # When acceleration is low (poor observability), R is high >>> R_ann[5] > R_ann[2] # accel=0.05 vs accel=1.0 True
- static create_augmented_R(R_sensor, R_ann_time_varying, num_sensor_measurements, time_index)[source]
Construct augmented measurement covariance matrix at a specific time.
- Parameters:
R_sensor (NDArray) – Sensor measurement covariance (constant), shape (m_sensor, m_sensor)
R_ann_time_varying (NDArray) – Time series of ANN measurement variances, shape (T,) or (T, m_ann) for multiple ANN measurements
num_sensor_measurements (int) – Number of sensor measurements (m_sensor)
time_index (int) – Current time index
- Returns:
R_aug – Augmented measurement covariance, shape (m_aug, m_aug)
- Return type:
NDArray
Notes
Constructs block-diagonal augmented R:
\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]Examples
>>> import numpy as np >>> R_sensor = np.diag([0.1, 0.2, 0.05]) # 3 sensor measurements >>> R_ann_series = np.array([1.0, 0.5, 2.0, 0.8]) # Time series >>> R_aug = AIUKF.create_augmented_R(R_sensor, R_ann_series, 3, time_index=1) >>> R_aug.shape (4, 4) >>> R_aug[-1, -1] # ANN variance at time_index=1 0.5
- static f(*, xhat_P, yk_aug, f, f_params, h_aug, h_aug_params, Qk, Rk_aug, alpha=0.001, beta=2.0, kappa=0.0)[source]
Perform one full predict-update step of the AI-UKF with augmented measurements.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) –
- Tuple (x_hat_k, P_k) containing:
x_hat_k : current state estimate, shape (n,)
P_k : current state covariance, shape (n, n)
yk_aug (NDArray) – Augmented measurement at time k, shape (m_aug,) Typically [sensor_measurements, ANN_predictions]
f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for state evolution function
h_aug (Callable) – Augmented nonlinear measurement function: y_aug_k = h_aug(x_k, **h_aug_params) Should return augmented measurement vector including ANN-predicted states
h_aug_params (Dict) – Additional parameters for augmented measurement function
Qk (NDArray) – Process noise covariance, shape (n, n)
Rk_aug (NDArray) –
Time-varying augmented measurement noise covariance, shape (m_aug, m_aug) Structure: R_aug = [[R_sensor, 0 ],
[0, R_ANN(t) ]]
where R_ANN(t) varies based on observability
alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1
beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian
kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
x_upd (NDArray) – Updated state estimate, shape (n,)
P_upd (NDArray) – Updated state covariance, shape (n, n)
Notes
The AI-UKF algorithm follows the standard UKF predict-update cycle but with key modifications:
Augmented Measurement Model:
\[\begin{split}\mathbf{y}_{aug}(t) = \begin{bmatrix} \mathbf{h}(\mathbf{x}(t)) \\ \check{h}_{ANN}(\mathbf{y}_{[t-w:t]}) \end{bmatrix}\end{split}\]where \(\check{h}_{ANN}\) is a data-driven estimator (e.g., ANN) that predicts poorly observable states from a time window of measurements.
Time-Varying Measurement Covariance:
\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]The ANN measurement variance adapts based on observability:
\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]where \(\sigma(i)\) is an observability indicator (e.g., |acceleration|): - High observability → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability → Large \(\check{R}_{ANN}\) → Ignore ANN
Algorithm Steps:
Predict: Propagate sigma points through dynamics \(f()\)
Update: Use augmented measurements and time-varying \(\mathbf{R}_{aug}\)
Adaptive Trust: Automatically adjust ANN influence based on observability
When to Use AI-UKF: - Initial conditions are very uncertain (standard UKF would diverge) - States are intermittently observable (e.g., altitude from optic flow) - You have trained data-driven estimators for weakly observable states
References: Cellini, B., Boyacioglu, B., Lopez, A., & van Breugel, F. (2025). Discovering and exploiting active sensing motifs for estimation. arXiv:2511.08766.
Examples
>>> import numpy as np >>> from pykal.algorithm_library.estimators.ai_ukf import AIUKF >>> >>> # Simple 1D example with augmented altitude measurement >>> x_hat = np.array([0.0]) # State: altitude >>> P = np.eye(1) * 100 # Large initial uncertainty >>> xhat_P = (x_hat, P) >>> >>> # Augmented measurement: [optic_flow, altitude_ANN_estimate] >>> yk_aug = np.array([0.5, 10.0]) >>> >>> # Define dynamics and augmented measurement functions >>> def f(x): return x # Constant altitude >>> def h_aug(x): return np.array([x[0]/10, x[0]]) # [optic_flow, altitude] >>> >>> # Time-varying R: trust ANN more when observability is high >>> R_sensor = np.array([[0.1]]) >>> R_ANN = np.array([[1.0]]) # Varies with time in practice >>> Rk_aug = np.block([[R_sensor, np.zeros((1,1))], ... [np.zeros((1,1)), R_ANN]]) >>> >>> Q = np.eye(1) * 0.01 >>> >>> # Run AI-UKF >>> x_upd, P_upd = AIUKF.standard_f( ... xhat_P=xhat_P, yk_aug=yk_aug, f=f, f_params={}, ... h_aug=h_aug, h_aug_params={}, Qk=Q, Rk_aug=Rk_aug ... ) >>> x_upd.shape (1,)
- static h(xhat_P)[source]
Extract current state estimate.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple
- Returns:
x_hat – Current state estimate
- Return type:
NDArray
Notes
This function is identical to UKF.standard_h() - it simply extracts the state estimate from the (x_hat, P) tuple.
- pykal.algorithm_library.estimators.ai_ukf.compute_observability_variance(observability_indicators, window_size, base_variance=1.0)
Compute time-varying R for ANN measurements based on observability.
- Parameters:
observability_indicators (NDArray) – Time series of observability indicators, shape (T,) Higher values indicate better observability Examples: |acceleration|, |velocity|, |angular_rate|
window_size (int) – Number of past timesteps to consider for minimum
base_variance (float) – Base measurement variance (scales the output)
- Returns:
R_ann – Time-varying ANN measurement variance, shape (T,)
- Return type:
NDArray
Notes
The observability-based variance is computed as:
\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]where: - \(c\) is the base_variance parameter - \(w\) is the window_size - \(\sigma(i)\) is the observability indicator at time \(i\)
Intuition: - High observability (large \(\sigma\)) → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability (small \(\sigma\)) → Large \(\check{R}_{ANN}\) → Don’t trust ANN
Example: Altitude from Optic Flow When horizontal acceleration is high, altitude is observable from optic flow. When hovering (low acceleration), altitude is unobservable.
accel_x = np.abs(acceleration_history) # Observability indicator R_ann = AIUKF.compute_observability_variance( accel_x, window_size=10, base_variance=1.0 )
Numerical Stability: To prevent division by zero or very small numbers: - A small epsilon is added to the minimum observability - Very small observability indicators result in very large R (effectively ignoring ANN)
Examples
>>> import numpy as np >>> # Simulate varying acceleration (observability indicator) >>> accel = np.array([0.1, 0.5, 1.0, 0.2, 0.8, 0.05, 0.3]) >>> R_ann = AIUKF.compute_observability_variance(accel, window_size=3) >>> R_ann.shape (7,) >>> # When acceleration is high (good observability), R is low >>> R_ann[2] # accel=1.0 is high 1.0 >>> # When acceleration is low (poor observability), R is high >>> R_ann[5] > R_ann[2] # accel=0.05 vs accel=1.0 True
- pykal.algorithm_library.estimators.ai_ukf.f(*, xhat_P, yk_aug, f, f_params, h_aug, h_aug_params, Qk, Rk_aug, alpha=0.001, beta=2.0, kappa=0.0)
Perform one full predict-update step of the AI-UKF with augmented measurements.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) –
- Tuple (x_hat_k, P_k) containing:
x_hat_k : current state estimate, shape (n,)
P_k : current state covariance, shape (n, n)
yk_aug (NDArray) – Augmented measurement at time k, shape (m_aug,) Typically [sensor_measurements, ANN_predictions]
f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument
f_params (Dict) – Additional parameters for state evolution function
h_aug (Callable) – Augmented nonlinear measurement function: y_aug_k = h_aug(x_k, **h_aug_params) Should return augmented measurement vector including ANN-predicted states
h_aug_params (Dict) – Additional parameters for augmented measurement function
Qk (NDArray) – Process noise covariance, shape (n, n)
Rk_aug (NDArray) –
Time-varying augmented measurement noise covariance, shape (m_aug, m_aug) Structure: R_aug = [[R_sensor, 0 ],
[0, R_ANN(t) ]]
where R_ANN(t) varies based on observability
alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1
beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian
kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n
- Return type:
Tuple[ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]],ndarray[tuple[Any,...],dtype[TypeVar(_ScalarT, bound=generic)]]]- Returns:
x_upd (NDArray) – Updated state estimate, shape (n,)
P_upd (NDArray) – Updated state covariance, shape (n, n)
Notes
The AI-UKF algorithm follows the standard UKF predict-update cycle but with key modifications:
Augmented Measurement Model:
\[\begin{split}\mathbf{y}_{aug}(t) = \begin{bmatrix} \mathbf{h}(\mathbf{x}(t)) \\ \check{h}_{ANN}(\mathbf{y}_{[t-w:t]}) \end{bmatrix}\end{split}\]where \(\check{h}_{ANN}\) is a data-driven estimator (e.g., ANN) that predicts poorly observable states from a time window of measurements.
Time-Varying Measurement Covariance:
\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]The ANN measurement variance adapts based on observability:
\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]where \(\sigma(i)\) is an observability indicator (e.g., |acceleration|): - High observability → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability → Large \(\check{R}_{ANN}\) → Ignore ANN
Algorithm Steps:
Predict: Propagate sigma points through dynamics \(f()\)
Update: Use augmented measurements and time-varying \(\mathbf{R}_{aug}\)
Adaptive Trust: Automatically adjust ANN influence based on observability
When to Use AI-UKF: - Initial conditions are very uncertain (standard UKF would diverge) - States are intermittently observable (e.g., altitude from optic flow) - You have trained data-driven estimators for weakly observable states
References: Cellini, B., Boyacioglu, B., Lopez, A., & van Breugel, F. (2025). Discovering and exploiting active sensing motifs for estimation. arXiv:2511.08766.
Examples
>>> import numpy as np >>> from pykal.algorithm_library.estimators.ai_ukf import AIUKF >>> >>> # Simple 1D example with augmented altitude measurement >>> x_hat = np.array([0.0]) # State: altitude >>> P = np.eye(1) * 100 # Large initial uncertainty >>> xhat_P = (x_hat, P) >>> >>> # Augmented measurement: [optic_flow, altitude_ANN_estimate] >>> yk_aug = np.array([0.5, 10.0]) >>> >>> # Define dynamics and augmented measurement functions >>> def f(x): return x # Constant altitude >>> def h_aug(x): return np.array([x[0]/10, x[0]]) # [optic_flow, altitude] >>> >>> # Time-varying R: trust ANN more when observability is high >>> R_sensor = np.array([[0.1]]) >>> R_ANN = np.array([[1.0]]) # Varies with time in practice >>> Rk_aug = np.block([[R_sensor, np.zeros((1,1))], ... [np.zeros((1,1)), R_ANN]]) >>> >>> Q = np.eye(1) * 0.01 >>> >>> # Run AI-UKF >>> x_upd, P_upd = AIUKF.standard_f( ... xhat_P=xhat_P, yk_aug=yk_aug, f=f, f_params={}, ... h_aug=h_aug, h_aug_params={}, Qk=Q, Rk_aug=Rk_aug ... ) >>> x_upd.shape (1,)
- pykal.algorithm_library.estimators.ai_ukf.h(xhat_P)
Extract current state estimate.
- Parameters:
xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple
- Returns:
x_hat – Current state estimate
- Return type:
NDArray
Notes
This function is identical to UKF.standard_h() - it simply extracts the state estimate from the (x_hat, P) tuple.