← Dynamical Systems as ROS Nodes

TurtleBot State Estimation: From Python to ROS2

In the TurtleBot State Estimation tutorial, we designed a complete navigation system in Python using DynamicalSystem components. Now we’ll deploy this exact system as ROS2 nodes.

Key Insight: The dynamical system architecture maps directly to ROS!

Each DynamicalSystem becomes a ROS node, and parameter dictionary connections become ROS topics. This 1:1 correspondence preserves our theoretical design while enabling distributed deployment.

Conceptual Foundation: From Dynamical Systems to ROS Nodes

Recall from the TurtleBot State Estimation tutorial that we represented our navigation system as a composition of dynamical systems:

Each block is a DynamicalSystem with state evolution (f) and observation (h) functions. Data flows through a shared parameter dictionary.

The ROS Mapping

There is a direct correspondence between this dynamical system representation and a ROS node architecture:

The correspondence:

  • Each dynamical system block becomes a ROS node

  • Each data connection becomes a ROS topic

  • The parameter dictionary is replaced by message passing

  • Each step() method becomes a callback function

Mathematically: There is a functor from the category of discrete-time dynamical systems (objects = blocks, morphisms = data flows) to the category of ROS nodes (objects = nodes, morphisms = topics).

Adding Teleoperation

In the Python simulation, we couldn’t interactively control the robot during execution because Jupyter kernels lack elegant real-time input handling. However, ROS middleware naturally supports this through teleop nodes!

Teleoperation provides direct velocity control, bypassing the high-level navigation pipeline entirely:

Key Architectural Difference:

  • Autonomous Navigation: Waypoint Generator → Velocity Controller → /cmd_vel → Plant

  • Teleoperation: Keyboard Teleop → /cmd_vel → Plant

The Keyboard Teleop node publishes velocity commands directly to /cmd_vel, completely bypassing both the waypoint generator and position controller. The observer (Kalman filter) continues running to provide state estimates for monitoring and visualization.

This design pattern is common in robotics: teleoperation provides low-level control for manual operation, while autonomous mode uses the full planning and control pipeline.

Note

This tutorial implements the basic ROS node architecture (without teleop). The teleop version will be demonstrated in a later tutorial.

Individual Node Blocks

Each ROS node wraps a dynamical system block. Here are the individual components:

Waypoint Generator

Velocity Controller

TurtleBot Plant

EKF Observer

In the following sections, we’ll implement each of these blocks as ROSNode instances, demonstrating how theoretical design maps directly to ROS deployment.

Architecture Comparison: Python Simulation vs ROS Nodes

Python Simulation (theory notebook):

# Direct function calls, shared parameter dictionary
for tk in time_steps:
    rk = waypoint_generator(...)  # Returns reference
    xhat = observer.h(xhat_P)      # Extract estimate
    uk = controller(xhat, rk, ...) # Generate command
    xk, yk = plant.step(...)       # Update plant
    xhat_P = observer.step(...)    # Update observer

ROS2 Nodes (this notebook):

waypoint_generator_node → /reference (PoseStamped)
                               ↓
velocity_controller_node → /cmd_vel (Twist)
                               ↓
turtlebot_simulator_node → /odom (Odometry)
                               ↓
kalman_filter_node → /estimate (Odometry)
        ↓
   (feedback via /estimate topic)

Note

The ROS graph is the block diagram! Each function becomes a node, each data flow becomes a topic.

ROS2 System Architecture

Our system consists of four ROS nodes:

TurtleBot ROS System

Node 1: Waypoint Generator

  • Publishes: /reference (PoseStamped)

  • Function: Generates reference poses for tracking

Node 2: Velocity Controller

  • Subscribes: /reference, /estimate

  • Publishes: /cmd_vel (Twist)

  • Function: Proportional control to track reference

Node 3: TurtleBot Simulator

  • Subscribes: /cmd_vel

  • Publishes: /odom (noisy), /true_state (clean)

  • Function: Simulates robot dynamics and sensors

Node 4: Kalman Filter Observer

  • Subscribes: /odom, /cmd_vel

  • Publishes: /estimate (Odometry)

  • Function: Fuses measurements and predictions

Setup: Imports and Core Functions

import numpy as np
import matplotlib.pyplot as plt
from pykal import DynamicalSystem
from pykal.ros.ros_node import ROSNode
from pykal.algorithm_library.estimators.kf import KF
from pykal.data_change import corrupt

# ROS message types
from geometry_msgs.msg import Twist, PoseStamped
from nav_msgs.msg import Odometry
import rclpy
import time

print("✓ Imports successful!")

Core TurtleBot Functions (from Theory Notebook)

We reuse the exact same dynamics and Jacobian functions from the theory notebook:

# ============================================================================
# TurtleBot Dynamics
# ============================================================================


def turtlebot_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:
    """Unicycle kinematics: [x, y, theta, v, omega]."""
    x, y, theta, v, omega = xk.flatten()
    v_cmd, omega_cmd = uk.flatten()

    x_new = x + v * np.cos(theta) * dt
    y_new = y + v * np.sin(theta) * dt
    theta_new = np.arctan2(np.sin(theta + omega * dt), np.cos(theta + omega * dt))

    return np.array([[x_new], [y_new], [theta_new], [v_cmd], [omega_cmd]])


