symaware.extra.ros.ros1 package
Submodules
symaware.extra.ros.ros1.components module
- class symaware.extra.ros.ros1.components.Ros1AirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,AirSensorOutput]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[AirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1JsonAirSensorOutputPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,JsonAirSensorOutput]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive output of the AIR sensor, sent using JSON. The information will be stored in the agent’s knowledge database with the key “air_sensor_output”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[JsonAirSensorOutput]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1OdometryPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Odometry]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the odometry information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[Odometry]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PathPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Path]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[Path]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PathStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PathStamped]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the path stamped for the agent to follow. The information will be stored in the agent’s knowledge database with the key “path”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PathStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PosePerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Pose]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose of the agent. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[Pose]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PoseStampedPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PoseStamped]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to perceive the pose stamped information. The information will be stored in the agent’s self state.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topicasync_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PoseStamped]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PrescanReadyPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,Bool]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to receive the flag from Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[Bool]) – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.extra.ros.ros1.components.Ros1PrescanStatusPerceptionSystem(agent_id, ros_client, topic, callback_at_initialise_timeout=-1.0, async_loop_lock=None)[source]
Bases:
PerceptionSystem[Tasynclooplock,PrescanStatus]Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.
This perception system is used to receive the state of Prescan, to know when the simulation is ready. The information will be stored in the agent’s knowledge database with the key “prescan_status”.
- Parameters:
agent_id (
int) – Identifier of the agent this component belongs toros_client (
Ros1Client) – ROS client to use for the communicationtopic (
str) – ROS topic the perception will subscribe to, e.g. /my_topiccallback_at_initialise_timeout (
float, default:-1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.async_loop_lock (
Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default:None) – Async loop lock to use for the communication receiver
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default
_update()method.>>> from symaware.base import PerceptionSystem, StateObservation, Identifier >>> class MyPerceptionSystem(PerceptionSystem): ... def _update(self, perceived_information: "dict[Identifier, StateObservation]"): ... # Your implementation here ... # Example: ... # Simply override the state of the agent ... self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (
Iterable[PrescanStatus]) – Information perceived by the agent. Each agent’s state is identified by its id.
symaware.extra.ros.ros1.data_structures module
symaware.extra.ros.ros1.models module
- class symaware.extra.ros.ros1.models.Ros1AckermannDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Bases:
AckermannDriveModel[AckermannDriveStamped]Dynamical model using the AckermannDrive type messages as a control input.
- class symaware.extra.ros.ros1.models.Ros1TwistDriveModel(ID, topic='', compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Bases:
TwistDriveModel[Twist]Dynamical model using the twist type messages as a control input.
symaware.extra.ros.ros1.ros_client module
- class symaware.extra.ros.ros1.ros_client.Ros1Client[source]
Bases:
RosClient[object,Subscriber,Publisher]Global ROS client used to communicate with the [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) node. It should be initialised once using the with construct. This ensures that the client will disconnect gracefully regardless any error.
Example
Use the RosClient
>>> from symaware.extra.ros import RosClient >>> def main(): ... # Add your communication components in here ... pass >>> if __name__ == "__main__": ... with RosClient(host="localhost", port=9090) as ros: ... main(ros)
- Parameters:
host – Name or IP address of the ROS bridge host, e.g. localhost, 127.0.0.1, 192.168.1.2, my.works.station
port – ROS bridge port, e.g. 9090
is_secure – Whether to use a secure web sockets connection
- publish(publisher, message)[source]
Publish the given message to the provided publisher.
- Parameters:
publisher (
Publisher) – Publisher to use to send the messagemessage (
object) – Message to publish to the ROS topics
- publish_topic(topic_name, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Create and advertise a new ROS topic with the given name and message_type.
- Parameters:
topic_name (
str) – ROS topic the sender publish to, e.g. /my_topicmessage_type (
type) – Type of messages the sender will publish, e.g. std_msgs/Stringcompression (
Any, default:None) – Type of compression to use, e.g. pnglatch (
bool, default:False) – True to latch the topic when publishing, False otherwisethrottle_rate (
int, default:0) – Rate (in ms between messages) at which to throttle the topicsqueue_size (
int, default:0) – Queue size created at bridge side for re-publishing webtopicsreconnect_on_close (
bool, default:True) – Reconnect topic (both publisher & subscriber) on reconnection
- Return type:
Publisher
- subscribe_topic(topic_name, message_type, callback, compression=None, throttle_rate=0, queue_size=0, reconnect_on_close=True)[source]
Subscribe to the ROS topic with the given name and message_type.
- Parameters:
topic_name (
str) – ROS topic the sender publish to, e.g. /my_topicmessage_type (
type) – Type of messages the sender will publish, e.g. std_msgs/Stringcompression (
Any, default:None) – Type of compression to use, e.g. pngthrottle_rate (
int, default:0) – Rate (in ms between messages) at which to throttle the topicsqueue_size (
int, default:0) – Queue length at bridge side used when subscribingreconnect_on_close (
bool, default:True) – Reconnect topic (both publisher & subscriber) on reconnection
- Return type:
Subscriber