from typing import Any, Callable
import rospy
from symaware.extra.ros.utils import RosClient
[docs]
class Ros1Client(RosClient[object, rospy.Subscriber, rospy.Publisher]):
"""
Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node.
It should be initialised once using the `with` construct.
This ensures that the client will disconnect gracefully regardless any error.
Example
-------
Use the RosClient
>>> from symaware.extra.ros import RosClient
>>> def main():
... # Add your communication components in here
... pass
>>> if __name__ == "__main__":
... with RosClient(host="localhost", port=9090) as ros:
... main(ros)
Args
----
host:
Name or IP address of the ROS bridge host, e.g. `localhost`, `127.0.0.1`, `192.168.1.2`, `my.works.station`
port:
ROS bridge port, e.g. 9090
is_secure:
Whether to use a secure web sockets connection
"""
[docs]
def publish_topic(
self,
topic_name: str,
message_type: "type",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
) -> "rospy.Publisher":
"""
Create and advertise a new ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
latch:
True to latch the topic when publishing, False otherwise
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
publisher = rospy.Publisher(
name=topic_name,
data_class=message_type,
latch=latch,
queue_size=queue_size,
)
self._publishers.append(publisher)
return publisher
[docs]
def subscribe_topic(
self,
topic_name: str,
message_type: "type",
callback: "Callable[[type], None]",
compression: Any = None,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
) -> "rospy.Subscriber":
"""
Subscribe to the ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue length at bridge side used when subscribing
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
subscriber = rospy.Subscriber(
name=topic_name,
data_class=message_type,
callback=callback,
queue_size=queue_size,
)
self._subscribers.append(subscriber)
return subscriber
[docs]
def publish(self, publisher: "rospy.Publisher", message: object):
"""
Publish the given message to the provided publisher.
Args
----
publisher:
Publisher to use to send the message
message:
Message to publish to the ROS topics
"""
publisher.publish(message)