Estimators

pykal.algorithm_library.estimators

State estimation algorithms from the robotics and control theory literature.

Available Modules:

kf: Kalman filter and Extended Kalman Filter (EKF) ukf: Unscented Kalman Filter (UKF) pf: Particle Filter for nonlinear/non-Gaussian estimation ai_ukf: Augmented Information UKF for hybrid physics-data-driven estimation

State estimation algorithms from the robotics and control theory literature.

Kalman Filter

class pykal.algorithm_library.estimators.kf.KF[source]

Bases: object

static f(*, zk, yk, f, f_h_params, h, Fk, Qk, Hk, Rk)[source]

Perform one full predict–update step of the discrete-time Kalman Filter.

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Parameters:
static h(zk)[source]
Return type:

ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]

Parameters:

zk (Tuple[ndarray[tuple[Any, ...], dtype[_ScalarT]], ndarray[tuple[Any, ...], dtype[_ScalarT]]])

pykal.algorithm_library.estimators.kf.f(*, zk, yk, f, f_h_params, h, Fk, Qk, Hk, Rk)

Perform one full predict–update step of the discrete-time Kalman Filter.

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Parameters:
pykal.algorithm_library.estimators.kf.h(zk)
Return type:

ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]

Parameters:

zk (Tuple[ndarray[tuple[Any, ...], dtype[_ScalarT]], ndarray[tuple[Any, ...], dtype[_ScalarT]]])

Unscented Kalman Filter

class pykal.algorithm_library.estimators.ukf.UKF[source]

Bases: object

Unscented Kalman Filter (UKF) for nonlinear state estimation.

Based on Julier & Uhlmann (2004), “Unscented Filtering and Nonlinear Estimation”, Proceedings of the IEEE, Vol. 92, No. 3, pp. 401-422.

The UKF uses the unscented transformation to propagate mean and covariance through nonlinear functions without requiring Jacobian calculations.

static f(*, xhat_P, yk, f, f_params, h, h_params, Qk, Rk, alpha=0.001, beta=2.0, kappa=0.0)[source]

Perform one full predict-update step of the Unscented Kalman Filter.

Parameters:
  • xhat_P (Tuple[NDArray, NDArray]) –

    Tuple (x_hat_k, P_k) containing:
    • x_hat_k : current state estimate, shape (n,)

    • P_k : current state covariance, shape (n, n)

  • yk (NDArray) – Measurement at time k, shape (m,)

  • f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for state evolution function

  • h (Callable) – Nonlinear measurement function: y_k = h(x_k, **h_params) Should accept state as first argument

  • h_params (Dict) – Additional parameters for measurement function

  • Qk (NDArray) – Process noise covariance, shape (n, n)

  • Rk (NDArray) – Measurement noise covariance, shape (m, m)

  • alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1

  • beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian

  • kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • x_upd (NDArray) – Updated state estimate, shape (n,)

  • P_upd (NDArray) – Updated state covariance, shape (n, n)

Notes

The UKF uses the unscented transformation to propagate the state estimate through nonlinear dynamics and measurement functions. Unlike the EKF, it does not require Jacobian calculations.

Predict Step: 1. Generate sigma points from current estimate 2. Propagate sigma points through dynamics f() 3. Compute predicted mean and covariance

Update Step: 1. Propagate predicted sigma points through measurement h() 2. Compute predicted measurement mean and covariance 3. Compute cross-covariance between state and measurement 4. Compute Kalman gain and update estimate

References: Julier, S. J., & Uhlmann, J. K. (2004). Unscented filtering and nonlinear estimation. Proceedings of the IEEE, 92(3), 401-422.

static h(xhat_P)[source]

Extract current state estimate.

Parameters:

xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple

Returns:

x_hat – Current state estimate

Return type:

NDArray

pykal.algorithm_library.estimators.ukf.f(*, xhat_P, yk, f, f_params, h, h_params, Qk, Rk, alpha=0.001, beta=2.0, kappa=0.0)

