← Modules

ros2py_py2ros

import numpy as np
from pykal.ros.ros2py_py2ros import (
    # Basic geometry
    ros2py_vector3,
    py2ros_vector3,
    ros2py_quaternion,
    py2ros_quaternion,
    ros2py_pose,
    py2ros_pose,
    ros2py_twist,
    py2ros_twist,
    # Stamped versions
    ros2py_pose_stamped,
    py2ros_pose_stamped,
    ros2py_twist_stamped,
    py2ros_twist_stamped,
    ros2py_vector3_stamped,
    py2ros_vector3_stamped,
    # Complex messages
    ros2py_odometry,
    py2ros_odometry,
    ros2py_imu,
    py2ros_imu,
    ros2py_transform_stamped,
    py2ros_transform_stamped,
    # Sensor messages
    ros2py_laserscan,
    py2ros_laserscan,
    ros2py_joint_state,
    py2ros_joint_state,
    # Time utilities
    ros2py_time,
    py2ros_time,
    ros2py_header,
    py2ros_header,
    # Registries
    ROS2PY_DEFAULT,
    PY2ROS_DEFAULT)

# Import ROS message types
from geometry_msgs.msg import (
    Vector3,
    Quaternion,
    Pose,
    Twist,
    PoseStamped,
    TwistStamped,
    Vector3Stamped,
    TransformStamped)
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu, LaserScan, JointState
from std_msgs.msg import Header
from builtin_interfaces.msg import Time

print("Imports successful!")
print(f"Default ROS2PY converters: {len(ROS2PY_DEFAULT)} message types")
print(f"Default PY2ROS converters: {len(PY2ROS_DEFAULT)} message types")

References and Examples

This notebook demonstrates all the ROS message ↔ NumPy conversions available in pykal.ros.ros2py_py2ros. These converters are the foundation for building ROS2 nodes with the ROSNode wrapper, enabling seamless integration between Python control algorithms and ROS2 middleware.

Time and Header Conversions

ROS uses builtin_interfaces/Time for timestamps and std_msgs/Header for metadata. We need to convert these to/from Python floats (seconds since epoch).

Time Conversion

ROS Time has two fields:

  • sec: Integer seconds

  • nanosec: Nanoseconds (0-999,999,999)

We convert to a single float representing seconds.

# Python time → ROS Time
t_python = 1234.567890123  # seconds with nanosecond precision
time_msg = py2ros_time(t_python)

print("Python → ROS Time:")
print(f"  Input: {t_python} seconds")
print(f"  Output: sec={time_msg.sec}, nanosec={time_msg.nanosec}")

# ROS Time → Python time
t_back = ros2py_time(time_msg)
print(f"  Roundtrip: {t_back} seconds")
print(f"  Error: {abs(t_back - t_python):.12f} seconds")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[1], line 3
      1 # Python time → ROS Time
      2 t_python = 1234.567890123  # seconds with nanosecond precision
----> 3 time_msg = py2ros_time(t_python)
      5 print("Python → ROS Time:")
      6 print(f"  Input: {t_python} seconds")

NameError: name 'py2ros_time' is not defined

Header Conversion

Headers contain:

  • stamp: Time message

  • frame_id: String identifying coordinate frame

We convert to/from a dictionary: {"t": float, "frame_id": str}

# Create ROS Header
header_msg = py2ros_header(t=10.5, frame_id="base_link")
print("Python → ROS Header:")
print(f"  frame_id: {header_msg.frame_id}")
print(f"  stamp: {header_msg.stamp.sec}.{header_msg.stamp.nanosec:09d}")

# Convert back to Python
header_dict = ros2py_header(header_msg)
print(f"\nROS → Python Header:")
print(f"  {header_dict}")

Basic Geometry Messages

These are the fundamental building blocks for robotics: points, orientations, and velocities.

Vector3: 3D Points and Directions

Format: [x, y, z] as NumPy array of shape (3,)

# Create a 3D vector
position = np.array([1.0, 2.0, 3.0])
vec_msg = py2ros_vector3(position)

print("NumPy → ROS Vector3:")
print(f"  Input: {position}")
print(f"  Output: x={vec_msg.x}, y={vec_msg.y}, z={vec_msg.z}")

# Convert back
position_back = ros2py_vector3(vec_msg)
print(f"  Roundtrip: {position_back}")
print(f"  Shape: {position_back.shape}")

# Works with column vectors too
position_col = np.array([[1.0], [2.0], [3.0]])  # Shape (3, 1)
vec_msg2 = py2ros_vector3(position_col)
print(f"\nColumn vector → ROS: x={vec_msg2.x}, y={vec_msg2.y}, z={vec_msg2.z}")

Quaternion: 3D Rotations

Format: [x, y, z, w] as NumPy array of shape (4,)

Quaternions represent rotations. The w component comes last (unlike some conventions).

# Identity rotation (no rotation)
quat_identity = np.array([0.0, 0.0, 0.0, 1.0])  # [x, y, z, w]
quat_msg = py2ros_quaternion(quat_identity)

