← Control Systems as Dynamical Systems

Example: TurtleBot with Noisy Odometry

In this notebook, we model a TurtleBot3 navigating towards waypoints with noisy odometry data and use pykal to recast it as a composition of dynamical systems.

System Overview

We consider uneven terrain and wheel slip as process noise, and measurement quantization as measurement noise. The Turtlebot3 offers a high-level controllers that lets us abstract away many details, as we will see below.

Block Diagram

We can model the TurtleBot3 feedback system we are interested in as follows:

../../../_images/turtlebot_feedback_system.svg

where waypoint generator produces reference trajectories (e.g., “move to” coordinates), velocity command is a controller that outputs linear and angular velocity, TurtleBot is our robot with an odometry sensor, that is, with wheel encoders measuring position and orientation, and Kalman Filter is the state observer that fuses control inputs and the measurement from the odometry sensor.

Note

The TurtleBot has an onboard low-level controller that converts linear and angular velocity commands to the to the appropriate wheel speeds. Thus, our “velocity controller” is simply an algorithm that generates such commands.

Discrete-time Dynamical Systems

For a minimal working knowledge of the theory of discrete-time dynamical systems, check out the :doc:"DynamicalSystem" <./dynamical_system> notebook under “Modules” . We assume a basic familiarity with the theory and proceed with casting our diagram blocks as dynamical system blocks.

../../../_images/turtlebot_composition_of_dynamical_systems.svg

We discuss the derivation of each dynamical system block below.

Note

This diagram is techincally a lie: as we will soon see, our waypoint generator will require a state estimate to determine which waypoint it generates. So we should draw another \(\hat{x}\) arrow that feeds into the waypoint generator block. But we don’t. Why? The state estimate \(\hat{x}\) will be passed in as an explicit paramter, and we choose not to model explicity paramters as arrows to keep our diagrams clean. See “Function and Diagram Convention” for more details.

Tip

Notice that we have introduced a naming convention where subscripts on functions denote the block with which they are associated. In particular, the subscript \(k\) is added to all state and output variables. Refer to :doc:"DynamicalSystem" <./dynamical_system> for the naming convention and why it is a good idea to represent functions and variables in such a way.

Block 1: Waypoint Generator

The waypoint generator produces reference poses \((x, y, \theta)\) for the robot to track. For this tutorial, we’ll create a simple square trajectory. To do this, we will define a sequence of poses that represents a square (in this case, four poses), and a method for choosing which pose our generator outputs. We do so by defining our set of poses as a list and having our state be an index of said list.

For a square, our sequence of poses will be

\[ r_i \in \left\{ (l,0,0), (l,l,\frac{\pi}{2}), (0,l,\frac{\pi}),(0,0,\frac{-\pi}{2})\right\} \]

where \(l\) is the length of each side and \(i\) is a list-compatible index (so $i \in \left{0,1,2,3 \right}). We now have the following:

Explicit Parameters:

  • Current list index \(w_k \in \{0, 1, 2, 3\}\) (state)

Evolution Function: $\( w_{k+1} = f(w_k,\dots) = \begin{cases}w_k & \text{ if wayboint not reached} \\ (w_k + 1) \mod 4 & \text{when waypoint reached}\end{cases} \)$

Note that we need to define the “waypoint reached” condition precisely. We can do this by having another function, \(\text{is_waypoint_reached}\), that is True when the current state estimate is within tolerance of the waypoint, and False otherwise; that is,

\[\begin{split} \text{is_waypoint_reached}(r_j,\hat{x}) = \begin{cases} \text{True} & \text{if } ||r_j - \hat{x}||_2 < \text{tolerance} \\ \text{False} & \text{otherwise} \end{cases} \end{split}\]

where tolerance is a variable we can define at runtime. Our evolution function then becomes

\[\begin{split} w_{k+1} = f(w_k) = \begin{cases}w_k & \text{is_waypoint_reached}=\text{False} \\ (w_k + 1) \mod 4 & \text{is_waypoint_reached} = \text{True}\end{cases} \end{split}\]

Output Function:

\[ h_w(w_k) = r_{w_k} \]

** Implicit Parameters**:

  • tolerance

  • sequence of waypoints