def turtlebot_h(xk: np.ndarray) -> np.ndarray:
    """Measurement: [x, y, theta] from odometry."""
    return xk[:3, :]


# ============================================================================
# EKF Jacobians
# ============================================================================


def compute_F_jacobian(xhat: np.ndarray, dt: float) -> np.ndarray:
    """Jacobian of dynamics."""
    _, _, theta, v, _ = xhat.flatten()
    return np.array(
        [
            [1, 0, -v * np.sin(theta) * dt, np.cos(theta) * dt, 0],
            [0, 1, v * np.cos(theta) * dt, np.sin(theta) * dt, 0],
            [0, 0, 1, 0, dt],
            [0, 0, 0, 1, 0],
            [0, 0, 0, 0, 1],
        ]
    )


def compute_H_jacobian() -> np.ndarray:
    """Jacobian of measurement."""
    return np.array([[1, 0, 0, 0, 0], [0, 1, 0, 0, 0], [0, 0, 1, 0, 0]])


# ============================================================================
# Noise Covariances
# ============================================================================

Q_turtlebot = np.diag([0.01, 0.01, 0.02, 0.1, 0.1])  # Process noise
R_turtlebot = np.diag([0.05, 0.05, 0.1])  # Measurement noise

print("Core TurtleBot functions defined!")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[1], line 6
      1 # ============================================================================
      2 # TurtleBot Dynamics
      3 # ============================================================================
----> 6 def turtlebot_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:
      7     """Unicycle kinematics: [x, y, theta, v, omega]."""
      8     x, y, theta, v, omega = xk.flatten()

NameError: name 'np' is not defined

Node 1: Waypoint Generator

ROS Wrapper Pattern: We wrap our waypoint logic in a ROSNode that publishes /reference.

Message Format: PoseStamped = (8,) array [t, px, py, pz, qx, qy, qz, qw]

Key Changes from Python:

  • Python: Returns NumPy array directly

  • ROS: Publishes to /reference topic

  • Message conversion handled automatically by ROSNode

def create_waypoint_generator_node(waypoints, switch_time=15.0, rate_hz=10.0):
    """
    Create ROS node that publishes waypoints.

    Parameters
    ----------
    waypoints : list of tuples
        Each tuple is (x, y, theta)
    switch_time : float
        Time to spend at each waypoint
    rate_hz : float
        Publishing rate

    Returns
    -------
    ROSNode
        Configured waypoint generator node
    """
    # State: current waypoint index and time elapsed
    current_idx = 0
    time_at_waypoint = 0.0
    last_tk = 0.0

    def waypoint_callback(tk):
        nonlocal current_idx, time_at_waypoint, last_tk

        # Compute elapsed time
        dt = tk - last_tk if last_tk > 0 else 0.0
        last_tk = tk
        time_at_waypoint += dt

        # Switch waypoint if needed
        if time_at_waypoint >= switch_time:
            current_idx = (current_idx + 1) % len(waypoints)
            time_at_waypoint = 0.0

        # Get current waypoint
        x_r, y_r, theta_r = waypoints[current_idx]

        # Convert theta to quaternion (2D: only yaw rotation)
        qx, qy = 0.0, 0.0
        qz = np.sin(theta_r / 2.0)
        qw = np.cos(theta_r / 2.0)

        # Pack as PoseStamped: [t, px, py, pz, qx, qy, qz, qw]
        reference = np.array([tk, x_r, y_r, 0.0, qx, qy, qz, qw])

        return {"reference": reference}

    # Create ROS node
    node = ROSNode(
        node_name="waypoint_generator",
        callback=waypoint_callback,
        subscribes_to=[],  # No subscriptions
        publishes_to=[
            ("reference", PoseStamped, "/reference"),
        ],
        rate_hz=rate_hz)

    return node


# Create waypoint generator
square_waypoints = [
    (2.0, 0.0, 0.0),  # Right
    (2.0, 2.0, np.pi / 2),  # Up
    (0.0, 2.0, np.pi),  # Left
    (0.0, 0.0, -np.pi / 2),  # Down
]

waypoint_node = create_waypoint_generator_node(
    waypoints=square_waypoints, switch_time=15.0, rate_hz=10.0
)

print("Waypoint generator node created!")
print(f"  Publishes: /reference (PoseStamped)")
print(f"  Waypoints: {len(square_waypoints)} points in square pattern")
print(f"  Switch time: 15.0 seconds")

Node 2: Velocity Controller

ROS Wrapper Pattern: Subscribes to /reference and /estimate, publishes /cmd_vel.

Controller Law:

\[ v_{cmd} = K_v \cdot \| \vec{r} - \hat{\vec{p}} \| \]
\[ \omega_{cmd} = K_\omega \cdot (\theta_r - \hat{\theta}) \]

Key Changes from Python:

  • Python: Direct function call with parameters

  • ROS: Callback receives messages as kwargs

  • Automatic message → NumPy conversion

