Source code for symaware.extra.ros.roslib.ros_client

from typing import Any, Callable

import roslibpy

from symaware.extra.ros.utils import RosClient

from .data_structures import RosMessageData


[docs] class RosLibClient(RosClient[RosMessageData, roslibpy.Topic, roslibpy.Topic]): """ Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the `with` construct. This ensures that the client will disconnect gracefully regardless any error. Example ------- Use the RosClient >>> from symaware.extra.ros import RosClient >>> def main(): ... # Add your communication components in here ... pass >>> if __name__ == "__main__": ... with RosClient(host="localhost", port=9090) as ros: ... main(ros) Args ---- host: Name or IP address of the ROS bridge host, e.g. `localhost`, `127.0.0.1`, `192.168.1.2`, `my.works.station` port: ROS bridge port, e.g. 9090 is_secure: Whether to use a secure web sockets connection """ def __init__(self, host: str = "localhost", port: int = 9090, is_secure: bool = False): super().__init__() self._host = host self._port = port self._is_secure = is_secure self._ros: "roslibpy.Ros | None" = None
[docs] def connect(self): """Connect to ROS bridge with connection validation.""" assert self._ros is None, "A ROS client has already been initialised" self._ros = roslibpy.Ros(host=self._host, port=self._port, is_secure=self._is_secure) self._ros.run() assert self._ros.is_connected, "Could not connect to ROS master"
[docs] def disconnect(self): """Disconnect from ROS, unadvertise publishers, and unsubscribe subscribers.""" if self._ros is not None: assert self._ros.is_connected, "ROS not connected" for publisher in self._publishers: publisher.unadvertise() for subscriber in self._subscribers: subscriber.unsubscribe() self._ros.close()
@property def host(self) -> str: """ROS bridge host name or IP address.""" return self._host @property def port(self) -> int: """ROS bridge port number.""" return self._port @property def is_secure(self) -> bool: """Whether using secure websockets.""" return self._is_secure @property def ros(self) -> roslibpy.Ros: """Underlying roslibpy.Ros instance.""" return self._ros @property def is_connected(self): """Whether client is connected to ROS bridge.""" return self._ros is not None and self._ros.is_connected
[docs] def publish_topic( self, topic_name: str, message_type: "type[RosMessageData]", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ) -> roslibpy.Topic: """ Create and advertise a new ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` latch: True to latch the topic when publishing, False otherwise throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ assert self._ros is not None, "ROS not connected" topic = roslibpy.Topic( self._ros, topic_name, message_type.msg_type(), compression=compression, latch=latch, throttle_rate=throttle_rate, queue_size=queue_size, reconnect_on_close=reconnect_on_close, ) topic.advertise() self._publishers.append(topic) return topic
[docs] def subscribe_topic( self, topic_name: str, message_type: "type[RosMessageData]", callback: "Callable[[RosMessageData], None]", compression: Any = None, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ): """ Subscribe to the ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics queue_size: Queue length at bridge side used when subscribing reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ assert self._ros is not None, "ROS not connected" topic = roslibpy.Topic( self._ros, topic_name, message_type.msg_type(), compression=compression, throttle_rate=throttle_rate, queue_length=queue_size, reconnect_on_close=reconnect_on_close, ) topic.subscribe(lambda message: callback(message_type.from_dict(message))) self._subscribers.append(topic) return topic
[docs] def publish(self, publisher: roslibpy.Topic, message: RosMessageData): """ Publish the given message to the provided publisher. Args ---- publisher: Publisher to use to send the message message: Message to publish to the ROS topics """ publisher.publish(message.to_dict())