symaware.simulators.carla package

Submodules

symaware.simulators.carla.abstraction module

class symaware.simulators.carla.abstraction.Bounds(col_lb=0, col_ub=0, row_lb=0, row_ub=0)[source]

Bases: object

col_lb: int = 0
col_ub: int = 0
row_lb: int = 0
row_ub: int = 0
class symaware.simulators.carla.abstraction.GridAbstraction(grid_map, actions, state_repetition=1)[source]

Bases: Generic[_A], ABC

A simple grid-based abstraction over a continuous space.

This class provides a grid representation of a continuous space, allowing for easy mapping between continuous positions and discrete grid cells. Each cell in the grid can be marked as occupied or free, and the grid can be positioned, rotated, and scaled in continuous space.

property MDP: LightMDP

Get the MDP associated with the grid abstraction.

Return type:

The LightMDP instance representing the MDP

_cell_to_states_map: ndarray
_grid_map: GridMap
_state_to_cell(state)[source]

Convert a discrete grid state to grid cell coordinates.

Parameters:

state (int) – Discrete state identifier

Returns:

tuple[int, int] – Grid cell coordinates as (column, row) indices

Raises:

AssertionError – If the state identifier is invalid

_states_to_cell_map: ndarray
property action_names: tuple[_A]

Get the list of action names in the grid abstraction.

Return type:

List of action identifiers

property action_set

Get the set of available actions in the MDP.

Return type:

Array of action identifiers

additional_dimension_to_name(additional_dimension_idx, additional_dimension_value)[source]

Return the name corresponding to an additional dimension value.

Parameters:
  • additional_dimension_idx (int) – Index of the additional dimension (which additional dimension we are referring to)

  • additional_dimension_value (int) – Value of the additional dimension (index within that dimension)

Return type:

str

gen_labels(label_function)[source]

Generate state labels based on the provided label function.

Each state is passed to label_function, which typically maps regions defined by Bounds to label strings, and the returned label is assigned to that state.

Parameters:

label_function (Callable[[int, GridAbstraction], str]) – Function that takes a state and the GridAbstraction, and returns a label string

Returns:

List of labels for each state, with “_” as default for unlabeled states
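The labelling step can be illustrated with a small stand-alone sketch. Everything here is a stand-in for illustration: the `Bounds.contains` helper, the `ToyAbstraction` class, its `state_to_cell` method and the `REGIONS` dictionary are hypothetical and not part of the package, and inclusive bounds are an assumption.

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple


@dataclass
class Bounds:
    """Stand-in with the same four fields as the documented Bounds class."""
    col_lb: int = 0
    col_ub: int = 0
    row_lb: int = 0
    row_ub: int = 0

    def contains(self, cell: Tuple[int, int]) -> bool:
        # Hypothetical helper; inclusive bounds are an assumption.
        col, row = cell
        return self.col_lb <= col <= self.col_ub and self.row_lb <= row <= self.row_ub


class ToyAbstraction:
    """Hypothetical abstraction: states enumerate grid cells row by row."""

    def __init__(self, n_cols: int, n_rows: int) -> None:
        self.n_cols = n_cols
        self.n_states = n_cols * n_rows

    def state_to_cell(self, state: int) -> Tuple[int, int]:
        row, col = divmod(state, self.n_cols)
        return (col, row)


# Illustrative regions of interest, keyed by label.
REGIONS = {"goal": Bounds(2, 2, 1, 1), "obstacle": Bounds(0, 1, 0, 0)}


def label_function(state: int, abstraction: ToyAbstraction) -> str:
    """Return the label of the first region containing the state's cell."""
    cell = abstraction.state_to_cell(state)
    for label, bounds in REGIONS.items():
        if bounds.contains(cell):
            return label
    return "_"  # default for unlabeled states


def gen_labels(abstraction: ToyAbstraction,
               fn: Callable[[int, ToyAbstraction], str]) -> List[str]:
    """Apply the label function to every state."""
    return [fn(state, abstraction) for state in range(abstraction.n_states)]


labels = gen_labels(ToyAbstraction(n_cols=3, n_rows=2), label_function)
print(labels)
```

The label function receives both the state and the abstraction, so it can translate the state into whatever representation the regions are expressed in.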

gen_transitions()[source]

Generate the transition probability matrix for the MDP.

Computes transition probabilities for all state-action pairs based on the scenario-specific movement model.

Returns:

3D transition matrix with shape (n_states, n_actions, n_states) where entry [i,j,k] represents P(s’=k | s=i, a=j)
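As a rough illustration of the layout described above, the sketch below stacks per-(state, action) probability vectors into a table indexed as [state][action][next_state]. It uses plain Python lists instead of an ndarray, and `build_transition_matrix` and `toy_transition_func` are hypothetical names, not the package's implementation.

```python
from typing import Callable, List


def build_transition_matrix(
    n_states: int,
    n_actions: int,
    transition_func: Callable[[int, int], List[float]],
) -> List[List[List[float]]]:
    """Stack per-(state, action) probability vectors into a 3D table."""
    matrix = []
    for state in range(n_states):
        per_action = []
        for action in range(n_actions):
            probs = transition_func(state, action)
            # Each vector must cover every state and sum to one.
            assert len(probs) == n_states
            assert abs(sum(probs) - 1.0) < 1e-9
            per_action.append(list(probs))
        matrix.append(per_action)
    return matrix


def toy_transition_func(state: int, action: int) -> List[float]:
    """Hypothetical 3-state chain: action 0 stays, action 1 moves right with p=0.8."""
    n_states = 3
    probs = [0.0] * n_states
    if action == 0:
        probs[state] = 1.0
    else:
        target = min(state + 1, n_states - 1)
        probs[target] += 0.8
        probs[state] += 0.2
    return probs


T = build_transition_matrix(3, 2, toy_transition_func)
print(T[0][1])  # distribution over next states from state 0 under action 1
```

Checking that every row sums to one while the table is built catches malformed movement models early.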

property grid_map: GridMap

Get the current grid map used in the abstraction.

Return type:

The GridMap instance representing the grid

property n_actions: int

Get the number of available actions in the grid abstraction.

Return type:

Number of actions

property n_states: int

Get the number of discrete states in the grid abstraction.

Return type:

Number of states

plot_labels(additional_dimension_to_plot=0, ax=None)[source]

Visualize the labels assigned to each state in the grid.

Returns:

Matplotlib figure and axes containing the label visualization

plot_transition_state(state, additional_dimension_to_plot=0)[source]

Visualize all the transition probabilities from a given state, across all actions.

Parameters:

state (int) – Current discrete state identifier

plot_transitions_state_action(state, action, additional_dimension_to_plot=0, axes=None)[source]

Visualize the transition probabilities from a given state and action.

Parameters:
  • state (int) – Current discrete state identifier

  • action (int) – Action identifier

pos_to_state(position)[source]

Convert a continuous position to a discrete grid state.

Parameters:

position (tuple[float, float]) – Continuous position as (x, y) coordinates

Returns:

int | None – Discrete state identifier corresponding to the grid cell containing the position, or None if the position is outside the grid or in an unoccupied cell
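A minimal sketch of this kind of lookup, assuming an axis-aligned grid with a known origin and square cells (rotation is ignored here). The names `pos_to_cell`, `CELL_TO_STATE`, `ORIGIN` and `CELL_SIZE` are hypothetical and only illustrate the idea.

```python
import math
from typing import Dict, Optional, Tuple

Cell = Tuple[int, int]

# Hypothetical grid: lower-left corner at ORIGIN, square cells of CELL_SIZE metres.
ORIGIN = (10.0, 20.0)
CELL_SIZE = 2.0
N_COLS, N_ROWS = 2, 2

# Hypothetical lookup: only cells that belong to the abstraction have a state id.
CELL_TO_STATE: Dict[Cell, int] = {(0, 0): 0, (1, 0): 1, (1, 1): 2}


def pos_to_cell(position: Tuple[float, float]) -> Optional[Cell]:
    """Map (x, y) to (column, row), or None when outside the grid."""
    col = math.floor((position[0] - ORIGIN[0]) / CELL_SIZE)
    row = math.floor((position[1] - ORIGIN[1]) / CELL_SIZE)
    if 0 <= col < N_COLS and 0 <= row < N_ROWS:
        return (col, row)
    return None


def pos_to_state(position: Tuple[float, float]) -> Optional[int]:
    """Return the state id of the containing cell, or None if there is none."""
    cell = pos_to_cell(position)
    if cell is None:
        return None
    return CELL_TO_STATE.get(cell)


print(pos_to_state((13.0, 23.9)))
```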

set_gridmap(grid_map)[source]
property state_set

Get the set of discrete states in the grid abstraction.

Return type:

Array of state identifiers

state_to_pos(state)[source]

Convert a discrete grid state to a continuous position.

Parameters:

state (int) – Discrete state identifier

Returns:

tuple[float, float] – Continuous position as (x, y) coordinates of the cell’s lower-left corner

Raises:

AssertionError: – If the state identifier is invalid

abstractmethod transition_func(state, action)[source]

Compute transition probabilities from a given state and action.

Parameters:
  • state (int) – Current discrete state identifier

  • action (int) – Action identifier

Returns:

ndarray – Array of transition probabilities to all states. The length of the array should equal the number of states.

property transition_matrix: ndarray

Get the transition probability matrix for the MDP.

Returns:

3D transition matrix with shape (n_states, n_actions, n_states), where entry [i,j,k] represents P(s’=k | s=i, a=j)

update(position=None, label_function=None)[source]

Update the MDP with a new initial state and labels.

Parameters:
  • position (tuple[float, float] | int | None, default: None) – New initial position as (x, y) coordinates or state index. Ignored if not provided

  • label_function (Optional[Callable[[int, GridAbstraction], str]], default: None) – Function that takes a state and the GridAbstraction, and returns a label string. Ignored if not provided

class symaware.simulators.carla.abstraction.KingGridAbstraction(grid_map)[source]

Bases: GridAbstraction[tuple[int, int]]

Implementation of the King grid movement model.

The King can move one square in any direction (horizontally, vertically, or diagonally). The probability of successful movement is 0.8, while the probability of ending up in any of the adjacent squares (including diagonals) is uniformly distributed over the remaining 0.2 probability. There is also the action of staying in place with probability 1.0.

The actions are represented as tuples (delta_velocity_x, delta_velocity_y), where each component can be -1, 0, or 1.

    ↖ ↑ ↗
y   ← ● →
    ↙ ↓ ↘
        x
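The King movement model can be sketched as a distribution over neighbouring cells. This sketch assumes that the remaining 0.2 is split evenly over the seven neighbours other than the intended one, and that the grid has no borders or obstacles; both are assumptions, and `king_move_distribution` is a hypothetical name.

```python
from typing import Dict, Tuple

Cell = Tuple[int, int]

# The eight King moves as (dx, dy) offsets, excluding "stay".
NEIGHBOUR_DELTAS = [
    (dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0)
]


def king_move_distribution(
    cell: Cell, action: Cell, p_success: float = 0.8
) -> Dict[Cell, float]:
    """Return P(next cell) for a King move, ignoring borders and obstacles."""
    if action == (0, 0):
        # Staying in place always succeeds.
        return {cell: 1.0}
    others = [delta for delta in NEIGHBOUR_DELTAS if delta != action]
    slip = (1.0 - p_success) / len(others)  # assumed: even split over 7 cells
    dist = {(cell[0] + action[0], cell[1] + action[1]): p_success}
    for dx, dy in others:
        dist[(cell[0] + dx, cell[1] + dy)] = slip
    return dist


dist = king_move_distribution((5, 5), (1, 0))
print(dist[(6, 5)])
```

A bounded grid would also need a rule for probability mass that falls outside the map, for example reassigning it to the current cell.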
_get_prob_vector(delta)[source]
Return type:

ndarray

transition_func(state, action)[source]

Compute transition probabilities from a given state and action.

Parameters:
  • state (int) – Current discrete state identifier

  • action (int) – Action tuple containing (delta_velocity_x, delta_velocity_y)

Returns:

ndarray – Array of transition probabilities to all states. The length of the array should equal the number of states.

class symaware.simulators.carla.abstraction.PathAbstraction(waypoints, actions, radius=2.0)[source]

Bases: Generic[_A]

Create an abstraction (a path) over a continuous space.

The path will be defined by a sequence of waypoints, each representing a discrete state in the continuous space. An MDP will be associated to the path, describing the probability of moving from one waypoint to the next based on the action taken.

closest_waypoint(position)[source]

Find the closest waypoint to a given position.

Parameters:

position (tuple[float, float]) – Continuous position as (x, y) coordinates

Returns:

tuple[float, float] – Coordinates of the closest waypoint in the path

closest_waypoint_idx(position)[source]

Find the index of the closest waypoint to a given position.

Parameters:

position (tuple[float, float]) – Continuous position as (x, y) coordinates

Returns:

int – Index of the closest waypoint in the path

gen_labels(label_function)[source]

Generate state labels based on the provided label function.

Maps regions defined by Bounds to label strings and assigns labels to states that fall within those regions.

Parameters:

label_function (dict[tuple[float, float], str]) – Dictionary mapping Bounds regions to label strings

Returns:

List of labels for each state, with “_” as default for unlabeled states

gen_transitions()[source]

Generate the transition probability matrix for the MDP.

Computes transition probabilities for all state-action pairs based on the scenario-specific movement model.

Returns:

3D transition matrix with shape (n_states, n_actions, n_states) where entry [i,j,k] represents P(s’=k | s=i, a=j)

is_at_waypoint(position)[source]

Check if a given position is within the radius of any waypoint.

Parameters:

position (tuple[float, float]) – Continuous position as (x, y) coordinates

Returns:

bool – True if the position is within the radius of at least one waypoint, False otherwise
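The two waypoint queries above can be sketched with Euclidean distances. The free functions below are hypothetical stand-ins that take the waypoint list explicitly; they are not the package's methods.

```python
import math
from typing import Sequence, Tuple

Point = Tuple[float, float]


def distance(a: Point, b: Point) -> float:
    """Euclidean distance between two 2D points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


def closest_waypoint_idx(waypoints: Sequence[Point], position: Point) -> int:
    """Index of the waypoint with the smallest distance to the position."""
    return min(range(len(waypoints)), key=lambda i: distance(waypoints[i], position))


def is_at_waypoint(
    waypoints: Sequence[Point], position: Point, radius: float = 2.0
) -> bool:
    """True if the nearest waypoint lies within the radius of the position."""
    idx = closest_waypoint_idx(waypoints, position)
    return distance(waypoints[idx], position) <= radius


path = [(0.0, 0.0), (10.0, 0.0), (20.0, 0.0)]
print(closest_waypoint_idx(path, (9.0, 1.0)), is_at_waypoint(path, (9.0, 1.0)))
```

Checking only the nearest waypoint is enough, because if any waypoint is within the radius then the nearest one is too.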

trans_func(idx, action)[source]

Compute transition probabilities from a given position and action.

Uses the probability distributions to determine the likelihood of moving to different neighboring waypoints.

Parameters:
  • idx (int) – Index of the current waypoint in the path

  • action (TypeVar(_A)) – Action to take, typically -2, -1, 0, 1, or 2

Returns:

Array of transition probabilities to all states

class symaware.simulators.carla.abstraction.PathGridAbstraction(grid_map, max_speed, speed_res=1.0, initial_state=(0.0, 0.0, 0.0), label_function=None)[source]

Bases: SpeedGridAbstraction[tuple[int, int]]

_get_prob_vector(delta)[source]
Return type:

ndarray

_grid_map: PathsGridMap
gen_transitions()[source]

Generate the transition probability matrix for the MDP.

Computes transition probabilities for all state-action pairs based on the scenario-specific movement model.

Returns:

3D transition matrix with shape (n_states, n_actions, n_states) where entry [i,j,k] represents P(s’=k | s=i, a=j)

state_action_to_next_state(state, action)[source]
Return type:

tuple[tuple[float, float], float]

transition_func(state, action, target_cell)[source]

Compute transition probabilities from a given state and action.

Parameters:
  • state (int) – Current discrete state identifier

  • action (int) – Action tuple containing (delta_velocity_x, delta_velocity_y)

Returns:

ndarray – Array of transition probabilities to all states. The length of the array should equal the number of states.

class symaware.simulators.carla.abstraction.SpeedGridAbstraction(grid_map, max_speed, actions, speed_res=1.0, initial_state=(0.0, 0.0, 0.0), label_function=None)[source]

Bases: GridAbstraction[_A]

A grid-based abstraction that incorporates velocity into the state representation.

This class extends the GridAbstraction to include velocity components in the state representation. The actions are represented as tuples (delta_movement, delta_speed), where each component can be -1, 0, or 1.

Movement Model:
  • The agent can move left (-1), forward (0), or right (1) with specified probabilities.

  • The speed can decrease (-1), remain the same (0), or increase (1) with specified probabilities.

In this case, the state representation is assumed to be a combination of position and velocity.

_cell_speed_to_state(cell, speed_idx)[source]

Convert grid cell coordinates and speed index to a discrete state identifier.

Parameters:
  • cell (tuple[int, int]) – Grid cell coordinates as (column, row) indices

  • speed_idx (int) – Speed index

Returns:

int – Unique state identifier for the cell and speed

Raises:

AssertionError – If cell coordinates or speed index are invalid

_state_to_speed_idx(state)[source]

Convert a discrete state to its corresponding speed index.

Parameters:

state (int) – Discrete state identifier

Returns:

int – Speed index corresponding to the state

Raises:

AssertionError – If the state identifier is invalid

additional_dimension_to_name(additional_dimension_idx)[source]

Return the name corresponding to an additional dimension value.

Parameters:
  • additional_dimension_idx (int) – Index of the additional dimension (which additional dimension we are referring to)

  • additional_dimension_value – Value of the additional dimension (index within that dimension)

Return type:

str

cell_speed_idx_to_pos_speed(cell, speed_idx)[source]

Convert grid cell coordinates and speed index to continuous position and speed.

Parameters:
  • cell (tuple[int, int]) – Grid cell coordinates as (column, row) indices

  • speed_idx (int) – Speed index

Returns:

tuple[tuple[float, float], float] – Tuple containing continuous position as (x, y) coordinates and speed value

Raises:

AssertionError – If cell coordinates or speed index are invalid

property max_speed: float

Get the maximum speed represented in the abstraction.

Return type:

Maximum speed value

pos_speed_to_state(position, speed)[source]

Convert a continuous position and speed to the corresponding abstract state.

Parameters:
  • position (tuple[float, float]) – Continuous position as (x, y) coordinates

  • speed (float) – Continuous speed value

Returns:

int | None – Discrete state identifier corresponding to the grid cell and speed, or None if the position is outside the grid or in an unoccupied cell
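One possible way to combine a cell and a speed level into a single state identifier is sketched below. The rounding rule, the number of speed levels (`max_speed / speed_res + 1`) and the row-major encoding are all assumptions made for illustration; the function names are hypothetical.

```python
from typing import Tuple


def n_speed_levels(max_speed: float, speed_res: float) -> int:
    """Assumed number of discrete levels: 0, speed_res, ..., max_speed."""
    return int(round(max_speed / speed_res)) + 1


def speed_to_idx(speed: float, speed_res: float, max_speed: float) -> int:
    """Round a continuous speed to the nearest level and clamp it to the valid range."""
    idx = int(round(speed / speed_res))
    return max(0, min(idx, n_speed_levels(max_speed, speed_res) - 1))


def encode_state(cell_id: int, speed_idx: int, n_levels: int) -> int:
    """Flatten (cell id, speed index) into one integer."""
    return cell_id * n_levels + speed_idx


def decode_state(state: int, n_levels: int) -> Tuple[int, int]:
    """Inverse of encode_state: recover (cell id, speed index)."""
    return divmod(state, n_levels)


levels = n_speed_levels(max_speed=5.0, speed_res=1.0)
state = encode_state(cell_id=3, speed_idx=speed_to_idx(2.4, 1.0, 5.0), n_levels=levels)
print(state, decode_state(state, levels))
```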

property speed_range: int

Get the number of discrete speed levels in the abstraction.

Return type:

Number of speed levels

property speed_resolution: float

Get the speed resolution used in the abstraction.

Return type:

Speed resolution value

state_to_pos_speed(state)[source]

Convert a discrete state identifier to continuous position and speed.

Parameters:

state (int) – Discrete state identifier

Returns:

tuple[tuple[float, float], float] – Tuple containing continuous position as (x, y) coordinates and speed value

Raises:

AssertionError – If the state identifier is invalid

state_to_speed(state)[source]

Convert a discrete grid state to a continuous speed.

Parameters:

state (int) – Discrete state identifier

Returns:

float – Continuous speed value

Raises:

AssertionError – If the state identifier is invalid

update(pos_speed=None, label_function=None)[source]

Update the MDP with a new initial state and labels.

Parameters:
  • pos_speed (tuple[float, float, float] | int | None, default: None) – New initial position as (x, y) coordinates and speed (z) or direct state index. Ignored if not provided

  • label_function (Optional[Callable[[int, GridAbstraction], str]], default: None) – Function that takes a state and the GridAbstraction, and returns a label string. Ignored if not provided

class symaware.simulators.carla.abstraction.VelocityGridAbstraction(grid_map, max_speed, speed_res=1.0, initial_state=(0.0, 0.0, 0.0), label_function=None)[source]

Bases: SpeedGridAbstraction[tuple[SteerAction, SpeedAction]]

A grid-based abstraction that incorporates velocity into the state representation.

This class extends the GridAbstraction to include velocity components in the state representation. The actions are represented as tuples (delta_movement, delta_speed), where each component can be -1, 0, or 1.

Movement Model:
  • The agent can move left (-1), forward (0), or right (1) with specified probabilities.

  • The speed can decrease (-1), remain the same (0), or increase (1) with specified probabilities.

In this case, the state representation is assumed to be a combination of position and velocity.

_get_prob_spatial(delta)[source]
Return type:

ndarray

_get_prob_speed(delta)[source]
Return type:

ndarray

state_action_to_next_state(state, action)[source]
Return type:

tuple[tuple[float, float], float]

transition_func(state, action)[source]

Compute transition probabilities from a given state and action.

Parameters:
  • state (int) – Current discrete state identifier

  • action (int) – Action tuple containing (delta_movement, delta_speed)

Returns:

ndarray – Array of transition probabilities to all states. The length of the array should equal the number of states.

symaware.simulators.carla.abstraction._main()[source]
symaware.simulators.carla.abstraction.plotting()[source]

symaware.simulators.carla.controllers module

class symaware.simulators.carla.controllers.CarlaController(agent_id, async_loop_lock=None)[source]

Bases: Controller

_to_control_input(speed, steering_angle)[source]
initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

class symaware.simulators.carla.controllers.GridAbstractionController(agent_id, grid_map, policy_file='', async_loop_lock=None)[source]

Bases: Controller

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

class symaware.simulators.carla.controllers.MPController(agent_id, time_step=-1, wheel_base=1.5, horizon_steps=5, async_loop_lock=None)[source]

Bases: CarlaController

_compute()[source]

Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach

  • Time series of intents of the controller; can be empty

get_state()[source]

Get the current optimized state trajectory.

Return type:

ndarray

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

prob_init()[source]
run_step(current_state, target_ref)[source]

Solve the MPC optimization problem. The current state is composed as follows:

$$ [x, y, \phi, v] $$

where $x, y$ are the position coordinates, $\phi$ is the heading angle, and $v$ is the velocity. On the other hand, the target reference is composed as follows:

$$ [x_t, y_t, v_t] $$

where $x_t, y_t$ are the target position coordinates, and $v_t$ is the target velocity.
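To make the state and reference layout concrete, here is a sketch of a one-step prediction and a tracking cost. The kinematic bicycle model, the forward-Euler discretisation, the quadratic cost and the names `bicycle_step` and `tracking_cost` are assumptions; the documentation above does not state which dynamics or objective the controller uses.

```python
import math
from typing import Tuple

State = Tuple[float, float, float, float]  # (x, y, phi, v)
Target = Tuple[float, float, float]        # (x_t, y_t, v_t)


def bicycle_step(
    state: State, accel: float, steer: float, wheel_base: float = 1.5, dt: float = 0.1
) -> State:
    """Advance an assumed kinematic bicycle model by one Euler step."""
    x, y, phi, v = state
    x_next = x + v * math.cos(phi) * dt
    y_next = y + v * math.sin(phi) * dt
    phi_next = phi + v / wheel_base * math.tan(steer) * dt
    v_next = v + accel * dt
    return (x_next, y_next, phi_next, v_next)


def tracking_cost(state: State, target_ref: Target) -> float:
    """Assumed quadratic cost on position and speed error (heading is not penalised)."""
    x, y, _, v = state
    x_t, y_t, v_t = target_ref
    return (x - x_t) ** 2 + (y - y_t) ** 2 + (v - v_t) ** 2


nxt = bicycle_step((0.0, 0.0, 0.0, 1.0), accel=0.0, steer=0.0)
print(nxt, tracking_cost(nxt, (1.0, 0.0, 1.0)))
```

An MPC solver would roll such a model forward over the horizon and minimise the summed cost over the control sequence.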

class symaware.simulators.carla.controllers.PIDLateralController(offset=0, K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03)[source]

Bases: object

PIDLateralController implements lateral control using a PID.

_pid_control(waypoint, vehicle_transform)[source]

Estimate the steering angle of the vehicle based on the PID equations

Parameters:
  • waypoint (tuple[float, float]) – target waypoint

  • vehicle_transform (Transform) – current transform of the vehicle

Returns:

steering control in the range [-1, 1]

change_parameters(K_P, K_I, K_D, dt)[source]

Changes the PID parameters

run_step(waypoint)[source]

Execute one step of lateral control to steer the vehicle towards a certain waypoint.

Parameters:

waypoint (tuple[float, float]) – target waypoint

Returns:

steering control in the range [-1, 1], where -1 is maximum steering to the left and +1 is maximum steering to the right

set_agent(agent)[source]
set_offset(offset)[source]

Changes the offset

class symaware.simulators.carla.controllers.PIDLongitudinalController(K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03)[source]

Bases: object

PIDLongitudinalController implements longitudinal control using a PID.

_pid_control(target_speed, current_speed)[source]

Estimate the throttle/brake of the vehicle based on the PID equations
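For reference, a textbook discrete PID update is sketched below. It is a generic form using the same parameter names (K_P, K_I, K_D, dt) and is not necessarily how the package computes its terms; the `SimplePID` class and the clipping to [-1, 1] are assumptions.

```python
class SimplePID:
    """Generic discrete PID; a stand-in, not the package's controller."""

    def __init__(self, k_p: float = 1.0, k_i: float = 0.1,
                 k_d: float = 0.1, dt: float = 0.03) -> None:
        self.k_p, self.k_i, self.k_d, self.dt = k_p, k_i, k_d, dt
        self._integral = 0.0
        self._prev_error = 0.0

    def step(self, target: float, current: float) -> float:
        """Return a control value clipped to [-1, 1]."""
        error = target - current
        self._integral += error * self.dt                   # integral term
        derivative = (error - self._prev_error) / self.dt   # finite difference
        self._prev_error = error
        output = (self.k_p * error
                  + self.k_i * self._integral
                  + self.k_d * derivative)
        return max(-1.0, min(1.0, output))


pid = SimplePID(k_p=0.1, k_i=0.0, k_d=0.0)
print(pid.step(target=10.0, current=5.0))
```

A positive output would typically be applied as throttle and a negative one as brake, with the controller limits applied afterwards.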

change_parameters(K_P, K_I, K_D, dt)[source]

Changes the PID parameters

run_step(target_speed, debug=False)[source]

Execute one step of longitudinal control to reach a given target speed.

Parameters:
  • target_speed (float) – target speed in km/h

  • debug (bool, default: False) – boolean for debugging

Returns:

throttle control

set_agent(agent)[source]
class symaware.simulators.carla.controllers.PPController(agent_id, lookahead_distance=2.0, wheel_base=2.5, default_speed=1.0, max_steering=0.8, async_loop_lock=None)[source]

Bases: CarlaController

PPController implements a Pure Pursuit controller for path following. It computes the necessary speed and steering angle to follow a path.

_pure_pursuit(x, y, yaw, target)[source]

Pure pursuit algorithm to compute steering angle.

Parameters:
  • x (float) – current x position

  • y (float) – current y position

  • yaw (float) – current yaw angle in radians

  • target (tuple[float, float]) – target waypoint as (x, y) tuple

Returns:

float – steering angle in range [-1, 1]
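The standard pure pursuit steering law can be sketched as follows. The distance to the target is used as the lookahead distance, and dividing by `max_steering` to obtain a value in [-1, 1] is an assumption about how the normalisation works; the sign convention and the function name are likewise illustrative.

```python
import math
from typing import Tuple


def pure_pursuit_steering(
    x: float,
    y: float,
    yaw: float,
    target: Tuple[float, float],
    wheel_base: float = 2.5,
    max_steering: float = 0.8,
) -> float:
    """Return a normalised steering command in [-1, 1] towards the target."""
    dx, dy = target[0] - x, target[1] - y
    lookahead = math.hypot(dx, dy)
    if lookahead < 1e-6:
        return 0.0  # already at the target, no steering needed
    # Angle between the vehicle heading and the line to the target.
    alpha = math.atan2(dy, dx) - yaw
    # Classic pure pursuit law: delta = atan(2 L sin(alpha) / l_d).
    delta = math.atan2(2.0 * wheel_base * math.sin(alpha), lookahead)
    # Assumed normalisation: scale by the maximum angle and clip.
    return max(-1.0, min(1.0, delta / max_steering))


print(pure_pursuit_steering(0.0, 0.0, 0.0, (5.0, 0.0)))
```

A target straight ahead yields zero steering, and targets mirrored about the heading yield commands of equal magnitude and opposite sign.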

_waypoint_compute(target_waypoint)[source]

Compute the control input using pure pursuit algorithm.

Returns:

control input array and time series

set_default_speed(speed)[source]

Set the default speed.

set_lookahead_distance(distance)[source]

Set the lookahead distance.

class symaware.simulators.carla.controllers.PathGridAbstractionController(agent_id, grid_map, async_loop_lock=None)[source]

Bases: Controller

__LOGGER = <Logger symaware.simulators.carla.PathGridAbstractionController (WARNING)>
_abs_state_to_action(prod_state_idx)[source]
Return type:

tuple[Literal['l', 'f', 'r'], Literal['d', 'c', 'a']] | None

_run_step(pos, speed)[source]
Return type:

tuple[tuple[float, float], float] | None

_show_updated_policy()[source]
_update_policy(pos, speed)[source]
property abstraction: PathGridAbstraction
initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...            self, agent, initial_awareness_database, initial_knowledge_database
...         ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

property lp: VelocityLPRiskLTL
property policy: ndarray
class symaware.simulators.carla.controllers.VehiclePIDController(agent_id, args_lateral={'K_D': 0.0, 'K_I': 0.0, 'K_P': 2.0}, args_longitudinal={'K_D': 0.0, 'K_I': 0.0, 'K_P': 2.0}, offset=0, max_throttle=0.75, max_brake=0.3, max_steering=0.8, async_loop_lock=None)[source]

Bases: CarlaController

VehiclePIDController combines two PID controllers (lateral and longitudinal) to perform the low-level control of a vehicle from the client side.

_compute()[source]

Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach

  • Time series of intents of the controller; can be empty

change_lateral_PID(args_lateral)[source]

Changes the parameters of the PIDLateralController

change_longitudinal_PID(args_longitudinal)[source]

Changes the parameters of the PIDLongitudinalController

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overridden to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         # Call the super method last, passing the agent, so that subscribers are notified
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

run_step(target_speed, waypoint)[source]

Execute one step of control invoking both lateral and longitudinal PID controllers to reach a target waypoint at a given target_speed.

Parameters:
  • target_speed (float) – desired vehicle speed

  • waypoint (tuple[float, float]) – target location encoded as a waypoint

Returns:

distance (in meters) to the waypoint
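As a rough illustration of what a single control step involves, the sketch below runs a generic textbook PID update on the speed error and clamps the result to a throttle range. The class SimplePID, the helper clamp, the gains and the time step are all hypothetical; they are not taken from PIDLongitudinalController or PIDLateralController, whose internals are not described here.

```python
class SimplePID:
    """Generic textbook PID controller (hypothetical helper, not the library class)."""

    def __init__(self, kp: float, ki: float, kd: float, dt: float):
        self.kp, self.ki, self.kd, self.dt = kp, ki, kd, dt
        self._integral = 0.0
        self._previous_error = 0.0

    def step(self, error: float) -> float:
        # Accumulate the integral term and approximate the derivative
        # with a finite difference over one time step
        self._integral += error * self.dt
        derivative = (error - self._previous_error) / self.dt
        self._previous_error = error
        return self.kp * error + self.ki * self._integral + self.kd * derivative


def clamp(value: float, low: float, high: float) -> float:
    """Restrict value to the closed interval [low, high]."""
    return max(low, min(high, value))


# Assumed scenario: the vehicle drives at 5 m/s and should reach 8 m/s
speed_pid = SimplePID(kp=0.5, ki=0.1, kd=0.0, dt=0.1)
throttle = clamp(speed_pid.step(8.0 - 5.0), 0.0, 1.0)
print(throttle)
```

A lateral controller could follow the same pattern with the heading error towards the waypoint as input, which is presumably why the two controllers can be tuned separately through change_lateral_PID() and change_longitudinal_PID().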

set_offset(offset)[source]

Changes the offset

symaware.simulators.carla.dynamical_model module

class symaware.simulators.carla.dynamical_model.DroneModelSubinputs[source]

Bases: TypedDict

rpm1: float
rpm2: float
rpm3: float
rpm4: float
class symaware.simulators.carla.dynamical_model.DynamicalModel(ID, control_input)[source]

Bases: DynamicalModel, Generic[T]

Abstract class for dynamical models using the CARLA simulation engine.

This is the parent class for all CARLA-based dynamical models. Subclasses implement specific models for different entity types (vehicles, walkers, etc.) and control input formats.

Parameters:
  • ID (int) – Identifier of the agent this model belongs to

  • control_input (ndarray) – Initial control input of the agent. Used to validate the size of future control inputs

_actor: T
initialise(actor)[source]

Initialize the dynamical model with a CARLA actor.

Parameters:

actor (Actor) – The CARLA actor to associate with this dynamical model

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

class symaware.simulators.carla.dynamical_model.RacecarModelSubinputs[source]

Bases: TypedDict

steering_angle: ndarray
target_velocity: ndarray
class symaware.simulators.carla.dynamical_model.RawDynamicalModel(agent_id)[source]

Bases: DynamicalModel[Actor]

Raw dynamical model for entities.

Allows setting the position and orientation of an entity directly. Takes a control input of six elements: x, y, z position and yaw, pitch, roll rotations.

control_input_to_array(x, y, z, yaw, pitch, roll)[source]

Convert position and rotation values to array.

Returns:

ndarray – Array with [x, y, z, yaw, pitch, roll]

set_control_input(x, y, z, yaw, pitch, roll)[source]

Set the control input with position and rotation values.

Parameters:
  • x (float) – X position coordinate

  • y (float) – Y position coordinate

  • z (float) – Z position coordinate

  • yaw (float) – Yaw rotation angle

  • pitch (float) – Pitch rotation angle

  • roll (float) – Roll rotation angle

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

property subinputs_dict: RawModelSubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector
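For the six-element raw input, the ordering requirement can be sketched as follows. The helper raw_subinputs and the constant RAW_INPUT_NAMES are hypothetical; the key order is an assumption based on the argument order of set_control_input(x, y, z, yaw, pitch, roll).

```python
# Assumed order, taken from the set_control_input(x, y, z, yaw, pitch, roll) signature
RAW_INPUT_NAMES = ("x", "y", "z", "yaw", "pitch", "roll")


def raw_subinputs(control_input):
    """Map a six-element control input to named subinputs, preserving the vector order."""
    if len(control_input) != len(RAW_INPUT_NAMES):
        raise ValueError(
            f"expected {len(RAW_INPUT_NAMES)} values, got {len(control_input)}"
        )
    # dict preserves insertion order, so the keys follow the input vector
    return dict(zip(RAW_INPUT_NAMES, control_input))


subinputs = raw_subinputs([1.0, 2.0, 0.5, 90.0, 0.0, 0.0])
print(subinputs["yaw"])  # 90.0
```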

class symaware.simulators.carla.dynamical_model.RawModelSubinputs[source]

Bases: TypedDict

pitch: float
roll: float
x: float
y: float
yaw: float
z: float
class symaware.simulators.carla.dynamical_model.StillDynamicalModel(agent_id)[source]

Bases: DynamicalModel[Actor]

A static dynamical model for entities that do not move. This model takes no control inputs and keeps the entity stationary.

control_input_to_array()[source]
Return type:

ndarray

set_control_input()[source]
property subinputs_dict: RawModelSubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector

class symaware.simulators.carla.dynamical_model.VehicleControlModel(agent_id)[source]

Bases: DynamicalModel[Vehicle]

A simple model that uses the built-in vehicle control mechanism of CARLA.

control_input_to_array(steer=0.0, brake=0.0, hand_brake=False, reverse=False, manual_gear_shift=False, gear=0)[source]
Overloads:
  • self, throttle (float), steer (float), brake (float), hand_brake (bool), reverse (bool), manual_gear_shift (bool), gear (int) → np.ndarray

  • self, throttle (carla.VehicleControl) → np.ndarray

Convert vehicle control parameters to array representation.

Returns:

Array with [throttle, steer, brake, hand_brake, reverse, manual_gear_shift, gear]

set_control_input(throttle=0.0, steer=0.0, brake=0.0, hand_brake=False, reverse=False, manual_gear_shift=False, gear=0)[source]
Overloads:
  • self, throttle (float), steer (float), brake (float), hand_brake (bool), reverse (bool), manual_gear_shift (bool), gear (int) → np.ndarray

  • self, throttle (carla.VehicleControl) → np.ndarray

Set the vehicle control inputs.

This method supports two calling conventions: individual parameters or a carla.VehicleControl object.

Parameters:
  • throttle (default: 0.0) – Throttle value (0-1) or a VehicleControl object

  • steer (default: 0.0) – Steering angle (-1 to 1)

  • brake (default: 0.0) – Brake value (0-1)

  • hand_brake (default: False) – Hand brake flag

  • reverse (default: False) – Reverse flag

  • manual_gear_shift (default: False) – Manual gear shift flag

  • gear (default: 0) – Gear number
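The array layout [throttle, steer, brake, hand_brake, reverse, manual_gear_shift, gear] can be illustrated with a small standalone sketch. The function vehicle_control_to_list is hypothetical, and the clamping to the documented ranges is an assumption of this example: the source does not say whether the library clamps values itself.

```python
def vehicle_control_to_list(throttle=0.0, steer=0.0, brake=0.0, hand_brake=False,
                            reverse=False, manual_gear_shift=False, gear=0):
    """Pack the control values as floats in the documented order.

    Clamping is an assumption of this sketch, not confirmed library behaviour.
    """
    def clamp(value, low, high):
        return max(low, min(high, float(value)))

    return [
        clamp(throttle, 0.0, 1.0),   # throttle in [0, 1]
        clamp(steer, -1.0, 1.0),     # steer in [-1, 1]
        clamp(brake, 0.0, 1.0),      # brake in [0, 1]
        float(hand_brake),           # flags are stored as 0.0 or 1.0
        float(reverse),
        float(manual_gear_shift),
        float(gear),
    ]


control = vehicle_control_to_list(throttle=0.7, steer=-1.5, reverse=True)
print(control)  # [0.7, -1.0, 0.0, 0.0, 1.0, 0.0, 0.0]
```

Storing the boolean flags as floats matches the VehicleControlSubinputs typed dictionary, where every field is declared as float.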

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

property subinputs_dict: VehicleControlSubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector

class symaware.simulators.carla.dynamical_model.VehicleControlSubinputs[source]

Bases: TypedDict

brake: float
gear: float
hand_brake: float
manual_gear_shift: float
reverse: float
steer: float
throttle: float
class symaware.simulators.carla.dynamical_model.VehicleDynamicalModel(agent_id)[source]

Bases: DynamicalModel[Vehicle]

Velocity-based dynamical model for vehicle entities.

Allows setting the target velocity of the vehicle directly. Takes a control input of three elements: x, y, z velocity components.

control_input_to_array(x, y, z)[source]

Convert velocity values to array.

Returns:

ndarray – Array with [x, y, z] velocity components

set_control_input(x, y, z)[source]

Set the target velocity for the vehicle.

Parameters:
  • x (float) – Velocity in x direction

  • y (float) – Velocity in y direction

  • z (float) – Velocity in z direction

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

property subinputs_dict: VelocitySubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector

class symaware.simulators.carla.dynamical_model.Velocity2DSubinputs[source]

Bases: TypedDict

v: float
w: float
class symaware.simulators.carla.dynamical_model.VelocityDynamicalModel(agent_id)[source]

Bases: DynamicalModel[Actor]

Velocity-based dynamical model for entities.

Allows setting the target velocity of an entity directly. Takes a control input of three elements: x, y, z velocity components.

control_input_to_array(x, y, z)[source]

Convert velocity values to array.

Returns:

ndarray – Array with [x, y, z] velocity components

set_control_input(x, y, z)[source]

Set the target velocity for the entity.

Parameters:
  • x (float) – Velocity in x direction

  • y (float) – Velocity in y direction

  • z (float) – Velocity in z direction

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

property subinputs_dict: VelocitySubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector

class symaware.simulators.carla.dynamical_model.VelocitySubinputs[source]

Bases: TypedDict

x: float
y: float
z: float
class symaware.simulators.carla.dynamical_model.WalkDynamicalModel(agent_id=-1, speed=1.0, direction=-1.0, jump=False)[source]

Bases: DynamicalModel[Walker]

Dynamical model for walker entities in CARLA. This model controls walker movement using speed, direction, and jump parameters. The walker moves continuously in the specified direction at the given speed.

control_input_to_array()[source]

Convert control input to array representation (empty for walker model).

Return type:

ndarray

initialise(actor)[source]

Initialize the dynamical model with a CARLA actor.

Parameters:

actor (Walker) – The CARLA actor to associate with this dynamical model

set_control_input()[source]

Set control input (no-op for walker model).

property subinputs_dict: RawModelSubinputs

Dictionary of named control inputs (empty for walker model).

symaware.simulators.carla.entities module

class symaware.simulators.carla.entities.CameraEntity(id=-1, model=<factory>, actor_id=-1, kind='sensor.camera.rgb', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>, cbs=<factory>, callback=None)[source]

Bases: SensorEntity

Camera sensor entity for CARLA simulation.

Represents a camera sensor that captures RGB images from the simulation. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

actor: carla.Sensor = None
callback: Callable[[carla.Image], None] | None = None
kind: str = 'sensor.camera.rgb'
spawn_actor(world, parent=None, **kwargs)[source]
Return type:

Actor

class symaware.simulators.carla.entities.Entity(id=-1, model=<factory>, actor_id=-1, kind='', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>)[source]

Bases: Entity

Abstract class for CARLA entities in the simulation.

Represents an actor in the CARLA simulation environment. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

acceleration: ndarray
actor: carla.Actor = None
actor_id: int = -1
property direction: carla.Vector3D

Get the direction of the entity as a numpy array. The direction is defined as the orientation of the entity.

Return type:

Vector representing the direction the entity is facing.

id: int = -1
initialise(world, parent=None)[source]

Initialise the entity in the world. This method should be called after the entity is created. It sets the actor_id of the entity and the actor associated with it.

Parameters:

world (World) – The world in which the entity is created.

kind: str = ''
kwargs: dict[str, str]
property location: carla.Location

Get the location of the entity as a carla.Location object.

Return type:

Location of the entity

model: DynamicalModel
position: ndarray
pre_existing: bool = False
rotation: ndarray
spawn_actor(world, parent=None, **kwargs)[source]
Return type:

Actor

property state

Get the state of the entity as a numpy array. The state is defined as the position, orientation and velocity of the entity.

Return type:

(3 + 3 + 3)-shaped array containing the position, orientation and velocity of the entity.
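The (3 + 3 + 3) layout, together with the zero-padding rule described for position and velocity, can be sketched with plain lists. The helpers pad_to_3 and build_state are hypothetical and only mirror the documented shapes; applying the padding to the orientation as well is an assumption of this example.

```python
def pad_to_3(values):
    """Return a 3-element list, filling missing trailing values with 0.0."""
    values = [float(v) for v in values]
    if len(values) > 3:
        raise ValueError("expected at most 3 values")
    return values + [0.0] * (3 - len(values))


def build_state(position, orientation, velocity):
    """Concatenate position, orientation and velocity into a 9-element state."""
    return pad_to_3(position) + pad_to_3(orientation) + pad_to_3(velocity)


state = build_state(position=[10.0, 5.0], orientation=[0.0, 0.0, 1.57], velocity=[2.0])
print(state)  # [10.0, 5.0, 0.0, 0.0, 0.0, 1.57, 2.0, 0.0, 0.0]
```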

stop()[source]
velocity: ndarray
class symaware.simulators.carla.entities.PedestrianEntity(id=-1, model=<factory>, actor_id=-1, kind='walker.pedestrian.0038', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>)[source]

Bases: Entity

Pedestrian entity for CARLA simulation.

Represents a pedestrian (walker) actor in the CARLA simulation environment. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

kind: str = 'walker.pedestrian.0038'
class symaware.simulators.carla.entities.SensorEntity(id=-1, model=<factory>, actor_id=-1, kind='', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>, cbs=<factory>)[source]

Bases: Entity

Abstract base class for sensor entities in CARLA.

Represents sensors (cameras, lidar, etc.) in the CARLA simulation. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

actor: carla.Sensor = None
cbs: list[callable]
spawn_actor(world, parent=None, **kwargs)[source]
Return type:

Sensor

stop()[source]
class symaware.simulators.carla.entities.SpectatorEntity(id=-1, model=<factory>, actor_id=-1, kind='', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>, parent=None, following=False)[source]

Bases: Entity

Spectator camera entity for CARLA simulation.

Provides a special viewport to view the simulation from a detached perspective. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

following: bool = False
initialise(world, parent=None)[source]

Initialise the entity in the world. This method should be called after the entity is created. It sets the actor_id of the entity and the actor associated with it.

Parameters:

world – The world in which the entity is created.

parent: carla.Actor = None
spawn_actor(world, parent=None, **kwargs)[source]
Return type:

Sensor

step()[source]
class symaware.simulators.carla.entities.VehicleEntity(id=-1, model=<factory>, actor_id=-1, kind='vehicle.tesla.model3', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>, color='255, 0, 0', autopilot=False, gravity=False, sensors=<factory>, wheel_base=0.0)[source]

Bases: Entity

Vehicle entity for CARLA simulation.

Represents a vehicle actor in the CARLA simulation environment. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

actor: carla.Vehicle = None
autopilot: bool = False
color: str = '255,0,0'
gravity: bool = False
initialise(world, parent=None)[source]

Initialise the entity in the world. This method should be called after the entity is created. It sets the actor_id of the entity and the actor associated with it.

Parameters:

world – The world in which the entity is created.

kind: str = 'vehicle.tesla.model3'
sensors: list[Entity]
spawn_actor(world, parent=None, **kwargs)[source]
Return type:

Vehicle

property state

Get the state of the entity as a numpy array. The state is defined as the position, orientation and velocity of the entity.

Returns:

(3 + 3 + 3)-shaped array containing the position (m), orientation (rad) and velocity (m/s) of the entity.

Return type:

np.ndarray

step()[source]
stop()[source]
wheel_base: float = 0.0
class symaware.simulators.carla.entities.WarningSignEntity(id=-1, model=<factory>, actor_id=-1, kind='static.prop.trafficwarning', actor=None, position=<factory>, velocity=<factory>, rotation=<factory>, acceleration=<factory>, pre_existing=False, kwargs=<factory>)[source]

Bases: Entity

Warning sign entity for CARLA simulation.

Represents a warning sign or static object in the CARLA simulation environment. All internal identifiers are set to -1 by default and will be set to the correct values during initialisation.

Parameters:
  • position (ndarray, default: <factory>) – (3)-shaped initial position of the entity. If the size is less than 3, missing values are set to 0

  • velocity (ndarray, default: <factory>) – (3)-shaped initial velocity of the entity. If the size is less than 3, missing values are set to 0

  • rotation (ndarray, default: <factory>) – (3)-shaped initial orientation of the entity as Euler angles

  • model (DynamicalModel, default: <factory>) – Dynamical model associated with the entity. Must be a subclass of DynamicalModel

kind: str = 'static.prop.trafficwarning'

symaware.simulators.carla.environment module

class symaware.simulators.carla.environment.Environment(host='127.0.0.1', port=2000, connection_timeout=5000, map='Town01', spectator=False, render=True, time_interval=0.0, async_loop_lock=None)[source]

Bases: Environment

Environment based on the CARLA simulator.

Parameters:
  • host (default: '127.0.0.1') – Host name or IP address of the CARLA server to connect to

  • port (default: 2000) – Port of the CARLA server to connect to

  • connection_timeout (default: 5000) – Timeout used when connecting to the CARLA server

  • map (default: 'Town01') – Name of the CARLA map to load

  • spectator (default: False) – Spectator setting of the environment (see the spectator property)

  • render (default: True) – Whether rendering is enabled

  • time_interval (default: 0.0) – Time interval setting of the simulation

  • async_loop_lock (AsyncLoopLock | None, default: None) – Async loop lock to use for the environment

__LOGGER = <Logger symaware.simulators.carla.carla.Environment (WARNING)>
_add_entity(entity)[source]

Add an entity to the environment.

Parameters:

entity (Entity) – The entity to add

add_gizmo(gizmo)[source]

Add a gizmo (visualization) to the environment.

Parameters:

gizmo (Gizmo) – The gizmo to add

add_gizmos(*gizmos)[source]

Add multiple gizmos to the environment.

Parameters:

gizmos (list[Gizmo]) – Variable number of gizmo objects to add

get_agent_direction(agent)[source]

Get the direction of an agent in the environment. The actual implementation should be done in the derived class, based on the simulated environment API.

Parameters:

agent (int | Agent) – agent to get the direction of

Returns:

Vector3D – Direction of the agent within the environment

get_agent_location(agent)[source]

Get the location of an agent.

Parameters:

agent (int | Agent) – Agent identifier or Agent object

Returns:

Location – Location of the agent

get_entity_direction(entity)[source]

Get the direction vector of an entity.

Parameters:

entity (Entity) – The entity to get the direction of

Returns:

Vector3D – Direction vector of the entity

get_entity_location(entity)[source]

Get the location of an entity.

Parameters:

entity (Entity) – The entity to get the location of

Returns:

Location – Location of the entity

get_entity_state(entity)[source]

Get the state of an entity.

Parameters:

entity (Entity) – The entity to get the state of

Returns:

ndarray – State array of the entity

get_waypoints(distance=5.0)[source]

Get all the waypoints in the current map.

Returns:

list[Waypoint] – List of waypoints in the current map

initialise()[source]

Initialise the simulation, allocating the required resources. Should be called when the simulation has been set up and is ready to be run. Some environment implementations may call it automatically when the environment is created. It is their responsibility to ensure that the method is idempotent.

property spectator: carla.Actor

Get the spectator of the world. The spectator is a special actor that can be used to view the world from a different perspective.

step()[source]

It can be called repeatedly to step the environment forward in time, updating the state of all the entities.

stop()[source]

Terminate the simulation, releasing the resources. Should be called when the simulation has been running manually and needs to be stopped. Some environment implementations may call it automatically when the environment is destroyed. It is their responsibility to ensure that the method is idempotent.

Warning

Depending on the simulator implementation, calling this method may invalidate all the entities previously added to the environment. In that case, entities and the environment should be recreated from scratch.

property world: carla.World

Get the carla world. The world is the main object that contains all the actors and the environment.

symaware.simulators.carla.gizmo module

class symaware.simulators.carla.gizmo.Gizmo(color=<factory>, thickness=0.06, life_time=1.0)[source]

Bases: ABC

Abstract base class for debug visualization gizmos in CARLA.

Gizmos are used to display debug information in the CARLA simulator such as points, grids, paths, and other geometric shapes.

color: carla.Color
abstractmethod draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

initialise(world)[source]

Initialise the gizmo with the world. The world is the Carla world that the gizmo is associated with.

life_time: float = 1.0
thickness: float = 0.06
class symaware.simulators.carla.gizmo.GridGizmo(color=<factory>, thickness=0.06, life_time=1.0, origin=<factory>, shape=(1, 1), cell_size=(1.0, 1.0))[source]

Bases: Gizmo

A gizmo that draws a regular grid.

Displays a grid of boxes with the specified origin, shape, and cell size.
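To make the relation between origin, shape and cell size concrete, the sketch below computes the centre of every box in such a grid. The function grid_cell_centers is hypothetical, and two conventions are assumed rather than confirmed by the source: the origin is treated as the corner of the first cell, and rows run along x while columns run along y.

```python
def grid_cell_centers(origin, shape, cell_size):
    """Return the (x, y) centre of every cell of an axis-aligned grid."""
    origin_x, origin_y = origin
    n_rows, n_cols = shape
    width, height = cell_size
    return [
        # Assumed convention: rows advance along x, columns along y
        (origin_x + (row + 0.5) * width, origin_y + (col + 0.5) * height)
        for row in range(n_rows)
        for col in range(n_cols)
    ]


centers = grid_cell_centers(origin=(0.0, 0.0), shape=(2, 2), cell_size=(1.0, 1.0))
print(centers)  # [(0.5, 0.5), (0.5, 1.5), (1.5, 0.5), (1.5, 1.5)]
```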

cell_size: tuple[float, float] = (1.0, 1.0)
draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

origin: carla.Location
shape: tuple[int, int] = (1, 1)
thickness: float = 0.06
class symaware.simulators.carla.gizmo.GridMapGizmo(color=<factory>, thickness=0.09, life_time=1.0, grid_map=None, z=0.0, draw_pos=False)[source]

Bases: Gizmo

A gizmo that draws a GridMap visualization.

Displays an occupancy grid map with cells colored based on occupancy status. Optionally shows occupied cell positions and coordinates.

_all_centers: tuple[carla.Location] = None
_all_pos: tuple[carla.Location]
_corners: tuple[carla.Location, carla.Location, carla.Location, carla.Location] = None
_extent: carla.Vector3D = None
_rotation: carla.Rotation
draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

draw_pos: bool = False
grid_map: GridMap = None
thickness: float = 0.09
z: float = 0.0
class symaware.simulators.carla.gizmo.PathGizmo(color=<factory>, thickness=0.06, life_time=1.0, waypoints=<factory>, size=0.2, z=0.0, label='')[source]

Bases: Gizmo

A gizmo that draws a path/trajectory.

Displays a series of waypoints connected as a path with directional arrows and optional labels at each waypoint.

draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

label: str = ''
size: float = 0.2
waypoints: list[carla.Location | carla.Waypoint]
z: float = 0.0
class symaware.simulators.carla.gizmo.PointGizmo(color=<factory>, thickness=0.06, life_time=1.0, location=<factory>, size=0.5)[source]

Bases: Gizmo

A gizmo that draws a single point at a location.

Displays a point with the specified size and color at a given location.

draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

location: carla.Location
size: float = 0.5
update_color(new_color)[source]
update_location(new_location)[source]
class symaware.simulators.carla.gizmo.WaypointGizmo(color=<factory>, thickness=0.06, life_time=1.0, origin=<factory>, shape=(1, 1), size=(1.0, 1.0), rotation=carla.Rotation)[source]

Bases: Gizmo

A gizmo that draws a waypoint grid.

Displays a grid of waypoints with the specified origin, shape, and size.

draw(world)[source]

Draw the gizmo in the world.

Parameters:

world (World) – The CARLA world to draw the gizmo in

origin: carla.Location
rotation: carla.Rotation
shape: tuple[int, int] = (1, 1)
size: tuple[float, float] = (1.0, 1.0)
thickness: float = 0.06

symaware.simulators.carla.grid module

class symaware.simulators.carla.grid.GridMap(grid_size, cell_size, offset=None, rotation=0.0)[source]

Bases: object

A 2D grid map that can be positioned, rotated, and scaled in continuous space.

The GridMap represents a discrete 2D grid that is embedded in a continuous coordinate system. Each cell in the grid can be marked as occupied (True) or free (False). The grid can be:

  • Positioned at any location using an offset

  • Rotated by any angle

  • Scaled using different cell sizes

Coordinate System and Geometry

The grid uses a row-column indexing system where:

  • row (r) corresponds to the x-axis in the local grid frame

  • col (c) corresponds to the y-axis in the local grid frame

Grid Space (Discrete):

        columns →
    +---+---+---+---+
    |   |   |   |   |
r   +---+---+---+---+
o   |   |   |   |   |
w   +---+---+---+---+
s   |   |   |   |   |
    +---+---+---+---+
↓

Visual Guide - Tilting Your Head:

Normal view of the grid matrix:

[[0, 0, 1, 0],      If you tilt your head 90° clockwise,
 [0, 1, 0, 0],      you can visualize how this maps to
 [1, 0, 0, 1]]      2D space

After mental rotation, we get back the traditional Cartesian view:

↑ y
|
|  +---+---+---+
|  |   |   | 1 |
|  +---+---+---+
|  | 1 |   |   |
|  +---+---+---+
|  |   | 1 |   |
|  +---+---+---+
|  |   |   | 1 |
|  +---+---+---+
|
+------------→ x

Parameters:
  • grid_size (tuple[int, int]) – Number of cells in the grid as (n_rows, n_cols)

  • cell_size (tuple[float, float]) – Physical size of each cell as (width, height) in world units

  • offset (tuple[float, float] | None, default: None) – Position of the grid origin (0,0) in world coordinates

  • rotation (float, default: 0.0) – Rotation angle of the grid in radians, counter-clockwise

_grid

Boolean array representing occupied (True) and free (False) cells

_cell_size

Physical dimensions of each cell

_offset

World coordinates of the grid origin

_cos

Cosine of the rotation angle (cached for efficiency)

_sin

Sine of the rotation angle (cached for efficiency)

Examples

Create a simple grid:

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap(grid_size=(10, 10), cell_size=(1.0, 1.0))
>>> grid.add_pos((5.5, 5.5))  # Mark cell containing position (5.5, 5.5) as occupied

Create a rotated grid:

>>> import numpy as np
>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap(
...     grid_size=(10, 10),
...     cell_size=(2.0, 2.0),
...     offset=(100.0, 100.0),
...     rotation=np.pi/4  # 45 degrees
... )

See also

GridAbstraction

A more complex grid-based abstraction with MDP

_LOGGER = <Logger symaware.simulators.carla.GridMap (WARNING)>
add_cell(row, col)[source]

Activate a cell in the grid at the specified row and column.

add_cell_range(row_lb, row_ub, col_lb, col_ub)[source]

Activate a range of cells in the grid.

add_pos(pos)[source]

Add a cell to the grid based on a world position.

Parameters:

pos (tuple[float, float])

add_pos_range(lb, ub)[source]

Add a range of cells to the grid based on world coordinate bounds.

property all_centers: list[tuple[float, float]]

List of all occupied cell center positions.

property all_pos: list[tuple[float, float]]

List of all occupied cell positions.

property cell_height: float

Height of each grid cell.

property cell_size: tuple[float, float]

Physical size of each cell as (width, height).

cell_to_pos(cell, mode='default')[source]

Convert grid cell coordinates to continuous world position.

This method applies the full geometric transformation:

  1. Scale: Multiply cell indices by cell_size

  2. Rotate: Apply rotation matrix

  3. Translate: Add offset

Transformation Diagram:

Grid Space → Scale back → Rotate → Offset → World Space

Parameters:
  • cell (tuple[int, int]) – Grid cell coordinates as (row, col)

  • mode (Literal['center', 'lower-left-corner', 'default'], default: 'default') – Position within the cell to return:

      - “center”: Center of the cell (default)

      - “lower-left-corner”: Lower-left corner of the cell

Returns:

tuple[float, float] – World coordinates (x, y) of the cell

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap((10, 10), (1.0, 1.0), offset=(0, 0))
>>> grid.cell_to_pos((5, 5))  # Center of cell (5,5)
(5.5, 5.5)
>>> grid.cell_to_pos((5, 5), mode="lower-left-corner")
(5.0, 5.0)
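
The scale, rotate, translate sequence described above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: the function name `cell_to_world` is hypothetical, and it assumes that rows map to the local x-axis (scaled by the cell width), columns map to the local y-axis (scaled by the cell height), and the rotation is counter-clockwise.

```python
import math

def cell_to_world(cell, cell_size, offset=(0.0, 0.0), rotation=0.0, center=True):
    """Hypothetical stand-in for GridMap.cell_to_pos (assumed conventions)."""
    row, col = cell
    shift = 0.5 if center else 0.0  # centre of the cell or its lower-left corner
    # 1. Scale: cell indices -> local metric coordinates (row ~ x, col ~ y)
    local_x = (row + shift) * cell_size[0]
    local_y = (col + shift) * cell_size[1]
    # 2. Rotate: counter-clockwise rotation by `rotation` radians
    cos_r, sin_r = math.cos(rotation), math.sin(rotation)
    rot_x = cos_r * local_x - sin_r * local_y
    rot_y = sin_r * local_x + cos_r * local_y
    # 3. Translate: add the world position of the grid origin
    return (rot_x + offset[0], rot_y + offset[1])
```

Under these assumptions, `cell_to_world((5, 5), (1.0, 1.0))` gives `(5.5, 5.5)`, which agrees with the documented example for an unrotated grid with unit cells.
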
property cell_width: float

Width of each grid cell.

clear()[source]

Clear the grid by setting all cells to False.

property cols: int

Number of columns in the grid.

compute_paths(start, targets, use_8_connectivity=True)[source]

Compute all shortest paths from a starting cell to a list of target cells.

This method treats the occupied cells in the grid as nodes in a graph, where adjacent cells are connected. It uses networkx to find all simple paths between start and each target. Target nodes are guaranteed to have no edges connecting them, even if they are adjacent.

Graph Structure:

For 8-connectivity:

        ↖ ↑ ↗
        ← ● →
        ↙ ↓ ↘

For 4-connectivity:

          ↑
        ← ● →
          ↓

Parameters:
  • start (tuple[int, int]) – Starting cell as (row, col) indices

  • targets (list[tuple[int, int]] | tuple[int, int]) – Target cell or list of target cells as (row, col) indices

  • use_8_connectivity (bool, default: True) – If True, uses 8-connectivity (includes diagonals). If False, uses 4-connectivity.

Returns:

dict[tuple[int, int], list[list[tuple[int, int]]]] – Dictionary mapping each target cell to its list of paths, where each path is a list of cells from start to target.

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap((10, 10), (1.0, 1.0))
>>> grid.add_cell_range(0, 10, 0, 10)
>>> paths = grid.compute_paths((0, 0), [(5, 5), (7, 7)])

Notes

  • Only occupied cells are considered as valid nodes in the graph

  • Target nodes will never have edges between them, ensuring path independence
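
The graph construction described above can be sketched without any dependencies. Note the substitution: the library is documented as using networkx, whereas this sketch uses a plain breadth-first search so that it is self-contained. The function name `all_shortest_paths`, the set-of-cells input, and the choice to return shortest paths only are assumptions made for illustration.

```python
from collections import deque

def all_shortest_paths(occupied, start, targets, use_8_connectivity=True):
    """Hypothetical sketch: shortest paths over occupied cells, keyed by target."""
    occupied, targets = set(occupied), set(targets)
    steps = [(-1, 0), (1, 0), (0, -1), (0, 1)]
    if use_8_connectivity:
        steps += [(-1, -1), (-1, 1), (1, -1), (1, 1)]

    def neighbours(cell):
        for dr, dc in steps:
            nxt = (cell[0] + dr, cell[1] + dc)
            # Only occupied cells are nodes; two targets are never connected
            if nxt in occupied and not (cell in targets and nxt in targets):
                yield nxt

    # Breadth-first search recording every predecessor on a shortest route
    dist, preds = {start: 0}, {start: []}
    queue = deque([start] if start in occupied else [])
    while queue:
        cell = queue.popleft()
        for nxt in neighbours(cell):
            if nxt not in dist:
                dist[nxt], preds[nxt] = dist[cell] + 1, [cell]
                queue.append(nxt)
            elif dist[nxt] == dist[cell] + 1:
                preds[nxt].append(cell)

    def unwind(cell):
        # Rebuild all shortest paths ending at `cell` by walking predecessors
        if cell == start:
            return [[start]]
        return [path + [cell] for p in preds[cell] for path in unwind(p)]

    return {t: unwind(t) if t in dist else [] for t in targets}
```

The result is a dictionary keyed by target cell, mirroring the documented return type. Unreachable targets map to an empty list in this sketch.
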

compute_paths_pos(start, targets, use_8_connectivity=True)[source]

Compute all possible paths from a starting position to a target position.

This is a convenience wrapper around compute_paths that accepts positions instead of cell indices.

Parameters:
  • start – Starting position as (x, y) coordinates

  • targets – Target position as (x, y) coordinates

  • use_8_connectivity (bool, default: True) – If True, uses 8-connectivity (includes diagonals). If False, uses 4-connectivity. Default is True (King’s move pattern).

Returns:

list[list[tuple[float, float]]] – List of paths, where each path is a list of positions (x, y) from start to target. Returns empty list if no path exists or if start/target are not in occupied cells.

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap((10, 10), (1.0, 1.0))
>>> grid.add_cell_range(0, 10, 0, 10)
>>> paths = grid.compute_paths_pos((0.5, 0.5), (5.5, 5.5))
>>> print(f"Found {len(paths)} paths")
Found 1 paths
fill()[source]

Fill the entire grid with the specified boolean value.

classmethod from_bounds(lb, ub, cell_size, rotation=0.0)[source]

Create a GridMap from lower and upper bound positions from world coordinates.

This factory method automatically calculates the required grid size to cover the rectangular region from lb to ub when rotated.

Geometry:

In world coordinates:          In the grid's local frame (after rotation):

         ●
        ╱ ╲                          ●----------------------● ub
       ╱   ╲                         |                      |
      ╱     ╲                        |                      |
  lb ●       ╲                       |                      |
      ╲       ╲                   lb ●----------------------●
       ╲       ╲
        ╲       ╲
         ╲       ● ub
          ╲     ╱
           ╲   ╱
            ╲ ╱
             ●

Grid size is computed to cover the rotated bounding box.

Parameters:
  • lb (tuple[float, float]) – Lower bound (bottom-left corner) in world coordinates

  • ub (tuple[float, float]) – Upper bound (top-right corner) in world coordinates

  • cell_size (tuple[float, float]) – Size of each grid cell as (width, height)

  • rotation (float, default: 0.0) – Rotation angle in radians

Returns:

A new GridMap instance covering the specified bounds

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap.from_bounds(
...     lb=(0.0, 0.0),
...     ub=(10.0, 20.0),
...     cell_size=(1.0, 1.0)
... )
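
One plausible way to derive the grid size from the bounds is sketched below. This is an assumption about the geometry, not the library's code: the helper name `grid_size_from_bounds` is hypothetical, the lb to ub diagonal is rotated into the grid's local frame, and rows are taken along the local x-axis as described for GridMap.

```python
import math

def grid_size_from_bounds(lb, ub, cell_size, rotation=0.0):
    """Hypothetical sketch of the grid size needed to cover lb..ub."""
    # Diagonal of the region in world coordinates
    dx, dy = ub[0] - lb[0], ub[1] - lb[1]
    # Express the diagonal in the grid's local frame (inverse rotation)
    cos_r, sin_r = math.cos(rotation), math.sin(rotation)
    local_w = cos_r * dx + sin_r * dy
    local_h = -sin_r * dx + cos_r * dy
    # Round up so that partially covered cells are still included
    n_rows = math.ceil(abs(local_w) / cell_size[0])
    n_cols = math.ceil(abs(local_h) / cell_size[1])
    return (n_rows, n_cols)
```

For the unrotated example above, with bounds (0, 0) to (10, 20) and unit cells, this sketch yields a grid of 10 by 20 cells.
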
classmethod from_corner(corner, map_size, cell_size, rotation=0.0)[source]

Create a GridMap from a corner position (world coordinate) and map dimensions (world units).

This is useful when you know the physical size of the area you want to cover.

Geometry:

In world coordinates:          In the grid's local frame (after rotation):

             ●
            ╱ ╲                          ●----------------------●
    height ╱   ╲                         |                      |
          ╱     ╲                 height |                      |
 corner  ●       ╲                       |                      |
          ╲       ╲               corner ●----------------------●
           ╲       ╲                              width
            ╲       ╲
       width ╲       ●
              ╲     ╱
               ╲   ╱
                ╲ ╱
                 ●

Parameters:
  • corner (tuple[float, float]) – Position of the grid origin (lower-left corner) in world coordinates

  • map_size (tuple[float, float]) – Physical dimensions of the map as (width, height)

  • cell_size (tuple[float, float]) – Size of each grid cell as (width, height)

  • rotation (float, default: 0.0) – Rotation angle in radians

Returns:

A new GridMap instance with the specified dimensions

Examples

>>> import numpy as np
>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap.from_corner(
...     corner=(100.0, 100.0),
...     map_size=(50.0, 50.0),
...     cell_size=(1.0, 1.0),
...     rotation=np.pi/4
... )
get_all_pos(mode='default')[source]
Return type:

list[tuple[float, float]]

property grid: ndarray

The underlying grid array (boolean matrix).

property height: float

Total height of the grid in world coordinates.

is_occupied(row, col)[source]

Check if a cell in the grid is occupied.

Return type:

bool

property num_occupied: int

Number of occupied cells in the grid.

plot_grid(ax=None)[source]

Plot the grid in its native discrete space (row-column view).

This shows the grid as a matrix where:

  • X-axis represents rows

  • Y-axis represents columns

  • Black cells are occupied (True)

  • White cells are free (False)

Grid Visualization:

    cols →
rows┌─────────┐
 ↓  │░░ ░   ░ │  ← Black = occupied
    │ ░   ░   │  ← White = free
    │  ░ ░    │
    └─────────┘

Returns:

tuple[Figure, Axes] – Matplotlib figure and axes objects

plot_grid_projection(points=None, point_color='green', point_marker='o', point_size=50, ax=None)[source]

Plot the grid projected into continuous world space.

This visualization shows:

  • Occupied cells as red squares in their world positions

  • Grid boundary as a blue polygon

  • The effect of rotation and offset on the grid layout

  • Optional points in world coordinates

Returns:

tuple[Figure, Axes] – Matplotlib figure and axes objects, or None if no occupied cells

Notes

The marker size is proportional to cell_size to give an accurate representation of cell dimensions in world space.

pos_to_cell(pos)[source]

Convert continuous world position to grid cell coordinates.

This method applies the inverse geometric transformation:

  1. Translate: Subtract offset

  2. Rotate: Apply inverse rotation matrix (transpose)

  3. Scale: Divide by cell_size and floor

Inverse Transformation Diagram:

World Space → Remove offset → Rotate → Scale → Grid Space

Parameters:

pos (tuple[float, float]) – World coordinates as (x, y)

Returns:

tuple[int, int] – Grid cell coordinates as (row, col)

Raises:

AssertionError – If the position is outside the grid bounds

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap((10, 10), (1.0, 1.0), offset=(0, 0))
>>> grid.pos_to_cell((5.7, 3.2))
(5, 3)
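
The inverse transformation can be sketched in the same spirit. The function name `world_to_cell` is hypothetical, and the sketch assumes a counter-clockwise rotation with rows along the local x-axis and columns along the local y-axis. The bounds check that raises AssertionError in the documented method is omitted here.

```python
import math

def world_to_cell(pos, cell_size, offset=(0.0, 0.0), rotation=0.0):
    """Hypothetical stand-in for GridMap.pos_to_cell (assumed conventions)."""
    # 1. Translate: express the point relative to the grid origin
    dx, dy = pos[0] - offset[0], pos[1] - offset[1]
    # 2. Rotate: apply the transpose (inverse) of the rotation matrix
    cos_r, sin_r = math.cos(rotation), math.sin(rotation)
    local_x = cos_r * dx + sin_r * dy
    local_y = -sin_r * dx + cos_r * dy
    # 3. Scale: divide by the cell size and floor to get integer indices
    row = math.floor(local_x / cell_size[0])
    col = math.floor(local_y / cell_size[1])
    return (row, col)
```

With unit cells and no rotation, `world_to_cell((5.7, 3.2), (1.0, 1.0))` gives `(5, 3)`, matching the documented example.
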
remove_cell(row, col)[source]

Deactivate a cell in the grid at the specified row and column.

remove_cell_range(row_lb, row_ub, col_lb, col_ub)[source]

Deactivate a range of cells in the grid.

remove_pos(pos)[source]

Remove a cell from the grid based on a world position.

Parameters:

pos (tuple[float, float])

remove_pos_range(lb, ub)[source]

Remove a range of cells from the grid based on world coordinate bounds.

property rotation_cos: float

Cosine of the grid rotation angle.

property rotation_sin: float

Sine of the grid rotation angle.

property rows: int

Number of rows in the grid.

set_cell(row, col, value)[source]
set_cell_range(row_lb, row_ub, col_lb, col_ub, value)[source]

Set a range of cells in the grid.

set_pos(pos, value)[source]

Set a cell in the grid based on a world position.

set_pos_range(lb, ub, value)[source]

Set a range of cells in the grid based on world coordinate bounds.

visualize_paths(paths, start=None, targets=None)[source]

Visualize the computed paths on the grid.

Parameters:
  • paths (list[list[tuple[int, int]]]) – List of paths, where each path is a list of cells (row, col)

  • start (tuple[int, int] | None, default: None) – Starting cell to highlight. If None, uses first cell of first path.

  • targets (list[tuple[int, int]] | None, default: None) – Target cells to highlight. If None, infers from paths dict keys or last cells.

Returns:

tuple[Figure, Axes] – Matplotlib figure and axes objects

property width: float

Total width of the grid in world coordinates.

class symaware.simulators.carla.grid.PathsGridMap(grid_size, cell_size, paths=None, offset=None, rotation=0)[source]

Bases: WaypointGridMap

_edges: dict[tuple[tuple[int, int], int], tuple[int, int]]
_terminating_cells: list[tuple[int, int]]
property edges: dict[tuple[tuple[int, int], int], tuple[int, int]]
classmethod from_grid_map(grid_map, start, targets)[source]
property num_paths
set_paths(paths, additional_range=0)[source]
property terminating_cells: list[tuple[int, int]]
class symaware.simulators.carla.grid.WaypointGridMap(grid_size, cell_size, offset=None, rotation=0)[source]

Bases: GridMap

add_cell(row, col, pos=None)[source]

Activate a cell in the grid at the specified row and column.

add_pos(pos)[source]

Add a cell to the grid based on a world position.

Parameters:

pos (tuple[float, float])

cell_to_pos(cell, mode='default')[source]

Convert grid cell coordinates to continuous world position.

This method applies the full geometric transformation:

  1. Scale: Multiply cell indices by cell_size

  2. Rotate: Apply rotation matrix

  3. Translate: Add offset

Transformation Diagram:

Grid Space → Scale back → Rotate → Offset → World Space

Parameters:
  • cell (tuple[int, int]) – Grid cell coordinates as (row, col)

  • mode (Literal['center', 'lower-left-corner', 'default'], default: 'default') – Position within the cell to return:

      - “center”: Center of the cell (default)

      - “lower-left-corner”: Lower-left corner of the cell

Returns:

tuple[float, float] – World coordinates (x, y) of the cell

Examples

>>> from symaware.simulators.carla import GridMap
>>> grid = GridMap((10, 10), (1.0, 1.0), offset=(0, 0))
>>> grid.cell_to_pos((5, 5))  # Center of cell (5,5)
(5.5, 5.5)
>>> grid.cell_to_pos((5, 5), mode="lower-left-corner")
(5.0, 5.0)
clear()[source]

Clear the grid by setting all cells to NaN.

fill()[source]

Fill the entire grid with the specified boolean value.

is_occupied(row, col)[source]

Check if a cell in the grid is occupied.

Return type:

bool

property num_occupied: int

Number of occupied cells in the grid.

plot_grid(ax=None)[source]

Plot the grid in its native discrete space (row-column view).

This shows the grid as a matrix where:

  • X-axis represents rows

  • Y-axis represents columns

  • Black cells are occupied (True)

  • White cells are free (False)

Grid Visualization:

    cols →
rows┌─────────┐
 ↓  │░░ ░   ░ │  ← Black = occupied
    │ ░   ░   │  ← White = free
    │  ░ ░    │
    └─────────┘

Returns:

tuple[Figure, Axes] – Matplotlib figure and axes objects

remove_cell(row, col)[source]

Deactivate a cell in the grid at the specified row and column.

remove_pos(pos)[source]

Remove a cell from the grid based on a world position.

Parameters:

pos (tuple[float, float])

set_cell(row, col, value)[source]
set_cell_range(row_lb, row_ub, col_lb, col_ub, value)[source]

Set a range of cells in the grid.

symaware.simulators.carla.ltl module

author Shuhao Qi

class symaware.simulators.carla.ltl.DFA(states, alphabet, transitions, initial_state, sink_states, AP)[source]

Bases: object

Deterministic Finite Automaton representation.

Represents a DFA with states, alphabet, transitions, and sink states. Used to encode Linear Temporal Logic (LTL) specifications.

states

List of state indices in the DFA

alphabet

List of alphabet symbols (atomic propositions)

transitions

Dictionary mapping (state, symbol) to next state

initial_state

The starting state of the DFA

sink_states

List of accepting or sink states

AP

List of atomic propositions

AP: list[str]
alphabet: list[str]
classmethod from_spec(spec, AP_set=None)[source]
Return type:

DFA

get_alphabet(letter)[source]
Return type:

tuple[bool, ...]

initial_state: int
is_sink_state(state)[source]
Return type:

bool

classmethod null_dfa()[source]
Return type:

DFA

sink_states: list[int]
states: list[int]
transitions: dict[tuple[int, tuple[bool, ...]], int]
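
To illustrate how the transitions dictionary can be used, the sketch below steps a toy automaton through a word. It assumes, based on the type annotations above, that each letter is a tuple of truth values with one entry per atomic proposition. The helper `run_dfa` and the toy automaton are hypothetical and not part of the package.

```python
def run_dfa(transitions, initial_state, sink_states, word):
    """Hypothetical helper: feed a sequence of letters through a DFA."""
    state = initial_state
    for letter in word:
        # A letter is a tuple of truth values, one per atomic proposition
        state = transitions[(state, letter)]
        if state in sink_states:
            break  # sink states are absorbing, so we can stop early
    return state

# Toy automaton over AP = ["goal"]: state 0 waits, state 1 is reached on "goal"
transitions = {
    (0, (False,)): 0,
    (0, (True,)): 1,
    (1, (False,)): 1,
    (1, (True,)): 1,
}
word = [(False,), (False,), (True,)]
final = run_dfa(transitions, initial_state=0, sink_states=[1], word=word)
```

Here `final` is the sink state 1, because the last letter sets "goal" to True.
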
class symaware.simulators.carla.ltl.LTL_Spec(spec, AP_set)[source]

Bases: object

Linear Temporal Logic specification translator.

Converts LTL formula strings to DFA representations using the ltlf2dfa library. Supports atomic propositions and standard LTL operators.

gen_transition(trans_condition)[source]
Return type:

dict[tuple[int, tuple[bool, ...]], int]

get_alphabet(letter)[source]
to_network(dfa_dot, plot=False)[source]
translate(spec)[source]

symaware.simulators.carla.mdp module

class symaware.simulators.carla.mdp.LightMDP(num_states, initial_state, num_actions, transitions, labels=<factory>)[source]

Bases: object

A lightweight Markov Decision Process (MDP) representation.

Uses integer indices for states and actions instead of hashable objects. Provides the same interface as MDP but with reduced memory overhead.

Parameters:
  • num_states (int) – Number of states

  • initial_state (int) – Index of the initial state

  • num_actions (int) – Number of actions

  • transitions (ndarray[double]) – 3D array of state transition probabilities accessible via state_from -> action -> state_to

  • labels (tuple[str, ...], default: <factory>) – Labels for the states. Auto-generated if not provided

action_to_idx(action)[source]

Convert an action index (identity function for LightMDP).

Parameters:

action (int) – Action index

Returns:

int – The same action index

property actions: ndarray[int]

Get an array of all actions.

Return type:

Array of action indices from 0 to num_actions-1

get_label(state)[source]

Get the label for a given state.

Parameters:

state (int) – State index

Returns:

str – Label of the state

get_trans_prob(state, action)[source]

Get the transition probability distribution for a given state and action.

Parameters:
  • state (int) – Source state

  • action (int) – Action to take

Returns:

ndarray[double] – Array of transition probabilities to all possible next states, or 0.0 if the state or action is not known
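
The state_from -> action -> state_to layout of the transition array can be illustrated with a toy example. All numbers below are made up, and `trans_prob` is a hypothetical stand-in for the lookup, not the library method itself.

```python
import numpy as np

# Toy MDP with 3 states and 2 actions (all numbers are made up for illustration)
num_states, num_actions = 3, 2
transitions = np.zeros((num_states, num_actions, num_states))
transitions[0, 0] = [0.1, 0.9, 0.0]  # state 0, action 0: mostly move to state 1
transitions[0, 1] = [1.0, 0.0, 0.0]  # state 0, action 1: stay put
transitions[1, 0] = [0.0, 0.2, 0.8]
transitions[1, 1] = [0.0, 1.0, 0.0]
transitions[2, :] = [0.0, 0.0, 1.0]  # state 2 is absorbing under every action

def trans_prob(state, action):
    """Hypothetical lookup mirroring the state_from -> action -> state_to layout."""
    return transitions[state, action]

row = trans_prob(0, 0)  # distribution over next states
```

Each `transitions[state, action]` slice is a probability distribution over next states, so it should sum to 1.
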

initial_state: int
labels: tuple[str, ...]
classmethod null_mdp()[source]
Return type:

LightMDP

num_actions: int
num_states: int
state_to_idx(state)[source]

Convert a state index (identity function for LightMDP).

Parameters:

state (int) – State index

Returns:

int – The same state index

property states: ndarray[int]

Get an array of all states.

Return type:

Array of state indices from 0 to num_states-1

transitions: ndarray[float64]
class symaware.simulators.carla.mdp.MDP(states, initial_state, actions, transitions, labels=<factory>)[source]

Bases: Generic[_S, _A]

A Markov Decision Process (MDP) representation.

Represents an MDP as a tuple (states, initial_state, actions, transitions). Supports accessing and manipulating transition probabilities and provides MDP composition via the @ operator.

Parameters:
  • states (Iterable[TypeVar(_S)]) – Collection of states. Can contain any hashable type

  • initial_state (TypeVar(_S)) – The initial state

  • actions (Iterable[TypeVar(_A)]) – Collection of actions. Can contain any hashable type

  • transitions (ndarray[double]) – 3D array of state transition probabilities accessible via state_from -> action -> state_to

  • labels (tuple[str, ...], default: <factory>) – Labels for the states. Auto-generated from states if not provided

_action_to_idx: dict[_A, int]
_state_to_idx: dict[_S, int]
property action_idxs

Get an array of all action indices.

Return type:

Array of action indices from 0 to len(actions)-1

action_to_idx(action)[source]

Convert an action to its index.

Parameters:

action (TypeVar(_A)) – Action to convert

Returns:

int – Index of the action

actions: Iterable[_A]
get_label(state)[source]

Get the label for a given state.

Parameters:

state (TypeVar(_S)) – State to get label for

Returns:

str – Label of the state

get_trans_prob(state, action)[source]

Get the transition probability distribution for a given state and action.

Parameters:
  • state (TypeVar(_S)) – Source state

  • action (TypeVar(_A)) – Action to take

Returns:

ndarray[double] – Array of transition probabilities to all possible next states, or 0.0 if the state or action is not known

get_trans_prob_idx(state_idx, action_idx)[source]

Get the transition probability distribution for a given state and action using indices.

Parameters:
  • state_idx (int) – Index of the source state

  • action_idx (int) – Index of the action

Returns:

ndarray[double] – Array of transition probabilities to all possible next states

initial_state: _S
labels: tuple[str, ...]
property num_actions: int
property num_states: int
property state_idxs

Get an array of all state indices.

Return type:

Array of state indices from 0 to len(states)-1

state_to_idx(state)[source]

Convert a state to its index.

Parameters:

state (TypeVar(_S)) – State to convert

Returns:

int – Index of the state
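
The difference from LightMDP is that states and actions are arbitrary hashable objects that are translated to indices before the transition array is accessed. A minimal sketch of that idea follows. The names and numbers are hypothetical, and nested lists stand in for the ndarray.

```python
states = ["start", "road", "goal"]  # any hashable objects
actions = ["wait", "go"]
state_to_idx = {s: i for i, s in enumerate(states)}
action_to_idx = {a: i for i, a in enumerate(actions)}

# transitions[state_from][action][state_to], here as nested lists for brevity
transitions = [
    [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
    [[0.0, 1.0, 0.0], [0.0, 0.1, 0.9]],
    [[0.0, 0.0, 1.0], [0.0, 0.0, 1.0]],
]

def get_trans_prob(state, action):
    """Hypothetical lookup by object rather than by index."""
    return transitions[state_to_idx[state]][action_to_idx[action]]
```
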

states: Iterable[_S]
transitions: ndarray[float64]
symaware.simulators.carla.mdp.main()[source]

symaware.simulators.carla.path_planner module

author Shuhao Qi

class symaware.simulators.carla.path_planner.PathPlanner(agent_id, world, async_loop_lock=None)[source]

Bases: Component

Path planning component for vehicle navigation in CARLA.

Computes safe and feasible paths for vehicles considering lane changes, junctions, and stopping scenarios. Uses spline-based path generation with behavioral state management.

_compute()[source]

This very generic method is to be implemented by the specialised components. Given the state of the agent, compute a value or a set of values. Those values can be used to update the agent’s state or to compute the control input of the agent.

This method must be implemented in any custom component.

Example

The compute() method can be implemented to perform some custom computation.

>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def _compute(self):
...         state = self._agent.self_state
...         risk = self._agent.self_awareness.risk.get(0, np.zeros(1))
...         return state * np.maximum(0, risk)
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Computed value or set of values

_update(args)[source]

This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.

This method must be implemented in any custom component.

Example

The update() method can be implemented to perform some custom updates.

>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
extra_wp(wp, max_distance=30.0)[source]
following_path(waypoints)[source]
initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent

junction_waypoints(fixed_lane)[source]
lane_change_path(lane_target, merge_point)[source]
merge_point_calcu(lane_target, merge_dist)[source]
stopping_path(waypoints, vehicle, stop_point)[source]
class symaware.simulators.carla.path_planner.Status(*values)[source]

Bases: Enum

Enumeration of path planning statuses.

Defines possible states during path planning: following lane, lane changing, and stopping scenarios.

FOLLOWING = 0
LANE_CHANGING_L = 2
LANE_CHANGING_R = 4
START_LANE_CHANGE_L = 1
START_LANE_CHANGE_R = 3
STOPPING = 5

symaware.simulators.carla.perception module

author Shuhao Qi

class symaware.simulators.carla.perception.FeatureExtractor(agent_id, environment, async_loop_lock=None)[source]

Bases: PerceptionSystem

Extracts features and observations from the CARLA environment.

Perceives vehicle states, lane information, traffic lights, and nearby vehicles to build a comprehensive observation of the driving environment.

_compute()[source]

Compute the information perceived by the agent. It returns the values collected from the environment.

Returns:

dict[int, FeatureObservation] – Information perceived by the agent. Each agent’s state is identified by its id.

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (dict[int, FeatureObservation]) – Information perceived by the agent. Each agent’s state is identified by its id.

bbox_display(vehicle, color)[source]
check_onroad(vehicle, lane)[source]
draw_lane_line(pos_list, side='right')[source]
draw_lane_points(wp_l, side='right')[source]
find_cars_onlane(lane)[source]

Find the cars in front and behind on the selected lane.

find_lanepoint_left(wp)[source]
find_lanepoint_right(wp)[source]
find_lead_car(cur_wp)[source]
Return type:

tuple[float, float, float, float] | None

generate_position_list(wp_l, side='right')[source]
lane_display(wp_list)[source]
point_display(my_color)[source]
ref_display(ref_x, ref_y)[source]
traffic_light_rule()[source]
wp_list_extract(cur_wp)[source]
wp_side_extract(wp_list, wp, side)[source]
class symaware.simulators.carla.perception.FeatureObservation(observed_object_id, cur_lane=None, is_junction=False, current_wp=<factory>, all_wp=<factory>, wp_list=<factory>, lane_width=0.0, junction_lane=None, lead_car=None)[source]

Bases: Observation

Observation class for the FeatureExtractor system.

Parameters:
  • observed_object_id – id of the agent that is the subject of the observation

  • state – state of the observed agent

all_wp: list[carla.Waypoint]
cur_lane: carla.Waypoint | None = None
current_wp: list[carla.Waypoint]
is_junction: bool = False
junction_lane: carla.Waypoint | None = None
lane_width: float = 0.0
lead_car: tuple[float, float, float, float] | None = None
wp_list: list[carla.Waypoint]

symaware.simulators.carla.risk module

author Shuhao Qi

class symaware.simulators.carla.risk.Product(MDP, DFA_cs, DFA_safe)[source]

Bases: Generic[_S, _A]

The product MDP of $\mathcal{M}$, $A_{cs}$ and $A_s$ is defined as a tuple $P = (Z, z_0, A, \hat{T}, G, D)$, where

\[
\begin{aligned}
Z &\coloneq S \times Q_{cs} \times Q_s \\
A &\coloneq \text{action set} \\
z_0 &\coloneq \bigl(\bar{s}_0,\ \delta_{cs}(q^0_{cs}, \mathcal{L}(\bar{s}_0)),\ \delta_s(q^0_s, \mathcal{L}(\bar{s}_0))\bigr) \\
\hat{T} &\colon Z \times A \times Z \to [0,1]
\end{aligned}
\]
Parameters:
  • MDP (MDP[TypeVar(_S), TypeVar(_A)]) – The Markov Decision Process representing system dynamics

  • DFA_cs (DFA) – The co-safety (liveness) DFA

  • DFA_safe (DFA) – The safety DFA
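
A product state pairs an MDP state with the current state of each automaton. The sketch below shows one step of that construction, following the pattern of the initial state, where both automata read the label of the MDP state. The function name, the dictionary-based transition functions, and the string labels are all hypothetical simplifications.

```python
def next_product_state(product_state, next_mdp_state, label, delta_cs, delta_s):
    """Hypothetical sketch of one step in the product of an MDP with two DFAs."""
    _, q_cs, q_s = product_state
    letter = label(next_mdp_state)  # label of the new MDP state
    # Both automata read the same letter and advance independently
    return (next_mdp_state, delta_cs[(q_cs, letter)], delta_s[(q_s, letter)])

# Toy setup: MDP state 2 is labelled "goal", everything else "none"
label = lambda s: "goal" if s == 2 else "none"
delta_cs = {(0, "none"): 0, (0, "goal"): 1, (1, "none"): 1, (1, "goal"): 1}
delta_s = {(0, "none"): 0, (0, "goal"): 0}

z = (0, 0, 0)
z = next_product_state(z, 1, label, delta_cs, delta_s)  # moves to MDP state 1
z = next_product_state(z, 2, label, delta_cs, delta_s)  # reaches the goal
```

After the second step the co-safety automaton has moved to its state 1 while the safety automaton stays in state 0.
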

_state_to_index(state)[source]
Return type:

int

gen_cost_map(cost_func=None, abstraction=None)[source]
Return type:

ndarray

gen_final_states()[source]
gen_product_transition()[source]
get_next_prod_state(mdp_state, last_product_state)[source]
Return type:

tuple[int, tuple[TypeVar(_S), int, int]]

update(MDP)[source]
class symaware.simulators.carla.risk.RiskLTL(abs_model, MDP_sys, MDP_env=LightMDP(num_states=0, initial_state=0, num_actions=0, transitions=array([], shape=(0, 0, 0), dtype=float64), labels=()), DFA_safe=DFA(states=[], alphabet=[], transitions={}, initial_state=0, sink_states=[], AP=[]), DFA_sc=DFA(states=[], alphabet=[], transitions={}, initial_state=0, sink_states=[], AP=[]), cost_func=None)[source]

Bases: object

Risk-aware controller using LTL specifications and a Markov Decision Process.

Solves the problem of finding a policy that satisfies LTL specifications while minimizing cost using a product automaton and Linear Programming.

cal_risk(policy_map, cost_map)[source]
change_cost_func(cost_func)[source]
property cost_map
static policy_from_file(file_path)[source]
Return type:

ndarray

property product
update(sys_state, env_state, risk_th)[source]
class symaware.simulators.carla.risk.VelocityLPRiskLTL(abs_model, DFA_safe, DFA_sc, cost_func=None)[source]

Bases: RiskLTL

Velocity-based LTL risk controller with Linear Programming solver.

Extends RiskLTL to handle velocity-based abstractions and provides efficient optimization using Gurobi Linear Programming.

_compute_transition_time()[source]
property abstraction: VelocityGridAbstraction
extract(occup_dict)[source]
property hash
plot_policy(policy, subsample=1, ax=None)[source]

Create a 3D visualization of the policy showing state transitions.

This function plots the policy as a 3D vector field where:

  • X, Y axes represent spatial position

  • Z axis represents speed

  • Arrows show the transition from current state to next state based on the policy

Parameters:
  • policy (ndarray) – Policy array where each index corresponds to a state (composed of x, y, speed) and each element is the action to take in that state

  • subsample (int, default: 1) – Plot every nth state to reduce clutter (default: 1, plot all states)

Returns:

tuple[Figure, Axes3D] – Matplotlib figure and 3D axes objects

plot_policy_plane(policy, initial_state, flexible_speed=False, ax=None)[source]
Return type:

tuple[Figure, plt.Axes]

solve(initial_state=None, initial_guess=None)[source]
update(pos_speed, label_function=None, cost_func=None)[source]

symaware.simulators.carla.util module

Cubic Spline library on python

author Atsushi Sakai

usage: see test codes as below

license: MIT

class symaware.simulators.carla.util.Spline(x, y)[source]

Bases: object

Cubic Spline class

__calc_A(h)

calc matrix A for spline coefficient c

Return type:

ndarray

__calc_B(h)

calc matrix B for spline coefficient c

Return type:

ndarray

__search_index(x)

search data segment index

Return type:

int

calc(t)[source]

Calculate the position at t.

Returns None if t is outside the range of the input x.

calcd(t)[source]

Calculate the first derivative at t.

Returns None if t is outside the range of the input x.

calcdd(t)[source]

Calc second derivative

class symaware.simulators.carla.util.Spline2D(x, y)[source]

Bases: object

2D Cubic Spline class

__calc_s(x, y)
Return type:

list[float]
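
A 2D spline is commonly parameterised by the cumulative chord length of its input points, and the list[float] return type is consistent with that. The sketch below shows this usual construction. It is an assumption about what the private method computes, and the function name `cumulative_arc_length` is hypothetical.

```python
import math

def cumulative_arc_length(x, y):
    """Hypothetical sketch: chord-length parameter s for a 2D polyline."""
    s = [0.0]
    for i in range(1, len(x)):
        # Add the straight-line distance between consecutive points
        s.append(s[-1] + math.hypot(x[i] - x[i - 1], y[i] - y[i - 1]))
    return s
```
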

calc_curvature(s)[source]

calc curvature

calc_position(s)[source]

calc position

calc_yaw(s)[source]

calc yaw

symaware.simulators.carla.util.bernstein_poly(n, i, t)[source]

Bernstein polynomial.

Parameters:
  • n (int) – Polynomial degree

  • i (int)

  • t (float)

Returns:

float

symaware.simulators.carla.util.bezier(t, control_points)[source]

Return one point on the Bezier curve.

Parameters:
  • t – (float) number in [0, 1]

  • control_points – (numpy array)

Returns:

(numpy array) Coordinates of the point
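
A point on a Bezier curve is the sum of the control points weighted by Bernstein basis polynomials. The sketch below spells this out in plain Python. The function names are hypothetical and tuples stand in for numpy arrays, so it illustrates the formula rather than reproducing the package code.

```python
import math

def bernstein(n, i, t):
    """Bernstein basis polynomial B_{i,n}(t) = C(n, i) * t^i * (1 - t)^(n - i)."""
    return math.comb(n, i) * t**i * (1 - t) ** (n - i)

def bezier_point(t, control_points):
    """Point on the Bezier curve as a Bernstein-weighted sum of control points."""
    n = len(control_points) - 1  # degree of the curve
    x = sum(bernstein(n, i, t) * p[0] for i, p in enumerate(control_points))
    y = sum(bernstein(n, i, t) * p[1] for i, p in enumerate(control_points))
    return (x, y)
```

The curve starts at the first control point (t = 0) and ends at the last one (t = 1). Intermediate control points pull the curve towards them without lying on it in general.
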

symaware.simulators.carla.util.bezier_derivatives_control_points(control_points, n_derivatives)[source]

Compute control points of the successive derivatives of a given Bezier curve. A derivative of a Bezier curve is itself a Bezier curve. See https://pomax.github.io/bezierinfo/#derivatives for a detailed explanation.

Parameters:
  • control_points – (numpy array)

  • n_derivatives – (int) e.g., n_derivatives=2 computes control points for the first and second derivatives

Returns:

([numpy array])
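
For a Bezier curve of degree n with control points P_0, ..., P_n, the derivative is a Bezier curve of degree n - 1 with control points n (P_{i+1} - P_i). The sketch below applies this rule repeatedly. The function names and the dictionary keyed by derivative order are hypothetical choices for illustration.

```python
def derivative_control_points(control_points):
    """Control points of the first derivative of a Bezier curve."""
    n = len(control_points) - 1  # degree of the original curve
    return [
        (n * (q[0] - p[0]), n * (q[1] - p[1]))
        for p, q in zip(control_points, control_points[1:])
    ]

def successive_derivatives(control_points, n_derivatives):
    """Hypothetical helper: map derivative order -> control points."""
    result = {0: list(control_points)}
    for order in range(1, n_derivatives + 1):
        # Each derivative is obtained from the control points of the previous one
        result[order] = derivative_control_points(result[order - 1])
    return result
```
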

symaware.simulators.carla.util.branch_wp(wp, max_distance=80, distance_rate=1.4)[source]

Get all branching waypoints (including lane changes).

Parameters:
  • wp (Waypoint) – Starting waypoint

  • max_distance (float, default: 80) – Maximum distance ahead to gather waypoints

  • distance_rate (float, default: 1.4) – Rate at which distance increases between waypoints

Returns:

List of all waypoints including branches from the starting point

symaware.simulators.carla.util.cal_angle(vec_1, vec_2)[source]

Calculate angle between two 2D vectors.

Parameters:
  • vec_1 (list[float]) – First vector as [x, y]

  • vec_2 (list[float]) – Second vector as [x, y]

Returns:

float | None – Angle in radians, or None if either vector has zero magnitude
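The documentation does not say whether the returned angle is signed. As an illustration only, the sketch below computes the unsigned angle from the dot product and returns None for a zero-length vector; angle_between is a hypothetical name and the unsigned convention is an assumption.

```python
import math


def angle_between(vec_1, vec_2):
    """Unsigned angle in radians between two 2D vectors, or None if one is zero."""
    norm_1 = math.hypot(vec_1[0], vec_1[1])
    norm_2 = math.hypot(vec_2[0], vec_2[1])
    if norm_1 == 0.0 or norm_2 == 0.0:
        return None
    cos_angle = (vec_1[0] * vec_2[0] + vec_1[1] * vec_2[1]) / (norm_1 * norm_2)
    # Clamp to [-1, 1] so rounding errors cannot push acos out of its domain.
    return math.acos(max(-1.0, min(1.0, cos_angle)))


right_angle = angle_between([1.0, 0.0], [0.0, 2.0])
undefined = angle_between([0.0, 0.0], [1.0, 0.0])
```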

symaware.simulators.carla.util.calc_curvature(dx, dy, ddx, ddy)[source]

Compute the curvature at one point given the first and second derivatives.

Parameters:
  • dx (float) – First derivative along the x axis

  • dy (float) – First derivative along the y axis

  • ddx (float) – Second derivative along the x axis

  • ddy (float) – Second derivative along the y axis

Returns:

float
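For a planar parametric curve, the signed curvature is (dx ddy - dy ddx) / (dx^2 + dy^2)^(3/2). The sketch below evaluates that standard formula; curvature is an illustrative name, and it is assumed that the package function uses the same sign convention.

```python
def curvature(dx, dy, ddx, ddy):
    """Signed curvature of a planar curve from its first and second derivatives."""
    return (dx * ddy - dy * ddx) / (dx**2 + dy**2) ** 1.5


# Circle of radius 2, x = 2 cos(t), y = 2 sin(t), evaluated at t = 0.
kappa = curvature(0.0, 2.0, -2.0, 0.0)
```

A circle of radius r has curvature 1 / r, which gives a quick sanity check for the formula.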

symaware.simulators.carla.util.calc_path(sx, sy, syaw, ex, ey, eyaw, offset, d_dist)[source]

Compute Bezier curve path from start to end position.

Parameters:
  • sx (float) – X-coordinate of the starting point

  • sy (float) – Y-coordinate of the starting point

  • syaw (float) – Yaw angle at start (radians)

  • ex (float) – X-coordinate of the ending point

  • ey (float) – Y-coordinate of the ending point

  • eyaw (float) – Yaw angle at the end (radians)

  • offset (float) – Control point offset

  • d_dist (float) – Distance between points on the path

Returns:

Tuple of (path_x, path_y) numpy arrays representing the bezier path
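One common way to build such a path is a cubic Bezier curve whose inner control points lie along the start and end headings. The sketch below follows that idea. The placement of the inner control points at a distance of chord / offset, the derivation of the sample count from d_dist, and the names bezier_path and cubic_bezier are all assumptions made for illustration; the package's actual construction is not documented here.

```python
import math


def cubic_bezier(t, p0, p1, p2, p3):
    """Evaluate a cubic Bezier curve at t in [0, 1]."""
    u = 1.0 - t
    w = (u**3, 3 * u**2 * t, 3 * u * t**2, t**3)
    x = w[0] * p0[0] + w[1] * p1[0] + w[2] * p2[0] + w[3] * p3[0]
    y = w[0] * p0[1] + w[1] * p1[1] + w[2] * p2[1] + w[3] * p3[1]
    return x, y


def bezier_path(sx, sy, syaw, ex, ey, eyaw, offset, d_dist):
    chord = math.hypot(ex - sx, ey - sy)
    dist = chord / offset  # assumed: larger offset pulls control points inward
    p0 = (sx, sy)
    p1 = (sx + dist * math.cos(syaw), sy + dist * math.sin(syaw))
    p2 = (ex - dist * math.cos(eyaw), ey - dist * math.sin(eyaw))
    p3 = (ex, ey)
    # Assumed: roughly one sample every d_dist along the chord.
    n_points = max(2, math.ceil(chord / d_dist) + 1)
    points = [cubic_bezier(k / (n_points - 1), p0, p1, p2, p3) for k in range(n_points)]
    return [p[0] for p in points], [p[1] for p in points]


path_x, path_y = bezier_path(0.0, 0.0, 0.0, 10.0, 5.0, 0.0, 3.0, 1.0)
```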

symaware.simulators.carla.util.calc_s(rx, ry)[source]
symaware.simulators.carla.util.calc_yaw(t, control_points)[source]

Calculate the yaw at t.

symaware.simulators.carla.util.local_wp(wp, max_distance=50, distance_rate=1.4)[source]

Get waypoints ahead on the current lane.

Parameters:
  • wp (Waypoint) – Starting waypoint

  • max_distance (float, default: 50) – Maximum distance ahead to gather waypoints

  • distance_rate (float, default: 1.4) – Rate at which distance increases between waypoints

Returns:

List of waypoints ahead on the current lane

symaware.simulators.carla.util.pi_2_pi(theta)[source]
Return type:

float
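This function has no description. Judging by its name alone, it likely wraps an angle into a range of width 2π centred on zero. The sketch below shows one way to do that, mapping into [-π, π); wrap_angle is a hypothetical name and the exact interval used by the package is an assumption.

```python
import math


def wrap_angle(theta):
    """Wrap an angle in radians into the interval [-pi, pi)."""
    return (theta + math.pi) % (2.0 * math.pi) - math.pi


wrapped = wrap_angle(1.5 * math.pi)
unchanged = wrap_angle(0.25 * math.pi)
```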

symaware.simulators.carla.util.ref_waypoint(wp, max_dist=30.0, dist_rate=1.4)[source]

Get reference waypoints ahead with exponential spacing.

Parameters:
  • wp (Waypoint) – Starting waypoint

  • max_dist (float, default: 30.0) – Maximum distance ahead to gather waypoints

  • dist_rate (float, default: 1.4) – Rate at which distance increases exponentially

Returns:

list[Waypoint] – List of reference waypoints with exponential spacing
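To illustrate what exponential spacing can look like, the sketch below generates look-ahead distances that grow by a constant factor until they exceed the maximum. It does not call CARLA; the first step of 1.0 and the name exponential_offsets are assumptions, and the package may compute its spacing differently.

```python
def exponential_offsets(max_dist, dist_rate, first_step=1.0):
    """Distances first_step, first_step * rate, ... that do not exceed max_dist."""
    offsets = []
    distance = first_step
    while distance <= max_dist:
        offsets.append(distance)
        distance *= dist_rate
    return offsets


offsets = exponential_offsets(10.0, 2.0)
```

With spacing like this, waypoints are dense near the vehicle and sparse further ahead, so fewer points are needed to cover the same distance.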

symaware.simulators.carla.util.test_spline()[source]
symaware.simulators.carla.util.test_spline2d()[source]

Module contents