Source code for symaware.extra.ros.models.dynamical_model
from typing import Any, Generic, TypedDict
import numpy as np
from symaware.base import DynamicalModel, Identifier
from symaware.extra.ros.utils import RosClient, TMessage
[docs]
class AckermannDriveModelSubinputs(TypedDict):
"""Type definition for Ackermann drive model sub-inputs.
Args
----
steering_angle:
The angle of the steering, in radians.
steering_angle_velocity:
The velocity of the steering angle, in radians per second.
speed:
The speed of the vehicle, in meters per second.
acceleration:
The acceleration of the vehicle, in meters per second squared.
jerk:
The jerk of the vehicle, in meters per second cubed.
"""
steering_angle: float
steering_angle_velocity: float
speed: float
acceleration: float
jerk: float
[docs]
class TwistDriveModelSubinputs(TypedDict):
"""Type definition for twist drive model sub-inputs.
Args
----
linear:
Linear velocity vector (vx, vy, vz) in meters per second.
angular:
Angular velocity vector (wx, wy, wz) in radians per second.
"""
linear: np.ndarray
angular: np.ndarray
[docs]
class OdometryModelSubinputs(TypedDict):
"""Type definition for odometry model sub-inputs.
Args
----
position:
Position vector (x, y, z) in meters.
orientation:
Orientation quaternion (qx, qy, qz, qw).
speed_linear:
Linear velocity vector (vx, vy, vz) in meters per second.
speed_angular:
Angular velocity vector (wx, wy, wz) in radians per second.
"""
position: np.ndarray
orientation: np.ndarray
speed_linear: np.ndarray
speed_angular: np.ndarray
[docs]
class RosClientDynamicalModel(DynamicalModel, Generic[TMessage]):
"""Base dynamical model that interfaces with ROS client."""
def __init__(
self,
ID: Identifier,
control_input: np.ndarray,
topic: "str" = "",
message_type: "TMessage" = None,
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(ID, control_input)
self._ros: "RosClient | None" = None
self._topic_name = topic
self._message_type = message_type
self._compression = compression
self._latch = latch
self._throttle_rate = throttle_rate
self._queue_size = queue_size
self._reconnect_on_close = reconnect_on_close
self._publisher: "object | None" = None
@property
def topic(self) -> "str":
"""ROS topic name."""
return self._topic_name
@property
def message_type(self) -> "type[TMessage] | None":
"""ROS message type."""
return self._message_type
@property
def is_publisher(self) -> bool:
"""Return True if ROS client is initialized."""
return self._ros is not None
[docs]
def initialise(self, ros_client: "RosClient | None" = None):
"""Initialise ROS client and create publisher."""
self._ros = ros_client
if self.is_publisher:
self._publisher = self._ros.publish_topic(
topic_name=self._topic_name,
message_type=self._message_type,
compression=self._compression,
latch=self._latch,
throttle_rate=self._throttle_rate,
queue_size=self._queue_size,
reconnect_on_close=self._reconnect_on_close,
)
[docs]
class AckermannDriveModel(RosClientDynamicalModel[TMessage]):
"""
Dynamical model using the AckermannDrive type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: "str | None" = None,
message_type: "TMessage" = None,
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
control_input=np.zeros(5),
topic=topic,
message_type=message_type,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
@property
def subinputs_dict(self) -> AckermannDriveModelSubinputs:
return {
"steering_angle": self._control_input[0],
"steering_angle_velocity": self._control_input[1],
"speed": self._control_input[2],
"acceleration": self._control_input[3],
"jerk": self.control_input[4],
}
[docs]
@staticmethod
def control_input_to_array(
steering_angle: float = 0.0,
steering_angle_velocity: float = 0.0,
speed: float = 0.0,
acceleration: float = 0.0,
jerk: float = 0.0,
) -> np.ndarray:
return np.array((steering_angle, steering_angle_velocity, speed, acceleration, jerk))
[docs]
class TwistDriveModel(RosClientDynamicalModel[TMessage]):
"""
Dynamical model using the twist type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: "str | None" = None,
message_type: "TMessage" = None,
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
control_input=np.zeros(6),
topic=topic,
message_type=message_type,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
@property
def subinputs_dict(self) -> TwistDriveModelSubinputs:
return {
"linear": self._control_input[:3],
"angular": self._control_input[3:],
}
[docs]
@staticmethod
def control_input_to_array(
linear_x: float = 0.0,
linear_y: float = 0.0,
linear_z: float = 0.0,
angular_x: float = 0.0,
angular_y: float = 0.0,
angular_z: float = 0.0,
) -> np.ndarray:
return np.array((linear_x, linear_y, linear_z, angular_x, angular_y, angular_z))
[docs]
class OdometryModel(RosClientDynamicalModel[TMessage]):
"""
Dynamical model using the odometry type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: "str | None" = None,
message_type: "TMessage" = None,
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
control_input=np.zeros(13),
topic=topic,
message_type=message_type,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
@property
def subinputs_dict(self) -> OdometryModelSubinputs:
return {
"position": self._control_input[:3],
"orientation": self._control_input[3:7],
"speed_linear": self._control_input[7:10],
"speed_angular": self._control_input[10:],
}
[docs]
@staticmethod
def control_input_to_array(
position_x: float = 0.0,
position_y: float = 0.0,
position_z: float = 0.0,
orientation_x: float = 0.0,
orientation_y: float = 0.0,
orientation_z: float = 0.0,
orientation_w: float = 0.0,
speed_linear_x: float = 0.0,
speed_linear_y: float = 0.0,
speed_linear_z: float = 0.0,
speed_angular_x: float = 0.0,
speed_angular_y: float = 0.0,
speed_angular_z: float = 0.0,
) -> np.ndarray:
return np.array(
(
position_x,
position_y,
position_z,
orientation_x,
orientation_y,
orientation_z,
orientation_w,
speed_linear_x,
speed_linear_y,
speed_linear_z,
speed_angular_x,
speed_angular_y,
speed_angular_z,
)
)