Source code for symaware.extra.ros.models.dynamical_model

from typing import Any, Generic, TypedDict

import numpy as np
from symaware.base import DynamicalModel, Identifier

from symaware.extra.ros.utils import RosClient, TMessage


[docs] class AckermannDriveModelSubinputs(TypedDict): """Type definition for Ackermann drive model sub-inputs. Args ---- steering_angle: The angle of the steering, in radians. steering_angle_velocity: The velocity of the steering angle, in radians per second. speed: The speed of the vehicle, in meters per second. acceleration: The acceleration of the vehicle, in meters per second squared. jerk: The jerk of the vehicle, in meters per second cubed. """ steering_angle: float steering_angle_velocity: float speed: float acceleration: float jerk: float
[docs] class TwistDriveModelSubinputs(TypedDict): """Type definition for twist drive model sub-inputs. Args ---- linear: Linear velocity vector (vx, vy, vz) in meters per second. angular: Angular velocity vector (wx, wy, wz) in radians per second. """ linear: np.ndarray angular: np.ndarray
[docs] class OdometryModelSubinputs(TypedDict): """Type definition for odometry model sub-inputs. Args ---- position: Position vector (x, y, z) in meters. orientation: Orientation quaternion (qx, qy, qz, qw). speed_linear: Linear velocity vector (vx, vy, vz) in meters per second. speed_angular: Angular velocity vector (wx, wy, wz) in radians per second. """ position: np.ndarray orientation: np.ndarray speed_linear: np.ndarray speed_angular: np.ndarray
[docs] class RosClientDynamicalModel(DynamicalModel, Generic[TMessage]): """Base dynamical model that interfaces with ROS client.""" def __init__( self, ID: Identifier, control_input: np.ndarray, topic: "str" = "", message_type: "TMessage" = None, compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__(ID, control_input) self._ros: "RosClient | None" = None self._topic_name = topic self._message_type = message_type self._compression = compression self._latch = latch self._throttle_rate = throttle_rate self._queue_size = queue_size self._reconnect_on_close = reconnect_on_close self._publisher: "object | None" = None @property def topic(self) -> "str": """ROS topic name.""" return self._topic_name @property def message_type(self) -> "type[TMessage] | None": """ROS message type.""" return self._message_type @property def is_publisher(self) -> bool: """Return True if ROS client is initialized.""" return self._ros is not None
[docs] def initialise(self, ros_client: "RosClient | None" = None): """Initialise ROS client and create publisher.""" self._ros = ros_client if self.is_publisher: self._publisher = self._ros.publish_topic( topic_name=self._topic_name, message_type=self._message_type, compression=self._compression, latch=self._latch, throttle_rate=self._throttle_rate, queue_size=self._queue_size, reconnect_on_close=self._reconnect_on_close, )
[docs] class AckermannDriveModel(RosClientDynamicalModel[TMessage]): """ Dynamical model using the AckermannDrive type messages as a control input. """ def __init__( self, ID: Identifier, topic: "str | None" = None, message_type: "TMessage" = None, compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, control_input=np.zeros(5), topic=topic, message_type=message_type, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, ) @property def subinputs_dict(self) -> AckermannDriveModelSubinputs: return { "steering_angle": self._control_input[0], "steering_angle_velocity": self._control_input[1], "speed": self._control_input[2], "acceleration": self._control_input[3], "jerk": self.control_input[4], }
[docs] @staticmethod def control_input_to_array( steering_angle: float = 0.0, steering_angle_velocity: float = 0.0, speed: float = 0.0, acceleration: float = 0.0, jerk: float = 0.0, ) -> np.ndarray: return np.array((steering_angle, steering_angle_velocity, speed, acceleration, jerk))
[docs] class TwistDriveModel(RosClientDynamicalModel[TMessage]): """ Dynamical model using the twist type messages as a control input. """ def __init__( self, ID: Identifier, topic: "str | None" = None, message_type: "TMessage" = None, compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, control_input=np.zeros(6), topic=topic, message_type=message_type, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, ) @property def subinputs_dict(self) -> TwistDriveModelSubinputs: return { "linear": self._control_input[:3], "angular": self._control_input[3:], }
[docs] @staticmethod def control_input_to_array( linear_x: float = 0.0, linear_y: float = 0.0, linear_z: float = 0.0, angular_x: float = 0.0, angular_y: float = 0.0, angular_z: float = 0.0, ) -> np.ndarray: return np.array((linear_x, linear_y, linear_z, angular_x, angular_y, angular_z))
[docs] class OdometryModel(RosClientDynamicalModel[TMessage]): """ Dynamical model using the odometry type messages as a control input. """ def __init__( self, ID: Identifier, topic: "str | None" = None, message_type: "TMessage" = None, compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, control_input=np.zeros(13), topic=topic, message_type=message_type, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, ) @property def subinputs_dict(self) -> OdometryModelSubinputs: return { "position": self._control_input[:3], "orientation": self._control_input[3:7], "speed_linear": self._control_input[7:10], "speed_angular": self._control_input[10:], }
[docs] @staticmethod def control_input_to_array( position_x: float = 0.0, position_y: float = 0.0, position_z: float = 0.0, orientation_x: float = 0.0, orientation_y: float = 0.0, orientation_z: float = 0.0, orientation_w: float = 0.0, speed_linear_x: float = 0.0, speed_linear_y: float = 0.0, speed_linear_z: float = 0.0, speed_angular_x: float = 0.0, speed_angular_y: float = 0.0, speed_angular_z: float = 0.0, ) -> np.ndarray: return np.array( ( position_x, position_y, position_z, orientation_x, orientation_y, orientation_z, orientation_w, speed_linear_x, speed_linear_y, speed_linear_z, speed_angular_x, speed_angular_y, speed_angular_z, ) )