← Control Systems as Dynamical Systems

Example: Crazyflie Multi-Sensor Fusion

Suppose we want to stabilize a quadrotor in 3D space. We need accurate position and velocity estimates to maintain stable flight. However, our sensors have complementary strengths and weaknesses: motion capture is precise but slow, the barometer drifts with temperature, and the IMU is noisy but fast.

In this notebook, we extend our state estimation framework to handle multiple asynchronous sensors with different characteristics. The Crazyflie quadrotor fuses motion capture, barometer, and IMU data using a Kalman filter that optimally weights each sensor based on its noise properties. This demonstrates how sensor fusion can achieve better performance than any single sensor alone.

System Overview

We model the quadrotor dynamics, implement sensor-specific noise characteristics, design the multi-sensor observer, and integrate everything into a complete feedback system. This notebook bridges theory and practice for aerial robot state estimation with heterogeneous sensors.

Block Diagram

The Crazyflie system has multiple sensor streams feeding a central estimator:

../../../_images/crazyflie_multisensor_system.svg

where setpoint generator produces reference positions (e.g., hover at [0, 0, 1]), position controller outputs velocity commands to track the reference, Crazyflie is our quadrotor plant with 3D dynamics, motion capture is a 3D position sensor (10 Hz, 5mm accuracy), barometer measures altitude (20 Hz, 10cm accuracy, subject to drift), IMU measures velocity from accelerometer integration (100 Hz, noisy), and Kalman Filter is the state observer that fuses all three sensors optimally.

Note

Different sensors operate at different rates and measure different aspects of the state. The KF intelligently weights each sensor based on its noise characteristics, and it will also take into account asyncroonicty.

Discrete-time Dynamical Systems

For a minimal working knowledge of the theory of discrete-time dynamical systems, check out the :doc:"DynamicalSystem" <./dynamical_system> notebook under “Modules” . We assume a basic familiarity with the theory and proceed with casting our diagram blocks as dynamical system blocks.

../../../_images/crazyflie_composition_of_dynamical_systems.svg

We discuss the derivation of each dynamical system block below.

Note

Note the three sensor blocks all feeding into a single observer. The KF combines their measurements by concatenating them into a single measurement vector. Unlike the TurtleBot’s nonlinear dynamics, the Crazyflie uses a linear constant-velocity model, making the KF exact (no linearization needed).

Block 1: Setpoint Generator

The setpoint generator cycles through a sequence of waypoints, commanding the drone to hover at each position for a specified duration.

Explicit Parameters:

  • Current state \(s_k = [\text{index}, t_{\text{elapsed}}]^T\) where:

    • \(\text{index} \in \{0, 1, \ldots, N-1\}\) is the current waypoint index

    • \(t_{\text{elapsed}}\) is time spent at current waypoint

  • Timestep \(\Delta t\)

Evolution Function: $\( f_s(s_k, \Delta t) = \begin{cases} [\text{index}, t_{\text{elapsed}} + \Delta t]^T & \text{if } t_{\text{elapsed}} < t_{\text{switch}} \\ [(\text{index} + 1) \mod N, 0]^T & \text{otherwise} \end{cases} \)$

The system increments the elapsed time each step. When the elapsed time exceeds \(t_{\text{switch}}\), it advances to the next waypoint (wrapping around cyclically) and resets the timer.

Output Function: $\( h_s(s_k) = r_k = \text{waypoints}[\text{index}] \)$

The output is the 3D position corresponding to the current waypoint index.

Implicit Parameters:

  • Waypoints list: \(\{[x_0, y_0, z_0]^T, [x_1, y_1, z_1]^T, \ldots, [x_{N-1}, y_{N-1}, z_{N-1}]^T\}\)

  • \(t_{\text{switch}}\): Time to spend at each waypoint before switching (seconds)

  • \(N\): Total number of waypoints

Note

This is a hybrid dynamical system with discrete (waypoint index) and continuous (elapsed time) state components. The discrete transitions happen at fixed time intervals, creating a piecewise-constant reference trajectory.

import numpy as np
import matplotlib.pyplot as plt
from pykal import DynamicalSystem
from pykal.data_change import corrupt, prepare  # For realistic sensor noise


# Create hover trajectory: positions in meters
hover_waypoints = [
    np.array([[0.0], [0.0], [1.0]]),  # Start: hover at 1m altitude
    np.array([[1.0], [1.0], [1.5]]),  # Move to (1, 1, 1.5)
    np.array([[0.0], [0.0], [1.0]]),  # Return to start
]


def setpoint_f(
    sk: np.ndarray, dt: float, switch_time: float, num_waypoints: int
) -> np.ndarray:
    """
    Evolution function for setpoint generator.

    Parameters
    ----------
    sk : np.ndarray
        Current state [index, time_at_setpoint], shape (2,1)
    dt : float
        Timestep
    switch_time : float
        Time to spend at each setpoint before switching
    num_waypoints : int
        Total number of waypoints

    Returns
    -------
    sk_next : np.ndarray
        Next state [index, time_at_setpoint], shape (2,1)
    """
    idx = int(sk[0, 0])
    time_elapsed = sk[1, 0]

    # Update elapsed time
    time_elapsed += dt

    # Check if we should switch to next waypoint
    if time_elapsed >= switch_time:
        idx = (idx + 1) % num_waypoints
        time_elapsed = 0.0

    return np.array([[idx], [time_elapsed]], dtype=float)


