Source code for symaware.extra.ros.roslib.models

from typing import Any

from symaware.base import Identifier

from symaware.extra.ros import AckermannDriveModel, OdometryModel, TwistDriveModel

from .data_structures import (
    AckermannDrive,
    AckermannDriveStamped,
    Odometry,
    Point,
    Pose,
    PoseWithCovariance,
    Quaternion,
    Twist,
    TwistWithCovariance,
    Vector3,
)


[docs] class RosLibAckermannDriveModel(AckermannDriveModel[AckermannDriveStamped]): """ Dynamical model using the AckermannDrive type messages as a control input. """ def __init__( self, ID: Identifier, topic: str = "", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, topic=topic, message_type=AckermannDriveStamped, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, )
[docs] def step(self): if self.is_publisher: self._ros.publish(self._publisher, AckermannDriveStamped(drive=AckermannDrive(*self._control_input)))
[docs] class RosLibTwistDriveModel(TwistDriveModel[Twist]): """ Dynamical model using the twist type messages as a control input. """ def __init__( self, ID: Identifier, topic: str = "", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, topic=topic, message_type=Twist, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, )
[docs] def step(self): if self.is_publisher: self._ros.publish( self._publisher, Twist(linear=Vector3(*self._control_input[:3]), angular=Vector3(*self._control_input[3:])), )
[docs] class RosLibOdometryeModel(OdometryModel[Odometry]): """ Dynamical model using the twist type messages as a control input. """ def __init__( self, ID: Identifier, topic: str = "", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): super().__init__( ID=ID, topic=topic, message_type=Odometry, compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, )
[docs] def step(self): if self.is_publisher: self._ros.publish( self._publisher, Odometry( pose=PoseWithCovariance( pose=Pose( position=Point(*self._control_input[:3]), orientation=Quaternion(*self._control_input[3:7]) ) ), twist=TwistWithCovariance( twist=Twist( linear=Vector3(*self._control_input[7:10]), angular=Vector3(*self._control_input[10:]) ) ), ), )