print("NumPy → ROS Quaternion:")
print(f"  Input: {quat_identity}")
print(f"  Output: x={quat_msg.x}, y={quat_msg.y}, z={quat_msg.z}, w={quat_msg.w}")

# 90° rotation around Z axis
quat_90z = np.array([0.0, 0.0, 0.707, 0.707])  # Approx values
quat_msg2 = py2ros_quaternion(quat_90z)
print(
    f"\n90° Z rotation → ROS: x={quat_msg2.x:.3f}, y={quat_msg2.y:.3f}, "
    f"z={quat_msg2.z:.3f}, w={quat_msg2.w:.3f}"
)

# Roundtrip
quat_back = ros2py_quaternion(quat_msg2)
print(f"  Roundtrip: {quat_back}")

Pose: Position + Orientation

Format: [px, py, pz, qx, qy, qz, qw] as NumPy array of shape (7,)

Combines a 3D position with a quaternion orientation.

# Robot pose: at (1, 2, 0) with identity rotation
pose_array = np.array([1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0])  # position  # quaternion
pose_msg = py2ros_pose(pose_array)

print("NumPy → ROS Pose:")
print(
    f"  Position: ({pose_msg.position.x}, {pose_msg.position.y}, {pose_msg.position.z})"
)
print(
    f"  Orientation: ({pose_msg.orientation.x}, {pose_msg.orientation.y}, "
    f"{pose_msg.orientation.z}, {pose_msg.orientation.w})"
)

# Convert back
pose_back = ros2py_pose(pose_msg)
print(f"\nROS → NumPy Pose:")
print(f"  Array: {pose_back}")
print(f"  Shape: {pose_back.shape}")

Twist: Linear + Angular Velocity

Format: [vx, vy, vz, ωx, ωy, ωz] as NumPy array of shape (6,)

Represents velocity (linear and angular) of a rigid body.

# Moving forward at 0.5 m/s, turning at 0.2 rad/s
twist_array = np.array(
    [0.5, 0.0, 0.0, 0.0, 0.0, 0.2]  # linear velocity
)  # angular velocity
twist_msg = py2ros_twist(twist_array)

print("NumPy → ROS Twist:")
print(
    f"  Linear: ({twist_msg.linear.x}, {twist_msg.linear.y}, {twist_msg.linear.z}) m/s"
)
print(
    f"  Angular: ({twist_msg.angular.x}, {twist_msg.angular.y}, {twist_msg.angular.z}) rad/s"
)

# Convert back
twist_back = ros2py_twist(twist_msg)
print(f"\nROS → NumPy Twist: {twist_back}")

Stamped Messages (With Time)

“Stamped” messages add a header (timestamp + frame_id) to basic geometry types. The timestamp allows synchronization across multiple sensors.

PoseStamped: Pose + Timestamp

Format: [t, px, py, pz, qx, qy, qz, qw] as NumPy array of shape (8,)

# Pose at time t=5.0 seconds
pose_stamped_array = np.array(
    [5.0, 1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 1.0]  # time  # position
)  # quaternion

pose_stamped_msg = py2ros_pose_stamped(pose_stamped_array, frame_id="world")

print("NumPy → ROS PoseStamped:")
print(f"  Timestamp: {ros2py_time(pose_stamped_msg.header.stamp):.3f} s")
print(f"  Frame: {pose_stamped_msg.header.frame_id}")
print(
    f"  Position: ({pose_stamped_msg.pose.position.x}, "
    f"{pose_stamped_msg.pose.position.y}, {pose_stamped_msg.pose.position.z})"
)

# Convert back
pose_stamped_back = ros2py_pose_stamped(pose_stamped_msg)
print(f"\nROS → NumPy PoseStamped:")
print(f"  Array: {pose_stamped_back}")
print(f"  [time, x, y, z, qx, qy, qz, qw]")

TwistStamped: Velocity + Timestamp

Format: [t, vx, vy, vz, ωx, ωy, ωz] as NumPy array of shape (7,)

# Velocity measurement at t=10.5
twist_stamped_array = np.array(
    [10.5, 0.3, 0.0, 0.0, 0.0, 0.0, 0.1]  # time  # linear
)  # angular

twist_stamped_msg = py2ros_twist_stamped(twist_stamped_array, frame_id="base_link")

print("NumPy → ROS TwistStamped:")
print(f"  Time: {ros2py_time(twist_stamped_msg.header.stamp):.3f} s")
print(f"  Frame: {twist_stamped_msg.header.frame_id}")
print(
    f"  Linear vel: ({twist_stamped_msg.twist.linear.x:.2f}, "
    f"{twist_stamped_msg.twist.linear.y:.2f}, {twist_stamped_msg.twist.linear.z:.2f}) m/s"
)

# Roundtrip
twist_stamped_back = ros2py_twist_stamped(twist_stamped_msg)
print(f"\nRoundtrip: {twist_stamped_back}")

Vector3Stamped: 3D Vector + Timestamp

Format: [t, x, y, z] as NumPy array of shape (4,)

Useful for force vectors, acceleration, etc.

# Acceleration measurement
accel_array = np.array([2.5, 0.1, -0.2, 9.81])  # time  # acceleration (m/s²)