import numpy as np
import matplotlib.pyplot as plt
from pykal import DynamicalSystem
from pykal.data_change import corrupt

# w_k in {0, 1, 2, 3}
indices = [0, 1, 2, 3]

# r_{i} in {r_0, r_1, r_2, r_3}
sequence_of_waypoints = [
    np.array([[2.0], [0.0], [0.0]]),  # Right
    np.array([[2.0], [2.0], [np.pi / 2]]),  # Up
    np.array([[0.0], [2.0], [np.pi]]),  # Left
    np.array([[0.0], [0.0], [-np.pi / 2]]),  # Down (back to start)
]


def fw(
    wk: int, sequence_of_waypoints: list, xhat: np.ndarray, tol: float
) -> int:
    # Get current target waypoint
    rk = sequence_of_waypoints[wk]

    # Check if waypoint pose is reached
    distance = np.linalg.norm(rk - xhat, ord=2)
    is_reached = distance < tol

    if is_reached:
        # Move to next waypoint
        next_idx = (wk + 1) % 4
        return next_idx
    else:
        # Stay at current waypoint
        return wk


def hw(wk: int, sequence_of_waypoints: list) -> np.ndarray:
    return sequence_of_waypoints[wk]


waypoint_generator = DynamicalSystem(f=fw, h=hw, state_name="wk")

# Test the waypoint generator
wk_test = 0  # start at first waypoint
xhat_test = np.array(
    [[0.1], [0.0], [0.0]]
)  # Close to first waypoint (but not close enough per "tol")

# Not yet reached (far away)
wk_next, rk = waypoint_generator.step(
    params={
        "wk": wk_test,
        "xhat": xhat_test,
        "sequence_of_waypoints": sequence_of_waypoints,
        "tol": 0.05,
    })
print(" Not reached:")
print(f"  Current waypoint index: {wk_next}")
print(f"  Reference pose: {rk.flatten()}")

# Waypoint reached (within tolerance)
xhat_reached = np.array([[2.01], [0.01], [0.0]])  # within "tol" of first waypoint
wk_next, rk = waypoint_generator.step(
    params={
        "wk": wk_next,
        "xhat": xhat_reached,
        "sequence_of_waypoints": sequence_of_waypoints,
        "tol": 0.05,
    })
print("Reached! Moving to next waypoint.):")
print(f"  Next waypoint index: {wk_next")
print(f"  Reference pose: {rk.flatten()}")
  Cell In[1], line 71
    print(f"  Next waypoint index: {wk_next")
                                           ^
SyntaxError: f-string: expecting '}'

Block 2: Velocity Command Generator

The velocity controller generates \((v_{\text{cmd}}, \omega_{\text{cmd}})\) – linear and angular velocity commands – to drive the TurtleBot3 towards the reference pose \(r_k = (x_r, y_r, \theta_r)\). We use a simple proportional controller based on the distance to the goal and the heading error.

Since a proportional controller has no internal state (no memory), it is a stateless dynamical system with only an output function \(h\) (recall that \(f: \varnothing \to \varnothing\) for stateless systems).

Explicit Parameters:

  • Current state estimate \(\hat{x}_k = (\hat{x}, \hat{y}, \hat{\theta}, \dots)\)

  • Reference pose \(r_k = (x_r, y_r, \theta_r)\)

Evolution Function: $\( f = \varnothing \quad \text{(no state evolution)} \)$

Output Function: $\( h_c(\hat{x}_k, r_k) = \begin{bmatrix} v_{\text{cmd}} \\ \omega_{\text{cmd}} \end{bmatrix} = \begin{bmatrix} \text{sat}(K_v \cdot d, v_{\max}) \\ \text{sat}(K_\omega \cdot e_\theta, \omega_{\max}) \end{bmatrix} \)$

where:

  • \(d = \sqrt{(x_r - \hat{x})^2 + (y_r - \hat{y})^2}\) is the Euclidean distance to the goal

  • \(e_\theta = \text{wrap}(\theta_r - \hat{\theta})\) is the heading error wrapped to \([-\pi, \pi]\)

  • \(\text{sat}(\cdot, \max)\) denotes saturation to respect physical velocity limits

