from typing import Any, Generic
import roslibpy
from symaware.base import CommunicationSender, Identifier, Tasynclooplock
from symaware.extra.ros.data import RosMessage
from symaware.extra.ros.utils import RosClient, TMessage
[docs]
class RosCommunicationSender(CommunicationSender[Tasynclooplock], Generic[Tasynclooplock, TMessage]):
"""
Communication sender for ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
Example
-------
Create a communication receiver for the "/path" topic of type Path
>>> # doctest: +SKIP
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
... DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
... DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
... initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
... initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
Args
----
agent_id:
Identifier of the agent this component belongs to
topic:
ROS topic the sender publish to, e.g. `/my_topic`
message_type:
Type of messages the sender will publish
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
latch:
True to latch the topic when publishing, False otherwise
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
message_type: "type[TMessage]",
compression: Any = None,
latch: bool = False,
throttle_rate: int = 0,
queue_size: int = 0,
reconnect_on_close: bool = True,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(agent_id, async_loop_lock)
self._ros = ros_client
self._topic_name = topic
self._message_type = message_type
self._queue_size = queue_size
self._compression = compression
self._latch = latch
self._throttle_rate = throttle_rate
self._reconnect_on_close = reconnect_on_close
self._topic: "roslibpy.Topic | None" = None
@property
def topic(self) -> str:
"""ROS topic the sender publishes to."""
return self._topic_name
@property
def message_type(self) -> "type[TMessage]":
"""Message type the sender publishes."""
return self._message_type
[docs]
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
self.initialise_topic()
[docs]
def initialise_topic(self):
"""Create and advertise the ROS topic if not already initialized."""
if self._topic is not None:
return
self._topic = self._ros.publish_topic(
topic_name=self._topic_name,
message_type=self._message_type,
compression=self._compression,
latch=self._latch,
throttle_rate=self._throttle_rate,
queue_size=self._queue_size,
reconnect_on_close=self._reconnect_on_close,
)
[docs]
def _send_communication_through_channel(self, message: RosMessage):
"""Send a RosMessage through the ROS topic channel."""
self.publish(message.data)
[docs]
def publish(self, message: object):
"""Publish a message to the ROS topic, converting to dict if needed."""
self._topic.publish(message.to_dict() if hasattr(message, "to_dict") else message)