Perform one full predict-update step of the Unscented Kalman Filter.

Parameters:
  • xhat_P (Tuple[NDArray, NDArray]) –

    Tuple (x_hat_k, P_k) containing:
    • x_hat_k : current state estimate, shape (n,)

    • P_k : current state covariance, shape (n, n)

  • yk (NDArray) – Measurement at time k, shape (m,)

  • f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for state evolution function

  • h (Callable) – Nonlinear measurement function: y_k = h(x_k, **h_params) Should accept state as first argument

  • h_params (Dict) – Additional parameters for measurement function

  • Qk (NDArray) – Process noise covariance, shape (n, n)

  • Rk (NDArray) – Measurement noise covariance, shape (m, m)

  • alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1

  • beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian

  • kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • x_upd (NDArray) – Updated state estimate, shape (n,)

  • P_upd (NDArray) – Updated state covariance, shape (n, n)

Notes

The UKF uses the unscented transformation to propagate the state estimate through nonlinear dynamics and measurement functions. Unlike the EKF, it does not require Jacobian calculations.

Predict Step: 1. Generate sigma points from current estimate 2. Propagate sigma points through dynamics f() 3. Compute predicted mean and covariance

Update Step: 1. Propagate predicted sigma points through measurement h() 2. Compute predicted measurement mean and covariance 3. Compute cross-covariance between state and measurement 4. Compute Kalman gain and update estimate

References: Julier, S. J., & Uhlmann, J. K. (2004). Unscented filtering and nonlinear estimation. Proceedings of the IEEE, 92(3), 401-422.

pykal.algorithm_library.estimators.ukf.h(xhat_P)

Extract current state estimate.

Parameters:

xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple

Returns:

x_hat – Current state estimate

Return type:

NDArray

Particle Filter

pykal.algorithm_library.estimators.pf.PF

alias of ParticleFilter

class pykal.algorithm_library.estimators.pf.ParticleFilter[source]

Bases: object

Particle Filter for nonlinear/non-Gaussian state estimation.

Based on Gordon, Salmond, & Smith (1993), “Novel approach to nonlinear/non-Gaussian Bayesian state estimation”, IEE Proceedings F, Vol. 140, No. 2, pp. 107-113.

The Particle Filter represents the posterior distribution p(x_k | y_{1:k}) using weighted particles, enabling estimation for arbitrary nonlinear dynamics and non-Gaussian noise.

static effective_sample_size(weights)[source]

Compute the effective sample size (ESS) of the particle set.

Parameters:

weights (NDArray) – Normalized particle weights, shape (N_p,)

Returns:

ess – Effective sample size, range [1, N_p]

Return type:

float

Notes

The effective sample size quantifies particle diversity:

\[\text{ESS} = \frac{1}{\sum_{i=1}^{N_p} (w_i)^2}\]

Interpretation: - ESS N_p: Weights are uniform (good diversity) - ESS 1: One particle has all weight (degeneracy)

Resampling criterion: Resample when ESS < threshold * N_p (typically threshold = 0.5)

Examples

>>> import numpy as np
>>> # Uniform weights (maximum diversity)
>>> weights = np.ones(100) / 100
>>> ess = ParticleFilter.effective_sample_size(weights)
>>> ess
100.0
>>> # Degenerate case (one particle has all weight)
>>> weights = np.zeros(100)
>>> weights[0] = 1.0
>>> ess = ParticleFilter.effective_sample_size(weights)
>>> ess
1.0
static f(*, particles_weights, yk, f, f_params, h=None, h_params=None, likelihood=None, likelihood_params=None, Q=None, R=None, resample_threshold=0.5, resample_method='systematic')[source]

Perform one full predict-update-resample cycle of the Particle Filter.

