symaware.base package

Subpackages

Submodules

symaware.base.agent module

class symaware.base.agent.Agent(ID, entity)[source]

Bases: Generic[T]

An agent represents an entity that can interact with the environment and other agents. It is made of several components. While the exact kind and number is up to the instance, usually these are the most common ones:

  • An symaware.base.Entity representing the simulated physical entity of the agent, with a dynamical model.

  • A Controller that computes the control input of the agent based on the current information.

  • A RiskEstimator that computes the risk of the agent based on the current information.

  • An UncertaintyEstimator that computes the uncertainty of the agent based on the current information.

  • A PerceptionSystem that collects information from the environment.

  • A CommunicationSystem that collects information from other agents and sends information to other agents.

Each component can easily be replaced by a custom one by subclassing the corresponding class.

Agents support for synchronous and asynchronous execution.

Note

To simplify coordination between multiple agents, the use of a AgentCoordinator is recommended.

Example

Create an agent with a simple dynamical model and the default components using the pybullet simulator:

>>> 
>>> import pybullet as p
>>> import numpy as np
>>> from symaware.simulators.pybullet import Environment, RacecarEntity, RacecarModel
>>> from symaware.base import Agent, TimeIntervalAsyncLoopLock, KnowledgeDatabase, AgentCoordinator, AwarenessVector
>>> from symaware.base import DefaultCommunicationSender, DefaultCommunicationReceiver
>>> from symaware.base import DefaultInfoUpdater, DefaultController
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
...     DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
...     initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
...     initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
Parameters:
  • ID (int) – Identifier of the agent

  • entity (Entity) – Entity representing the simulated physical entity of the agent

_LOGGER = <Logger symaware.base.Agent (WARNING)>
async _async_step(component)[source]

Asynchronously run the step function for the component. Each component will compute its new value and update the agent accordingly. The frequency of the loop is determined by the component’s AsyncLoopLock.

Parameters:

component (Component) – Component to run asynchronously

add_components(*components)[source]

Add a component or a list of components to the agent.

Parameters:

components (Component) – single component or a list of components to add to the agent

async async_initialise_agent(initial_awareness_database, initial_knowledge_database)[source]

Initializes the agent with the given initial awareness database and initial knowledge database.

Parameters:
  • initial_awareness_database – The initial awareness database containing the state of the agent. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to be representing the state of the agent.

  • initial_knowledge_database – The initial knowledge database. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to be representing the state of the agent.

Raises:

ValueError – The agent ID is not present in the initial awareness database: or if the dimension of the awareness vector does not match the dimension of the dynamical model state input.

async async_run()[source]

Run the agent asynchronously. It will run all the components asynchronously.

async async_stop()[source]

Stop the agent asynchronously. Makes sure all the components have stopped running to stop the agent gracefully.

property awareness_database: dict[int, AwarenessVector]

Agent’s awareness database

property communication_receiver: CommunicationReceiver

Communication receiver for the agent. Null component is none is present

property communication_receivers: tuple[CommunicationReceiver, ...]

Communication receivers for the agent

property communication_sender: CommunicationSender

Communication system for the agent. Null component is none is present

property communication_senders: tuple[CommunicationSender, ...]

Communication system for the agent

property components: tuple[Component, ...]

Tuple containing all the components of the agent

property control_input: ndarray

Agent’s control input

property controller: Controller

Controller for the agent. Null component is none is present

property controllers: tuple[Controller, ...]

Controllers for the agent

property entity: Entity

Agent entity

property id: int

Agent ID

initialise_agent(initial_awareness_database, initial_knowledge_database)[source]

Initializes the agent with the given initial awareness database and initial knowledge database.

Parameters:
  • initial_awareness_database – The initial awareness database containing the state of the agent. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to be representing the state of the agent.

  • initial_knowledge_database – The initial knowledge database. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to be representing the state of the agent.

Raises:

ValueError – The agent ID is not present in the initial awareness database: or if the dimension of the awareness vector does not match the dimension of the dynamical model state input.

property is_initialised: bool

Check if the initialise_agent() method has been called before

property knowledge_database: MultiAgentKnowledgeDatabase[T]

Agent’s knowledge database

property model: DynamicalModel

Dynamical model for the agent

property perception_system: PerceptionSystem

Perception system for the agent. Null component is none is present

property perception_systems: tuple[PerceptionSystem, ...]

Perception systems for the agent

remove_components(*components)[source]

Remove a component or a list of components from the agent.

Parameters:

components (Component) – Components to remove from the agent

Raises:
  • TypeError – The component is not a Component object:

  • KeyError – The component is not present in the agent:

property risk_estimator: RiskEstimator

Risk estimator for the agent. Null component is none is present

property risk_estimators: tuple[RiskEstimator, ...]

Risk estimators for the agent

property self_awareness: AwarenessVector

Agent’s awareness vector

property self_knowledge: T

Agent’s knowledge

property self_state: ndarray

Agent’s state

step()[source]

Step function for the agent. Will run at each time step. It will, in order:

  • Collect information from the environment

  • Receive messages from other agents

  • Compute and update the risk

  • Compute and update the uncertainty

  • Compute the control input

  • Send the awareness and knowledge databases to other agents

Note

If multiple components of the same kind are present, the order in which they are added is the order in which they will be executed.

Example

Override the step() method in a subclass to change the order of components’ execution.

>>> from symaware.base import Agent
>>> class MyAgent(Agent):
...     def step(self):
...         # Your implementation here
...         # Example:
...         # Compute all the components in the order they were added
...         for component in self.components:
...             component.compute_and_update()
Raises:

ValueError – The agent must be initialised before running the step function.:

property uncertainty_estimator: UncertaintyEstimator

Uncertainty estimator for the agent. Null component is none is present

property uncertainty_estimators: tuple[UncertaintyEstimator, ...]

Uncertainty estimator for the agent

symaware.base.agent_coordinator module

class symaware.base.agent_coordinator.AgentCoordinator(env, *agents, post_init=None, post_stop=None, async_post_init=None, async_post_stop=None)[source]

Bases: Generic[T]

The agent coordinator is responsible for running all agents and the environment in an asynchronous manner. It is advised to use this class instead of running the agents and environment manually.

After loading the agents and the environment, the coordinator can be started by calling the run() or async_run() methods. It is possible to add a callback that will be executed after the initialisation and one after the termination of the agents and the environment.

Parameters:
__LOGGER = <Logger symaware.base.AgentCoordinator (WARNING)>
async _initialise(initialise_info)[source]

Initialise all agents asynchronously. Only the agents that have not been initialised before and are present in the initialise_info will be initialised.

Parameters:

initialise_info – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector and the knowledge database

static _raise_sigint()[source]

Raise a SIGINT signal to stop the event loop gracefully.

static _raise_system_exit()[source]

Raise a SystemExit exception. Used to stop the event loop gracefully.

Raises:

SystemExit – A signal was received to stop the event loop:

Return type:

NoReturn

async _run()[source]

Run all agents and the environment asynchronously. The method will return when all agents and the environment have been stopped.

_set_loop_stop_signals(stop_signals)[source]

Set the stop signals for the event loop. By default, the signals SIGINT, SIGTERM and SIGABRT are used.

Parameters:

stop_signals (tuple[Signals, ...]) – Signals that will cause the event loop to stop gracefully

async _stop()[source]

Stop all agents and the environment gracefully. The method will return when all agents and the environment have been stopped.

add_agents(*agents)[source]

Add agents to the coordinator.

Parameters:

agents (Agent) – Agents that will be run by the coordinator

Raises:

ValueError – Agents must be an Agent or an Iterable of Agent:

property agents: list[Agent]

Collection of agents that will be run by the coordinator

async_run(initialise_info=None, timeout=-1, loop=None, stop_signals=None, close_loop=True)[source]

Start running the event loop. All agents will begin running their components asynchronously. The frequency at which each Component is executed is determined by their AsyncLoopLock. All agents that have not yet been initialised will be initialised before running, as long as the initialise_info contains the initialisation information for the agent, namely a tuple with the awareness vector and the knowledge database.

Example

The following example shows how to create an environment, agents and a coordinator.

>>> from symaware.base import (
...     AgentCoordinator,
...     Agent,
...     Environment,
...     TimeIntervalAsyncLoopLock,
...     Entity,
...     DynamicalModel,
... )
>>>
>>> class MyEnvironment(Environment):
...     ...
>>> class MyEntity(Entity):
...     ...
>>> class MyModel(DynamicalModel):
...     ...
>>>
>>> # Create the environment
>>> env = MyEnvironment(async_loop_lock=TimeIntervalAsyncLoopLock(0.1)) 
>>>
>>> # Create the agents
>>> agents = [Agent(i, MyEntity(i, MyModel(i))) for i in range(1, 5)] 
>>>
>>> # Set the components for each agent
>>> for agent in agents: 
...     agent.set_components(...) 
>>>
>>> # Create the coordinator
>>> coordinator = AgentCoordinator(env, agents) 
>>>
>>> # Run the coordinator
>>> coordinator.async_run() 
Parameters:
  • initialise_info (default: None) – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector, the knowledge database and the time

  • timeout (default: -1) – Time in seconds after which the event loop will stop. If the timeout is negative, the event loop will run indefinitely

  • loop (default: None) – event loop. If none is provided, the default one will be used

  • stop_signals (default: None) – Signals that will cause the event loop to stop gracefully

  • close_loop (default: True) – Whether to close the event loop upon termination

Raises:

Exception – An unforeseen exception was raised during execution:

property env: Environment

Environment the agents will interact with and the coordinator will run

property is_initialised: bool

Whether all agents are initialised

run(time_step, timeout=-1, steps_limit=-1, initialise_info=None, pre_agent_iteration_callback=None)[source]