Implicit Parameters:

  • \(K_v\): Linear velocity gain

  • \(K_\omega\): Angular velocity gain

  • \(v_{\max}\): Maximum linear velocity (TurtleBot3: 0.22 m/s)

  • \(\omega_{\max}\): Maximum angular velocity (TurtleBot3: 2.84 rad/s)

Note

We use a proportional controller rather than a full PID controller because the TurtleBot’s dynamics are slow and well-damped. For faster or more complex systems, integral and derivative terms would be needed (see the car cruise control example).

def hc(
    xhat: np.ndarray,
    r: np.ndarray,
    Kv: float,
    Komega: float,
    max_v: float,
    max_omega: float) -> np.ndarray:
    # Extract positions (first 3 elements in case xhat has velocities)
    x, y, theta = xhat.flatten()
    x_r, y_r, theta_r = r.flatten()

    # Distance to goal
    dx = x_r - x
    dy = y_r - y
    distance = np.sqrt(dx**2 + dy**2)

    # Heading error (wrap to [-pi, pi])
    heading_error = theta_r - theta
    heading_error = np.arctan2(np.sin(heading_error), np.cos(heading_error))

    # Proportional control
    v_cmd = Kv * distance
    omega_cmd = Komega * heading_error

    # Saturate commands
    v_cmd = np.clip(v_cmd, -max_v, max_v)
    omega_cmd = np.clip(omega_cmd, -max_omega, max_omega)

    return np.array([[v_cmd], [omega_cmd]])


# Create the controller DynamicalSystem (stateless, so f=None)
velocity_controller = DynamicalSystem(h=hc)

# Test the controller
xhat_test = np.array([[0.0], [0.0], [0.0]])  # At origin
r_test = np.array([[1.0], [1.0], [np.pi / 4]])  # Target pose

uk = velocity_controller.step(
    params={
        "xhat": xhat_test,
        "r": r_test,
        "Kv": 0.5,
        "Komega": 1.0,
        "max_v": 0.22,  # TurtleBot max linear velocity
        "max_omega": 2.84,  # TurtleBot max angular velocity
    }
)
print("Controller output:")
print(f"  v_cmd = {uk[0,0]:.3f} m/s")
print(f"  omega_cmd = {uk[1,0]:.3f} rad/s")

Block 3: TurtleBot (Plant)

The TurtleBot3 follows the unicycle kinematics model, a standard model for differential-drive robots. The robot’s state includes both its pose (position and orientation) and its velocities.

Explicit Parameters:

  • Current state \(x_k = [x, y, \theta, v, \omega]^T\) (5-dimensional state)

  • Control input \(u_k = [v_{\text{cmd}}, \omega_{\text{cmd}}]^T\)

  • Timestep \(\Delta t\)

Evolution Function (using Euler integration): $\( f_p(x_k, u_k, \Delta t) = \begin{bmatrix} x + v \cos(\theta) \Delta t \\ y + v \sin(\theta) \Delta t \\ \text{wrap}(\theta + \omega \Delta t) \\ v_{\text{cmd}} \\ \omega_{\text{cmd}} \end{bmatrix} \)$

where \(\text{wrap}(\cdot)\) normalizes angles to \([-\pi, \pi]\) using \(\text{atan2}(\sin(\theta), \cos(\theta))\).

The last two equations assume instantaneous velocity response – that is, we assume the low-level motor controller perfectly tracks commanded velocities. This is a reasonable approximation for the TurtleBot3 at typical speeds.

Output Function: The wheel odometry sensor measures position and orientation (but not velocity directly): $\( h_p(x_k) = \begin{bmatrix} x \\ y \\ \theta \end{bmatrix} \)$

In practice, this measurement will be corrupted by:

  • Wheel slip (especially on smooth floors or uneven terrain)

  • Quantization from encoder resolution

  • Systematic bias from wheel diameter mismatch

  • Occasional spikes from bumps or obstacles

We will add realistic sensor corruption during simulation using data_change.corrupt.

Implicit Parameters:

  • None (all parameters are explicit for the plant dynamics)