Parameters:
  • particles_weights (Tuple[NDArray, NDArray]) –

    A tuple (particles, weights) containing:
    • particles : particle states, shape (N_p, n)

    • weights : normalized particle weights, shape (N_p,)

  • yk (NDArray) – Measurement at time k, shape (m,)

  • f (Callable) – Nonlinear dynamics function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for dynamics function

  • h (Callable, optional) – Nonlinear measurement function: y_k = h(x_k, **h_params) Required if likelihood is not provided Should accept state as first argument

  • h_params (Dict, optional) – Additional parameters for measurement function

  • likelihood (Callable, optional) – Custom likelihood function: p(y_k | x_k) = likelihood(yk, x_k, **likelihood_params) If provided, used instead of automatic Gaussian likelihood from h Should accept measurement and state as first two arguments

  • likelihood_params (Dict, optional) – Additional parameters for likelihood function

  • Q (NDArray, optional) – Process noise covariance, shape (n, n) Can be None if process noise is handled within the dynamics function f

  • R (NDArray, optional) – Measurement noise covariance, shape (m, m) Required if h is provided (for Gaussian likelihood computation) Not needed if using custom likelihood function

  • resample_threshold (float) – Effective sample size threshold (0-1) for triggering resampling ESS < threshold * N_p triggers resampling

  • resample_method (str) – Resampling strategy: ‘systematic’, ‘stratified’, or ‘residual’ Default: ‘systematic’ (recommended for low variance)

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • particles_new (NDArray) – Updated particles, shape (N_p, n)

  • weights_new (NDArray) – Updated normalized weights, shape (N_p,)

Notes

The Particle Filter approximates the posterior distribution using weighted samples:

Representation:

p(x_k | y_{1:k}) ∑_{i=1}^{N_p} w_k^(i) δ(x_k - x_k^(i))

Algorithm Steps:

  1. Predict: Propagate particles through dynamics with process noise

    x_k^(i) ~ p(x_k | x_{k-1}^(i))f(x_{k-1}^(i)) + w_k

  2. Update: Compute likelihood weights

    w_k^(i) p(y_k | x_k^(i)) → evaluated via measurement model h()

  3. Normalize: Ensure weights sum to 1

    w_k^(i) = w_k^(i) / ∑_j w_k^(j)

  4. Resample: If effective sample size (ESS) is low, resample to prevent degeneracy

    ESS = 1 / ∑_i (w_k^(i))²

Numerical Stability: - Weights computed in log-space to prevent underflow - Log-sum-exp trick for normalization - Small ridge term added to prevent zero weights

References: Gordon, N. J., Salmond, D. J., & Smith, A. F. M. (1993). Novel approach to nonlinear/non-Gaussian Bayesian state estimation. IEE Proceedings F, 140(2), 107-113.

static h(particles_weights)[source]

Compute the weighted mean state estimate from particles.

Parameters:

particles_weights (Tuple[NDArray, NDArray]) – Tuple (particles, weights)

Returns:

xhat – Weighted mean state estimate, shape (n,)

Return type:

NDArray

Notes

The weighted mean is computed as:

\[\hat{x}_k = \sum_{i=1}^{N_p} w_k^{(i)} x_k^{(i)}\]

For multimodal distributions, the weighted mean may not be the best point estimate. Alternative estimators: - Maximum a posteriori (MAP): particles[np.argmax(weights)] - Median: Component-wise weighted median

Examples

>>> import numpy as np
>>> particles = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
>>> weights = np.array([0.5, 0.3, 0.2])
>>> particles_weights = (particles, weights)
>>> xhat = ParticleFilter.standard_h(particles_weights)
>>> xhat
array([2.4, 3.4])
pykal.algorithm_library.estimators.pf.f(*, particles_weights, yk, f, f_params, h=None, h_params=None, likelihood=None, likelihood_params=None, Q=None, R=None, resample_threshold=0.5, resample_method='systematic')

Perform one full predict-update-resample cycle of the Particle Filter.