def setpoint_h(sk: np.ndarray, waypoints: list) -> np.ndarray:
    """
    Output function for setpoint generator.

    Parameters
    ----------
    sk : np.ndarray
        Current state [index, time_at_setpoint], shape (2,1)
    waypoints : list
        List of waypoint positions

    Returns
    -------
    rk : np.ndarray
        Reference position [x_r, y_r, z_r], shape (3,1)
    """
    idx = int(sk[0, 0])
    return waypoints[idx]


# Create the setpoint generator DynamicalSystem
setpoint_generator = DynamicalSystem(f=setpoint_f, h=setpoint_h, state_name="sk")

# Test the setpoint generator
sk_test = np.array([[0], [0.0]])  # Start at waypoint 0, time 0

# Step 1: Before switch time
sk_next, rk = setpoint_generator.step(
    params={
        "sk": sk_test,
        "dt": 0.1,
        "switch_time": 1.0,
        "num_waypoints": len(hover_waypoints),
        "waypoints": hover_waypoints,
    })
print("Step 1 (before switch):")
print(f"  State: index={int(sk_next[0,0])}, time={sk_next[1,0]:.1f}s")
print(f"  Setpoint: {rk.flatten()}")

# Step 2: After accumulating enough time (simulate multiple steps)
sk_current = sk_test.copy()
for _ in range(11):  # 11 steps * 0.1s = 1.1s total
    sk_current, rk = setpoint_generator.step(
        params={
            "sk": sk_current,
            "dt": 0.1,
            "switch_time": 1.0,
            "num_waypoints": len(hover_waypoints),
            "waypoints": hover_waypoints,
        })

print("\nStep 2 (after switch):")
print(f"  State: index={int(sk_current[0,0])}, time={sk_current[1,0]:.1f}s")
print(f"  Setpoint: {rk.flatten()}")
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
Cell In[1], line 4
      2 import matplotlib.pyplot as plt
      3 from pykal import DynamicalSystem
----> 4 from pykal.data_change import corrupt, prepare  # For realistic sensor noise
      7 # Create hover trajectory: positions in meters
      8 hover_waypoints = [
      9     np.array([[0.0], [0.0], [1.0]]),  # Start: hover at 1m altitude
     10     np.array([[1.0], [1.0], [1.5]]),  # Move to (1, 1, 1.5)
     11     np.array([[0.0], [0.0], [1.0]]),  # Return to start
     12 ]

ImportError: cannot import name 'prepare' from 'pykal.data_change' (/home/docs/checkouts/readthedocs.org/user_builds/pykal/envs/latest/lib/python3.12/site-packages/pykal/data_change.py)

Block 2: Position Controller

The position controller generates velocity commands to drive the drone toward the reference position using proportional control.

Explicit Parameters:

  • Estimated position \(\hat{p}_k = [\hat{x}, \hat{y}, \hat{z}]^T\) (extracted from state estimate)

  • Reference position \(r_k = [x_r, y_r, z_r]^T\)

Evolution Function: $\( f_c = \varnothing \quad \text{(stateless controller)} \)$

This is a stateless dynamical system (pure feedback, no memory).

Output Function: $\( h_c(\hat{p}_k, r_k, K_p) = \vec{v}_{\text{cmd}} = \text{sat}(K_p (r_k - \hat{p}_k), v_{\max}) \)$

where \(\text{sat}(\cdot, v_{\max})\) clips each velocity component to \([-v_{\max}, v_{\max}]\).

Implicit Parameters:

  • \(K_p = 0.8\): Proportional gain (units: 1/s)

  • \(v_{\max} = 0.5\) m/s: Maximum velocity command (safety limit)

Note

Unlike the car’s PID controller, this controller is stateless because quadrotor position control doesn’t require integral or derivative terms at this level. The inner velocity control loop (not modeled here) handles the dynamic response. A pure P controller suffices for position tracking when the plant has velocity as a direct input.

def controller_h(
    phat: np.ndarray,
    r: np.ndarray,
    Kp: float,
    max_vel: float) -> np.ndarray:
    """
    Output function for position controller (stateless).

    Parameters
    ----------
    phat : np.ndarray
        Estimated position [x, y, z], shape (3,1)
    r : np.ndarray
        Reference position [x_r, y_r, z_r], shape (3,1)
    Kp : float
        Proportional gain
    max_vel : float
        Maximum velocity (m/s)

    Returns
    -------
    v_cmd : np.ndarray
        Velocity command [vx, vy, vz], shape (3,1)
    """
    # Position error
    error = r - phat

    # Proportional control
    v_cmd = Kp * error

    # Saturate
    v_cmd = np.clip(v_cmd, -max_vel, max_vel)

    return v_cmd


# Create the controller DynamicalSystem (stateless, so f=None)
controller = DynamicalSystem(h=controller_h)

# Test the controller
phat_test = np.array([[0.0], [0.0], [0.8]])
r_test = np.array([[1.0], [0.5], [1.0]])

