Source code for symaware.extra.ros.roslib.data_structures

import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from typing import TYPE_CHECKING

import numpy as np
import roslibpy
from symaware.base import Message

if TYPE_CHECKING:
    import sys

    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self


[docs] @dataclass class RosMessageData(ABC):
[docs] @staticmethod def msg_type() -> str: """Return ROS message type string. Must be called on a subclass.""" raise TypeError("msg_type method should be called on a subclass")
[docs] def to_dict(self) -> dict: """Convert dataclass to dictionary.""" return asdict(self)
[docs] @classmethod @abstractmethod def from_dict(cls, d: dict) -> "Self": pass
[docs] @dataclass class Time(RosMessageData): """Timestamp, using both seconds and nanoseconds""" sec: int = field(default_factory=lambda: roslibpy.Time.now().secs) nanosec: int = field(default_factory=lambda: roslibpy.Time.now().nsecs)
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Time"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": if "secs" in d: return cls(sec=d["secs"], nanosec=d["nsecs"]) return cls(sec=d["sec"], nanosec=d["nanosec"])
[docs] @dataclass class String(RosMessageData): """String message Args ---- data: string data """ data: str = ""
[docs] @staticmethod def msg_type() -> str: return "std_msgs/String"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Bool(RosMessageData): """Boolean message Args ---- data: boolean data """ data: bool = False
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Bool"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Int32(RosMessageData): """32-bit integer message Args ---- data: 32-bit integer data """ data: int = 0
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Int32"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Int32Array(RosMessageData): """32-bit integer array message Args ---- data: 32-bit integer data """ data: "list[int]" = field(default_factory=list) layout: dict = field(default_factory=dict) def __post_init__(self): """Set up default layout dict for multi-array message.""" if not self.layout: self.layout = { "dim": [ { "label": "array", "size": len(self.data), "stride": len(self.data), } ], "data_offset": 0, }
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Int32MultiArray"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Float32(RosMessageData): """32-bit float message Args ---- data: 32-bit float data """ data: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Float32"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Float32Array(RosMessageData): """32-bit float array message Args ---- data: 32-bit float data """ data: "list[float]" = field(default_factory=list) layout: dict = field(default_factory=dict) def __post_init__(self): """Set up default layout dict for multi-array message.""" if not self.layout: self.layout = { "dim": [ { "label": "array", "size": len(self.data), "stride": len(self.data), } ], "data_offset": 0, }
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Float32MultiArray"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Float64(RosMessageData): """64-bit float message Args ---- data: 64-bit float data """ data: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Float64"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Float64Array(RosMessageData): """64-bit float array message Args ---- data: 64-bit float data """ data: "list[float]" = field(default_factory=list) layout: dict = field(default_factory=dict) def __post_init__(self): """Set up default layout dict for multi-array message.""" if not self.layout: self.layout = { "dim": [ { "label": "array", "size": len(self.data), "stride": len(self.data), } ], "data_offset": 0, }
[docs] @staticmethod def msg_type() -> str: return "std_msgs/Float64MultiArray"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(data=d["data"])
[docs] @dataclass class Vector3(RosMessageData): """ 3-Dimensional vector Args ---- x: x y: y z: z """ x: float y: float z: float
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/Vector3"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(x=d["x"], y=d["y"], z=d["z"])
def __array__(self): return np.array([self.x, self.y, self.z])
[docs] @dataclass class Point(RosMessageData): """3-Dimensional point""" x: float = 0.0 y: float = 0.0 z: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/Point"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(x=d["x"], y=d["y"], z=d["z"])
def __array__(self): """Convert to numpy array of [x, y, z].""" return np.array([self.x, self.y, self.z])
[docs] @dataclass class Quaternion(RosMessageData): """Quaternion rotation""" x: float = 0.0 y: float = 0.0 z: float = 0.0 w: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/Quaternion"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(x=d["x"], y=d["y"], z=d["z"], w=d["w"])
def __array__(self): """Convert to numpy array of [x, y, z, w].""" return np.array([self.x, self.y, self.z, self.w])
[docs] @dataclass class Pose(RosMessageData): """Pose, combining both a position and an orientation""" position: Point = field(default_factory=Point) orientation: Quaternion = field(default_factory=Quaternion)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/Pose"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(position=Point.from_dict(d["position"]), orientation=Quaternion.from_dict(d["orientation"]))
def __array__(self): """Convert to numpy array concatenating position and orientation.""" return np.concatenate((np.array(self.position), np.array(self.orientation)))
[docs] @dataclass class PoseStamped(RosMessageData): """A Pose with reference coordinate frame and timestamp""" header: Header = field(default_factory=Header) pose: Pose = field(default_factory=Pose)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/PoseStamped"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(header=Header.from_dict(d["header"]), pose=Pose.from_dict(d["pose"]))
def __array__(self): """Convert to numpy array of pose.""" return np.array(self.pose)
[docs] @dataclass class TrafficLightState(RosMessageData): """State of a traffic light""" header: Header = field(default_factory=Header) id: str = "" position: Point = field(default_factory=Point) status: str = ""
[docs] @staticmethod def msg_type() -> str: return "symaware_msgs/TrafficLightState"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls( header=Header.from_dict(d["header"]), id=d["id"], position=Point.from_dict(d["position"]), status=d["status"], )
[docs] @dataclass class AirSensorOutput(RosMessageData): """Output produced by the AIR sensor""" header: Header = field(default_factory=Header) tid: str = "" range: float = 0.0 azimuth: float = 0.0 elevation: float = 0.0 velocity: float = 0.0 heading: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "symaware_msgs/AirSensorOutput"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls( header=Header.from_dict(d["header"]), tid=d["tid"], range=d["range"], azimuth=d["azimuth"], elevation=d["elevation"], velocity=d["velocity"], heading=d["heading"], )
def __array__(self): """Convert to numpy array of [range, azimuth, elevation, velocity, heading].""" return np.array([self.range, self.azimuth, self.elevation, self.velocity, self.heading])
[docs] @dataclass class JsonAirSensorOutput(RosMessageData): """Output produced by the AIR sensor, sent as a JSON string""" header: Header = field(default_factory=Header) tid: str = "" range: float = 0.0 azimuth: float = 0.0 elevation: float = 0.0 velocity: float = 0.0 heading: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "std_msgs/String"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": js = json.loads(d["data"].replace("\0", "").replace("'", '"')) return cls( tid=js["id"], range=js["range"], azimuth=js["theta"], elevation=0.0, velocity=js["vel"], heading=js["heading"], )
def __array__(self): """Convert to numpy array of [range, azimuth, elevation, velocity, heading].""" return np.array([self.range, self.azimuth, self.elevation, self.velocity, self.heading])
[docs] @dataclass class Path(RosMessageData): """List of points that make up a path""" header: Header = field(default_factory=Header) poses: "list[Pose]" = field(default_factory=list)
[docs] @staticmethod def msg_type() -> str: return "symaware_msgs/Path"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(header=Header.from_dict(d["header"]), poses=[Pose.from_dict(p) for p in d["poses"]])
[docs] @dataclass class PathStamped(RosMessageData): """An array of poses that represents a Path for a robot to follow""" header: Header = field(default_factory=Header) poses: "list[PoseStamped]" = field(default_factory=list)
[docs] @staticmethod def msg_type() -> str: return "nav_msgs/Path"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(header=Header.from_dict(d["header"]), poses=[PoseStamped.from_dict(p) for p in d["poses"]])
[docs] @dataclass class PoseArray(RosMessageData): """An array of poses that represents a set of points in space""" header: Header = field(default_factory=Header) poses: "list[Pose]" = field(default_factory=list)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/PoseArray"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(header=Header.from_dict(d["header"]), poses=[Pose.from_dict(p) for p in d["poses"]])
[docs] @dataclass class PrescanStatus(RosMessageData): """Whether the simulation is ready to start""" ready: bool = False stop: bool = False
[docs] @staticmethod def msg_type() -> str: return "symaware_msgs/PrescanStatus"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(ready=d["ready"], stop=d["stop"])
[docs] @dataclass class LaserScan(RosMessageData): """ Single scan from a planar laser range-finder If you have another ranging device with different behavior (e.g. a sonar array), please find or create a different message, since applications will make fairly laser-specific assumptions about this data Args ---- header: Timestamp in the header is the acquisition time of the first ray in the scan. In frame frame_id, angles are measured around the positive Z axis (counterclockwise, if Z is up) with zero angle being forward along the x axis angle_min: Start angle of the scan [rad] angle_max: End angle of the scan [rad] angle_increment: Angular distance between measurements [rad] time_increment: Time between measurements [seconds] - if your scanner is moving, this will be used in interpolating position of 3d points scan_time: time between scans [seconds] range_min: minimum range value [m] range_max: maximum range value [m] ranges: range data [m] (Note: values < range_min or > range_max should be discarded) intensities: intensity data [device-specific units]. If your device does not provide intensities, please leave the array empty. """ header: Header = field(default_factory=Header) angle_min: float = 0.0 angle_max: float = 0.0 angle_increment: float = 0.0 time_increment: float = 0.0 scan_time: float = 0.0 range_min: float = 0.0 range_max: float = 0.0 ranges: "list[float]" = field(default_factory=list) intensities: "list[float]" = field(default_factory=list)
[docs] @staticmethod def msg_type() -> str: return "sensor_msgs/LaserScan"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls( header=Header.from_dict(d["header"]), angle_min=d["angle_min"], angle_max=d["angle_max"], angle_increment=d["angle_increment"], time_increment=d["time_increment"], scan_time=d["scan_time"], range_min=d["range_min"], range_max=d["range_max"], ranges=[r for r in d["ranges"]], intensities=[i for i in d["intensities"]], )
[docs] @dataclass class PoseWithCovariance(RosMessageData): """ This represents a pose in free space with uncertainty. The pose is defined by a position and orientation. Args ---- pose: Pose covariance: float64[36] """ pose: Pose = field(default_factory=Pose) covariance: "list[float]" = field(default_factory=lambda: [0.0] * 36)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/PoseWithCovariance"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(pose=Pose.from_dict(d["pose"]), covariance=[c for c in d["covariance"]])
def __array__(self): """Convert to numpy array concatenating pose and covariance.""" return np.concatenate((np.array(self.pose), np.array(self.covariance)))
[docs] @dataclass class Twist(RosMessageData): """ This expresses velocity in free space broken into its linear and angular parts. Args ---- linear: Vector3 angular: Vector3 """ linear: "Vector3" = field(default_factory=Vector3) angular: "Vector3" = field(default_factory=Vector3)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/Twist"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(linear=Vector3.from_dict(d["linear"]), angular=Vector3.from_dict(d["angular"]))
def __array__(self): """Convert to numpy array concatenating linear and angular.""" return np.concatenate((np.array(self.linear), np.array(self.angular)))
[docs] @dataclass class TwistWithCovariance(RosMessageData): """ This expresses velocity in free space with uncertainty. Args ---- twist: Twist covariance: float64[36] """ twist: Twist = field(default_factory=Twist) covariance: "list[float]" = field(default_factory=lambda: [0.0] * 36)
[docs] @staticmethod def msg_type() -> str: return "geometry_msgs/TwistWithCovariance"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(twist=Twist.from_dict(d["twist"]), covariance=[c for c in d["covariance"]])
def __array__(self): """Convert to numpy array concatenating twist and covariance.""" return np.concatenate((np.array(self.twist), np.array(self.covariance)))
[docs] @dataclass class Odometry(RosMessageData): """ This represents an estimate of a position and velocity in free space. The pose in this message should be specified in the coordinate frame given by header.frame_id. The twist in this message should be specified in the coordinate frame given by the child_frame_id Args ---- header: Header child_frame_id: string pose: PoseWithCovariance twist: TwistWithCovariance """ header: Header = field(default_factory=Header) child_frame_id: str = "" pose: PoseWithCovariance = field(default_factory=PoseWithCovariance) twist: TwistWithCovariance = field(default_factory=TwistWithCovariance)
[docs] @staticmethod def msg_type() -> str: return "nav_msgs/Odometry"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls( header=Header.from_dict(d["header"]), child_frame_id=d["child_frame_id"], pose=PoseWithCovariance.from_dict(d["pose"]), twist=TwistWithCovariance.from_dict(d["twist"]), )
def __array__(self): """Convert to numpy array concatenating pose and twist.""" return np.concatenate((np.array(self.pose), np.array(self.twist)))
[docs] @dataclass class AckermannDrive(RosMessageData): """Driving command for a car-like vehicle using Ackermann steering Assumes Ackermann front-wheel steering. The left and right front wheels are generally at different angles. To simplify, the commanded angle corresponds to the yaw of a virtual wheel located at the center of the front axle, like on a tricycle. Positive yaw is to the left. (This is **not** the angle of the steering wheel inside the passenger compartment.) Zero steering angle velocity means change the steering angle as quickly as possible. Positive velocity indicates a desired absolute rate of change either left or right. The controller tries not to exceed this limit in either direction, but sometimes it might. Drive at requested speed, acceleration and jerk (the 1st, 2nd and 3rd derivatives of position). All are measured at the vehicle's center of rotation, typically the center of the rear axle. The controller tries not to exceed these limits in either direction, but sometimes it might. Speed is the desired scalar magnitude of the velocity vector. Direction is forward unless the sign is negative, indicating reverse. Zero acceleration means change speed as quickly as possible. Positive acceleration indicates a desired absolute magnitude; that includes deceleration. Zero jerk means change acceleration as quickly as possible. Positive jerk indicates a desired absolute rate of acceleration change in either direction (increasing or decreasing). Args ---- steering_angle: desired virtual angle (radians) steering_angle_velocity: desired rate of change (radians/s) speed: desired forward speed (m/s) acceleration: desired acceleration (m/s^2) jerk: desired jerk (m/s^3) """ steering_angle: float = 0.0 steering_angle_velocity: float = 0.0 speed: float = 0.0 acceleration: float = 0.0 jerk: float = 0.0
[docs] @staticmethod def msg_type() -> str: return "ackermann_msgs/AckermannDrive"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls( steering_angle=d["steering_angle"], steering_angle_velocity=d["steering_angle_velocity"], speed=["speed"], acceleration=d["acceleration"], jerk=["jerk"], )
def __array__(self): """Convert to numpy array of drive parameters.""" return np.array([self.steering_angle, self.steering_angle_velocity, self.speed, self.acceleration, self.jerk])
[docs] @dataclass class AckermannDriveStamped(RosMessageData): """Time stamped drive command for robots with Ackermann steering Args ---- header: Header drive: AckermannDrive """ header: Header = field(default_factory=Header) drive: AckermannDrive = field(default_factory=AckermannDrive)
[docs] @staticmethod def msg_type() -> str: return "ackermann_msgs/AckermannDriveStamped"
[docs] @classmethod def from_dict(cls, d: dict) -> "Self": return cls(header=Header.from_dict(d["header"]), drive=AckermannDrive.from_dict(d["drive"]))
def __array__(self): """Convert to numpy array of drive.""" return np.array(self.drive)
[docs] @dataclass(frozen=True) class RosLibMessage(Message): """ Simple class to store messages between agents. Args ---- sender_id: Id of the sender agent receiver_id: Id of the receiver agent data: ROS message """ data: RosMessageData