def create_velocity_controller_node(Kv=0.5, Komega=1.5, rate_hz=50.0):
    """
    Create ROS node for proportional velocity controller.

    Parameters
    ----------
    Kv : float
        Linear velocity gain
    Komega : float
        Angular velocity gain
    rate_hz : float
        Control loop rate

    Returns
    -------
    ROSNode
        Configured controller node
    """

    def controller_callback(tk, reference, estimate):
        """
        Compute velocity command.

        Args:
            tk (float): Current time
            reference (np.ndarray): (8) PoseStamped [t, px, py, pz, qx, qy, qz, qw]
            estimate (np.ndarray): (13) Odometry [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]

        Returns:
            dict: {'cmd_vel': (6) Twist array}
        """
        # Extract reference position and heading
        x_r, y_r = reference[1], reference[2]  # Skip timestamp
        qz_r, qw_r = reference[6], reference[7]
        theta_r = np.arctan2(2.0 * qw_r * qz_r, 1.0 - 2.0 * qz_r**2)

        # Extract current position and heading from estimate
        x, y = estimate[0], estimate[1]
        qz, qw = estimate[5], estimate[6]
        theta = np.arctan2(2.0 * qw * qz, 1.0 - 2.0 * qz**2)

        # Compute errors
        dx = x_r - x
        dy = y_r - y
        distance = np.sqrt(dx**2 + dy**2)

        heading_error = theta_r - theta
        heading_error = np.arctan2(np.sin(heading_error), np.cos(heading_error))

        # Proportional control
        v_cmd = Kv * distance
        omega_cmd = Komega * heading_error

        # Saturate (TurtleBot limits)
        v_cmd = np.clip(v_cmd, -0.22, 0.22)
        omega_cmd = np.clip(omega_cmd, -2.84, 2.84)

        # Pack as Twist: [vx, vy, vz, wx, wy, wz]
        cmd_vel = np.array([v_cmd, 0.0, 0.0, 0.0, 0.0, omega_cmd])

        return {"cmd_vel": cmd_vel}

    # Create ROS node
    node = ROSNode(
        node_name="velocity_controller",
        callback=controller_callback,
        subscribes_to=[
            ("/reference", PoseStamped, "reference"),
            ("/estimate", Odometry, "estimate"),
        ],
        publishes_to=[
            ("cmd_vel", Twist, "/cmd_vel"),
        ],
        rate_hz=rate_hz,
        required_topics={"reference", "estimate"},  # Both required for control
    )

    return node


# Create controller node
controller_node = create_velocity_controller_node(Kv=0.5, Komega=1.5, rate_hz=50.0)

print("Velocity controller node created!")
print(f"  Subscribes: /reference, /estimate")
print(f"  Publishes: /cmd_vel (Twist)")
print(f"  Gains: Kv={0.5}, Komega={1.5}")
print(f"  Rate: 50 Hz")

Node 3: TurtleBot Simulator (Plant)

ROS Wrapper Pattern: Subscribes to /cmd_vel, publishes /odom and /true_state.

Dynamics: Same unicycle model from theory notebook

\[\begin{split} \begin{aligned} x_{k+1} &= x_k + v_k \cos(\theta_k) \Delta t \\ y_{k+1} &= y_k + v_k \sin(\theta_k) \Delta t \\ \theta_{k+1} &= \theta_k + \omega_k \Delta t \end{aligned} \end{split}\]

Measurement Noise: Adds realistic odometry noise + occasional spikes

Note

We publish both /odom (noisy, like a real robot) and /true_state (clean, for visualization).

def create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=None):
    """
    Create ROS node that simulates TurtleBot dynamics.

    Parameters
    ----------
    dt : float
        Integration timestep
    rate_hz : float
        Simulation rate
    R : np.ndarray
        Measurement noise covariance (3x3)

    Returns
    -------
    ROSNode
        Configured simulator node
    """
    if R is None:
        R = np.diag([0.05, 0.05, 0.1])  # Default odometry noise

    # Internal state: [x, y, theta, v, omega]
    xk = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])

    def simulator_callback(tk, cmd_vel):
        """
        Simulate one timestep of TurtleBot dynamics.

        Args:
            tk (float): Current time
            cmd_vel (np.ndarray): (6) Twist [vx, vy, vz, wx, wy, wz]

        Returns:
            dict: {'odom': noisy Odometry, 'true_state': clean Odometry}
        """
        nonlocal xk

        # Extract commands (only use vx and wz for 2D motion)
        uk = np.array([[cmd_vel[0]], [cmd_vel[5]]])

        # Update state using dynamics
        xk = turtlebot_f(xk, uk, dt)

        # Convert to Odometry format: [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]
        x, y, theta, v, omega = xk.flatten()

        # Convert theta to quaternion
        qx, qy = 0.0, 0.0
        qz = np.sin(theta / 2.0)
        qw = np.cos(theta / 2.0)

        # True state (clean)
        true_state = np.array(
            [
                x,
                y,
                0.0,  # position
                qx,
                qy,
                qz,
                qw,  # orientation
                v * np.cos(theta),
                v * np.sin(theta),
                0.0,  # linear velocity
                0.0,
                0.0,
                omega,  # angular velocity
            ]
        )

        # Add realistic measurement noise
        # Corrupt position and orientation
        noise_pos = np.random.multivariate_normal(np.zeros(3), R)
        x_noisy = x + noise_pos[0]
        y_noisy = y + noise_pos[1]
        theta_noisy = theta + noise_pos[2]

        # Add occasional spikes to heading (bumps/wheel slip)
        if np.random.rand() < 0.02:  # 2% spike rate
            theta_noisy += np.random.choice([-1, 1]) * 0.3  # ~17 degrees

        # Convert noisy theta to quaternion
        qz_noisy = np.sin(theta_noisy / 2.0)
        qw_noisy = np.cos(theta_noisy / 2.0)

        # Noisy odometry
        odom = np.array(
            [
                x_noisy,
                y_noisy,
                0.0,
                qx,
                qy,
                qz_noisy,
                qw_noisy,
                v * np.cos(theta),
                v * np.sin(theta),
                0.0,
                0.0,
                0.0,
                omega,
            ]
        )

        return {"odom": odom, "true_state": true_state}

    # Create ROS node
    node = ROSNode(
        node_name="turtlebot_simulator",
        callback=simulator_callback,
        subscribes_to=[
            ("/cmd_vel", Twist, "cmd_vel"),
        ],
        publishes_to=[
            ("odom", Odometry, "/odom"),
            ("true_state", Odometry, "/true_state"),
        ],
        rate_hz=rate_hz)

    return node


# Create simulator node
simulator_node = create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=R_turtlebot)

print("TurtleBot simulator node created!")
print(f"  Subscribes: /cmd_vel (Twist)")
print(f"  Publishes: /odom (noisy), /true_state (clean)")
print(f"  Dynamics: Unicycle model with dt={0.1}s")
print(f"  Noise: R = diag([0.05, 0.05, 0.1]) + occasional spikes")

Node 4: Extended Kalman Filter Observer

ROS Wrapper Pattern: Subscribes to /odom and /cmd_vel, publishes /estimate.

Algorithm: Same EKF from theory notebook

Prediction Step:

\[ \hat{x}_k^- = f(\hat{x}_{k-1}, u_k) \]
\[ P_k^- = F_k P_{k-1} F_k^T + Q \]

Update Step:

\[ K_k = P_k^- H_k^T (H_k P_k^- H_k^T + R)^{-1} \]
\[ \hat{x}_k = \hat{x}_k^- + K_k (y_k - h(\hat{x}_k^-)) \]

Key Changes from Python:

  • Python: observer.step() with direct array passing

  • ROS: Callback receives ROS messages, calls KF.f(), publishes result

def create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=None, R=None):
    """
    Create ROS node for Extended Kalman Filter.

    Parameters
    ----------
    dt : float
        Filter timestep
    rate_hz : float
        Filter rate
    Q : np.ndarray
        Process noise covariance (5x5)
    R : np.ndarray
        Measurement noise covariance (3x3)

    Returns
    -------
    ROSNode
        Configured Kalman filter node
    """
    if Q is None:
        Q = np.diag([0.01, 0.01, 0.02, 0.1, 0.1])
    if R is None:
        R = np.diag([0.05, 0.05, 0.1])

    # Initial estimate
    xhat = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])
    P = np.diag([0.1, 0.1, 0.1, 1.0, 1.0])

    def filter_callback(tk, odom, cmd_vel):
        """
        Run one step of EKF.

        Args:
            tk (float): Current time
            odom (np.ndarray): (13) Odometry measurement
            cmd_vel (np.ndarray): (6) Twist command

        Returns:
            dict: {'estimate': (13) Odometry with state estimate}
        """
        nonlocal xhat, P

        # Extract measurement [x, y, theta] from Odometry
        x_meas, y_meas = odom[0], odom[1]
        qz_meas, qw_meas = odom[5], odom[6]
        theta_meas = np.arctan2(2.0 * qw_meas * qz_meas, 1.0 - 2.0 * qz_meas**2)
        yk = np.array([[x_meas], [y_meas], [theta_meas]])

        # Extract control input [v_cmd, omega_cmd]
        uk = np.array([[cmd_vel[0]], [cmd_vel[5]]])

        # Compute Jacobians
        Fk = compute_F_jacobian(xhat, dt)
        Hk = compute_H_jacobian()

        # Run EKF update using pykal's KF.f
        f_params = {"xk": xhat, "uk": uk, "dt": dt}
        h_params = {"xk": xhat}

        xhat_new, P_new = KF.f(
            x_P=(xhat, P),
            y=yk,
            u=uk,
            f=turtlebot_f,
            F=lambda **params: Fk,
            Q=lambda **params: Q,
            h=turtlebot_h,
            H=lambda **params: Hk,
            R=lambda **params: R,
            f_params=f_params,
            F_params={},
            Q_params={},
            h_params=h_params,
            H_params={},
            R_params={})

        # Update state
        xhat = xhat_new
        P = P_new

        # Extract estimate from state vector
        x_est, y_est, theta_est, v_est, omega_est = xhat.flatten()

        # Convert to Odometry format
        qx, qy = 0.0, 0.0
        qz_est = np.sin(theta_est / 2.0)
        qw_est = np.cos(theta_est / 2.0)

        estimate = np.array(
            [
                x_est,
                y_est,
                0.0,
                qx,
                qy,
                qz_est,
                qw_est,
                v_est * np.cos(theta_est),
                v_est * np.sin(theta_est),
                0.0,
                0.0,
                0.0,
                omega_est,
            ]
        )

        return {"estimate": estimate}

    # Create ROS node
    node = ROSNode(
        node_name="kalman_filter",
        callback=filter_callback,
        subscribes_to=[
            ("/odom", Odometry, "odom"),
            ("/cmd_vel", Twist, "cmd_vel"),
        ],
        publishes_to=[
            ("estimate", Odometry, "/estimate"),
        ],
        rate_hz=rate_hz,
        required_topics={"odom", "cmd_vel"},  # Need both for EKF
    )

    return node