Start running all the agents and the environment. All agents will begin running their components synchronously. The frequency at which each Component is executed is determined by the time_step. All agents that have not yet been initialised will be initialised before running, as long as the initialise_info contains the initialisation information for the agent, namely a tuple with the awareness vector and the knowledge database.

Example

The following example shows how to create an environment, agents and a coordinator.

>>> from symaware.base import (
...     AgentCoordinator,
...     Agent,
...     Environment,
...     TimeIntervalAsyncLoopLock,
...     Entity,
...     DynamicalModel,
... )
>>>
>>> class MyEnvironment(Environment):
...     ...
>>> class MyEntity(Entity):
...     ...
>>> class MyModel(DynamicalModel):
...     ...
>>>
>>> # Create the environment
>>> env = MyEnvironment(async_loop_lock=TimeIntervalAsyncLoopLock(0.1)) 
>>>
>>> # Create the agents
>>> agents = [Agent(i, MyEntity(i, MyModel(i))) for i in range(1, 5)] 
>>>
>>> # Set the components for each agent
>>> for agent in agents: 
...     agent.set_components(...) 
>>>
>>> # Create the coordinator
>>> coordinator = AgentCoordinator(env, agents) 
>>>
>>> # Run the coordinator with a time step of 1 second
>>> coordinator.run(1) 
Parameters:
  • time_step – Time step representing the interval at which the agents and the environment will be updated

  • timeout (default: -1) – Time in seconds after which the event loop will stop. If the timeout is negative, the event loop will run indefinitely

  • steps_limit (default: -1) – Maximum number of steps the event loop will run before stopping. If the limit is negative, the agents will run indefinitely

  • initialise_info (default: None) – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector, the knowledge database and the time

  • pre_agent_iteration_callback (default: None) – Callback that will be executed before each agent iteration. The callback will receive the agent, the iteration count and the elapsed time as arguments

Raises:

Exception – An unforeseen exception was raised during execution:

property running: bool

Whether the coordinator is running

Module contents

Base package for SymAware. It contains the basic components and classes to create an agent and its environment. Most of the classes are abstract and need to be implemented by the user, although some default implementations are provided.

To run a simulation using the package’s own main function, the user needs to create a configuration python file that contains a function called configure_symaware(). The function must return a SymawareConfig object.

Example

Given a configuration file called my_config.py which contains a function like the following:

>>> # Import all the classes that will be used in the configuration
>>> from symaware.base import SymawareConfig  # ... other imports
>>> def configure_symaware(num_agents: int, time_step: float, disable_async: bool) -> SymawareConfig:
...
...    def time_loop_lock() -> "TimeIntervalAsyncLoopLock | None":
...        return TimeIntervalAsyncLoopLock(time_step) if not disable_async else None
...
...    #############################################################
...    # 1. Create the environment and add any number of obstacles #
...    #############################################################
...    env = PyBulletEnvironment(real_time_interval=time_step if disable_async else 0, async_loop_lock=time_loop_lock())
...    env.add_entities(PybulletBox(position=np.array([2, 2, 2])), PybulletSphere(position=np.array([-4, -4, 2])))
...
...    #############################################################
...    # 2. Create the agents and add them to the environment      #
...    #############################################################
...    entities = [PyBulletRacecar(i, model=PybulletRacecarModel(i)) for i in range(num_agents)]
...    for i, entity in enumerate(entities):
...        entity.position = np.array([i, i, 0.1])
...    agents = [Agent[MyKnowledgeDatabase](i, entities[i]) for i in range(num_agents)]
...    env.add_agents(agents)
...
...    #############################################################
...    # 3. Create the knowledge database and the awareness vector #
...    #############################################################
...    knowledge_databases = [
...        {
...            i: MyKnowledgeDatabase(
...                goal_reached=False,
...                goal_pos=np.array([2, 2]),
...                tolerance=0.1,
...                speed=20 + 10 * i,
...                steering_tolerance=0.01,
...            )
...        }
...        for i in range(num_agents)
...    ]
...    awareness_vectors = [AwarenessVector(i, np.zeros(7)) for i in range(num_agents)]
...
...    #############################################################
...    # 4. Return the configuration                               #
...    #############################################################
...    return SymawareConfig(
...        agent=agents,
...        controller=[MyController(i, time_loop_lock()) for i in range(num_agents)],
...        knowledge_database=knowledge_databases,
...        awareness_vector=awareness_vectors,
...        risk_estimator=[DefaultRiskEstimator(i, time_loop_lock()) for i in range(num_agents)],
...        uncertainty_estimator=[DefaultUncertaintyEstimator(i, time_loop_lock()) for i in range(num_agents)],
...        communication_system=[
...            DefaultCommunicationSystem(i, time_loop_lock(), time_loop_lock()) for i in range(num_agents)
...        ],
...        info_updater=[DefaultInfoUpdater(i, time_loop_lock()) for i in range(num_agents)],
...        perception_system=[DefaultPerceptionSystem(i, env, time_loop_lock()) for i in range(num_agents)],
...        environment=env,
...    )