symaware.extra.ros.ros1 package

Submodules

symaware.extra.ros.ros1.components module

class symaware.extra.ros.ros1.components.Ros1AirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, AirSensorOutput]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive output of the AIR sensor. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[AirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1JsonAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, JsonAirSensorOutput]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive output of the AIR sensor, sent using JSON. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[JsonAirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1OdometryPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Odometry]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Odometry]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PathPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Path]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the path for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Path]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PathStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PathStamped]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PosePerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Pose]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the pose of the agent. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Pose]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PoseStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PoseStamped]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the pose stamped information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PoseStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PrescanReadyPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Bool]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to receive the flag from Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Bool]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.ros1.components.Ros1PrescanStatusPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PrescanStatus]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to receive the state of Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (Ros1Client) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PrescanStatus]) – Information perceived by the agent. Each agent’s state is identified by its id.

symaware.extra.ros.ros1.data_structures module

class symaware.extra.ros.ros1.data_structures.Ros1Message(sender_id, receiver_id, data)[source]

Bases: Message

Simple class to store messages between agents.

Parameters:
  • sender_id (int) – Id of the sender agent

  • receiver_id (int) – Id of the receiver agent

  • data (object) – ROS1 message

data: object

symaware.extra.ros.ros1.models module

class symaware.extra.ros.ros1.models.Ros1AckermannDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Bases: AckermannDriveModel[AckermannDriveStamped]

Dynamical model using the AckermannDrive type messages as a control input.

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

class symaware.extra.ros.ros1.models.Ros1TwistDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Bases: TwistDriveModel[Twist]

Dynamical model using the twist type messages as a control input.

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

symaware.extra.ros.ros1.ros_client module

class symaware.extra.ros.ros1.ros_client.Ros1Client[source]

Bases: RosClient[object, Subscriber, Publisher]

Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the with construct. This ensures that the client will disconnect gracefully regardless any error.

Example

Use the RosClient

>>> from symaware.extra.ros import RosClient
>>> def main():
...     # Add your communication components in here
...     pass
>>> if __name__ == "__main__":
...     with RosClient(host="localhost", port=9090) as ros:
...         main(ros)
Parameters:
  • host – Name or IP address of the ROS bridge host, e.g. localhost, 127.0.0.1, 192.168.1.2, my.works.station

  • port – ROS bridge port, e.g. 9090

  • is_secure – Whether to use a secure web sockets connection

publish(publisher, message)[source]

Publish the given message to the provided publisher.

Parameters:
  • publisher (Publisher) – Publisher to use to send the message

  • message (object) – Message to publish to the ROS topics

publish_topic(topic_name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Create and advertise a new ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • latch (bool, default: False) – True to latch the topic when publishing, False otherwise

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue size created at bridge side for re-publishing webtopics

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Return type:

Publisher

subscribe_topic(topic_name, message_type, callback, compression=None, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Subscribe to the ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue length at bridge side used when subscribing

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Return type:

Subscriber

Module contents