symaware.extra.ros.components package

Submodules

symaware.extra.ros.components.communication_receiver module

class symaware.extra.ros.components.communication_receiver.RosCommunicationReceiver(agent_id, ros_client, topic, message_type, compression=None, throttle_rate=0, queue_size=100, reconnect_on_close=True, async_loop_lock=None)[source]

Bases: QueueCommunicationReceiver[Tasynclooplock], Generic[Tasynclooplock, TMessage]

Communication receiver for ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

Example

Create a communication receiver for the “/path” topic of type Path

>>>
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
...     DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
...     initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
...     initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • topic (str) – ROS topic the receiver will subscribe to, e.g. /my_topic

  • message_type (type[TypeVar(TMessage)]) – Type of messages the receiver expects

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 100) – Queue size created at bridge side for re-publishing webtopics

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_decode_message(*messages)[source]

The messages received from the communication channel are decoded to produce the output of the _compute() method. The default implementation assumes that the messages are of type InfoMessage, and produces a dictionary of Awareness vector and Knowledge database.

Although the method must be implemented, you may choose to sure the super implementation if the messages are of type InfoMessage.

Example

To support a new message type, you need to implement the _decode_message() method.

>>> from symaware.base import CommunicationReceiver, Message, InfoMessage, Identifier
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _decode_message(self, *messages: Message) -> "dict[Identifier, str]":
...         # Your implementation here
...         # Example:
...         # Decode a message of type InfoMessage
...         res = {}
...         for message in messages:
...             if isinstance(message, InfoMessage):
...                 res[message.sender_id] = (message.awareness_database, message.knowledge_database)
...             else:
...                 raise TypeError(f"Unknown message type {type(message)}")
...         return res
Parameters:

messages (RosMessage) – Messages to parse

Returns:

Iterable[TypeVar(TMessage)] – Dictionary of Awareness vector and Knowledge database. The key is the sender id and the value is a tuple of Awareness vector and Knowledge database

_on_subscription_callback(message)[source]
initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(entity, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

property message_type: type[TMessage]

Message type the receiver expects.

property topic: str

ROS topic the receiver subscribes to.

symaware.extra.ros.components.communication_sender module

class symaware.extra.ros.components.communication_sender.RosCommunicationSender(agent_id, ros_client, topic, message_type, compression=None, latch=False, throttle_rate=0, queue_size=0, reconnect_on_close=True, async_loop_lock=None)[source]

Bases: CommunicationSender[Tasynclooplock], Generic[Tasynclooplock, TMessage]

Communication sender for ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to communicate with ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

Example

Create a communication receiver for the “/path” topic of type Path

>>>
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
...     DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
...     initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
...     initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • topic (str) – ROS topic the sender publish to, e.g. /my_topic

  • message_type (type[TypeVar(TMessage)]) – Type of messages the sender will publish

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 0) – Queue size created at bridge side for re-publishing webtopics

  • latch (bool, default: False) – True to latch the topic when publishing, False otherwise

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication sender

_send_communication_through_channel(message)[source]

