{ "cells": [ { "cell_type": "markdown", "id": "e87aa7c1", "metadata": {}, "source": [ "[← Algorithms as Dynamical Systems](../../getting_started/theory_to_python/algorithms_as_dynamical_systems.rst)" ] }, { "cell_type": "markdown", "id": "8c8b7063", "metadata": { "lines_to_next_cell": 0 }, "source": [ ":::{note}\n", "This notebook serves as an example for implementing a standard algorithm in an $(f,h)$-representation. This notebooks also serves as a template for those who wish to contribute algorithms to the [Community Library](../../community/index.rst). Implementation advice is given in green; templating advice is given in blue.\n", ":::" ] }, { "cell_type": "markdown", "id": "c766459f", "metadata": { "lines_to_next_cell": 0 }, "source": [ "# Example: Kalman Filter\n", "\n", ":::note\n", "The title of the notebook should be the name of the algorithm you're implemeting. The text beneath the title should serve as a general overview of your algorithm.\n", ":::\n", "The Kalman filter is a (linearly) optimal state estimation algorithm (given Gaussian process and measurement noise). This algorithm works by employing two phases: a **prediction** phase and an **update** phase.\n", "\n", "During the prediction phase, the state estimate and its associated uncertainty are propagated forward in time using the system’s dynamical model, without reference to new measurements. This step answers the question: *given what we believed at the previous time step, what do we expect the state to be now?*\n", "\n", "In the update phase, incoming measurements are incorporated to correct the predicted state. The discrepancy between the predicted measurement and the actual measurement (the **innovation**) is used to adjust the state estimate in a statistically optimal way, balancing model confidence against measurement reliability. The Kalman gain determines how much the estimate should trust the model versus the data, and is computed directly from the predicted covariance and the measurement noise.\n", "\n", "By alternating between prediction and update, the Kalman filter produces a recursive estimate of the system state that minimizes mean-squared estimation error among all linear unbiased estimators. Importantly, the filter does not require storing the full measurement history: all relevant information is compactly summarized in the current state estimate and its covariance (via Bayesian magic).\n", "\n", ":::note\n", "Add helpful links if appropriate.\n", ":::\n", " For further information, read the [Wikipedia article](https://en.wikipedia.org/wiki/Kalman_filter). For Kalman's original paper on arxiv, which has a wonderful manifold projection interpretation of optimal estimation, read [the paper here ](https://www.cs.cmu.edu/~motionplanning/papers/sbp_papers/k/Kalman1960.pdf?utm_source=chatgpt.com).\n", "\n", ":::note\n", "If your algorithm is included in the `pykal.algorithm_library`, then include a link to the API reference.\n", ":::\n", "The Kalman filter dynamical system as implemented in this notebook may be found in the API reference under \"Estimators\": [Kalman Filter](../../api/algorithm_library_estimators.rst)." ] }, { "cell_type": "markdown", "id": "1f21b658", "metadata": {}, "source": [ "## Definition of Algorithm\n", "\n", "### Notation and Assumptions\n", ":::note\n", "Here you define variables, terminology, and notation, as well as relevant assumptions (e.g. Gaussian noise)\n", ":::\n", "\n", "We model a discrete-time linear dynamical system with Gaussian noise by\n", "\n", "\n", "$$\n", "\\begin{aligned}\n", "x_{k+1} &= F_k x_k + B_k u_k + w_k, \\\\\n", "y_k &= H_k x_k + v_k.\n", "\\end{aligned}\n", "$$\n", "\n", "Where\n", "- $x_k \\in \\mathbb{R}^n$ is the (hidden) state at time step $k$,\n", "- $u_k \\in \\mathbb{R}^p$ is a known control input,\n", "- $y_k \\in \\mathbb{R}^m$ is the measurement,\n", "- $F_k \\in \\mathbb{R}^{n\\times n}$ is the state transition matrix,\n", "- $B_k \\in \\mathbb{R}^{n\\times p}$ is the control-input matrix,\n", "- $H_k \\in \\mathbb{R}^{m\\times n}$ is the measurement matrix.\n", "\n", "We assume that the process noise and measurement noise are modeled as zero-mean Gaussian random variables\n", "\n", "$$\n", "\\begin{aligned}\n", "w_k & \\sim \\mathcal{N}(0, Q_k)\\\\\n", "v_k & \\sim \\mathcal{N}(0, R_k)\n", "\\end{aligned}\n", "$$\n", "\n", "with covariances $Q_k \\in \\mathbb{R}^{n\\times n}$ and $R_k \\in \\mathbb{R}^{m\\times m}$. We also assume an initial Gaussian prior\n", "\n", "$$\n", "x_0 \\sim \\mathcal{N}(\\hat{x}_{0|0}, P_{0|0}).\n", "$$\n", "\n", "### How it works\n", ":::note\n", " An overview is great, but a full derivation and/or examples are unnecessary. You can just link to relevant resources and examples if you wish.\n", ":::\n", "\n", "The Kalman filter produces, at each time $k$, a Gaussian posterior estimate of the state conditioned on measurements up to time $k$:\n", "\n", "$$\n", "p(x_k \\mid y_{0:k}) \\approx \\mathcal{N}(\\hat{x}_{k|k}, P_{k|k}),\n", "$$\n", "\n", "where $\\hat{x}_{k|k}$ is the posterior mean (state estimate) and $P_{k|k}$ is the posterior covariance (uncertainty). It produces this estimate through a recursive **predict–update** cycle.\n", "\n", "In the **prediction** step, the previous posterior $(\\hat{x}_{k-1|k-1}, P_{k-1|k-1})$ is propagated through the system dynamics to obtain a prior (or *a priori*) estimate:\n", "\n", "$$\n", "\\hat{x}_{k|k-1} = f(\\hat{x}_{k-1|k-1}, u_{k-1},\\dots), \\qquad\n", "P_{k|k-1} = F_k P_{k-1|k-1} F_k^\\top + Q_k,\n", "$$\n", "\n", "where $F_k$ is the linearized (or exact) state transition model and $Q_k$ is the process noise covariance. This step accounts for both model evolution and uncertainty growth.\n", "\n", "In the **update** step, the predicted state is corrected using the new measurement $y_k$. The innovation\n", "\n", "$$\n", "\\nu_k = y_k - h(\\hat{x}_{k|k-1},\\dots)\n", "$$\n", "\n", "measures the mismatch between the predicted and observed outputs, with innovation covariance\n", "\n", "$$\n", "S_k = H_k P_{k|k-1} H_k^\\top + R_k.\n", "$$\n", "\n", "The Kalman gain\n", "\n", "$$\n", "K_k = P_{k|k-1} H_k^\\top S_k^{-1}\n", "$$\n", "\n", "determines how the innovation is mapped back into the state space. The posterior estimate is then\n", "\n", "$$\n", "\\hat{x}_{k|k} = \\hat{x}_{k|k-1} + K_k \\nu_k, \\qquad\n", "P_{k|k} = (I - K_k H_k) P_{k|k-1}.\n", "$$\n", "\n", "This recursive structure yields the minimum-variance linear unbiased estimate under Gaussian noise assumptions, while requiring only the current estimate and covariance rather than the full measurement history.\n", "\n", "For a full derivation, see [Kalman's original paper](https://ntrs.nasa.gov/citations/19860016041) or [this tutorial](https://www.bzarg.com/p/how-a-kalman-filter-works-in-pictures/)." ] }, { "cell_type": "markdown", "id": "385df8dd", "metadata": { "lines_to_next_cell": 0 }, "source": [ "## Algorithm as an $(f,h)$-representation\n", "::: note\n", "Define the $(f,h)$ system representation of the algorithm, with references to earlies sections as needed. You can include reasons as to why you defined the parameters, $f$, and $h$ as you did, as it may prove helpful for others looking at your notebook.\n", ":::\n", "Looking at the \"How it works\" section above, we see that a natural choice of state for the Kalman Filter is the pair\n", "\n", "$$\n", "z_k := (\\hat{x}_{k|k}, P_{k|k}),\n", "$$\n", "\n", "where $\\hat{x}_{k|k} \\in \\mathbb{R}^n$ is the current state estimate (posterior mean) and $P_{k|k} \\in \\mathbb{R}^{n\\times n}$ is the current posterior covariance. We choose our other expicit parameters to be the measurements $y_{k+1}$ and the input control input $u_k$. We choose our implicit parameters to be the functions $f$ and $h$, the matrices $F_k,H_k,Q_k$ and $R_k$, and the parameters for $f$ and $h$ (since we will need to call them inside of our dynamical system to generate our predictions).\n", "\n", ":::tip\n", "To determine what the state should be for your system, it is helpful to first think of what variables must be \"updated\" during each iteration. To determine which parameters should be explicit or implicit, recall that the state and, if present, the time, must always be explicit parameter, and choose other explicit parameters to be variables that interact with other dynamical systems in a block diagram, are able to change over different iterations, or are otherwise meaningful to the problem at hand. Again, choosing which parameters are explicit vs implicit is a matter of notational convenience and nothing more; don't lose any sleep over it.\n", ":::\n", "\n", "With the parameters defined thus, we define the Kalman Filter's $(f,h)$-representation as\n", "\n", "$$\n", "z_{k+1} = f(z_k, y_{k+1},u_k,\\dots), \\\\\n", "\\hat{x}_k=h(z_k, y_{k+1},u_k,\\dots)\n", "$$\n", "\n", "which is implemented as follows:\n" ] }, { "cell_type": "code", "execution_count": null, "id": "6371431a", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "# Import any packages that are necessary to define and run your functions. For example, if your functions need \"scipy\" or \"cvxpy\" or even specific typing for their arguments, import them.\n", "\n", "from typing import Tuple, Callable, Dict\n", "from numpy.typing import NDArray\n", "from pykal.dynamical_system import DynamicalSystem\n", "import numpy as np\n", "\n", "\n", "def f(\n", " *,\n", " zk: Tuple[NDArray, NDArray],\n", " yk: NDArray,\n", " f: Callable,\n", " f_h_params: Dict,\n", " h: Callable,\n", " Fk: NDArray,\n", " Qk: NDArray,\n", " Hk: NDArray,\n", " Rk: NDArray,\n", ") -> Tuple[NDArray, NDArray]:\n", " \"\"\"\n", " Perform one full **predict–update** step of the discrete-time Kalman Filter.\n", " \"\"\"\n", "\n", " # === Extract covariance ===\n", " _, Pk = zk\n", "\n", " # === Predict ===\n", " x_pred = DynamicalSystem._smart_call(\n", " f, f_h_params\n", " ) # smart parameter binding (see \"Parameter Binding\" in \"DynamicalSystem\" notebook)\n", " P_pred = Fk @ Pk @ Fk.T + Qk\n", "\n", " # === Innovation ===\n", " y_pred = DynamicalSystem._smart_call(h, f_h_params)\n", " innovation = yk - y_pred\n", "\n", " # === Update ===\n", " Sk = Hk @ P_pred @ Hk.T + Rk\n", " ridge = 1e-9 * np.eye(Sk.shape[0])\n", " try:\n", " Sk_inv = np.linalg.inv(Sk + ridge)\n", " except np.linalg.LinAlgError:\n", " Sk_inv = np.linalg.pinv(Sk + ridge)\n", "\n", " Kk = P_pred @ Hk.T @ Sk_inv\n", " x_upd = x_pred + Kk @ innovation\n", "\n", " I = np.eye(P_pred.shape[0])\n", " P_upd = (I - Kk @ Hk) @ P_pred @ (I - Kk @ Hk).T + Kk @ Rk @ Kk.T\n", " P_upd = 0.5 * (P_upd + P_upd.T)\n", "\n", " return (x_upd, P_upd)\n", "\n", "\n", "def h(zk: Tuple[NDArray, NDArray]):\n", " return zk[0]" ] }, { "cell_type": "markdown", "id": "efb4b673", "metadata": { "lines_to_next_cell": 0 }, "source": [ "### Example: 1D Constant Velocity Tracking\n", "::: note\n", "Finally, show an example of your algorithm in action.This should be a simple example that gets the idea across on how to use your algorithm in a standard simulation loop.\n", ":::\n", "\n", "We track a target moving in 1D with (approximately) constant velocity, where we only observe its **position**. We suppose this is a linear system with zero-mean Gaussian process and measurement noise.\n", "\n", "Note that even though we only **measure position**, the Kalman filter is able to **estimate both position and velocity**. This is because the filter uses the system's dynamical model (constant velocity motion) to infer velocity from the sequence of position measurements. This demonstrates the power of model-based state estimation: by leveraging knowledge of how the system evolves, the Kalman filter can estimate hidden states that are not directly observed." ] }, { "cell_type": "markdown", "id": "32d4aa41", "metadata": {}, "source": [ "