Source code for symaware.extra.ros.ros1.models

from typing import Any

import numpy as np
from symaware.base import Identifier

from symaware.extra.ros import AckermannDriveModel, TwistDriveModel

from .ros_client import Ros1Client

from rospy import AckermannDriveStamped, AckermannDrive, Twist


[docs] class Ros1AckermannDriveModel(AckermannDriveModel[AckermannDriveStamped]): """ Dynamical model using the AckermannDrive type messages as a control input. """ def __init__( self, ID: Identifier, topic: str = "", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, topic=topic, message_type=AckermannDriveStamped, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, )
[docs] def step(self): if self._topic is not None: self._topic.publish(AckermannDriveStamped(drive=AckermannDrive(*self._control_input)).to_dict())
[docs] class Ros1TwistDriveModel(TwistDriveModel[Twist]): """ Dynamical model using the twist type messages as a control input. """ def __init__( self, ID: Identifier, topic: str = "", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, topic=topic, message_type=AckermannDriveStamped, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, )
[docs] def step(self): if self._topic is not None: self._topic.publish( Twist(linear=self._control_input[:3].tolist(), angular=self._control_input[3:].tolist()).to_dict() )