accel_msg = py2ros_vector3_stamped(accel_array, frame_id="imu_link")

print("NumPy → ROS Vector3Stamped:")
print(f"  Time: {ros2py_time(accel_msg.header.stamp):.3f} s")
print(
    f"  Vector: ({accel_msg.vector.x:.2f}, {accel_msg.vector.y:.2f}, "
    f"{accel_msg.vector.z:.2f})"
)

accel_back = ros2py_vector3_stamped(accel_msg)
print(f"  Roundtrip: {accel_back}")

Complex Robot Messages

Real robots publish richer messages combining multiple pieces of information.

Odometry: Full Robot State

Format: [px, py, pz, qx, qy, qz, qw, vx, vy, vz, ωx, ωy, ωz] as shape (13,)

Combines pose (7) and twist (6) into a single message. Common output from robot base controllers.

# Complete robot state
odom_array = np.array(
    [
        # Pose: position
        1.5,
        0.8,
        0.0,
        # Pose: orientation (quaternion)
        0.0,
        0.0,
        0.383,
        0.924,  # ~45° rotation around Z
        # Twist: linear velocity
        0.2,
        0.1,
        0.0,
        # Twist: angular velocity
        0.0,
        0.0,
        0.3,
    ]
)

odom_msg = py2ros_odometry(odom_array, frame_id="odom", child_frame_id="base_link")

print("NumPy → ROS Odometry:")
print(f"  Frame: {odom_msg.header.frame_id}{odom_msg.child_frame_id}")
print(
    f"  Position: ({odom_msg.pose.pose.position.x:.2f}, "
    f"{odom_msg.pose.pose.position.y:.2f}, {odom_msg.pose.pose.position.z:.2f})"
)
print(
    f"  Orientation: quat({odom_msg.pose.pose.orientation.x:.3f}, "
    f"{odom_msg.pose.pose.orientation.y:.3f}, {odom_msg.pose.pose.orientation.z:.3f}, "
    f"{odom_msg.pose.pose.orientation.w:.3f})"
)
print(
    f"  Linear vel: ({odom_msg.twist.twist.linear.x:.2f}, "
    f"{odom_msg.twist.twist.linear.y:.2f}, {odom_msg.twist.twist.linear.z:.2f})"
)
print(
    f"  Angular vel: ({odom_msg.twist.twist.angular.x:.2f}, "
    f"{odom_msg.twist.twist.angular.y:.2f}, {odom_msg.twist.twist.angular.z:.2f})"
)

# Convert back
odom_back = ros2py_odometry(odom_msg)
print(f"\nROS → NumPy Odometry:")
print(f"  Shape: {odom_back.shape}")
print(f"  Array: {odom_back}")
print(f"  [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]")

IMU: Inertial Measurement Unit

Format: [qx, qy, qz, qw, ωx, ωy, ωz, ax, ay, az] as shape (10,)

Contains:

  • Orientation (quaternion, 4)

  • Angular velocity (gyro, 3)

  • Linear acceleration (accel, 3)

Note: Covariances are not included in the array representation.

# IMU data from a quadrotor
imu_array = np.array(
    [
        # Orientation (quaternion)
        0.0,
        0.0,
        0.0,
        1.0,
        # Angular velocity (rad/s)
        0.1,
        -0.05,
        0.2,
        # Linear acceleration (m/s²)
        0.5,
        0.2,
        9.81,  # Gravity + small perturbation
    ]
)

imu_msg = py2ros_imu(imu_array)

print("NumPy → ROS IMU:")
print(
    f"  Orientation: quat({imu_msg.orientation.x:.3f}, {imu_msg.orientation.y:.3f}, "
    f"{imu_msg.orientation.z:.3f}, {imu_msg.orientation.w:.3f})"
)
print(
    f"  Gyro: ({imu_msg.angular_velocity.x:.2f}, {imu_msg.angular_velocity.y:.2f}, "
    f"{imu_msg.angular_velocity.z:.2f}) rad/s"
)
print(
    f"  Accel: ({imu_msg.linear_acceleration.x:.2f}, {imu_msg.linear_acceleration.y:.2f}, "
    f"{imu_msg.linear_acceleration.z:.2f}) m/s²"
)

# Roundtrip
imu_back = ros2py_imu(imu_msg)
print(f"\nRoundtrip: {imu_back}")

TransformStamped: TF Tree Transforms

Format: [t, tx, ty, tz, qx, qy, qz, qw] as shape (8,)

Similar to PoseStamped but used for coordinate frame transforms in the TF tree.

# Transform from base_link to laser_link
transform_array = np.array(
    [
        15.0,  # time
        0.1,
        0.0,
        0.2,  # translation
        0.0,
        0.0,
        0.0,
        1.0,  # rotation (identity)
    ]
)

tf_msg = py2ros_transform_stamped(
    transform_array, frame_id="base_link", child_frame_id="laser_link"
)

print("NumPy → ROS TransformStamped:")
print(f"  Time: {ros2py_time(tf_msg.header.stamp):.3f} s")
print(f"  Transform: {tf_msg.header.frame_id}{tf_msg.child_frame_id}")
print(
    f"  Translation: ({tf_msg.transform.translation.x:.2f}, "
    f"{tf_msg.transform.translation.y:.2f}, {tf_msg.transform.translation.z:.2f})"
)

