{ "cells": [ { "cell_type": "markdown", "id": "e87aa7c1", "metadata": {}, "source": [ "[← Algorithms as Dynamical Systems](../../getting_started/theory_to_python/algorithms_as_dynamical_systems.rst)" ] }, { "cell_type": "markdown", "id": "8c8b7063", "metadata": { "lines_to_next_cell": 0 }, "source": [ ":::{note}\n", "This notebook serves as an example for implementing a standard algorithm in an $(f,h)$-representation. This notebooks also serves as a template for those who wish to contribute algorithms to the [Community Library](../../community/index.rst). Implementation advice is given in green; templating advice is given in blue.\n", ":::" ] }, { "cell_type": "markdown", "id": "c766459f", "metadata": { "lines_to_next_cell": 0 }, "source": [ "# Example: Kalman Filter\n", "\n", ":::note\n", "The title of the notebook should be the name of the algorithm you're implemeting. The text beneath the title should serve as a general overview of your algorithm.\n", ":::\n", "The Kalman filter is a (linearly) optimal state estimation algorithm (given Gaussian process and measurement noise). This algorithm works by employing two phases: a **prediction** phase and an **update** phase.\n", "\n", "During the prediction phase, the state estimate and its associated uncertainty are propagated forward in time using the system’s dynamical model, without reference to new measurements. This step answers the question: *given what we believed at the previous time step, what do we expect the state to be now?*\n", "\n", "In the update phase, incoming measurements are incorporated to correct the predicted state. The discrepancy between the predicted measurement and the actual measurement (the **innovation**) is used to adjust the state estimate in a statistically optimal way, balancing model confidence against measurement reliability. The Kalman gain determines how much the estimate should trust the model versus the data, and is computed directly from the predicted covariance and the measurement noise.\n", "\n", "By alternating between prediction and update, the Kalman filter produces a recursive estimate of the system state that minimizes mean-squared estimation error among all linear unbiased estimators. Importantly, the filter does not require storing the full measurement history: all relevant information is compactly summarized in the current state estimate and its covariance (via Bayesian magic).\n", "\n", ":::note\n", "Add helpful links if appropriate.\n", ":::\n", " For further information, read the [Wikipedia article](https://en.wikipedia.org/wiki/Kalman_filter). For Kalman's original paper on arxiv, which has a wonderful manifold projection interpretation of optimal estimation, read [the paper here ](https://www.cs.cmu.edu/~motionplanning/papers/sbp_papers/k/Kalman1960.pdf?utm_source=chatgpt.com).\n", "\n", ":::note\n", "If your algorithm is included in the `pykal.algorithm_library`, then include a link to the API reference.\n", ":::\n", "The Kalman filter dynamical system as implemented in this notebook may be found in the API reference under \"Estimators\": [Kalman Filter](../../api/algorithm_library_estimators.rst)." ] }, { "cell_type": "markdown", "id": "1f21b658", "metadata": {}, "source": [ "## Definition of Algorithm\n", "\n", "### Notation and Assumptions\n", ":::note\n", "Here you define variables, terminology, and notation, as well as relevant assumptions (e.g. Gaussian noise)\n", ":::\n", "\n", "We model a discrete-time linear dynamical system with Gaussian noise by\n", "\n", "\n", "$$\n", "\\begin{aligned}\n", "x_{k+1} &= F_k x_k + B_k u_k + w_k, \\\\\n", "y_k &= H_k x_k + v_k.\n", "\\end{aligned}\n", "$$\n", "\n", "Where\n", "- $x_k \\in \\mathbb{R}^n$ is the (hidden) state at time step $k$,\n", "- $u_k \\in \\mathbb{R}^p$ is a known control input,\n", "- $y_k \\in \\mathbb{R}^m$ is the measurement,\n", "- $F_k \\in \\mathbb{R}^{n\\times n}$ is the state transition matrix,\n", "- $B_k \\in \\mathbb{R}^{n\\times p}$ is the control-input matrix,\n", "- $H_k \\in \\mathbb{R}^{m\\times n}$ is the measurement matrix.\n", "\n", "We assume that the process noise and measurement noise are modeled as zero-mean Gaussian random variables\n", "\n", "$$\n", "\\begin{aligned}\n", "w_k & \\sim \\mathcal{N}(0, Q_k)\\\\\n", "v_k & \\sim \\mathcal{N}(0, R_k)\n", "\\end{aligned}\n", "$$\n", "\n", "with covariances $Q_k \\in \\mathbb{R}^{n\\times n}$ and $R_k \\in \\mathbb{R}^{m\\times m}$. We also assume an initial Gaussian prior\n", "\n", "$$\n", "x_0 \\sim \\mathcal{N}(\\hat{x}_{0|0}, P_{0|0}).\n", "$$\n", "\n", "### How it works\n", ":::note\n", " An overview is great, but a full derivation and/or examples are unnecessary. You can just link to relevant resources and examples if you wish.\n", ":::\n", "\n", "The Kalman filter produces, at each time $k$, a Gaussian posterior estimate of the state conditioned on measurements up to time $k$:\n", "\n", "$$\n", "p(x_k \\mid y_{0:k}) \\approx \\mathcal{N}(\\hat{x}_{k|k}, P_{k|k}),\n", "$$\n", "\n", "where $\\hat{x}_{k|k}$ is the posterior mean (state estimate) and $P_{k|k}$ is the posterior covariance (uncertainty). It produces this estimate through a recursive **predict–update** cycle.\n", "\n", "In the **prediction** step, the previous posterior $(\\hat{x}_{k-1|k-1}, P_{k-1|k-1})$ is propagated through the system dynamics to obtain a prior (or *a priori*) estimate:\n", "\n", "$$\n", "\\hat{x}_{k|k-1} = f(\\hat{x}_{k-1|k-1}, u_{k-1},\\dots), \\qquad\n", "P_{k|k-1} = F_k P_{k-1|k-1} F_k^\\top + Q_k,\n", "$$\n", "\n", "where $F_k$ is the linearized (or exact) state transition model and $Q_k$ is the process noise covariance. This step accounts for both model evolution and uncertainty growth.\n", "\n", "In the **update** step, the predicted state is corrected using the new measurement $y_k$. The innovation\n", "\n", "$$\n", "\\nu_k = y_k - h(\\hat{x}_{k|k-1},\\dots)\n", "$$\n", "\n", "measures the mismatch between the predicted and observed outputs, with innovation covariance\n", "\n", "$$\n", "S_k = H_k P_{k|k-1} H_k^\\top + R_k.\n", "$$\n", "\n", "The Kalman gain\n", "\n", "$$\n", "K_k = P_{k|k-1} H_k^\\top S_k^{-1}\n", "$$\n", "\n", "determines how the innovation is mapped back into the state space. The posterior estimate is then\n", "\n", "$$\n", "\\hat{x}_{k|k} = \\hat{x}_{k|k-1} + K_k \\nu_k, \\qquad\n", "P_{k|k} = (I - K_k H_k) P_{k|k-1}.\n", "$$\n", "\n", "This recursive structure yields the minimum-variance linear unbiased estimate under Gaussian noise assumptions, while requiring only the current estimate and covariance rather than the full measurement history.\n", "\n", "For a full derivation, see [Kalman's original paper](https://ntrs.nasa.gov/citations/19860016041) or [this tutorial](https://www.bzarg.com/p/how-a-kalman-filter-works-in-pictures/)." ] }, { "cell_type": "markdown", "id": "385df8dd", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## Algorithm as an $(f,h)$-representation\n", "::: note\n", "Define the $(f,h)$ system representation of the algorithm, with references to earlies sections as needed. You can include reasons as to why you defined the parameters, $f$, and $h$ as you did, as it may prove helpful for others looking at your notebook.\n", ":::\n", "Looking at the \"How it works\" section above, we see that a natural choice of state for the Kalman Filter is the pair\n", "\n", "$$\n", "z_k := (\\hat{x}_{k|k}, P_{k|k}),\n", "$$\n", "\n", "where $\\hat{x}_{k|k} \\in \\mathbb{R}^n$ is the current state estimate (posterior mean) and $P_{k|k} \\in \\mathbb{R}^{n\\times n}$ is the current posterior covariance. We choose our other expicit parameters to be the measurements $y_{k+1}$ and the input control input $u_k$. We choose our implicit parameters to be the functions $f$ and $h$, the matrices $F_k,H_k,Q_k$ and $R_k$, and the parameters for $f$ and $h$ (since we will need to call them inside of our dynamical system to generate our predictions).\n", "\n", ":::tip\n", "To determine what the state should be for your system, it is helpful to first think of what variables must be \"updated\" during each iteration. To determine which parameters should be explicit or implicit, recall that the state and, if present, the time, must always be explicit parameter, and choose other explicit parameters to be variables that interact with other dynamical systems in a block diagram, are able to change over different iterations, or are otherwise meaningful to the problem at hand. Again, choosing which parameters are explicit vs implicit is a matter of notational convenience and nothing more; don't lose any sleep over it.\n", ":::\n", "\n", "With the parameters defined thus, we define the Kalman Filter's $(f,h)$-representation as\n", "\n", "$$\n", "z_{k+1} = f(z_k, y_{k+1},u_k,\\dots), \\\\\n", "\\hat{x}_k=h(z_k, y_{k+1},u_k,\\dots)\n", "$$\n", "\n", "which is implemented as follows:\n" ] }, { "cell_type": "code", "execution_count": null, "id": "6371431a", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "# Import any packages that are necessary to define and run your functions. For example, if your functions need \"scipy\" or \"cvxpy\" or even specific typing for their arguments, import them.\n", "\n", "from typing import Tuple, Callable, Dict\n", "from numpy.typing import NDArray\n", "from pykal.dynamical_system import DynamicalSystem\n", "import numpy as np\n", "\n", "\n", "def f(\n", " *,\n", " zk: Tuple[NDArray, NDArray],\n", " yk: NDArray,\n", " f: Callable,\n", " f_h_params: Dict,\n", " h: Callable,\n", " Fk: NDArray,\n", " Qk: NDArray,\n", " Hk: NDArray,\n", " Rk: NDArray,\n", ") -> Tuple[NDArray, NDArray]:\n", " \"\"\"\n", " Perform one full **predict–update** step of the discrete-time Kalman Filter.\n", " \"\"\"\n", "\n", " # === Extract covariance ===\n", " _, Pk = zk\n", "\n", " # === Predict ===\n", " x_pred = DynamicalSystem._smart_call(\n", " f, f_h_params\n", " ) # smart parameter binding (see \"Parameter Binding\" in \"DynamicalSystem\" notebook)\n", " P_pred = Fk @ Pk @ Fk.T + Qk\n", "\n", " # === Innovation ===\n", " y_pred = DynamicalSystem._smart_call(h, f_h_params)\n", " innovation = yk - y_pred\n", "\n", " # === Update ===\n", " Sk = Hk @ P_pred @ Hk.T + Rk\n", " ridge = 1e-9 * np.eye(Sk.shape[0])\n", " try:\n", " Sk_inv = np.linalg.inv(Sk + ridge)\n", " except np.linalg.LinAlgError:\n", " Sk_inv = np.linalg.pinv(Sk + ridge)\n", "\n", " Kk = P_pred @ Hk.T @ Sk_inv\n", " x_upd = x_pred + Kk @ innovation\n", "\n", " I = np.eye(P_pred.shape[0])\n", " P_upd = (I - Kk @ Hk) @ P_pred @ (I - Kk @ Hk).T + Kk @ Rk @ Kk.T\n", " P_upd = 0.5 * (P_upd + P_upd.T)\n", "\n", " return (x_upd, P_upd)\n", "\n", "\n", "def h(zk: Tuple[NDArray, NDArray]):\n", " return zk[0]" ] }, { "cell_type": "markdown", "id": "efb4b673", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Example: 1D Constant Velocity Tracking\n", "::: note\n", "Finally, show an example of your algorithm in action.This should be a simple example that gets the idea across on how to use your algorithm in a standard simulation loop.\n", ":::\n", "\n", "We track a target moving in 1D with (approximately) constant velocity, where we only observe its **position**. We suppose this is a linear system with zero-mean Gaussian process and measurement noise.\n", "\n", "Note that even though we only **measure position**, the Kalman filter is able to **estimate both position and velocity**. This is because the filter uses the system's dynamical model (constant velocity motion) to infer velocity from the sequence of position measurements. This demonstrates the power of model-based state estimation: by leveraging knowledge of how the system evolves, the Kalman filter can estimate hidden states that are not directly observed." ] }, { "cell_type": "markdown", "id": "32d4aa41", "metadata": {}, "source": [ "
\n", "\n", "
\n", "\n", "The diagram above shows the structure of our example: the **Target** block represents the system being tracked (with 2D state $x_k = [p_k, v_k]^\\top$), and the **Kalman Filter** block estimates this state from noisy position measurements $y_k$. The KF maintains its own state $z_k = (\\hat{x}_k, P_k)$ consisting of the state estimate and covariance." ] }, { "cell_type": "code", "execution_count": null, "id": "72579271", "metadata": {}, "outputs": [], "source": [ "\n", "\n", "def target_f(xk, Ak):\n", " \"\"\"\n", " Noise-free state evolution of target.\n", "\n", " Parameters:\n", " xk: State vector [position, velocity] as column vector, shape (2, 1)\n", " Ak: State transition matrix for constant velocity model, shape (2, 2)\n", " Ak = [[1, dt],\n", " [0, 1]]\n", "\n", " Returns:\n", " xk_next: Next state [position, velocity] as column vector, shape (2, 1)\n", " \"\"\"\n", " xk_next = Ak @ xk\n", " return xk_next\n", "\n", "\n", "def target_h(xk, Ck):\n", " \"\"\"\n", " Noise-free measurement of target.\n", "\n", " Parameters:\n", " xk: State vector [position, velocity] as column vector, shape (2, 1)\n", " Ck: Measurement matrix that extracts position only, shape (1, 2)\n", " Ck = [[1, 0]] (measures position, ignores velocity)\n", "\n", " Returns:\n", " yk: Measurement (position only) as column vector, shape (1, 1)\n", " \"\"\"\n", " yk = Ck @ xk\n", " return yk\n", "\n", "\n", "target_block = DynamicalSystem(f=target_f, h=target_h)\n", "\n", "kf_block = DynamicalSystem(f=f, h=h) # f and h are defined above" ] }, { "cell_type": "code", "execution_count": null, "id": "adfd9690", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "rng = np.random.default_rng()\n", "\n", "### SIMULATION TIME ############\n", "dt = 0.1\n", "sim_time = np.arange(0, 10, dt)\n", "\n", "#### EXPLICIT PARAMETERS #########################\n", "\n", "xk = np.array([[0.0], [1.0]]) # initial target state (column vector, pos and vel)\n", "\n", "Pk = np.array([[1e-3, 0], [0, 1e-3]])\n", "zk = [xk, Pk] # initial kf state\n", "\n", "# target dynamics and measurment matrices\n", "Ak = np.array([[1, dt], [0, 1]])\n", "Ck = np.array([[1, 0]])\n", "\n", "\n", "#### IMPLICIT PARAMETERS #########################\n", "\n", "Qk = np.array([[1e-4, 0], [0, 1e-4]])\n", "Rk = np.array([[1e-3]])\n", "kf_constant_params = {\n", " \"f\": target_block.f,\n", " \"h\": target_block.h,\n", " \"Qk\": Qk,\n", " \"Rk\": Rk,\n", " \"Fk\": Ak,\n", " \"Hk\": Ck,\n", "}\n", "\n", "### HISTORIES #################\n", "\n", "true_states = []\n", "measurements = []\n", "kf_state_estimates = []\n", "\n", "### SIMULATION ###################################################\n", "for tk in sim_time:\n", "\n", " # Noise-free state evolution\n", " xk_next, yk = target_block.step(params={\"xk\": xk, \"Ak\": Ak, \"Ck\": Ck})\n", "\n", " # Apply process noise and measurement noise\n", " process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(\n", " -1, 1\n", " )\n", " measurement_noise = rng.multivariate_normal(\n", " mean=np.zeros(Rk.shape[0]), cov=Rk\n", " ).reshape(-1, 1)\n", "\n", " xk_next = xk_next + process_noise\n", " yk = yk + measurement_noise\n", "\n", " zk_next, xhatk = kf_block.step(\n", " params={\n", " \"zk\": zk,\n", " \"yk\": yk,\n", " \"f_h_params\": {\"xk\": xk, \"Ak\": Ak, \"Ck\": Ck},\n", " **kf_constant_params,\n", " }\n", " )\n", "\n", " true_states.append(xk.flatten())\n", " measurements.append(yk.flatten())\n", " kf_state_estimates.append(xhatk.flatten())\n", "\n", " xk = xk_next\n", " zk = zk_next\n", "\n", "# Convert to arrays for plotting\n", "true_states = np.array(true_states)\n", "measurements = np.array(measurements)\n", "kf_state_estimates = np.array(kf_state_estimates)" ] }, { "cell_type": "code", "execution_count": null, "id": "62bd15cc", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "\n", "fig, axs = plt.subplots(2, 1, figsize=(12, 8))\n", "\n", "# Plot position\n", "axs[0].plot(\n", " sim_time, true_states[:, 0], \"r-\", linewidth=2, label=\"True Position\", alpha=0.7\n", ")\n", "axs[0].scatter(\n", " sim_time, measurements[:, 0], c=\"gray\", s=10, alpha=0.5, label=\"Measurements\"\n", ")\n", "axs[0].plot(sim_time, kf_state_estimates[:, 0], \"b-\", linewidth=2, label=\"KF Estimate\")\n", "axs[0].set_ylabel(\"Position\", fontsize=11)\n", "axs[0].set_title(\n", " \"Kalman Filter: 1D Constant Velocity Tracking\", fontsize=13, fontweight=\"bold\"\n", ")\n", "axs[0].legend(loc=\"best\")\n", "axs[0].grid(True, alpha=0.3)\n", "\n", "# Plot velocity\n", "axs[1].plot(\n", " sim_time, true_states[:, 1], \"r-\", linewidth=2, label=\"True Velocity\", alpha=0.7\n", ")\n", "axs[1].plot(sim_time, kf_state_estimates[:, 1], \"b-\", linewidth=2, label=\"KF Estimate\")\n", "axs[1].set_xlabel(\"Time (s)\", fontsize=11)\n", "axs[1].set_ylabel(\"Velocity\", fontsize=11)\n", "axs[1].set_title(\"Velocity Estimation\", fontsize=13, fontweight=\"bold\")\n", "axs[1].legend(loc=\"best\")\n", "axs[1].grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "abcba40b", "metadata": {}, "source": [ "\n", "## Notes on Usage\n", ":::note\n", "What should users of your algorithm know if they want to use it? Add links to outside resources if relevant.\n", ":::\n", "\n", "The Kalman filter is an optimal unbiased estimator when **all** of the following conditions hold:\n", "\n", "1. **Linear dynamics**: The system evolution and measurement models are linear (or have been linearized)\n", "2. **Gaussian noise**: Process noise and measurement noise are approximately Gaussian\n", "3. **Known covariances**: The noise covariance matrices $Q$ and $R$ can be estimated or characterized\n", "\n", "If your system is **nonlinear**, consider using the Extended Kalman Filter (EKF) or Unscented Kalman Filter (UKF) instead. If your noise is **non-Gaussian** (e.g., heavy-tailed, multimodal), consider particle filters or other robust estimators.\n", "\n", "### Jacobian Requirements\n", "\n", "For **linear systems**, the Jacobians are simply the system matrices:\n", "\n", "- State transition Jacobian: $F_k$ is the linearized dynamics matrix\n", "- Measurement Jacobian: $H_k$ is the linearized measurement matrix\n", "\n", "In this implementation:\n", "- `Fk` should be the Jacobian $\\frac{\\partial f}{\\partial x}$ evaluated at the current estimate\n", "- `Hk` should be the Jacobian $\\frac{\\partial h}{\\partial x}$ evaluated at the current estimate\n", "\n", "For **time-invariant linear systems** (constant matrices), these are the same at every time step. For **time-varying or nonlinear systems**, recompute the Jacobians at each step.\n", "\n", "### Tuning Guidance\n", "\n", "The performance of the Kalman filter depends critically on the noise covariance matrices:\n", "\n", "- **Process noise $Q$**: Larger values → filter trusts model less, adapts faster to changes\n", "- **Measurement noise $R$**: Larger values → filter trusts measurements less, smoother estimates\n", "\n", "**Initial covariance $P_0$**: Set based on initial uncertainty. Large values indicate high initial uncertainty; the filter will converge as measurements arrive.\n", "\n", "For practical tuning, see [this guide on tuning Kalman filters](https://shepherdmoon.org/kalman_filter/tuning.html)." ] }, { "cell_type": "markdown", "id": "7680a275", "metadata": {}, "source": [ "[← Algorithms as Dynamical Systems](../../getting_started/theory_to_python/algorithms_as_dynamical_systems.rst)" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }