{ "cells": [ { "cell_type": "markdown", "id": "a9ad789f", "metadata": {}, "source": [ "# ROSNode\n", "\n", "In the previous tutorial, we learned how to convert between ROS messages and NumPy arrays. Now we'll learn how to wrap arbitrary Python functions as ROS2 nodes using `pykal`'s `ROSNode` class.\n", "\n", "The `ROSNode` class is a high-level abstraction that:\n", "- Wraps any Python callback as a ROS2 node\n", "- Automatically converts ROS messages to/from NumPy arrays\n", "- Handles fixed-rate execution\n", "- Manages multi-rate sensor fusion with staleness po\\licies\n", "- Provides diagnostics, heartbeat monitoring, and error handling\n", "\n", "This tutorial will show you how to use `ROSNode` to create various types of nodes, from simple echo nodes to complex sensor fusion systems." ] }, { "cell_type": "markdown", "id": "677e11fc", "metadata": {}, "source": [ "# Conceptual Foundation\n", "\n", "Recall that from \"Cruise Control\" in \"Theory to Python\", we derived the following block diagram of dynamical systems:\n", "\n", "\n", "\n", "The idea is that there is a correspondence between this representation of a system and a ROS node archtecture, namely\n", "\n", "\n", "\n", "Essentially, each dynamical system block becomes a ROS node, and the data exchanged between them becomes a topic. The dynamical system is wrapped in a callback function, which we denote $C$. The callback function receives the current time in seconds (tk) as the first argument and the subscribed topics as keyword arguments. Under the hood, these topics are converted to numpy arrays which are sent to wrapped dynamical system. After the system is stepped, the output is then put into a dictionary which maps the output to the topic name it should be published to.\n", "\n", "Incidentally, we could not set the setpoint \"live\" in the \"Cruise Control\" notebook (that is, we could not run the simulation and change the setpoint via the keyboard) because Jupyter kernels lack an elegant way to do so. However, the ROS node architecture, being middelware, has nodes that can actually serve between keyboard inputs and the rest of the system! Our new node architecture would look something like:\n", "\n", "\n", "\n", "Indeed, we will show an example of this system running later in the notebook. For now, we build up an intuition on how to use the `ROSNode` class.\n", "\n", ":::node{For the math nerds, there is a functor from the category of discrete-time dynamical systems, where the objects are dynamical system blocks and the morphisms are communicated data, and the category of ROS nodes, where the objects are ROS nodes and the morphisms are topics.}" ] }, { "cell_type": "markdown", "id": "259b49c0", "metadata": {}, "source": [ "## Programmatic Implementation\n", "We review what a ROS node is before we discuss how the `ROSNode` class interfaces with it.\n", "\n", "A **ROS2 node** is a long-running process that communicates with other nodes by exchanging typed messages over named channels:\n", "\n", "- **Topics** (pub/sub): asynchronous message streams (`/cmd_vel`, `/odom`, `/imu/data`, …)\n", "- **Services** (req/rep): synchronous RPC calls (`/reset`, `/calibrate`, …)\n", "- **Actions** (goal/feedback/result): long-running tasks (navigation goals, trajectories, …)\n", "- **Parameters**: runtime configuration (`gain`, `rate`, `frame_id`, …)\n", "\n", "In this tutorial we focus on the most common case: **topic-based pub/sub**, because it matches the “block diagram wiring” intuition best.\n", "\n", "At a practical level, most robotics nodes reduce to:\n", "\n", "1. **Subscribe** to one or more input topics\n", "2. **Convert** incoming messages into an internal representation\n", "3. **Compute** something (control, filtering, fusion, transforms, etc.)\n", "4. **Publish** results to one or more output topics\n", "5. Optionally run at a **fixed rate** even when some inputs are slower/faster than others\n", "\n", "In plain `rclpy`, you typically write boilerplate for all of this:\n", "- define publishers/subscribers\n", "- maintain “latest message” buffers\n", "- handle message timing, missing data, and synchronization\n", "- write conversion code between ROS messages and numeric arrays\n", "- manage timers for fixed-rate execution\n", "- add diagnostics and error handling\n", "\n", "The `pykal` `ROSNode` class exists to collapse that boilerplate into a single object, while preserving the key ROS concepts.\n", "\n", "A `ROSNode` is built around a single **Python callback** with signature\n", "\n", "\\[\n", "C:\\ (t_k,\\ \\text{inputs as keyword args})\\ \\mapsto\\ \\{\\text{outputs keyed by name}\\}.\n", "\\]\n", "\n", "- The first argument `tk` is the current time in seconds (float).\n", "- Each subscribed topic is passed in as a keyword argument to the callback.\n", "- Each input arrives as a **NumPy array**, not a ROS message (conversion is automatic).\n", "- The callback returns a **dictionary** mapping output names to NumPy arrays.\n", "- `ROSNode` converts those outputs back into ROS messages and publishes them.\n", "\n", "This gives you a clean separation:\n", "\n", "- **Your algorithm** lives in a pure Python function operating on NumPy arrays.\n", "- **ROS plumbing** (pub/sub, conversion, timers) is handled by `ROSNode`.\n", "\n", "\n", "This is the key correspondence you’re using:\n", "\n", "- A **dynamical system block** (controller, plant, observer, setpoint generator) becomes a **ROS node**\n", "- A **wire** (signal \\(r, u, x, \\hat{x}\\)) becomes a **topic**\n", "\n", "So the block diagram interpretation:\n", "\n", "- “Controller takes \\(r\\) and \\(\\hat{x}\\), produces \\(u\\)”\n", "becomes:\n", "- controller node **subscribes** to `/setpoint`, `/xhat` and **publishes** `/cmd_u`\n", "\n", "This is exactly what your TikZ diagram is representing.\n", "\n", "\n", "In ROS, there are two common patterns:\n", "\n", "**(A) Message-triggered nodes**\n", "- Compute whenever an input message arrives (callback per subscription)\n", "\n", "**(B) Timer-driven (fixed-rate) nodes**\n", "- Compute at a fixed frequency, using the latest available inputs\n", "\n", "`ROSNode` is designed primarily around (B), because it matches control/estimation loops and makes multi-rate fusion practical: you can run the estimator at 100 Hz even if GPS comes at 1 Hz." ] }, { "cell_type": "markdown", "id": "1804ecd4", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## References and Examples\n", "\n", "The power of the `ROSNode` abstraction lies in the fact that it lets you write **algorithm-first Python** (NumPy in / NumPy out),\n", "while `pykal` handles the **ROS plumbing** (pub/sub wiring, conversions, timing, multi-rate staleness, and lifecycle).\n", "\n", "In the following sections, we'll explore:\n", "\n", "- **Creation Patterns**: Stateless nodes, stateful nodes, “pure transformer” nodes\n", "- **Callback I/O Contract**: Subscribed topics as keyword args, outputs as a dict of arrays\n", "- **Timing Patterns**: Timer-driven fixed-rate execution vs. event-driven behavior\n", "- **Multi-Rate Inputs**: Fusing fast + slow topics with explicit staleness policies\n", "- **Safety & Robustness**: Required topics, error callbacks, heartbeat monitoring\n", "- **Diagnostics**: Throughput, latency, dropped/stale inputs, node health\n", "- **Integration Examples**: Wrapping `DynamicalSystem` / estimators / controllers as ROS nodes\n", "\n", "Let's begin." ] }, { "cell_type": "markdown", "id": "69e3be39", "metadata": {}, "source": [ "## Part 1: Basic Node Structure\n", "\n", "A `ROSNode` object requires:\n", "1. **node_name**: Unique identifier for the node\n", "2. **callback**: Python function that processes inputs and produces outputs\n", "3. **subscribes_to**: List of `(topic, msg_type, arg_name)` tuples\n", "4. **publishes_to**: List of `(return_key, msg_type, topic)` tuples\n", "5. **rate_hz**: Execution rate (default 10 Hz)\n", "\n", "The callback must return:\n", "- Dictionary mapping return keys to NumPy arrays (auto-converted to ROS messages)" ] }, { "cell_type": "code", "execution_count": null, "id": "b0e315cf", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "# Import necessary libraries\n", "import numpy as np\n", "from pykal.ros.ros_node import ROSNode\n", "from geometry_msgs.msg import Twist, Vector3\n", "from nav_msgs.msg import Odometry\n", "import rclpy\n", "import time" ] }, { "cell_type": "markdown", "id": "ee186029", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 1.1: Simple Echo Node\n", "\n", "Let's create the simplest possible node: one that echoes a message from one topic to another." ] }, { "cell_type": "code", "execution_count": null, "id": "22619466", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "# Callback that echoes the input\n", "def echo_callback(tk, velocity_in):\n", " \"\"\"\n", " Echo the velocity from input to output.\n", "\n", " Args:\n", " tk (float): Current time in seconds\n", " velocity_in (np.ndarray): (6) array [vx, vy, vz, wx, wy, wz]\n", "\n", " Returns:\n", " dict: {'velocity_out': (6) array}\n", " \"\"\"\n", " return {\"velocity_out\": velocity_in}\n", "\n", "\n", "# Create the echo node\n", "echo_node = ROSNode(\n", " node_name=\"echo_node\",\n", " callback=echo_callback,\n", " subscribes_to=[\n", " (\"/cmd_vel_in\", Twist, \"velocity_in\"), # Subscribe to /cmd_vel_in\n", " ],\n", " publishes_to=[\n", " (\"velocity_out\", Twist, \"/cmd_vel_out\"), # Publish to /cmd_vel_out\n", " ],\n", " rate_hz=10.0, # Run at 10 Hz\n", ")\n", "\n", "print(\"Echo node created!\")\n", "print(f\" Subscribes to: /cmd_vel_in (Twist)\")\n", "print(f\" Publishes to: /cmd_vel_out (Twist)\")\n", "print(f\" Rate: 10 Hz\")" ] }, { "cell_type": "markdown", "id": "464a1345", "metadata": {}, "source": [ ":::{note}\n", "The `ROSNode` automatically:\n", "- Converts the incoming `Twist` message to a NumPy array `(6,)` with format `[vx, vy, vz, wx, wy, wz]`\n", "- Passes it to the callback as `velocity_in`\n", "- Converts the returned `velocity_out` array back to a `Twist` message\n", "- Publishes it to `/cmd_vel_out`\n", ":::" ] }, { "cell_type": "markdown", "id": "c78515ad", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 1.2: Velocity Scaling Node\n", "\n", "Let's create a more useful node that scales velocities (e.g., for safety limiting)." ] }, { "cell_type": "code", "execution_count": null, "id": "bfd33739", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "# Callback that scales velocities\n", "def scale_velocity_callback(tk, cmd_vel, scale_factor=0.5):\n", " \"\"\"\n", " Scale the commanded velocity by a safety factor.\n", "\n", " Args:\n", " tk (float): Current time in seconds\n", " cmd_vel (np.ndarray): (6) array [vx, vy, vz, wx, wy, wz]\n", " scale_factor (float): Scaling factor (default 0.5)\n", "\n", " Returns:\n", " dict: {'safe_vel': scaled (6) array}\n", " \"\"\"\n", " # Scale both linear and angular velocities\n", " safe_vel = cmd_vel * scale_factor\n", "\n", " # Print for debugging\n", " print(f\"[{tk:.2f}s] Scaling velocity: {cmd_vel[0]:.2f} -> {safe_vel[0]:.2f} m/s\")\n", "\n", " return {\"safe_vel\": safe_vel}\n", "\n", "\n", "# Create the velocity scaling node\n", "scaler_node = ROSNode(\n", " node_name=\"velocity_scaler\",\n", " callback=lambda tk, cmd_vel: scale_velocity_callback(tk, cmd_vel, scale_factor=0.5),\n", " subscribes_to=[\n", " (\"/cmd_vel\", Twist, \"cmd_vel\"),\n", " ],\n", " publishes_to=[\n", " (\"safe_vel\", Twist, \"/cmd_vel_safe\"),\n", " ],\n", " rate_hz=20.0, # Run at 20 Hz for faster response\n", ")\n", "\n", "print(\"Velocity scaler node created!\")\n", "print(f\" Subscribes to: /cmd_vel (Twist)\")\n", "print(f\" Publishes to: /cmd_vel_safe (Twist)\")\n", "print(f\" Safety factor: 0.5\")\n", "print(f\" Rate: 20 Hz\")" ] }, { "cell_type": "markdown", "id": "edb8a021", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## Part 2: Stateful Callbacks with Closures\n", "\n", "Many robotics algorithms need to maintain internal state (e.g., controllers, filters). We can use Python closures to create stateful callbacks.\n", "\n", "### Example 2.1: Simple Integrator\n", "\n", "Let's create a node that integrates velocity to estimate position." ] }, { "cell_type": "code", "execution_count": null, "id": "e688e85d", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def create_integrator_callback(dt=0.1):\n", " \"\"\"\n", " Factory function that creates an integrator callback with internal state.\n", "\n", " Args:\n", " dt (float): Time step for integration\n", "\n", " Returns:\n", " callable: Callback function with state\n", " \"\"\"\n", " # Internal state: position\n", " position = np.array([0.0, 0.0, 0.0])\n", "\n", " def integrator_callback(tk, velocity):\n", " \"\"\"\n", " Integrate velocity to estimate position.\n", "\n", " Args:\n", " tk (float): Current time\n", " velocity (np.ndarray): (6) Twist array, we use linear part [vx, vy, vz]\n", "\n", " Returns:\n", " dict: {'position': (3) array}\n", " \"\"\"\n", " nonlocal position\n", "\n", " # Extract linear velocity\n", " v_linear = velocity[:3]\n", "\n", " # Simple Euler integration: x_{k+1} = x_k + v * dt\n", " position = position + v_linear * dt\n", "\n", " print(\n", " f\"[{tk:.2f}s] Position: [{position[0]:.2f}, {position[1]:.2f}, {position[2]:.2f}]\"\n", " )\n", "\n", " return {\"position\": position}\n", "\n", " return integrator_callback\n", "\n", "\n", "# Create the integrator node\n", "integrator_callback = create_integrator_callback(dt=0.1)\n", "integrator_node = ROSNode(\n", " node_name=\"velocity_integrator\",\n", " callback=integrator_callback,\n", " subscribes_to=[\n", " (\"/cmd_vel\", Twist, \"velocity\"),\n", " ],\n", " publishes_to=[\n", " (\"position\", Vector3, \"/estimated_position\"),\n", " ],\n", " rate_hz=10.0)\n", "\n", "print(\"Integrator node created!\")\n", "print(f\" Subscribes to: /cmd_vel (Twist)\")\n", "print(f\" Publishes to: /estimated_position (Vector3)\")\n", "print(f\" Integration timestep: 0.1 s\")" ] }, { "cell_type": "markdown", "id": "c180ccaf", "metadata": {}, "source": [ ":::{note}\n", "Using closures allows us to:\n", "- Maintain internal state between callback invocations\n", "- Initialize state with specific parameters (like `dt`)\n", "- Keep the callback signature clean (only `tk` and subscribed topics)\n", ":::" ] }, { "cell_type": "markdown", "id": "3981a3f7", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 2.2: Proportional Controller\n", "\n", "Let's create a position controller that generates velocity commands to reach a setpoint." ] }, { "cell_type": "code", "execution_count": null, "id": "0f37d466", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def create_p_controller_callback(Kp=1.0, setpoint=np.array([5.0, 0.0, 1.0])):\n", " \"\"\"\n", " Factory for proportional position controller.\n", "\n", " Args:\n", " Kp (float): Proportional gain\n", " setpoint (np.ndarray): Desired position (3)\n", "\n", " Returns:\n", " callable: Controller callback\n", " \"\"\"\n", "\n", " def controller_callback(tk, current_position):\n", " \"\"\"\n", " Compute velocity command to reach setpoint.\n", "\n", " Args:\n", " tk (float): Current time\n", " current_position (np.ndarray): (3) current position\n", "\n", " Returns:\n", " dict: {'cmd_vel': (6) Twist array}\n", " \"\"\"\n", " # Position error\n", " error = setpoint - current_position\n", "\n", " # Proportional control law: v = Kp * error\n", " v_linear = Kp * error\n", "\n", " # No angular velocity for this simple controller\n", " v_angular = np.array([0.0, 0.0, 0.0])\n", "\n", " # Construct Twist array\n", " cmd_vel = np.concatenate([v_linear, v_angular])\n", "\n", " # Compute error magnitude for logging\n", " error_mag = np.linalg.norm(error)\n", " print(\n", " f\"[{tk:.2f}s] Error: {error_mag:.3f} m, Command: [{v_linear[0]:.2f}, {v_linear[1]:.2f}, {v_linear[2]:.2f}] m/s\"\n", " )\n", "\n", " return {\"cmd_vel\": cmd_vel}\n", "\n", " return controller_callback\n", "\n", "\n", "# Create the controller node\n", "setpoint = np.array([5.0, 3.0, 1.5])\n", "controller_callback = create_p_controller_callback(Kp=0.5, setpoint=setpoint)\n", "controller_node = ROSNode(\n", " node_name=\"position_controller\",\n", " callback=controller_callback,\n", " subscribes_to=[\n", " (\"/current_position\", Vector3, \"current_position\"),\n", " ],\n", " publishes_to=[\n", " (\"cmd_vel\", Twist, \"/cmd_vel\"),\n", " ],\n", " rate_hz=50.0, # High rate for responsive control\n", ")\n", "\n", "print(\"Position controller node created!\")\n", "print(f\" Subscribes to: /current_position (Vector3)\")\n", "print(f\" Publishes to: /cmd_vel (Twist)\")\n", "print(f\" Setpoint: {setpoint}\")\n", "print(f\" Gain Kp: 0.5\")\n", "print(f\" Rate: 50 Hz\")" ] }, { "cell_type": "markdown", "id": "62ef9995", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## Part 3: Multi-Rate Sensor Fusion with Staleness Policies\n", "\n", "Real robots have sensors that publish at different rates:\n", "- Motion capture: ~100 Hz (high-rate, accurate)\n", "- LiDAR: ~10 Hz (medium-rate)\n", "- GPS: ~1 Hz (low-rate)\n", "\n", "The `ROSNode` handles this with **staleness policies**:\n", "- `\"zero\"`: Replace stale data with zeros\n", "- `\"hold\"`: Keep the last valid value (default)\n", "- `\"drop\"`: Skip the callback if data is stale\n", "\n", "### Example 3.1: Multi-Sensor Fusion Node\n", "\n", "Let's create a node that fuses odometry (fast) and GPS (slow)." ] }, { "cell_type": "code", "execution_count": null, "id": "b2d57395", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def sensor_fusion_callback(tk, odometry, gps_position):\n", " \"\"\"\n", " Fuse odometry and GPS for position estimation.\n", "\n", " Args:\n", " tk (float): Current time\n", " odometry (np.ndarray): (13) [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]\n", " gps_position (np.ndarray): (3) [x, y, z]\n", "\n", " Returns:\n", " dict: {'fused_position': (3) array}\n", " \"\"\"\n", " # Extract position from odometry (first 3 elements)\n", " odom_position = odometry[:3]\n", "\n", " # Simple weighted fusion (in practice, use Kalman filter)\n", " # Weight: 70% odometry, 30% GPS\n", " fused_position = 0.7 * odom_position + 0.3 * gps_position\n", "\n", " # Log positions\n", " print(\n", " f\"[{tk:.2f}s] Odom: [{odom_position[0]:.2f}, {odom_position[1]:.2f}], \"\n", " f\"GPS: [{gps_position[0]:.2f}, {gps_position[1]:.2f}], \"\n", " f\"Fused: [{fused_position[0]:.2f}, {fused_position[1]:.2f}]\"\n", " )\n", "\n", " return {\"fused_position\": fused_position}\n", "\n", "\n", "# Create the fusion node with staleness configuration\n", "fusion_node = ROSNode(\n", " node_name=\"sensor_fusion\",\n", " callback=sensor_fusion_callback,\n", " subscribes_to=[\n", " (\"/odom\", Odometry, \"odometry\"), # Fast: 50 Hz\n", " (\"/gps/position\", Vector3, \"gps_position\"), # Slow: 1 Hz\n", " ],\n", " publishes_to=[\n", " (\"fused_position\", Vector3, \"/fused_position\"),\n", " ],\n", " rate_hz=50.0, # Run at fast rate\n", " stale_config={\n", " \"gps_position\": {\n", " \"after\": 2.0, # Mark stale after 2 seconds\n", " \"policy\": \"hold\", # Keep last valid GPS reading\n", " }\n", " })\n", "\n", "print(\"Sensor fusion node created!\")\n", "print(f\" Subscribes to:\")\n", "print(f\" - /odom (Odometry, ~50 Hz)\")\n", "print(f\" - /gps/position (Vector3, ~1 Hz)\")\n", "print(f\" Publishes to: /fused_position (Vector3)\")\n", "print(f\" GPS staleness: hold last value if >2s old\")\n", "print(f\" Node rate: 50 Hz\")" ] }, { "cell_type": "markdown", "id": "c9cf93b3", "metadata": {}, "source": [ ":::{note}\n", "The `stale_config` allows the fusion node to run at 50 Hz even though GPS only updates at 1 Hz:\n", "- Between GPS updates, the last GPS value is used (\"hold\" policy)\n", "- Odometry is fresh at every callback (50 Hz)\n", "- If GPS stops publishing for >2 seconds, the node detects it as stale\n", ":::" ] }, { "cell_type": "markdown", "id": "74b9385b", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 3.2: Staleness Policy Comparison\n", "\n", "Let's see the difference between staleness policies." ] }, { "cell_type": "code", "execution_count": null, "id": "85c8daf3", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def policy_demo_callback(tk, fast_sensor, slow_sensor):\n", " \"\"\"\n", " Demonstrate staleness policy behavior.\n", "\n", " Args:\n", " tk (float): Current time\n", " fast_sensor (np.ndarray): (3) high-rate sensor\n", " slow_sensor (np.ndarray): (3) low-rate sensor\n", "\n", " Returns:\n", " dict: {'combined': (6) concatenated sensors}\n", " \"\"\"\n", " combined = np.concatenate([fast_sensor, slow_sensor])\n", "\n", " print(f\"[{tk:.2f}s] Fast: {fast_sensor[0]:.2f}, Slow: {slow_sensor[0]:.2f}\")\n", "\n", " return {\"combined\": combined}\n", "\n", "\n", "# Policy 1: \"hold\" - keeps last value\n", "node_hold = ROSNode(\n", " node_name=\"policy_hold\",\n", " callback=policy_demo_callback,\n", " subscribes_to=[\n", " (\"/fast\", Vector3, \"fast_sensor\"),\n", " (\"/slow\", Vector3, \"slow_sensor\"),\n", " ],\n", " publishes_to=[\n", " (\"combined\", Vector3, \"/combined_hold\"), # Only publishing first 3 elements\n", " ],\n", " rate_hz=10.0,\n", " stale_config={\"slow_sensor\": {\"after\": 1.0, \"policy\": \"hold\"}})\n", "\n", "# Policy 2: \"zero\" - replaces with zeros\n", "node_zero = ROSNode(\n", " node_name=\"policy_zero\",\n", " callback=policy_demo_callback,\n", " subscribes_to=[\n", " (\"/fast\", Vector3, \"fast_sensor\"),\n", " (\"/slow\", Vector3, \"slow_sensor\"),\n", " ],\n", " publishes_to=[\n", " (\"combined\", Vector3, \"/combined_zero\"),\n", " ],\n", " rate_hz=10.0,\n", " stale_config={\"slow_sensor\": {\"after\": 1.0, \"policy\": \"zero\"}})\n", "\n", "print(\"Staleness policy comparison:\")\n", "print(\"\\n Policy 'hold': Keeps last valid sensor value\")\n", "print(\" - Useful when sensor represents a stable measurement\")\n", "print(\" - Example: GPS position (changes slowly)\")\n", "print(\"\\n Policy 'zero': Replaces stale data with zeros\")\n", "print(\" - Useful when absence of data is meaningful\")\n", "print(\" - Example: Object detection (no object = zero confidence)\")" ] }, { "cell_type": "markdown", "id": "16b49024", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## Part 4: Required Topics and Error Handling\n", "\n", "Some nodes cannot function without certain topics. The `required_topics` parameter ensures critical data is available before processing.\n", "\n", "### Example 4.1: Required Topics\n", "\n", "Let's create a controller that requires both position and setpoint." ] }, { "cell_type": "code", "execution_count": null, "id": "2048f5eb", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def tracking_controller_callback(tk, current_position, setpoint, Kp=1.0):\n", " \"\"\"\n", " Track a time-varying setpoint.\n", "\n", " Args:\n", " tk (float): Current time\n", " current_position (np.ndarray): (3) current position\n", " setpoint (np.ndarray): (3) desired position\n", " Kp (float): Proportional gain\n", "\n", " Returns:\n", " dict: {'cmd_vel': (6) Twist array}\n", " \"\"\"\n", " # Cannot compute control without both position and setpoint!\n", " error = setpoint - current_position\n", " v_linear = Kp * error\n", " v_angular = np.zeros(3)\n", " cmd_vel = np.concatenate([v_linear, v_angular])\n", "\n", " return {\"cmd_vel\": cmd_vel}\n", "\n", "\n", "# Create node with required topics\n", "tracking_node = ROSNode(\n", " node_name=\"tracking_controller\",\n", " callback=lambda tk, pos, sp: tracking_controller_callback(tk, pos, sp, Kp=0.8),\n", " subscribes_to=[\n", " (\"/current_position\", Vector3, \"current_position\"),\n", " (\"/setpoint\", Vector3, \"setpoint\"),\n", " ],\n", " publishes_to=[\n", " (\"cmd_vel\", Twist, \"/cmd_vel\"),\n", " ],\n", " rate_hz=50.0,\n", " required_topics={\"current_position\", \"setpoint\"}, # Both required!\n", ")\n", "\n", "print(\"Tracking controller node created!\")\n", "print(f\" Required topics: current_position, setpoint\")\n", "print(\n", " f\" Behavior: Callback will NOT run until both topics have published at least once\"\n", ")\n", "print(f\" Use case: Prevents control commands before system is initialized\")" ] }, { "cell_type": "markdown", "id": "361aa916", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 4.2: Error Callbacks\n", "\n", "Handle errors gracefully with custom error callbacks." ] }, { "cell_type": "code", "execution_count": null, "id": "4e8607ec", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def risky_callback(tk, sensor_data):\n", " \"\"\"\n", " A callback that might raise exceptions.\n", "\n", " Args:\n", " tk (float): Current time\n", " sensor_data (np.ndarray): (3) sensor reading\n", "\n", " Returns:\n", " dict: {'output': processed data}\n", " \"\"\"\n", " # This might divide by zero!\n", " if np.linalg.norm(sensor_data) < 1e-6:\n", " raise ValueError(\"Sensor data too small, possible sensor failure!\")\n", "\n", " normalized = sensor_data / np.linalg.norm(sensor_data)\n", " return {\"output\": normalized}\n", "\n", "\n", "# Custom error handler\n", "def my_error_handler(error_msg, exception):\n", " \"\"\"\n", " Handle errors from the node callback.\n", "\n", " Args:\n", " error_msg (str): Description of where error occurred\n", " exception (Exception): The exception that was raised\n", " \"\"\"\n", " print(f\"\\n⚠️ ERROR: {error_msg}\")\n", " print(f\" Exception: {type(exception).__name__}: {exception}\")\n", " print(f\" Recovery: Publishing zero output as fallback\\n\")\n", "\n", "\n", "# Create node with error callback\n", "robust_node = ROSNode(\n", " node_name=\"robust_processor\",\n", " callback=risky_callback,\n", " subscribes_to=[\n", " (\"/sensor\", Vector3, \"sensor_data\"),\n", " ],\n", " publishes_to=[\n", " (\"output\", Vector3, \"/processed\"),\n", " ],\n", " rate_hz=10.0,\n", " error_callback=my_error_handler, # Handle errors gracefully\n", ")\n", "\n", "print(\"Robust node created!\")\n", "print(f\" Error handling: Custom callback for graceful degradation\")\n", "print(f\" Use case: Continue operation even when processing fails\")" ] }, { "cell_type": "markdown", "id": "d3988ffc", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## Part 5: Diagnostics and Heartbeat Monitoring\n", "\n", "For production systems, you need to monitor node health. The `ROSNode` provides:\n", "- **Diagnostics**: Message counts, rates, latency, error counts\n", "- **Heartbeat monitoring**: Detect when topics stop publishing\n", "\n", "### Example 5.1: Node Diagnostics\n", "\n", "Enable diagnostics to track node performance." ] }, { "cell_type": "code", "execution_count": null, "id": "a06b2640", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def monitored_callback(tk, input_data):\n", " \"\"\"\n", " Simple processing with diagnostics tracking.\n", "\n", " Args:\n", " tk (float): Current time\n", " input_data (np.ndarray): (3) input\n", "\n", " Returns:\n", " dict: {'output': processed data}\n", " \"\"\"\n", " # Simulate some processing\n", " output = input_data * 2.0\n", " return {\"output\": output}\n", "\n", "\n", "# Create node with diagnostics enabled\n", "monitored_node = ROSNode(\n", " node_name=\"monitored_processor\",\n", " callback=monitored_callback,\n", " subscribes_to=[\n", " (\"/input\", Vector3, \"input_data\"),\n", " ],\n", " publishes_to=[\n", " (\"output\", Vector3, \"/output\"),\n", " ],\n", " rate_hz=10.0,\n", " enable_diagnostics=True, # Track performance metrics\n", ")\n", "\n", "print(\"Node with diagnostics created!\")\n", "print(f\"\\nDiagnostics will track:\")\n", "print(f\" - Message counts per topic\")\n", "print(f\" - Average message rates (Hz)\")\n", "print(f\" - Message latency (time from stamp to processing)\")\n", "print(f\" - Error counts and types\")\n", "print(f\" - Node uptime\")\n", "print(f\"\\nAccess via: monitored_node.get_diagnostics()\")" ] }, { "cell_type": "markdown", "id": "c5b4c466", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 5.2: Heartbeat Monitoring\n", "\n", "Detect when critical sensors stop publishing." ] }, { "cell_type": "code", "execution_count": null, "id": "bcbb8403", "metadata": { "lines_to_next_cell": 2, "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "def critical_callback(tk, gps, imu):\n", " \"\"\"\n", " Process critical sensor data.\n", "\n", " Args:\n", " tk (float): Current time\n", " gps (np.ndarray): (3) GPS position\n", " imu (np.ndarray): (10) IMU data [qx, qy, qz, qw, wx, wy, wz, ax, ay, az]\n", "\n", " Returns:\n", " dict: {'status': health indicator}\n", " \"\"\"\n", " # Extract orientation from IMU\n", " orientation = imu[:4]\n", "\n", " # Simple health metric\n", " status = np.array([1.0, 0.0, 0.0]) # [healthy, degraded, failed]\n", "\n", " return {\"status\": status}\n", "\n", "\n", "# Create node with heartbeat monitoring\n", "from sensor_msgs.msg import Imu\n", "\n", "critical_node = ROSNode(\n", " node_name=\"critical_sensors\",\n", " callback=critical_callback,\n", " subscribes_to=[\n", " (\"/gps/position\", Vector3, \"gps\"),\n", " (\"/imu/data\", Imu, \"imu\"),\n", " ],\n", " publishes_to=[\n", " (\"status\", Vector3, \"/sensor_status\"),\n", " ],\n", " rate_hz=10.0,\n", " heartbeat_config={\n", " \"gps\": 2.0, # Expect GPS at least every 2 seconds\n", " \"imu\": 0.1, # Expect IMU at least every 100 ms\n", " },\n", " enable_diagnostics=True)\n", "\n", "print(\"Critical sensor monitoring node created!\")\n", "print(f\"\\nHeartbeat monitoring:\")\n", "print(f\" - GPS: timeout if no message for 2.0 seconds\")\n", "print(f\" - IMU: timeout if no message for 0.1 seconds\")\n", "print(f\"\\nCheck health via: critical_node.get_diagnostics()['heartbeat_status']\")" ] }, { "cell_type": "markdown", "id": "de44d74a", "metadata": { "lines_to_next_cell": 2 }, "source": [ "## Part 6: Running and Managing Nodes\n", "\n", "To actually run a node, you need to:\n", "1. Create the node (we've been doing this)\n", "2. Call `create_node()` to instantiate the ROS2 node\n", "3. Call `start()` to begin execution\n", "4. Call `stop()` to gracefully shutdown\n", "\n", "### Example 6.1: Node Lifecycle" ] }, { "cell_type": "code", "execution_count": null, "id": "e9c05505", "metadata": { "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "# Simple example callback\n", "def simple_callback(tk):\n", " \"\"\"\n", " A node that publishes time.\n", "\n", " Args:\n", " tk (float): Current time\n", "\n", " Returns:\n", " dict: {'time_vec': (3) with time in first element}\n", " \"\"\"\n", " time_vec = np.array([tk, 0.0, 0.0])\n", " print(f\"Publishing time: {tk:.2f}\")\n", " return {\"time_vec\": time_vec}\n", "\n", "\n", "# Create the node\n", "time_node = ROSNode(\n", " node_name=\"time_publisher\",\n", " callback=simple_callback,\n", " subscribes_to=[], # No subscriptions\n", " publishes_to=[\n", " (\"time_vec\", Vector3, \"/current_time\"),\n", " ],\n", " rate_hz=1.0, # Publish once per second\n", ")\n", "\n", "print(\"Node lifecycle:\")\n", "print(\"\\n1. Create ROSNode object ✓\")\n", "print(\" - Defines callback, topics, rate\")\n", "print(\" - Does NOT create ROS2 node yet\")\n", "print(\"\\n2. Call create_node():\")\n", "print(\" - time_node.create_node()\")\n", "print(\" - Instantiates ROS2 node, publishers, subscribers\")\n", "print(\"\\n3. Call start():\")\n", "print(\" - time_node.start()\")\n", "print(\" - Begins callback execution at specified rate\")\n", "print(\" - Runs in background thread\")\n", "print(\"\\n4. Call stop():\")\n", "print(\" - time_node.stop()\")\n", "print(\" - Graceful shutdown\")\n", "\n", "# Uncomment to actually run:\n", "# time_node.create_node()\n", "# time_node.start()\n", "# time.sleep(5) # Run for 5 seconds\n", "# time_node.stop()" ] }, { "cell_type": "markdown", "id": "30d51218", "metadata": { "lines_to_next_cell": 2 }, "source": [ "### Example 6.2: Running Multiple Nodes\n", "\n", "You can run multiple nodes in the same process." ] }, { "cell_type": "code", "execution_count": null, "id": "0930bea4", "metadata": { "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "# Publisher node\n", "def publisher_callback(tk):\n", " data = np.array([np.sin(tk), np.cos(tk), tk])\n", " return {\"data\": data}\n", "\n", "\n", "publisher_node = ROSNode(\n", " node_name=\"data_publisher\",\n", " callback=publisher_callback,\n", " subscribes_to=[],\n", " publishes_to=[(\"data\", Vector3, \"/sensor_data\")],\n", " rate_hz=10.0)\n", "\n", "\n", "# Subscriber node\n", "def subscriber_callback(tk, data):\n", " processed = data**2\n", " return {\"processed\": processed}\n", "\n", "\n", "subscriber_node = ROSNode(\n", " node_name=\"data_processor\",\n", " callback=subscriber_callback,\n", " subscribes_to=[(\"/sensor_data\", Vector3, \"data\")],\n", " publishes_to=[(\"processed\", Vector3, \"/processed_data\")],\n", " rate_hz=10.0)\n", "\n", "print(\"Multi-node system:\")\n", "print(\"\\n Publisher: /sensor_data (10 Hz)\")\n", "print(\" ↓\")\n", "print(\" Processor: /processed_data (10 Hz)\")\n", "print(\"\\nTo run both:\")\n", "print(\" publisher_node.create_node()\")\n", "print(\" subscriber_node.create_node()\")\n", "print(\" publisher_node.start()\")\n", "print(\" subscriber_node.start()\")\n", "print(\" # ... run for some time ...\")\n", "print(\" publisher_node.stop()\")\n", "print(\" subscriber_node.stop()\")" ] }, { "cell_type": "markdown", "id": "9aec8c7f", "metadata": {}, "source": [ "## Part 7: Integration with DynamicalSystem\n", "\n", "The real power of `pykal` comes from wrapping `DynamicalSystem` instances as ROS nodes. This allows you to deploy control systems designed in Python directly to ROS.\n", "\n", "### Example 7.1: Kalman Filter as ROS Node\n", "\n", "Let's wrap a simple Kalman filter as a ROS node." ] }, { "cell_type": "code", "execution_count": null, "id": "b39d810d", "metadata": { "tags": [ "skip-execution" ] }, "outputs": [], "source": [ "from pykal import DynamicalSystem\n", "from pykal.algorithm_library.estimators.kf import KF\n", "\n", "\n", "# Create a simple 1D Kalman filter\n", "def create_kf_node():\n", " \"\"\"\n", " Create a Kalman filter node for 1D position estimation.\n", "\n", " State: x = [position, velocity]\n", " Measurement: y = [position] (noisy)\n", " \"\"\"\n", " # Initial state\n", " x0 = np.array([0.0, 0.0]) # [position, velocity]\n", " P0 = np.eye(2) * 1.0\n", "\n", " # Dynamics: constant velocity model\n", " dt = 0.1\n", "\n", " def f(x, u):\n", " F = np.array([[1, dt], [0, 1]])\n", " return F @ x\n", "\n", " def F_func(x, u):\n", " return np.array([[1, dt], [0, 1]])\n", "\n", " def Q_func():\n", " return np.eye(2) * 0.01\n", "\n", " # Measurement: observe position only\n", " def h_meas(x):\n", " return np.array([x[0]])\n", "\n", " def H_func(x):\n", " return np.array([[1.0, 0.0]])\n", "\n", " def R_func():\n", " return np.array([[0.1]])\n", "\n", " # Create parameter dictionaries\n", " param_dict = {\n", " \"x\": x0,\n", " \"P\": P0,\n", " \"u\": np.array([0.0]),\n", " \"y\": np.array([0.0]),\n", " }\n", "\n", " f_params = {\"x\": x0, \"u\": np.array([0.0])}\n", " F_params = {\"x\": x0, \"u\": np.array([0.0])}\n", " Q_params = {}\n", " h_params = {\"x\": x0}\n", " H_params = {\"x\": x0}\n", " R_params = {}\n", "\n", " # Wrap in DynamicalSystem\n", " kf = DynamicalSystem(\n", " params=param_dict,\n", " f=lambda pd: KF.f(\n", " x_P=(pd[\"x\"], pd[\"P\"]),\n", " y=pd[\"y\"],\n", " u=pd[\"u\"],\n", " f=f,\n", " F=F_func,\n", " Q=Q_func,\n", " h=h_meas,\n", " H=H_func,\n", " R=R_func,\n", " f_params=f_params,\n", " F_params=F_params,\n", " Q_params=Q_params,\n", " h_params=h_params,\n", " H_params=H_params,\n", " R_params=R_params),\n", " h=lambda pd: KF.h(pd[\"x_P\"]),\n", " state_name=\"x_P\")\n", "\n", " # Create ROS callback that wraps the DynamicalSystem\n", " def kf_callback(tk, measurement):\n", " \"\"\"\n", " Kalman filter update.\n", "\n", " Args:\n", " tk (float): Current time\n", " measurement (np.ndarray): (3) but we only use first element\n", "\n", " Returns:\n", " dict: {'estimate': (3) with [position, velocity, 0]}\n", " \"\"\"\n", " # Update measurement in parameter dict\n", " kf.param_dict[\"y\"] = np.array([measurement[0]])\n", "\n", " # Run one step of the filter\n", " x_est, new_state = kf.step()\n", "\n", " # Pack estimate as Vector3 for publishing\n", " estimate = np.array([x_est[0], x_est[1], 0.0])\n", "\n", " print(\n", " f\"[{tk:.2f}s] Measurement: {measurement[0]:.3f}, \"\n", " f\"Estimate: pos={x_est[0]:.3f}, vel={x_est[1]:.3f}\"\n", " )\n", "\n", " return {\"estimate\": estimate}\n", "\n", " # Wrap as ROS node\n", " kf_node = ROSNode(\n", " node_name=\"kalman_filter\",\n", " callback=kf_callback,\n", " subscribes_to=[\n", " (\"/noisy_position\", Vector3, \"measurement\"),\n", " ],\n", " publishes_to=[\n", " (\"estimate\", Vector3, \"/filtered_state\"),\n", " ],\n", " rate_hz=10.0)\n", "\n", " return kf_node\n", "\n", "\n", "# Create the Kalman filter node\n", "kf_ros_node = create_kf_node()\n", "\n", "print(\"Kalman filter ROS node created!\")\n", "print(f\"\\nDynamicalSystem integration:\")\n", "print(f\" - DynamicalSystem.f() performs predict-update cycle\")\n", "print(f\" - DynamicalSystem.h() extracts state estimate\")\n", "print(f\" - ROSNode wraps the DynamicalSystem.step() method\")\n", "print(f\" - Automatic ROS message conversion (Vector3 ↔ NumPy)\")\n", "print(f\"\\nSubscribes to: /noisy_position (Vector3)\")\n", "print(f\"Publishes to: /filtered_state (Vector3)\")\n", "print(f\"Rate: 10 Hz\")" ] }, { "cell_type": "markdown", "id": "49d8dcaa", "metadata": {}, "source": [ ":::{note}\n", "This pattern shows the full power of `pykal`:\n", "1. Design algorithm as `DynamicalSystem` (theory → software)\n", "2. Test in Python/Jupyter with NumPy arrays\n", "3. Wrap as `ROSNode` for deployment (software → ROS)\n", "4. Deploy to Gazebo or hardware (ROS → simulation/hardware)\n", ":::" ] }, { "cell_type": "markdown", "id": "cruise-control-intro", "metadata": {}, "source": [ "## Part 8: Complete Example - Car Cruise Control ROS Deployment\n", "\n", "Recall from the Conceptual Foundation that we showed a complete car cruise control system:\n", "\n", "