# Roundtrip
tf_back = ros2py_transform_stamped(tf_msg)
print(f"\nRoundtrip: {tf_back}")

Sensor Messages

Specialized messages for common robot sensors.

LaserScan: 2D Lidar Data

Format: Dictionary with ranges array and metadata

LaserScan contains an array of range measurements plus scan parameters.

# Simulate a 180° scan with 10 beams
ranges = np.array([1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 3.5, 3.0, 2.5])

scan_msg = py2ros_laserscan(
    ranges,
    angle_min=-np.pi / 2,  # -90°
    angle_max=np.pi / 2,  # +90°
    angle_increment=np.pi / 9,  # 180° / 10 beams
    range_min=0.1,
    range_max=10.0,
    frame_id="laser_link",
    stamp=20.0)

print("NumPy → ROS LaserScan:")
print(f"  Frame: {scan_msg.header.frame_id}")
print(f"  Time: {ros2py_time(scan_msg.header.stamp):.3f} s")
print(
    f"  Angle range: {np.rad2deg(scan_msg.angle_min):.1f}° to {np.rad2deg(scan_msg.angle_max):.1f}°"
)
print(f"  Number of beams: {len(scan_msg.ranges)}")
print(f"  Ranges: {scan_msg.ranges[:5]}...")

# Convert back
scan_dict = ros2py_laserscan(scan_msg)
print(f"\nROS → Python Dictionary:")
print(f"  Keys: {list(scan_dict.keys())}")
print(f"  Ranges shape: {scan_dict['ranges'].shape}")
print(f"  Ranges: {scan_dict['ranges']}")

JointState: Robot Joint Positions/Velocities

Format: Dictionary with name, position, velocity, effort arrays

Describes the state of all joints in a robot (e.g., arm joints, wheel joints).

# 3-joint robot arm
joint_names = ["shoulder", "elbow", "wrist"]
joint_positions = np.array([0.5, 1.0, -0.3])  # radians
joint_velocities = np.array([0.1, 0.05, 0.0])  # rad/s
joint_efforts = np.array([2.0, 1.5, 0.5])  # N⋅m

joint_msg = py2ros_joint_state(
    names=joint_names,
    position=joint_positions,
    velocity=joint_velocities,
    effort=joint_efforts)

print("NumPy → ROS JointState:")
print(f"  Joints: {joint_msg.name}")
print(f"  Positions: {joint_msg.position}")
print(f"  Velocities: {joint_msg.velocity}")
print(f"  Efforts: {joint_msg.effort}")

# Convert back
joint_dict = ros2py_joint_state(joint_msg)
print(f"\nROS → Python Dictionary:")
print(f"  Keys: {list(joint_dict.keys())}")
print(f"  Names: {joint_dict['name']}")
print(f"  Positions: {joint_dict['position']}")

Using the Conversion Registries

The ROS2PY_DEFAULT and PY2ROS_DEFAULT dictionaries map message types to converter functions. ROSNode uses these registries automatically.

Checking Available Converters

print("Available ROS2PY converters:")
for msg_type in sorted(ROS2PY_DEFAULT.keys(), key=lambda t: t.__name__):
    print(f"  {msg_type.__module__}.{msg_type.__name__}")

print(f"\nTotal: {len(ROS2PY_DEFAULT)} message types")

Using Registries for Generic Conversion

# Generic converter function
def convert_ros_to_numpy(msg):
    """Convert any registered ROS message to NumPy array."""
    msg_type = type(msg)
    if msg_type not in ROS2PY_DEFAULT:
        raise TypeError(f"No converter for {msg_type.__name__}")

    converter = ROS2PY_DEFAULT[msg_type]
    return converter(msg)


# Test with different message types
test_msgs = [
    py2ros_vector3(np.array([1, 2, 3])),
    py2ros_twist(np.array([0.5, 0, 0, 0, 0, 0.2])),
    py2ros_pose(np.array([1, 2, 3, 0, 0, 0, 1])),
]

for msg in test_msgs:
    arr = convert_ros_to_numpy(msg)
    print(f"{type(msg).__name__:20s}{arr}")

Practical Workflow Examples

Real-world scenarios combining multiple conversions.

Example 1: Processing Odometry Stream

# Simulate receiving odometry messages
def process_odometry_callback(odom_msg: Odometry):
    """Extract position and velocity from odometry."""
    # Convert to NumPy
    odom_array = ros2py_odometry(odom_msg)

    # Extract components
    position = odom_array[0:3]  # [px, py, pz]
    quaternion = odom_array[3:7]  # [qx, qy, qz, qw]
    linear_vel = odom_array[7:10]  # [vx, vy, vz]
    angular_vel = odom_array[10:13]  # [wx, wy, wz]

    return {
        "position": position,
        "orientation": quaternion,
        "linear_velocity": linear_vel,
        "angular_velocity": angular_vel,
        "speed": np.linalg.norm(linear_vel),
    }