Send a RosMessage through the ROS topic channel.

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(entity, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

initialise_topic()[source]

Create and advertise the ROS topic if not already initialized.

property message_type: type[TMessage]

Message type the sender publishes.

publish(message)[source]

Publish a message to the ROS topic, converting to dict if needed.

property topic: str

ROS topic the sender publishes to.

symaware.extra.ros.components.controller module

class symaware.extra.ros.components.controller.ACCPurePursuitPathController(agent_id, dt, ilimit=200, Kp=1.0, Ki=0.05, Kd=0.05, max_angle=0.5235987755982988, min_headway_break=0.7, min_headway_acc=1.4, sensor_linger_time=1.0, emergency_break_distance=0.25, default_max_speed=0, wheel_base=0.25, waypoint_complete_dist=0.5, async_loop_lock=None)[source]

Bases: PurePursuitPathController

Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints. It also uses Adaptive Cruise Control to avoid collisions if it equipped with an AIR sensor.

It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - ‘max_speed’: Maximum speed the vehicle can reach. Defaults to default_max_speed - ‘path’: Path to follow. Mandatory - ‘air_sensor_output’: information collected from the AIR sensor. Ignored if absent

Parameters:
  • agent_id – Identifier of the agent this component belongs to

  • dt (float) – Time step

  • ilimit (float, default: 200) – Upper and lower bound for the integration in the PID controller

  • Kp (float, default: 1.0) – Coefficient for the proportional part of the PID controller

  • Ki (float, default: 0.05) – Coefficient for the integration part of the PID controller

  • Kd (float, default: 0.05) – Coefficient for the derivation part of the PID controller

  • max_angle (float, default: 0.5235987755982988) – Maximum angle in radiants from the heading of the car for the obstacle to be properly detected.

  • min_headway_break (float, default: 0.7) – Distance we want to keep from the obstacles in front of the car.

  • sensor_linger_time (float, default: 1.0) – Linger time for readings coming from the sensor to still be considered in the computation of the control input. [s] Readings older than this value will be ignored. Setting the value too low may cause the controller to drop readings that should be considered.

  • default_max_speed (float, default: 0) – Default speed if its value is not present in the ‘max_speed’ entry of the knowledge database

  • wheel_base (float, default: 0.25) – The distance between the front and rear axles of the agent, used for steering calculations.

  • waypoint_complete_dist (float, default: 0.5) – Distance threshold to consider a waypoint as passed.

  • async_loop_lock (default: None) – Async loop lock to use for the communication sender

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

class symaware.extra.ros.components.controller.AirStraightController(agent_id, default_max_speed, min_distance, async_loop_lock=None)[source]

Bases: StraightController[Tasynclooplock]

Naive controller that just drives straight forward at a constant speed. If the air sensor finds something in front of the vehicle, the latter will suddenly stop.

It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - ‘max_speed’: Maximum speed the vehicle can reach. Defaults to default_max_speed - ‘air_sensor_output’: information collected from the AIR sensor. Ignored if absent

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • default_max_speed (float) – Default speed if its value is not present in the max_speed entry of the knowledge database

  • async_loop_lock (TypeVar(Tasynclooplock, bound= AsyncLoopLock), default: None) – Async loop lock to use for the communication sender

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

class symaware.extra.ros.components.controller.Controller(agent_id, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

Base class for ROS controllers.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock (TypeVar(Tasynclooplock, bound= AsyncLoopLock), default: None) – Async loop lock to use for the communication sender

_to_control_input(speed, steering_angle)[source]

Convert speed and steering angle into a model-specific control input.

Parameters:
  • speed (float) – Desired longitudinal speed.

  • steering_angle (float) – Desired steering angle.

Returns:

Control input array for the active drive model.

get_speed_and_steering_angle(control_input)[source]

Extract speed and steering angle from a model-specific control input.

Parameters:

control_input (ndarray) – Control input array from the active drive model.

Returns:

tuple[float, float] – Tuple of (speed, steering_angle).

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(entity, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

class symaware.extra.ros.components.controller.KnowledgeAwareACCPurePursuitPathController(agent_id, dt, ilimit=200, Kp=3.0, Ki=0.05, Kd=0.04, max_angle=0.5235987755982988, min_headway_break=0.7, min_headway_acc=10.0, sensor_linger_time=1.0, emergency_break_distance=0.2, default_max_speed=0, wheel_base=0.25, waypoint_complete_dist=0.5, async_loop_lock=None)[source]

Bases: ACCPurePursuitPathController

ACC Pure Pursuit controller that adapts speed when entering a school zone.

It uses traffic sign knowledge to detect school zones and limits the speed accordingly.

_check_is_school_zone()[source]

Update school zone status based on the latest traffic sign knowledge.

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

property is_in_school_zone

Return True if the agent is currently in a school zone.

class symaware.extra.ros.components.controller.PIDController(agent_id, max_steering=0.5, max_throttle=0.8, lateral_pid=None, longitudinal_pid=None, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

A PID controller for vehicle waypoint navigation. Tracks error history and supports customizable PID coefficients for both lateral and longitudinal control.

_record_error(error_list, error)[source]

Record an error value in the history, maintaining max length.

compute_control(target_pos, target_speed)[source]

Compute the control input needed to reach the desired target and speed using PID control. The state is a 10 dimensional vector containing:

  • x position

  • y position

  • z position (unused, can be ignored)

  • x orientation (quaternion)

  • y orientation (quaternion)

  • z orientation (quaternion)

  • w orientation (quaternion)

  • x linear velocity

  • y linear velocity

  • z linear velocity

Parameters:
Returns:

tuple[float, float] – The control input needed to reach the target position and speed, as (throttle, steering).

get_error_stats()[source]

Get statistics about error history.

Return type:

dict[str, dict[str, float]]

reset_history()[source]

Reset all error history and integral terms.

set_lateral_pid(kp, ki, kd)[source]

Set lateral (yaw) PID coefficients.

set_longitudinal_pid(kp, ki, kd)[source]

Set longitudinal (speed) PID coefficients.

class symaware.extra.ros.components.controller.PurePursuitController(agent_id, wheelbase=0.32, lookahead_distance=2.0, min_lookahead=0.5, max_lookahead=5.0, lookahead_speed_gain=0.5, max_steering=0.5, max_throttle=0.8, speed_kp=0.5, speed_ki=0.0, speed_kd=0.0, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

A Pure Pursuit controller for vehicle waypoint navigation. Uses geometric path tracking (lookahead-based steering) for lateral control and a simple proportional controller for longitudinal (speed) control.

_record_error(error_list, error)[source]

Record an error value in the history, maintaining max length.

compute_control(target_pos, target_speed)[source]

Compute the control input using the Pure Pursuit algorithm for lateral control and PID for longitudinal control.

The state is a 10-dimensional vector containing:

  • x position

  • y position

  • z position (unused)

  • x orientation (quaternion)

  • y orientation (quaternion)

  • z orientation (quaternion)

  • w orientation (quaternion)

  • x linear velocity

  • y linear velocity

  • z linear velocity

Parameters:
Returns:

tuple[float, float] – (throttle, steering) control commands.

get_error_stats()[source]

Get statistics about error history.

Return type:

dict[str, dict[str, float]]

reset_history()[source]

Reset all error history and integral terms.

set_longitudinal_pid(kp, ki, kd)[source]

Set longitudinal (speed) PID coefficients.

set_lookahead(base, min_ld, max_ld, speed_gain)[source]

Set Pure Pursuit lookahead parameters.

class symaware.extra.ros.components.controller.PurePursuitPathController(agent_id, default_max_speed=0, wheel_base=0.33, waypoint_complete_dist=0.5, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints.

It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - ‘max_speed’: Maximum speed the vehicle can reach. Defaults to default_max_speed - ‘path’: Path to follow. Mandatory

Parameters:
  • agent_id – Identifier of the agent this component belongs to

  • default_max_speed (float, default: 0) – Default speed if its value is not present in the ‘max_speed’ entry of the knowledge database

  • wheel_base (float, default: 0.33) – The distance between the front and rear axles of the agent, used for steering calculations.

  • waypoint_complete_dist (float, default: 0.5) – Distance threshold to consider a waypoint as passed.

  • async_loop_lock (default: None) – Async loop lock to use for the communication sender

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

property next_waypoint: tuple[float, float, float]

Get the next waypoint as a (x, y, z) tuple.

Return type:

Next waypoint position, or (0, 0, 0) when no waypoints remain.

class symaware.extra.ros.components.controller.PurePursuitPointController(agent_id, target_position, default_max_speed=0, wheel_base=0.25, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

Pure Pursuit controller that drives the vehicle to a target point in front of the vehicle.

It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - ‘max_speed’: Maximum speed the vehicle can reach. Defaults to default_max_speed

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • default_max_speed (float, default: 0) – Default speed if its value is not present in the ‘max_speed’ entry of the knowledge database

  • wheelbase – The distance between the front and rear axles of the agent, used for steering calculations.

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication sender

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

class symaware.extra.ros.components.controller.StraightController(agent_id, default_max_speed, async_loop_lock=None)[source]

Bases: Controller[Tasynclooplock]

Naive controller that just drives straight forward at a constant speed.

It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - ‘max_speed’: Maximum speed the vehicle can reach. Defaults to default_max_speed

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • default_max_speed (float) – Default speed if its value is not present in the max_speed entry of the knowledge database

  • async_loop_lock (TypeVar(Tasynclooplock, bound= AsyncLoopLock), default: None) – Async loop lock to use for the communication sender

_compute()[source]

Compute the control input for the agent. Normally the agent would use its information available at a certain time to compute a control input. This could translate in a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller, Can be empty

symaware.extra.ros.components.perception_system module

class symaware.extra.ros.components.perception_system.PerceptionSystem(agent_id, ros_client, topic, message_type, callback_at_initialise_timeout=-1.0, compression=None, throttle_rate=0, queue_size=100, reconnect_on_close=True, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock], Generic[Tasynclooplock, TMessage]

Perception system that interfaces with ROS topics. It uses [ROS bridge](https://github.com/RobotWebTools/rosbridge_suite) to receive messages from ROS nodes without requiring a full ROS installation on the machine running this script. It needs the ROS client to be initialised before this component.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • ros_client (RosClient) – ROS client to use for the communication

  • topic (str) – ROS topic the perception will subscribe to, e.g. /my_topic

  • message_type (type[TypeVar(TMessage)]) – Type of messages the perception expects

  • callback_at_initialise_timeout (float, default: -1.0) – If this value is strictly greater than zero, the perception system will stop at the initialisation step until it receives a message from the topic it is subscribed to or the timeout expires. Useful to coordinate when expecting some mandatory data from another ROS node. Keep in mind that, util one of the two conditions are not met, the agent coordinator will not start the main loop.

  • compression (Any, default: None) – Type of compression to use, e.g. png

  • throttle_rate (int, default: 0) – Rate (in ms between messages) at which to throttle the topics

  • queue_size (int, default: 100) – Queue length at bridge side used when subscribing

  • reconnect_on_close (bool, default: True) – Reconnect topic (both publisher & subscriber) on reconnection

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication receiver

_LOGGER = <Logger symaware.extra.ros.components.PerceptionSystem (WARNING)>
async _async_compute()[source]

Given the state of the agent, compute a value or a set of values asynchronously.

Note

Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Iterable[TypeVar(TMessage)] – Computed value or set of values

_compute()[source]

Perceive the state of each Agent in the symaware.base.Environment. It is used to update the knowledge database of the agent.

Example

Create a new perception system by subclassing the PerceptionSystem and implementing the _compute() method.

>>> from symaware.base import PerceptionSystem, StateObservation
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get only the agent's own state
...         if not self._agent_id in self._env.agent_states:
...             return {}
...         return {self._agent_id: StateObservation(self._agent_id, self._env.agent_states[self._agent_id])}
Returns:

Iterable[TypeVar(TMessage)] – Information perceived by the agent. Each agent’s state is identified by its id.

_on_subscription_callback(message)[source]

Callback that adds messages to the queue in a thread-safe manner.

abstractmethod _update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (Iterable[TypeVar(TMessage)]) – Information perceived by the agent. Each agent’s state is identified by its id.

async async_initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic asynchronously.

Note

Check initialise_component() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

async async_stop()[source]

Release the async loop lock. It means that the object will complete the current loop immediately and then stop.

Example

This function is used by the Agent when it stops the async loop.

clear_queue()[source]

Clear all messages from the perception queue.

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialise the perception system component, optionally waiting for mandatory data from the topic.

property message_type: type[TMessage]

Message type the perception system expects.

property topic: str

ROS topic the perception system subscribes to.

Module contents