from typing import Any, Callable
import roslibpy
from symaware.extra.ros.utils import RosClient
from .data_structures import RosMessageData
[docs]
class RosLibClient(RosClient[RosMessageData, roslibpy.Topic, roslibpy.Topic]):
"""
Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node.
It should be initialised once using the `with` construct.
This ensures that the client will disconnect gracefully regardless any error.
Example
-------
Use the RosClient
>>> from symaware.extra.ros import RosClient
>>> def main():
... # Add your communication components in here
... pass
>>> if __name__ == "__main__":
... with RosClient(host="localhost", port=9090) as ros:
... main(ros)
Args
----
host:
Name or IP address of the ROS bridge host, e.g. `localhost`, `127.0.0.1`, `192.168.1.2`, `my.works.station`
port:
ROS bridge port, e.g. 9090
is_secure:
Whether to use a secure web sockets connection
"""
def __init__(self, host: str = "localhost", port: int = 9090, is_secure: bool = False):
super().__init__()
self._host = host
self._port = port
self._is_secure = is_secure
self._ros: "roslibpy.Ros | None" = None
[docs]
def connect(self):
"""Connect to ROS bridge with connection validation."""
assert self._ros is None, "A ROS client has already been initialised"
self._ros = roslibpy.Ros(host=self._host, port=self._port, is_secure=self._is_secure)
self._ros.run()
assert self._ros.is_connected, "Could not connect to ROS master"
[docs]
def disconnect(self):
"""Disconnect from ROS, unadvertise publishers, and unsubscribe subscribers."""
if self._ros is not None:
assert self._ros.is_connected, "ROS not connected"
for publisher in self._publishers:
publisher.unadvertise()
for subscriber in self._subscribers:
subscriber.unsubscribe()
self._ros.close()
@property
def host(self) -> str:
"""ROS bridge host name or IP address."""
return self._host
@property
def port(self) -> int:
"""ROS bridge port number."""
return self._port
@property
def is_secure(self) -> bool:
"""Whether using secure websockets."""
return self._is_secure
@property
def ros(self) -> roslibpy.Ros:
"""Underlying roslibpy.Ros instance."""
return self._ros
@property
def is_connected(self):
"""Whether client is connected to ROS bridge."""
return self._ros is not None and self._ros.is_connected
[docs]
def publish_topic(
self,
topic_name: str,
message_type: "type[RosMessageData]",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
) -> roslibpy.Topic:
"""
Create and advertise a new ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
latch:
True to latch the topic when publishing, False otherwise
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
assert self._ros is not None, "ROS not connected"
topic = roslibpy.Topic(
self._ros,
topic_name,
message_type.msg_type(),
compression=compression,
latch=latch,
throttle_rate=throttle_rate,
queue_size=queue_size,
reconnect_on_close=reconnect_on_close,
)
topic.advertise()
self._publishers.append(topic)
return topic
[docs]
def subscribe_topic(
self,
topic_name: str,
message_type: "type[RosMessageData]",
callback: "Callable[[RosMessageData], None]",
compression: Any = None,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
):
"""
Subscribe to the ROS topic with the given name and message_type.
Args
----
topic_name:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish, e.g. `std_msgs/String`
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
queue_size:
Queue length at bridge side used when subscribing
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
"""
assert self._ros is not None, "ROS not connected"
topic = roslibpy.Topic(
self._ros,
topic_name,
message_type.msg_type(),
compression=compression,
throttle_rate=throttle_rate,
queue_length=queue_size,
reconnect_on_close=reconnect_on_close,
)
topic.subscribe(lambda message: callback(message_type.from_dict(message)))
self._subscribers.append(topic)
return topic
[docs]
def publish(self, publisher: roslibpy.Topic, message: RosMessageData):
"""
Publish the given message to the provided publisher.
Args
----
publisher:
Publisher to use to send the message
message:
Message to publish to the ROS topics
"""
publisher.publish(message.to_dict())