v_cmd = controller.step(
    params={
        "phat": phat_test,
        "r": r_test,
        "Kp": 0.8,
        "max_vel": 0.5,
    }
)
print("Controller output:")
print(f"  v_cmd = {v_cmd.flatten()}")

Block 3: Crazyflie Quadrotor (Plant)

The Crazyflie quadrotor dynamics are modeled using a simplified constant-velocity model in 3D space.

Explicit Parameters:

  • Current state \(x_k = [x, y, z, v_x, v_y, v_z]^T\) (position and velocity in 3D)

  • Control input \(u_k = [v_{x,\text{cmd}}, v_{y,\text{cmd}}, v_{z,\text{cmd}}]^T\) (velocity commands)

  • Timestep \(\Delta t\)

Evolution Function (Euler integration): $\( f_p(x_k, u_k, \Delta t) = \begin{bmatrix} x + v_x \Delta t \\ y + v_y \Delta t \\ z + v_z \Delta t \\ v_{x,\text{cmd}} \\ v_{y,\text{cmd}} \\ v_{z,\text{cmd}} \end{bmatrix} \)$

The position integrates the current velocity, while the velocity instantaneously tracks the commanded velocity. This assumes a fast inner-loop velocity controller that we don’t model explicitly.

Output Function (multiple sensors):

Motion Capture (Mocap): $\( h_{\text{mocap}}(x_k) = \begin{bmatrix} x \\ y \\ z \end{bmatrix} \)$

Barometer: $\( h_{\text{baro}}(x_k) = z \)$

IMU (Accelerometer-derived velocity): $\( h_{\text{IMU}}(x_k) = \begin{bmatrix} v_x \\ v_y \\ v_z \end{bmatrix} \)$

These three measurement functions extract different aspects of the state for sensor fusion.

Implicit Parameters:

  • None (all dynamics are explicit; this is a kinematic model)

Note

This is a linear system, making the Kalman filter exact (no linearization needed). The assumption of instantaneous velocity response is valid because the Crazyflie’s onboard controller has a much faster response time (~10 ms) than our outer position loop (~100 ms).

def crazyflie_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:
    """
    Crazyflie 3D constant-velocity dynamics.

    Parameters
    ----------
    xk : np.ndarray
        State [x, y, z, vx, vy, vz], shape (6,1)
    uk : np.ndarray
        Velocity command [vx_cmd, vy_cmd, vz_cmd], shape (3,1)
    dt : float
        Timestep

    Returns
    -------
    xk_next : np.ndarray
        Next state, shape (6,1)
    """
    pos = xk[:3]
    vel = xk[3:]

    # Update position
    pos_new = pos + vel * dt

    # Update velocity (assume instantaneous response to command)
    vel_new = uk

    return np.vstack([pos_new, vel_new])


def crazyflie_h_mocap(xk: np.ndarray) -> np.ndarray:
    """Motion capture measurement: observe [x, y, z]."""
    return xk[:3]


def crazyflie_h_baro(xk: np.ndarray) -> np.ndarray:
    """Barometer measurement: observe z only."""
    return xk[2:3]


def crazyflie_h_imu(xk: np.ndarray) -> np.ndarray:
    """IMU measurement: observe velocity [vx, vy, vz] (from accelerometer integration)."""
    return xk[3:]


# Create plant DynamicalSystem
crazyflie_plant = DynamicalSystem(f=crazyflie_f, h=crazyflie_h_mocap, state_name="xk")

# Test
xk_test = np.array([[0.0], [0.0], [1.0], [0.0], [0.0], [0.0]])
uk_test = np.array([[0.1], [0.0], [0.05]])
dt = 0.01

xk_next, yk = crazyflie_plant.step(
    params={"xk": xk_test, "uk": uk_test, "dt": dt})
print("After one step:")
print(f"  State: {xk_next.flatten()}")
print(f"  Mocap measurement: {yk.flatten()}")

Block 4: Multi-Sensor Measurements

We have three heterogeneous sensors, each measuring different aspects of the state with different characteristics:

Sensor 1: Motion Capture System

Explicit Parameters:

  • True state \(x_k\)

Output Function: $\( y_{\text{mocap},k} = h_{\text{mocap}}(x_k) + v_{\text{mocap},k} = \begin{bmatrix} x \\ y \\ z \end{bmatrix} + v_{\text{mocap},k} \)$

where \(v_{\text{mocap},k} \sim \mathcal{N}(0, R_{\text{mocap}})\).

Implicit Parameters:

  • Rate: 10 Hz (100 ms period)

  • \(R_{\text{mocap}} = \text{diag}(0.005^2, 0.005^2, 0.005^2)\): Noise covariance (5 mm standard deviation per axis)

  • Characteristics: High accuracy, low rate, line-of-sight required

Sensor 2: Barometric Altimeter

Explicit Parameters:

  • True state \(x_k\)

Output Function: $\( y_{\text{baro},k} = h_{\text{baro}}(x_k) + b_{\text{baro}} + v_{\text{baro},k} = z + b_{\text{baro}} + v_{\text{baro},k} \)$

where \(v_{\text{baro},k} \sim \mathcal{N}(0, R_{\text{baro}})\) and \(b_{\text{baro}}\) is a slow-varying bias due to temperature/pressure drift.

