from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, TypeVar
TSubscriber = TypeVar("TSubscriber")
TPublisher = TypeVar("TPublisher")
TMessage = TypeVar("TMessage")
[docs]
class RosClient(ABC, Generic[TMessage, TSubscriber, TPublisher]):
"""
Interface for ROS clients used to communicate with a ROS network.
The subclasses specialise the implementation in order to use different ROS versions.
Those include `roslibpy` for [ROS bridge](http://wiki.ros.org/rosbridge_suite),
`rospy` for [ROS1 Python client library](http://wiki.ros.org/rospy)
and `rclpy` for [ROS2 Python client library](https://github.com/ros2/rclpy).
Using this class allows to abstract the communication with the ROS network,
making it easier to switch between different ROS versions.
Warning
-------
The message types used in the topics must be compatible with the ROS version used.
Use the message types from the libraries if you are using `rospy` or `rclpy`.
Use the message types provided by this package if you are using `roslibpy`.
"""
def __init__(self):
"""Initialise the ROS client with empty publisher and subscriber lists."""
self._publishers: "list[TPublisher]" = []
self._subscribers: "list[TPublisher]" = []
def __enter__(self):
"""Context manager entry, connects to ROS."""
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Context manager exit, disconnects from ROS."""
self.disconnect()
[docs]
def connect(self):
"""Connect to the ROS network. Override in subclasses."""
pass
[docs]
def disconnect(self):
"""Disconnect from the ROS network. Override in subclasses."""
pass
@property
def publishers(self):
"""List of created publishers."""
return self._publishers
@property
def subscribers(self):
"""List of created subscribers."""
return self._subscribers
[docs]
@abstractmethod
def publish_topic(
self,
topic_name: str,
message_type: "type[TMessage]",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
) -> TPublisher:
"""
Create and advertise a new ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
latch:
True to latch the topic when publishing, False otherwise
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
pass
[docs]
@abstractmethod
def subscribe_topic(
self,
topic_name: str,
message_type: "type[TMessage]",
callback: "Callable[[TMessage], None]",
compression: Any = None,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
) -> TSubscriber:
"""
Subscribe to the ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue length at bridge side used when subscribing
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
pass
[docs]
@abstractmethod
def publish(self, publisher: TPublisher, message: TMessage):
"""
Publish the given message to the provided publisher.
Args
----
publisher:
Publisher to use to send the message
message:
Message to publish to the ROS topics
"""
pass