{
"cells": [
{
"cell_type": "markdown",
"id": "8de9a11f",
"metadata": {},
"source": [
"[← Modules](../../../getting_started/python_to_ros/modules.rst)"
]
},
{
"cell_type": "markdown",
"id": "37651cbf",
"metadata": {},
"source": [
"# ros2py_py2ros"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "991987b5",
"metadata": {
"tags": [
"skip-execution"
]
},
"outputs": [],
"source": [
"import numpy as np\n",
"from pykal.ros.ros2py_py2ros import (\n",
" # Basic geometry\n",
" ros2py_vector3,\n",
" py2ros_vector3,\n",
" ros2py_quaternion,\n",
" py2ros_quaternion,\n",
" ros2py_pose,\n",
" py2ros_pose,\n",
" ros2py_twist,\n",
" py2ros_twist,\n",
" # Stamped versions\n",
" ros2py_pose_stamped,\n",
" py2ros_pose_stamped,\n",
" ros2py_twist_stamped,\n",
" py2ros_twist_stamped,\n",
" ros2py_vector3_stamped,\n",
" py2ros_vector3_stamped,\n",
" # Complex messages\n",
" ros2py_odometry,\n",
" py2ros_odometry,\n",
" ros2py_imu,\n",
" py2ros_imu,\n",
" ros2py_transform_stamped,\n",
" py2ros_transform_stamped,\n",
" # Sensor messages\n",
" ros2py_laserscan,\n",
" py2ros_laserscan,\n",
" ros2py_joint_state,\n",
" py2ros_joint_state,\n",
" # Time utilities\n",
" ros2py_time,\n",
" py2ros_time,\n",
" ros2py_header,\n",
" py2ros_header,\n",
" # Registries\n",
" ROS2PY_DEFAULT,\n",
" PY2ROS_DEFAULT)\n",
"\n",
"# Import ROS message types\n",
"from geometry_msgs.msg import (\n",
" Vector3,\n",
" Quaternion,\n",
" Pose,\n",
" Twist,\n",
" PoseStamped,\n",
" TwistStamped,\n",
" Vector3Stamped,\n",
" TransformStamped)\n",
"from nav_msgs.msg import Odometry\n",
"from sensor_msgs.msg import Imu, LaserScan, JointState\n",
"from std_msgs.msg import Header\n",
"from builtin_interfaces.msg import Time\n",
"\n",
"print(\"Imports successful!\")\n",
"print(f\"Default ROS2PY converters: {len(ROS2PY_DEFAULT)} message types\")\n",
"print(f\"Default PY2ROS converters: {len(PY2ROS_DEFAULT)} message types\")"
]
},
{
"cell_type": "markdown",
"id": "mvb9itpta5a",
"source": "## References and Examples\n\nThis notebook demonstrates all the ROS message ↔ NumPy conversions available in `pykal.ros.ros2py_py2ros`. These converters are the foundation for building ROS2 nodes with the `ROSNode` wrapper, enabling seamless integration between Python control algorithms and ROS2 middleware.",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "7ad53e15",
"metadata": {},
"source": "### Time and Header Conversions\n\nROS uses `builtin_interfaces/Time` for timestamps and `std_msgs/Header` for metadata. We need to convert these to/from Python floats (seconds since epoch).\n\n#### Time Conversion\n\nROS Time has two fields:\n- `sec`: Integer seconds\n- `nanosec`: Nanoseconds (0-999,999,999)\n\nWe convert to a single float representing seconds."
},
{
"cell_type": "code",
"execution_count": null,
"id": "e3158740",
"metadata": {},
"outputs": [],
"source": [
"# Python time → ROS Time\n",
"t_python = 1234.567890123 # seconds with nanosecond precision\n",
"time_msg = py2ros_time(t_python)\n",
"\n",
"print(\"Python → ROS Time:\")\n",
"print(f\" Input: {t_python} seconds\")\n",
"print(f\" Output: sec={time_msg.sec}, nanosec={time_msg.nanosec}\")\n",
"\n",
"# ROS Time → Python time\n",
"t_back = ros2py_time(time_msg)\n",
"print(f\" Roundtrip: {t_back} seconds\")\n",
"print(f\" Error: {abs(t_back - t_python):.12f} seconds\")"
]
},
{
"cell_type": "markdown",
"id": "8e7ee0b4",
"metadata": {},
"source": "#### Header Conversion\n\nHeaders contain:\n- `stamp`: Time message\n- `frame_id`: String identifying coordinate frame\n\nWe convert to/from a dictionary: `{\"t\": float, \"frame_id\": str}`"
},
{
"cell_type": "code",
"execution_count": null,
"id": "932f85c4",
"metadata": {},
"outputs": [],
"source": [
"# Create ROS Header\n",
"header_msg = py2ros_header(t=10.5, frame_id=\"base_link\")\n",
"print(\"Python → ROS Header:\")\n",
"print(f\" frame_id: {header_msg.frame_id}\")\n",
"print(f\" stamp: {header_msg.stamp.sec}.{header_msg.stamp.nanosec:09d}\")\n",
"\n",
"# Convert back to Python\n",
"header_dict = ros2py_header(header_msg)\n",
"print(f\"\\nROS → Python Header:\")\n",
"print(f\" {header_dict}\")"
]
},
{
"cell_type": "markdown",
"id": "5fcc8b72",
"metadata": {},
"source": "### Basic Geometry Messages\n\nThese are the fundamental building blocks for robotics: points, orientations, and velocities.\n\n#### Vector3: 3D Points and Directions\n\n**Format**: `[x, y, z]` as NumPy array of shape `(3,)`"
},
{
"cell_type": "code",
"execution_count": null,
"id": "cb5e77e4",
"metadata": {},
"outputs": [],
"source": [
"# Create a 3D vector\n",
"position = np.array([1.0, 2.0, 3.0])\n",
"vec_msg = py2ros_vector3(position)\n",
"\n",
"print(\"NumPy → ROS Vector3:\")\n",
"print(f\" Input: {position}\")\n",
"print(f\" Output: x={vec_msg.x}, y={vec_msg.y}, z={vec_msg.z}\")\n",
"\n",
"# Convert back\n",
"position_back = ros2py_vector3(vec_msg)\n",
"print(f\" Roundtrip: {position_back}\")\n",
"print(f\" Shape: {position_back.shape}\")\n",
"\n",
"# Works with column vectors too\n",
"position_col = np.array([[1.0], [2.0], [3.0]]) # Shape (3, 1)\n",
"vec_msg2 = py2ros_vector3(position_col)\n",
"print(f\"\\nColumn vector → ROS: x={vec_msg2.x}, y={vec_msg2.y}, z={vec_msg2.z}\")"
]
},
{
"cell_type": "markdown",
"id": "cd581ca9",
"metadata": {},
"source": "#### Quaternion: 3D Rotations\n\n**Format**: `[x, y, z, w]` as NumPy array of shape `(4,)`\n\nQuaternions represent rotations. The `w` component comes last (unlike some conventions)."
},
{
"cell_type": "code",
"execution_count": null,
"id": "033732eb",
"metadata": {},
"outputs": [],
"source": [
"# Identity rotation (no rotation)\n",
"quat_identity = np.array([0.0, 0.0, 0.0, 1.0]) # [x, y, z, w]\n",
"quat_msg = py2ros_quaternion(quat_identity)\n",
"\n",
"print(\"NumPy → ROS Quaternion:\")\n",
"print(f\" Input: {quat_identity}\")\n",
"print(f\" Output: x={quat_msg.x}, y={quat_msg.y}, z={quat_msg.z}, w={quat_msg.w}\")\n",
"\n",
"# 90° rotation around Z axis\n",
"quat_90z = np.array([0.0, 0.0, 0.707, 0.707]) # Approx values\n",
"quat_msg2 = py2ros_quaternion(quat_90z)\n",
"print(\n",
" f\"\\n90° Z rotation → ROS: x={quat_msg2.x:.3f}, y={quat_msg2.y:.3f}, \"\n",
" f\"z={quat_msg2.z:.3f}, w={quat_msg2.w:.3f}\"\n",
")\n",
"\n",
"# Roundtrip\n",
"quat_back = ros2py_quaternion(quat_msg2)\n",
"print(f\" Roundtrip: {quat_back}\")"
]
},
{
"cell_type": "markdown",
"id": "16ba6d9d",
"metadata": {},
"source": "#### Pose: Position + Orientation\n\n**Format**: `[px, py, pz, qx, qy, qz, qw]` as NumPy array of shape `(7,)`\n\nCombines a 3D position with a quaternion orientation."
},
{
"cell_type": "code",
"execution_count": null,
"id": "1525a47c",
"metadata": {},
"outputs": [],
"source": [
"# Robot pose: at (1, 2, 0) with identity rotation\n",
"pose_array = np.array([1.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0]) # position # quaternion\n",
"pose_msg = py2ros_pose(pose_array)\n",
"\n",
"print(\"NumPy → ROS Pose:\")\n",
"print(\n",
" f\" Position: ({pose_msg.position.x}, {pose_msg.position.y}, {pose_msg.position.z})\"\n",
")\n",
"print(\n",
" f\" Orientation: ({pose_msg.orientation.x}, {pose_msg.orientation.y}, \"\n",
" f\"{pose_msg.orientation.z}, {pose_msg.orientation.w})\"\n",
")\n",
"\n",
"# Convert back\n",
"pose_back = ros2py_pose(pose_msg)\n",
"print(f\"\\nROS → NumPy Pose:\")\n",
"print(f\" Array: {pose_back}\")\n",
"print(f\" Shape: {pose_back.shape}\")"
]
},
{
"cell_type": "markdown",
"id": "e017956a",
"metadata": {},
"source": "#### Twist: Linear + Angular Velocity\n\n**Format**: `[vx, vy, vz, ωx, ωy, ωz]` as NumPy array of shape `(6,)`\n\nRepresents velocity (linear and angular) of a rigid body."
},
{
"cell_type": "code",
"execution_count": null,
"id": "e5d74589",
"metadata": {},
"outputs": [],
"source": [
"# Moving forward at 0.5 m/s, turning at 0.2 rad/s\n",
"twist_array = np.array(\n",
" [0.5, 0.0, 0.0, 0.0, 0.0, 0.2] # linear velocity\n",
") # angular velocity\n",
"twist_msg = py2ros_twist(twist_array)\n",
"\n",
"print(\"NumPy → ROS Twist:\")\n",
"print(\n",
" f\" Linear: ({twist_msg.linear.x}, {twist_msg.linear.y}, {twist_msg.linear.z}) m/s\"\n",
")\n",
"print(\n",
" f\" Angular: ({twist_msg.angular.x}, {twist_msg.angular.y}, {twist_msg.angular.z}) rad/s\"\n",
")\n",
"\n",
"# Convert back\n",
"twist_back = ros2py_twist(twist_msg)\n",
"print(f\"\\nROS → NumPy Twist: {twist_back}\")"
]
},
{
"cell_type": "markdown",
"id": "d199a283",
"metadata": {},
"source": "### Stamped Messages (With Time)\n\n\"Stamped\" messages add a header (timestamp + frame_id) to basic geometry types. The timestamp allows synchronization across multiple sensors.\n\n#### PoseStamped: Pose + Timestamp\n\n**Format**: `[t, px, py, pz, qx, qy, qz, qw]` as NumPy array of shape `(8,)`"
},
{
"cell_type": "code",
"execution_count": null,
"id": "b7058eb3",
"metadata": {},
"outputs": [],
"source": [
"# Pose at time t=5.0 seconds\n",
"pose_stamped_array = np.array(\n",
" [5.0, 1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 1.0] # time # position\n",
") # quaternion\n",
"\n",
"pose_stamped_msg = py2ros_pose_stamped(pose_stamped_array, frame_id=\"world\")\n",
"\n",
"print(\"NumPy → ROS PoseStamped:\")\n",
"print(f\" Timestamp: {ros2py_time(pose_stamped_msg.header.stamp):.3f} s\")\n",
"print(f\" Frame: {pose_stamped_msg.header.frame_id}\")\n",
"print(\n",
" f\" Position: ({pose_stamped_msg.pose.position.x}, \"\n",
" f\"{pose_stamped_msg.pose.position.y}, {pose_stamped_msg.pose.position.z})\"\n",
")\n",
"\n",
"# Convert back\n",
"pose_stamped_back = ros2py_pose_stamped(pose_stamped_msg)\n",
"print(f\"\\nROS → NumPy PoseStamped:\")\n",
"print(f\" Array: {pose_stamped_back}\")\n",
"print(f\" [time, x, y, z, qx, qy, qz, qw]\")"
]
},
{
"cell_type": "markdown",
"id": "1031292a",
"metadata": {},
"source": "#### TwistStamped: Velocity + Timestamp\n\n**Format**: `[t, vx, vy, vz, ωx, ωy, ωz]` as NumPy array of shape `(7,)`"
},
{
"cell_type": "code",
"execution_count": null,
"id": "bda10803",
"metadata": {},
"outputs": [],
"source": [
"# Velocity measurement at t=10.5\n",
"twist_stamped_array = np.array(\n",
" [10.5, 0.3, 0.0, 0.0, 0.0, 0.0, 0.1] # time # linear\n",
") # angular\n",
"\n",
"twist_stamped_msg = py2ros_twist_stamped(twist_stamped_array, frame_id=\"base_link\")\n",
"\n",
"print(\"NumPy → ROS TwistStamped:\")\n",
"print(f\" Time: {ros2py_time(twist_stamped_msg.header.stamp):.3f} s\")\n",
"print(f\" Frame: {twist_stamped_msg.header.frame_id}\")\n",
"print(\n",
" f\" Linear vel: ({twist_stamped_msg.twist.linear.x:.2f}, \"\n",
" f\"{twist_stamped_msg.twist.linear.y:.2f}, {twist_stamped_msg.twist.linear.z:.2f}) m/s\"\n",
")\n",
"\n",
"# Roundtrip\n",
"twist_stamped_back = ros2py_twist_stamped(twist_stamped_msg)\n",
"print(f\"\\nRoundtrip: {twist_stamped_back}\")"
]
},
{
"cell_type": "markdown",
"id": "89111ba3",
"metadata": {},
"source": "#### Vector3Stamped: 3D Vector + Timestamp\n\n**Format**: `[t, x, y, z]` as NumPy array of shape `(4,)`\n\nUseful for force vectors, acceleration, etc."
},
{
"cell_type": "code",
"execution_count": null,
"id": "410b04f8",
"metadata": {},
"outputs": [],
"source": [
"# Acceleration measurement\n",
"accel_array = np.array([2.5, 0.1, -0.2, 9.81]) # time # acceleration (m/s²)\n",
"\n",
"accel_msg = py2ros_vector3_stamped(accel_array, frame_id=\"imu_link\")\n",
"\n",
"print(\"NumPy → ROS Vector3Stamped:\")\n",
"print(f\" Time: {ros2py_time(accel_msg.header.stamp):.3f} s\")\n",
"print(\n",
" f\" Vector: ({accel_msg.vector.x:.2f}, {accel_msg.vector.y:.2f}, \"\n",
" f\"{accel_msg.vector.z:.2f})\"\n",
")\n",
"\n",
"accel_back = ros2py_vector3_stamped(accel_msg)\n",
"print(f\" Roundtrip: {accel_back}\")"
]
},
{
"cell_type": "markdown",
"id": "656f5631",
"metadata": {},
"source": "### Complex Robot Messages\n\nReal robots publish richer messages combining multiple pieces of information.\n\n#### Odometry: Full Robot State\n\n**Format**: `[px, py, pz, qx, qy, qz, qw, vx, vy, vz, ωx, ωy, ωz]` as shape `(13,)`\n\nCombines pose (7) and twist (6) into a single message. Common output from robot base controllers."
},
{
"cell_type": "code",
"execution_count": null,
"id": "a54abd30",
"metadata": {},
"outputs": [],
"source": [
"# Complete robot state\n",
"odom_array = np.array(\n",
" [\n",
" # Pose: position\n",
" 1.5,\n",
" 0.8,\n",
" 0.0,\n",
" # Pose: orientation (quaternion)\n",
" 0.0,\n",
" 0.0,\n",
" 0.383,\n",
" 0.924, # ~45° rotation around Z\n",
" # Twist: linear velocity\n",
" 0.2,\n",
" 0.1,\n",
" 0.0,\n",
" # Twist: angular velocity\n",
" 0.0,\n",
" 0.0,\n",
" 0.3,\n",
" ]\n",
")\n",
"\n",
"odom_msg = py2ros_odometry(odom_array, frame_id=\"odom\", child_frame_id=\"base_link\")\n",
"\n",
"print(\"NumPy → ROS Odometry:\")\n",
"print(f\" Frame: {odom_msg.header.frame_id} → {odom_msg.child_frame_id}\")\n",
"print(\n",
" f\" Position: ({odom_msg.pose.pose.position.x:.2f}, \"\n",
" f\"{odom_msg.pose.pose.position.y:.2f}, {odom_msg.pose.pose.position.z:.2f})\"\n",
")\n",
"print(\n",
" f\" Orientation: quat({odom_msg.pose.pose.orientation.x:.3f}, \"\n",
" f\"{odom_msg.pose.pose.orientation.y:.3f}, {odom_msg.pose.pose.orientation.z:.3f}, \"\n",
" f\"{odom_msg.pose.pose.orientation.w:.3f})\"\n",
")\n",
"print(\n",
" f\" Linear vel: ({odom_msg.twist.twist.linear.x:.2f}, \"\n",
" f\"{odom_msg.twist.twist.linear.y:.2f}, {odom_msg.twist.twist.linear.z:.2f})\"\n",
")\n",
"print(\n",
" f\" Angular vel: ({odom_msg.twist.twist.angular.x:.2f}, \"\n",
" f\"{odom_msg.twist.twist.angular.y:.2f}, {odom_msg.twist.twist.angular.z:.2f})\"\n",
")\n",
"\n",
"# Convert back\n",
"odom_back = ros2py_odometry(odom_msg)\n",
"print(f\"\\nROS → NumPy Odometry:\")\n",
"print(f\" Shape: {odom_back.shape}\")\n",
"print(f\" Array: {odom_back}\")\n",
"print(f\" [px, py, pz, qx, qy, qz, qw, vx, vy, vz, wx, wy, wz]\")"
]
},
{
"cell_type": "markdown",
"id": "46ba0a78",
"metadata": {},
"source": "#### IMU: Inertial Measurement Unit\n\n**Format**: `[qx, qy, qz, qw, ωx, ωy, ωz, ax, ay, az]` as shape `(10,)`\n\nContains:\n- Orientation (quaternion, 4)\n- Angular velocity (gyro, 3)\n- Linear acceleration (accel, 3)\n\nNote: Covariances are not included in the array representation."
},
{
"cell_type": "code",
"execution_count": null,
"id": "13f78dca",
"metadata": {},
"outputs": [],
"source": [
"# IMU data from a quadrotor\n",
"imu_array = np.array(\n",
" [\n",
" # Orientation (quaternion)\n",
" 0.0,\n",
" 0.0,\n",
" 0.0,\n",
" 1.0,\n",
" # Angular velocity (rad/s)\n",
" 0.1,\n",
" -0.05,\n",
" 0.2,\n",
" # Linear acceleration (m/s²)\n",
" 0.5,\n",
" 0.2,\n",
" 9.81, # Gravity + small perturbation\n",
" ]\n",
")\n",
"\n",
"imu_msg = py2ros_imu(imu_array)\n",
"\n",
"print(\"NumPy → ROS IMU:\")\n",
"print(\n",
" f\" Orientation: quat({imu_msg.orientation.x:.3f}, {imu_msg.orientation.y:.3f}, \"\n",
" f\"{imu_msg.orientation.z:.3f}, {imu_msg.orientation.w:.3f})\"\n",
")\n",
"print(\n",
" f\" Gyro: ({imu_msg.angular_velocity.x:.2f}, {imu_msg.angular_velocity.y:.2f}, \"\n",
" f\"{imu_msg.angular_velocity.z:.2f}) rad/s\"\n",
")\n",
"print(\n",
" f\" Accel: ({imu_msg.linear_acceleration.x:.2f}, {imu_msg.linear_acceleration.y:.2f}, \"\n",
" f\"{imu_msg.linear_acceleration.z:.2f}) m/s²\"\n",
")\n",
"\n",
"# Roundtrip\n",
"imu_back = ros2py_imu(imu_msg)\n",
"print(f\"\\nRoundtrip: {imu_back}\")"
]
},