Implicit Parameters:

  • Rate: 20 Hz (50 ms period)

  • \(R_{\text{baro}} = 0.1^2\): Noise covariance (10 cm standard deviation)

  • Bias: \(b_{\text{baro}} \approx 0.02\) m (2 cm offset)

  • Characteristics: Moderate accuracy, subject to drift, independent of lighting

Sensor 3: IMU (Inertial Measurement Unit)

Explicit Parameters:

  • True state \(x_k\)

Output Function: $\( y_{\text{IMU},k} = h_{\text{IMU}}(x_k) + v_{\text{IMU},k} + s_k = \begin{bmatrix} v_x \\ v_y \\ v_z \end{bmatrix} + v_{\text{IMU},k} + s_k \)$

where \(v_{\text{IMU},k} \sim \mathcal{N}(0, R_{\text{IMU}})\) and \(s_k\) represents occasional vibration spikes.

Implicit Parameters:

  • Rate: 100 Hz (10 ms period, though we simulate synchronously)

  • \(R_{\text{IMU}} = \text{diag}(0.1^2, 0.1^2, 0.1^2)\): Noise covariance (0.1 m/s standard deviation)

  • Spike probability: 3% per sample (vibration events)

  • Spike magnitude: ±0.2 m/s

  • Characteristics: High rate, noisy, subject to vibration artifacts

Combined Measurement Vector

For synchronous fusion, we stack all measurements: $\( y_k = \begin{bmatrix} y_{\text{mocap},k} \\ y_{\text{baro},k} \\ y_{\text{IMU},k} \end{bmatrix} = \begin{bmatrix} x \\ y \\ z \\ z \\ v_x \\ v_y \\ v_z \end{bmatrix} + v_k \)$

with block-diagonal noise covariance: $\( R_k = \text{diag}(R_{\text{mocap}}, R_{\text{baro}}, R_{\text{IMU}}) \)$

Note

The three sensors have complementary characteristics: mocap is accurate but slow, the barometer drifts but is always available, and the IMU is fast but noisy. The Kalman filter optimally weighs each sensor based on its noise properties, achieving better performance than any single sensor alone.

def generate_multisensor_measurement(
    xk: np.ndarray,
    R_mocap: np.ndarray,
    R_baro: np.ndarray,
    R_imu: np.ndarray,
    use_realistic_corruption: bool = True) -> tuple:
    """
    Generate noisy measurements from all three sensors.

    Parameters
    ----------
    use_realistic_corruption : bool
        If True, uses data_change methods for sensor-specific corruption.
        If False, uses simple Gaussian noise.

    Returns
    -------
    yk_combined : np.ndarray
        Concatenated measurement vector [mocap(3), baro(1), imu(3)] = (7,1)
    R_combined : np.ndarray
        Block-diagonal noise covariance (7, 7)
    """
    # Clean measurements
    y_mocap = crazyflie_h_mocap(xk)  # [x, y, z]
    y_baro = crazyflie_h_baro(xk)  # [z]
    y_imu = crazyflie_h_imu(xk)  # [vx, vy, vz]

    if use_realistic_corruption:
        # Realistic sensor-specific corruption using data_change

        # Mocap: High precision, Gaussian noise only
        mocap_corrupted = y_mocap.flatten()
        mocap_corrupted = mocap_corrupted + np.random.multivariate_normal(
            np.zeros(3), R_mocap
        )
        y_mocap_noisy = mocap_corrupted.reshape(-1, 1)

        # Barometer: Drift + bias + Gaussian noise
        baro_corrupted = y_baro.item()
        baro_corrupted = corrupt.with_bias(baro_corrupted, bias=0.02)  # 2cm offset
        # Drift is cumulative, so we skip it in single-shot measurements
        baro_corrupted = baro_corrupted + np.random.normal(0, np.sqrt(R_baro[0, 0]))
        y_baro_noisy = np.array([[baro_corrupted]])

        # IMU: Bias + Gaussian noise + occasional spikes (vibration)
        imu_corrupted = y_imu.flatten()
        imu_corrupted = imu_corrupted + np.random.multivariate_normal(
            np.zeros(3), R_imu
        )
        # Add vibration spikes with 3% probability
        if np.random.rand() < 0.03:
            spike_idx = np.random.randint(0, 3)
            imu_corrupted[spike_idx] += np.random.choice([-1, 1]) * 0.2
        y_imu_noisy = imu_corrupted.reshape(-1, 1)

    else:
        # Simple Gaussian noise (original method)
        y_mocap_noisy = y_mocap + np.random.multivariate_normal(
            np.zeros(3), R_mocap
        ).reshape(-1, 1)

        y_baro_noisy = y_baro + np.random.multivariate_normal(
            np.zeros(1), R_baro
        ).reshape(-1, 1)

        y_imu_noisy = y_imu + np.random.multivariate_normal(np.zeros(3), R_imu).reshape(
            -1, 1
        )

    # Concatenate
    yk_combined = np.vstack([y_mocap_noisy, y_baro_noisy, y_imu_noisy])

    # Block-diagonal R
    R_combined = np.block(
        [
            [R_mocap, np.zeros((3, 1)), np.zeros((3, 3))],
            [np.zeros((1, 3)), R_baro, np.zeros((1, 3))],
            [np.zeros((3, 3)), np.zeros((3, 1)), R_imu],
        ]
    )

    return yk_combined, R_combined


