symaware.extra.ros.roslib package
Submodules
symaware.extra.ros.roslib.components module
- class symaware.extra.ros.roslib.components.RosLibAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,AirSensorOutput]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[AirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibJsonAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, tid='car57', callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,JsonAirSensorOutput]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor, sent using JSON. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- async _async_compute()[source]
Given the state of the agent, compute a value or a set of values asynchronously.
Note
Check
_compute()for more information about the method andAsyncLoopLockablefor more information about the async loop.- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Returns:
Iterable[JsonAirSensorOutput] – Computed value or set of values
- _on_subscription_callback(message)[source]
Callback that adds messages to the queue in a thread-safe manner.
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[JsonAirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibOdometryPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Odometry]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- class symaware.extra.ros.roslib.components.RosLibPathPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Path]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- class symaware.extra.ros.roslib.components.RosLibPathStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PathStamped]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibPathsStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PathStamped]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibPoseArrayPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PoseArray]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- class symaware.extra.ros.roslib.components.RosLibPosePerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Pose]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose of the agent. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- class symaware.extra.ros.roslib.components.RosLibPoseStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PoseStamped]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose stamped information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PoseStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibPrescanReadyPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, sleep_after_status=2.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Bool]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to receive the flag from Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _on_subscription_callback(message)[source]
Callback that adds messages to the queue in a thread-safe manner.
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- class symaware.extra.ros.roslib.components.RosLibPrescanStatusPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PrescanStatus]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to receive the state of Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PrescanStatus]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.roslib.components.RosLibTrafficSignPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Odometry]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
RosClient) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
symaware.extra.ros.roslib.data_structures module
- class symaware.extra.ros.roslib.data_structures.AckermannDrive(steering_angle=0.0, steering_angle_velocity=0.0, speed=0.0, acceleration=0.0, jerk=0.0)[source]
Bases:
RosMessageDataDriving command for a car-like vehicle using Ackermann steering
Assumes Ackermann front-wheel steering. The left and right front wheels are generally at different angles. To simplify, the commanded angle corresponds to the yaw of a virtual wheel located at the center of the front axle, like on a tricycle. Positive yaw is to the left. (This is not the angle of the steering wheel inside the passenger compartment.)
Zero steering angle velocity means change the steering angle as quickly as possible. Positive velocity indicates a desired absolute rate of change either left or right. The controller tries not to exceed this limit in either direction, but sometimes it might.
Drive at requested speed, acceleration and jerk (the 1st, 2nd and 3rd derivatives of position). All are measured at the vehicle’s center of rotation, typically the center of the rear axle. The controller tries not to exceed these limits in either direction, but sometimes it might.
Speed is the desired scalar magnitude of the velocity vector. Direction is forward unless the sign is negative, indicating reverse.
Zero acceleration means change speed as quickly as possible. Positive acceleration indicates a desired absolute magnitude; that includes deceleration.
Zero jerk means change acceleration as quickly as possible. Positive jerk indicates a desired absolute rate of acceleration change in either direction (increasing or decreasing).
- Parameters:
steering_angle (
float, default:0.0) – desired virtual angle (radians)steering_angle_velocity (
float, default:0.0) – desired rate of change (radians/s)speed (
float, default:0.0) – desired forward speed (m/s)acceleration (
float, default:0.0) – desired acceleration (m/s^2)jerk (
float, default:0.0) – desired jerk (m/s^3)
- class symaware.extra.ros.roslib.data_structures.AckermannDriveStamped(header=<factory>, drive=<factory>)[source]
Bases:
RosMessageDataTime stamped drive command for robots with Ackermann steering
- Parameters:
header (
Header, default:<factory>) – Headerdrive (
AckermannDrive, default:<factory>) – AckermannDrive
- drive: AckermannDrive
- class symaware.extra.ros.roslib.data_structures.AirSensorOutput(header=<factory>, tid='', range=0.0, azimuth=0.0, elevation=0.0, velocity=0.0, heading=0.0)[source]
Bases:
RosMessageDataOutput produced by the AIR sensor
- class symaware.extra.ros.roslib.data_structures.Bool(data=False)[source]
Bases:
RosMessageDataBoolean message
- Parameters:
data (
bool, default:False) – boolean data
- class symaware.extra.ros.roslib.data_structures.Float32(data=0.0)[source]
Bases:
RosMessageData32-bit float message
- Parameters:
data (
float, default:0.0) – 32-bit float data
- class symaware.extra.ros.roslib.data_structures.Float32Array(data=<factory>, layout=<factory>)[source]
Bases:
RosMessageData32-bit float array message
- class symaware.extra.ros.roslib.data_structures.Float64(data=0.0)[source]
Bases:
RosMessageData64-bit float message
- Parameters:
data (
float, default:0.0) – 64-bit float data
- class symaware.extra.ros.roslib.data_structures.Float64Array(data=<factory>, layout=<factory>)[source]
Bases:
RosMessageData64-bit float array message
- class symaware.extra.ros.roslib.data_structures.Header(stamp=<factory>, frame_id='')[source]
Bases:
RosMessageDataMessage header with a sequence number and timestamp information
- class symaware.extra.ros.roslib.data_structures.Int32(data=0)[source]
Bases:
RosMessageData32-bit integer message
- Parameters:
data (
int, default:0) – 32-bit integer data
- class symaware.extra.ros.roslib.data_structures.Int32Array(data=<factory>, layout=<factory>)[source]
Bases:
RosMessageData32-bit integer array message
- class symaware.extra.ros.roslib.data_structures.JsonAirSensorOutput(header=<factory>, tid='', range=0.0, azimuth=0.0, elevation=0.0, velocity=0.0, heading=0.0)[source]
Bases:
RosMessageDataOutput produced by the AIR sensor, sent as a JSON string
- class symaware.extra.ros.roslib.data_structures.LaserScan(header=<factory>, angle_min=0.0, angle_max=0.0, angle_increment=0.0, time_increment=0.0, scan_time=0.0, range_min=0.0, range_max=0.0, ranges=<factory>, intensities=<factory>)[source]
Bases:
RosMessageDataSingle scan from a planar laser range-finder
If you have another ranging device with different behavior (e.g. a sonar array), please find or create a different message, since applications will make fairly laser-specific assumptions about this data
- Parameters:
header (
Header, default:<factory>) – Timestamp in the header is the acquisition time of the first ray in the scan. In frame frame_id, angles are measured around the positive Z axis (counterclockwise, if Z is up) with zero angle being forward along the x axisangle_min (
float, default:0.0) – Start angle of the scan [rad]angle_max (
float, default:0.0) – End angle of the scan [rad]angle_increment (
float, default:0.0) – Angular distance between measurements [rad]time_increment (
float, default:0.0) – Time between measurements [seconds] - if your scanner is moving, this will be used in interpolating position of 3d pointsscan_time (
float, default:0.0) – time between scans [seconds]range_min (
float, default:0.0) – minimum range value [m]range_max (
float, default:0.0) – maximum range value [m]ranges (
list[float], default:<factory>) – range data [m] (Note: values < range_min or > range_max should be discarded)intensities (
list[float], default:<factory>) – intensity data [device-specific units]. If your device does not provide intensities, please leave the array empty.
- class symaware.extra.ros.roslib.data_structures.Odometry(header=<factory>, child_frame_id='', pose=<factory>, twist=<factory>)[source]
Bases:
RosMessageDataThis represents an estimate of a position and velocity in free space. The pose in this message should be specified in the coordinate frame given by header.frame_id. The twist in this message should be specified in the coordinate frame given by the child_frame_id
- Parameters:
header (
Header, default:<factory>) – Headerchild_frame_id (
str, default:'') – stringpose (
PoseWithCovariance, default:<factory>) – PoseWithCovariancetwist (
TwistWithCovariance, default:<factory>) – TwistWithCovariance
- static msg_type()[source]
Return ROS message type string. Must be called on a subclass.
- Return type:
- pose: PoseWithCovariance
- twist: TwistWithCovariance
- class symaware.extra.ros.roslib.data_structures.Path(header=<factory>, poses=<factory>)[source]
Bases:
RosMessageDataList of points that make up a path
- class symaware.extra.ros.roslib.data_structures.PathStamped(header=<factory>, poses=<factory>)[source]
Bases:
RosMessageDataAn array of poses that represents a Path for a robot to follow
- static msg_type()[source]
Return ROS message type string. Must be called on a subclass.
- Return type:
- poses: list[PoseStamped]
- class symaware.extra.ros.roslib.data_structures.Point(x=0.0, y=0.0, z=0.0)[source]
Bases:
RosMessageData3-Dimensional point
- class symaware.extra.ros.roslib.data_structures.Pose(position=<factory>, orientation=<factory>)[source]
Bases:
RosMessageDataPose, combining both a position and an orientation
- static msg_type()[source]
Return ROS message type string. Must be called on a subclass.
- Return type:
- orientation: Quaternion
- class symaware.extra.ros.roslib.data_structures.PoseArray(header=<factory>, poses=<factory>)[source]
Bases:
RosMessageDataAn array of poses that represents a set of points in space
- class symaware.extra.ros.roslib.data_structures.PoseStamped(header=<factory>, pose=<factory>)[source]
Bases:
RosMessageDataA Pose with reference coordinate frame and timestamp
- class symaware.extra.ros.roslib.data_structures.PoseWithCovariance(pose=<factory>, covariance=<factory>)[source]
Bases:
RosMessageDataThis represents a pose in free space with uncertainty.
The pose is defined by a position and orientation.
- Parameters:
- class symaware.extra.ros.roslib.data_structures.PrescanStatus(ready=False, stop=False)[source]
Bases:
RosMessageDataWhether the simulation is ready to start
- class symaware.extra.ros.roslib.data_structures.Quaternion(x=0.0, y=0.0, z=0.0, w=0.0)[source]
Bases:
RosMessageDataQuaternion rotation
- class symaware.extra.ros.roslib.data_structures.RosLibMessage(sender_id, receiver_id, data)[source]
Bases:
MessageSimple class to store messages between agents.
- Parameters:
sender_id (
int) – Id of the sender agentreceiver_id (
int) – Id of the receiver agentdata (
RosMessageData) – ROS message
- data: RosMessageData
- class symaware.extra.ros.roslib.data_structures.RosMessageData[source]
Bases:
ABC
- class symaware.extra.ros.roslib.data_structures.String(data='')[source]
Bases:
RosMessageDataString message
- Parameters:
data (
str, default:'') – string data
- class symaware.extra.ros.roslib.data_structures.Time(sec=<factory>, nanosec=<factory>)[source]
Bases:
RosMessageDataTimestamp, using both seconds and nanoseconds
- class symaware.extra.ros.roslib.data_structures.TrafficLightState(header=<factory>, id='', position=<factory>, status='')[source]
Bases:
RosMessageDataState of a traffic light
- class symaware.extra.ros.roslib.data_structures.Twist(linear=<factory>, angular=<factory>)[source]
Bases:
RosMessageDataThis expresses velocity in free space broken into its linear and angular parts.
- Parameters:
- class symaware.extra.ros.roslib.data_structures.TwistWithCovariance(twist=<factory>, covariance=<factory>)[source]
Bases:
RosMessageDataThis expresses velocity in free space with uncertainty.
- Parameters:
- class symaware.extra.ros.roslib.data_structures.Vector3(x, y, z)[source]
Bases:
RosMessageData3-Dimensional vector
symaware.extra.ros.roslib.models module
- class symaware.extra.ros.roslib.models.RosLibAckermannDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Bases:
AckermannDriveModel[AckermannDriveStamped]Dynamical model using the AckermannDrive type messages as a control input.
- class symaware.extra.ros.roslib.models.RosLibOdometryeModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Bases:
OdometryModel[Odometry]Dynamical model using the twist type messages as a control input.
- class symaware.extra.ros.roslib.models.RosLibTwistDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Bases:
TwistDriveModel[Twist]Dynamical model using the twist type messages as a control input.
symaware.extra.ros.roslib.ros_client module
- class symaware.extra.ros.roslib.ros_client.RosLibClient(host='localhost', port=9090, is_secure=False)[source]
Bases:
RosClient[RosMessageData,Topic,Topic]Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the with construct. This ensures that the client will disconnect gracefully regardless any error.
Example
Use the RosClient
>>> from symaware.extra.ros import RosClient >>> def main(): ... # Add your communication components in here ... pass >>> if __name__ == "__main__": ... with RosClient(host="localhost", port=9090) as ros: ... main(ros)
- Parameters:
- property is_connected
Whether client is connected to ROS bridge.
- publish(publisher, message)[source]
Publish the given message to the provided publisher.
- Parameters:
publisher (
Topic) – Publisher to use to send the messagemessage (
RosMessageData) – Message to publish to the ROS topics
- publish_topic(topic_name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Create and advertise a new ROS topic with the given name and message_type.
- Parameters:
topic_name (
str) – ROS topic the sender publish to, e.g. /my_topicmessage_type (
type[RosMessageData]) – Type of messages the sender will publish, e.g. std_msgs/Stringcompression (
Any, default:None) – Type of compression to use, e.g. pnglatch (
bool, default:False) – True to latch the topic when publishing, False otherwisethrottle_rate (
int, default:0) – Rate (in ms between messages) at which to throttle the topicsqueue_size (
int, default:0) – Queue size created at bridge side for re-publishing webtopicsreconnect_on_close (
bool, default:True) – Reconnect topic (both publisher & subscriber) on reconnection
- Return type:
Topic
- property ros: roslibpy.Ros
Underlying roslibpy.Ros instance.
- subscribe_topic(topic_name, message_type, callback, compression=None, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Subscribe to the ROS topic with the given name and message_type.
- Parameters:
topic_name (
str) – ROS topic the sender publish to, e.g. /my_topicmessage_type (
type[RosMessageData]) – Type of messages the sender will publish, e.g. std_msgs/Stringcompression (
Any, default:None) – Type of compression to use, e.g. pngthrottle_rate (
int, default:0) – Rate (in ms between messages) at which to throttle the topicsqueue_size (
int, default:0) – Queue size created at bridge side for re-publishing webtopicsqueue_size – Queue length at bridge side used when subscribing
reconnect_on_close (
bool, default:True) – Reconnect topic (both publisher & subscriber) on reconnection