# Test with sample message
sample_odom = py2ros_odometry(
    np.array(
        [
            5,
            3,
            0,  # position
            0,
            0,
            0,
            1,  # orientation
            0.8,
            0.2,
            0,  # linear vel
            0,
            0,
            0.1,
        ]
    ),  # angular vel
    frame_id="odom")

result = process_odometry_callback(sample_odom)
print("Processed Odometry:")
for key, value in result.items():
    if isinstance(value, np.ndarray):
        print(f"  {key}: {value}")
    else:
        print(f"  {key}: {value:.3f}")

Example 2: Creating Stamped Messages with Current Time

import time


def create_velocity_command(vx: float, vy: float, omega: float) -> TwistStamped:
    """Create a timestamped velocity command."""
    current_time = time.time()

    # Create array with timestamp
    cmd_array = np.array([current_time, vx, vy, 0.0, 0.0, 0.0, omega])

    # Convert to ROS message
    return py2ros_twist_stamped(cmd_array, frame_id="base_link")


# Generate command
cmd_msg = create_velocity_command(vx=0.5, vy=0.0, omega=0.3)

print("Velocity Command:")
print(f"  Time: {ros2py_time(cmd_msg.header.stamp):.3f} s")
print(f"  Linear: ({cmd_msg.twist.linear.x:.2f}, {cmd_msg.twist.linear.y:.2f}) m/s")
print(f"  Angular: {cmd_msg.twist.angular.z:.2f} rad/s")

Example 3: Batch Processing Multiple Poses

# Generate trajectory of poses
def generate_circular_trajectory(num_points: int, radius: float) -> list:
    """Generate poses along a circular path."""
    poses = []
    for i in range(num_points):
        angle = 2 * np.pi * i / num_points

        # Position along circle
        x = radius * np.cos(angle)
        y = radius * np.sin(angle)
        z = 0.0

        # Orientation facing tangent to circle
        qz = np.sin(angle / 2)
        qw = np.cos(angle / 2)

        pose_array = np.array([x, y, z, 0, 0, qz, qw])
        poses.append(py2ros_pose(pose_array))

    return poses


# Generate and convert
trajectory = generate_circular_trajectory(num_points=8, radius=2.0)

print(f"Generated {len(trajectory)} poses:")
for i, pose in enumerate(trajectory[:4]):  # Show first 4
    arr = ros2py_pose(pose)
    print(
        f"  Pose {i}: pos=({arr[0]:.2f}, {arr[1]:.2f}), "
        f"quat=({arr[3]:.3f}, {arr[4]:.3f}, {arr[5]:.3f}, {arr[6]:.3f})"
    )
print(f"  ... and {len(trajectory)-4} more")

Working with Custom ROS Message Types

By default, pykal supports common ROS2 message types (Twist, Odometry, Vector3, etc.). This section shows how to use custom message types with pykal’s ROSNode wrapper.

Why Custom Messages?

Use custom messages when:

  • Standard messages don’t fit your data structure

  • You need specific field names for clarity

  • You’re interfacing with existing ROS packages that use custom types

  • You want type safety for complex data structures

Example Use Cases:

  • EstimateWithCovariance - State estimate + uncertainty

  • ControlCommand - Multi-DOF control with metadata

  • SensorFusion - Combined data from multiple sensors

  • RobotStatus - Custom telemetry fields

Message Converter Architecture

pykal uses two registries for bidirectional conversion:

from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT

# ROS → NumPy (for subscribers)
ROS2PY_DEFAULT[MessageType] = converter_function

# NumPy → ROS (for publishers)
PY2ROS_DEFAULT[MessageType] = converter_function

Conversion Flow:

Subscriber: ROS Topic → ROS Message → ROS2PY → NumPy Array → Callback
Publisher:  Callback → NumPy Array → PY2ROS → ROS Message → ROS Topic

Creating a Custom Message

Step 1: Define Message in Package

Create a ROS2 package with custom messages:

# Create package
ros2 pkg create --build-type ament_cmake my_custom_msgs

cd my_custom_msgs
mkdir msg

Create message definition file:

# msg/EstimateWithCovariance.msg
float64[] state         # State estimate vector
float64[] covariance    # Flattened covariance matrix
uint32 state_dim        # State dimension
builtin_interfaces/Time stamp

Step 2: Configure Package

Update CMakeLists.txt:

find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/EstimateWithCovariance.msg"
  DEPENDENCIES builtin_interfaces
)

Update package.xml:

<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<depend>builtin_interfaces</depend>

Step 3: Build Package

cd ~/ros2_ws
colcon build --packages-select my_custom_msgs
source install/setup.bash

# Verify message exists
ros2 interface show my_custom_msgs/msg/EstimateWithCovariance

Registering Converters with pykal

Basic Converter Pattern

# Example: Basic converter pattern (pseudocode - requires actual custom message)
# from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT
# from my_custom_msgs.msg import EstimateWithCovariance
# import numpy as np

