symaware.extra.ros.roslib package

Submodules

symaware.extra.ros.roslib.components module

class symaware.extra.ros.roslib.components.RosLibAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, AirSensorOutput]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive output of the AIR sensor. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[AirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibJsonAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, tid='car57', callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, JsonAirSensorOutput]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive output of the AIR sensor, sent using JSON. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

async _async_compute()[source]

Given the state of the agent, compute a value or a set of values asynchronously.

Note

Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Iterable[JsonAirSensorOutput] – Computed value or set of values

_on_subscription_callback(message)[source]

Callback that adds messages to the queue in a thread-safe manner.

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[JsonAirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibOdometryPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Odometry]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Odometry]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPathPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Path]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the path for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Path]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPathStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PathStamped]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPathsStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PathStamped]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPoseArrayPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PoseArray]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PoseArray]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPosePerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Pose]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the pose of the agent. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Pose]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPoseStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PoseStamped]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the pose stamped information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PoseStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPrescanReadyPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, sleep_after_status=2.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Bool]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to receive the flag from Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_on_subscription_callback(message)[source]

Callback that adds messages to the queue in a thread-safe manner.

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Bool]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibPrescanStatusPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, PrescanStatus]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to receive the state of Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[PrescanStatus]) – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.extra.ros.roslib.components.RosLibTrafficSignPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock, Odometry]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[Vector3]) – Information perceived by the agent. Each agent’s state is identified by its id.

symaware.extra.ros.roslib.data_structures module

class symaware.extra.ros.roslib.data_structures.AckermannDrive(steering_angle=0.0, steering_angle_velocity=0.0, speed=0.0, acceleration=0.0, jerk=0.0)[source]

Bases: RosMessageData

Driving command for a car-like vehicle using Ackermann steering

Assumes Ackermann front-wheel steering. The left and right front wheels are generally at different angles. To simplify, the commanded angle corresponds to the yaw of a virtual wheel located at the center of the front axle, like on a tricycle. Positive yaw is to the left. (This is not the angle of the steering wheel inside the passenger compartment.)

Zero steering angle velocity means change the steering angle as quickly as possible. Positive velocity indicates a desired absolute rate of change either left or right. The controller tries not to exceed this limit in either direction, but sometimes it might.

Drive at requested speed, acceleration and jerk (the 1st, 2nd and 3rd derivatives of position). All are measured at the vehicle’s center of rotation, typically the center of the rear axle. The controller tries not to exceed these limits in either direction, but sometimes it might.

Speed is the desired scalar magnitude of the velocity vector. Direction is forward unless the sign is negative, indicating reverse.

Zero acceleration means change speed as quickly as possible. Positive acceleration indicates a desired absolute magnitude; that includes deceleration.

Zero jerk means change acceleration as quickly as possible. Positive jerk indicates a desired absolute rate of acceleration change in either direction (increasing or decreasing).

Parameters:
  • steering_angle (float, default: 0.0) – desired virtual angle (radians)

  • steering_angle_velocity (float, default: 0.0) – desired rate of change (radians/s)

  • speed (float, default: 0.0) – desired forward speed (m/s)

  • acceleration (float, default: 0.0) – desired acceleration (m/s^2)

  • jerk (float, default: 0.0) – desired jerk (m/s^3)

acceleration: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

jerk: float = 0.0
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

speed: float = 0.0
steering_angle: float = 0.0
steering_angle_velocity: float = 0.0
class symaware.extra.ros.roslib.data_structures.AckermannDriveStamped(header=<factory>, drive=<factory>)[source]

Bases: RosMessageData

Time stamped drive command for robots with Ackermann steering

Parameters:
  • header (Header, default: <factory>) – Header

  • drive (AckermannDrive, default: <factory>) – AckermannDrive

drive: AckermannDrive
classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.AirSensorOutput(header=<factory>, tid='', range=0.0, azimuth=0.0, elevation=0.0, velocity=0.0, heading=0.0)[source]

Bases: RosMessageData

Output produced by the AIR sensor

azimuth: float = 0.0
elevation: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

