import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from typing import TYPE_CHECKING
import numpy as np
import roslibpy
from symaware.base import Message
if TYPE_CHECKING:
import sys
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
[docs]
@dataclass
class RosMessageData(ABC):
[docs]
@staticmethod
def msg_type() -> str:
"""Return ROS message type string. Must be called on a subclass."""
raise TypeError("msg_type method should be called on a subclass")
[docs]
def to_dict(self) -> dict:
"""Convert dataclass to dictionary."""
return asdict(self)
[docs]
@classmethod
@abstractmethod
def from_dict(cls, d: dict) -> "Self":
pass
[docs]
@dataclass
class Time(RosMessageData):
"""Timestamp, using both seconds and nanoseconds"""
sec: int = field(default_factory=lambda: roslibpy.Time.now().secs)
nanosec: int = field(default_factory=lambda: roslibpy.Time.now().nsecs)
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Time"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
if "secs" in d:
return cls(sec=d["secs"], nanosec=d["nsecs"])
return cls(sec=d["sec"], nanosec=d["nanosec"])
[docs]
@dataclass
class String(RosMessageData):
"""String message
Args
----
data:
string data
"""
data: str = ""
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/String"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Bool(RosMessageData):
"""Boolean message
Args
----
data:
boolean data
"""
data: bool = False
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Bool"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Int32(RosMessageData):
"""32-bit integer message
Args
----
data:
32-bit integer data
"""
data: int = 0
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Int32"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Int32Array(RosMessageData):
"""32-bit integer array message
Args
----
data:
32-bit integer data
"""
data: "list[int]" = field(default_factory=list)
layout: dict = field(default_factory=dict)
def __post_init__(self):
"""Set up default layout dict for multi-array message."""
if not self.layout:
self.layout = {
"dim": [
{
"label": "array",
"size": len(self.data),
"stride": len(self.data),
}
],
"data_offset": 0,
}
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Int32MultiArray"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Float32(RosMessageData):
"""32-bit float message
Args
----
data:
32-bit float data
"""
data: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Float32"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Float32Array(RosMessageData):
"""32-bit float array message
Args
----
data:
32-bit float data
"""
data: "list[float]" = field(default_factory=list)
layout: dict = field(default_factory=dict)
def __post_init__(self):
"""Set up default layout dict for multi-array message."""
if not self.layout:
self.layout = {
"dim": [
{
"label": "array",
"size": len(self.data),
"stride": len(self.data),
}
],
"data_offset": 0,
}
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Float32MultiArray"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Float64(RosMessageData):
"""64-bit float message
Args
----
data:
64-bit float data
"""
data: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Float64"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Float64Array(RosMessageData):
"""64-bit float array message
Args
----
data:
64-bit float data
"""
data: "list[float]" = field(default_factory=list)
layout: dict = field(default_factory=dict)
def __post_init__(self):
"""Set up default layout dict for multi-array message."""
if not self.layout:
self.layout = {
"dim": [
{
"label": "array",
"size": len(self.data),
"stride": len(self.data),
}
],
"data_offset": 0,
}
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/Float64MultiArray"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(data=d["data"])
[docs]
@dataclass
class Vector3(RosMessageData):
"""
3-Dimensional vector
Args
----
x:
x
y:
y
z:
z
"""
x: float
y: float
z: float
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/Vector3"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(x=d["x"], y=d["y"], z=d["z"])
def __array__(self):
return np.array([self.x, self.y, self.z])
[docs]
@dataclass
class Point(RosMessageData):
"""3-Dimensional point"""
x: float = 0.0
y: float = 0.0
z: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/Point"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(x=d["x"], y=d["y"], z=d["z"])
def __array__(self):
"""Convert to numpy array of [x, y, z]."""
return np.array([self.x, self.y, self.z])
[docs]
@dataclass
class Quaternion(RosMessageData):
"""Quaternion rotation"""
x: float = 0.0
y: float = 0.0
z: float = 0.0
w: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/Quaternion"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(x=d["x"], y=d["y"], z=d["z"], w=d["w"])
def __array__(self):
"""Convert to numpy array of [x, y, z, w]."""
return np.array([self.x, self.y, self.z, self.w])
[docs]
@dataclass
class Pose(RosMessageData):
"""Pose, combining both a position and an orientation"""
position: Point = field(default_factory=Point)
orientation: Quaternion = field(default_factory=Quaternion)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/Pose"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(position=Point.from_dict(d["position"]), orientation=Quaternion.from_dict(d["orientation"]))
def __array__(self):
"""Convert to numpy array concatenating position and orientation."""
return np.concatenate((np.array(self.position), np.array(self.orientation)))
[docs]
@dataclass
class PoseStamped(RosMessageData):
"""A Pose with reference coordinate frame and timestamp"""
header: Header = field(default_factory=Header)
pose: Pose = field(default_factory=Pose)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/PoseStamped"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(header=Header.from_dict(d["header"]), pose=Pose.from_dict(d["pose"]))
def __array__(self):
"""Convert to numpy array of pose."""
return np.array(self.pose)
[docs]
@dataclass
class TrafficLightState(RosMessageData):
"""State of a traffic light"""
header: Header = field(default_factory=Header)
id: str = ""
position: Point = field(default_factory=Point)
status: str = ""
[docs]
@staticmethod
def msg_type() -> str:
return "symaware_msgs/TrafficLightState"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(
header=Header.from_dict(d["header"]),
id=d["id"],
position=Point.from_dict(d["position"]),
status=d["status"],
)
[docs]
@dataclass
class AirSensorOutput(RosMessageData):
"""Output produced by the AIR sensor"""
header: Header = field(default_factory=Header)
tid: str = ""
range: float = 0.0
azimuth: float = 0.0
elevation: float = 0.0
velocity: float = 0.0
heading: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "symaware_msgs/AirSensorOutput"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(
header=Header.from_dict(d["header"]),
tid=d["tid"],
range=d["range"],
azimuth=d["azimuth"],
elevation=d["elevation"],
velocity=d["velocity"],
heading=d["heading"],
)
def __array__(self):
"""Convert to numpy array of [range, azimuth, elevation, velocity, heading]."""
return np.array([self.range, self.azimuth, self.elevation, self.velocity, self.heading])
[docs]
@dataclass
class JsonAirSensorOutput(RosMessageData):
"""Output produced by the AIR sensor, sent as a JSON string"""
header: Header = field(default_factory=Header)
tid: str = ""
range: float = 0.0
azimuth: float = 0.0
elevation: float = 0.0
velocity: float = 0.0
heading: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "std_msgs/String"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
js = json.loads(d["data"].replace("\0", "").replace("'", '"'))
return cls(
tid=js["id"],
range=js["range"],
azimuth=js["theta"],
elevation=0.0,
velocity=js["vel"],
heading=js["heading"],
)
def __array__(self):
"""Convert to numpy array of [range, azimuth, elevation, velocity, heading]."""
return np.array([self.range, self.azimuth, self.elevation, self.velocity, self.heading])
[docs]
@dataclass
class Path(RosMessageData):
"""List of points that make up a path"""
header: Header = field(default_factory=Header)
poses: "list[Pose]" = field(default_factory=list)
[docs]
@staticmethod
def msg_type() -> str:
return "symaware_msgs/Path"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(header=Header.from_dict(d["header"]), poses=[Pose.from_dict(p) for p in d["poses"]])
[docs]
@dataclass
class PathStamped(RosMessageData):
"""An array of poses that represents a Path for a robot to follow"""
header: Header = field(default_factory=Header)
poses: "list[PoseStamped]" = field(default_factory=list)
[docs]
@staticmethod
def msg_type() -> str:
return "nav_msgs/Path"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(header=Header.from_dict(d["header"]), poses=[PoseStamped.from_dict(p) for p in d["poses"]])
[docs]
@dataclass
class PoseArray(RosMessageData):
"""An array of poses that represents a set of points in space"""
header: Header = field(default_factory=Header)
poses: "list[Pose]" = field(default_factory=list)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/PoseArray"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(header=Header.from_dict(d["header"]), poses=[Pose.from_dict(p) for p in d["poses"]])
[docs]
@dataclass
class PrescanStatus(RosMessageData):
"""Whether the simulation is ready to start"""
ready: bool = False
stop: bool = False
[docs]
@staticmethod
def msg_type() -> str:
return "symaware_msgs/PrescanStatus"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(ready=d["ready"], stop=d["stop"])
[docs]
@dataclass
class LaserScan(RosMessageData):
"""
Single scan from a planar laser range-finder
If you have another ranging device with different behavior (e.g. a sonar
array), please find or create a different message, since applications
will make fairly laser-specific assumptions about this data
Args
----
header:
Timestamp in the header is the acquisition time of
the first ray in the scan.
In frame frame_id, angles are measured around
the positive Z axis (counterclockwise, if Z is up)
with zero angle being forward along the x axis
angle_min:
Start angle of the scan [rad]
angle_max:
End angle of the scan [rad]
angle_increment:
Angular distance between measurements [rad]
time_increment:
Time between measurements [seconds] - if your scanner
is moving, this will be used in interpolating position
of 3d points
scan_time:
time between scans [seconds]
range_min:
minimum range value [m]
range_max:
maximum range value [m]
ranges:
range data [m] (Note: values < range_min or > range_max should be discarded)
intensities:
intensity data [device-specific units]. If your
device does not provide intensities, please leave
the array empty.
"""
header: Header = field(default_factory=Header)
angle_min: float = 0.0
angle_max: float = 0.0
angle_increment: float = 0.0
time_increment: float = 0.0
scan_time: float = 0.0
range_min: float = 0.0
range_max: float = 0.0
ranges: "list[float]" = field(default_factory=list)
intensities: "list[float]" = field(default_factory=list)
[docs]
@staticmethod
def msg_type() -> str:
return "sensor_msgs/LaserScan"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(
header=Header.from_dict(d["header"]),
angle_min=d["angle_min"],
angle_max=d["angle_max"],
angle_increment=d["angle_increment"],
time_increment=d["time_increment"],
scan_time=d["scan_time"],
range_min=d["range_min"],
range_max=d["range_max"],
ranges=[r for r in d["ranges"]],
intensities=[i for i in d["intensities"]],
)
[docs]
@dataclass
class PoseWithCovariance(RosMessageData):
"""
This represents a pose in free space with uncertainty.
The pose is defined by a position and orientation.
Args
----
pose:
Pose
covariance:
float64[36]
"""
pose: Pose = field(default_factory=Pose)
covariance: "list[float]" = field(default_factory=lambda: [0.0] * 36)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/PoseWithCovariance"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(pose=Pose.from_dict(d["pose"]), covariance=[c for c in d["covariance"]])
def __array__(self):
"""Convert to numpy array concatenating pose and covariance."""
return np.concatenate((np.array(self.pose), np.array(self.covariance)))
[docs]
@dataclass
class Twist(RosMessageData):
"""
This expresses velocity in free space broken into its linear and angular parts.
Args
----
linear:
Vector3
angular:
Vector3
"""
linear: "Vector3" = field(default_factory=Vector3)
angular: "Vector3" = field(default_factory=Vector3)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/Twist"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(linear=Vector3.from_dict(d["linear"]), angular=Vector3.from_dict(d["angular"]))
def __array__(self):
"""Convert to numpy array concatenating linear and angular."""
return np.concatenate((np.array(self.linear), np.array(self.angular)))
[docs]
@dataclass
class TwistWithCovariance(RosMessageData):
"""
This expresses velocity in free space with uncertainty.
Args
----
twist:
Twist
covariance:
float64[36]
"""
twist: Twist = field(default_factory=Twist)
covariance: "list[float]" = field(default_factory=lambda: [0.0] * 36)
[docs]
@staticmethod
def msg_type() -> str:
return "geometry_msgs/TwistWithCovariance"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(twist=Twist.from_dict(d["twist"]), covariance=[c for c in d["covariance"]])
def __array__(self):
"""Convert to numpy array concatenating twist and covariance."""
return np.concatenate((np.array(self.twist), np.array(self.covariance)))
[docs]
@dataclass
class Odometry(RosMessageData):
"""
This represents an estimate of a position and velocity in free space.
The pose in this message should be specified in the coordinate frame given by header.frame_id.
The twist in this message should be specified in the coordinate frame given by the child_frame_id
Args
----
header:
Header
child_frame_id:
string
pose:
PoseWithCovariance
twist:
TwistWithCovariance
"""
header: Header = field(default_factory=Header)
child_frame_id: str = ""
pose: PoseWithCovariance = field(default_factory=PoseWithCovariance)
twist: TwistWithCovariance = field(default_factory=TwistWithCovariance)
[docs]
@staticmethod
def msg_type() -> str:
return "nav_msgs/Odometry"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(
header=Header.from_dict(d["header"]),
child_frame_id=d["child_frame_id"],
pose=PoseWithCovariance.from_dict(d["pose"]),
twist=TwistWithCovariance.from_dict(d["twist"]),
)
def __array__(self):
"""Convert to numpy array concatenating pose and twist."""
return np.concatenate((np.array(self.pose), np.array(self.twist)))
[docs]
@dataclass
class AckermannDrive(RosMessageData):
"""Driving command for a car-like vehicle using Ackermann steering
Assumes Ackermann front-wheel steering. The left and right front
wheels are generally at different angles. To simplify, the commanded
angle corresponds to the yaw of a virtual wheel located at the
center of the front axle, like on a tricycle. Positive yaw is to
the left. (This is **not** the angle of the steering wheel inside the
passenger compartment.)
Zero steering angle velocity means change the steering angle as
quickly as possible. Positive velocity indicates a desired absolute
rate of change either left or right. The controller tries not to
exceed this limit in either direction, but sometimes it might.
Drive at requested speed, acceleration and jerk (the 1st, 2nd and
3rd derivatives of position). All are measured at the vehicle's
center of rotation, typically the center of the rear axle. The
controller tries not to exceed these limits in either direction, but
sometimes it might.
Speed is the desired scalar magnitude of the velocity vector.
Direction is forward unless the sign is negative, indicating reverse.
Zero acceleration means change speed as quickly as
possible. Positive acceleration indicates a desired absolute
magnitude; that includes deceleration.
Zero jerk means change acceleration as quickly as possible. Positive
jerk indicates a desired absolute rate of acceleration change in
either direction (increasing or decreasing).
Args
----
steering_angle:
desired virtual angle (radians)
steering_angle_velocity:
desired rate of change (radians/s)
speed:
desired forward speed (m/s)
acceleration:
desired acceleration (m/s^2)
jerk:
desired jerk (m/s^3)
"""
steering_angle: float = 0.0
steering_angle_velocity: float = 0.0
speed: float = 0.0
acceleration: float = 0.0
jerk: float = 0.0
[docs]
@staticmethod
def msg_type() -> str:
return "ackermann_msgs/AckermannDrive"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(
steering_angle=d["steering_angle"],
steering_angle_velocity=d["steering_angle_velocity"],
speed=["speed"],
acceleration=d["acceleration"],
jerk=["jerk"],
)
def __array__(self):
"""Convert to numpy array of drive parameters."""
return np.array([self.steering_angle, self.steering_angle_velocity, self.speed, self.acceleration, self.jerk])
[docs]
@dataclass
class AckermannDriveStamped(RosMessageData):
"""Time stamped drive command for robots with Ackermann steering
Args
----
header:
Header
drive:
AckermannDrive
"""
header: Header = field(default_factory=Header)
drive: AckermannDrive = field(default_factory=AckermannDrive)
[docs]
@staticmethod
def msg_type() -> str:
return "ackermann_msgs/AckermannDriveStamped"
[docs]
@classmethod
def from_dict(cls, d: dict) -> "Self":
return cls(header=Header.from_dict(d["header"]), drive=AckermannDrive.from_dict(d["drive"]))
def __array__(self):
"""Convert to numpy array of drive."""
return np.array(self.drive)
[docs]
@dataclass(frozen=True)
class RosLibMessage(Message):
"""
Simple class to store messages between agents.
Args
----
sender_id:
Id of the sender agent
receiver_id:
Id of the receiver agent
data:
ROS message
"""
data: RosMessageData