def turtlebot_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:
    """
    TurtleBot unicycle dynamics (noise-free).

    Parameters
    ----------
    xk : np.ndarray
        Current state [x, y, theta, v, omega], shape (5,1)
    uk : np.ndarray
        Control input [v_cmd, omega_cmd], shape (2,1)
    dt : float
        Timestep (seconds)

    Returns
    -------
    xk_next : np.ndarray
        Next state, shape (5,1)
    """
    x, y, theta, v, omega = xk.flatten()
    v_cmd, omega_cmd = uk.flatten()

    # Euler integration
    x_new = x + v * np.cos(theta) * dt
    y_new = y + v * np.sin(theta) * dt
    theta_new = theta + omega * dt

    # Wrap theta to [-pi, pi]
    theta_new = np.arctan2(np.sin(theta_new), np.cos(theta_new))

    # Update velocities (assume instantaneous response)
    v_new = v_cmd
    omega_new = omega_cmd

    return np.array([[x_new], [y_new], [theta_new], [v_new], [omega_new]])


def turtlebot_h(xk: np.ndarray) -> np.ndarray:
    """
    Measurement function: observe [x, y, theta] from odometry.

    Parameters
    ----------
    xk : np.ndarray
        State [x, y, theta, v, omega], shape (5,1)

    Returns
    -------
    yk : np.ndarray
        Measurement [x, y, theta], shape (3,1)
    """
    return xk[:3, :]


# Create the plant DynamicalSystem
plant = DynamicalSystem(f=turtlebot_f, h=turtlebot_h, state_name="xk")

# Test the plant
xk_test = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])  # Start at origin, stationary
uk_test = np.array([[0.2], [0.5]])  # Move forward and turn
dt = 0.1

xk_next, yk = plant.step(
    params={"xk": xk_test, "uk": uk_test, "dt": dt})
print("After one step:")
print(f"  State: {xk_next.flatten()}")
print(f"  Measurement: {yk.flatten()}")

Block 4: Kalman Filter (Observer)

Since the TurtleBot dynamics are nonlinear (due to \(\cos(\theta)\) and \(\sin(\theta)\) in the evolution function), we use the Extended Kalman Filter (EKF), which linearizes the dynamics around the current state estimate.

The observer maintains both a state estimate \(\hat{x}_k\) and a covariance matrix \(P_k\) representing uncertainty. Thus, the observer state is the tuple \((\hat{x}_k, P_k)\).

Explicit Parameters:

  • Current estimate and covariance: \(\text{xhat_P}_k = (\hat{x}_k, P_k)\)

  • Measurement \(y_k\) from the odometry sensor

  • Control input \(u_k\) (same as applied to the plant)

Evolution Function: The EKF performs a predict-update cycle:

Predict step: $\( \begin{aligned} \hat{x}_k^- &= f_p(\hat{x}_{k-1}, u_{k-1}, \Delta t) \quad \text{(state prediction)} \\ P_k^- &= F_k P_{k-1} F_k^T + Q_k \quad \text{(covariance prediction)} \end{aligned} \)$

Update step: $\( \begin{aligned} \nu_k &= y_k - h_p(\hat{x}_k^-) \quad \text{(innovation)} \\ S_k &= H_k P_k^- H_k^T + R_k \quad \text{(innovation covariance)} \\ K_k &= P_k^- H_k^T S_k^{-1} \quad \text{(Kalman gain)} \\ \hat{x}_k &= \hat{x}_k^- + K_k \nu_k \quad \text{(state update)} \\ P_k &= (I - K_k H_k) P_k^- (I - K_k H_k)^T + K_k R_k K_k^T \quad \text{(covariance update)} \end{aligned} \)$

The evolution function is: $\( f_{\text{kf}}(\text{xhat_P}_k, y_k, u_k) = (\hat{x}_{k+1}, P_{k+1}) \)$

Output Function: $\( h_{\text{kf}}(\text{xhat_P}_k) = \hat{x}_k \quad \text{(extract state estimate)} \)$

Implicit Parameters:

