{ "cells": [ { "cell_type": "markdown", "id": "a85cc762", "metadata": {}, "source": [ "[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)" ] }, { "cell_type": "markdown", "id": "9f7593f9", "metadata": {}, "source": [ "# Example: TurtleBot with Noisy Odometry" ] }, { "cell_type": "markdown", "id": "68525bd5", "metadata": {}, "source": [ "In this notebook, we model a TurtleBot3 navigating towards waypoints with noisy odometry data and use ``pykal`` to recast it as a composition of dynamical systems.\n", "\n", "## System Overview\n", "\n", "We consider uneven terrain and wheel slip as process noise, and measurement quantization as measurement noise. The Turtlebot3 offers a high-level controllers that lets us abstract away many details, as we will see below." ] }, { "cell_type": "markdown", "id": "fddaa3a3", "metadata": {}, "source": [ "## Block Diagram\n", "\n", "We can model the TurtleBot3 feedback system we are interested in as follows:\n", "\n", "\n", "\n", "where **waypoint generator** produces reference trajectories (e.g., \"move to\" coordinates), **velocity command** is a controller that outputs linear and angular velocity, **TurtleBot** is our robot with an odometry sensor, that is, with wheel encoders measuring position and orientation, and **Kalman Filter** is the state observer that fuses control inputs and the measurement from the odometry sensor.\n", "\n", ":::{note}\n", "The TurtleBot has an onboard low-level controller that converts linear and angular velocity commands to the to the appropriate wheel speeds. Thus, our \"velocity controller\" is simply an algorithm that generates such commands.\n", ":::" ] }, { "cell_type": "markdown", "id": "13d262bf", "metadata": {}, "source": [ "### Discrete-time Dynamical Systems\n", "\n", "For a minimal working knowledge of the theory of discrete-time dynamical systems, check out the :doc:`\"DynamicalSystem\" <./dynamical_system>` notebook under \"Modules\" . We assume a basic familiarity with the theory and proceed with casting our diagram blocks as dynamical system blocks.\n", "\n", "\n", "\n", "We discuss the derivation of each dynamical system block below.\n", ":::{note} This diagram is techincally a lie: as we will soon see, our waypoint generator will require a state estimate to determine which waypoint it generates. So we should draw another $\\hat{x}$ arrow that feeds into the waypoint generator block. But we don't. Why? The state estimate $\\hat{x}$ will be passed in as an explicit paramter, and we choose not to model explicity paramters as arrows to keep our diagrams clean. See \"Function and Diagram Convention\" for more details.\n", "\n", ":::{tip} Notice that we have introduced a naming convention where subscripts on functions denote the block with which they are associated. In particular, the subscript $k$ is added to all state and output variables. Refer to :doc:`\"DynamicalSystem\" <./dynamical_system>` for the naming convention and why it is a good idea to represent functions and variables in such a way.\n" ] }, { "cell_type": "markdown", "id": "d5da675b", "metadata": {}, "source": [ "### Block 1: Waypoint Generator\n", "\n", "
\n", " \n", "
\n", "\n", "The waypoint generator produces reference poses $(x, y, \\theta)$ for the robot to track. For this tutorial, we'll create a simple square trajectory. To do this, we will define a sequence of poses that represents a square (in this case, four poses), and a method for choosing which pose our generator outputs. We do so by defining our set of poses as a **list** and having our state be an index of said list.\n", "\n", "For a square, our sequence of poses will be\n", "\n", "$$\n", "r_i \\in \\left\\{ (l,0,0), (l,l,\\frac{\\pi}{2}), (0,l,\\frac{\\pi}),(0,0,\\frac{-\\pi}{2})\\right\\}\n", "$$\n", "\n", "where $l$ is the length of each side and $i$ is a list-compatible index (so $i \\in \\left\\{0,1,2,3 \\right\\}). We now have the following:\n", "\n", "**Explicit Parameters**:\n", "- Current list index $w_k \\in \\{0, 1, 2, 3\\}$ (state)\n", "\n", "**Evolution Function**:\n", "$$\n", "w_{k+1} = f(w_k,\\dots) = \\begin{cases}w_k & \\text{ if wayboint not reached} \\\\ (w_k + 1) \\mod 4 & \\text{when waypoint reached}\\end{cases}\n", "$$\n", "\n", "Note that we need to define the \"waypoint reached\" condition precisely. We can do this by having another function, $\\text{is_waypoint_reached}$, that is True when the current state estimate is within tolerance of the waypoint, and False otherwise; that is,\n", "\n", "$$\n", "\\text{is_waypoint_reached}(r_j,\\hat{x}) = \\begin{cases} \\text{True} & \\text{if } ||r_j - \\hat{x}||_2 < \\text{tolerance} \\\\\n", "\\text{False} & \\text{otherwise} \\end{cases}\n", "$$\n", "\n", "where **tolerance** is a variable we can define at runtime. Our evolution function then becomes\n", "\n", "$$\n", "w_{k+1} = f(w_k) = \\begin{cases}w_k & \\text{is_waypoint_reached}=\\text{False} \\\\ (w_k + 1) \\mod 4 & \\text{is_waypoint_reached} = \\text{True}\\end{cases}\n", "$$\n", "\n", "**Output Function**:\n", "\n", "$$\n", "h_w(w_k) = r_{w_k}\n", "$$\n", "\n", "** Implicit Parameters**:\n", "- tolerance\n", "- sequence of waypoints" ] }, { "cell_type": "code", "execution_count": null, "id": "829b45e2", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from pykal import DynamicalSystem\n", "from pykal.data_change import corrupt\n", "\n", "# w_k in {0, 1, 2, 3}\n", "indices = [0, 1, 2, 3]\n", "\n", "# r_{i} in {r_0, r_1, r_2, r_3}\n", "sequence_of_waypoints = [\n", " np.array([[2.0], [0.0], [0.0]]), # Right\n", " np.array([[2.0], [2.0], [np.pi / 2]]), # Up\n", " np.array([[0.0], [2.0], [np.pi]]), # Left\n", " np.array([[0.0], [0.0], [-np.pi / 2]]), # Down (back to start)\n", "]\n", "\n", "\n", "def fw(\n", " wk: int, sequence_of_waypoints: list, xhat: np.ndarray, tol: float\n", ") -> int:\n", " # Get current target waypoint\n", " rk = sequence_of_waypoints[wk]\n", "\n", " # Check if waypoint pose is reached\n", " distance = np.linalg.norm(rk - xhat, ord=2)\n", " is_reached = distance < tol\n", "\n", " if is_reached:\n", " # Move to next waypoint\n", " next_idx = (wk + 1) % 4\n", " return next_idx\n", " else:\n", " # Stay at current waypoint\n", " return wk\n", "\n", "\n", "def hw(wk: int, sequence_of_waypoints: list) -> np.ndarray:\n", " return sequence_of_waypoints[wk]\n", "\n", "\n", "waypoint_generator = DynamicalSystem(f=fw, h=hw, state_name=\"wk\")\n", "\n", "# Test the waypoint generator\n", "wk_test = 0 # start at first waypoint\n", "xhat_test = np.array(\n", " [[0.1], [0.0], [0.0]]\n", ") # Close to first waypoint (but not close enough per \"tol\")\n", "\n", "# Not yet reached (far away)\n", "wk_next, rk = waypoint_generator.step(\n", " params={\n", " \"wk\": wk_test,\n", " \"xhat\": xhat_test,\n", " \"sequence_of_waypoints\": sequence_of_waypoints,\n", " \"tol\": 0.05,\n", " })\n", "print(\" Not reached:\")\n", "print(f\" Current waypoint index: {wk_next}\")\n", "print(f\" Reference pose: {rk.flatten()}\")\n", "\n", "# Waypoint reached (within tolerance)\n", "xhat_reached = np.array([[2.01], [0.01], [0.0]]) # within \"tol\" of first waypoint\n", "wk_next, rk = waypoint_generator.step(\n", " params={\n", " \"wk\": wk_next,\n", " \"xhat\": xhat_reached,\n", " \"sequence_of_waypoints\": sequence_of_waypoints,\n", " \"tol\": 0.05,\n", " })\n", "print(\"Reached! Moving to next waypoint.):\")\n", "print(f\" Next waypoint index: {wk_next\")\n", "print(f\" Reference pose: {rk.flatten()}\")" ] }, { "cell_type": "markdown", "id": "47938f80", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Block 2: Velocity Command Generator\n", "\n", "
\n", " \n", "
\n", "\n", "The velocity controller generates $(v_{\\text{cmd}}, \\omega_{\\text{cmd}})$ -- linear and angular velocity commands -- to drive the TurtleBot3 towards the reference pose $r_k = (x_r, y_r, \\theta_r)$. We use a simple proportional controller based on the distance to the goal and the heading error.\n", "\n", "Since a proportional controller has no internal state (no memory), it is a **stateless dynamical system** with only an output function $h$ (recall that $f: \\varnothing \\to \\varnothing$ for stateless systems).\n", "\n", "**Explicit Parameters**:\n", "- Current state estimate $\\hat{x}_k = (\\hat{x}, \\hat{y}, \\hat{\\theta}, \\dots)$\n", "- Reference pose $r_k = (x_r, y_r, \\theta_r)$\n", "\n", "**Evolution Function**:\n", "$$\n", "f = \\varnothing \\quad \\text{(no state evolution)}\n", "$$\n", "\n", "**Output Function**:\n", "$$\n", "h_c(\\hat{x}_k, r_k) = \\begin{bmatrix} v_{\\text{cmd}} \\\\ \\omega_{\\text{cmd}} \\end{bmatrix} = \\begin{bmatrix}\n", "\\text{sat}(K_v \\cdot d, v_{\\max}) \\\\\n", "\\text{sat}(K_\\omega \\cdot e_\\theta, \\omega_{\\max})\n", "\\end{bmatrix}\n", "$$\n", "\n", "where:\n", "- $d = \\sqrt{(x_r - \\hat{x})^2 + (y_r - \\hat{y})^2}$ is the Euclidean distance to the goal\n", "- $e_\\theta = \\text{wrap}(\\theta_r - \\hat{\\theta})$ is the heading error wrapped to $[-\\pi, \\pi]$\n", "- $\\text{sat}(\\cdot, \\max)$ denotes saturation to respect physical velocity limits\n", "\n", "**Implicit Parameters**:\n", "- $K_v$: Linear velocity gain\n", "- $K_\\omega$: Angular velocity gain\n", "- $v_{\\max}$: Maximum linear velocity (TurtleBot3: 0.22 m/s)\n", "- $\\omega_{\\max}$: Maximum angular velocity (TurtleBot3: 2.84 rad/s)\n", "\n", ":::{note}\n", "We use a proportional controller rather than a full PID controller because the TurtleBot's dynamics are slow and well-damped. For faster or more complex systems, integral and derivative terms would be needed (see the car cruise control example).\n", ":::\n" ] }, { "cell_type": "code", "execution_count": null, "id": "57f4115c", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "def hc(\n", " xhat: np.ndarray,\n", " r: np.ndarray,\n", " Kv: float,\n", " Komega: float,\n", " max_v: float,\n", " max_omega: float) -> np.ndarray:\n", " # Extract positions (first 3 elements in case xhat has velocities)\n", " x, y, theta = xhat.flatten()\n", " x_r, y_r, theta_r = r.flatten()\n", "\n", " # Distance to goal\n", " dx = x_r - x\n", " dy = y_r - y\n", " distance = np.sqrt(dx**2 + dy**2)\n", "\n", " # Heading error (wrap to [-pi, pi])\n", " heading_error = theta_r - theta\n", " heading_error = np.arctan2(np.sin(heading_error), np.cos(heading_error))\n", "\n", " # Proportional control\n", " v_cmd = Kv * distance\n", " omega_cmd = Komega * heading_error\n", "\n", " # Saturate commands\n", " v_cmd = np.clip(v_cmd, -max_v, max_v)\n", " omega_cmd = np.clip(omega_cmd, -max_omega, max_omega)\n", "\n", " return np.array([[v_cmd], [omega_cmd]])\n", "\n", "\n", "# Create the controller DynamicalSystem (stateless, so f=None)\n", "velocity_controller = DynamicalSystem(h=hc)\n", "\n", "# Test the controller\n", "xhat_test = np.array([[0.0], [0.0], [0.0]]) # At origin\n", "r_test = np.array([[1.0], [1.0], [np.pi / 4]]) # Target pose\n", "\n", "uk = velocity_controller.step(\n", " params={\n", " \"xhat\": xhat_test,\n", " \"r\": r_test,\n", " \"Kv\": 0.5,\n", " \"Komega\": 1.0,\n", " \"max_v\": 0.22, # TurtleBot max linear velocity\n", " \"max_omega\": 2.84, # TurtleBot max angular velocity\n", " }\n", ")\n", "print(\"Controller output:\")\n", "print(f\" v_cmd = {uk[0,0]:.3f} m/s\")\n", "print(f\" omega_cmd = {uk[1,0]:.3f} rad/s\")" ] }, { "cell_type": "markdown", "id": "95c191f3", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Block 3: TurtleBot (Plant)\n", "\n", "
\n", " \n", "
\n", "\n", "The TurtleBot3 follows the **unicycle kinematics model**, a standard model for differential-drive robots. The robot's state includes both its pose (position and orientation) and its velocities.\n", "\n", "**Explicit Parameters**:\n", "- Current state $x_k = [x, y, \\theta, v, \\omega]^T$ (5-dimensional state)\n", "- Control input $u_k = [v_{\\text{cmd}}, \\omega_{\\text{cmd}}]^T$\n", "- Timestep $\\Delta t$\n", "\n", "**Evolution Function** (using Euler integration):\n", "$$\n", "f_p(x_k, u_k, \\Delta t) = \\begin{bmatrix}\n", "x + v \\cos(\\theta) \\Delta t \\\\\n", "y + v \\sin(\\theta) \\Delta t \\\\\n", "\\text{wrap}(\\theta + \\omega \\Delta t) \\\\\n", "v_{\\text{cmd}} \\\\\n", "\\omega_{\\text{cmd}}\n", "\\end{bmatrix}\n", "$$\n", "\n", "where $\\text{wrap}(\\cdot)$ normalizes angles to $[-\\pi, \\pi]$ using $\\text{atan2}(\\sin(\\theta), \\cos(\\theta))$.\n", "\n", "The last two equations assume **instantaneous velocity response** -- that is, we assume the low-level motor controller perfectly tracks commanded velocities. This is a reasonable approximation for the TurtleBot3 at typical speeds.\n", "\n", "**Output Function**:\n", "The wheel odometry sensor measures position and orientation (but not velocity directly):\n", "$$\n", "h_p(x_k) = \\begin{bmatrix} x \\\\ y \\\\ \\theta \\end{bmatrix}\n", "$$\n", "\n", "In practice, this measurement will be corrupted by:\n", "- Wheel slip (especially on smooth floors or uneven terrain)\n", "- Quantization from encoder resolution\n", "- Systematic bias from wheel diameter mismatch\n", "- Occasional spikes from bumps or obstacles\n", "\n", "We will add realistic sensor corruption during simulation using `data_change.corrupt`.\n", "\n", "**Implicit Parameters**:\n", "- None (all parameters are explicit for the plant dynamics)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "f9b74b20", "metadata": {}, "outputs": [], "source": [ "def turtlebot_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:\n", " \"\"\"\n", " TurtleBot unicycle dynamics (noise-free).\n", "\n", " Parameters\n", " ----------\n", " xk : np.ndarray\n", " Current state [x, y, theta, v, omega], shape (5,1)\n", " uk : np.ndarray\n", " Control input [v_cmd, omega_cmd], shape (2,1)\n", " dt : float\n", " Timestep (seconds)\n", "\n", " Returns\n", " -------\n", " xk_next : np.ndarray\n", " Next state, shape (5,1)\n", " \"\"\"\n", " x, y, theta, v, omega = xk.flatten()\n", " v_cmd, omega_cmd = uk.flatten()\n", "\n", " # Euler integration\n", " x_new = x + v * np.cos(theta) * dt\n", " y_new = y + v * np.sin(theta) * dt\n", " theta_new = theta + omega * dt\n", "\n", " # Wrap theta to [-pi, pi]\n", " theta_new = np.arctan2(np.sin(theta_new), np.cos(theta_new))\n", "\n", " # Update velocities (assume instantaneous response)\n", " v_new = v_cmd\n", " omega_new = omega_cmd\n", "\n", " return np.array([[x_new], [y_new], [theta_new], [v_new], [omega_new]])\n", "\n", "\n", "def turtlebot_h(xk: np.ndarray) -> np.ndarray:\n", " \"\"\"\n", " Measurement function: observe [x, y, theta] from odometry.\n", "\n", " Parameters\n", " ----------\n", " xk : np.ndarray\n", " State [x, y, theta, v, omega], shape (5,1)\n", "\n", " Returns\n", " -------\n", " yk : np.ndarray\n", " Measurement [x, y, theta], shape (3,1)\n", " \"\"\"\n", " return xk[:3, :]\n", "\n", "\n", "# Create the plant DynamicalSystem\n", "plant = DynamicalSystem(f=turtlebot_f, h=turtlebot_h, state_name=\"xk\")\n", "\n", "# Test the plant\n", "xk_test = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]]) # Start at origin, stationary\n", "uk_test = np.array([[0.2], [0.5]]) # Move forward and turn\n", "dt = 0.1\n", "\n", "xk_next, yk = plant.step(\n", " params={\"xk\": xk_test, \"uk\": uk_test, \"dt\": dt})\n", "print(\"After one step:\")\n", "print(f\" State: {xk_next.flatten()}\")\n", "print(f\" Measurement: {yk.flatten()}\")" ] }, { "cell_type": "markdown", "id": "053b8257", "metadata": {}, "source": [ "### Block 4: Kalman Filter (Observer)\n", "\n", "
\n", " \n", "
\n", "\n", "Since the TurtleBot dynamics are **nonlinear** (due to $\\cos(\\theta)$ and $\\sin(\\theta)$ in the evolution function), we use the **Extended Kalman Filter (EKF)**, which linearizes the dynamics around the current state estimate.\n", "\n", "The observer maintains both a state estimate $\\hat{x}_k$ and a covariance matrix $P_k$ representing uncertainty. Thus, the observer state is the tuple $(\\hat{x}_k, P_k)$.\n", "\n", "**Explicit Parameters**:\n", "- Current estimate and covariance: $\\text{xhat_P}_k = (\\hat{x}_k, P_k)$\n", "- Measurement $y_k$ from the odometry sensor\n", "- Control input $u_k$ (same as applied to the plant)\n", "\n", "**Evolution Function**:\n", "The EKF performs a **predict-update cycle**:\n", "\n", "**Predict step**:\n", "$$\n", "\\begin{aligned}\n", "\\hat{x}_k^- &= f_p(\\hat{x}_{k-1}, u_{k-1}, \\Delta t) \\quad \\text{(state prediction)} \\\\\n", "P_k^- &= F_k P_{k-1} F_k^T + Q_k \\quad \\text{(covariance prediction)}\n", "\\end{aligned}\n", "$$\n", "\n", "**Update step**:\n", "$$\n", "\\begin{aligned}\n", "\\nu_k &= y_k - h_p(\\hat{x}_k^-) \\quad \\text{(innovation)} \\\\\n", "S_k &= H_k P_k^- H_k^T + R_k \\quad \\text{(innovation covariance)} \\\\\n", "K_k &= P_k^- H_k^T S_k^{-1} \\quad \\text{(Kalman gain)} \\\\\n", "\\hat{x}_k &= \\hat{x}_k^- + K_k \\nu_k \\quad \\text{(state update)} \\\\\n", "P_k &= (I - K_k H_k) P_k^- (I - K_k H_k)^T + K_k R_k K_k^T \\quad \\text{(covariance update)}\n", "\\end{aligned}\n", "$$\n", "\n", "The evolution function is:\n", "$$\n", "f_{\\text{kf}}(\\text{xhat_P}_k, y_k, u_k) = (\\hat{x}_{k+1}, P_{k+1})\n", "$$\n", "\n", "**Output Function**:\n", "$$\n", "h_{\\text{kf}}(\\text{xhat_P}_k) = \\hat{x}_k \\quad \\text{(extract state estimate)}\n", "$$\n", "\n", "**Implicit Parameters**:\n", "\n", "**Linearization Jacobians**:\n", "- $F_k = \\frac{\\partial f_p}{\\partial x}\\bigg|_{\\hat{x}_k}$: State transition Jacobian (5×5)\n", " $$\n", " F_k = \\begin{bmatrix}\n", " 1 & 0 & -v \\sin(\\theta) \\Delta t & \\cos(\\theta) \\Delta t & 0 \\\\\n", " 0 & 1 & v \\cos(\\theta) \\Delta t & \\sin(\\theta) \\Delta t & 0 \\\\\n", " 0 & 0 & 1 & 0 & \\Delta t \\\\\n", " 0 & 0 & 0 & 1 & 0 \\\\\n", " 0 & 0 & 0 & 0 & 1\n", " \\end{bmatrix}\n", " $$\n", "\n", "- $H_k = \\frac{\\partial h_p}{\\partial x}\\bigg|_{\\hat{x}_k}$: Measurement Jacobian (3×5, constant for this system)\n", " $$\n", " H_k = \\begin{bmatrix}\n", " 1 & 0 & 0 & 0 & 0 \\\\\n", " 0 & 1 & 0 & 0 & 0 \\\\\n", " 0 & 0 & 1 & 0 & 0\n", " \\end{bmatrix}\n", " $$\n", "\n", "**Noise Covariances**:\n", "- $Q_k$: Process noise covariance (5×5) -- models uncertainty in the dynamics (wheel slip, unmodeled effects)\n", "- $R_k$: Measurement noise covariance (3×3) -- models odometry sensor noise\n", "\n", "**Plant dynamics functions** (needed for prediction):\n", "- $f_p$: Plant evolution function\n", "- $h_p$: Plant measurement function\n", "\n", ":::{note}\n", "The EKF implementation is provided in `pykal.algorithm_library.estimators.kf`. It uses the **Joseph form** for covariance update to ensure numerical stability and maintains symmetry through explicit symmetrization.\n", ":::\n" ] }, { "cell_type": "code", "execution_count": null, "id": "97799dce", "metadata": {}, "outputs": [], "source": [ "from pykal.algorithm_library.estimators import kf\n", "\n", "\n", "def compute_F_jacobian(xhat: np.ndarray, dt: float) -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of dynamics for TurtleBot.\n", "\n", " Parameters\n", " ----------\n", " xhat : np.ndarray\n", " Current state estimate [x, y, theta, v, omega], shape (5,1)\n", " dt : float\n", " Timestep\n", "\n", " Returns\n", " -------\n", " F : np.ndarray\n", " Jacobian matrix, shape (5, 5)\n", " \"\"\"\n", " _, _, theta, v, _ = xhat.flatten()\n", "\n", " F = np.array(\n", " [\n", " [1, 0, -v * np.sin(theta) * dt, np.cos(theta) * dt, 0],\n", " [0, 1, v * np.cos(theta) * dt, np.sin(theta) * dt, 0],\n", " [0, 0, 1, 0, dt],\n", " [0, 0, 0, 1, 0],\n", " [0, 0, 0, 0, 1],\n", " ]\n", " )\n", "\n", " return F\n", "\n", "\n", "def compute_H_jacobian() -> np.ndarray:\n", " \"\"\"\n", " Compute Jacobian of measurement function (constant for this system).\n", "\n", " Returns\n", " -------\n", " H : np.ndarray\n", " Jacobian matrix, shape (3, 5)\n", " \"\"\"\n", " return np.array([[1, 0, 0, 0, 0], [0, 1, 0, 0, 0], [0, 0, 1, 0, 0]])\n", "\n", "\n", "# Process and measurement noise covariances\n", "Q_turtlebot = np.diag([0.01, 0.01, 0.02, 0.1, 0.1]) # Process noise\n", "R_turtlebot = np.diag([0.05, 0.05, 0.1]) # Odometry noise (5cm, 5cm, ~6 degrees)\n", "\n", "# Create the observer DynamicalSystem\n", "observer = DynamicalSystem(f=kf.f, h=kf.h, state_name=\"xhat_P\")\n", "\n", "# Test the observer (single step)\n", "xhat_0 = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])\n", "P_0 = np.diag([0.1, 0.1, 0.1, 1.0, 1.0])\n", "xhat_P_test = (xhat_0, P_0)\n", "\n", "# Simulate receiving a noisy measurement\n", "yk_noisy = np.array([[0.02], [0.01], [0.05]]) # Noisy measurement\n", "\n", "xhat_P_new, xhat_out = observer.step(\n", " params={\n", " \"xhat_P\": xhat_P_test,\n", " \"yk\": yk_noisy,\n", " \"f\": turtlebot_f,\n", " \"f_params\": {\"xk\": xhat_0, \"uk\": np.array([[0.0], [0.0]]), \"dt\": 0.1},\n", " \"h\": turtlebot_h,\n", " \"h_params\": {\"xk\": xhat_0},\n", " \"Fk\": compute_F_jacobian(xhat_0, dt=0.1),\n", " \"Qk\": Q_turtlebot,\n", " \"Hk\": compute_H_jacobian(),\n", " \"Rk\": R_turtlebot,\n", " })\n", "\n", "print(\"Observer estimate after one measurement:\")\n", "print(xhat_out.flatten())" ] }, { "cell_type": "markdown", "id": "85346ca1", "metadata": {}, "source": [ "## Simulation\n", "\n", "\n", "We now simulate the complete closed-loop system, integrating all four dynamical components:\n", "1. **Waypoint Generator** → reference pose $r_k$\n", "2. **Velocity Controller** → velocity commands $u_k$ (using $r_k$ and $\\hat{x}_k$)\n", "3. **TurtleBot Plant** → true state evolution and noisy measurements $y_k$\n", "4. **Kalman Filter** → state estimate $\\hat{x}_k$ (using $u_k$ and $y_k$)" ] }, { "cell_type": "markdown", "id": "6a789ec3", "metadata": {}, "source": [ "### System Parameters" ] }, { "cell_type": "code", "execution_count": null, "id": "acf51724", "metadata": {}, "outputs": [], "source": [ "# Time parameters\n", "dt = 0.1 # Sampling time (seconds)\n", "switch_time = 15.0 # Time at each waypoint (seconds)\n", "\n", "# Controller gains\n", "Kv = 0.5 # Linear velocity gain\n", "Komega = 1.5 # Angular velocity gain\n", "\n", "# Kalman filter parameters\n", "Q = np.diag([0.01, 0.01, 0.02, 0.1, 0.1]) # Process noise covariance\n", "R = np.diag([0.05, 0.05, 0.1]) # Measurement noise covariance" ] }, { "cell_type": "markdown", "id": "e6fb1ae9", "metadata": {}, "source": [ "### Initial Conditions" ] }, { "cell_type": "code", "execution_count": null, "id": "b5ce5623", "metadata": {}, "outputs": [], "source": [ "# Initial states\n", "wk = np.array([[0]]) # Waypoint generator state: start at first waypoint\n", "xk = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]]) # Plant state: start at origin\n", "xhat = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]]) # Observer estimate\n", "P = np.diag([0.1, 0.1, 0.1, 1.0, 1.0]) # Covariance matrix\n", "xhat_P = (xhat, P) # Observer state tuple\n", "\n", "# Waypoint reached tolerance\n", "waypoint_tol = 0.3 # Distance tolerance in meters\n", "\n", "# Controller gains (used as parameters)\n", "Kv = 0.5\n", "Komega = 1.5\n", "max_v = 0.22\n", "max_omega = 2.84\n", "\n", "# Storage for plotting\n", "time_hist = []\n", "reference_hist = []\n", "true_state_hist = []\n", "estimate_hist = []\n", "measurement_hist = []\n", "command_hist = []\n", "error_hist = []" ] }, { "cell_type": "markdown", "id": "fea451a2", "metadata": {}, "source": [ "### Simulate" ] }, { "cell_type": "code", "execution_count": null, "id": "f4ee5a91", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Simulation time\n", "T_sim = 60.0 # seconds\n", "time_steps = np.arange(0, T_sim, dt)\n", "\n", "# Run closed-loop simulation using all four DynamicalSystem components\n", "for tk in time_steps:\n", " # 1. Waypoint generator step\n", " wk, rk = waypoint_generator.step(\n", " params={\n", " \"wk\": wk,\n", " \"xhat\": xhat,\n", " \"waypoint_values\": sequence_of_waypoints,\n", " \"tol\": waypoint_tol,\n", " })\n", "\n", " # 2. Controller step (generates velocity commands)\n", " uk = velocity_controller.step(\n", " params={\n", " \"xhat\": xhat,\n", " \"r\": rk,\n", " \"Kv\": Kv,\n", " \"Komega\": Komega,\n", " \"max_v\": max_v,\n", " \"max_omega\": max_omega,\n", " }\n", " )\n", "\n", " # 3. Plant step (true dynamics)\n", " xk, yk_clean = plant.step(\n", " params={\"xk\": xk, \"uk\": uk, \"dt\": dt})\n", "\n", " # 4. Add measurement noise (realistic corruption)\n", " yk_flat = yk_clean.flatten()\n", "\n", " # X: bias (wheel diameter mismatch) + Gaussian noise\n", " yk_corrupted_x = corrupt.with_bias(yk_flat[0], bias=0.02)\n", " yk_corrupted_x = yk_corrupted_x + np.random.normal(0, np.sqrt(R[0, 0]))\n", "\n", " # Y: Gaussian noise\n", " yk_corrupted_y = yk_flat[1] + np.random.normal(0, np.sqrt(R[1, 1]))\n", "\n", " # Theta: Gaussian noise + occasional spikes (bumps)\n", " yk_corrupted_theta = yk_flat[2] + np.random.normal(0, np.sqrt(R[2, 2]))\n", " if np.random.rand() < 0.02: # 2% spike rate\n", " yk_corrupted_theta += np.random.choice([-1, 1]) * 0.3\n", "\n", " yk_noisy = np.array([[yk_corrupted_x], [yk_corrupted_y], [yk_corrupted_theta]])\n", "\n", " # 5. Extract current estimate for observer step\n", " xhat = observer.h(xhat_P)\n", "\n", " # 6. Observer step (EKF)\n", " Fk = compute_F_jacobian(xhat, dt)\n", " Hk = compute_H_jacobian()\n", "\n", " xhat_P, xhat_obs = observer.step(\n", " params={\n", " \"xhat_P\": xhat_P,\n", " \"yk\": yk_noisy,\n", " \"f\": turtlebot_f,\n", " \"f_params\": {\"xk\": xhat, \"uk\": uk, \"dt\": dt},\n", " \"h\": turtlebot_h,\n", " \"h_params\": {\"xk\": xhat},\n", " \"Fk\": Fk,\n", " \"Qk\": Q,\n", " \"Hk\": Hk,\n", " \"Rk\": R,\n", " })\n", "\n", " # Store results\n", " time_hist.append(tk)\n", " reference_hist.append(rk.flatten())\n", " true_state_hist.append(xk.flatten())\n", " estimate_hist.append(xhat_obs.flatten())\n", " measurement_hist.append(yk_noisy.flatten())\n", " command_hist.append(uk.flatten())\n", " error_hist.append((xk - xhat_obs).flatten())\n", "\n", "# Convert to numpy arrays\n", "reference_hist = np.array(reference_hist)\n", "true_state_hist = np.array(true_state_hist)\n", "estimate_hist = np.array(estimate_hist)\n", "measurement_hist = np.array(measurement_hist)\n", "command_hist = np.array(command_hist)\n", "error_hist = np.array(error_hist)" ] }, { "cell_type": "markdown", "id": "3bd7009d", "metadata": {}, "source": [ "### Visualize\n", "\n", "We can visualize the pertinent values of our system to assure correct behavior. Note how the Kalman filter smooths out the noisy measurements and provides accurate state estimates even in the presence of sensor corruption." ] }, { "cell_type": "code", "execution_count": null, "id": "4df69268", "metadata": { "lines_to_next_cell": 2, "tags": [ "hide-input" ] }, "outputs": [], "source": [ "fig, axs = plt.subplots(3, 2, figsize=(14, 12))\n", "\n", "# Plot 1: 2D Trajectory\n", "ax = axs[0, 0]\n", "ax.plot(\n", " true_state_hist[:, 0],\n", " true_state_hist[:, 1],\n", " \"b-\",\n", " label=\"True Trajectory\",\n", " linewidth=2,\n", " alpha=0.7)\n", "ax.plot(\n", " estimate_hist[:, 0],\n", " estimate_hist[:, 1],\n", " \"r--\",\n", " label=\"Estimated Trajectory\",\n", " linewidth=2,\n", " alpha=0.7)\n", "ax.scatter(\n", " reference_hist[:, 0],\n", " reference_hist[:, 1],\n", " c=\"green\",\n", " s=100,\n", " marker=\"*\",\n", " label=\"Waypoints\",\n", " zorder=5)\n", "ax.set_xlabel(\"X Position (m)\", fontsize=11)\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\"2D Trajectory Tracking\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "ax.axis(\"equal\")\n", "\n", "# Plot 2: X Position vs Time\n", "ax = axs[0, 1]\n", "ax.plot(time_hist, true_state_hist[:, 0], \"b-\", label=\"True X\", alpha=0.7)\n", "ax.plot(time_hist, estimate_hist[:, 0], \"r--\", label=\"Estimated X\", alpha=0.7)\n", "ax.scatter(\n", " time_hist[::10],\n", " measurement_hist[::10, 0],\n", " c=\"gray\",\n", " s=10,\n", " alpha=0.3,\n", " label=\"Measurements\")\n", "ax.set_ylabel(\"X Position (m)\", fontsize=11)\n", "ax.set_title(\"X Position Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 3: Y Position vs Time\n", "ax = axs[1, 0]\n", "ax.plot(time_hist, true_state_hist[:, 1], \"b-\", label=\"True Y\", alpha=0.7)\n", "ax.plot(time_hist, estimate_hist[:, 1], \"r--\", label=\"Estimated Y\", alpha=0.7)\n", "ax.scatter(\n", " time_hist[::10],\n", " measurement_hist[::10, 1],\n", " c=\"gray\",\n", " s=10,\n", " alpha=0.3,\n", " label=\"Measurements\")\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\"Y Position Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 4: Heading vs Time\n", "ax = axs[1, 1]\n", "ax.plot(\n", " time_hist,\n", " np.rad2deg(true_state_hist[:, 2]),\n", " \"b-\",\n", " label=\"True Heading\",\n", " alpha=0.7)\n", "ax.plot(\n", " time_hist,\n", " np.rad2deg(estimate_hist[:, 2]),\n", " \"r--\",\n", " label=\"Estimated Heading\",\n", " alpha=0.7)\n", "ax.set_ylabel(\"Heading (degrees)\", fontsize=11)\n", "ax.set_title(\"Heading Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 5: Velocity Commands\n", "ax = axs[2, 0]\n", "ax.plot(\n", " time_hist,\n", " command_hist[:, 0],\n", " \"g-\",\n", " label=\"Linear Velocity\",\n", " linewidth=1.5)\n", "ax.set_ylabel(\"Linear Velocity (m/s)\", fontsize=11)\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_title(\"Control Commands\", fontsize=13, fontweight=\"bold\")\n", "ax.legend(loc=\"upper left\")\n", "ax.grid(True, alpha=0.3)\n", "\n", "ax_omega = ax.twinx()\n", "ax_omega.plot(\n", " time_hist,\n", " command_hist[:, 1],\n", " \"purple\",\n", " label=\"Angular Velocity\",\n", " linewidth=1.5,\n", " alpha=0.7)\n", "ax_omega.set_ylabel(\"Angular Velocity (rad/s)\", fontsize=11, color=\"purple\")\n", "ax_omega.tick_params(axis=\"y\", labelcolor=\"purple\")\n", "ax_omega.legend(loc=\"upper right\")\n", "\n", "# Plot 6: Position Estimation Error\n", "ax = axs[2, 1]\n", "position_error = np.sqrt(error_hist[:, 0] ** 2 + error_hist[:, 1] ** 2)\n", "ax.plot(time_hist, position_error, \"m-\", label=\"Position Error\", linewidth=1.5)\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_ylabel(\"Position Error (m)\", fontsize=11)\n", "ax.set_title(\"Estimation Error Magnitude\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Mean position error: {np.mean(position_error):.4f} m\")\n", "print(f\"Max position error: {np.max(position_error):.4f} m\")" ] }, { "cell_type": "markdown", "id": "605edf07", "metadata": {}, "source": [ "Now we can run the same simulation with a much cleaner interface. All system parameters can be configured at initialization, and the simulation loop becomes trivial:" ] }, { "cell_type": "code", "execution_count": null, "id": "acf64997", "metadata": { "tags": [ "hide-input" ] }, "outputs": [], "source": [ "# Visualize results\n", "fig, axs = plt.subplots(3, 2, figsize=(14, 12))\n", "\n", "# Plot 1: 2D Trajectory\n", "ax = axs[0, 0]\n", "ax.plot(\n", " results[\"true_state\"][:, 0],\n", " results[\"true_state\"][:, 1],\n", " \"b-\",\n", " label=\"True Trajectory\",\n", " linewidth=2,\n", " alpha=0.7)\n", "ax.plot(\n", " results[\"estimate\"][:, 0],\n", " results[\"estimate\"][:, 1],\n", " \"r--\",\n", " label=\"Estimated Trajectory\",\n", " linewidth=2,\n", " alpha=0.7)\n", "ax.scatter(\n", " results[\"reference\"][:, 0],\n", " results[\"reference\"][:, 1],\n", " c=\"green\",\n", " s=100,\n", " marker=\"*\",\n", " label=\"Waypoints\",\n", " zorder=5)\n", "ax.set_xlabel(\"X Position (m)\", fontsize=11)\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\n", " \"2D Trajectory Tracking (Callback Interface)\", fontsize=13, fontweight=\"bold\"\n", ")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "ax.axis(\"equal\")\n", "\n", "# Plot 2: X Position vs Time\n", "ax = axs[0, 1]\n", "ax.plot(\n", " results[\"time\"],\n", " results[\"true_state\"][:, 0],\n", " \"b-\",\n", " label=\"True X\",\n", " alpha=0.7)\n", "ax.plot(\n", " results[\"time\"],\n", " results[\"estimate\"][:, 0],\n", " \"r--\",\n", " label=\"Estimated X\",\n", " alpha=0.7)\n", "ax.scatter(\n", " results[\"time\"][::10],\n", " results[\"measurement\"][::10, 0],\n", " c=\"gray\",\n", " s=10,\n", " alpha=0.3,\n", " label=\"Measurements\")\n", "ax.set_ylabel(\"X Position (m)\", fontsize=11)\n", "ax.set_title(\"X Position Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 3: Y Position vs Time\n", "ax = axs[1, 0]\n", "ax.plot(\n", " results[\"time\"],\n", " results[\"true_state\"][:, 1],\n", " \"b-\",\n", " label=\"True Y\",\n", " alpha=0.7)\n", "ax.plot(\n", " results[\"time\"],\n", " results[\"estimate\"][:, 1],\n", " \"r--\",\n", " label=\"Estimated Y\",\n", " alpha=0.7)\n", "ax.scatter(\n", " results[\"time\"][::10],\n", " results[\"measurement\"][::10, 1],\n", " c=\"gray\",\n", " s=10,\n", " alpha=0.3,\n", " label=\"Measurements\")\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\"Y Position Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 4: Heading vs Time\n", "ax = axs[1, 1]\n", "ax.plot(\n", " results[\"time\"],\n", " np.rad2deg(results[\"true_state\"][:, 2]),\n", " \"b-\",\n", " label=\"True Heading\",\n", " alpha=0.7)\n", "ax.plot(\n", " results[\"time\"],\n", " np.rad2deg(results[\"estimate\"][:, 2]),\n", " \"r--\",\n", " label=\"Estimated Heading\",\n", " alpha=0.7)\n", "ax.set_ylabel(\"Heading (degrees)\", fontsize=11)\n", "ax.set_title(\"Heading Over Time\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 5: Velocity Commands\n", "ax = axs[2, 0]\n", "ax.plot(\n", " results[\"time\"],\n", " results[\"command\"][:, 0],\n", " \"g-\",\n", " label=\"Linear Velocity\",\n", " linewidth=1.5)\n", "ax.set_ylabel(\"Linear Velocity (m/s)\", fontsize=11)\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_title(\"Control Commands\", fontsize=13, fontweight=\"bold\")\n", "ax.legend(loc=\"upper left\")\n", "ax.grid(True, alpha=0.3)\n", "\n", "ax_omega = ax.twinx()\n", "ax_omega.plot(\n", " results[\"time\"],\n", " results[\"command\"][:, 1],\n", " \"purple\",\n", " label=\"Angular Velocity\",\n", " linewidth=1.5,\n", " alpha=0.7)\n", "ax_omega.set_ylabel(\"Angular Velocity (rad/s)\", fontsize=11, color=\"purple\")\n", "ax_omega.tick_params(axis=\"y\", labelcolor=\"purple\")\n", "ax_omega.legend(loc=\"upper right\")\n", "\n", "# Plot 6: Position Estimation Error\n", "ax = axs[2, 1]\n", "position_error = np.sqrt(\n", " results[\"estimation_error\"][:, 0] ** 2 + results[\"estimation_error\"][:, 1] ** 2\n", ")\n", "ax.plot(\n", " results[\"time\"],\n", " position_error,\n", " \"m-\",\n", " label=\"Position Error\",\n", " linewidth=1.5)\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_ylabel(\"Position Error (m)\", fontsize=11)\n", "ax.set_title(\"Estimation Error Magnitude\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"Mean position error: {np.mean(position_error):.4f} m\")\n", "print(f\"Max position error: {np.max(position_error):.4f} m\")" ] }, { "cell_type": "markdown", "id": "2d89f939", "metadata": {}, "source": [ "## Experimentation\n", "\n", "Now that we have a working system, try experimenting with different parameters:\n", "\n", "**Noise tuning**:\n", "- Increase measurement noise `R` to simulate worse odometry (wheel slip, rough terrain)\n", "- Increase process noise `Q` to account for model uncertainty\n", "- Observe how the filter trades off model prediction vs measurements\n", "\n", "**Controller tuning**:\n", "- Adjust `Kv` and `Komega` to change aggressiveness\n", "- Try different waypoint patterns (figure-eight, circle, random)\n", "- Add velocity limits to simulate real robot constraints\n", "\n", "**Observer comparison**:\n", "- Run simulation with KF disabled (use raw measurements)\n", "- Compare estimation error with and without filtering\n", "- Visualize covariance ellipses over time\n", "\n", "**Challenge**: Can you tune Q and R to minimize position error while maintaining smooth estimates?" ] }, { "cell_type": "markdown", "id": "446135f6", "metadata": {}, "source": [ "[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }