Source code for symaware.extra.ros.utils.ros_client

from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, TypeVar

TSubscriber = TypeVar("TSubscriber")
TPublisher = TypeVar("TPublisher")
TMessage = TypeVar("TMessage")


[docs] class RosClient(ABC, Generic[TMessage, TSubscriber, TPublisher]): """ Interface for ROS clients used to communicate with a ROS network. The subclasses specialise the implementation in order to use different ROS versions. Those include `roslibpy` for [ROS bridge](http://wiki.ros.org/rosbridge_suite), `rospy` for [ROS1 Python client library](http://wiki.ros.org/rospy) and `rclpy` for [ROS2 Python client library](https://github.com/ros2/rclpy). Using this class allows to abstract the communication with the ROS network, making it easier to switch between different ROS versions. Warning ------- The message types used in the topics must be compatible with the ROS version used. Use the message types from the libraries if you are using `rospy` or `rclpy`. Use the message types provided by this package if you are using `roslibpy`. """ def __init__(self): """Initialise the ROS client with empty publisher and subscriber lists.""" self._publishers: "list[TPublisher]" = [] self._subscribers: "list[TPublisher]" = [] def __enter__(self): """Context manager entry, connects to ROS.""" self.connect() return self def __exit__(self, exc_type, exc_value, traceback): """Context manager exit, disconnects from ROS.""" self.disconnect()
[docs] def connect(self): """Connect to the ROS network. Override in subclasses.""" pass
[docs] def disconnect(self): """Disconnect from the ROS network. Override in subclasses.""" pass
@property def publishers(self): """List of created publishers.""" return self._publishers @property def subscribers(self): """List of created subscribers.""" return self._subscribers
[docs] @abstractmethod def publish_topic( self, topic_name: str, message_type: "type[TMessage]", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ) -> TPublisher: """ Create and advertise a new ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` latch: True to latch the topic when publishing, False otherwise throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ pass
[docs] @abstractmethod def subscribe_topic( self, topic_name: str, message_type: "type[TMessage]", callback: "Callable[[TMessage], None]", compression: Any = None, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ) -> TSubscriber: """ Subscribe to the ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue length at bridge side used when subscribing reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ pass
[docs] @abstractmethod def publish(self, publisher: TPublisher, message: TMessage): """ Publish the given message to the provided publisher. Args ---- publisher: Publisher to use to send the message message: Message to publish to the ROS topics """ pass