{ "cells": [ { "cell_type": "markdown", "source": [ "[← Dynamical Systems as ROS Nodes](../../../getting_started/python_to_ros/dynamical_systems_as_ros_nodes.rst)\n" ], "id": "cell-0", "metadata": {} }, { "cell_type": "markdown", "source": [ "# TurtleBot State Estimation: From Python to ROS2\n" ], "id": "cell-1", "metadata": {} }, { "cell_type": "markdown", "source": [ "In the [TurtleBot State Estimation](../theory_to_python/turtlebot_state_estimation.ipynb) tutorial, we designed a complete navigation system in Python using `DynamicalSystem` components. Now we'll deploy this exact system as **ROS2 nodes**.\n", "\n", "**Key Insight**: The dynamical system architecture maps directly to ROS!\n", "\n", "Each `DynamicalSystem` becomes a ROS node, and parameter dictionary connections become ROS topics. This 1:1 correspondence preserves our theoretical design while enabling distributed deployment.\n" ], "id": "cell-2", "metadata": {} }, { "cell_type": "markdown", "id": "cell-conceptual-foundation", "metadata": {}, "source": "# Conceptual Foundation: From Dynamical Systems to ROS Nodes\n\nRecall from the [TurtleBot State Estimation](../theory_to_python/turtlebot_state_estimation.ipynb) tutorial that we represented our navigation system as a composition of dynamical systems:\n\n
\n \n
\n\nEach block is a `DynamicalSystem` with state evolution (`f`) and observation (`h`) functions. Data flows through a shared parameter dictionary.\n\n## The ROS Mapping\n\nThere is a direct correspondence between this dynamical system representation and a ROS node architecture:\n\n
\n \n
\n\n**The correspondence**:\n- Each **dynamical system block** becomes a **ROS node**\n- Each **data connection** becomes a **ROS topic**\n- The **parameter dictionary** is replaced by **message passing**\n- Each **step()** method becomes a **callback function**\n\n**Mathematically**: There is a functor from the category of discrete-time dynamical systems (objects = blocks, morphisms = data flows) to the category of ROS nodes (objects = nodes, morphisms = topics).\n\n## Adding Teleoperation\n\nIn the Python simulation, we couldn't interactively control the robot during execution because Jupyter kernels lack elegant real-time input handling. However, ROS middleware naturally supports this through teleop nodes!\n\nTeleoperation provides **direct velocity control**, bypassing the high-level navigation pipeline entirely:\n\n
\n \n
\n\n**Key Architectural Difference**:\n- **Autonomous Navigation**: Waypoint Generator → Velocity Controller → `/cmd_vel` → Plant\n- **Teleoperation**: Keyboard Teleop → `/cmd_vel` → Plant\n\nThe **Keyboard Teleop** node publishes velocity commands directly to `/cmd_vel`, completely bypassing both the waypoint generator and position controller. The observer (Kalman filter) continues running to provide state estimates for monitoring and visualization.\n\nThis design pattern is common in robotics: teleoperation provides low-level control for manual operation, while autonomous mode uses the full planning and control pipeline.\n\n:::{note}\nThis tutorial implements the basic ROS node architecture (without teleop). The teleop version will be demonstrated in a later tutorial.\n:::\n\n## Individual Node Blocks\n\nEach ROS node wraps a dynamical system block. Here are the individual components:\n\n### Waypoint Generator\n
\n \n
\n\n### Velocity Controller\n
\n \n
\n\n### TurtleBot Plant\n
\n \n
\n\n### EKF Observer\n
\n \n
\n\nIn the following sections, we'll implement each of these blocks as `ROSNode` instances, demonstrating how theoretical design maps directly to ROS deployment." }, { "cell_type": "markdown", "source": [ "## Architecture Comparison: Python Simulation vs ROS Nodes\n", "\n", "**Python Simulation** (theory notebook):\n", "\n", "```python\n", "# Direct function calls, shared parameter dictionary\n", "for tk in time_steps:\n", " rk = waypoint_generator(...) # Returns reference\n", " xhat = observer.h(xhat_P) # Extract estimate\n", " uk = controller(xhat, rk, ...) # Generate command\n", " xk, yk = plant.step(...) # Update plant\n", " xhat_P = observer.step(...) # Update observer\n", "```\n", "\n", "**ROS2 Nodes** (this notebook):\n", "\n", "```\n", "waypoint_generator_node → /reference (PoseStamped)\n", " ↓\n", "velocity_controller_node → /cmd_vel (Twist)\n", " ↓\n", "turtlebot_simulator_node → /odom (Odometry)\n", " ↓\n", "kalman_filter_node → /estimate (Odometry)\n", " ↓\n", " (feedback via /estimate topic)\n", "```\n", "\n", ":::{note}\n", "The ROS graph **is** the block diagram! Each function becomes a node, each data flow becomes a topic.\n", ":::\n" ], "id": "cell-3", "metadata": {} }, { "cell_type": "markdown", "source": [ "## ROS2 System Architecture\n", "\n", "Our system consists of four ROS nodes:\n", "\n", "![TurtleBot ROS System](../../../../_static/tutorial/theory_to_python/turtlebot_feedback_system.svg)\n", "\n", "**Node 1: Waypoint Generator**\n", "- **Publishes**: `/reference` (PoseStamped)\n", "- **Function**: Generates reference poses for tracking\n", "\n", "**Node 2: Velocity Controller**\n", "- **Subscribes**: `/reference`, `/estimate`\n", "- **Publishes**: `/cmd_vel` (Twist)\n", "- **Function**: Proportional control to track reference\n", "\n", "**Node 3: TurtleBot Simulator**\n", "- **Subscribes**: `/cmd_vel`\n", "- **Publishes**: `/odom` (noisy), `/true_state` (clean)\n", "- **Function**: Simulates robot dynamics and sensors\n", "\n", "**Node 4: Kalman Filter Observer**\n", "- **Subscribes**: `/odom`, `/cmd_vel`\n", "- **Publishes**: `/estimate` (Odometry)\n", "- **Function**: Fuses measurements and predictions\n", "" ], "id": "cell-4", "metadata": {} }, { "cell_type": "markdown", "source": [ "## Setup: Imports and Core Functions\n" ], "id": "cell-5", "metadata": {} }, { "cell_type": "code", "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from pykal import DynamicalSystem\n", "from pykal.ros.ros_node import ROSNode\n", "from pykal.algorithm_library.estimators.kf import KF\n", "from pykal.data_change import corrupt\n", "\n", "# ROS message types\n", "from geometry_msgs.msg import Twist, PoseStamped\n", "from nav_msgs.msg import Odometry\n", "import rclpy\n", "import time\n", "\n", "print(\"✓ Imports successful!\")\n" ], "execution_count": null, "outputs": [], "id": "cell-6", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "### Core TurtleBot Functions (from Theory Notebook)\n", "\n", "We reuse the exact same dynamics and Jacobian functions from the theory notebook:\n" ], "id": "cell-7", "metadata": {} }, { "cell_type": "code", "source": [ "# ============================================================================\n", "# TurtleBot Dynamics\n", "# ============================================================================\n", "\n", "\n", "def turtlebot_f(xk: np.ndarray, uk: np.ndarray, dt: float) -> np.ndarray:\n", " \"\"\"Unicycle kinematics: [x, y, theta, v, omega].\"\"\"\n", " x, y, theta, v, omega = xk.flatten()\n", " v_cmd, omega_cmd = uk.flatten()\n", "\n", " x_new = x + v * np.cos(theta) * dt\n", " y_new = y + v * np.sin(theta) * dt\n", " theta_new = np.arctan2(np.sin(theta + omega * dt), np.cos(theta + omega * dt))\n", "\n", " return np.array([[x_new], [y_new], [theta_new], [v_cmd], [omega_cmd]])\n", "\n", "\n", "def turtlebot_h(xk: np.ndarray) -> np.ndarray:\n", " \"\"\"Measurement: [x, y, theta] from odometry.\"\"\"\n", " return xk[:3, :]\n", "\n", "\n", "# ============================================================================\n", "# EKF Jacobians\n", "# ============================================================================\n", "\n", "\n", "def compute_F_jacobian(xhat: np.ndarray, dt: float) -> np.ndarray:\n", " \"\"\"Jacobian of dynamics.\"\"\"\n", " _, _, theta, v, _ = xhat.flatten()\n", " return np.array(\n", " [\n", " [1, 0, -v * np.sin(theta) * dt, np.cos(theta) * dt, 0],\n", " [0, 1, v * np.cos(theta) * dt, np.sin(theta) * dt, 0],\n", " [0, 0, 1, 0, dt],\n", " [0, 0, 0, 1, 0],\n", " [0, 0, 0, 0, 1],\n", " ]\n", " )\n", "\n", "\n", "def compute_H_jacobian() -> np.ndarray:\n", " \"\"\"Jacobian of measurement.\"\"\"\n", " return np.array([[1, 0, 0, 0, 0], [0, 1, 0, 0, 0], [0, 0, 1, 0, 0]])\n", "\n", "\n", "# ============================================================================\n", "# Noise Covariances\n", "# ============================================================================\n", "\n", "Q_turtlebot = np.diag([0.01, 0.01, 0.02, 0.1, 0.1]) # Process noise\n", "R_turtlebot = np.diag([0.05, 0.05, 0.1]) # Measurement noise\n", "\n", "print(\"Core TurtleBot functions defined!\")" ], "execution_count": null, "outputs": [], "id": "cell-8", "metadata": {} }, { "cell_type": "markdown", "source": [ "## Node 1: Waypoint Generator\n", "\n", "**ROS Wrapper Pattern**: We wrap our waypoint logic in a `ROSNode` that publishes `/reference`.\n", "\n", "**Message Format**: `PoseStamped` = `(8,)` array `[t, px, py, pz, qx, qy, qz, qw]`\n", "\n", "**Key Changes from Python**:\n", "- Python: Returns NumPy array directly\n", "- ROS: Publishes to `/reference` topic\n", "- Message conversion handled automatically by `ROSNode`\n" ], "id": "cell-9", "metadata": {} }, { "cell_type": "code", "source": [ "def create_waypoint_generator_node(waypoints, switch_time=15.0, rate_hz=10.0):\n", " \"\"\"\n", " Create ROS node that publishes waypoints.\n", "\n", " Parameters\n", " ----------\n", " waypoints : list of tuples\n", " Each tuple is (x, y, theta)\n", " switch_time : float\n", " Time to spend at each waypoint\n", " rate_hz : float\n", " Publishing rate\n", "\n", " Returns\n", " -------\n", " ROSNode\n", " Configured waypoint generator node\n", " \"\"\"\n", " # State: current waypoint index and time elapsed\n", " current_idx = 0\n", " time_at_waypoint = 0.0\n", " last_tk = 0.0\n", "\n", " def waypoint_callback(tk):\n", " nonlocal current_idx, time_at_waypoint, last_tk\n", "\n", " # Compute elapsed time\n", " dt = tk - last_tk if last_tk > 0 else 0.0\n", " last_tk = tk\n", " time_at_waypoint += dt\n", "\n", " # Switch waypoint if needed\n", " if time_at_waypoint >= switch_time:\n", " current_idx = (current_idx + 1) % len(waypoints)\n", " time_at_waypoint = 0.0\n", "\n", " # Get current waypoint\n", " x_r, y_r, theta_r = waypoints[current_idx]\n", "\n", " # Convert theta to quaternion (2D: only yaw rotation)\n", " qx, qy = 0.0, 0.0\n", " qz = np.sin(theta_r / 2.0)\n", " qw = np.cos(theta_r / 2.0)\n", "\n", " # Pack as PoseStamped: [t, px, py, pz, qx, qy, qz, qw]\n", " reference = np.array([tk, x_r, y_r, 0.0, qx, qy, qz, qw])\n", "\n", " return {\"reference\": reference}\n", "\n", " # Create ROS node\n", " node = ROSNode(\n", " node_name=\"waypoint_generator\",\n", " callback=waypoint_callback,\n", " subscribes_to=[], # No subscriptions\n", " publishes_to=[\n", " (\"reference\", PoseStamped, \"/reference\"),\n", " ],\n", " rate_hz=rate_hz)\n", "\n", " return node\n", "\n", "\n", "# Create waypoint generator\n", "square_waypoints = [\n", " (2.0, 0.0, 0.0), # Right\n", " (2.0, 2.0, np.pi / 2), # Up\n", " (0.0, 2.0, np.pi), # Left\n", " (0.0, 0.0, -np.pi / 2), # Down\n", "]\n", "\n", "waypoint_node = create_waypoint_generator_node(\n", " waypoints=square_waypoints, switch_time=15.0, rate_hz=10.0\n", ")\n", "\n", "print(\"Waypoint generator node created!\")\n", "print(f\" Publishes: /reference (PoseStamped)\")\n", "print(f\" Waypoints: {len(square_waypoints)} points in square pattern\")\n", "print(f\" Switch time: 15.0 seconds\")" ], "execution_count": null, "outputs": [], "id": "cell-10", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "## Node 2: Velocity Controller\n", "\n", "**ROS Wrapper Pattern**: Subscribes to `/reference` and `/estimate`, publishes `/cmd_vel`.\n", "\n", "**Controller Law**:\n", "\n", "$$\n", "v_{cmd} = K_v \\cdot \\| \\vec{r} - \\hat{\\vec{p}} \\|\n", "$$\n", "\n", "$$\n", "\\omega_{cmd} = K_\\omega \\cdot (\\theta_r - \\hat{\\theta})\n", "$$\n", "\n", "**Key Changes from Python**:\n", "- Python: Direct function call with parameters\n", "- ROS: Callback receives messages as kwargs\n", "- Automatic message → NumPy conversion\n" ], "id": "cell-11", "metadata": {} }, { "cell_type": "code", "source": [ "def create_velocity_controller_node(Kv=0.5, Komega=1.5, rate_hz=50.0):\n", " \"\"\"\n", " Create ROS node for proportional velocity controller.\n", "\n", " Parameters\n", " ----------\n", " Kv : float\n", " Linear velocity gain\n", " Komega : float\n", " Angular velocity gain\n", " rate_hz : float\n", " Control loop rate\n", "\n", " Returns\n", " -------\n", " ROSNode\n", " Configured controller node\n", " \"\"\"\n", "\n", " def controller_callback(tk, reference, estimate):\n", " \"\"\"\n", " Compute velocity command.\n", "\n", " Args:\n", " tk (float): Current time\n", " reference (np.ndarray): (8) PoseStamped [t, px, py, pz, qx, qy, qz, qw]\n", " estimate (np.ndarray): (13) Odometry [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]\n", "\n", " Returns:\n", " dict: {'cmd_vel': (6) Twist array}\n", " \"\"\"\n", " # Extract reference position and heading\n", " x_r, y_r = reference[1], reference[2] # Skip timestamp\n", " qz_r, qw_r = reference[6], reference[7]\n", " theta_r = np.arctan2(2.0 * qw_r * qz_r, 1.0 - 2.0 * qz_r**2)\n", "\n", " # Extract current position and heading from estimate\n", " x, y = estimate[0], estimate[1]\n", " qz, qw = estimate[5], estimate[6]\n", " theta = np.arctan2(2.0 * qw * qz, 1.0 - 2.0 * qz**2)\n", "\n", " # Compute errors\n", " dx = x_r - x\n", " dy = y_r - y\n", " distance = np.sqrt(dx**2 + dy**2)\n", "\n", " heading_error = theta_r - theta\n", " heading_error = np.arctan2(np.sin(heading_error), np.cos(heading_error))\n", "\n", " # Proportional control\n", " v_cmd = Kv * distance\n", " omega_cmd = Komega * heading_error\n", "\n", " # Saturate (TurtleBot limits)\n", " v_cmd = np.clip(v_cmd, -0.22, 0.22)\n", " omega_cmd = np.clip(omega_cmd, -2.84, 2.84)\n", "\n", " # Pack as Twist: [vx, vy, vz, wx, wy, wz]\n", " cmd_vel = np.array([v_cmd, 0.0, 0.0, 0.0, 0.0, omega_cmd])\n", "\n", " return {\"cmd_vel\": cmd_vel}\n", "\n", " # Create ROS node\n", " node = ROSNode(\n", " node_name=\"velocity_controller\",\n", " callback=controller_callback,\n", " subscribes_to=[\n", " (\"/reference\", PoseStamped, \"reference\"),\n", " (\"/estimate\", Odometry, \"estimate\"),\n", " ],\n", " publishes_to=[\n", " (\"cmd_vel\", Twist, \"/cmd_vel\"),\n", " ],\n", " rate_hz=rate_hz,\n", " required_topics={\"reference\", \"estimate\"}, # Both required for control\n", " )\n", "\n", " return node\n", "\n", "\n", "# Create controller node\n", "controller_node = create_velocity_controller_node(Kv=0.5, Komega=1.5, rate_hz=50.0)\n", "\n", "print(\"Velocity controller node created!\")\n", "print(f\" Subscribes: /reference, /estimate\")\n", "print(f\" Publishes: /cmd_vel (Twist)\")\n", "print(f\" Gains: Kv={0.5}, Komega={1.5}\")\n", "print(f\" Rate: 50 Hz\")" ], "execution_count": null, "outputs": [], "id": "cell-12", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "## Node 3: TurtleBot Simulator (Plant)\n", "\n", "**ROS Wrapper Pattern**: Subscribes to `/cmd_vel`, publishes `/odom` and `/true_state`.\n", "\n", "**Dynamics**: Same unicycle model from theory notebook\n", "\n", "$$\n", "\\begin{aligned}\n", "x_{k+1} &= x_k + v_k \\cos(\\theta_k) \\Delta t \\\\\n", "y_{k+1} &= y_k + v_k \\sin(\\theta_k) \\Delta t \\\\\n", "\\theta_{k+1} &= \\theta_k + \\omega_k \\Delta t\n", "\\end{aligned}\n", "$$\n", "\n", "**Measurement Noise**: Adds realistic odometry noise + occasional spikes\n", "\n", ":::{note}\n", "We publish both `/odom` (noisy, like a real robot) and `/true_state` (clean, for visualization).\n", ":::\n" ], "id": "cell-13", "metadata": {} }, { "cell_type": "code", "source": [ "def create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=None):\n", " \"\"\"\n", " Create ROS node that simulates TurtleBot dynamics.\n", "\n", " Parameters\n", " ----------\n", " dt : float\n", " Integration timestep\n", " rate_hz : float\n", " Simulation rate\n", " R : np.ndarray\n", " Measurement noise covariance (3x3)\n", "\n", " Returns\n", " -------\n", " ROSNode\n", " Configured simulator node\n", " \"\"\"\n", " if R is None:\n", " R = np.diag([0.05, 0.05, 0.1]) # Default odometry noise\n", "\n", " # Internal state: [x, y, theta, v, omega]\n", " xk = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])\n", "\n", " def simulator_callback(tk, cmd_vel):\n", " \"\"\"\n", " Simulate one timestep of TurtleBot dynamics.\n", "\n", " Args:\n", " tk (float): Current time\n", " cmd_vel (np.ndarray): (6) Twist [vx, vy, vz, wx, wy, wz]\n", "\n", " Returns:\n", " dict: {'odom': noisy Odometry, 'true_state': clean Odometry}\n", " \"\"\"\n", " nonlocal xk\n", "\n", " # Extract commands (only use vx and wz for 2D motion)\n", " uk = np.array([[cmd_vel[0]], [cmd_vel[5]]])\n", "\n", " # Update state using dynamics\n", " xk = turtlebot_f(xk, uk, dt)\n", "\n", " # Convert to Odometry format: [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]\n", " x, y, theta, v, omega = xk.flatten()\n", "\n", " # Convert theta to quaternion\n", " qx, qy = 0.0, 0.0\n", " qz = np.sin(theta / 2.0)\n", " qw = np.cos(theta / 2.0)\n", "\n", " # True state (clean)\n", " true_state = np.array(\n", " [\n", " x,\n", " y,\n", " 0.0, # position\n", " qx,\n", " qy,\n", " qz,\n", " qw, # orientation\n", " v * np.cos(theta),\n", " v * np.sin(theta),\n", " 0.0, # linear velocity\n", " 0.0,\n", " 0.0,\n", " omega, # angular velocity\n", " ]\n", " )\n", "\n", " # Add realistic measurement noise\n", " # Corrupt position and orientation\n", " noise_pos = np.random.multivariate_normal(np.zeros(3), R)\n", " x_noisy = x + noise_pos[0]\n", " y_noisy = y + noise_pos[1]\n", " theta_noisy = theta + noise_pos[2]\n", "\n", " # Add occasional spikes to heading (bumps/wheel slip)\n", " if np.random.rand() < 0.02: # 2% spike rate\n", " theta_noisy += np.random.choice([-1, 1]) * 0.3 # ~17 degrees\n", "\n", " # Convert noisy theta to quaternion\n", " qz_noisy = np.sin(theta_noisy / 2.0)\n", " qw_noisy = np.cos(theta_noisy / 2.0)\n", "\n", " # Noisy odometry\n", " odom = np.array(\n", " [\n", " x_noisy,\n", " y_noisy,\n", " 0.0,\n", " qx,\n", " qy,\n", " qz_noisy,\n", " qw_noisy,\n", " v * np.cos(theta),\n", " v * np.sin(theta),\n", " 0.0,\n", " 0.0,\n", " 0.0,\n", " omega,\n", " ]\n", " )\n", "\n", " return {\"odom\": odom, \"true_state\": true_state}\n", "\n", " # Create ROS node\n", " node = ROSNode(\n", " node_name=\"turtlebot_simulator\",\n", " callback=simulator_callback,\n", " subscribes_to=[\n", " (\"/cmd_vel\", Twist, \"cmd_vel\"),\n", " ],\n", " publishes_to=[\n", " (\"odom\", Odometry, \"/odom\"),\n", " (\"true_state\", Odometry, \"/true_state\"),\n", " ],\n", " rate_hz=rate_hz)\n", "\n", " return node\n", "\n", "\n", "# Create simulator node\n", "simulator_node = create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=R_turtlebot)\n", "\n", "print(\"TurtleBot simulator node created!\")\n", "print(f\" Subscribes: /cmd_vel (Twist)\")\n", "print(f\" Publishes: /odom (noisy), /true_state (clean)\")\n", "print(f\" Dynamics: Unicycle model with dt={0.1}s\")\n", "print(f\" Noise: R = diag([0.05, 0.05, 0.1]) + occasional spikes\")" ], "execution_count": null, "outputs": [], "id": "cell-14", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "## Node 4: Extended Kalman Filter Observer\n", "\n", "**ROS Wrapper Pattern**: Subscribes to `/odom` and `/cmd_vel`, publishes `/estimate`.\n", "\n", "**Algorithm**: Same EKF from theory notebook\n", "\n", "**Prediction Step**:\n", "\n", "$$\n", "\\hat{x}_k^- = f(\\hat{x}_{k-1}, u_k)\n", "$$\n", "\n", "$$\n", "P_k^- = F_k P_{k-1} F_k^T + Q\n", "$$\n", "\n", "**Update Step**:\n", "\n", "$$\n", "K_k = P_k^- H_k^T (H_k P_k^- H_k^T + R)^{-1}\n", "$$\n", "\n", "$$\n", "\\hat{x}_k = \\hat{x}_k^- + K_k (y_k - h(\\hat{x}_k^-))\n", "$$\n", "\n", "**Key Changes from Python**:\n", "- Python: `observer.step()` with direct array passing\n", "- ROS: Callback receives ROS messages, calls `KF.f()`, publishes result\n" ], "id": "cell-15", "metadata": {} }, { "cell_type": "code", "source": [ "def create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=None, R=None):\n", " \"\"\"\n", " Create ROS node for Extended Kalman Filter.\n", "\n", " Parameters\n", " ----------\n", " dt : float\n", " Filter timestep\n", " rate_hz : float\n", " Filter rate\n", " Q : np.ndarray\n", " Process noise covariance (5x5)\n", " R : np.ndarray\n", " Measurement noise covariance (3x3)\n", "\n", " Returns\n", " -------\n", " ROSNode\n", " Configured Kalman filter node\n", " \"\"\"\n", " if Q is None:\n", " Q = np.diag([0.01, 0.01, 0.02, 0.1, 0.1])\n", " if R is None:\n", " R = np.diag([0.05, 0.05, 0.1])\n", "\n", " # Initial estimate\n", " xhat = np.array([[0.0], [0.0], [0.0], [0.0], [0.0]])\n", " P = np.diag([0.1, 0.1, 0.1, 1.0, 1.0])\n", "\n", " def filter_callback(tk, odom, cmd_vel):\n", " \"\"\"\n", " Run one step of EKF.\n", "\n", " Args:\n", " tk (float): Current time\n", " odom (np.ndarray): (13) Odometry measurement\n", " cmd_vel (np.ndarray): (6) Twist command\n", "\n", " Returns:\n", " dict: {'estimate': (13) Odometry with state estimate}\n", " \"\"\"\n", " nonlocal xhat, P\n", "\n", " # Extract measurement [x, y, theta] from Odometry\n", " x_meas, y_meas = odom[0], odom[1]\n", " qz_meas, qw_meas = odom[5], odom[6]\n", " theta_meas = np.arctan2(2.0 * qw_meas * qz_meas, 1.0 - 2.0 * qz_meas**2)\n", " yk = np.array([[x_meas], [y_meas], [theta_meas]])\n", "\n", " # Extract control input [v_cmd, omega_cmd]\n", " uk = np.array([[cmd_vel[0]], [cmd_vel[5]]])\n", "\n", " # Compute Jacobians\n", " Fk = compute_F_jacobian(xhat, dt)\n", " Hk = compute_H_jacobian()\n", "\n", " # Run EKF update using pykal's KF.f\n", " f_params = {\"xk\": xhat, \"uk\": uk, \"dt\": dt}\n", " h_params = {\"xk\": xhat}\n", "\n", " xhat_new, P_new = KF.f(\n", " x_P=(xhat, P),\n", " y=yk,\n", " u=uk,\n", " f=turtlebot_f,\n", " F=lambda **params: Fk,\n", " Q=lambda **params: Q,\n", " h=turtlebot_h,\n", " H=lambda **params: Hk,\n", " R=lambda **params: R,\n", " f_params=f_params,\n", " F_params={},\n", " Q_params={},\n", " h_params=h_params,\n", " H_params={},\n", " R_params={})\n", "\n", " # Update state\n", " xhat = xhat_new\n", " P = P_new\n", "\n", " # Extract estimate from state vector\n", " x_est, y_est, theta_est, v_est, omega_est = xhat.flatten()\n", "\n", " # Convert to Odometry format\n", " qx, qy = 0.0, 0.0\n", " qz_est = np.sin(theta_est / 2.0)\n", " qw_est = np.cos(theta_est / 2.0)\n", "\n", " estimate = np.array(\n", " [\n", " x_est,\n", " y_est,\n", " 0.0,\n", " qx,\n", " qy,\n", " qz_est,\n", " qw_est,\n", " v_est * np.cos(theta_est),\n", " v_est * np.sin(theta_est),\n", " 0.0,\n", " 0.0,\n", " 0.0,\n", " omega_est,\n", " ]\n", " )\n", "\n", " return {\"estimate\": estimate}\n", "\n", " # Create ROS node\n", " node = ROSNode(\n", " node_name=\"kalman_filter\",\n", " callback=filter_callback,\n", " subscribes_to=[\n", " (\"/odom\", Odometry, \"odom\"),\n", " (\"/cmd_vel\", Twist, \"cmd_vel\"),\n", " ],\n", " publishes_to=[\n", " (\"estimate\", Odometry, \"/estimate\"),\n", " ],\n", " rate_hz=rate_hz,\n", " required_topics={\"odom\", \"cmd_vel\"}, # Need both for EKF\n", " )\n", "\n", " return node\n", "\n", "\n", "# Create Kalman filter node\n", "kf_node = create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=Q_turtlebot, R=R_turtlebot)\n", "\n", "print(\"Kalman filter node created!\")\n", "print(f\" Subscribes: /odom, /cmd_vel\")\n", "print(f\" Publishes: /estimate (Odometry)\")\n", "print(f\" Algorithm: Extended Kalman Filter (EKF)\")\n", "print(f\" Q: Process noise covariance (5x5)\")\n", "print(f\" R: Measurement noise covariance (3x3)\")" ], "execution_count": null, "outputs": [], "id": "cell-16", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "## Running the Complete ROS2 System\n", "\n", "Now we initialize ROS2, create all nodes, and start the distributed system:\n" ], "id": "cell-17", "metadata": {} }, { "cell_type": "code", "source": [ "# Initialize ROS2\n", "rclpy.init()\n", "\n", "# Create all nodes\n", "print(\"Creating ROS2 nodes...\")\n", "waypoint_node.create_node()\n", "controller_node.create_node()\n", "simulator_node.create_node()\n", "kf_node.create_node()\n", "print(\"All nodes created!\")\n", "\n", "# Start all nodes\n", "print(\"\\nStarting nodes...\")\n", "waypoint_node.start()\n", "print(\" ✓ Waypoint generator running\")\n", "simulator_node.start()\n", "print(\" ✓ TurtleBot simulator running\")\n", "kf_node.start()\n", "print(\" ✓ Kalman filter running\")\n", "controller_node.start()\n", "print(\" ✓ Velocity controller running\")\n", "\n", "print(\"\\n🚀 TurtleBot ROS system is live!\")\n", "print(\"\\nROS2 Topic Graph:\")\n", "print(\" /reference ← waypoint_generator\")\n", "print(\" ↓\")\n", "print(\" velocity_controller → /cmd_vel\")\n", "print(\" ↓\")\n", "print(\" turtlebot_simulator → /odom, /true_state\")\n", "print(\" ↓\")\n", "print(\" kalman_filter → /estimate\")\n", "print(\" ↓ (feedback)\")\n", "print(\" velocity_controller\")\n", "\n", "print(\"\\n💡 Tip: Run 'rqt_graph' in a terminal to visualize the node graph!\")\n", "print(\" You'll see that the ROS graph matches the block diagram.\")" ], "execution_count": null, "outputs": [], "id": "cell-18", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "### Data Collection for Analysis\n", "\n", "We create a logger node to subscribe to all topics and collect data:\n" ], "id": "cell-19", "metadata": {} }, { "cell_type": "code", "source": [ "# Data collection node\n", "def create_data_logger_node():\n", " \"\"\"\n", " Create a node that logs all published data for analysis.\n", " \"\"\"\n", " data_log = {\n", " \"time\": [],\n", " \"reference\": [],\n", " \"cmd_vel\": [],\n", " \"true_state\": [],\n", " \"odom\": [],\n", " \"estimate\": [],\n", " }\n", "\n", " def logger_callback(tk, reference, cmd_vel, true_state, odom, estimate):\n", " \"\"\"Log all topics.\"\"\"\n", " data_log[\"time\"].append(tk)\n", " data_log[\"reference\"].append(reference.copy())\n", " data_log[\"cmd_vel\"].append(cmd_vel.copy())\n", " data_log[\"true_state\"].append(true_state.copy())\n", " data_log[\"odom\"].append(odom.copy())\n", " data_log[\"estimate\"].append(estimate.copy())\n", "\n", " return {} # No publications\n", "\n", " node = ROSNode(\n", " node_name=\"data_logger\",\n", " callback=logger_callback,\n", " subscribes_to=[\n", " (\"/reference\", PoseStamped, \"reference\"),\n", " (\"/cmd_vel\", Twist, \"cmd_vel\"),\n", " (\"/true_state\", Odometry, \"true_state\"),\n", " (\"/odom\", Odometry, \"odom\"),\n", " (\"/estimate\", Odometry, \"estimate\"),\n", " ],\n", " publishes_to=[],\n", " rate_hz=10.0)\n", "\n", " return node, data_log\n", "\n", "\n", "# Create and start logger\n", "logger_node, data_log = create_data_logger_node()\n", "logger_node.create_node()\n", "logger_node.start()\n", "print(\"Data logger started! Collecting data...\")\n", "\n", "# Run for 60 seconds\n", "import time as pytime\n", "\n", "T_sim = 60.0\n", "print(f\"\\nRunning simulation for {T_sim} seconds...\")\n", "pytime.sleep(T_sim)\n", "\n", "print(f\"\\nSimulation complete! Collected {len(data_log['time'])} samples.\")" ], "execution_count": null, "outputs": [], "id": "cell-20", "metadata": { "tags": [ "skip-execution" ] } }, { "cell_type": "markdown", "source": [ "### Stopping the System\n" ], "id": "cell-21", "metadata": {} }, { "cell_type": "code", "source": [ "# Stop all nodes\n", "print(\"Stopping nodes...\")\n", "logger_node.stop()\n", "waypoint_node.stop()\n", "controller_node.stop()\n", "simulator_node.stop()\n", "kf_node.stop()\n", "\n", "# Shutdown ROS2\n", "rclpy.shutdown()\n", "print(\"All nodes stopped. ROS2 shutdown complete.\")" ], "execution_count": null, "outputs": [], "id": "cell-22", "metadata": {} }, { "cell_type": "markdown", "source": [ "## Visualization and Analysis\n", "\n", "Let's visualize the ROS deployment results:\n" ], "id": "cell-23", "metadata": {} }, { "cell_type": "code", "source": [ "# Convert logged data to arrays\n", "time_vec = np.array(data_log[\"time\"])\n", "true_states = np.array(data_log[\"true_state\"])\n", "estimates = np.array(data_log[\"estimate\"])\n", "measurements = np.array(data_log[\"odom\"])\n", "commands = np.array(data_log[\"cmd_vel\"])\n", "references = np.array(data_log[\"reference\"])\n", "\n", "# Extract positions from Odometry arrays\n", "true_x = true_states[:, 0]\n", "true_y = true_states[:, 1]\n", "est_x = estimates[:, 0]\n", "est_y = estimates[:, 1]\n", "meas_x = measurements[:, 0]\n", "meas_y = measurements[:, 1]\n", "\n", "# Extract references from PoseStamped\n", "ref_x = references[:, 1] # Skip timestamp\n", "ref_y = references[:, 2]\n", "\n", "# Plotting\n", "fig, axs = plt.subplots(2, 2, figsize=(14, 10))\n", "\n", "# Plot 1: 2D Trajectory\n", "ax = axs[0, 0]\n", "ax.plot(true_x, true_y, \"b-\", linewidth=2, label=\"True Trajectory\", alpha=0.7)\n", "ax.plot(est_x, est_y, \"r--\", linewidth=2, label=\"Estimated (EKF)\", alpha=0.7)\n", "ax.scatter(\n", " ref_x[::50], ref_y[::50], c=\"green\", s=100, marker=\"*\", label=\"Waypoints\", zorder=5\n", ")\n", "ax.set_xlabel(\"X Position (m)\", fontsize=11)\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\"ROS Deployment: 2D Trajectory\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "ax.axis(\"equal\")\n", "\n", "# Plot 2: X Position\n", "ax = axs[0, 1]\n", "ax.plot(time_vec, true_x, \"b-\", label=\"True X\", alpha=0.7)\n", "ax.plot(time_vec, est_x, \"r--\", label=\"Estimated X\", alpha=0.7)\n", "ax.scatter(\n", " time_vec[::10], meas_x[::10], c=\"gray\", s=10, alpha=0.3, label=\"Measurements\"\n", ")\n", "ax.set_ylabel(\"X Position (m)\", fontsize=11)\n", "ax.set_title(\"X Position: ROS Topics\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 3: Y Position\n", "ax = axs[1, 0]\n", "ax.plot(time_vec, true_y, \"b-\", label=\"True Y\", alpha=0.7)\n", "ax.plot(time_vec, est_y, \"r--\", label=\"Estimated Y\", alpha=0.7)\n", "ax.scatter(\n", " time_vec[::10], meas_y[::10], c=\"gray\", s=10, alpha=0.3, label=\"Measurements\"\n", ")\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_ylabel(\"Y Position (m)\", fontsize=11)\n", "ax.set_title(\"Y Position: ROS Topics\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "# Plot 4: Estimation Error\n", "ax = axs[1, 1]\n", "position_error = np.sqrt((true_x - est_x) ** 2 + (true_y - est_y) ** 2)\n", "ax.plot(time_vec, position_error, \"m-\", linewidth=1.5, label=\"Position Error\")\n", "ax.set_xlabel(\"Time (s)\", fontsize=11)\n", "ax.set_ylabel(\"Error (m)\", fontsize=11)\n", "ax.set_title(\"KF Estimation Error\", fontsize=13, fontweight=\"bold\")\n", "ax.legend()\n", "ax.grid(True, alpha=0.3)\n", "\n", "plt.tight_layout()\n", "plt.show()\n", "\n", "print(f\"\\nPerformance Metrics (ROS Deployment):\")\n", "print(f\" Mean position error: {np.mean(position_error):.4f} m\")\n", "print(f\" Max position error: {np.max(position_error):.4f} m\")\n", "print(f\" Final position error: {position_error[-1]:.4f} m\")" ], "execution_count": null, "outputs": [], "id": "cell-24", "metadata": { "tags": [ "hide-input" ] } }, { "cell_type": "markdown", "source": [ "## Summary: Python → ROS2 Deployment\n", "\n", "We've successfully deployed our TurtleBot navigation system to ROS2!\n", "\n", "**Mapping: Theory → ROS**:\n", "\n", "| Python (Theory) | ROS2 (Deployment) |\n", "|-----------------|-------------------|\n", "| `DynamicalSystem` | `ROSNode` |\n", "| Function call | Topic publish/subscribe |\n", "| Parameter dict | ROS message |\n", "| `step()` method | Node callback |\n", "| NumPy array | Auto-converted message |\n", "\n", "**Key Accomplishments**:\n", "\n", "1. **Architecture Preservation**: Block diagram = ROS graph\n", "2. **Code Reuse**: Same dynamics, Jacobians, noise covariances\n", "3. **Automatic Conversion**: NumPy ↔ ROS messages handled by `ROSNode`\n", "4. **Distributed System**: Nodes can run on different machines\n", "5. **Same Algorithm**: EKF implementation identical to theory notebook\n", "\n", "**What Changed**:\n", "- ✓ Wrapped functions as `ROSNode` callbacks\n", "- ✓ Added message format conversions (PoseStamped, Odometry, Twist)\n", "- ✓ Used ROS topics instead of direct function calls\n", "- ✗ **Did NOT change**: Dynamics, controller, observer algorithms\n", "\n", "**Next Step**: In the Gazebo integration tutorial, we'll replace the simulator node with Gazebo's physics engine, demonstrating **ROS → Simulation → Hardware**!\n" ], "id": "cell-25", "metadata": {} }, { "cell_type": "markdown", "id": "pcv71puh0a", "source": "# Architecture 2: Teleoperation Mode\n\nIn the previous section, we demonstrated **autonomous navigation** with waypoint tracking. Now we'll demonstrate **teleoperation mode**, where a human operator directly controls the robot's velocity.\n\n**Key Architectural Difference**:\n- **Autonomous** (previous): Waypoint Gen → Controller → `/cmd_vel` → Plant → Observer\n- **Teleoperation** (this section): **Keyboard Teleop** → `/cmd_vel` → Plant → Observer\n\nThe teleoperation node (**shown in RED** in diagrams) is an **external ROS2 tool**, not wrapped in `pykal.ROSNode`. It publishes velocity commands directly to `/cmd_vel`, completely bypassing the high-level planning pipeline.\n\n
\n \n
\n\n**Nodes Used in Teleoperation Mode**:\n1. ✅ **TurtleBot Simulator** - Receives `/cmd_vel` from teleop\n2. ✅ **Kalman Filter** - Continues providing state estimates for monitoring\n3. ❌ **Waypoint Generator** - Not needed (bypassed)\n4. ❌ **Velocity Controller** - Not needed (bypassed)\n5. 🔴 **Keyboard Teleop** - External tool (`teleop_twist_keyboard`)", "metadata": {} }, { "cell_type": "code", "id": "ewrak1xypbi", "source": "# Re-initialize ROS2 (if needed after previous shutdown)\nif not rclpy.ok():\n rclpy.init()\n\n# Create fresh simulator and KF nodes for teleoperation\nprint(\"Creating nodes for teleoperation mode...\")\nsimulator_teleop = create_turtlebot_simulator_node(dt=0.1, rate_hz=10.0, R=R_turtlebot)\nkf_teleop = create_kalman_filter_node(dt=0.1, rate_hz=10.0, Q=Q_turtlebot, R=R_turtlebot)\n\nsimulator_teleop.create_node()\nkf_teleop.create_node()\nprint(\"✓ Simulator and Kalman filter created\")\n\n# Start nodes\nprint(\"\\nStarting nodes...\")\nsimulator_teleop.start()\nprint(\" ✓ TurtleBot simulator running\")\nkf_teleop.start()\nprint(\" ✓ Kalman filter running\")\n\nprint(\"\\n🚀 System ready for teleoperation!\")\nprint(\"\\nActive ROS2 Topics:\")\nprint(\" /cmd_vel ← (waiting for keyboard teleop)\")\nprint(\" /odom ← turtlebot_simulator\")\nprint(\" /estimate ← kalman_filter\")\nprint(\"\\nNOTE: Waypoint generator and controller are NOT running (bypassed!)\")", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "code", "id": "1xlju34mc52", "source": "# Create a simulated teleop node that mimics user input\ndef create_simulated_teleop_node():\n \"\"\"\n Simulate keyboard teleop with programmed commands.\n \n This mimics what teleop_twist_keyboard would publish,\n demonstrating the teleoperation architecture.\n \"\"\"\n import time\n \n # Programmed maneuver sequence\n maneuvers = [\n (0, 5, (0.15, 0.0)), # 0-5s: Forward\n (5, 8, (0.15, 0.8)), # 5-8s: Forward + turn left\n (8, 13, (0.15, 0.0)), # 8-13s: Forward\n (13, 16, (0.15, -0.8)), # 13-16s: Forward + turn right\n (16, 21, (0.15, 0.0)), # 16-21s: Forward\n (21, 24, (0.0, 1.5)), # 21-24s: Spin left\n (24, 30, (0.15, 0.0)), # 24-30s: Forward\n ]\n \n start_time = None\n \n def teleop_callback(tk):\n nonlocal start_time\n \n if start_time is None:\n start_time = tk\n \n elapsed = tk - start_time\n \n # Find current maneuver\n v, omega = 0.0, 0.0\n for t_start, t_end, (v_cmd, omega_cmd) in maneuvers:\n if t_start <= elapsed < t_end:\n v, omega = v_cmd, omega_cmd\n break\n \n # Pack as Twist\n cmd_vel = np.array([v, 0.0, 0.0, 0.0, 0.0, omega])\n \n return {\"cmd_vel\": cmd_vel}\n \n node = ROSNode(\n node_name=\"simulated_teleop\",\n callback=teleop_callback,\n subscribes_to=[],\n publishes_to=[\n (\"cmd_vel\", Twist, \"/cmd_vel\"),\n ],\n rate_hz=10.0)\n \n return node\n\n\n# Create and start simulated teleop\nprint(\"Creating simulated keyboard teleop...\")\nteleop_node = create_simulated_teleop_node()\nteleop_node.create_node()\nteleop_node.start()\nprint(\"✓ Simulated teleop running (mimics keyboard input)\")\nprint(\"\\nThis simulates a human operator using teleop_twist_keyboard!\")\nprint(\"In real usage, you would run: ros2 run teleop_twist_keyboard teleop_twist_keyboard\")", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "code", "id": "xx198vyp6x", "source": "# Create logger for teleoperation data\ndef create_teleop_logger():\n \"\"\"Logger for teleoperation mode (no reference waypoints).\"\"\"\n teleop_data = {\n \"time\": [],\n \"cmd_vel\": [],\n \"true_state\": [],\n \"odom\": [],\n \"estimate\": [],\n }\n \n def logger_callback(tk, cmd_vel, true_state, odom, estimate):\n teleop_data[\"time\"].append(tk)\n teleop_data[\"cmd_vel\"].append(cmd_vel.copy())\n teleop_data[\"true_state\"].append(true_state.copy())\n teleop_data[\"odom\"].append(odom.copy())\n teleop_data[\"estimate\"].append(estimate.copy())\n return {}\n \n node = ROSNode(\n node_name=\"teleop_logger\",\n callback=logger_callback,\n subscribes_to=[\n (\"/cmd_vel\", Twist, \"cmd_vel\"),\n (\"/true_state\", Odometry, \"true_state\"),\n (\"/odom\", Odometry, \"odom\"),\n (\"/estimate\", Odometry, \"estimate\"),\n ],\n publishes_to=[],\n rate_hz=10.0)\n \n return node, teleop_data\n\n\n# Create and start logger\nteleop_logger, teleop_data = create_teleop_logger()\nteleop_logger.create_node()\nteleop_logger.start()\nprint(\"Data logger started!\")\n\n# Run teleoperation for 30 seconds\nimport time as pytime\n\nT_teleop = 30.0\nprint(f\"\\nRunning teleoperation for {T_teleop} seconds...\")\nprint(\"Robot is being controlled via simulated keyboard input!\")\npytime.sleep(T_teleop)\n\nprint(f\"\\nTeleoperation complete! Collected {len(teleop_data['time'])} samples.\")", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "code", "id": "rjkpo3kglgg", "source": "# Stop all teleoperation nodes\nprint(\"Stopping teleoperation nodes...\")\nteleop_logger.stop()\nteleop_node.stop()\nsimulator_teleop.stop()\nkf_teleop.stop()\n\n# Shutdown ROS2\nrclpy.shutdown()\nprint(\"All nodes stopped. ROS2 shutdown complete.\")", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "code", "id": "1f0q680cm8n", "source": "# Convert teleoperation data to arrays\ntime_teleop = np.array(teleop_data[\"time\"])\ntrue_teleop = np.array(teleop_data[\"true_state\"])\nest_teleop = np.array(teleop_data[\"estimate\"])\nmeas_teleop = np.array(teleop_data[\"odom\"])\ncmd_teleop = np.array(teleop_data[\"cmd_vel\"])\n\n# Extract positions\ntrue_x_t = true_teleop[:, 0]\ntrue_y_t = true_teleop[:, 1]\nest_x_t = est_teleop[:, 0]\nest_y_t = est_teleop[:, 1]\nmeas_x_t = meas_teleop[:, 0]\nmeas_y_t = meas_teleop[:, 1]\n\n# Extract velocities\nv_cmd = cmd_teleop[:, 0]\nomega_cmd = cmd_teleop[:, 5]\n\n# Plotting\nfig, axs = plt.subplots(2, 2, figsize=(14, 10))\n\n# Plot 1: 2D Trajectory (teleoperation)\nax = axs[0, 0]\nax.plot(true_x_t, true_y_t, \"b-\", linewidth=2, label=\"True Trajectory\", alpha=0.7)\nax.plot(est_x_t, est_y_t, \"r--\", linewidth=2, label=\"Estimated (EKF)\", alpha=0.7)\nax.scatter(true_x_t[0], true_y_t[0], c=\"green\", s=150, marker=\"o\", label=\"Start\", zorder=5)\nax.scatter(true_x_t[-1], true_y_t[-1], c=\"red\", s=150, marker=\"X\", label=\"End\", zorder=5)\nax.set_xlabel(\"X Position (m)\", fontsize=11)\nax.set_ylabel(\"Y Position (m)\", fontsize=11)\nax.set_title(\"Teleoperation: Manual Control Trajectory\", fontsize=13, fontweight=\"bold\")\nax.legend()\nax.grid(True, alpha=0.3)\nax.axis(\"equal\")\n\n# Plot 2: Velocity Commands\nax = axs[0, 1]\nax.plot(time_teleop, v_cmd, \"b-\", label=\"Linear Velocity (v)\", alpha=0.7)\nax.plot(time_teleop, omega_cmd, \"r-\", label=\"Angular Velocity (ω)\", alpha=0.7)\nax.set_xlabel(\"Time (s)\", fontsize=11)\nax.set_ylabel(\"Command (m/s or rad/s)\", fontsize=11)\nax.set_title(\"Teleop Commands to /cmd_vel\", fontsize=13, fontweight=\"bold\")\nax.legend()\nax.grid(True, alpha=0.3)\n\n# Plot 3: X Position Tracking\nax = axs[1, 0]\nax.plot(time_teleop, true_x_t, \"b-\", label=\"True X\", alpha=0.7)\nax.plot(time_teleop, est_x_t, \"r--\", linewidth=2, label=\"Estimated X\", alpha=0.7)\nax.scatter(time_teleop[::10], meas_x_t[::10], c=\"gray\", s=10, alpha=0.3, label=\"Measurements\")\nax.set_xlabel(\"Time (s)\", fontsize=11)\nax.set_ylabel(\"X Position (m)\", fontsize=11)\nax.set_title(\"X Position: Teleop Mode\", fontsize=13, fontweight=\"bold\")\nax.legend()\nax.grid(True, alpha=0.3)\n\n# Plot 4: Estimation Error\nax = axs[1, 1]\nposition_error_t = np.sqrt((true_x_t - est_x_t) ** 2 + (true_y_t - est_y_t) ** 2)\nax.plot(time_teleop, position_error_t, \"m-\", linewidth=1.5, label=\"Position Error\")\nax.set_xlabel(\"Time (s)\", fontsize=11)\nax.set_ylabel(\"Error (m)\", fontsize=11)\nax.set_title(\"KF Error During Teleoperation\", fontsize=13, fontweight=\"bold\")\nax.legend()\nax.grid(True, alpha=0.3)\n\nplt.tight_layout()\nplt.show()\n\nprint(f\"\\nTeleoperation Performance:\")\nprint(f\" Mean position error: {np.mean(position_error_t):.4f} m\")\nprint(f\" Max position error: {np.max(position_error_t):.4f} m\")\nprint(f\" Total distance traveled: {np.sum(np.sqrt(np.diff(true_x_t)**2 + np.diff(true_y_t)**2)):.2f} m\")", "metadata": {}, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "id": "9x4q3r4038w", "source": "## Summary: Two ROS Architectures Demonstrated\n\nThis notebook demonstrated **two distinct control architectures** for the TurtleBot ROS system:\n\n### Architecture 1: Autonomous Navigation (Waypoint Tracking)\n\n**Nodes Active**:\n- ✅ Waypoint Generator (`pykal.ROSNode`)\n- ✅ Velocity Controller (`pykal.ROSNode`)\n- ✅ TurtleBot Simulator (`pykal.ROSNode`)\n- ✅ Kalman Filter (`pykal.ROSNode`)\n\n**Data Flow**: Waypoint Gen → Controller → `/cmd_vel` → Plant → Observer → (feedback)\n\n**Use Case**: Autonomous navigation with high-level planning and control\n\n---\n\n### Architecture 2: Teleoperation (Manual Control)\n\n**Nodes Active**:\n- 🔴 Keyboard Teleop (external tool: `teleop_twist_keyboard`)\n- ✅ TurtleBot Simulator (`pykal.ROSNode`)\n- ✅ Kalman Filter (`pykal.ROSNode`)\n\n**Nodes Bypassed**:\n- ❌ Waypoint Generator (not needed)\n- ❌ Velocity Controller (not needed)\n\n**Data Flow**: Teleop → `/cmd_vel` → Plant → Observer\n\n**Use Case**: Direct manual control for intervention, testing, or operation\n\n---\n\n### Key Insights\n\n1. **Modularity**: ROS architecture allows easy mode switching by changing active nodes\n2. **Red Nodes**: External tools (teleop) are shown in RED in diagrams to distinguish from `pykal.ROSNode` wrappers\n3. **Observer Persistence**: Kalman filter runs in both modes for state estimation\n4. **Same Interface**: Both modes use `/cmd_vel` topic, making them interchangeable\n5. **Common Pattern**: This dual-mode architecture (autonomous + teleop) is standard in robotics\n\n**Next Steps**: The [Gazebo integration tutorial](../ros_to_gazebo/turtlebot_gazebo_integration.ipynb) replaces the simulator with physics simulation, demonstrating the same dual-mode architecture in a realistic environment!", "metadata": {} }, { "cell_type": "markdown", "id": "c0ekbqxd0ck", "source": "## Visualization: Teleoperation Results", "metadata": {} }, { "cell_type": "markdown", "id": "a3k06xobbji", "source": "### Stopping Teleoperation Nodes", "metadata": {} }, { "cell_type": "markdown", "id": "wueld7fkhjn", "source": "### Data Collection During Teleoperation", "metadata": {} }, { "cell_type": "markdown", "id": "908jb9qongf", "source": "## Running the Keyboard Teleop Tool\n\nTo control the TurtleBot manually, we use the standard ROS2 `teleop_twist_keyboard` package.\n\n**In a separate terminal**, run:\n\n```bash\nros2 run teleop_twist_keyboard teleop_twist_keyboard\n```\n\n**Keyboard Controls**:\n- `i` - Move forward\n- `k` - Stop\n- `,` - Move backward \n- `j` - Turn left\n- `l` - Turn right\n- `u` - Forward + left\n- `o` - Forward + right\n- `m` - Backward + left\n- `.` - Backward + right\n- `q/z` - Increase/decrease max speeds\n\n:::{note}\nThe teleop tool publishes `Twist` messages to `/cmd_vel`, which the simulator subscribes to. The Kalman filter monitors the system through `/odom` and `/cmd_vel` topics.\n:::\n\n**For this notebook demo**, we'll simulate teleop commands programmatically to demonstrate the architecture:", "metadata": {} }, { "cell_type": "markdown", "id": "7h42qy9gmqv", "source": "## Setting Up Teleoperation Mode\n\nWe only need to run the **simulator** and **observer** nodes. The waypoint generator and controller are not used in teleoperation mode.", "metadata": {} }, { "cell_type": "markdown", "source": [ "[← Dynamical Systems as ROS Nodes](../../../getting_started/python_to_ros/dynamical_systems_as_ros_nodes.rst)\n" ], "id": "cell-26", "metadata": {} } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.12.0" } }, "nbformat": 4, "nbformat_minor": 5 }