Parameters:
  • particles_weights (Tuple[NDArray, NDArray]) –

    A tuple (particles, weights) containing:
    • particles : particle states, shape (N_p, n)

    • weights : normalized particle weights, shape (N_p,)

  • yk (NDArray) – Measurement at time k, shape (m,)

  • f (Callable) – Nonlinear dynamics function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for dynamics function

  • h (Callable, optional) – Nonlinear measurement function: y_k = h(x_k, **h_params) Required if likelihood is not provided Should accept state as first argument

  • h_params (Dict, optional) – Additional parameters for measurement function

  • likelihood (Callable, optional) – Custom likelihood function: p(y_k | x_k) = likelihood(yk, x_k, **likelihood_params) If provided, used instead of automatic Gaussian likelihood from h Should accept measurement and state as first two arguments

  • likelihood_params (Dict, optional) – Additional parameters for likelihood function

  • Q (NDArray, optional) – Process noise covariance, shape (n, n) Can be None if process noise is handled within the dynamics function f

  • R (NDArray, optional) – Measurement noise covariance, shape (m, m) Required if h is provided (for Gaussian likelihood computation) Not needed if using custom likelihood function

  • resample_threshold (float) – Effective sample size threshold (0-1) for triggering resampling ESS < threshold * N_p triggers resampling

  • resample_method (str) – Resampling strategy: ‘systematic’, ‘stratified’, or ‘residual’ Default: ‘systematic’ (recommended for low variance)

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • particles_new (NDArray) – Updated particles, shape (N_p, n)

  • weights_new (NDArray) – Updated normalized weights, shape (N_p,)

Notes

The Particle Filter approximates the posterior distribution using weighted samples:

Representation:

p(x_k | y_{1:k}) ∑_{i=1}^{N_p} w_k^(i) δ(x_k - x_k^(i))

Algorithm Steps:

  1. Predict: Propagate particles through dynamics with process noise

    x_k^(i) ~ p(x_k | x_{k-1}^(i))f(x_{k-1}^(i)) + w_k

  2. Update: Compute likelihood weights

    w_k^(i) p(y_k | x_k^(i)) → evaluated via measurement model h()

  3. Normalize: Ensure weights sum to 1

    w_k^(i) = w_k^(i) / ∑_j w_k^(j)

  4. Resample: If effective sample size (ESS) is low, resample to prevent degeneracy

    ESS = 1 / ∑_i (w_k^(i))²

Numerical Stability: - Weights computed in log-space to prevent underflow - Log-sum-exp trick for normalization - Small ridge term added to prevent zero weights

References: Gordon, N. J., Salmond, D. J., & Smith, A. F. M. (1993). Novel approach to nonlinear/non-Gaussian Bayesian state estimation. IEE Proceedings F, 140(2), 107-113.

pykal.algorithm_library.estimators.pf.h(particles_weights)

Compute the weighted mean state estimate from particles.

Parameters:

particles_weights (Tuple[NDArray, NDArray]) – Tuple (particles, weights)

Returns:

xhat – Weighted mean state estimate, shape (n,)

Return type:

NDArray

Notes

The weighted mean is computed as:

\[\hat{x}_k = \sum_{i=1}^{N_p} w_k^{(i)} x_k^{(i)}\]

For multimodal distributions, the weighted mean may not be the best point estimate. Alternative estimators: - Maximum a posteriori (MAP): particles[np.argmax(weights)] - Median: Component-wise weighted median

Examples

>>> import numpy as np
>>> particles = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
>>> weights = np.array([0.5, 0.3, 0.2])
>>> particles_weights = (particles, weights)
>>> xhat = ParticleFilter.standard_h(particles_weights)
>>> xhat
array([2.4, 3.4])

AI-UKF (Augmented Information UKF)

class pykal.algorithm_library.estimators.ai_ukf.AIUKF[source]

Bases: object

Augmented Information Unscented Kalman Filter (AI-UKF) for hybrid physics-data-driven state estimation.

Based on Cellini et al. (2025), “Discovering and exploiting active sensing motifs for estimation”, arXiv:2511.08766.

The AI-UKF extends the standard UKF by augmenting measurements with data-driven predictions (e.g., from Artificial Neural Networks) and using observability-aware time-varying measurement covariance to dynamically adjust trust in these predictions.

Key Innovation: Time-varying R that adapts based on instantaneous observability, enabling robust estimation even with poor initial conditions or weak observability.

static compute_observability_variance(observability_indicators, window_size, base_variance=1.0)[source]

Compute time-varying R for ANN measurements based on observability.

Parameters:
  • observability_indicators (NDArray) – Time series of observability indicators, shape (T,) Higher values indicate better observability Examples: |acceleration|, |velocity|, |angular_rate|

  • window_size (int) – Number of past timesteps to consider for minimum

  • base_variance (float) – Base measurement variance (scales the output)

Returns:

R_ann – Time-varying ANN measurement variance, shape (T,)

Return type:

NDArray

Notes

The observability-based variance is computed as:

\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]

where: - \(c\) is the base_variance parameter - \(w\) is the window_size - \(\sigma(i)\) is the observability indicator at time \(i\)

Intuition: - High observability (large \(\sigma\)) → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability (small \(\sigma\)) → Large \(\check{R}_{ANN}\) → Don’t trust ANN

Example: Altitude from Optic Flow When horizontal acceleration is high, altitude is observable from optic flow. When hovering (low acceleration), altitude is unobservable.

accel_x = np.abs(acceleration_history)  # Observability indicator
R_ann = AIUKF.compute_observability_variance(
    accel_x, window_size=10, base_variance=1.0
)

Numerical Stability: To prevent division by zero or very small numbers: - A small epsilon is added to the minimum observability - Very small observability indicators result in very large R (effectively ignoring ANN)

Examples

>>> import numpy as np
>>> # Simulate varying acceleration (observability indicator)
>>> accel = np.array([0.1, 0.5, 1.0, 0.2, 0.8, 0.05, 0.3])
>>> R_ann = AIUKF.compute_observability_variance(accel, window_size=3)
>>> R_ann.shape
(7,)
>>> # When acceleration is high (good observability), R is low
>>> R_ann[2]  # accel=1.0 is high
1.0
>>> # When acceleration is low (poor observability), R is high
>>> R_ann[5] > R_ann[2]  # accel=0.05 vs accel=1.0
True
static create_augmented_R(R_sensor, R_ann_time_varying, num_sensor_measurements, time_index)[source]

Construct augmented measurement covariance matrix at a specific time.

Parameters:
  • R_sensor (NDArray) – Sensor measurement covariance (constant), shape (m_sensor, m_sensor)

  • R_ann_time_varying (NDArray) – Time series of ANN measurement variances, shape (T,) or (T, m_ann) for multiple ANN measurements

  • num_sensor_measurements (int) – Number of sensor measurements (m_sensor)

  • time_index (int) – Current time index

Returns:

R_aug – Augmented measurement covariance, shape (m_aug, m_aug)

Return type:

NDArray

Notes

Constructs block-diagonal augmented R:

\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]

Examples

>>> import numpy as np
>>> R_sensor = np.diag([0.1, 0.2, 0.05])  # 3 sensor measurements
>>> R_ann_series = np.array([1.0, 0.5, 2.0, 0.8])  # Time series
>>> R_aug = AIUKF.create_augmented_R(R_sensor, R_ann_series, 3, time_index=1)
>>> R_aug.shape
(4, 4)
>>> R_aug[-1, -1]  # ANN variance at time_index=1
0.5
static f(*, xhat_P, yk_aug, f, f_params, h_aug, h_aug_params, Qk, Rk_aug, alpha=0.001, beta=2.0, kappa=0.0)[source]

Perform one full predict-update step of the AI-UKF with augmented measurements.

