Source code for symaware.extra.ros.components.communication_sender

from typing import Any, Generic

import roslibpy
from symaware.base import CommunicationSender, Identifier, Tasynclooplock

from symaware.extra.ros.data import RosMessage
from symaware.extra.ros.utils import RosClient, TMessage


[docs] class RosCommunicationSender(CommunicationSender[Tasynclooplock], Generic[Tasynclooplock, TMessage]): """ Communication sender for ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component. Example ------- Create a communication receiver for the "/path" topic of type Path >>> # doctest: +SKIP >>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator >>> agent_id = 0 >>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1)) >>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id)) >>> agent = Agent[KnowledgeDatabase](agent_id, entity) >>> env.add_agents(agent) >>> agent.add_components( ... DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)), ... DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)), ... ) >>> agent.initialise_agent( ... initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)), ... initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))}, ... ) >>> AgentCoordinator(env, agent).run() Args ---- agent_id: Identifier of the agent this component belongs to topic: ROS topic the sender publish to, e.g. `/my_topic` message_type: Type of messages the sender will publish compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue size created at bridge side for re-publishing webtopics latch: True to latch the topic when publishing, False otherwise reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection async_loop_lock: Async loop lock to use for the communication sender """ def __init__( self, agent_id: Identifier, ros_client: RosClient, topic: str, message_type: "type[TMessage]", compression: Any = None, latch: bool = False, throttle_rate: int = 0, queue_size: int = 0, reconnect_on_close: bool = True, async_loop_lock: "Tasynclooplock | None" = None, ): super().__init__(agent_id, async_loop_lock) self._ros = ros_client self._topic_name = topic self._message_type = message_type self._queue_size = queue_size self._compression = compression self._latch = latch self._throttle_rate = throttle_rate self._reconnect_on_close = reconnect_on_close self._topic: "roslibpy.Topic | None" = None @property def topic(self) -> str: """ROS topic the sender publishes to.""" return self._topic_name @property def message_type(self) -> "type[TMessage]": """Message type the sender publishes.""" return self._message_type
[docs] def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): super().initialise_component(agent, initial_awareness_database, initial_knowledge_database) self.initialise_topic()
[docs] def initialise_topic(self): """Create and advertise the ROS topic if not already initialized.""" if self._topic is not None: return self._topic = self._ros.publish_topic( topic_name=self._topic_name, message_type=self._message_type, compression=self._compression, latch=self._latch, throttle_rate=self._throttle_rate, queue_size=self._queue_size, reconnect_on_close=self._reconnect_on_close, )
[docs] def _send_communication_through_channel(self, message: RosMessage): """Send a RosMessage through the ROS topic channel.""" self.publish(message.data)
[docs] def publish(self, message: object): """Publish a message to the ROS topic, converting to dict if needed.""" self._topic.publish(message.to_dict() if hasattr(message, "to_dict") else message)