header: Header
heading: float = 0.0
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

range: float = 0.0
tid: str = ''
velocity: float = 0.0
class symaware.extra.ros.roslib.data_structures.Bool(data=False)[source]

Bases: RosMessageData

Boolean message

Parameters:

data (bool, default: False) – boolean data

data: bool = False
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Float32(data=0.0)[source]

Bases: RosMessageData

32-bit float message

Parameters:

data (float, default: 0.0) – 32-bit float data

data: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Float32Array(data=<factory>, layout=<factory>)[source]

Bases: RosMessageData

32-bit float array message

Parameters:

data (list[float], default: <factory>) – 32-bit float data

data: list[float]
classmethod from_dict(d)[source]
Return type:

Self

layout: dict
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Float64(data=0.0)[source]

Bases: RosMessageData

64-bit float message

Parameters:

data (float, default: 0.0) – 64-bit float data

data: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Float64Array(data=<factory>, layout=<factory>)[source]

Bases: RosMessageData

64-bit float array message

Parameters:

data (list[float], default: <factory>) – 64-bit float data

data: list[float]
classmethod from_dict(d)[source]
Return type:

Self

layout: dict
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Header(stamp=<factory>, frame_id='')[source]

Bases: RosMessageData

Message header with a sequence number and timestamp information

frame_id: str = ''
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

stamp: Time
class symaware.extra.ros.roslib.data_structures.Int32(data=0)[source]

Bases: RosMessageData

32-bit integer message

Parameters:

data (int, default: 0) – 32-bit integer data

data: int = 0
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Int32Array(data=<factory>, layout=<factory>)[source]

Bases: RosMessageData

32-bit integer array message

Parameters:

data (list[int], default: <factory>) – 32-bit integer data

data: list[int]
classmethod from_dict(d)[source]
Return type:

Self

layout: dict
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.JsonAirSensorOutput(header=<factory>, tid='', range=0.0, azimuth=0.0, elevation=0.0, velocity=0.0, heading=0.0)[source]

Bases: RosMessageData

Output produced by the AIR sensor, sent as a JSON string

azimuth: float = 0.0
elevation: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

header: Header
heading: float = 0.0
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

range: float = 0.0
tid: str = ''
velocity: float = 0.0
class symaware.extra.ros.roslib.data_structures.LaserScan(header=<factory>, angle_min=0.0, angle_max=0.0, angle_increment=0.0, time_increment=0.0, scan_time=0.0, range_min=0.0, range_max=0.0, ranges=<factory>, intensities=<factory>)[source]

Bases: RosMessageData

Single scan from a planar laser range-finder

If you have another ranging device with different behavior (e.g. a sonar array), please find or create a different message, since applications will make fairly laser-specific assumptions about this data

Parameters:
  • header (Header, default: <factory>) – Timestamp in the header is the acquisition time of the first ray in the scan. In frame frame_id, angles are measured around the positive Z axis (counterclockwise, if Z is up) with zero angle being forward along the x axis

  • angle_min (float, default: 0.0) – Start angle of the scan [rad]

  • angle_max (float, default: 0.0) – End angle of the scan [rad]

  • angle_increment (float, default: 0.0) – Angular distance between measurements [rad]

  • time_increment (float, default: 0.0) – Time between measurements [seconds] - if your scanner is moving, this will be used in interpolating position of 3d points

  • scan_time (float, default: 0.0) – time between scans [seconds]

  • range_min (float, default: 0.0) – minimum range value [m]

  • range_max (float, default: 0.0) – maximum range value [m]

  • ranges (list[float], default: <factory>) – range data [m] (Note: values < range_min or > range_max should be discarded)

  • intensities (list[float], default: <factory>) – intensity data [device-specific units]. If your device does not provide intensities, please leave the array empty.

angle_increment: float = 0.0
angle_max: float = 0.0
angle_min: float = 0.0
classmethod from_dict(d)[source]
Return type:

Self

header: Header
intensities: list[float]
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