Parameters:
  • xhat_P (Tuple[NDArray, NDArray]) –

    Tuple (x_hat_k, P_k) containing:
    • x_hat_k : current state estimate, shape (n,)

    • P_k : current state covariance, shape (n, n)

  • yk_aug (NDArray) – Augmented measurement at time k, shape (m_aug,) Typically [sensor_measurements, ANN_predictions]

  • f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for state evolution function

  • h_aug (Callable) – Augmented nonlinear measurement function: y_aug_k = h_aug(x_k, **h_aug_params) Should return augmented measurement vector including ANN-predicted states

  • h_aug_params (Dict) – Additional parameters for augmented measurement function

  • Qk (NDArray) – Process noise covariance, shape (n, n)

  • Rk_aug (NDArray) –

    Time-varying augmented measurement noise covariance, shape (m_aug, m_aug) Structure: R_aug = [[R_sensor, 0 ],

    [0, R_ANN(t) ]]

    where R_ANN(t) varies based on observability

  • alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1

  • beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian

  • kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • x_upd (NDArray) – Updated state estimate, shape (n,)

  • P_upd (NDArray) – Updated state covariance, shape (n, n)

Notes

The AI-UKF algorithm follows the standard UKF predict-update cycle but with key modifications:

Augmented Measurement Model:

\[\begin{split}\mathbf{y}_{aug}(t) = \begin{bmatrix} \mathbf{h}(\mathbf{x}(t)) \\ \check{h}_{ANN}(\mathbf{y}_{[t-w:t]}) \end{bmatrix}\end{split}\]

where \(\check{h}_{ANN}\) is a data-driven estimator (e.g., ANN) that predicts poorly observable states from a time window of measurements.

Time-Varying Measurement Covariance:

\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]

The ANN measurement variance adapts based on observability:

\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]

where \(\sigma(i)\) is an observability indicator (e.g., |acceleration|): - High observability → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability → Large \(\check{R}_{ANN}\) → Ignore ANN

Algorithm Steps:

  1. Predict: Propagate sigma points through dynamics \(f()\)

  2. Update: Use augmented measurements and time-varying \(\mathbf{R}_{aug}\)

  3. Adaptive Trust: Automatically adjust ANN influence based on observability

When to Use AI-UKF: - Initial conditions are very uncertain (standard UKF would diverge) - States are intermittently observable (e.g., altitude from optic flow) - You have trained data-driven estimators for weakly observable states

References: Cellini, B., Boyacioglu, B., Lopez, A., & van Breugel, F. (2025). Discovering and exploiting active sensing motifs for estimation. arXiv:2511.08766.

Examples

>>> import numpy as np
>>> from pykal.algorithm_library.estimators.ai_ukf import AIUKF
>>>
>>> # Simple 1D example with augmented altitude measurement
>>> x_hat = np.array([0.0])  # State: altitude
>>> P = np.eye(1) * 100  # Large initial uncertainty
>>> xhat_P = (x_hat, P)
>>>
>>> # Augmented measurement: [optic_flow, altitude_ANN_estimate]
>>> yk_aug = np.array([0.5, 10.0])
>>>
>>> # Define dynamics and augmented measurement functions
>>> def f(x): return x  # Constant altitude
>>> def h_aug(x): return np.array([x[0]/10, x[0]])  # [optic_flow, altitude]
>>>
>>> # Time-varying R: trust ANN more when observability is high
>>> R_sensor = np.array([[0.1]])
>>> R_ANN = np.array([[1.0]])  # Varies with time in practice
>>> Rk_aug = np.block([[R_sensor, np.zeros((1,1))],
...                    [np.zeros((1,1)), R_ANN]])
>>>
>>> Q = np.eye(1) * 0.01
>>>
>>> # Run AI-UKF
>>> x_upd, P_upd = AIUKF.standard_f(
...     xhat_P=xhat_P, yk_aug=yk_aug, f=f, f_params={},
...     h_aug=h_aug, h_aug_params={}, Qk=Q, Rk_aug=Rk_aug
... )
>>> x_upd.shape
(1,)
static h(xhat_P)[source]

Extract current state estimate.

Parameters:

xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple

Returns:

x_hat – Current state estimate