# Create Kalman filter node
kf_node = create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=Q_turtlebot, R=R_turtlebot)

print("Kalman filter node created!")
print(f"  Subscribes: /odom, /cmd_vel")
print(f"  Publishes: /estimate (Odometry)")
print(f"  Algorithm: Extended Kalman Filter (EKF)")
print(f"  Q: Process noise covariance (5x5)")
print(f"  R: Measurement noise covariance (3x3)")

Running the Complete ROS2 System

Now we initialize ROS2, create all nodes, and start the distributed system:

# Initialize ROS2
rclpy.init()

# Create all nodes
print("Creating ROS2 nodes...")
waypoint_node.create_node()
controller_node.create_node()
simulator_node.create_node()
kf_node.create_node()
print("All nodes created!")

# Start all nodes
print("\nStarting nodes...")
waypoint_node.start()
print("  ✓ Waypoint generator running")
simulator_node.start()
print("  ✓ TurtleBot simulator running")
kf_node.start()
print("  ✓ Kalman filter running")
controller_node.start()
print("  ✓ Velocity controller running")

print("\n🚀 TurtleBot ROS system is live!")
print("\nROS2 Topic Graph:")
print("  /reference ← waypoint_generator")
print("     ↓")
print("  velocity_controller → /cmd_vel")
print("     ↓")
print("  turtlebot_simulator → /odom, /true_state")
print("     ↓")
print("  kalman_filter → /estimate")
print("     ↓ (feedback)")
print("  velocity_controller")

print("\n💡 Tip: Run 'rqt_graph' in a terminal to visualize the node graph!")
print("         You'll see that the ROS graph matches the block diagram.")

Data Collection for Analysis

We create a logger node to subscribe to all topics and collect data:

# Data collection node
def create_data_logger_node():
    """
    Create a node that logs all published data for analysis.
    """
    data_log = {
        "time": [],
        "reference": [],
        "cmd_vel": [],
        "true_state": [],
        "odom": [],
        "estimate": [],
    }

    def logger_callback(tk, reference, cmd_vel, true_state, odom, estimate):
        """Log all topics."""
        data_log["time"].append(tk)
        data_log["reference"].append(reference.copy())
        data_log["cmd_vel"].append(cmd_vel.copy())
        data_log["true_state"].append(true_state.copy())
        data_log["odom"].append(odom.copy())
        data_log["estimate"].append(estimate.copy())

        return {}  # No publications

    node = ROSNode(
        node_name="data_logger",
        callback=logger_callback,
        subscribes_to=[
            ("/reference", PoseStamped, "reference"),
            ("/cmd_vel", Twist, "cmd_vel"),
            ("/true_state", Odometry, "true_state"),
            ("/odom", Odometry, "odom"),
            ("/estimate", Odometry, "estimate"),
        ],
        publishes_to=[],
        rate_hz=10.0)

    return node, data_log


# Create and start logger
logger_node, data_log = create_data_logger_node()
logger_node.create_node()
logger_node.start()
print("Data logger started! Collecting data...")

# Run for 60 seconds
import time as pytime

T_sim = 60.0
print(f"\nRunning simulation for {T_sim} seconds...")
pytime.sleep(T_sim)

print(f"\nSimulation complete! Collected {len(data_log['time'])} samples.")

Stopping the System

# Stop all nodes
print("Stopping nodes...")
logger_node.stop()
waypoint_node.stop()
controller_node.stop()
simulator_node.stop()
kf_node.stop()

# Shutdown ROS2
rclpy.shutdown()
print("All nodes stopped. ROS2 shutdown complete.")

Visualization and Analysis

Let’s visualize the ROS deployment results:

Hide code cell source

# Convert logged data to arrays
time_vec = np.array(data_log["time"])
true_states = np.array(data_log["true_state"])
estimates = np.array(data_log["estimate"])
measurements = np.array(data_log["odom"])
commands = np.array(data_log["cmd_vel"])
references = np.array(data_log["reference"])

# Extract positions from Odometry arrays
true_x = true_states[:, 0]
true_y = true_states[:, 1]
est_x = estimates[:, 0]
est_y = estimates[:, 1]
meas_x = measurements[:, 0]
meas_y = measurements[:, 1]