# Define sensor noise covariances
R_mocap = np.diag([0.005, 0.005, 0.005])  # 5mm std
R_baro = np.array([[0.1]])  # 10cm std
R_imu = np.diag([0.1, 0.1, 0.1])  # 0.1 m/s std

# Test measurement generation
xk_test = np.array([[0.5], [0.3], [1.0], [0.1], [0.05], [0.0]])
yk_combined, R_combined = generate_multisensor_measurement(
    xk_test, R_mocap, R_baro, R_imu, use_realistic_corruption=True
)
print(f"Combined measurement shape: {yk_combined.shape}")
print(f"R_combined shape: {R_combined.shape}")
print(f"Measurements: {yk_combined.flatten()}")

Block 5: Multi-Sensor Kalman Filter (Observer)

The Kalman filter fuses all three sensors by stacking their measurement models into a single innovation step.

Explicit Parameters:

  • Observer state \((\hat{x}_k, P_k)\) where:

    • \(\hat{x}_k = [\hat{x}, \hat{y}, \hat{z}, \hat{v}_x, \hat{v}_y, \hat{v}_z]^T\) is the state estimate (6×1)

    • \(P_k\) is the estimation error covariance (6×6)

  • Combined measurement \(y_k = [y_{\text{mocap}}, y_{\text{baro}}, y_{\text{IMU}}]^T\) (7×1)

  • Control input \(u_k\) (velocity commands)

Evolution Function: The Kalman filter performs a predict-update cycle:

Predict step: $\( \begin{aligned} \hat{x}_k^- &= f_p(\hat{x}_{k-1}, u_{k-1}, \Delta t) \quad \text{(state prediction)} \\ P_k^- &= F_k P_{k-1} F_k^T + Q_k \quad \text{(covariance prediction)} \end{aligned} \)$

Update step (fusing all three sensors simultaneously): $\( \begin{aligned} \nu_k &= y_k - h_{\text{multi}}(\hat{x}_k^-) \quad \text{(innovation, 7×1)} \\ S_k &= H_k P_k^- H_k^T + R_k \quad \text{(innovation covariance, 7×7)} \\ K_k &= P_k^- H_k^T S_k^{-1} \quad \text{(Kalman gain, 6×7)} \\ \hat{x}_k &= \hat{x}_k^- + K_k \nu_k \quad \text{(state update)} \\ P_k &= (I - K_k H_k) P_k^- (I - K_k H_k)^T + K_k R_k K_k^T \quad \text{(covariance update)} \end{aligned} \)$

The evolution function returns: $\( f_{\text{kf}}((\hat{x}_k, P_k), y_k, u_k) = (\hat{x}_{k+1}, P_{k+1}) \)$

Output Function: $\( h_{\text{kf}}((\hat{x}_k, P_k)) = \hat{x}_k \quad \text{(extract state estimate)} \)$

Implicit Parameters:

Linearization Jacobians (constant for this linear system): $\( F_k = \begin{bmatrix} I_3 & I_3 \Delta t \\ 0 & I_3 \end{bmatrix} \quad \text{(6×6 state transition)} \)$

\[\begin{split} H_k = \begin{bmatrix} I_3 & 0 \\ \begin{bmatrix}0 & 0 & 1\end{bmatrix} & 0 \\ 0 & I_3 \end{bmatrix} \quad \text{(7×6 measurement Jacobian)} \end{split}\]

where:

  • Rows 1-3 extract position \((x, y, z)\) for mocap

  • Row 4 extracts altitude \(z\) for barometer

  • Rows 5-7 extract velocity \((v_x, v_y, v_z)\) for IMU

Noise Covariances:

  • \(Q_k = \text{diag}(0.001, 0.001, 0.001, 0.1, 0.1, 0.1)\): Process noise (6×6)

    • Small position uncertainty, larger velocity uncertainty

  • \(R_k = \text{diag}(R_{\text{mocap}}, R_{\text{baro}}, R_{\text{IMU}})\): Measurement noise (7×7, block-diagonal)

Plant dynamics functions (needed for prediction):

  • \(f_p\): Crazyflie evolution function

  • \(h_{\text{multi}}\): Stacked measurement function

Note

The measurement Jacobian \(H_k\) has more rows (7) than columns (6), making this an overdetermined system—we have more measurements than states. The Kalman gain \(K_k\) optimally weighs the three sensors based on their relative noise levels. Sensors with lower noise (mocap) receive higher weight, while noisy sensors (IMU) are downweighted.

from pykal.algorithm_library.estimators.kf import KF


def compute_F_crazyflie(dt: float) -> np.ndarray:
    """Compute state transition Jacobian for Crazyflie (constant)."""
    I3 = np.eye(3)
    return np.block([[I3, I3 * dt], [np.zeros((3, 3)), I3]])


def compute_H_multisensor() -> np.ndarray:
    """
    Compute measurement Jacobian for multi-sensor fusion.

    Measurement order: [mocap(3), baro(1), imu(3)] = 7 measurements
    State order: [pos(3), vel(3)] = 6 states
    """
    H = np.array(
        [
            # Mocap: measures x, y, z (first 3 states)
            [1, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0],
            # Barometer: measures z only (3rd state)
            [0, 0, 1, 0, 0, 0],
            # IMU: measures vx, vy, vz (last 3 states)
            [0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 1],
        ]
    )
    return H