range_max: float = 0.0
range_min: float = 0.0
ranges: list[float]
scan_time: float = 0.0
time_increment: float = 0.0
class symaware.extra.ros.roslib.data_structures.Odometry(header=<factory>, child_frame_id='', pose=<factory>, twist=<factory>)[source]

Bases: RosMessageData

This represents an estimate of a position and velocity in free space. The pose in this message should be specified in the coordinate frame given by header.frame_id. The twist in this message should be specified in the coordinate frame given by the child_frame_id

Parameters:
  • header (Header, default: <factory>) – Header

  • child_frame_id (str, default: '') – string

  • pose (PoseWithCovariance, default: <factory>) – PoseWithCovariance

  • twist (TwistWithCovariance, default: <factory>) – TwistWithCovariance

child_frame_id: str = ''
classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

pose: PoseWithCovariance
twist: TwistWithCovariance
class symaware.extra.ros.roslib.data_structures.Path(header=<factory>, poses=<factory>)[source]

Bases: RosMessageData

List of points that make up a path

classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

poses: list[Pose]
class symaware.extra.ros.roslib.data_structures.PathStamped(header=<factory>, poses=<factory>)[source]

Bases: RosMessageData

An array of poses that represents a Path for a robot to follow

classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

poses: list[PoseStamped]
class symaware.extra.ros.roslib.data_structures.Point(x=0.0, y=0.0, z=0.0)[source]

Bases: RosMessageData

3-Dimensional point

classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

x: float = 0.0
y: float = 0.0
z: float = 0.0
class symaware.extra.ros.roslib.data_structures.Pose(position=<factory>, orientation=<factory>)[source]

Bases: RosMessageData

Pose, combining both a position and an orientation

classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

orientation: Quaternion
position: Point
class symaware.extra.ros.roslib.data_structures.PoseArray(header=<factory>, poses=<factory>)[source]

Bases: RosMessageData

An array of poses that represents a set of points in space

classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

poses: list[Pose]
class symaware.extra.ros.roslib.data_structures.PoseStamped(header=<factory>, pose=<factory>)[source]

Bases: RosMessageData

A Pose with reference coordinate frame and timestamp

classmethod from_dict(d)[source]
Return type:

Self

header: Header
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

pose: Pose
class symaware.extra.ros.roslib.data_structures.PoseWithCovariance(pose=<factory>, covariance=<factory>)[source]

Bases: RosMessageData

This represents a pose in free space with uncertainty.

The pose is defined by a position and orientation.

Parameters:
  • pose (Pose, default: <factory>) – Pose

  • covariance (list[float], default: <factory>) – float64[36]

covariance: list[float]
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

pose: Pose
class symaware.extra.ros.roslib.data_structures.PrescanStatus(ready=False, stop=False)[source]

Bases: RosMessageData

Whether the simulation is ready to start

classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

ready: bool = False
stop: bool = False
class symaware.extra.ros.roslib.data_structures.Quaternion(x=0.0, y=0.0, z=0.0, w=0.0)[source]

Bases: RosMessageData

Quaternion rotation

classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

w: float = 0.0
x: float = 0.0
y: float = 0.0
z: float = 0.0
class symaware.extra.ros.roslib.data_structures.RosLibMessage(sender_id, receiver_id, data)[source]

Bases: Message

Simple class to store messages between agents.

Parameters:
  • sender_id (int) – Id of the sender agent

  • receiver_id (int) – Id of the receiver agent

  • data (RosMessageData) – ROS message

data: RosMessageData
class symaware.extra.ros.roslib.data_structures.RosMessageData[source]

Bases: ABC

abstractmethod classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

to_dict()[source]

Convert dataclass to dictionary.

Return type:

dict

class symaware.extra.ros.roslib.data_structures.String(data='')[source]

Bases: RosMessageData

String message

Parameters:

data (str, default: '') – string data

data: str = ''
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.Time(sec=<factory>, nanosec=<factory>)[source]

Bases: RosMessageData

Timestamp, using both seconds and nanoseconds

classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

nanosec: int
sec: int
class symaware.extra.ros.roslib.data_structures.TrafficLightState(header=<factory>, id='', position=<factory>, status='')[source]