# # ROS → NumPy converter
# def estimate_to_numpy(msg):
#     """Convert EstimateWithCovariance to NumPy array."""
#     state = np.array(msg.state)
#     cov = np.array(msg.covariance).reshape(msg.state_dim, msg.state_dim)
#     return np.concatenate([state, cov.flatten()])

# # NumPy → ROS converter
# def numpy_to_estimate(arr):
#     """Convert NumPy array to EstimateWithCovariance."""
#     msg = EstimateWithCovariance()
#     
#     # Assuming arr = [state..., covariance_flat...]
#     state_dim = 3  # Example: 3D state
#     
#     msg.state = arr[:state_dim].tolist()
#     msg.covariance = arr[state_dim:].tolist()
#     msg.state_dim = state_dim
#     # msg.stamp = ...  # Handle timestamp
#     
#     return msg

# # Register converters
# ROS2PY_DEFAULT[EstimateWithCovariance] = estimate_to_numpy
# PY2ROS_DEFAULT[EstimateWithCovariance] = numpy_to_estimate

print("Custom converter pattern demonstrated (commented out - requires custom message package)")

Key Points:

  • ROS → NumPy should return a 1D NumPy array

  • NumPy → ROS should return a populated message object

  • Handle timestamps if message includes them

  • Convert lists to NumPy arrays and vice versa

Example: Multi-Sensor Fusion Message

Message Definition:

# msg/SensorFusion.msg
geometry_msgs/Vector3 mocap      # Motion capture position
std_msgs/Float64 barometer       # Barometer altitude
geometry_msgs/Vector3 imu        # IMU velocities
builtin_interfaces/Time stamp

Converters:

from my_custom_msgs.msg import SensorFusion
from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT
import numpy as np

def sensor_fusion_to_numpy(msg):
    """Convert SensorFusion to [mocap_x, mocap_y, mocap_z, baro, imu_x, imu_y, imu_z]."""
    return np.array([
        msg.mocap.x, msg.mocap.y, msg.mocap.z,
        msg.barometer.data,
        msg.imu.x, msg.imu.y, msg.imu.z
    ])

def numpy_to_sensor_fusion(arr):
    """Convert NumPy [7] array to SensorFusion."""
    from geometry_msgs.msg import Vector3
    from std_msgs.msg import Float64
    from builtin_interfaces.msg import Time
    
    msg = SensorFusion()
    
    msg.mocap = Vector3(x=float(arr[0]), y=float(arr[1]), z=float(arr[2]))
    msg.barometer = Float64(data=float(arr[3]))
    msg.imu = Vector3(x=float(arr[4]), y=float(arr[5]), z=float(arr[6]))
    msg.stamp = Time()  # Use current time or pass as parameter
    
    return msg

ROS2PY_DEFAULT[SensorFusion] = sensor_fusion_to_numpy
PY2ROS_DEFAULT[SensorFusion] = numpy_to_sensor_fusion

Example: Control Command Message

Message Definition:

# msg/ControlCommand.msg
float64[] forces       # Force commands [Fx, Fy, Fz]
float64[] torques      # Torque commands [Tx, Ty, Tz]
uint8 control_mode     # 0=position, 1=velocity, 2=force
builtin_interfaces/Time stamp

Converters:

from my_custom_msgs.msg import ControlCommand

def control_command_to_numpy(msg):
    """Convert to [Fx, Fy, Fz, Tx, Ty, Tz, mode]."""
    forces = np.array(msg.forces)
    torques = np.array(msg.torques)
    mode = np.array([msg.control_mode])
    return np.concatenate([forces, torques, mode])

def numpy_to_control_command(arr):
    """Convert NumPy [7] array to ControlCommand."""
    msg = ControlCommand()
    msg.forces = arr[:3].tolist()
    msg.torques = arr[3:6].tolist()
    msg.control_mode = int(arr[6])
    # msg.stamp = ...  # Handle timestamp
    return msg

ROS2PY_DEFAULT[ControlCommand] = control_command_to_numpy
PY2ROS_DEFAULT[ControlCommand] = numpy_to_control_command

Using Custom Messages with ROSNode

Basic Example

import rclpy
from pykal.ros.ros_node import ROSNode
from my_custom_msgs.msg import EstimateWithCovariance
from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT
import numpy as np

# 1. Register converters (do this once at module level)
ROS2PY_DEFAULT[EstimateWithCovariance] = estimate_to_numpy
PY2ROS_DEFAULT[EstimateWithCovariance] = numpy_to_estimate

# 2. Create callback using custom message
def kalman_filter_callback(tk, measurement):
    # measurement is NumPy array (converted automatically)
    # ... run Kalman filter
    
    # Return estimate (will be converted to EstimateWithCovariance)
    state = np.array([x, y, z])
    cov = P.flatten()
    return {'estimate': np.concatenate([state, cov])}

# 3. Create ROSNode with custom message types
kf_node = ROSNode(
    node_name='kalman_filter',
    callback=kalman_filter_callback,
    subscribes_to=[
        ('/measurement', EstimateWithCovariance, 'measurement')
    ],
    publishes_to=[
        ('estimate', EstimateWithCovariance, '/estimate')
    ],
    rate_hz=10.0
)

