from typing import Any
import numpy as np
from symaware.base import Identifier
from symaware.extra.ros import AckermannDriveModel, TwistDriveModel
from .ros_client import Ros1Client
from rospy import AckermannDriveStamped, AckermannDrive, Twist
[docs]
class Ros1AckermannDriveModel(AckermannDriveModel[AckermannDriveStamped]):
"""
Dynamical model using the AckermannDrive type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: str = "",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
topic=topic,
message_type=AckermannDriveStamped,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
[docs]
def step(self):
if self._topic is not None:
self._topic.publish(AckermannDriveStamped(drive=AckermannDrive(*self._control_input)).to_dict())
[docs]
class Ros1TwistDriveModel(TwistDriveModel[Twist]):
"""
Dynamical model using the twist type messages as a control input.
"""
def __init__(
self,
ID: Identifier,
topic: str = "",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
super().__init__(
ID=ID,
topic=topic,
message_type=AckermannDriveStamped,
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
[docs]
def step(self):
if self._topic is not None:
self._topic.publish(
Twist(linear=self._control_input[:3].tolist(), angular=self._control_input[3:].tolist()).to_dict()
)