Bases: RosMessageData

State of a traffic light

classmethod from_dict(d)[source]
Return type:

Self

header: Header
id: str = ''
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

position: Point
status: str = ''
class symaware.extra.ros.roslib.data_structures.Twist(linear=<factory>, angular=<factory>)[source]

Bases: RosMessageData

This expresses velocity in free space broken into its linear and angular parts.

Parameters:
  • linear (Vector3, default: <factory>) – Vector3

  • angular (Vector3, default: <factory>) – Vector3

angular: Vector3
classmethod from_dict(d)[source]
Return type:

Self

linear: Vector3
static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

class symaware.extra.ros.roslib.data_structures.TwistWithCovariance(twist=<factory>, covariance=<factory>)[source]

Bases: RosMessageData

This expresses velocity in free space with uncertainty.

Parameters:
  • twist (Twist, default: <factory>) – Twist

  • covariance (list[float], default: <factory>) – float64[36]

covariance: list[float]
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

twist: Twist
class symaware.extra.ros.roslib.data_structures.Vector3(x, y, z)[source]

Bases: RosMessageData

3-Dimensional vector

Parameters:
classmethod from_dict(d)[source]
Return type:

Self

static msg_type()[source]

Return ROS message type string. Must be called on a subclass.

Return type:

str

x: float
y: float
z: float

symaware.extra.ros.roslib.models module

class symaware.extra.ros.roslib.models.RosLibAckermannDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Bases: AckermannDriveModel[AckermannDriveStamped]

Dynamical model using the AckermannDrive type messages as a control input.

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

class symaware.extra.ros.roslib.models.RosLibOdometryeModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Bases: OdometryModel[Odometry]

Dynamical model using the twist type messages as a control input.

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

class symaware.extra.ros.roslib.models.RosLibTwistDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Bases: TwistDriveModel[Twist]

Dynamical model using the twist type messages as a control input.

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

symaware.extra.ros.roslib.ros_client module

class symaware.extra.ros.roslib.ros_client.RosLibClient(host='localhost', port=9090, is_secure=False)[source]

Bases: RosClient[RosMessageData, Topic, Topic]

Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the with construct. This ensures that the client will disconnect gracefully regardless any error.

Example

Use the RosClient

>>> from symaware.extra.ros import RosClient
>>> def main():
...     # Add your communication components in here
...     pass
>>> if __name__ == "__main__":
...     with RosClient(host="localhost", port=9090) as ros:
...         main(ros)
Parameters:
  • host (str, default: 'localhost') – Name or IP address of the ROS bridge host, e.g. localhost, 127.0.0.1, 192.168.1.2, my.works.station

  • port (int, default: 9090) – ROS bridge port, e.g. 9090

  • is_secure (bool, default: False) – Whether to use a secure web sockets connection

connect()[source]

Connect to ROS bridge with connection validation.

disconnect()[source]

Disconnect from ROS, unadvertise publishers, and unsubscribe subscribers.

property host: str

ROS bridge host name or IP address.

property is_connected

Whether client is connected to ROS bridge.

property is_secure: bool

Whether using secure websockets.

property port: int

ROS bridge port number.

publish(publisher, message)[source]

Publish the given message to the provided publisher.

Parameters:
  • publisher (Topic) – Publisher to use to send the message

  • message (RosMessageData) – Message to publish to the ROS topics

publish_topic(topic_name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Create and advertise a new ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type[RosMessageData]) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • latch (bool, default: False) – True to latch the topic when publishing, False otherwise

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue size created at bridge side for re-publishing webtopics

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Return type:

Topic

property ros: roslibpy.Ros

Underlying roslibpy.Ros instance.

subscribe_topic(topic_name, message_type, callback, compression=None, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]

Subscribe to the ROS topic with the given name and message_type.

Parameters:
  • topic_name (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type[RosMessageData]) – Type of messages the sender will publish, e.g. std_msgs/String

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue size created at bridge side for re-publishing webtopics

  • queue_size – Queue length at bridge side used when subscribing

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

Module contents