import asyncio
import time
from typing import Iterable
import numpy as np
from symaware.base import Identifier, Tasynclooplock
from symaware.extra.ros import PerceptionSystem, RosClient
from .data_structures import (
AirSensorOutput,
Bool,
JsonAirSensorOutput,
Odometry,
Path,
PathStamped,
Pose,
PoseArray,
PoseStamped,
PrescanStatus,
Vector3,
)
[docs]
class RosLibPoseStampedPerceptionSystem(PerceptionSystem[Tasynclooplock, PoseStamped]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose stamped information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PoseStamped,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PoseStamped]"):
for perceived_info in perceived_information:
pose = perceived_info.pose
self._agent.self_state = np.array(
[
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
]
)
[docs]
class RosLibPoseArrayPerceptionSystem(PerceptionSystem[Tasynclooplock, PoseArray]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PoseArray,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PoseArray]"):
for perceived_info in perceived_information:
for pose in perceived_info.poses:
self._agent.self_state = np.array(
[
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
]
)
[docs]
class RosLibOdometryPerceptionSystem(PerceptionSystem[Tasynclooplock, Odometry]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Odometry,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Odometry]"):
for perceived_info in perceived_information:
pose = perceived_info.pose.pose
twist = perceived_info.twist.twist
self._agent.self_state = np.array(
(
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
twist.linear.x,
twist.linear.y,
twist.linear.z,
)
)
[docs]
class RosLibTrafficSignPerceptionSystem(PerceptionSystem[Tasynclooplock, Odometry]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Vector3,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Vector3]"):
for perceived_info in perceived_information:
if perceived_info.z == 0.0:
continue
self._agent.self_knowledge["traffic_sign"][int(perceived_info.z)] = perceived_info
[docs]
class RosLibPosePerceptionSystem(PerceptionSystem[Tasynclooplock, Pose]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose of the agent.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Pose,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Pose]"):
for pose in perceived_information:
self._agent.self_state = np.array(
(
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
)
)
[docs]
class RosLibAirSensorOutputPerceptionSystem(PerceptionSystem[Tasynclooplock, AirSensorOutput]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor.
The information will be stored in the agent's knowledge database with the key "air_sensor_output".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=AirSensorOutput,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[AirSensorOutput]"):
if len(perceived_information) > 0:
self._agent.self_knowledge["air_sensor_output"] = perceived_information[-1]
else:
self._agent.self_knowledge["air_sensor_output"] = None
[docs]
class RosLibJsonAirSensorOutputPerceptionSystem(PerceptionSystem[Tasynclooplock, JsonAirSensorOutput]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor, sent using JSON.
The information will be stored in the agent's knowledge database with the key "air_sensor_output".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
tid: str = "car57",
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=JsonAirSensorOutput,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
self._tid = tid
self._message_queue: "asyncio.LifoQueue[JsonAirSensorOutput]" = asyncio.LifoQueue()
[docs]
def _on_subscription_callback(self, message: JsonAirSensorOutput):
if self._tid != "" and self._tid != message.tid:
return
super()._on_subscription_callback(message)
[docs]
async def _async_compute(self) -> "Iterable[JsonAirSensorOutput]":
messages = []
while True:
try:
self._message_get_task = self._async_loop_lock.loop.create_task(self._message_queue.get())
messages.append(await self._message_get_task)
self.clear_queue()
break
except asyncio.CancelledError:
pass
if self._message_queue.empty():
break
return messages
[docs]
def _update(self, perceived_information: "Iterable[JsonAirSensorOutput]"):
if len(perceived_information) > 0:
self._agent.self_knowledge["air_sensor_output"] = perceived_information[-1]
else:
self._agent.self_knowledge["air_sensor_output"] = None
[docs]
class RosLibPathPerceptionSystem(PerceptionSystem[Tasynclooplock, Path]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path for the agent to follow.
The information will be stored in the agent's knowledge database with the key "path".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Path,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Path]"):
for path in perceived_information:
self._agent.self_knowledge["path"] = path
[docs]
class RosLibPathStampedPerceptionSystem(PerceptionSystem[Tasynclooplock, PathStamped]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow.
The information will be stored in the agent's knowledge database with the key "path".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PathStamped,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PathStamped]"):
for path in perceived_information:
self._agent.self_knowledge["path"] = path
[docs]
class RosLibPathsStampedPerceptionSystem(PerceptionSystem[Tasynclooplock, PathStamped]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow.
The information will be stored in the agent's knowledge database with the key "path".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PathStamped,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PathStamped]"):
for path in perceived_information:
self._agent.self_knowledge.setdefault("paths", []).append(path)
[docs]
class RosLibPrescanStatusPerceptionSystem(PerceptionSystem[Tasynclooplock, PrescanStatus]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to receive the state of Prescan, to know when the simulation is ready.
The information will be stored in the agent's knowledge database with the key "prescan_status".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PrescanStatus,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PrescanStatus]"):
for status in perceived_information:
self._agent.self_knowledge["prescan_status"] = status
[docs]
class RosLibPrescanReadyPerceptionSystem(PerceptionSystem[Tasynclooplock, Bool]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to receive the flag from Prescan, to know when the simulation is ready.
The information will be stored in the agent's knowledge database with the key "prescan_status".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: RosClient,
topic: str,
callback_at_initialise_timeout: float = -1.0,
sleep_after_status: float = 2.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Bool,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
self.__sleep_after_status = sleep_after_status
[docs]
def _on_subscription_callback(self, message: object):
if self._has_received_first_callback:
return
super()._on_subscription_callback(message)
[docs]
def _update(self, perceived_information: "Iterable[Bool]"):
if (
len(perceived_information) > 0
and perceived_information[-1]
and not self._agent.self_knowledge.get("prescan_status", False)
):
time.sleep(self.__sleep_after_status)
for status in perceived_information:
self._agent.self_knowledge["prescan_status"] = status