def h_multisensor(xk: np.ndarray) -> np.ndarray:
    """
    Multi-sensor measurement function (clean, no noise).

    Concatenates all sensor measurements: [mocap(3), baro(1), imu(3)] = 7D
    """
    y_mocap = crazyflie_h_mocap(xk)  # [x, y, z]
    y_baro = crazyflie_h_baro(xk)  # [z]
    y_imu = crazyflie_h_imu(xk)  # [vx, vy, vz]
    return np.vstack([y_mocap, y_baro, y_imu])


# Process noise
Q_crazyflie = np.diag([0.001, 0.001, 0.001, 0.1, 0.1, 0.1])

# Create observer
crazyflie_observer = DynamicalSystem(f=KF.f, h=KF.h, state_name="xhat_P")

# Test observer with multi-sensor measurement
xhat_0 = np.array([[0.0], [0.0], [1.0], [0.0], [0.0], [0.0]])
P_0 = np.diag([0.1, 0.1, 0.1, 1.0, 1.0, 1.0])
xhat_P_test = (xhat_0, P_0)

yk_test, R_test = generate_multisensor_measurement(xhat_0, R_mocap, R_baro, R_imu)

xhat_P_new, xhat_out = crazyflie_observer.step(
    params={
        "xhat_P": xhat_P_test,
        "yk": yk_test,
        "f": crazyflie_f,
        "f_params": {"xk": xhat_0, "uk": np.zeros((3, 1)), "dt": 0.01},
        "h": h_multisensor,
        "h_params": {"xk": xhat_0},
        "Fk": compute_F_crazyflie(dt=0.01),
        "Qk": Q_crazyflie,
        "Hk": compute_H_multisensor(),
        "Rk": R_test,
    })

print("Multi-sensor KF estimate:")
print(xhat_out.flatten())

Simulation

../../../_images/crazyflie_composition_of_dynamical_systems.svg

We now simulate the complete closed-loop system, integrating all five dynamical components:

  1. Setpoint Generator → reference position \(r_k\)

  2. Position Controller → velocity commands \(u_k\) (using \(r_k\) and \(\hat{p}_k\))

  3. Crazyflie Plant → true state evolution

  4. Multi-Sensor Array → measurements \(y_k\) from mocap, baro, IMU

  5. Kalman Filter → fused state estimate \(\hat{x}_k\) (using \(u_k\) and \(y_k\))

System Parameters

# Time parameters
dt = 0.01  # Sampling time (seconds) - 100 Hz
switch_time = 20.0  # Time at each waypoint (seconds)

# Controller gains
Kp = 0.8  # Position control gain

# Kalman filter parameters
Q = np.diag([0.001, 0.001, 0.001, 0.1, 0.1, 0.1])  # Process noise
R_mocap = np.diag([0.005, 0.005, 0.005])  # Motion capture noise (5mm)
R_baro = np.array([[0.1]])  # Barometer noise (10cm)
R_imu = np.diag([0.1, 0.1, 0.1])  # IMU noise (0.1 m/s)

Initial Conditions

# Initial states
sk = np.array([[0], [0.0]])  # Setpoint generator state: [index, time_elapsed]
xk = np.array([[0.0], [0.0], [1.0], [0.0], [0.0], [0.0]])  # Plant state: hover at 1m
xhat = np.array([[0.0], [0.0], [1.0], [0.0], [0.0], [0.0]])  # Observer estimate
P = np.diag([0.1, 0.1, 0.1, 1.0, 1.0, 1.0])  # Covariance matrix
xhat_P = (xhat, P)  # Observer state tuple

# Controller parameters
Kp = 0.8
max_vel = 0.5

# Storage for plotting
time_hist = []
reference_hist = []
true_state_hist = []
estimate_hist = []
measurement_mocap_hist = []
measurement_baro_hist = []
command_hist = []
error_hist = []

Simulate

Hide code cell source

# Simulation time
T_sim = 60.0  # seconds
time_steps = np.arange(0, T_sim, dt)

# Import KF from algorithm library
from pykal.algorithm_library.estimators import kf as KF_module

# Create observer DynamicalSystem
observer = DynamicalSystem(f=KF_module.f, h=KF_module.h, state_name="xhat_P")

# Run closed-loop simulation using all five DynamicalSystem components
for tk in time_steps:
    # 1. Setpoint generator step
    sk, rk = setpoint_generator.step(
        params={
            "sk": sk,
            "dt": dt,
            "switch_time": switch_time,
            "num_waypoints": len(hover_waypoints),
            "waypoints": hover_waypoints,
        })

    # 2. Extract position estimate from observer
    xhat = observer.h(xhat_P)
    phat = xhat[:3]

    # 3. Controller step (generates velocity commands)
    uk = controller.step(
        params={
            "phat": phat,
            "r": rk,
            "Kp": Kp,
            "max_vel": max_vel,
        }
    )

    # 4. Plant step (true dynamics)
    xk, _ = crazyflie_plant.step(
        params={"xk": xk, "uk": uk, "dt": dt})

    # 5. Generate multi-sensor measurements
    yk_combined, R_combined = generate_multisensor_measurement(
        xk, R_mocap, R_baro, R_imu, use_realistic_corruption=True
    )

    # 6. Observer step (KF)
    Fk = compute_F_crazyflie(dt)
    Hk = compute_H_multisensor()

    xhat_P, xhat_obs = observer.step(
        params={
            "xhat_P": xhat_P,
            "yk": yk_combined,
            "f": crazyflie_f,
            "f_params": {"xk": xhat, "uk": uk, "dt": dt},
            "h": h_multisensor,
            "h_params": {"xk": xhat},
            "Fk": Fk,
            "Qk": Q,
            "Hk": Hk,
            "Rk": R_combined,
        })

    # Store results
    time_hist.append(tk)
    reference_hist.append(rk.flatten())
    true_state_hist.append(xk.flatten())
    estimate_hist.append(xhat_obs.flatten())
    measurement_mocap_hist.append(yk_combined[:3].flatten())
    measurement_baro_hist.append(yk_combined[3].item())
    command_hist.append(uk.flatten())
    error_hist.append((xk - xhat_obs).flatten())

# Convert to numpy arrays
reference_hist = np.array(reference_hist)
true_state_hist = np.array(true_state_hist)
estimate_hist = np.array(estimate_hist)
measurement_mocap_hist = np.array(measurement_mocap_hist)
measurement_baro_hist = np.array(measurement_baro_hist)
command_hist = np.array(command_hist)
error_hist = np.array(error_hist)

Visualize

We can visualize the pertinent values of our system to assure correct behavior. Note how the Kalman filter fuses three heterogeneous sensors to provide accurate state estimates despite each sensor’s limitations.

Hide code cell source

from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure(figsize=(15, 10))

# Plot 1: 3D Trajectory
ax = fig.add_subplot(2, 3, 1, projection="3d")
ax.plot(
    true_state_hist[:, 0],
    true_state_hist[:, 1],
    true_state_hist[:, 2],
    "b-",
    label="True",
    linewidth=2,
    alpha=0.7)
ax.plot(
    estimate_hist[:, 0],
    estimate_hist[:, 1],
    estimate_hist[:, 2],
    "r--",
    label="Estimate",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    reference_hist[:, 0],
    reference_hist[:, 1],
    reference_hist[:, 2],
    c="green",
    s=100,
    marker="*",
    label="Setpoints")
ax.set_xlabel("X (m)")
ax.set_ylabel("Y (m)")
ax.set_zlabel("Z (m)")
ax.set_title("3D Trajectory", fontweight="bold")
ax.legend()

# Plot 2: XY Trajectory (top view)
ax = fig.add_subplot(2, 3, 2)
ax.plot(
    true_state_hist[:, 0],
    true_state_hist[:, 1],
    "b-",
    label="True",
    linewidth=2,
    alpha=0.7)
