symaware.base.models package

Submodules

symaware.base.models.dynamical_model module

class symaware.base.models.dynamical_model.DynamicalModel(ID, control_input)[source]

Bases: ABC

Abstract class for dynamical models that will simulate the behaviour of an entity. At each step of the simulation, the model is fed the control input from the Controller.

Parameters:
  • ID (int) – Identifier of the agent this model belongs to. A value of -1 indicates that no agent is associated with this model.

  • control_input (ndarray) – Initial control input of the model. It is also used to check the shape of future control inputs.
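
The role of the initial control input as a shape reference can be sketched as follows. This is a minimal illustration, not the library's implementation: the class `ShapeCheckedInput` and its `update` method are hypothetical, and a plain tuple stands in for the ndarray so that the snippet runs without dependencies.

```python
class ShapeCheckedInput:
    """Hypothetical stand-in, not part of symaware."""

    def __init__(self, ID: int, control_input: tuple[float, ...]):
        self._id = ID
        # The initial input fixes the size every later input must match
        self._control_input = tuple(control_input)

    @property
    def control_input_size(self) -> int:
        return len(self._control_input)

    def update(self, control_input) -> None:
        new_input = tuple(control_input)
        if len(new_input) != self.control_input_size:
            raise ValueError(
                f"expected {self.control_input_size} values, got {len(new_input)}"
            )
        self._control_input = new_input


model = ShapeCheckedInput(ID=0, control_input=(0.0, 0.0))
model.update((1.0, 0.5))  # accepted: same size as the initial input
```

Passing an input of a different size, such as `(1.0, 2.0, 3.0)`, raises a `ValueError` in this sketch; the exception actually raised by the library may differ.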

property control_input: ndarray

Last control input received by the model.

property control_input_shape: tuple[int, ...]

Shape of the control input vector.

property control_input_size: int

Size of the control input vector.

property id: int

Identifier of the agent this model belongs to. A value of -1 indicates that no agent is associated with this model.

abstract initialise(*args, **kwargs)[source]

Initialise the dynamical model. Called by the associated entity when it is added to the environment.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

abstract step(*args, **kwargs)[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Return type:

ndarray

abstract property subinputs_dict: DynamicalModelSubinputs

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector.

class symaware.base.models.dynamical_model.DynamicalModelSubinputs[source]

Bases: TypedDict
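
Since DynamicalModelSubinputs is a TypedDict, a model can describe its subinputs with named, typed keys. The sketch below assumes a car input [vx, vy, vz, s]; the names `CarSubinputs` and `split_car_input` are hypothetical, and whether a concrete model should subclass DynamicalModelSubinputs directly is an assumption not confirmed here. Plain lists stand in for ndarray.

```python
from typing import TypedDict


class CarSubinputs(TypedDict):
    # Keys are declared in the same order as they appear in the input vector
    velocity: list[float]
    steering_angle: float


def split_car_input(control_input: list[float]) -> CarSubinputs:
    """Split [vx, vy, vz, s] into its named subinputs (hypothetical helper)."""
    return {"velocity": control_input[:3], "steering_angle": control_input[3]}


subinputs = split_car_input([1.0, 0.0, 0.0, 0.2])
print(list(subinputs))  # ['velocity', 'steering_angle']
```

Dictionaries preserve insertion order, so iterating over the keys yields the subinputs in the order they occupy in the input vector.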

class symaware.base.models.dynamical_model.NullDynamicalModel[source]

Bases: DynamicalModel, NullObject

Default dynamical model used as a placeholder. It is used when no dynamical model has been defined for an entity. The entity will just ignore it and won’t have any step function associated with it. An exception is raised if this object is used in any way.
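
The placeholder behaviour described above follows the null object idea: the object exists only to signal "no model", and any attempt to use it fails loudly. A minimal sketch, where `NullModelSketch` and `has_model` are hypothetical names and the exception type is an assumption:

```python
class NullModelSketch:
    """Hypothetical placeholder, not the library's NullDynamicalModel."""

    def __getattr__(self, name):
        # __getattr__ runs only when normal lookup fails, which is the case
        # for every attribute this empty class does not define
        raise NotImplementedError(f"'{name}' was used on a null model placeholder")


def has_model(model) -> bool:
    """An entity can skip stepping when only the placeholder is present."""
    return not isinstance(model, NullModelSketch)


placeholder = NullModelSketch()
print(has_model(placeholder))  # False
```

Calling `placeholder.step()` raises `NotImplementedError` in this sketch, while the type check lets the owner ignore the placeholder without touching it.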

initialise()[source]

Initialise the dynamical model. Called by the associated entity when it is added to the environment.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

step()[source]

Run a simulation step of the dynamical model. Called by the associated entity at each simulation step.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

property subinputs_dict

The input of a system is the composition of subinputs.

Example

A car input [vx, vy, vz, s] is composed of the velocity and the steering angle.

>>> from symaware.base import DynamicalModel
>>> class CarModel(DynamicalModel):
...     @property
...     def subinputs_dict(self):
...         return {
...             "velocity": self._control_input[:3],
...             "steering_angle": self._control_input[3]
...         }
...

Important

The order of the subinputs in the dictionary must be the same as the order of the subinputs in the input vector.

symaware.base.models.entity module

class symaware.base.models.entity.Entity(id=-1, model=<factory>)[source]

Bases: ABC

Generic class that represents an entity in the simulation environment. It can be linked to an Agent. It can also hold a dynamical model that determines its behaviour during the simulation, based on the control inputs it receives.

Note

The initialise and step methods are called by the simulation environment, and should be implemented by subclasses to define the characteristics of the entity according to the chosen simulator.

Parameters:
  • id (int, default: -1) – id of the agent this entity is linked to

  • model (DynamicalModel, default: <factory>) – dynamical model that determines the behaviour of this entity, if any

id: int = -1
abstract initialise(*args, **kwargs)[source]
model: DynamicalModel
step()[source]
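
The `<factory>` default in the signature suggests that Entity is declared as a dataclass whose model field uses a default factory. The sketch below illustrates that pattern under this assumption; `EntitySketch`, `PlaceholderModel` and `Box` are hypothetical names and do not reproduce the library's classes.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field


class PlaceholderModel:
    """Hypothetical stand-in for a default dynamical model."""


@dataclass
class EntitySketch(ABC):
    id: int = -1
    # default_factory builds a fresh model for every entity instance
    model: PlaceholderModel = field(default_factory=PlaceholderModel)

    @abstractmethod
    def initialise(self, *args, **kwargs):
        """Subclasses register the entity with their simulator here."""


class Box(EntitySketch):
    def initialise(self, *args, **kwargs):
        self.initialised = True


first, second = Box(), Box(id=1)
print(first.id, second.id)  # -1 1
```

Each instance gets its own model object, and `EntitySketch` itself cannot be instantiated because `initialise` is abstract.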

symaware.base.models.environment module

class symaware.base.models.environment.Environment(async_loop_lock=None)[source]

Bases: Publisher, AsyncLoopLockable

Support class that lets multiple agents operate in the same environment.

Parameters:

async_loop_lock (AsyncLoopLock | None, default: None) – async loop lock to use with the environment

abstract _add_entity(entity)[source]

Add an entity to the environment, initialising it. The actual implementation should be done in the derived class, based on the simulated environment API. The entity’s initialise_entity() function should be called within this function with the appropriate arguments.

Parameters:

entity (Entity) – entity to initialise

add_agents(*agents)[source]

Abstract high level interface this class exposes to add entities to the environment. The _add_single_obstacle() will add the entity to the underlying physics engine.

Parameters:

agents – agents to add to the environment

add_entities(*entities)[source]

Abstract high level interface this class exposes to add entities to the environment. The _add_single_obstacle() will add the entity to the underlying physics engine. Once the entity is added, it is also stored in the internal set of entities, to avoid adding and initialising it multiple times.

Parameters:

entities (Entity) – single instance of symaware.base.Entity or an iterable of them
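
The bookkeeping described for add_entities, where an entity already stored in the internal set is neither added nor initialised again, can be sketched as follows. `EnvironmentSketch` and its `init_calls` counter are hypothetical and exist only to make the behaviour observable; the real class delegates to the simulator-specific _add_entity().

```python
class EnvironmentSketch:
    """Hypothetical stand-in showing the de-duplication logic only."""

    def __init__(self):
        self._entities = set()
        self.init_calls = 0

    def _add_entity(self, entity):
        # A real subclass would register the entity with the simulator here
        self.init_calls += 1

    def add_entities(self, *entities):
        for entity in entities:
            if entity in self._entities:
                continue  # already added: skip a second initialisation
            self._add_entity(entity)
            self._entities.add(entity)


env = EnvironmentSketch()
box, ball = object(), object()
env.add_entities(box, ball)
env.add_entities(box)  # ignored, box is already stored
print(env.init_calls)  # 2
```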

add_on_initialised(callback)[source]

Add a callback to the event initialised

Parameters:

callback (Callable[[Environment], Any]) – Callback to add

add_on_initialising(callback)[source]

Add a callback to the event initialising

Parameters:

callback (Callable[[Environment], Any]) – Callback to add

add_on_stepped(callback)[source]

Add a callback to the event stepped

Parameters:

callback (Callable[[Environment], Any]) – Callback to add

add_on_stepping(callback)[source]

Add a callback to the event stepping

Parameters:

callback (Callable[[Environment], Any]) – Callback to add

add_on_stopped(callback)[source]

Add a callback to the event stopped

Parameters:

callback (Callable[[Environment], Any]) – Callback to add

add_on_stopping(callback)[source]

Add a callback to the event stopping

Parameters:

callback (Callable[[Environment], Any]) – Callback to add
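
The add_on_* methods above follow a common observer pattern: each event keeps a collection of callbacks that receive the environment when the event fires. A minimal sketch for a single event, assuming callbacks are invoked with the environment as their only argument (as the `Callable[[Environment], Any]` type indicates); `SteppingEventSketch` is a hypothetical name and the storage used by the library may differ.

```python
class SteppingEventSketch:
    """Hypothetical stand-in exposing one event, 'stepping'."""

    def __init__(self):
        self._on_stepping = []

    def add_on_stepping(self, callback):
        self._on_stepping.append(callback)

    def remove_on_stepping(self, callback):
        self._on_stepping.remove(callback)

    def step(self):
        # Iterate over a copy so callbacks may unregister themselves safely
        for callback in list(self._on_stepping):
            callback(self)


log = []


def record(env):
    log.append("stepping")


env = SteppingEventSketch()
env.add_on_stepping(record)
env.step()  # record is called
env.remove_on_stepping(record)
env.step()  # no callback registered any more
print(log)  # ['stepping']
```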

property agent_states: dict[int, ndarray]

Dictionary mapping agent identifiers to their states in the environment

async async_run()[source]

Start the environment loop asynchronously. It will run the environment until async_stop() is called. The frequency of the loop is determined by the AsyncLoopLock used to initialise the environment.

async async_stop()[source]

Gracefully stop the environment loop asynchronously. Once the last cycle is completed, the control is returned to the caller.
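
The run and stop pair can be pictured as a loop guarded by a flag. The following sketch uses only asyncio; `AsyncLoopSketch` is hypothetical, a fixed sleep stands in for the AsyncLoopLock, and the stop method here only clears the flag, so the caller awaits the task to know that the last cycle has completed.

```python
import asyncio


class AsyncLoopSketch:
    """Hypothetical stand-in for an environment loop."""

    def __init__(self, period: float):
        self._period = period
        self._running = False
        self.steps = 0

    async def async_run(self):
        self._running = True
        while self._running:
            self.steps += 1  # one simulation step per cycle
            await asyncio.sleep(self._period)

    async def async_stop(self):
        self._running = False  # the loop exits after the current cycle


async def main():
    env = AsyncLoopSketch(period=0.01)
    task = asyncio.create_task(env.async_run())
    await asyncio.sleep(0.05)  # let the loop run for a few cycles
    await env.async_stop()
    await task  # returns once the last cycle has finished
    return env.steps


steps = asyncio.run(main())
```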

property elapsed_time: float

Time elapsed since the start of the simulation. If the simulation has not started yet, it will return -1. Can be used to synchronise the agents in the environment. It is 0 at the first simulation step and incremented by the delta time of each subsequent step.
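
The three cases described for elapsed_time (-1 before the start, 0 at the first step, then incremented by each delta time) can be sketched with a small counter. `ClockSketch` is a hypothetical name and the way the library obtains the delta time is not shown here.

```python
class ClockSketch:
    """Hypothetical stand-in reproducing the documented elapsed_time values."""

    def __init__(self):
        self._elapsed_time = -1.0  # simulation not started yet

    @property
    def elapsed_time(self) -> float:
        return self._elapsed_time

    def step(self, delta_time: float) -> None:
        if self._elapsed_time < 0:
            self._elapsed_time = 0.0  # first simulation step
        else:
            self._elapsed_time += delta_time


clock = ClockSketch()
readings = [clock.elapsed_time]
for _ in range(3):
    clock.step(0.5)
    readings.append(clock.elapsed_time)
print(readings)  # [-1.0, 0.0, 0.5, 1.0]
```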

property entities: set[Entity]

Set of entities in the environment

get_agent_state(agent)[source]
Return type:

ndarray

abstract get_entity_state(entity)[source]

Get the state of an entity in the environment. The actual implementation should be done in the derived class, based on the simulated environment API.

Parameters:

entity (Entity) – entity to get the state of

Returns:

ndarray – State of the entity within the environment

abstract initialise()[source]

Initialise the simulation, allocating the required resources. Should be called when the simulation has been set up and is ready to be run. Some environment implementations may call it automatically when the environment is created. It is their responsibility to ensure that the method is idempotent.

remove_on_initialised(callback)[source]

Remove a callback from the event initialised

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

remove_on_initialising(callback)[source]

Remove a callback from the event initialising

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

remove_on_stepped(callback)[source]

Remove a callback from the event stepped

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

remove_on_stepping(callback)[source]

Remove a callback from the event stepping

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

remove_on_stopped(callback)[source]

Remove a callback from the event stopped

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

remove_on_stopping(callback)[source]

Remove a callback from the event stopping

Parameters:

callback (Callable[[Environment], Any]) – Callback to remove

property start_time: float

Time at which the simulation started. If the simulation has not started yet, it will return -1.

abstract step()[source]

Step the environment forward in time, updating the state of all the entities. It can be called repeatedly.

abstract stop()[source]

Terminate the simulation, releasing the resources. Should be called when the simulation has been running manually and needs to be stopped. Some environment implementations may call it automatically when the environment is destroyed. It is their responsibility to ensure that the method is idempotent.

Warning

Depending on the simulator implementation, calling this method may invalidate all the entities previously added to the environment. In that case, entities and the environment should be recreated from scratch.
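
Idempotency of initialise() and stop() means that repeated calls have no further effect. One common way to achieve this is a guard flag, sketched below; `LifecycleSketch` and its counters are hypothetical and only make the effect visible.

```python
class LifecycleSketch:
    """Hypothetical stand-in with idempotent initialise() and stop()."""

    def __init__(self):
        self._initialised = False
        self.allocations = 0
        self.releases = 0

    def initialise(self):
        if self._initialised:
            return  # already initialised: nothing to do
        self.allocations += 1  # stands in for allocating simulator resources
        self._initialised = True

    def stop(self):
        if not self._initialised:
            return  # already stopped or never started
        self.releases += 1  # stands in for releasing simulator resources
        self._initialised = False


env = LifecycleSketch()
env.initialise()
env.initialise()  # no effect
env.stop()
env.stop()  # no effect
print(env.allocations, env.releases)  # 1 1
```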

Module contents