
{
"cell_type": "markdown",
"id": "c70b3d0b",
"metadata": {},
"source": "#### TransformStamped: TF Tree Transforms\n\n**Format**: `[t, tx, ty, tz, qx, qy, qz, qw]` as shape `(8,)`\n\nSimilar to PoseStamped but used for coordinate frame transforms in the TF tree."
},
{
"cell_type": "code",
"execution_count": null,
"id": "c1074967",
"metadata": {},
"outputs": [],
"source": [
"# Transform from base_link to laser_link\n",
"transform_array = np.array(\n",
" [\n",
" 15.0, # time\n",
" 0.1,\n",
" 0.0,\n",
" 0.2, # translation\n",
" 0.0,\n",
" 0.0,\n",
" 0.0,\n",
" 1.0, # rotation (identity)\n",
" ]\n",
")\n",
"\n",
"tf_msg = py2ros_transform_stamped(\n",
" transform_array, frame_id=\"base_link\", child_frame_id=\"laser_link\"\n",
")\n",
"\n",
"print(\"NumPy → ROS TransformStamped:\")\n",
"print(f\" Time: {ros2py_time(tf_msg.header.stamp):.3f} s\")\n",
"print(f\" Transform: {tf_msg.header.frame_id} → {tf_msg.child_frame_id}\")\n",
"print(\n",
" f\" Translation: ({tf_msg.transform.translation.x:.2f}, \"\n",
" f\"{tf_msg.transform.translation.y:.2f}, {tf_msg.transform.translation.z:.2f})\"\n",
")\n",
"\n",
"# Roundtrip\n",
"tf_back = ros2py_transform_stamped(tf_msg)\n",
"print(f\"\\nRoundtrip: {tf_back}\")"
]
},
{
"cell_type": "markdown",
"id": "71ba7fb5",
"metadata": {},
"source": "### Sensor Messages\n\nSpecialized messages for common robot sensors.\n\n#### LaserScan: 2D Lidar Data\n\n**Format**: Dictionary with `ranges` array and metadata\n\nLaserScan contains an array of range measurements plus scan parameters."
},
{
"cell_type": "code",
"execution_count": null,
"id": "f1f73e84",
"metadata": {},
"outputs": [],
"source": [
"# Simulate a 180° scan with 10 beams\n",
"ranges = np.array([1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 3.5, 3.0, 2.5])\n",
"\n",
"scan_msg = py2ros_laserscan(\n",
" ranges,\n",
" angle_min=-np.pi / 2, # -90°\n",
" angle_max=np.pi / 2, # +90°\n",
" angle_increment=np.pi / 9, # 180° / 10 beams\n",
" range_min=0.1,\n",
" range_max=10.0,\n",
" frame_id=\"laser_link\",\n",
" stamp=20.0)\n",
"\n",
"print(\"NumPy → ROS LaserScan:\")\n",
"print(f\" Frame: {scan_msg.header.frame_id}\")\n",
"print(f\" Time: {ros2py_time(scan_msg.header.stamp):.3f} s\")\n",
"print(\n",
" f\" Angle range: {np.rad2deg(scan_msg.angle_min):.1f}° to {np.rad2deg(scan_msg.angle_max):.1f}°\"\n",
")\n",
"print(f\" Number of beams: {len(scan_msg.ranges)}\")\n",
"print(f\" Ranges: {scan_msg.ranges[:5]}...\")\n",
"\n",
"# Convert back\n",
"scan_dict = ros2py_laserscan(scan_msg)\n",
"print(f\"\\nROS → Python Dictionary:\")\n",
"print(f\" Keys: {list(scan_dict.keys())}\")\n",
"print(f\" Ranges shape: {scan_dict['ranges'].shape}\")\n",
"print(f\" Ranges: {scan_dict['ranges']}\")"
]
},
{
"cell_type": "markdown",
"id": "0df2f238",
"metadata": {},
"source": "#### JointState: Robot Joint Positions/Velocities\n\n**Format**: Dictionary with `name`, `position`, `velocity`, `effort` arrays\n\nDescribes the state of all joints in a robot (e.g., arm joints, wheel joints)."
},
{
"cell_type": "code",
"execution_count": null,
"id": "fc47bfd7",
"metadata": {},
"outputs": [],
"source": [
"# 3-joint robot arm\n",
"joint_names = [\"shoulder\", \"elbow\", \"wrist\"]\n",
"joint_positions = np.array([0.5, 1.0, -0.3]) # radians\n",
"joint_velocities = np.array([0.1, 0.05, 0.0]) # rad/s\n",
"joint_efforts = np.array([2.0, 1.5, 0.5]) # N⋅m\n",
"\n",
"joint_msg = py2ros_joint_state(\n",
" names=joint_names,\n",
" position=joint_positions,\n",
" velocity=joint_velocities,\n",
" effort=joint_efforts)\n",
"\n",
"print(\"NumPy → ROS JointState:\")\n",
"print(f\" Joints: {joint_msg.name}\")\n",
"print(f\" Positions: {joint_msg.position}\")\n",
"print(f\" Velocities: {joint_msg.velocity}\")\n",
"print(f\" Efforts: {joint_msg.effort}\")\n",
"\n",
"# Convert back\n",
"joint_dict = ros2py_joint_state(joint_msg)\n",
"print(f\"\\nROS → Python Dictionary:\")\n",
"print(f\" Keys: {list(joint_dict.keys())}\")\n",
"print(f\" Names: {joint_dict['name']}\")\n",
"print(f\" Positions: {joint_dict['position']}\")"
]
},
{
"cell_type": "markdown",
"id": "e664720c",
"metadata": {},
"source": "### Using the Conversion Registries\n\nThe `ROS2PY_DEFAULT` and `PY2ROS_DEFAULT` dictionaries map message types to converter functions. ROSNode uses these registries automatically.\n\n#### Checking Available Converters"
},
{
"cell_type": "code",
"execution_count": null,
"id": "a81a82a9",
"metadata": {
"lines_to_next_cell": 2
},
"outputs": [],
"source": [
"print(\"Available ROS2PY converters:\")\n",
"for msg_type in sorted(ROS2PY_DEFAULT.keys(), key=lambda t: t.__name__):\n",
" print(f\" {msg_type.__module__}.{msg_type.__name__}\")\n",
"\n",
"print(f\"\\nTotal: {len(ROS2PY_DEFAULT)} message types\")"
]
},
{
"cell_type": "markdown",
"id": "2a510b40",
"metadata": {
"lines_to_next_cell": 2
},
"source": "#### Using Registries for Generic Conversion"
},