Return type:

NDArray

Notes

This function is identical to UKF.standard_h() - it simply extracts the state estimate from the (x_hat, P) tuple.

pykal.algorithm_library.estimators.ai_ukf.compute_observability_variance(observability_indicators, window_size, base_variance=1.0)

Compute time-varying R for ANN measurements based on observability.

Parameters:
  • observability_indicators (NDArray) – Time series of observability indicators, shape (T,) Higher values indicate better observability Examples: |acceleration|, |velocity|, |angular_rate|

  • window_size (int) – Number of past timesteps to consider for minimum

  • base_variance (float) – Base measurement variance (scales the output)

Returns:

R_ann – Time-varying ANN measurement variance, shape (T,)

Return type:

NDArray

Notes

The observability-based variance is computed as:

\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]

where: - \(c\) is the base_variance parameter - \(w\) is the window_size - \(\sigma(i)\) is the observability indicator at time \(i\)

Intuition: - High observability (large \(\sigma\)) → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability (small \(\sigma\)) → Large \(\check{R}_{ANN}\) → Don’t trust ANN

Example: Altitude from Optic Flow When horizontal acceleration is high, altitude is observable from optic flow. When hovering (low acceleration), altitude is unobservable.

accel_x = np.abs(acceleration_history)  # Observability indicator
R_ann = AIUKF.compute_observability_variance(
    accel_x, window_size=10, base_variance=1.0
)

Numerical Stability: To prevent division by zero or very small numbers: - A small epsilon is added to the minimum observability - Very small observability indicators result in very large R (effectively ignoring ANN)

Examples

>>> import numpy as np
>>> # Simulate varying acceleration (observability indicator)
>>> accel = np.array([0.1, 0.5, 1.0, 0.2, 0.8, 0.05, 0.3])
>>> R_ann = AIUKF.compute_observability_variance(accel, window_size=3)
>>> R_ann.shape
(7,)
>>> # When acceleration is high (good observability), R is low
>>> R_ann[2]  # accel=1.0 is high
1.0
>>> # When acceleration is low (poor observability), R is high
>>> R_ann[5] > R_ann[2]  # accel=0.05 vs accel=1.0
True
pykal.algorithm_library.estimators.ai_ukf.f(*, xhat_P, yk_aug, f, f_params, h_aug, h_aug_params, Qk, Rk_aug, alpha=0.001, beta=2.0, kappa=0.0)

Perform one full predict-update step of the AI-UKF with augmented measurements.

Parameters:
  • xhat_P (Tuple[NDArray, NDArray]) –

    Tuple (x_hat_k, P_k) containing:
    • x_hat_k : current state estimate, shape (n,)

    • P_k : current state covariance, shape (n, n)

  • yk_aug (NDArray) – Augmented measurement at time k, shape (m_aug,) Typically [sensor_measurements, ANN_predictions]

  • f (Callable) – Nonlinear state evolution function: x_{k+1} = f(x_k, **f_params) Should accept state as first argument

  • f_params (Dict) – Additional parameters for state evolution function

  • h_aug (Callable) – Augmented nonlinear measurement function: y_aug_k = h_aug(x_k, **h_aug_params) Should return augmented measurement vector including ANN-predicted states

  • h_aug_params (Dict) – Additional parameters for augmented measurement function

  • Qk (NDArray) – Process noise covariance, shape (n, n)

  • Rk_aug (NDArray) –

    Time-varying augmented measurement noise covariance, shape (m_aug, m_aug) Structure: R_aug = [[R_sensor, 0 ],

    [0, R_ANN(t) ]]

    where R_ANN(t) varies based on observability

  • alpha (float) – UKF tuning parameter (spread of sigma points), typically 1e-4 to 1

  • beta (float) – UKF tuning parameter (distribution shape), 2 is optimal for Gaussian

  • kappa (float) – UKF tuning parameter (secondary scaling), typically 0 or 3-n

Return type:

Tuple[ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]], ndarray[tuple[Any, ...], dtype[TypeVar(_ScalarT, bound= generic)]]]

Returns:

  • x_upd (NDArray) – Updated state estimate, shape (n,)

  • P_upd (NDArray) – Updated state covariance, shape (n, n)

Notes

The AI-UKF algorithm follows the standard UKF predict-update cycle but with key modifications:

Augmented Measurement Model:

\[\begin{split}\mathbf{y}_{aug}(t) = \begin{bmatrix} \mathbf{h}(\mathbf{x}(t)) \\ \check{h}_{ANN}(\mathbf{y}_{[t-w:t]}) \end{bmatrix}\end{split}\]

where \(\check{h}_{ANN}\) is a data-driven estimator (e.g., ANN) that predicts poorly observable states from a time window of measurements.

Time-Varying Measurement Covariance:

\[\begin{split}\mathbf{R}_{aug}(t) = \begin{bmatrix} \mathbf{R}_{sensor} & \mathbf{0} \\ \mathbf{0} & \check{R}_{ANN}(t) \end{bmatrix}\end{split}\]

The ANN measurement variance adapts based on observability:

\[\check{R}_{ANN}(t) = \frac{c}{\min_{i \in [t-w, t]} \sigma(i)}\]

where \(\sigma(i)\) is an observability indicator (e.g., |acceleration|): - High observability → Small \(\check{R}_{ANN}\) → Trust ANN - Low observability → Large \(\check{R}_{ANN}\) → Ignore ANN

Algorithm Steps:

  1. Predict: Propagate sigma points through dynamics \(f()\)

  2. Update: Use augmented measurements and time-varying \(\mathbf{R}_{aug}\)

  3. Adaptive Trust: Automatically adjust ANN influence based on observability

When to Use AI-UKF: - Initial conditions are very uncertain (standard UKF would diverge) - States are intermittently observable (e.g., altitude from optic flow) - You have trained data-driven estimators for weakly observable states

References: Cellini, B., Boyacioglu, B., Lopez, A., & van Breugel, F. (2025). Discovering and exploiting active sensing motifs for estimation. arXiv:2511.08766.

Examples

>>> import numpy as np
>>> from pykal.algorithm_library.estimators.ai_ukf import AIUKF
>>>
>>> # Simple 1D example with augmented altitude measurement
>>> x_hat = np.array([0.0])  # State: altitude
>>> P = np.eye(1) * 100  # Large initial uncertainty
>>> xhat_P = (x_hat, P)
>>>
>>> # Augmented measurement: [optic_flow, altitude_ANN_estimate]
>>> yk_aug = np.array([0.5, 10.0])
>>>
>>> # Define dynamics and augmented measurement functions
>>> def f(x): return x  # Constant altitude
>>> def h_aug(x): return np.array([x[0]/10, x[0]])  # [optic_flow, altitude]
>>>
>>> # Time-varying R: trust ANN more when observability is high
>>> R_sensor = np.array([[0.1]])
>>> R_ANN = np.array([[1.0]])  # Varies with time in practice
>>> Rk_aug = np.block([[R_sensor, np.zeros((1,1))],
...                    [np.zeros((1,1)), R_ANN]])
>>>
>>> Q = np.eye(1) * 0.01
>>>
>>> # Run AI-UKF
>>> x_upd, P_upd = AIUKF.standard_f(
...     xhat_P=xhat_P, yk_aug=yk_aug, f=f, f_params={},
...     h_aug=h_aug, h_aug_params={}, Qk=Q, Rk_aug=Rk_aug
... )
>>> x_upd.shape
(1,)
pykal.algorithm_library.estimators.ai_ukf.h(xhat_P)

Extract current state estimate.

Parameters:

xhat_P (Tuple[NDArray, NDArray]) – State estimate and covariance tuple

Returns:

x_hat – Current state estimate

Return type:

NDArray

Notes

This function is identical to UKF.standard_h() - it simply extracts the state estimate from the (x_hat, P) tuple.