from typing import Any
from symaware.base import Identifier
from symaware.extra.ros import AckermannDriveModel, OdometryModel, TwistDriveModel
from .data_structures import (
AckermannDrive,
AckermannDriveStamped,
Odometry,
Point,
Pose,
PoseWithCovariance,
Quaternion,
Twist,
TwistWithCovariance,
Vector3,
)
[docs]
class RosLibAckermannDriveModel(AckermannDriveModel[AckermannDriveStamped]):
"""
Dynamical model using the AckermannDrive type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: str = "",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
topic=topic,
message_type=AckermannDriveStamped,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
[docs]
def step(self):
if self.is_publisher:
self._ros.publish(self._publisher, AckermannDriveStamped(drive=AckermannDrive(*self._control_input)))
[docs]
class RosLibTwistDriveModel(TwistDriveModel[Twist]):
"""
Dynamical model using the twist type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: str = "",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
topic=topic,
message_type=Twist,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
[docs]
def step(self):
if self.is_publisher:
self._ros.publish(
self._publisher,
Twist(linear=Vector3(*self._control_input[:3]), angular=Vector3(*self._control_input[3:])),
)
[docs]
class RosLibOdometryeModel(OdometryModel[Odometry]):
"""
Dynamical model using the twist type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: str = "",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
topic=topic,
message_type=Odometry,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
[docs]
def step(self):
if self.is_publisher:
self._ros.publish(
self._publisher,
Odometry(
pose=PoseWithCovariance(
pose=Pose(
position=Point(*self._control_input[:3]), orientation=Quaternion(*self._control_input[3:7])
)
),
twist=TwistWithCovariance(
twist=Twist(
linear=Vector3(*self._control_input[7:10]), angular=Vector3(*self._control_input[10:])
)
),
),
)