
{
"cell_type": "code",
"execution_count": null,
"id": "524b5e7f",
"metadata": {
"lines_to_next_cell": 2
},
"outputs": [],
"source": [
"# Generic converter function\n",
"def convert_ros_to_numpy(msg):\n",
" \"\"\"Convert any registered ROS message to NumPy array.\"\"\"\n",
" msg_type = type(msg)\n",
" if msg_type not in ROS2PY_DEFAULT:\n",
" raise TypeError(f\"No converter for {msg_type.__name__}\")\n",
"\n",
" converter = ROS2PY_DEFAULT[msg_type]\n",
" return converter(msg)\n",
"\n",
"\n",
"# Test with different message types\n",
"test_msgs = [\n",
" py2ros_vector3(np.array([1, 2, 3])),\n",
" py2ros_twist(np.array([0.5, 0, 0, 0, 0, 0.2])),\n",
" py2ros_pose(np.array([1, 2, 3, 0, 0, 0, 1])),\n",
"]\n",
"\n",
"for msg in test_msgs:\n",
" arr = convert_ros_to_numpy(msg)\n",
" print(f\"{type(msg).__name__:20s} → {arr}\")"
]
},
{
"cell_type": "markdown",
"id": "977bc6a1",
"metadata": {
"lines_to_next_cell": 2
},
"source": "### Practical Workflow Examples\n\nReal-world scenarios combining multiple conversions.\n\n#### Example 1: Processing Odometry Stream"
},
{
"cell_type": "code",
"execution_count": null,
"id": "05092cb9",
"metadata": {},
"outputs": [],
"source": [
"# Simulate receiving odometry messages\n",
"def process_odometry_callback(odom_msg: Odometry):\n",
" \"\"\"Extract position and velocity from odometry.\"\"\"\n",
" # Convert to NumPy\n",
" odom_array = ros2py_odometry(odom_msg)\n",
"\n",
" # Extract components\n",
" position = odom_array[0:3] # [px, py, pz]\n",
" quaternion = odom_array[3:7] # [qx, qy, qz, qw]\n",
" linear_vel = odom_array[7:10] # [vx, vy, vz]\n",
" angular_vel = odom_array[10:13] # [wx, wy, wz]\n",
"\n",
" return {\n",
" \"position\": position,\n",
" \"orientation\": quaternion,\n",
" \"linear_velocity\": linear_vel,\n",
" \"angular_velocity\": angular_vel,\n",
" \"speed\": np.linalg.norm(linear_vel),\n",
" }\n",
"\n",
"\n",
"# Test with sample message\n",
"sample_odom = py2ros_odometry(\n",
" np.array(\n",
" [\n",
" 5,\n",
" 3,\n",
" 0, # position\n",
" 0,\n",
" 0,\n",
" 0,\n",
" 1, # orientation\n",
" 0.8,\n",
" 0.2,\n",
" 0, # linear vel\n",
" 0,\n",
" 0,\n",
" 0.1,\n",
" ]\n",
" ), # angular vel\n",
" frame_id=\"odom\")\n",
"\n",
"result = process_odometry_callback(sample_odom)\n",
"print(\"Processed Odometry:\")\n",
"for key, value in result.items():\n",
" if isinstance(value, np.ndarray):\n",
" print(f\" {key}: {value}\")\n",
" else:\n",
" print(f\" {key}: {value:.3f}\")"
]
},
{
"cell_type": "markdown",
"id": "9e708580",
"metadata": {},
"source": "#### Example 2: Creating Stamped Messages with Current Time"
},
{
"cell_type": "code",
"execution_count": null,
"id": "69e0eea2",
"metadata": {
"lines_to_next_cell": 2
},
"outputs": [],
"source": [
"import time\n",
"\n",
"\n",
"def create_velocity_command(vx: float, vy: float, omega: float) -> TwistStamped:\n",
" \"\"\"Create a timestamped velocity command.\"\"\"\n",
" current_time = time.time()\n",
"\n",
" # Create array with timestamp\n",
" cmd_array = np.array([current_time, vx, vy, 0.0, 0.0, 0.0, omega])\n",
"\n",
" # Convert to ROS message\n",
" return py2ros_twist_stamped(cmd_array, frame_id=\"base_link\")\n",
"\n",
"\n",
"# Generate command\n",
"cmd_msg = create_velocity_command(vx=0.5, vy=0.0, omega=0.3)\n",
"\n",
"print(\"Velocity Command:\")\n",
"print(f\" Time: {ros2py_time(cmd_msg.header.stamp):.3f} s\")\n",
"print(f\" Linear: ({cmd_msg.twist.linear.x:.2f}, {cmd_msg.twist.linear.y:.2f}) m/s\")\n",
"print(f\" Angular: {cmd_msg.twist.angular.z:.2f} rad/s\")"
]
},
{
"cell_type": "markdown",
"id": "cb642361",
"metadata": {
"lines_to_next_cell": 2
},
"source": "#### Example 3: Batch Processing Multiple Poses"
},
{
"cell_type": "code",
"execution_count": null,
"id": "12acde27",
"metadata": {},
"outputs": [],
"source": [
"# Generate trajectory of poses\n",
"def generate_circular_trajectory(num_points: int, radius: float) -> list:\n",
" \"\"\"Generate poses along a circular path.\"\"\"\n",
" poses = []\n",
" for i in range(num_points):\n",
" angle = 2 * np.pi * i / num_points\n",
"\n",
" # Position along circle\n",
" x = radius * np.cos(angle)\n",
" y = radius * np.sin(angle)\n",
" z = 0.0\n",
"\n",
" # Orientation facing tangent to circle\n",
" qz = np.sin(angle / 2)\n",
" qw = np.cos(angle / 2)\n",
"\n",
" pose_array = np.array([x, y, z, 0, 0, qz, qw])\n",
" poses.append(py2ros_pose(pose_array))\n",
"\n",
" return poses\n",
"\n",
"\n",
"# Generate and convert\n",
"trajectory = generate_circular_trajectory(num_points=8, radius=2.0)\n",
"\n",
"print(f\"Generated {len(trajectory)} poses:\")\n",
"for i, pose in enumerate(trajectory[:4]): # Show first 4\n",
" arr = ros2py_pose(pose)\n",
" print(\n",
" f\" Pose {i}: pos=({arr[0]:.2f}, {arr[1]:.2f}), \"\n",
" f\"quat=({arr[3]:.3f}, {arr[4]:.3f}, {arr[5]:.3f}, {arr[6]:.3f})\"\n",
" )\n",
"print(f\" ... and {len(trajectory)-4} more\")"
]
},
{
"cell_type": "markdown",
"id": "vaibpjdrv1",
"source": "## Working with Custom ROS Message Types\n\nBy default, `pykal` supports common ROS2 message types (Twist, Odometry, Vector3, etc.). This section shows how to use **custom message types** with `pykal`'s ROSNode wrapper.\n\n### Why Custom Messages?\n\nUse custom messages when:\n\n- Standard messages don't fit your data structure\n- You need specific field names for clarity\n- You're interfacing with existing ROS packages that use custom types\n- You want type safety for complex data structures\n\n**Example Use Cases**:\n\n- `EstimateWithCovariance` - State estimate + uncertainty\n- `ControlCommand` - Multi-DOF control with metadata\n- `SensorFusion` - Combined data from multiple sensors\n- `RobotStatus` - Custom telemetry fields",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "1nd8siv7avv",
"source": "### Message Converter Architecture\n\n`pykal` uses two registries for bidirectional conversion:\n\n```python\nfrom pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\n\n# ROS → NumPy (for subscribers)\nROS2PY_DEFAULT[MessageType] = converter_function\n\n# NumPy → ROS (for publishers)\nPY2ROS_DEFAULT[MessageType] = converter_function\n```\n\n**Conversion Flow**:\n\n```\nSubscriber: ROS Topic → ROS Message → ROS2PY → NumPy Array → Callback\nPublisher: Callback → NumPy Array → PY2ROS → ROS Message → ROS Topic\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "7q9j3xpyuva",
"source": "### Creating a Custom Message\n\n#### Step 1: Define Message in Package\n\nCreate a ROS2 package with custom messages:\n\n```bash\n# Create package\nros2 pkg create --build-type ament_cmake my_custom_msgs\n\ncd my_custom_msgs\nmkdir msg\n```\n\nCreate message definition file:\n\n```text\n# msg/EstimateWithCovariance.msg\nfloat64[] state # State estimate vector\nfloat64[] covariance # Flattened covariance matrix\nuint32 state_dim # State dimension\nbuiltin_interfaces/Time stamp\n```\n\n#### Step 2: Configure Package\n\nUpdate `CMakeLists.txt`:\n\n```cmake\nfind_package(rosidl_default_generators REQUIRED)\nfind_package(builtin_interfaces REQUIRED)\n\nrosidl_generate_interfaces(${PROJECT_NAME}\n \"msg/EstimateWithCovariance.msg\"\n DEPENDENCIES builtin_interfaces\n)\n```\n\nUpdate `package.xml`:\n\n```xml\nrosidl_default_generators\nrosidl_default_runtime\nrosidl_interface_packages\nbuiltin_interfaces\n```\n\n#### Step 3: Build Package\n\n```bash\ncd ~/ros2_ws\ncolcon build --packages-select my_custom_msgs\nsource install/setup.bash\n\n# Verify message exists\nros2 interface show my_custom_msgs/msg/EstimateWithCovariance\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "3saeg44t7km",
"source": "### Registering Converters with pykal\n\n#### Basic Converter Pattern",
"metadata": {}
},
{
"cell_type": "code",
"id": "tjfmwvzlfzf",
"source": "# Example: Basic converter pattern (pseudocode - requires actual custom message)\n# from pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\n# from my_custom_msgs.msg import EstimateWithCovariance\n# import numpy as np\n\n# # ROS → NumPy converter\n# def estimate_to_numpy(msg):\n# \"\"\"Convert EstimateWithCovariance to NumPy array.\"\"\"\n# state = np.array(msg.state)\n# cov = np.array(msg.covariance).reshape(msg.state_dim, msg.state_dim)\n# return np.concatenate([state, cov.flatten()])\n\n# # NumPy → ROS converter\n# def numpy_to_estimate(arr):\n# \"\"\"Convert NumPy array to EstimateWithCovariance.\"\"\"\n# msg = EstimateWithCovariance()\n# \n# # Assuming arr = [state..., covariance_flat...]\n# state_dim = 3 # Example: 3D state\n# \n# msg.state = arr[:state_dim].tolist()\n# msg.covariance = arr[state_dim:].tolist()\n# msg.state_dim = state_dim\n# # msg.stamp = ... # Handle timestamp\n# \n# return msg\n\n# # Register converters\n# ROS2PY_DEFAULT[EstimateWithCovariance] = estimate_to_numpy\n# PY2ROS_DEFAULT[EstimateWithCovariance] = numpy_to_estimate\n\nprint(\"Custom converter pattern demonstrated (commented out - requires custom message package)\")",
"metadata": {
"tags": [
"skip-execution"
]
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"id": "1h5yx6py34v",
"source": "**Key Points**:\n\n- ROS → NumPy should return a 1D NumPy array\n- NumPy → ROS should return a populated message object\n- Handle timestamps if message includes them\n- Convert lists to NumPy arrays and vice versa\n\n#### Example: Multi-Sensor Fusion Message\n\n**Message Definition**:\n\n```text\n# msg/SensorFusion.msg\ngeometry_msgs/Vector3 mocap # Motion capture position\nstd_msgs/Float64 barometer # Barometer altitude\ngeometry_msgs/Vector3 imu # IMU velocities\nbuiltin_interfaces/Time stamp\n```\n\n**Converters**:\n\n```python\nfrom my_custom_msgs.msg import SensorFusion\nfrom pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\nimport numpy as np\n\ndef sensor_fusion_to_numpy(msg):\n \"\"\"Convert SensorFusion to [mocap_x, mocap_y, mocap_z, baro, imu_x, imu_y, imu_z].\"\"\"\n return np.array([\n msg.mocap.x, msg.mocap.y, msg.mocap.z,\n msg.barometer.data,\n msg.imu.x, msg.imu.y, msg.imu.z\n ])\n\ndef numpy_to_sensor_fusion(arr):\n \"\"\"Convert NumPy [7] array to SensorFusion.\"\"\"\n from geometry_msgs.msg import Vector3\n from std_msgs.msg import Float64\n from builtin_interfaces.msg import Time\n \n msg = SensorFusion()\n \n msg.mocap = Vector3(x=float(arr[0]), y=float(arr[1]), z=float(arr[2]))\n msg.barometer = Float64(data=float(arr[3]))\n msg.imu = Vector3(x=float(arr[4]), y=float(arr[5]), z=float(arr[6]))\n msg.stamp = Time() # Use current time or pass as parameter\n \n return msg\n\nROS2PY_DEFAULT[SensorFusion] = sensor_fusion_to_numpy\nPY2ROS_DEFAULT[SensorFusion] = numpy_to_sensor_fusion\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "55rhp5hnnpo",
"source": "#### Example: Control Command Message\n\n**Message Definition**:\n\n```text\n# msg/ControlCommand.msg\nfloat64[] forces # Force commands [Fx, Fy, Fz]\nfloat64[] torques # Torque commands [Tx, Ty, Tz]\nuint8 control_mode # 0=position, 1=velocity, 2=force\nbuiltin_interfaces/Time stamp\n```\n\n**Converters**:\n\n```python\nfrom my_custom_msgs.msg import ControlCommand\n\ndef control_command_to_numpy(msg):\n \"\"\"Convert to [Fx, Fy, Fz, Tx, Ty, Tz, mode].\"\"\"\n forces = np.array(msg.forces)\n torques = np.array(msg.torques)\n mode = np.array([msg.control_mode])\n return np.concatenate([forces, torques, mode])\n\ndef numpy_to_control_command(arr):\n \"\"\"Convert NumPy [7] array to ControlCommand.\"\"\"\n msg = ControlCommand()\n msg.forces = arr[:3].tolist()\n msg.torques = arr[3:6].tolist()\n msg.control_mode = int(arr[6])\n # msg.stamp = ... # Handle timestamp\n return msg\n\nROS2PY_DEFAULT[ControlCommand] = control_command_to_numpy\nPY2ROS_DEFAULT[ControlCommand] = numpy_to_control_command\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "y4cd27il2f",
"source": "### Using Custom Messages with ROSNode\n\n#### Basic Example\n\n```python\nimport rclpy\nfrom pykal.ros.ros_node import ROSNode\nfrom my_custom_msgs.msg import EstimateWithCovariance\nfrom pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\nimport numpy as np\n\n# 1. Register converters (do this once at module level)\nROS2PY_DEFAULT[EstimateWithCovariance] = estimate_to_numpy\nPY2ROS_DEFAULT[EstimateWithCovariance] = numpy_to_estimate\n\n# 2. Create callback using custom message\ndef kalman_filter_callback(tk, measurement):\n # measurement is NumPy array (converted automatically)\n # ... run Kalman filter\n \n # Return estimate (will be converted to EstimateWithCovariance)\n state = np.array([x, y, z])\n cov = P.flatten()\n return {'estimate': np.concatenate([state, cov])}\n\n# 3. Create ROSNode with custom message types\nkf_node = ROSNode(\n node_name='kalman_filter',\n callback=kalman_filter_callback,\n subscribes_to=[\n ('/measurement', EstimateWithCovariance, 'measurement')\n ],\n publishes_to=[\n ('estimate', EstimateWithCovariance, '/estimate')\n ],\n rate_hz=10.0\n)\n\n# 4. Run node\nrclpy.init()\nkf_node.create_node()\nkf_node.start()\nrclpy.spin(kf_node._node)\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "4anl0j8kto2",
"source": "#### Complete Example: Sensor Fusion System\n\n```python\nfrom my_custom_msgs.msg import SensorFusion\nfrom nav_msgs.msg import Odometry\nfrom pykal import DynamicalSystem\nfrom pykal.algorithm_library.estimators.kf import KF\nimport numpy as np\n\n# Register converter (once)\nROS2PY_DEFAULT[SensorFusion] = sensor_fusion_to_numpy\nPY2ROS_DEFAULT[SensorFusion] = numpy_to_sensor_fusion\n\n# Create Kalman filter callback\ndef fusion_callback(tk, sensors):\n # sensors is [mocap_x, mocap_y, mocap_z, baro, imu_x, imu_y, imu_z]\n \n # Extract sensor data\n mocap = sensors[:3]\n baro = sensors[3]\n imu = sensors[4:]\n \n # Fuse sensors with KF\n # ... (KF implementation)\n \n # Return estimate as Odometry\n estimate = np.concatenate([\n position, orientation, linear_vel, angular_vel\n ])\n return {'estimate': estimate}\n\n# Create node\nfusion_node = ROSNode(\n node_name='sensor_fusion',\n callback=fusion_callback,\n subscribes_to=[\n ('/sensors', SensorFusion, 'sensors')\n ],\n publishes_to=[\n ('estimate', Odometry, '/estimate')\n ],\n rate_hz=100.0\n)\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "e0uqw0onsx4",
"source": "### Advanced Topics\n\n#### Handling Timestamps\n\nFor messages with timestamps, use ROS2 time utilities:\n\n```python\nfrom builtin_interfaces.msg import Time\nimport rclpy.time\n\ndef numpy_to_custom_msg(arr, node=None):\n msg = CustomMsg()\n msg.data = arr.tolist()\n \n # Get current ROS time\n if node is not None:\n now = node.get_clock().now()\n msg.stamp = now.to_msg()\n else:\n msg.stamp = Time() # Zero timestamp\n \n return msg\n```\n\nThis requires passing the node to the converter. Currently, `pykal` doesn't support this directly.\n\n**Workaround**: Set timestamp to zero or use wall-clock time:\n\n```python\nimport time\n\ndef numpy_to_custom_msg(arr):\n msg = CustomMsg()\n msg.data = arr.tolist()\n \n # Use wall-clock time (not recommended for simulation!)\n t = time.time()\n msg.stamp.sec = int(t)\n msg.stamp.nanosec = int((t % 1) * 1e9)\n \n return msg\n```\n\n#### Variable-Length Arrays\n\nFor messages with variable-length arrays:\n\n```python\n# Message: float64[] data\n\ndef msg_to_numpy(msg):\n return np.array(msg.data) # Simple!\n\ndef numpy_to_msg(arr):\n msg = CustomMsg()\n msg.data = arr.flatten().tolist() # Flatten first\n return msg\n```\n\n#### Nested Messages\n\nFor messages containing other messages:\n\n```python\n# Message:\n# geometry_msgs/Pose pose\n# geometry_msgs/Twist velocity\n\ndef nested_to_numpy(msg):\n # Flatten nested structure\n pose = np.array([\n msg.pose.position.x,\n msg.pose.position.y,\n msg.pose.position.z,\n msg.pose.orientation.x,\n msg.pose.orientation.y,\n msg.pose.orientation.z,\n msg.pose.orientation.w\n ])\n \n twist = np.array([\n msg.velocity.linear.x,\n msg.velocity.linear.y,\n msg.velocity.linear.z,\n msg.velocity.angular.x,\n msg.velocity.angular.y,\n msg.velocity.angular.z\n ])\n \n return np.concatenate([pose, twist])\n```\n\n#### Array Messages\n\nFor arrays of messages:\n\n```python\n# Message: CustomMsg[] array\n\ndef array_to_numpy(msg):\n # Stack all elements\n return np.vstack([\n np.array([elem.x, elem.y, elem.z])\n for elem in msg.array\n ]).flatten()\n\ndef numpy_to_array(arr):\n msg = ArrayMsg()\n arr_2d = arr.reshape(-1, 3) # Reshape to (N, 3)\n \n msg.array = [\n CustomMsg(x=row[0], y=row[1], z=row[2])\n for row in arr_2d\n ]\n return msg\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "e5matv1nph7",
"source": "### Best Practices\n\n#### 1. Flatten to 1D Arrays\n\nAlways return 1D NumPy arrays from ROS → NumPy converters:\n\n```python\n# GOOD\nreturn np.concatenate([position, orientation.flatten()])\n\n# BAD\nreturn np.vstack([position, orientation]) # 2D!\n```\n\n#### 2. Use `.tolist()` for ROS Messages\n\nROS expects Python lists, not NumPy arrays:\n\n```python\n# GOOD\nmsg.data = arr.tolist()\n\n# BAD (may work but not guaranteed)\nmsg.data = arr\n```\n\n#### 3. Document Expected Array Format\n\n```python\ndef custom_to_numpy(msg):\n \"\"\"\n Convert CustomMsg to NumPy array.\n \n Returns\n -------\n np.ndarray, shape (10,)\n [x, y, z, qx, qy, qz, qw, vx, vy, vz]\n \"\"\"\n # ...\n```\n\n#### 4. Register Early\n\nRegister converters at module import time, not inside functions:\n\n```python\n# my_package/converters.py\nfrom pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\nfrom my_custom_msgs.msg import CustomMsg\n\nROS2PY_DEFAULT[CustomMsg] = custom_to_numpy\nPY2ROS_DEFAULT[CustomMsg] = numpy_to_custom\n\n# Then in main script:\nimport my_package.converters # Registers automatically\n```\n\n#### 5. Test Converters Independently\n\n```python\n# Test round-trip conversion\nmsg_original = CustomMsg(...)\narr = custom_to_numpy(msg_original)\nmsg_reconstructed = numpy_to_custom(arr)\n\n# Verify equality\nassert msg_original == msg_reconstructed\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "hbimhaxhqat",
"source": "### Common Pitfalls\n\n#### Pitfall 1: Wrong Array Shape\n\n```python\n# WRONG: Returning 2D array\ndef bad_converter(msg):\n return np.array([[msg.x, msg.y, msg.z]]) # Shape (1, 3)\n\n# CORRECT: Return 1D\ndef good_converter(msg):\n return np.array([msg.x, msg.y, msg.z]) # Shape (3,)\n```\n\n#### Pitfall 2: Forgetting to Register\n\n```python\n# Converter defined but not registered!\ndef my_converter(msg):\n return np.array([...])\n\n# Must add:\nROS2PY_DEFAULT[MyMsg] = my_converter\n```\n\n#### Pitfall 3: Type Mismatches\n\n```python\n# Message expects float64[]\nmsg.data = [1, 2, 3] # Python ints, may fail!\n\n# Better:\nmsg.data = [float(x) for x in arr]\n# Or:\nmsg.data = arr.astype(float).tolist()\n```\n\n#### Pitfall 4: Modifying Converters After Node Creation\n\n```python\n# BAD: Node already created with old converter\nnode.create_node()\nROS2PY_DEFAULT[MyMsg] = new_converter # Too late!\n\n# GOOD: Register before node creation\nROS2PY_DEFAULT[MyMsg] = new_converter\nnode.create_node()\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "kl2gqimrde",
"source": "### Complete Custom Message Workflow\n\nThis example shows the entire workflow from creating a message package to using it with ROSNode.\n\n#### 1. Create Message Package\n\n```bash\nros2 pkg create --build-type ament_cmake robot_msgs\ncd robot_msgs/msg\necho \"float64[] state\nfloat64[] covariance\nuint32 dim\" > StateEstimate.msg\n\ncd ..\n# Edit CMakeLists.txt and package.xml\ncd ~/ros2_ws && colcon build\n```\n\n#### 2. Create Converter Module\n\n```python\n# robot_utils/converters.py\nfrom pykal.ros.ros2py_py2ros import ROS2PY_DEFAULT, PY2ROS_DEFAULT\nfrom robot_msgs.msg import StateEstimate\nimport numpy as np\n\ndef state_estimate_to_numpy(msg):\n \"\"\"[state..., covariance_flat...]\"\"\"\n state = np.array(msg.state)\n cov = np.array(msg.covariance)\n return np.concatenate([state, cov])\n\ndef numpy_to_state_estimate(arr):\n \"\"\"Assumes dim=3 for simplicity.\"\"\"\n msg = StateEstimate()\n msg.dim = 3\n msg.state = arr[:3].tolist()\n msg.covariance = arr[3:].tolist()\n return msg\n\n# Auto-register on import\nROS2PY_DEFAULT[StateEstimate] = state_estimate_to_numpy\nPY2ROS_DEFAULT[StateEstimate] = numpy_to_state_estimate\n```\n\n#### 3. Use in ROSNode\n\n```python\n# main.py\nimport robot_utils.converters # Registers converters\nfrom robot_msgs.msg import StateEstimate\nfrom pykal.ros.ros_node import ROSNode\nimport numpy as np\n\ndef estimator_callback(tk, measurement):\n # measurement auto-converted to NumPy\n # ... run filter\n state = np.array([x, y, z])\n cov = P.flatten()\n return {'estimate': np.concatenate([state, cov])}\n\nnode = ROSNode(\n node_name='estimator',\n callback=estimator_callback,\n subscribes_to=[('/measurement', StateEstimate, 'measurement')],\n publishes_to=[('estimate', StateEstimate, '/estimate')],\n rate_hz=10.0\n)\n\nimport rclpy\nrclpy.init()\nnode.create_node()\nnode.start()\nrclpy.spin(node._node)\n```",
"metadata": {}
},
{
"cell_type": "markdown",
"id": "f531aa7f",
"metadata": {},
"source": "## Summary\n\nYou've learned all the essential ROS ↔ NumPy conversions:\n\n**Basic Types**:\n- `Vector3`: 3D vectors → `(3,)` arrays\n- `Quaternion`: Rotations → `(4,)` arrays `[x, y, z, w]`\n- `Pose`: Position + orientation → `(7,)` arrays\n- `Twist`: Linear + angular velocity → `(6,)` arrays\n\n**Stamped Types** (add timestamp):\n- `PoseStamped` → `(8,)`: `[t, px, py, pz, qx, qy, qz, qw]`\n- `TwistStamped` → `(7,)`: `[t, vx, vy, vz, wx, wy, wz]`\n- `Vector3Stamped` → `(4,)`: `[t, x, y, z]`\n\n**Complex Messages**:\n- `Odometry` → `(13,)`: Full robot state (pose + twist)\n- `IMU` → `(10,)`: Orientation + gyro + accel\n- `TransformStamped` → `(8,)`: Coordinate frame transforms\n\n**Sensor Messages**:\n- `LaserScan` → Dictionary with ranges array + metadata\n- `JointState` → Dictionary with joint arrays\n\n**Registries**:\n- `ROS2PY_DEFAULT`: Automatic ROS → NumPy conversion\n- `PY2ROS_DEFAULT`: Automatic NumPy → ROS conversion\n\n**Custom Message Workflow**:\n\n1. ✓ Define `.msg` file in ROS2 package\n2. ✓ Build package with `colcon build`\n3. ✓ Write ROS → NumPy converter (returns 1D array)\n4. ✓ Write NumPy → ROS converter (returns message object)\n5. ✓ Register both converters in registries\n6. ✓ Use message type in ROSNode `subscribes_to`/`publishes_to`\n7. ✓ Test converters independently before integration\n\n**Key Reminders**:\n\n- ROS → NumPy must return **1D NumPy array**\n- NumPy → ROS must return **populated message object**\n- Register converters **before** creating nodes\n- Use `.tolist()` when assigning NumPy arrays to message fields\n- Document expected array format in docstrings\n- Test round-trip conversion\n\nThese converters are the foundation for building ROS nodes with pykal's `ROSNode` wrapper. Next, see `rosnode.ipynb` to learn how ROSNode uses these converters automatically!"
},
{
"cell_type": "markdown",
"id": "d0866315",
"metadata": {},
"source": [
"[← Modules](../../../getting_started/python_to_ros/modules.rst)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}