Source code for symaware.extra.ros.components.communication_receiver

from typing import Any, Generic, Iterable

from symaware.base import Identifier, QueueCommunicationReceiver, Tasynclooplock

from symaware.extra.ros.data import RosMessage
from symaware.extra.ros.utils import RosClient, TMessage


[docs] class RosCommunicationReceiver(QueueCommunicationReceiver[Tasynclooplock], Generic[Tasynclooplock, TMessage]): """ Communication receiver for ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component. Example ------- Create a communication receiver for the "/path" topic of type Path >>> # doctest: +SKIP >>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator >>> agent_id = 0 >>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1)) >>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id)) >>> agent = Agent[KnowledgeDatabase](agent_id, entity) >>> env.add_agents(agent) >>> agent.add_components( ... DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)), ... DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... ) >>> agent.initialise_agent( ... initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)), ... initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))}, ... ) >>> AgentCoordinator(env, agent).run() Args ---- agent_id: Identifier of the agent this component belongs to topic: ROS topic the receiver will subscribe to, e.g. `/my_topic` message_type: Type of messages the receiver expects compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection async_loop_lock: Async loop lock to use for the communication receiver """ def __init__( self, agent_id: Identifier, ros_client: RosClient, topic: str, message_type: "type[TMessage]", compression: Any = None, throttle_rate: int = 0, queue_size: int = 100, reconnect_on_close: bool = True, async_loop_lock: "Tasynclooplock | None" = None, ): super().__init__(agent_id, async_loop_lock) self._ros = ros_client self._topic_name = topic self._message_type = message_type self._queue_size = queue_size self._compression = compression self._throttle_rate = throttle_rate self._reconnect_on_close = reconnect_on_close self._topic: object = None @property def topic(self) -> str: """ROS topic the receiver subscribes to.""" return self._topic_name @property def message_type(self) -> "type[TMessage]": """Message type the receiver expects.""" return self._message_type
[docs] def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): super().initialise_component(agent, initial_awareness_database, initial_knowledge_database) self._topic = self._ros.subscribe_topic( topic_name=self._topic_name, message_type=self._message_type, callback=self._on_subscription_callback, compression=self._compression, throttle_rate=self._throttle_rate, queue_size=self._queue_size, reconnect_on_close=self._reconnect_on_close, )
[docs] def _decode_message(self, *messages: RosMessage) -> "Iterable[TMessage]": return [message.data for message in messages]
[docs] def _on_subscription_callback(self, message: TMessage): self._message_queue.put_nowait(RosMessage(sender_id=-1, receiver_id=self._agent_id, data=message))