# 4. Run node
rclpy.init()
kf_node.create_node()
kf_node.start()
rclpy.spin(kf_node._node)

Complete Example: Sensor Fusion System

from my_custom_msgs.msg import SensorFusion
from nav_msgs.msg import Odometry
from pykal import DynamicalSystem
from pykal.algorithm_library.estimators.kf import KF
import numpy as np

# Register converter (once)
ROS2PY_DEFAULT[SensorFusion] = sensor_fusion_to_numpy
PY2ROS_DEFAULT[SensorFusion] = numpy_to_sensor_fusion

# Create Kalman filter callback
def fusion_callback(tk, sensors):
    # sensors is [mocap_x, mocap_y, mocap_z, baro, imu_x, imu_y, imu_z]
    
    # Extract sensor data
    mocap = sensors[:3]
    baro = sensors[3]
    imu = sensors[4:]
    
    # Fuse sensors with KF
    # ... (KF implementation)
    
    # Return estimate as Odometry
    estimate = np.concatenate([
        position, orientation, linear_vel, angular_vel
    ])
    return {'estimate': estimate}

# Create node
fusion_node = ROSNode(
    node_name='sensor_fusion',
    callback=fusion_callback,
    subscribes_to=[
        ('/sensors', SensorFusion, 'sensors')
    ],
    publishes_to=[
        ('estimate', Odometry, '/estimate')
    ],
    rate_hz=100.0
)

Advanced Topics

Handling Timestamps

For messages with timestamps, use ROS2 time utilities:

from builtin_interfaces.msg import Time
import rclpy.time

def numpy_to_custom_msg(arr, node=None):
    msg = CustomMsg()
    msg.data = arr.tolist()
    
    # Get current ROS time
    if node is not None:
        now = node.get_clock().now()
        msg.stamp = now.to_msg()
    else:
        msg.stamp = Time()  # Zero timestamp
    
    return msg

This requires passing the node to the converter. Currently, pykal doesn’t support this directly.

Workaround: Set timestamp to zero or use wall-clock time:

import time

def numpy_to_custom_msg(arr):
    msg = CustomMsg()
    msg.data = arr.tolist()
    
    # Use wall-clock time (not recommended for simulation!)
    t = time.time()
    msg.stamp.sec = int(t)
    msg.stamp.nanosec = int((t % 1) * 1e9)
    
    return msg

Variable-Length Arrays

For messages with variable-length arrays:

# Message: float64[] data

def msg_to_numpy(msg):
    return np.array(msg.data)  # Simple!

def numpy_to_msg(arr):
    msg = CustomMsg()
    msg.data = arr.flatten().tolist()  # Flatten first
    return msg

Nested Messages

For messages containing other messages:

# Message:
#   geometry_msgs/Pose pose
#   geometry_msgs/Twist velocity

def nested_to_numpy(msg):
    # Flatten nested structure
    pose = np.array([
        msg.pose.position.x,
        msg.pose.position.y,
        msg.pose.position.z,
        msg.pose.orientation.x,
        msg.pose.orientation.y,
        msg.pose.orientation.z,
        msg.pose.orientation.w
    ])
    
    twist = np.array([
        msg.velocity.linear.x,
        msg.velocity.linear.y,
        msg.velocity.linear.z,
        msg.velocity.angular.x,
        msg.velocity.angular.y,
        msg.velocity.angular.z
    ])
    
    return np.concatenate([pose, twist])

Array Messages

For arrays of messages:

# Message: CustomMsg[] array

def array_to_numpy(msg):
    # Stack all elements
    return np.vstack([
        np.array([elem.x, elem.y, elem.z])
        for elem in msg.array
    ]).flatten()

def numpy_to_array(arr):
    msg = ArrayMsg()
    arr_2d = arr.reshape(-1, 3)  # Reshape to (N, 3)
    
    msg.array = [
        CustomMsg(x=row[0], y=row[1], z=row[2])
        for row in arr_2d
    ]
    return msg

Best Practices

1. Flatten to 1D Arrays

Always return 1D NumPy arrays from ROS → NumPy converters:

# GOOD
return np.concatenate([position, orientation.flatten()])

# BAD
return np.vstack([position, orientation])  # 2D!

2. Use .tolist() for ROS Messages

ROS expects Python lists, not NumPy arrays:

# GOOD
msg.data = arr.tolist()

# BAD (may work but not guaranteed)
msg.data = arr

3. Document Expected Array Format

def custom_to_numpy(msg):
    """
    Convert CustomMsg to NumPy array.
    
    Returns
    -------
    np.ndarray, shape (10,)
        [x, y, z, qx, qy, qz, qw, vx, vy, vz]
    """
    # ...

4. Register Early

Register converters at module import time, not inside functions:

# my_package/converters.py
from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT
from my_custom_msgs.msg import CustomMsg

ROS2PY_DEFAULT[CustomMsg] = custom_to_numpy
PY2ROS_DEFAULT[CustomMsg] = numpy_to_custom

# Then in main script:
import my_package.converters  # Registers automatically

5. Test Converters Independently

# Test round-trip conversion
msg_original = CustomMsg(...)
arr = custom_to_numpy(msg_original)
msg_reconstructed = numpy_to_custom(arr)