Linearization Jacobians:

  • \(F_k = \frac{\partial f_p}{\partial x}\bigg|_{\hat{x}_k}\): State transition Jacobian (5×5) $\( F_k = \begin{bmatrix} 1 & 0 & -v \sin(\theta) \Delta t & \cos(\theta) \Delta t & 0 \\ 0 & 1 & v \cos(\theta) \Delta t & \sin(\theta) \Delta t & 0 \\ 0 & 0 & 1 & 0 & \Delta t \\ 0 & 0 & 0 & 1 & 0 \\ 0 & 0 & 0 & 0 & 1 \end{bmatrix} \)$

  • \(H_k = \frac{\partial h_p}{\partial x}\bigg|_{\hat{x}_k}\): Measurement Jacobian (3×5, constant for this system) $\( H_k = \begin{bmatrix} 1 & 0 & 0 & 0 & 0 \\ 0 & 1 & 0 & 0 & 0 \\ 0 & 0 & 1 & 0 & 0 \end{bmatrix} \)$

Noise Covariances:

  • \(Q_k\): Process noise covariance (5×5) – models uncertainty in the dynamics (wheel slip, unmodeled effects)

  • \(R_k\): Measurement noise covariance (3×3) – models odometry sensor noise

Plant dynamics functions (needed for prediction):

  • \(f_p\): Plant evolution function

  • \(h_p\): Plant measurement function

Note

The EKF implementation is provided in pykal.algorithm_library.estimators.kf. It uses the Joseph form for covariance update to ensure numerical stability and maintains symmetry through explicit symmetrization.

from pykal.algorithm_library.estimators import kf


def compute_F_jacobian(xhat: np.ndarray, dt: float) -> np.ndarray:
    """
    Compute Jacobian of dynamics for TurtleBot.

    Parameters
    ----------
    xhat : np.ndarray
        Current state estimate [x, y, theta, v, omega], shape (5,1)
    dt : float
        Timestep

    Returns
    -------
    F : np.ndarray
        Jacobian matrix, shape (5, 5)
    """
    _, _, theta, v, _ = xhat.flatten()

    F = np.array(
        [
            [1, 0, -v * np.sin(theta) * dt, np.cos(theta) * dt, 0],
            [0, 1, v * np.cos(theta) * dt, np.sin(theta) * dt, 0],
            [0, 0, 1, 0, dt],
            [0, 0, 0, 1, 0],
            [0, 0, 0, 0, 1],
        ]
    )

    return F


def compute_H_jacobian() -> np.ndarray:
    """
    Compute Jacobian of measurement function (constant for this system).

    Returns
    -------
    H : np.ndarray
        Jacobian matrix, shape (3, 5)
    """
    return np.array([[1, 0, 0, 0, 0], [0, 1, 0, 0, 0], [0, 0, 1, 0, 0]])


# Process and measurement noise covariances
Q_turtlebot = np.diag([0.01, 0.01, 0.02, 0.1, 0.1])  # Process noise
R_turtlebot = np.diag([0.05, 0.05, 0.1])  # Odometry noise (5cm, 5cm, ~6 degrees)

# Create the observer DynamicalSystem
observer = DynamicalSystem(f=kf.f, h=kf.h, state_name="xhat_P")

# Test the observer (single step)
xhat_0 = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])
P_0 = np.diag([0.1, 0.1, 0.1, 1.0, 1.0])
xhat_P_test = (xhat_0, P_0)

# Simulate receiving a noisy measurement
yk_noisy = np.array([[0.02], [0.01], [0.05]])  # Noisy measurement

xhat_P_new, xhat_out = observer.step(
    params={
        "xhat_P": xhat_P_test,
        "yk": yk_noisy,
        "f": turtlebot_f,
        "f_params": {"xk": xhat_0, "uk": np.array([[0.0], [0.0]]), "dt": 0.1},
        "h": turtlebot_h,
        "h_params": {"xk": xhat_0},
        "Fk": compute_F_jacobian(xhat_0, dt=0.1),
        "Qk": Q_turtlebot,
        "Hk": compute_H_jacobian(),
        "Rk": R_turtlebot,
    })

print("Observer estimate after one measurement:")
print(xhat_out.flatten())

Simulation

../../../_images/turtlebot_composition_of_dynamical_systems.svg

We now simulate the complete closed-loop system, integrating all four dynamical components:

  1. Waypoint Generator → reference pose \(r_k\)

  2. Velocity Controller → velocity commands \(u_k\) (using \(r_k\) and \(\hat{x}_k\))

  3. TurtleBot Plant → true state evolution and noisy measurements \(y_k\)

  4. Kalman Filter → state estimate \(\hat{x}_k\) (using \(u_k\) and \(y_k\))

