from typing import Iterable
import numpy as np
from symaware.base import Identifier, Tasynclooplock
from symaware.extra.ros import PerceptionSystem
from .ros_client import Ros1Client
from rospy import PoseStamped, Odometry, Pose, Path, PathStamped, Bool, PrescanStatus, AirSensorOutput, JsonAirSensorOutput
[docs]
class Ros1PoseStampedPerceptionSystem(PerceptionSystem[Tasynclooplock, PoseStamped]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose stamped information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PoseStamped,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PoseStamped]"):
for perceived_info in perceived_information:
pose = perceived_info.pose
self._agent.self_state = np.array(
[
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
]
)
[docs]
class Ros1OdometryPerceptionSystem(PerceptionSystem[Tasynclooplock, Odometry]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Odometry,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Odometry]"):
for perceived_info in perceived_information:
pose = perceived_info.pose.pose
twist = perceived_info.twist.twist
self._agent.self_state = np.array(
(
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
twist.linear.x,
twist.linear.y,
twist.linear.z,
)
)
[docs]
class Ros1PosePerceptionSystem(PerceptionSystem[Tasynclooplock, Pose]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose of the agent.
The information will be stored in the agent's self state.
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Pose,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Pose]"):
for pose in perceived_information:
self._agent.self_state = np.array(
(
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
)
)
[docs]
class Ros1AirSensorOutputPerceptionSystem(PerceptionSystem[Tasynclooplock, AirSensorOutput]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor.
The information will be stored in the agent's knowledge database with the key "air_sensor_output".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=AirSensorOutput,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[AirSensorOutput]"):
for air_sensor_output in perceived_information:
self._agent.self_knowledge["air_sensor_output"] = air_sensor_output
[docs]
class Ros1JsonAirSensorOutputPerceptionSystem(PerceptionSystem[Tasynclooplock, JsonAirSensorOutput]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor, sent using JSON.
The information will be stored in the agent's knowledge database with the key "air_sensor_output".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=JsonAirSensorOutput,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[JsonAirSensorOutput]"):
for air_sensor_output in perceived_information:
self._agent.self_knowledge["air_sensor_output"] = air_sensor_output
[docs]
class Ros1PathPerceptionSystem(PerceptionSystem[Tasynclooplock, Path]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path for the agent to follow.
The information will be stored in the agent's knowledge database with the key "path".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Path,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Path]"):
for path in perceived_information:
self._agent.self_knowledge["path"] = path
[docs]
class Ros1PathStampedPerceptionSystem(PerceptionSystem[Tasynclooplock, PathStamped]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow.
The information will be stored in the agent's knowledge database with the key "path".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PathStamped,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PathStamped]"):
for path in perceived_information:
self._agent.self_knowledge["path"] = path.path
[docs]
class Ros1PrescanStatusPerceptionSystem(PerceptionSystem[Tasynclooplock, PrescanStatus]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to receive the state of Prescan, to know when the simulation is ready.
The information will be stored in the agent's knowledge database with the key "prescan_status".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=PrescanStatus,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[PrescanStatus]"):
for status in perceived_information:
self._agent.self_knowledge["prescan_status"] = status
[docs]
class Ros1PrescanReadyPerceptionSystem(PerceptionSystem[Tasynclooplock, Bool]):
"""
Perception system that interfaces with ROS topics.
It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes
without requiring a full ROS installation on the machine running this script.
It needs the ROS client to be initialised before this component.
This perception system is used to receive the flag from Prescan, to know when the simulation is ready.
The information will be stored in the agent's knowledge database with the key "prescan_status".
Args
----
agent_id:
Identifier of the agent this component belongs to
ros_client:
ROS client to use for the communication
topic:
ROS topic the perception will subscribe to, e.g. `/my_topic`
callback_at_initialise_timeout:
If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message
from the topic it is subscribed to or the timeout expires.
Useful to coordinate when expecting some mandatory data from another ROS node.
Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.
async_loop_lock:
Async loop lock to use for the communication receiver
"""
def __init__(
self,
agent_id: Identifier,
ros_client: Ros1Client,
topic: str,
callback_at_initialise_timeout: float = -1.0,
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(
agent_id,
ros_client,
topic,
message_type=Bool,
callback_at_initialise_timeout=callback_at_initialise_timeout,
async_loop_lock=async_loop_lock,
)
[docs]
def _update(self, perceived_information: "Iterable[Bool]"):
for status in perceived_information:
self._agent.self_knowledge["prescan_status"] = status