Advanced ROS Features
This guide covers advanced ROS2 features for building production-quality robotics systems with pykal.
Overview
The core tutorials showed basic ROS integration: topics, nodes, and message conversion. This guide covers:
Transforms (tf/tf2): Coordinate frame management
ROS Bags: Data recording and playback
Multi-Robot Systems: Namespaces and multiple nodes
Visualization (RViz2): 3D visualization of robot state
Quality of Service (QoS): Reliable communication tuning
Parameter Server: Dynamic configuration
Actions: Long-running tasks with feedback
Transform Frames (tf/tf2)
Why Transforms Matter
Robotics systems have multiple coordinate frames:
world: Global reference frame
odom: Odometry frame (drift over time)
base_link: Robot body frame
sensor_frame: Camera, LiDAR, IMU frames
Problem: Sensor data arrives in sensor frame, controller needs world frame.
Solution: tf/tf2 automatically transforms between frames.
Understanding the Transform Tree
Typical TurtleBot3 transform tree:
world
└─ odom
└─ base_footprint
└─ base_link
├─ imu_link
├─ camera_link
└─ laser_link
Reading the tree:
world → odom: Localization correction (e.g., from AMCL)odom → base_footprint: Odometry integrationbase_link → sensor_link: Fixed sensor mounting
Using tf2 in Python
Install:
sudo apt install ros-humble-tf2-ros ros-humble-tf2-geometry-msgs
Basic transform lookup:
import rclpy
from rclpy.node import Node
from tf2_ros import TransformListener, Buffer
from geometry_msgs.msg import TransformStamped
class TransformUser(Node):
def __init__(self):
super().__init__('transform_user')
# Create buffer and listener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
def get_transform(self, target_frame, source_frame):
"""Get transform from source to target frame."""
try:
# Wait up to 1 second for transform
transform = self.tf_buffer.lookup_transform(
target_frame,
source_frame,
rclpy.time.Time(), # Latest available
timeout=rclpy.duration.Duration(seconds=1.0)
)
return transform
except Exception as e:
self.get_logger().error(f'Transform lookup failed: {e}')
return None
Transform a point:
from tf2_geometry_msgs import do_transform_point
from geometry_msgs.msg import PointStamped
def transform_point(self, point_in_sensor_frame):
"""Transform point from sensor frame to world frame."""
# Create stamped point
point_stamped = PointStamped()
point_stamped.header.frame_id = 'camera_link'
point_stamped.header.stamp = self.get_clock().now().to_msg()
point_stamped.point.x = point_in_sensor_frame[0]
point_stamped.point.y = point_in_sensor_frame[1]
point_stamped.point.z = point_in_sensor_frame[2]
# Get transform
transform = self.get_transform('world', 'camera_link')
# Apply transform
point_in_world = do_transform_point(point_stamped, transform)
return np.array([
point_in_world.point.x,
point_in_world.point.y,
point_in_world.point.z
])
Publishing Transforms
If your pykal estimator produces pose estimates, publish them as transforms:
from tf2_ros import TransformBroadcaster
from geometry_msgs.msg import TransformStamped
class EstimatorWithTF(Node):
def __init__(self):
super().__init__('estimator_tf')
self.tf_broadcaster = TransformBroadcaster(self)
def publish_estimate_as_tf(self, position, quaternion):
"""Publish estimated pose as transform."""
t = TransformStamped()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = 'world'
t.child_frame_id = 'base_link_estimated'
t.transform.translation.x = position[0]
t.transform.translation.y = position[1]
t.transform.translation.z = position[2]
t.transform.rotation.x = quaternion[0]
t.transform.rotation.y = quaternion[1]
t.transform.rotation.z = quaternion[2]
t.transform.rotation.w = quaternion[3]
self.tf_broadcaster.sendTransform(t)
Integrating tf2 with pykal
Since ROSNode doesn’t natively support tf2, use a wrapper:
class PykalNodeWithTF:
def __init__(self, pykal_node):
self.pykal_node = pykal_node
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, pykal_node._node)
def create_node(self):
self.pykal_node.create_node()
def start(self):
self.pykal_node.start()
def lookup_transform(self, target, source):
return self.tf_buffer.lookup_transform(target, source, rclpy.time.Time())
ROS Bags: Recording and Replay
Why Use Bags?
ROS bags record topic data to disk for:
Debugging: Replay problematic scenarios
Development: Test without hardware
Analysis: Post-process data in Python
Sharing: Distribute datasets
Recording Bags
Basic recording:
# Record specific topics
ros2 bag record /odom /cmd_vel /estimate
# Record all topics
ros2 bag record -a
# Save to specific file
ros2 bag record -o my_experiment /odom /cmd_vel
# Stop with Ctrl+C
Selective recording:
# Only control-related topics
ros2 bag record -e "/cmd_*|/odom"
# Exclude image topics (large!)
ros2 bag record -a -x "/camera/.*"
Playing Back Bags
Basic playback:
# Play at normal speed
ros2 bag play my_bag
# Play at 2x speed
ros2 bag play my_bag --rate 2.0
# Play in loop
ros2 bag play my_bag --loop
# Start paused (press space to start)
ros2 bag play my_bag --start-paused
Remap topics during playback:
# Replay /odom as /odom_recorded
ros2 bag play my_bag --remap /odom:=/odom_recorded
Analyzing Bags in Python
from rosbag2_py import SequentialReader, StorageOptions, ConverterOptions
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
import numpy as np
def extract_odom_data(bag_path):
"""Extract odometry data from bag file."""
# Setup reader
storage_options = StorageOptions(uri=bag_path, storage_id='sqlite3')
converter_options = ConverterOptions(
input_serialization_format='cdr',
output_serialization_format='cdr'
)
reader = SequentialReader()
reader.open(storage_options, converter_options)
# Get topic metadata
topic_types = reader.get_all_topics_and_types()
type_map = {t.name: t.type for t in topic_types}
# Extract messages
odom_data = []
while reader.has_next():
topic, data, timestamp = reader.read_next()
if topic == '/odom':
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
odom_data.append({
'timestamp': timestamp * 1e-9, # Convert to seconds
'x': msg.pose.pose.position.x,
'y': msg.pose.pose.position.y,
'vx': msg.twist.twist.linear.x,
'vy': msg.twist.twist.linear.y
})
return np.array([(d['timestamp'], d['x'], d['y'], d['vx'], d['vy'])
for d in odom_data])
Plotting bag data:
import matplotlib.pyplot as plt
# Extract data
odom = extract_odom_data('my_experiment')
# Plot trajectory
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(odom[:, 1], odom[:, 2])
plt.xlabel('X (m)')
plt.ylabel('Y (m)')
plt.title('Robot Trajectory')
plt.axis('equal')
# Plot velocity
plt.subplot(1, 2, 2)
plt.plot(odom[:, 0], odom[:, 3], label='vx')
plt.plot(odom[:, 0], odom[:, 4], label='vy')
plt.xlabel('Time (s)')
plt.ylabel('Velocity (m/s)')
plt.legend()
plt.title('Robot Velocity')
plt.tight_layout()
plt.show()
Multi-Robot Systems
Using Namespaces
Run multiple robots without topic conflicts:
# Robot 1 nodes
robot1_controller = ROSNode(
node_name='controller',
namespace='robot1', # Topics become /robot1/cmd_vel, etc.
callback=controller_callback,
subscribes_to=[('/odom', Odometry, 'odom')], # Becomes /robot1/odom
publishes_to=[('cmd_vel', Twist, '/cmd_vel')], # Becomes /robot1/cmd_vel
rate_hz=10.0
)
# Robot 2 nodes
robot2_controller = ROSNode(
node_name='controller',
namespace='robot2',
callback=controller_callback,
subscribes_to=[('/odom', Odometry, 'odom')],
publishes_to=[('cmd_vel', Twist, '/cmd_vel')],
rate_hz=10.0
)
Note: pykal’s ROSNode doesn’t natively support namespaces. You need to modify topic names manually:
# Workaround: Include namespace in topic names
robot1_controller = ROSNode(
node_name='robot1_controller',
callback=controller_callback,
subscribes_to=[('/robot1/odom', Odometry, 'odom')],
publishes_to=[('cmd_vel', Twist, '/robot1/cmd_vel')],
rate_hz=10.0
)
Multi-Robot Coordination
Share information between robots:
def robot1_callback(tk, robot1_odom, robot2_odom):
"""Controller that considers other robot's position."""
# Extract positions
my_pos = robot1_odom[:2]
other_pos = robot2_odom[:2]
# Avoid collision
distance = np.linalg.norm(my_pos - other_pos)
if distance < 0.5: # Too close!
# Compute repulsive force
repulsion = (my_pos - other_pos) / distance
else:
repulsion = np.zeros(2)
# Navigate to goal + avoid other robot
goal = np.array([5.0, 5.0])
attraction = (goal - my_pos) / np.linalg.norm(goal - my_pos)
velocity = attraction + repulsion
return {'cmd_vel': np.concatenate([velocity, [0, 0, 0, 0]])}
# Node subscribes to both robots' odometry
robot1_node = ROSNode(
node_name='robot1_controller',
callback=robot1_callback,
subscribes_to=[
('/robot1/odom', Odometry, 'robot1_odom'),
('/robot2/odom', Odometry, 'robot2_odom')
],
publishes_to=[('cmd_vel', Twist, '/robot1/cmd_vel')],
rate_hz=10.0
)
Visualization with RViz2
Launch RViz2
# Launch RViz
rviz2
# Or with config file
rviz2 -d my_config.rviz
Common Display Types
Odometry:
Add → By topic → /odom → Odometry
Shows robot pose as axes
Path:
# Publish Path message for trajectory visualization
from nav_msgs.msg import Path
from geometry_msgs.msg import PoseStamped
def publish_path(self, positions):
path = Path()
path.header.frame_id = 'world'
path.header.stamp = self.get_clock().now().to_msg()
for pos in positions:
pose = PoseStamped()
pose.header = path.header
pose.pose.position.x = pos[0]
pose.pose.position.y = pos[1]
pose.pose.position.z = pos[2]
path.poses.append(pose)
self.path_publisher.publish(path)
Markers (custom visualization):
from visualization_msgs.msg import Marker
def publish_goal_marker(self, goal_position):
marker = Marker()
marker.header.frame_id = 'world'
marker.header.stamp = self.get_clock().now().to_msg()
marker.type = Marker.SPHERE
marker.action = Marker.ADD
marker.pose.position.x = goal_position[0]
marker.pose.position.y = goal_position[1]
marker.pose.position.z = goal_position[2]
marker.scale.x = 0.2
marker.scale.y = 0.2
marker.scale.z = 0.2
marker.color.r = 1.0
marker.color.g = 0.0
marker.color.b = 0.0
marker.color.a = 1.0
self.marker_publisher.publish(marker)
Quality of Service (QoS)
Why QoS Matters
Different scenarios need different reliability:
Real-time control: Latest data, drop old messages
Logging: Reliable delivery, no drops
Diagnostics: Best effort, don’t block
QoS Profiles
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
# Sensor data (latest only)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
# Control commands (reliable)
command_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Logged data (persistent)
logging_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_ALL
)
Note: pykal’s ROSNode doesn’t expose QoS settings. You’d need to modify the source or create publishers/subscribers manually.
Matching QoS
Publishers and subscribers must have compatible QoS:
# Check QoS of existing topic
ros2 topic info /odom --verbose
# Output shows:
# Reliability: RELIABLE / BEST_EFFORT
# Durability: VOLATILE / TRANSIENT_LOCAL
Common issue: Gazebo publishes with BEST_EFFORT, your subscriber uses RELIABLE → no connection!
Solution: Match publisher’s QoS.
Parameter Server
Dynamic Parameters
Change node behavior without restarting:
from rcl_interfaces.msg import ParameterDescriptor
class ParameterizedNode(Node):
def __init__(self):
super().__init__('param_node')
# Declare parameters
self.declare_parameter('kp', 1.0,
ParameterDescriptor(description='Proportional gain'))
self.declare_parameter('kd', 0.5,
ParameterDescriptor(description='Derivative gain'))
# Read parameters
self.kp = self.get_parameter('kp').value
self.kd = self.get_parameter('kd').value
# Add callback for parameter changes
self.add_on_set_parameters_callback(self.parameter_callback)
def parameter_callback(self, params):
for param in params:
if param.name == 'kp':
self.kp = param.value
self.get_logger().info(f'Kp updated to {self.kp}')
elif param.name == 'kd':
self.kd = param.value
self.get_logger().info(f'Kd updated to {self.kd}')
return SetParametersResult(successful=True)
Set parameters from command line:
# Set parameter
ros2 param set /param_node kp 2.0
# Get parameter
ros2 param get /param_node kp
# List all parameters
ros2 param list
Load from YAML file:
# config/params.yaml
param_node:
ros__parameters:
kp: 1.5
kd: 0.3
ros2 run my_package my_node --ros-args --params-file config/params.yaml
Actions (Long-Running Tasks)
When to Use Actions
Actions are for tasks with:
Duration > 1 second
Intermediate feedback needed
Cancellation support
Examples:
Navigate to waypoint (feedback: distance remaining)
Trajectory following (feedback: current segment)
Object manipulation (feedback: grasp progress)
Action Client Example
from rclpy.action import ActionClient
from nav2_msgs.action import NavigateToPose
class NavigationClient(Node):
def __init__(self):
super().__init__('nav_client')
self.action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
def send_goal(self, x, y, theta):
goal_msg = NavigateToPose.Goal()
goal_msg.pose.header.frame_id = 'map'
goal_msg.pose.pose.position.x = x
goal_msg.pose.pose.position.y = y
# ... set orientation from theta
self.action_client.wait_for_server()
send_goal_future = self.action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Distance remaining: {feedback.distance_remaining:.2f}m')
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.result_callback)
def result_callback(self, future):
result = future.result().result
self.get_logger().info('Navigation complete!')
Best Practices
Use tf2 for all coordinate transforms
Don’t manually transform between frames
Let tf2 handle the math and timing
Record bags for every experiment
Invaluable for debugging
Essential for reproducible research
Match QoS profiles
Check existing topic QoS before subscribing
Use RELIABLE for critical data, BEST_EFFORT for high-rate sensors
Parameterize everything
Controller gains, filter noise, rates
Use YAML config files
Visualize with RViz
See what the robot sees
Debug transform trees
Verify sensor data
Namespace multi-robot systems
Prevents topic conflicts
Enables scaling to N robots
Summary
Advanced Features Covered:
✓ tf/tf2: Coordinate frame transformations ✓ ROS Bags: Recording, playback, analysis ✓ Multi-Robot: Namespaces and coordination ✓ RViz2: 3D visualization ✓ QoS: Communication reliability tuning ✓ Parameters: Dynamic configuration ✓ Actions: Long-running tasks with feedback
Integration with pykal:
Most features require extending
ROSNodeor using native ROS2 nodestf2, bags, and RViz work seamlessly with pykal nodes
Parameters and QoS need custom wrappers
Next Steps:
Implement multi-robot coordination in pykal
Record experimental data with bags
Create custom RViz displays for your system
Build action servers for complex tasks