System Parameters

# Time parameters
dt = 0.1  # Sampling time (seconds)
switch_time = 15.0  # Time at each waypoint (seconds)

# Controller gains
Kv = 0.5  # Linear velocity gain
Komega = 1.5  # Angular velocity gain

# Kalman filter parameters
Q = np.diag([0.01, 0.01, 0.02, 0.1, 0.1])  # Process noise covariance
R = np.diag([0.05, 0.05, 0.1])  # Measurement noise covariance

Initial Conditions

# Initial states
wk = np.array([[0]])  # Waypoint generator state: start at first waypoint
xk = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])  # Plant state: start at origin
xhat = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])  # Observer estimate
P = np.diag([0.1, 0.1, 0.1, 1.0, 1.0])  # Covariance matrix
xhat_P = (xhat, P)  # Observer state tuple

# Waypoint reached tolerance
waypoint_tol = 0.3  # Distance tolerance in meters

# Controller gains (used as parameters)
Kv = 0.5
Komega = 1.5
max_v = 0.22
max_omega = 2.84

# Storage for plotting
time_hist = []
reference_hist = []
true_state_hist = []
estimate_hist = []
measurement_hist = []
command_hist = []
error_hist = []

Simulate

Hide code cell source

# Simulation time
T_sim = 60.0  # seconds
time_steps = np.arange(0, T_sim, dt)

# Run closed-loop simulation using all four DynamicalSystem components
for tk in time_steps:
    # 1. Waypoint generator step
    wk, rk = waypoint_generator.step(
        params={
            "wk": wk,
            "xhat": xhat,
            "waypoint_values": sequence_of_waypoints,
            "tol": waypoint_tol,
        })

    # 2. Controller step (generates velocity commands)
    uk = velocity_controller.step(
        params={
            "xhat": xhat,
            "r": rk,
            "Kv": Kv,
            "Komega": Komega,
            "max_v": max_v,
            "max_omega": max_omega,
        }
    )

    # 3. Plant step (true dynamics)
    xk, yk_clean = plant.step(
        params={"xk": xk, "uk": uk, "dt": dt})

    # 4. Add measurement noise (realistic corruption)
    yk_flat = yk_clean.flatten()

    # X: bias (wheel diameter mismatch) + Gaussian noise
    yk_corrupted_x = corrupt.with_bias(yk_flat[0], bias=0.02)
    yk_corrupted_x = yk_corrupted_x + np.random.normal(0, np.sqrt(R[0, 0]))

    # Y: Gaussian noise
    yk_corrupted_y = yk_flat[1] + np.random.normal(0, np.sqrt(R[1, 1]))

    # Theta: Gaussian noise + occasional spikes (bumps)
    yk_corrupted_theta = yk_flat[2] + np.random.normal(0, np.sqrt(R[2, 2]))
    if np.random.rand() < 0.02:  # 2% spike rate
        yk_corrupted_theta += np.random.choice([-1, 1]) * 0.3

    yk_noisy = np.array([[yk_corrupted_x], [yk_corrupted_y], [yk_corrupted_theta]])

    # 5. Extract current estimate for observer step
    xhat = observer.h(xhat_P)

    # 6. Observer step (EKF)
    Fk = compute_F_jacobian(xhat, dt)
    Hk = compute_H_jacobian()

    xhat_P, xhat_obs = observer.step(
        params={
            "xhat_P": xhat_P,
            "yk": yk_noisy,
            "f": turtlebot_f,
            "f_params": {"xk": xhat, "uk": uk, "dt": dt},
            "h": turtlebot_h,
            "h_params": {"xk": xhat},
            "Fk": Fk,
            "Qk": Q,
            "Hk": Hk,
            "Rk": R,
        })

    # Store results
    time_hist.append(tk)
    reference_hist.append(rk.flatten())
    true_state_hist.append(xk.flatten())
    estimate_hist.append(xhat_obs.flatten())
    measurement_hist.append(yk_noisy.flatten())
    command_hist.append(uk.flatten())
    error_hist.append((xk - xhat_obs).flatten())

