import asyncio
import time
from abc import abstractmethod
from typing import Any, Generic, Iterable
from symaware.base import Identifier
from symaware.base import PerceptionSystem as BasePerceptionSystem
from symaware.base import Tasynclooplock, get_logger
from symaware.extra.ros.utils import RosClient, TMessage
[docs]
class PerceptionSystem(BasePerceptionSystem[Tasynclooplock], Generic[Tasynclooplock, TMessage]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
message_type:
Type of messages the perception expects
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
compression:
Type of compression to use, e.g. `png`
throttle_rate:
Rate (in ms between messages) at which to throttle the topics
queue_size:
Queue length at bridge side used when subscribing
reconnect_on_close:
Reconnect topic (both publisher & subscriber) on reconnection
async_loop_lock:
Async loop lock to use for the communication receiver
"""
_LOGGER = get_logger(__name__, "PerceptionSystem")
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
message_type: "type[TMessage]",
callback_at_initialise_timeout: float = -1.0,
compression: Any = None,
throttle_rate: int = 0,
queue_size: int = 100,
reconnect_on_close: bool = True,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(agent_id, environment=None, async_loop_lock=async_loop_lock)
self._ros = ros_client
self._topic_name = topic
self._message_type = message_type
self._queue_size = queue_size
self._compression = compression
self._throttle_rate = throttle_rate
self._reconnect_on_close = reconnect_on_close
self._callback_at_initialise_timeout = callback_at_initialise_timeout
self._message_queue: "asyncio.Queue[TMessage]" = asyncio.Queue()
self._message_get_task: "asyncio.Task | None" = None
self._has_received_first_callback: bool = False
self._topic = self._ros.subscribe_topic(
topic_name=self._topic_name,
message_type=self._message_type,
callback=self._on_subscription_callback,
compression=self._compression,
throttle_rate=self._throttle_rate,
queue_size=self._queue_size,
reconnect_on_close=self._reconnect_on_close,
)
@property
def topic(self) -> str:
"""ROS topic the perception system subscribes to."""
return self._topic_name
@property
def message_type(self) -> "type[TMessage]":
"""Message type the perception system expects."""
return self._message_type
[docs]
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
"""Initialise the perception system component, optionally waiting for mandatory data from the topic."""
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
if self._callback_at_initialise_timeout > 0:
timeout = self._callback_at_initialise_timeout
self._LOGGER.info("Waiting for mandatory data from topic %s", self._topic_name)
while timeout > 0 and not self._has_received_first_callback:
time.sleep(0.1)
timeout -= 0.1
if timeout < 0:
raise RuntimeError(f"Timeout waiting for mandatory data from topic {self._topic_name}")
self._LOGGER.info("Received mandatory data from topic %s", self._topic_name)
self.compute_and_update()
[docs]
async def async_initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
if self._callback_at_initialise_timeout > 0:
self._LOGGER.info("Waiting for mandatory data from topic %s", self._topic_name)
await asyncio.wait_for(self.async_compute_and_update(), self._callback_at_initialise_timeout)
self._LOGGER.info("Received mandatory data from topic %s", self._topic_name)
[docs]
def _on_subscription_callback(self, message: object):
"""Callback that adds messages to the queue in a thread-safe manner."""
self._has_received_first_callback = True
if self._async_loop_lock.loop.is_running():
self._async_loop_lock.loop.call_soon_threadsafe(self._message_queue.put_nowait, message)
else:
self._message_queue.put_nowait(message)
[docs]
def _compute(self) -> "Iterable[TMessage]":
messages = []
while not self._message_queue.empty():
messages.append(self._message_queue.get_nowait())
return messages
[docs]
async def _async_compute(self) -> Iterable[TMessage]:
messages = []
while True:
try:
self._message_get_task = self._async_loop_lock.loop.create_task(self._message_queue.get())
messages.append(await self._message_get_task)
except asyncio.CancelledError:
pass
if self._message_queue.empty():
break
return messages
[docs]
@abstractmethod
def _update(self, perceived_information: "Iterable[TMessage]"):
pass
[docs]
def clear_queue(self):
"""Clear all messages from the perception queue."""
while not self._message_queue.empty():
self._message_queue.get_nowait()
self._message_queue.task_done()
[docs]
async def async_stop(self):
if self._message_get_task is not None:
self._message_get_task.cancel()
return await super().async_stop()