Source code for symaware.extra.ros.components.perception_system

import asyncio
import time
from abc import abstractmethod
from typing import Any, Generic, Iterable

from symaware.base import Identifier
from symaware.base import PerceptionSystem as BasePerceptionSystem
from symaware.base import Tasynclooplock, get_logger

from symaware.extra.ros.utils import RosClient, TMessage


[docs] class PerceptionSystem(BasePerceptionSystem[Tasynclooplock], Generic[Tasynclooplock, TMessage]): """ Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component. Args ---- agent_id: Identifier of the agent this component belongs to ros_client: ROS client to use for the communication topic: ROS topic the perception will subscribe to, e.g. `/my_topic` message_type: Type of messages the perception expects callback_at_initialise_timeout: If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop. compression: Type of compression to use, e.g. `png` throttle_rate: Rate (in ms between messages) at which to throttle the topics queue_size: Queue length at bridge side used when subscribing reconnect_on_close: Reconnect topic (both publisher & subscriber) on reconnection async_loop_lock: Async loop lock to use for the communication receiver """ _LOGGER = get_logger(__name__, "PerceptionSystem") def __init__( self, agent_id: Identifier, ros_client: RosClient, topic: str, message_type: "type[TMessage]", callback_at_initialise_timeout: float = -1.0, compression: Any = None, throttle_rate: int = 0, queue_size: int = 100, reconnect_on_close: bool = True, async_loop_lock: "Tasynclooplock | None" = None, ): super().__init__(agent_id, environment=None, async_loop_lock=async_loop_lock) self._ros = ros_client self._topic_name = topic self._message_type = message_type self._queue_size = queue_size self._compression = compression self._throttle_rate = throttle_rate self._reconnect_on_close = reconnect_on_close self._callback_at_initialise_timeout = callback_at_initialise_timeout self._message_queue: "asyncio.Queue[TMessage]" = asyncio.Queue() self._message_get_task: "asyncio.Task | None" = None self._has_received_first_callback: bool = False self._topic = self._ros.subscribe_topic( topic_name=self._topic_name, message_type=self._message_type, callback=self._on_subscription_callback, compression=self._compression, throttle_rate=self._throttle_rate, queue_size=self._queue_size, reconnect_on_close=self._reconnect_on_close, ) @property def topic(self) -> str: """ROS topic the perception system subscribes to.""" return self._topic_name @property def message_type(self) -> "type[TMessage]": """Message type the perception system expects.""" return self._message_type
[docs] def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): """Initialise the perception system component, optionally waiting for mandatory data from the topic.""" super().initialise_component(agent, initial_awareness_database, initial_knowledge_database) if self._callback_at_initialise_timeout > 0: timeout = self._callback_at_initialise_timeout self._LOGGER.info("Waiting for mandatory data from topic %s", self._topic_name) while timeout > 0 and not self._has_received_first_callback: time.sleep(0.1) timeout -= 0.1 if timeout < 0: raise RuntimeError(f"Timeout waiting for mandatory data from topic {self._topic_name}") self._LOGGER.info("Received mandatory data from topic %s", self._topic_name) self.compute_and_update()
[docs] async def async_initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): super().initialise_component(agent, initial_awareness_database, initial_knowledge_database) if self._callback_at_initialise_timeout > 0: self._LOGGER.info("Waiting for mandatory data from topic %s", self._topic_name) await asyncio.wait_for(self.async_compute_and_update(), self._callback_at_initialise_timeout) self._LOGGER.info("Received mandatory data from topic %s", self._topic_name)
[docs] def _on_subscription_callback(self, message: object): """Callback that adds messages to the queue in a thread-safe manner.""" self._has_received_first_callback = True if self._async_loop_lock.loop.is_running(): self._async_loop_lock.loop.call_soon_threadsafe(self._message_queue.put_nowait, message) else: self._message_queue.put_nowait(message)
[docs] def _compute(self) -> "Iterable[TMessage]": messages = [] while not self._message_queue.empty(): messages.append(self._message_queue.get_nowait()) return messages
[docs] async def _async_compute(self) -> Iterable[TMessage]: messages = [] while True: try: self._message_get_task = self._async_loop_lock.loop.create_task(self._message_queue.get()) messages.append(await self._message_get_task) except asyncio.CancelledError: pass if self._message_queue.empty(): break return messages
[docs] @abstractmethod def _update(self, perceived_information: "Iterable[TMessage]"): pass
[docs] def clear_queue(self): """Clear all messages from the perception queue.""" while not self._message_queue.empty(): self._message_queue.get_nowait() self._message_queue.task_done()
[docs] async def async_stop(self): if self._message_get_task is not None: self._message_get_task.cancel() return await super().async_stop()