# Extract references from PoseStamped
ref_x = references[:, 1]  # Skip timestamp
ref_y = references[:, 2]

# Plotting
fig, axs = plt.subplots(2, 2, figsize=(14, 10))

# Plot 1: 2D Trajectory
ax = axs[0, 0]
ax.plot(true_x, true_y, "b-", linewidth=2, label="True Trajectory", alpha=0.7)
ax.plot(est_x, est_y, "r--", linewidth=2, label="Estimated (EKF)", alpha=0.7)
ax.scatter(
    ref_x[::50], ref_y[::50], c="green", s=100, marker="*", label="Waypoints", zorder=5
)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("ROS Deployment: 2D Trajectory", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 2: X Position
ax = axs[0, 1]
ax.plot(time_vec, true_x, "b-", label="True X", alpha=0.7)
ax.plot(time_vec, est_x, "r--", label="Estimated X", alpha=0.7)
ax.scatter(
    time_vec[::10], meas_x[::10], c="gray", s=10, alpha=0.3, label="Measurements"
)
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_title("X Position: ROS Topics", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 3: Y Position
ax = axs[1, 0]
ax.plot(time_vec, true_y, "b-", label="True Y", alpha=0.7)
ax.plot(time_vec, est_y, "r--", label="Estimated Y", alpha=0.7)
ax.scatter(
    time_vec[::10], meas_y[::10], c="gray", s=10, alpha=0.3, label="Measurements"
)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("Y Position: ROS Topics", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: Estimation Error
ax = axs[1, 1]
position_error = np.sqrt((true_x - est_x) ** 2 + (true_y - est_y) ** 2)
ax.plot(time_vec, position_error, "m-", linewidth=1.5, label="Position Error")
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Error (m)", fontsize=11)
ax.set_title("KF Estimation Error", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nPerformance Metrics (ROS Deployment):")
print(f"  Mean position error: {np.mean(position_error):.4f} m")
print(f"  Max position error: {np.max(position_error):.4f} m")
print(f"  Final position error: {position_error[-1]:.4f} m")

Summary: Python → ROS2 Deployment

We’ve successfully deployed our TurtleBot navigation system to ROS2!

Mapping: Theory → ROS:

Python (Theory)

ROS2 (Deployment)

DynamicalSystem

ROSNode

Function call

Topic publish/subscribe

Parameter dict

ROS message

step() method

Node callback

NumPy array

Auto-converted message

Key Accomplishments:

  1. Architecture Preservation: Block diagram = ROS graph

  2. Code Reuse: Same dynamics, Jacobians, noise covariances

  3. Automatic Conversion: NumPy ↔ ROS messages handled by ROSNode

  4. Distributed System: Nodes can run on different machines

  5. Same Algorithm: EKF implementation identical to theory notebook

What Changed:

  • ✓ Wrapped functions as ROSNode callbacks

  • ✓ Added message format conversions (PoseStamped, Odometry, Twist)

  • ✓ Used ROS topics instead of direct function calls

  • Did NOT change: Dynamics, controller, observer algorithms

Next Step: In the Gazebo integration tutorial, we’ll replace the simulator node with Gazebo’s physics engine, demonstrating ROS → Simulation → Hardware!

Architecture 2: Teleoperation Mode

In the previous section, we demonstrated autonomous navigation with waypoint tracking. Now we’ll demonstrate teleoperation mode, where a human operator directly controls the robot’s velocity.

Key Architectural Difference:

  • Autonomous (previous): Waypoint Gen → Controller → /cmd_vel → Plant → Observer

  • Teleoperation (this section): Keyboard Teleop/cmd_vel → Plant → Observer

The teleoperation node (shown in RED in diagrams) is an external ROS2 tool, not wrapped in pykal.ROSNode. It publishes velocity commands directly to /cmd_vel, completely bypassing the high-level planning pipeline.

Nodes Used in Teleoperation Mode:

  1. TurtleBot Simulator - Receives /cmd_vel from teleop

  2. Kalman Filter - Continues providing state estimates for monitoring

  3. Waypoint Generator - Not needed (bypassed)

  4. Velocity Controller - Not needed (bypassed)

  5. 🔴 Keyboard Teleop - External tool (teleop_twist_keyboard)

# Re-initialize ROS2 (if needed after previous shutdown)
if not rclpy.ok():
    rclpy.init()

# Create fresh simulator and KF nodes for teleoperation
print("Creating nodes for teleoperation mode...")
simulator_teleop = create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=R_turtlebot)
kf_teleop = create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=Q_turtlebot, R=R_turtlebot)

simulator_teleop.create_node()
kf_teleop.create_node()
print("✓ Simulator and Kalman filter created")

# Start nodes
print("\nStarting nodes...")
simulator_teleop.start()
print("  ✓ TurtleBot simulator running")
kf_teleop.start()
print("  ✓ Kalman filter running")

print("\n🚀 System ready for teleoperation!")
print("\nActive ROS2 Topics:")
print("  /cmd_vel ← (waiting for keyboard teleop)")
print("  /odom ← turtlebot_simulator")
print("  /estimate ← kalman_filter")
print("\nNOTE: Waypoint generator and controller are NOT running (bypassed!)")
# Create a simulated teleop node that mimics user input
def create_simulated_teleop_node():
    """
    Simulate keyboard teleop with programmed commands.
    
    This mimics what teleop_twist_keyboard would publish,
    demonstrating the teleoperation architecture.
    """
    import time
    
    # Programmed maneuver sequence
    maneuvers = [
        (0, 5, (0.15, 0.0)),      # 0-5s: Forward
        (5, 8, (0.15, 0.8)),      # 5-8s: Forward + turn left
        (8, 13, (0.15, 0.0)),     # 8-13s: Forward
        (13, 16, (0.15, -0.8)),   # 13-16s: Forward + turn right
        (16, 21, (0.15, 0.0)),    # 16-21s: Forward
        (21, 24, (0.0, 1.5)),     # 21-24s: Spin left
        (24, 30, (0.15, 0.0)),    # 24-30s: Forward
    ]
    
    start_time = None
    
    def teleop_callback(tk):
        nonlocal start_time
        
        if start_time is None:
            start_time = tk
        
        elapsed = tk - start_time
        
        # Find current maneuver
        v, omega = 0.0, 0.0
        for t_start, t_end, (v_cmd, omega_cmd) in maneuvers:
            if t_start <= elapsed < t_end:
                v, omega = v_cmd, omega_cmd
                break
        
        # Pack as Twist
        cmd_vel = np.array([v, 0.0, 0.0, 0.0, 0.0, omega])
        
        return {"cmd_vel": cmd_vel}
    
    node = ROSNode(
        node_name="simulated_teleop",
        callback=teleop_callback,
        subscribes_to=[],
        publishes_to=[
            ("cmd_vel", Twist, "/cmd_vel"),
        ],
        rate_hz=10.0)
    
    return node


# Create and start simulated teleop
print("Creating simulated keyboard teleop...")
teleop_node = create_simulated_teleop_node()
teleop_node.create_node()
teleop_node.start()
print("✓ Simulated teleop running (mimics keyboard input)")
print("\nThis simulates a human operator using teleop_twist_keyboard!")
print("In real usage, you would run: ros2 run teleop_twist_keyboard teleop_twist_keyboard")
# Create logger for teleoperation data
def create_teleop_logger():
    """Logger for teleoperation mode (no reference waypoints)."""
    teleop_data = {
        "time": [],
        "cmd_vel": [],
        "true_state": [],
        "odom": [],
        "estimate": [],
    }
    
    def logger_callback(tk, cmd_vel, true_state, odom, estimate):
        teleop_data["time"].append(tk)
        teleop_data["cmd_vel"].append(cmd_vel.copy())
        teleop_data["true_state"].append(true_state.copy())
        teleop_data["odom"].append(odom.copy())
        teleop_data["estimate"].append(estimate.copy())
        return {}
    
    node = ROSNode(
        node_name="teleop_logger",
        callback=logger_callback,
        subscribes_to=[
            ("/cmd_vel", Twist, "cmd_vel"),
            ("/true_state", Odometry, "true_state"),
            ("/odom", Odometry, "odom"),
            ("/estimate", Odometry, "estimate"),
        ],
        publishes_to=[],
        rate_hz=10.0)
    
    return node, teleop_data


# Create and start logger
teleop_logger, teleop_data = create_teleop_logger()
teleop_logger.create_node()
teleop_logger.start()
print("Data logger started!")

# Run teleoperation for 30 seconds
import time as pytime

T_teleop = 30.0
print(f"\nRunning teleoperation for {T_teleop} seconds...")
print("Robot is being controlled via simulated keyboard input!")
pytime.sleep(T_teleop)

print(f"\nTeleoperation complete! Collected {len(teleop_data['time'])} samples.")
# Stop all teleoperation nodes
print("Stopping teleoperation nodes...")
teleop_logger.stop()
teleop_node.stop()
simulator_teleop.stop()
kf_teleop.stop()

# Shutdown ROS2
rclpy.shutdown()
print("All nodes stopped. ROS2 shutdown complete.")
# Convert teleoperation data to arrays
time_teleop = np.array(teleop_data["time"])
true_teleop = np.array(teleop_data["true_state"])
est_teleop = np.array(teleop_data["estimate"])
meas_teleop = np.array(teleop_data["odom"])
cmd_teleop = np.array(teleop_data["cmd_vel"])

# Extract positions
true_x_t = true_teleop[:, 0]
true_y_t = true_teleop[:, 1]
est_x_t = est_teleop[:, 0]
est_y_t = est_teleop[:, 1]
meas_x_t = meas_teleop[:, 0]
meas_y_t = meas_teleop[:, 1]

# Extract velocities
v_cmd = cmd_teleop[:, 0]
omega_cmd = cmd_teleop[:, 5]

# Plotting
fig, axs = plt.subplots(2, 2, figsize=(14, 10))

# Plot 1: 2D Trajectory (teleoperation)
ax = axs[0, 0]
ax.plot(true_x_t, true_y_t, "b-", linewidth=2, label="True Trajectory", alpha=0.7)
ax.plot(est_x_t, est_y_t, "r--", linewidth=2, label="Estimated (EKF)", alpha=0.7)
ax.scatter(true_x_t[0], true_y_t[0], c="green", s=150, marker="o", label="Start", zorder=5)
ax.scatter(true_x_t[-1], true_y_t[-1], c="red", s=150, marker="X", label="End", zorder=5)
ax.set_xlabel("X Position (m)", fontsize=11)
ax.set_ylabel("Y Position (m)", fontsize=11)
ax.set_title("Teleoperation: Manual Control Trajectory", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)
ax.axis("equal")

# Plot 2: Velocity Commands
ax = axs[0, 1]
ax.plot(time_teleop, v_cmd, "b-", label="Linear Velocity (v)", alpha=0.7)
ax.plot(time_teleop, omega_cmd, "r-", label="Angular Velocity (ω)", alpha=0.7)
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Command (m/s or rad/s)", fontsize=11)
ax.set_title("Teleop Commands to /cmd_vel", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 3: X Position Tracking
ax = axs[1, 0]
ax.plot(time_teleop, true_x_t, "b-", label="True X", alpha=0.7)
ax.plot(time_teleop, est_x_t, "r--", linewidth=2, label="Estimated X", alpha=0.7)
ax.scatter(time_teleop[::10], meas_x_t[::10], c="gray", s=10, alpha=0.3, label="Measurements")
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("X Position (m)", fontsize=11)
ax.set_title("X Position: Teleop Mode", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

# Plot 4: Estimation Error
ax = axs[1, 1]
position_error_t = np.sqrt((true_x_t - est_x_t) ** 2 + (true_y_t - est_y_t) ** 2)
ax.plot(time_teleop, position_error_t, "m-", linewidth=1.5, label="Position Error")
ax.set_xlabel("Time (s)", fontsize=11)
ax.set_ylabel("Error (m)", fontsize=11)
ax.set_title("KF Error During Teleoperation", fontsize=13, fontweight="bold")
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nTeleoperation Performance:")
print(f"  Mean position error: {np.mean(position_error_t):.4f} m")
print(f"  Max position error: {np.max(position_error_t):.4f} m")
print(f"  Total distance traveled: {np.sum(np.sqrt(np.diff(true_x_t)**2 + np.diff(true_y_t)**2)):.2f} m")

Summary: Two ROS Architectures Demonstrated

This notebook demonstrated two distinct control architectures for the TurtleBot ROS system:

Architecture 1: Autonomous Navigation (Waypoint Tracking)

Nodes Active:

  • ✅ Waypoint Generator (pykal.ROSNode)

  • ✅ Velocity Controller (pykal.ROSNode)

  • ✅ TurtleBot Simulator (pykal.ROSNode)

  • ✅ Kalman Filter (pykal.ROSNode)

Data Flow: Waypoint Gen → Controller → /cmd_vel → Plant → Observer → (feedback)

Use Case: Autonomous navigation with high-level planning and control


Architecture 2: Teleoperation (Manual Control)

Nodes Active:

  • 🔴 Keyboard Teleop (external tool: teleop_twist_keyboard)

  • ✅ TurtleBot Simulator (pykal.ROSNode)

  • ✅ Kalman Filter (pykal.ROSNode)

Nodes Bypassed:

  • ❌ Waypoint Generator (not needed)

  • ❌ Velocity Controller (not needed)

Data Flow: Teleop → /cmd_vel → Plant → Observer

Use Case: Direct manual control for intervention, testing, or operation


Key Insights

  1. Modularity: ROS architecture allows easy mode switching by changing active nodes

  2. Red Nodes: External tools (teleop) are shown in RED in diagrams to distinguish from pykal.ROSNode wrappers

  3. Observer Persistence: Kalman filter runs in both modes for state estimation

  4. Same Interface: Both modes use /cmd_vel topic, making them interchangeable

  5. Common Pattern: This dual-mode architecture (autonomous + teleop) is standard in robotics

Next Steps: The Gazebo integration tutorial replaces the simulator with physics simulation, demonstrating the same dual-mode architecture in a realistic environment!

Visualization: Teleoperation Results

Stopping Teleoperation Nodes

Data Collection During Teleoperation

Running the Keyboard Teleop Tool

To control the TurtleBot manually, we use the standard ROS2 teleop_twist_keyboard package.

In a separate terminal, run:

ros2 run teleop_twist_keyboard teleop_twist_keyboard

Keyboard Controls:

  • i - Move forward

  • k - Stop

  • , - Move backward

  • j - Turn left

  • l - Turn right

  • u - Forward + left

  • o - Forward + right

  • m - Backward + left

  • . - Backward + right

  • q/z - Increase/decrease max speeds

Note

The teleop tool publishes Twist messages to /cmd_vel, which the simulator subscribes to. The Kalman filter monitors the system through /odom and /cmd_vel topics.

For this notebook demo, we’ll simulate teleop commands programmatically to demonstrate the architecture:

Setting Up Teleoperation Mode

We only need to run the simulator and observer nodes. The waypoint generator and controller are not used in teleoperation mode.

← Dynamical Systems as ROS Nodes