# Verify equality
assert msg_original == msg_reconstructed

Common Pitfalls

Pitfall 1: Wrong Array Shape

# WRONG: Returning 2D array
def bad_converter(msg):
    return np.array([[msg.x, msg.y, msg.z]])  # Shape (1, 3)

# CORRECT: Return 1D
def good_converter(msg):
    return np.array([msg.x, msg.y, msg.z])  # Shape (3,)

Pitfall 2: Forgetting to Register

# Converter defined but not registered!
def my_converter(msg):
    return np.array([...])

# Must add:
ROS2PY_DEFAULT[MyMsg] = my_converter

Pitfall 3: Type Mismatches

# Message expects float64[]
msg.data = [1, 2, 3]  # Python ints, may fail!

# Better:
msg.data = [float(x) for x in arr]
# Or:
msg.data = arr.astype(float).tolist()

Pitfall 4: Modifying Converters After Node Creation

# BAD: Node already created with old converter
node.create_node()
ROS2PY_DEFAULT[MyMsg] = new_converter  # Too late!

# GOOD: Register before node creation
ROS2PY_DEFAULT[MyMsg] = new_converter
node.create_node()

Complete Custom Message Workflow

This example shows the entire workflow from creating a message package to using it with ROSNode.

1. Create Message Package

ros2 pkg create --build-type ament_cmake robot_msgs
cd robot_msgs/msg
echo "float64[] state
float64[] covariance
uint32 dim" > StateEstimate.msg

cd ..
# Edit CMakeLists.txt and package.xml
cd ~/ros2_ws && colcon build

2. Create Converter Module

# robot_utils/converters.py
from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT
from robot_msgs.msg import StateEstimate
import numpy as np

def state_estimate_to_numpy(msg):
    """[state..., covariance_flat...]"""
    state = np.array(msg.state)
    cov = np.array(msg.covariance)
    return np.concatenate([state, cov])

def numpy_to_state_estimate(arr):
    """Assumes dim=3 for simplicity."""
    msg = StateEstimate()
    msg.dim = 3
    msg.state = arr[:3].tolist()
    msg.covariance = arr[3:].tolist()
    return msg

# Auto-register on import
ROS2PY_DEFAULT[StateEstimate] = state_estimate_to_numpy
PY2ROS_DEFAULT[StateEstimate] = numpy_to_state_estimate

3. Use in ROSNode

# main.py
import robot_utils.converters  # Registers converters
from robot_msgs.msg import StateEstimate
from pykal.ros.ros_node import ROSNode
import numpy as np

def estimator_callback(tk, measurement):
    # measurement auto-converted to NumPy
    # ... run filter
    state = np.array([x, y, z])
    cov = P.flatten()
    return {'estimate': np.concatenate([state, cov])}

node = ROSNode(
    node_name='estimator',
    callback=estimator_callback,
    subscribes_to=[('/measurement', StateEstimate, 'measurement')],
    publishes_to=[('estimate', StateEstimate, '/estimate')],
    rate_hz=10.0
)

import rclpy
rclpy.init()
node.create_node()
node.start()
rclpy.spin(node._node)

Summary

You’ve learned all the essential ROS ↔ NumPy conversions:

Basic Types:

  • Vector3: 3D vectors → (3,) arrays

  • Quaternion: Rotations → (4,) arrays [x, y, z, w]

  • Pose: Position + orientation → (7,) arrays

  • Twist: Linear + angular velocity → (6,) arrays

Stamped Types (add timestamp):

  • PoseStamped(8,): [t, px, py, pz, qx, qy, qz, qw]

  • TwistStamped(7,): [t, vx, vy, vz, wx, wy, wz]

  • Vector3Stamped(4,): [t, x, y, z]

Complex Messages:

  • Odometry(13,): Full robot state (pose + twist)

  • IMU(10,): Orientation + gyro + accel

  • TransformStamped(8,): Coordinate frame transforms

Sensor Messages:

  • LaserScan → Dictionary with ranges array + metadata

  • JointState → Dictionary with joint arrays

Registries:

  • ROS2PY_DEFAULT: Automatic ROS → NumPy conversion

  • PY2ROS_DEFAULT: Automatic NumPy → ROS conversion

Custom Message Workflow:

  1. ✓ Define .msg file in ROS2 package

  2. ✓ Build package with colcon build

  3. ✓ Write ROS → NumPy converter (returns 1D array)

  4. ✓ Write NumPy → ROS converter (returns message object)

  5. ✓ Register both converters in registries

  6. ✓ Use message type in ROSNode subscribes_to/publishes_to

  7. ✓ Test converters independently before integration

Key Reminders:

  • ROS → NumPy must return 1D NumPy array

  • NumPy → ROS must return populated message object

  • Register converters before creating nodes

  • Use .tolist() when assigning NumPy arrays to message fields

  • Document expected array format in docstrings

  • Test round-trip conversion

These converters are the foundation for building ROS nodes with pykal’s ROSNode wrapper. Next, see rosnode.ipynb to learn how ROSNode uses these converters automatically!

← Modules