ax.plot(
    estimate_hist[:, 0],
    estimate_hist[:, 1],
    "r--",
    label="Estimate",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    reference_hist[:, 0],
    reference_hist[:, 1],
    c="green",
    s=100,
    marker="*",
    label="Setpoints",
    zorder=5)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("XY Trajectory (Top View)", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 3: Z Position vs Time (with all sensors)
ax = fig.add_subplot(2, 3, 3)
ax.plot(time_hist, true_state_hist[:, 2], "b-", label="True Z", linewidth=2, alpha=0.7)
ax.plot(
    time_hist, estimate_hist[:, 2], "r--", label="Estimated Z", linewidth=2, alpha=0.7
)
ax.scatter(
    time_hist[::100],
    measurement_mocap_hist[::100, 2],
    c="green",
    s=5,
    alpha=0.3,
    label="Mocap Z")
ax.scatter(
    time_hist[::50],
    measurement_baro_hist[::50],
    c="purple",
    s=5,
    alpha=0.3,
    label="Baro Z")
ax.set_ylabel("Z Position (m)", fontsize=11)
ax.set_title("Altitude: Multi-Sensor Fusion", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: X Position vs Time
ax = fig.add_subplot(2, 3, 4)
ax.plot(time_hist, true_state_hist[:, 0], "b-", label="True X", alpha=0.7)
ax.plot(time_hist, estimate_hist[:, 0], "r--", label="Estimated X", alpha=0.7)
ax.scatter(
    time_hist[::100],
    measurement_mocap_hist[::100, 0],
    c="gray",
    s=5,
    alpha=0.3,
    label="Mocap")
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_title("X Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 5: Velocity Commands
ax = fig.add_subplot(2, 3, 5)
ax.plot(time_hist, command_hist[:, 0], label="Vx cmd", alpha=0.7)
ax.plot(time_hist, command_hist[:, 1], label="Vy cmd", alpha=0.7)
ax.plot(time_hist, command_hist[:, 2], label="Vz cmd", alpha=0.7)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Velocity Command (m/s)", fontsize=11)
ax.set_title("Control Commands", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 6: Position Estimation Error
ax = fig.add_subplot(2, 3, 6)
position_error = np.sqrt(
    error_hist[:, 0] ** 2 + error_hist[:, 1] ** 2 + error_hist[:, 2] ** 2
)
ax.plot(time_hist, position_error * 1000, "m-", linewidth=1.5)  # Convert to mm
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Position Error (mm)", fontsize=11)
ax.set_title("3D Position Estimation Error", fontsize=13, fontweight="bold")
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean position error: {np.mean(position_error)*1000:.2f} mm")
print(f"Max position error: {np.max(position_error)*1000:.2f} mm")

Now we can run the same simulation with a much cleaner interface. All system parameters can be configured at initialization, and the simulation loop becomes trivial:

Hide code cell source

# Visualize results from callback
fig = plt.figure(figsize=(15, 10))

# Plot 1: 3D Trajectory
ax = fig.add_subplot(2, 3, 1, projection="3d")
ax.plot(
    results["true_state"][:, 0],
    results["true_state"][:, 1],
    results["true_state"][:, 2],
    "b-",
    label="True",
    linewidth=2,
    alpha=0.7)
ax.plot(
    results["estimate"][:, 0],
    results["estimate"][:, 1],
    results["estimate"][:, 2],
    "r--",
    label="Estimate",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    results["reference"][:, 0],
    results["reference"][:, 1],
    results["reference"][:, 2],
    c="green",
    s=100,
    marker="*",
    label="Setpoints")
ax.set_xlabel("X (m)")
ax.set_ylabel("Y (m)")
ax.set_zlabel("Z (m)")
ax.set_title("3D Trajectory (Callback Interface)", fontweight="bold")
ax.legend()

# Plot 2: XY Trajectory (top view)
ax = fig.add_subplot(2, 3, 2)
ax.plot(
    results["true_state"][:, 0],
    results["true_state"][:, 1],
    "b-",
    label="True",
    linewidth=2,
    alpha=0.7)
ax.plot(
    results["estimate"][:, 0],
    results["estimate"][:, 1],
    "r--",
    label="Estimate",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    results["reference"][:, 0],
    results["reference"][:, 1],
    c="green",
    s=100,
    marker="*",
    label="Setpoints",
    zorder=5)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("XY Trajectory (Top View)", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 3: Z Position vs Time (with all sensors)
ax = fig.add_subplot(2, 3, 3)
ax.plot(
    results["time"],
    results["true_state"][:, 2],
    "b-",
    label="True Z",
    linewidth=2,
    alpha=0.7)
ax.plot(
    results["time"],
    results["estimate"][:, 2],
    "r--",
    label="Estimated Z",
    linewidth=2,
    alpha=0.7)
ax.scatter(
    results["time"][::100],
    results["measurement_mocap"][::100, 2],
    c="green",
    s=5,
    alpha=0.3,
    label="Mocap Z")
ax.scatter(
    results["time"][::50],
    results["measurement_baro"][::50],
    c="purple",
    s=5,
    alpha=0.3,
    label="Baro Z")
ax.set_ylabel("Z Position (m)", fontsize=11)
ax.set_title("Altitude: Multi-Sensor Fusion", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: X Position vs Time
ax = fig.add_subplot(2, 3, 4)
ax.plot(results["time"], results["true_state"][:, 0], "b-", label="True X", alpha=0.7)
ax.plot(
    results["time"], results["estimate"][:, 0], "r--", label="Estimated X", alpha=0.7
)
ax.scatter(
    results["time"][::100],
    results["measurement_mocap"][::100, 0],
    c="gray",
    s=5,
    alpha=0.3,
    label="Mocap")
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_title("X Position Over Time", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 5: Velocity Commands
ax = fig.add_subplot(2, 3, 5)
ax.plot(results["time"], results["command"][:, 0], label="Vx cmd", alpha=0.7)
ax.plot(results["time"], results["command"][:, 1], label="Vy cmd", alpha=0.7)
ax.plot(results["time"], results["command"][:, 2], label="Vz cmd", alpha=0.7)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Velocity Command (m/s)", fontsize=11)
ax.set_title("Control Commands", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 6: Position Estimation Error
ax = fig.add_subplot(2, 3, 6)
position_error = np.sqrt(
    results["estimation_error"][:, 0] ** 2
    + results["estimation_error"][:, 1] ** 2
    + results["estimation_error"][:, 2] ** 2
)
ax.plot(results["time"], position_error * 1000, "m-", linewidth=1.5)  # Convert to mm
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Position Error (mm)", fontsize=11)
ax.set_title("3D Position Estimation Error", fontsize=13, fontweight="bold")
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Mean position error: {np.mean(position_error)*1000:.2f} mm")
print(f"Max position error: {np.max(position_error)*1000:.2f} mm")

Experimentation

Now that we have a working system, try experimenting with different parameters:

Sensor characteristics:

  • Vary mocap noise to simulate poor lighting conditions

  • Increase barometer noise to simulate temperature-induced drift

  • Increase IMU noise to simulate cheap sensors

  • Observe how the KF automatically reweights sensors

Controller tuning:

  • Adjust Kp to change position tracking aggressiveness

  • Try different hover patterns (figure-eight, circle, random)

  • Add velocity limits to simulate real quadrotor constraints

Observer comparison:

  • Run simulation with KF disabled (use raw mocap only)

  • Compare estimation error with single-sensor vs multi-sensor

  • Visualize which sensor dominates in different flight regimes

Challenge: Design a trajectory where fusion of all three sensors significantly outperforms any single sensor.

← Control Systems as Dynamical Systems