# Convert to numpy arrays
reference_hist = np.array(reference_hist)
true_state_hist = np.array(true_state_hist)
estimate_hist = np.array(estimate_hist)
measurement_hist = np.array(measurement_hist)
command_hist = np.array(command_hist)
error_hist = np.array(error_hist)

Visualize

We can visualize the pertinent values of our system to assure correct behavior. Note how the Kalman filter smooths out the noisy measurements and provides accurate state estimates even in the presence of sensor corruption.

Hide code cell source

fig, axs = plt.subplots(3, 2, figsize=(14, 12))

# Plot 1: 2D Trajectory
ax = axs[0, 0]
ax.plot(
    true_state_hist[:, 0],
    true_state_hist[:, 1],
    "b-",
    label="True Trajectory",
    linewidth=2,
    alpha=0.7)
ax.plot(
    estimate_hist[:, 0],
    estimate_hist[:, 1],
    "r--",
    label="Estimated Trajectory",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    reference_hist[:, 0],
    reference_hist[:, 1],
    c="green",
    s=100,
    marker="*",
    label="Waypoints",
    zorder=5)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("2D Trajectory Tracking", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 2: X Position vs Time
ax = axs[0, 1]
ax.plot(time_hist, true_state_hist[:, 0], "b-", label="True X", alpha=0.7)
ax.plot(time_hist, estimate_hist[:, 0], "r--", label="Estimated X", alpha=0.7)
ax.scatter(
    time_hist[::10],
    measurement_hist[::10, 0],
    c="gray",
    s=10,
    alpha=0.3,
    label="Measurements")
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_title("X Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 3: Y Position vs Time
ax = axs[1, 0]
ax.plot(time_hist, true_state_hist[:, 1], "b-", label="True Y", alpha=0.7)
ax.plot(time_hist, estimate_hist[:, 1], "r--", label="Estimated Y", alpha=0.7)
ax.scatter(
    time_hist[::10],
    measurement_hist[::10, 1],
    c="gray",
    s=10,
    alpha=0.3,
    label="Measurements")
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("Y Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: Heading vs Time
ax = axs[1, 1]
ax.plot(
    time_hist,
    np.rad2deg(true_state_hist[:, 2]),
    "b-",
    label="True Heading",
    alpha=0.7)
ax.plot(
    time_hist,
    np.rad2deg(estimate_hist[:, 2]),
    "r--",
    label="Estimated Heading",
    alpha=0.7)
ax.set_ylabel("Heading (degrees)", fontsize=11)
ax.set_title("Heading Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 5: Velocity Commands
ax = axs[2, 0]
ax.plot(
    time_hist,
    command_hist[:, 0],
    "g-",
    label="Linear Velocity",
    linewidth=1.5)
ax.set_ylabel("Linear Velocity (m/s)", fontsize=11)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_title("Control Commands", fontsize=13, fontweight="bold")
ax.legend(loc="upper left")
ax.grid(True, alpha=0.3)

ax_omega = ax.twinx()
ax_omega.plot(
    time_hist,
    command_hist[:, 1],
    "purple",
    label="Angular Velocity",
    linewidth=1.5,
    alpha=0.7)
ax_omega.set_ylabel("Angular Velocity (rad/s)", fontsize=11, color="purple")
ax_omega.tick_params(axis="y", labelcolor="purple")
ax_omega.legend(loc="upper right")

# Plot 6: Position Estimation Error
ax = axs[2, 1]
position_error = np.sqrt(error_hist[:, 0] ** 2 + error_hist[:, 1] ** 2)
ax.plot(time_hist, position_error, "m-", label="Position Error", linewidth=1.5)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Position Error (m)", fontsize=11)
ax.set_title("Estimation Error Magnitude", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean position error: {np.mean(position_error):.4f} m")
print(f"Max position error: {np.max(position_error):.4f} m")

Now we can run the same simulation with a much cleaner interface. All system parameters can be configured at initialization, and the simulation loop becomes trivial:

Hide code cell source

# Visualize results
fig, axs = plt.subplots(3, 2, figsize=(14, 12))

# Plot 1: 2D Trajectory
ax = axs[0, 0]
ax.plot(
    results["true_state"][:, 0],
    results["true_state"][:, 1],
    "b-",
    label="True Trajectory",
    linewidth=2,
    alpha=0.7)
ax.plot(
    results["estimate"][:, 0],
    results["estimate"][:, 1],
    "r--",
    label="Estimated Trajectory",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    results["reference"][:, 0],
    results["reference"][:, 1],
    c="green",
    s=100,
    marker="*",
    label="Waypoints",
    zorder=5)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title(
    "2D Trajectory Tracking (Callback Interface)", fontsize=13, fontweight="bold"
)
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 2: X Position vs Time
ax = axs[0, 1]
ax.plot(
    results["time"],
    results["true_state"][:, 0],
    "b-",
    label="True X",
    alpha=0.7)
ax.plot(
    results["time"],
    results["estimate"][:, 0],
    "r--",
    label="Estimated X",
    alpha=0.7)
ax.scatter(
    results["time"][::10],
    results["measurement"][::10, 0],
    c="gray",
    s=10,
    alpha=0.3,
    label="Measurements")
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_title("X Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 3: Y Position vs Time
ax = axs[1, 0]
ax.plot(
    results["time"],
    results["true_state"][:, 1],
    "b-",
    label="True Y",
    alpha=0.7)
ax.plot(
    results["time"],
    results["estimate"][:, 1],
    "r--",
    label="Estimated Y",
    alpha=0.7)
ax.scatter(
    results["time"][::10],
    results["measurement"][::10, 1],
    c="gray",
    s=10,
    alpha=0.3,
    label="Measurements")
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("Y Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: Heading vs Time
ax = axs[1, 1]
ax.plot(
    results["time"],
    np.rad2deg(results["true_state"][:, 2]),
    "b-",
    label="True Heading",
    alpha=0.7)
ax.plot(
    results["time"],
    np.rad2deg(results["estimate"][:, 2]),
    "r--",
    label="Estimated Heading",
    alpha=0.7)
ax.set_ylabel("Heading (degrees)", fontsize=11)
ax.set_title("Heading Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 5: Velocity Commands
ax = axs[2, 0]
ax.plot(
    results["time"],
    results["command"][:, 0],
    "g-",
    label="Linear Velocity",
    linewidth=1.5)
ax.set_ylabel("Linear Velocity (m/s)", fontsize=11)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_title("Control Commands", fontsize=13, fontweight="bold")
ax.legend(loc="upper left")
ax.grid(True, alpha=0.3)

ax_omega = ax.twinx()
ax_omega.plot(
    results["time"],
    results["command"][:, 1],
    "purple",
    label="Angular Velocity",
    linewidth=1.5,
    alpha=0.7)
ax_omega.set_ylabel("Angular Velocity (rad/s)", fontsize=11, color="purple")
ax_omega.tick_params(axis="y", labelcolor="purple")
ax_omega.legend(loc="upper right")

# Plot 6: Position Estimation Error
ax = axs[2, 1]
position_error = np.sqrt(
    results["estimation_error"][:, 0] ** 2 + results["estimation_error"][:, 1] ** 2
)
ax.plot(
    results["time"],
    position_error,
    "m-",
    label="Position Error",
    linewidth=1.5)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Position Error (m)", fontsize=11)
ax.set_title("Estimation Error Magnitude", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean position error: {np.mean(position_error):.4f} m")
print(f"Max position error: {np.max(position_error):.4f} m")

Experimentation

Now that we have a working system, try experimenting with different parameters:

Noise tuning:

  • Increase measurement noise R to simulate worse odometry (wheel slip, rough terrain)

  • Increase process noise Q to account for model uncertainty

  • Observe how the filter trades off model prediction vs measurements

Controller tuning:

  • Adjust Kv and Komega to change aggressiveness

  • Try different waypoint patterns (figure-eight, circle, random)

  • Add velocity limits to simulate real robot constraints

Observer comparison:

  • Run simulation with KF disabled (use raw measurements)

  • Compare estimation error with and without filtering

  • Visualize covariance ellipses over time

Challenge: Can you tune Q and R to minimize position error while maintaining smooth estimates?

← Control Systems as Dynamical Systems