Source code for symaware.extra.ros.ros1.ros_client

from typing import Any, Callable

import rospy

from symaware.extra.ros.utils import RosClient


[docs] class Ros1Client(RosClient[object, rospy.Subscriber, rospy.Publisher]): """ Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the `with` construct. This ensures that the client will disconnect gracefully regardless any error. Example ------- Use the RosClient >>> from symaware.extra.ros import RosClient >>> def main(): ... # Add your communication components in here ... pass >>> if __name__ == "__main__": ... with RosClient(host="localhost", port=9090) as ros: ... main(ros) Args ---- host: Name or IP address of the ROS bridge host, e.g. `localhost`, `127.0.0.1`, `192.168.1.2`, `my.works.station` port: ROS bridge port, e.g. 9090 is_secure: Whether to use a secure web sockets connection """
[docs] def publish_topic( self, topic_name: str, message_type: "type", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ) -> "rospy.Publisher": """ Create and advertise a new ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` latch: True to latch the topic when publishing, False otherwise throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ publisher = rospy.Publisher( name=topic_name, data_class=message_type, latch=latch, queue_size=queue_size, ) self._publishers.append(publisher) return publisher
[docs] def subscribe_topic( self, topic_name: str, message_type: "type", callback: "Callable[[type], None]", compression: Any = None, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, ) -> "rospy.Subscriber": """ Subscribe to the ROS topic with the given name and message_type. Args ---- topic_name: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish, e.g. `std_msgs/String` compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue length at bridge side used when subscribing reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection """ subscriber = rospy.Subscriber( name=topic_name, data_class=message_type, callback=callback, queue_size=queue_size, ) self._subscribers.append(subscriber) return subscriber
[docs] def publish(self, publisher: "rospy.Publisher", message: object): """ Publish the given message to the provided publisher. Args ---- publisher: Publisher to use to send the message message: Message to publish to the ROS topics """ publisher.publish(message)