from typing import Any, Generic, Iterable
from symaware.base import Identifier, QueueCommunicationReceiver, Tasynclooplock
from symaware.extra.ros.data import RosMessage
from symaware.extra.ros.utils import RosClient, TMessage
[docs]
class RosCommunicationReceiver(QueueCommunicationReceiver[Tasynclooplock], Generic[Tasynclooplock, TMessage]):
"""
Communication receiver for ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
Example
-------
Create a communication receiver for the "/path" topic of type Path
>>> # doctest: +SKIP
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
... DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
... DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
... initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
... initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
Args
----
agent_id:
Identifier of the agent this component belongs to
topic:
ROS topic the receiver will subscribe to, e.g. `/my_topic`
message_type:
Type of messages the receiver expects
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue size created at bridge side for re-publishing webtopics
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
message_type: "type[TMessage]",
compression: Any = None,
throttle_rate: int = 0,
queue_size: int = 100,
reconnect_on_close: bool = True,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(agent_id, async_loop_lock)
self._ros = ros_client
self._topic_name = topic
self._message_type = message_type
self._queue_size = queue_size
self._compression = compression
self._throttle_rate = throttle_rate
self._reconnect_on_close = reconnect_on_close
self._topic: object = None
@property
def topic(self) -> str:
"""ROS topic the receiver subscribes to."""
return self._topic_name
@property
def message_type(self) -> "type[TMessage]":
"""Message type the receiver expects."""
return self._message_type
[docs]
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
self._topic = self._ros.subscribe_topic(
topic_name=self._topic_name,
message_type=self._message_type,
callback=self._on_subscription_callback,
compression=self._compression,
throttle_rate=self._throttle_rate,
queue_size=self._queue_size,
reconnect_on_close=self._reconnect_on_close,
)
[docs]
def _decode_message(self, *messages: RosMessage) -> "Iterable[TMessage]":
return [message.data for message in messages]
[docs]
def _on_subscription_callback(self, message: TMessage):
self._message_queue.put_nowait(RosMessage(sender_id=-1, receiver_id=self._agent_id, data=message))