symaware.base package
Subpackages
- symaware.base.components package
- Submodules
- symaware.base.components.communication_receiver module
- symaware.base.components.communication_sender module
- symaware.base.components.component module
Component
Component._async_compute()
Component._async_update()
Component._compute()
Component._update()
Component.add_on_computed()
Component.add_on_computing()
Component.add_on_initialised()
Component.add_on_iterated()
Component.add_on_iterating()
Component.add_on_updated()
Component.add_on_updating()
Component.agent_id
Component.async_compute()
Component.async_compute_and_update()
Component.async_initialise_component()
Component.async_update()
Component.compute()
Component.compute_and_update()
Component.initialise_component()
Component.is_initialised
Component.remove_on_computed()
Component.remove_on_computing()
Component.remove_on_initialised()
Component.remove_on_iterated()
Component.remove_on_iterating()
Component.remove_on_updated()
Component.remove_on_updating()
Component.update()
- symaware.base.components.controller module
- symaware.base.components.perception_system module
- symaware.base.components.risk_estimator module
- symaware.base.components.uncertainty_estimator module
- Module contents
- symaware.base.data package
- Submodules
- symaware.base.data.data_structures module
- symaware.base.data.types module
SymawareConfig
SymawareConfig.agent
SymawareConfig.awareness_vector
SymawareConfig.communication_receiver
SymawareConfig.communication_sender
SymawareConfig.controller
SymawareConfig.environment
SymawareConfig.knowledge_database
SymawareConfig.perception_system
SymawareConfig.risk_estimator
SymawareConfig.uncertainty_estimator
- Module contents
- symaware.base.models package
- Submodules
- symaware.base.models.dynamical_model module
- symaware.base.models.entity module
- symaware.base.models.environment module
Environment
Environment._add_entity()
Environment.add_agents()
Environment.add_entities()
Environment.add_on_initialised()
Environment.add_on_initialising()
Environment.add_on_stepped()
Environment.add_on_stepping()
Environment.add_on_stopped()
Environment.add_on_stopping()
Environment.agent_states
Environment.async_run()
Environment.async_stop()
Environment.elapsed_time
Environment.entities
Environment.get_agent_state()
Environment.get_entity_state()
Environment.initialise()
Environment.remove_on_initialised()
Environment.remove_on_initialising()
Environment.remove_on_stepped()
Environment.remove_on_stepping()
Environment.remove_on_stopped()
Environment.remove_on_stopping()
Environment.start_time
Environment.step()
Environment.stop()
- Module contents
- symaware.base.utils package
- Submodules
- symaware.base.utils.async_loop_lock module
- symaware.base.utils.async_loop_lockable module
- symaware.base.utils.logging module
- symaware.base.utils.null_object module
- symaware.base.utils.publisher module
- Module contents
Submodules
symaware.base.agent module
- class symaware.base.agent.Agent(ID, entity)[source]
Bases: Generic[T]
An agent represents an entity that can interact with the environment and other agents. It is made of several components. While the exact kind and number is up to the instance, usually these are the most common ones:
- An symaware.base.Entity representing the simulated physical entity of the agent, with a dynamical model.
- A Controller that computes the control input of the agent based on the current information.
- A RiskEstimator that computes the risk of the agent based on the current information.
- An UncertaintyEstimator that computes the uncertainty of the agent based on the current information.
- A PerceptionSystem that collects information from the environment.
- A CommunicationSystem that collects information from other agents and sends information to other agents.
Each component can easily be replaced by a custom one by subclassing the corresponding class.
Agents support both synchronous and asynchronous execution.
Note
To simplify coordination between multiple agents, the use of an AgentCoordinator is recommended.
Example
Create an agent with a simple dynamical model and the default components using the pybullet simulator:
>>> import pybullet as p
>>> import numpy as np
>>> from symaware.simulators.pybullet import Environment, RacecarEntity, RacecarModel
>>> from symaware.base import Agent, TimeIntervalAsyncLoopLock, KnowledgeDatabase, AgentCoordinator, AwarenessVector
>>> from symaware.base import DefaultCommunicationSender, DefaultCommunicationReceiver
>>> from symaware.base import DefaultInfoUpdater, DefaultController
>>> from symaware.base import DefaultPerceptionSystem, DefaultRiskEstimator, DefaultUncertaintyEstimator
>>> agent_id = 0
>>> env = Environment(connection_method=p.DIRECT, async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>> entity = RacecarEntity(agent_id, model=RacecarModel(agent_id))
>>> agent = Agent[KnowledgeDatabase](agent_id, entity)
>>> env.add_agents(agent)
>>> agent.add_components(
...     DefaultController(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultRiskEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultUncertaintyEstimator(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultPerceptionSystem(agent_id, env, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationSender(agent_id, TimeIntervalAsyncLoopLock(0.1)),
...     DefaultCommunicationReceiver(agent_id, TimeIntervalAsyncLoopLock(0.1)),
... )
>>> agent.initialise_agent(
...     initial_awareness_database=AwarenessVector(agent_id, np.zeros(7)),
...     initial_knowledge_database={agent_id: KnowledgeDatabase(goal_reached=False, goal_pos=np.zeros(3))},
... )
>>> AgentCoordinator(env, agent).run()
- Parameters:
- _LOGGER = <Logger symaware.base.Agent (WARNING)>
- async _async_step(component)[source]
Asynchronously run the step function for the component. Each component will compute its new value and update the agent accordingly. The frequency of the loop is determined by the component’s AsyncLoopLock.
- Parameters:
component (Component) – Component to run asynchronously
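The loop described above can be pictured with a small, self-contained sketch built on plain asyncio. It does not use symaware: `FixedIntervalLock`, `CountingComponent` and the `next_loop()` method are hypothetical stand-ins for AsyncLoopLock and Component, whose real interfaces may differ.

```python
import asyncio


class FixedIntervalLock:
    """Hypothetical stand-in for an AsyncLoopLock: waits a fixed interval."""

    def __init__(self, interval: float) -> None:
        self.interval = interval

    async def next_loop(self) -> None:
        # Suspend until the next iteration is allowed to start.
        await asyncio.sleep(self.interval)


class CountingComponent:
    """Hypothetical stand-in for a Component that counts its iterations."""

    def __init__(self, lock: FixedIntervalLock) -> None:
        self.lock = lock
        self.iterations = 0

    def compute_and_update(self) -> None:
        self.iterations += 1


async def async_step(component: CountingComponent, max_iterations: int) -> None:
    # Each pass computes and updates, then waits on the component's own lock,
    # so the lock alone sets the frequency of the loop.
    for _ in range(max_iterations):
        component.compute_and_update()
        await component.lock.next_loop()


fast = CountingComponent(FixedIntervalLock(0.001))
asyncio.run(async_step(fast, max_iterations=3))
```

The iteration cap exists only so the sketch terminates; a real component loop would presumably run until the agent is stopped.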
- add_components(*components)[source]
Add a component or a list of components to the agent.
- Parameters:
components (Component) – single component or a list of components to add to the agent
- async async_initialise_agent(initial_awareness_database, initial_knowledge_database)[source]
Initializes the agent with the given initial awareness database and initial knowledge database.
- Parameters:
initial_awareness_database – The initial awareness database containing the state of the agent. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to represent the state of the agent.
initial_knowledge_database – The initial knowledge database. If it is a dictionary, it must at least contain the knowledge of the agent itself. Otherwise, it is assumed to represent the knowledge of the agent.
- Raises:
ValueError – The agent ID is not present in the initial awareness database, or the dimension of the awareness vector does not match the dimension of the dynamical model state input.
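The dictionary-or-single-value rule and the two error conditions can be sketched as follows. This is an illustration of the documented behaviour only, not the library's code: `normalise_awareness` is a hypothetical helper, and plain lists of floats stand in for awareness vectors.

```python
from typing import Dict, List, Union

State = List[float]


def normalise_awareness(
    agent_id: int,
    initial: Union[Dict[int, State], State],
    state_dim: int,
) -> Dict[int, State]:
    """Hypothetical helper mirroring the documented rules."""
    if isinstance(initial, dict):
        # A dictionary must contain at least the agent's own entry.
        if agent_id not in initial:
            raise ValueError(f"agent {agent_id} missing from initial awareness database")
        database = dict(initial)
    else:
        # Anything else is taken to be the agent's own state.
        database = {agent_id: initial}
    if len(database[agent_id]) != state_dim:
        raise ValueError("awareness vector dimension does not match the model state")
    return database


single = normalise_awareness(0, [0.0] * 7, state_dim=7)
shared = normalise_awareness(0, {0: [0.0] * 7, 1: [1.0] * 7}, state_dim=7)
```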
- async async_run()[source]
Run the agent asynchronously. It will run all the components asynchronously.
- async async_stop()[source]
Stop the agent asynchronously. Makes sure all the components have stopped running to stop the agent gracefully.
- property awareness_database: dict[int, AwarenessVector]
Agent’s awareness database
- property communication_receiver: CommunicationReceiver
Communication receiver for the agent. Null component if none is present
- property communication_receivers: tuple[CommunicationReceiver, ...]
Communication receivers for the agent
- property communication_sender: CommunicationSender
Communication sender for the agent. Null component if none is present
- property communication_senders: tuple[CommunicationSender, ...]
Communication senders for the agent
- property controller: Controller
Controller for the agent. Null component if none is present
- property controllers: tuple[Controller, ...]
Controllers for the agent
- initialise_agent(initial_awareness_database, initial_knowledge_database)[source]
Initializes the agent with the given initial awareness database and initial knowledge database.
- Parameters:
initial_awareness_database – The initial awareness database containing the state of the agent. If it is a dictionary, it must at least contain the state of the agent itself. Otherwise, it is assumed to represent the state of the agent.
initial_knowledge_database – The initial knowledge database. If it is a dictionary, it must at least contain the knowledge of the agent itself. Otherwise, it is assumed to represent the knowledge of the agent.
- Raises:
ValueError – The agent ID is not present in the initial awareness database, or the dimension of the awareness vector does not match the dimension of the dynamical model state input.
- property is_initialised: bool
Check if the initialise_agent() method has been called before
- property knowledge_database: MultiAgentKnowledgeDatabase[T]
Agent’s knowledge database
- property model: DynamicalModel
Dynamical model for the agent
- property perception_system: PerceptionSystem
Perception system for the agent. Null component if none is present
- property perception_systems: tuple[PerceptionSystem, ...]
Perception systems for the agent
- property risk_estimator: RiskEstimator
Risk estimator for the agent. Null component if none is present
- property risk_estimators: tuple[RiskEstimator, ...]
Risk estimators for the agent
- property self_awareness: AwarenessVector
Agent’s awareness vector
- property self_knowledge: T
Agent’s knowledge
- step()[source]
Step function for the agent. Will run at each time step. It will, in order:
Collect information from the environment
Receive messages from other agents
Compute and update the risk
Compute and update the uncertainty
Compute the control input
Send the awareness and knowledge databases to other agents
Note
If multiple components of the same kind are present, the order in which they are added is the order in which they will be executed.
Example
Override the step() method in a subclass to change the order of components’ execution.
>>> from symaware.base import Agent
>>> class MyAgent(Agent):
...     def step(self):
...         # Your implementation here
...         # Example:
...         # Compute all the components in the order they were added
...         for component in self.components:
...             component.compute_and_update()
- Raises:
ValueError – The agent must be initialised before running the step function.
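The default ordering and the insertion-order rule for components of the same kind can be illustrated with stand-in classes. Everything here is hypothetical (`RecordingComponent`, `SketchAgent`, the `kind` strings), and for simplicity every kind calls `compute_and_update()`, which may not match what the real `step()` does for each component.

```python
from typing import List

# Component kinds in the order listed above for the default step().
STEP_ORDER = [
    "perception_system",
    "communication_receiver",
    "risk_estimator",
    "uncertainty_estimator",
    "controller",
    "communication_sender",
]


class RecordingComponent:
    """Hypothetical component that records when it is executed."""

    def __init__(self, kind: str, name: str, log: List[str]) -> None:
        self.kind = kind
        self.name = name
        self.log = log

    def compute_and_update(self) -> None:
        self.log.append(self.name)


class SketchAgent:
    """Hypothetical agent modelling only the ordering of step()."""

    def __init__(self) -> None:
        self.components: List[RecordingComponent] = []
        self.is_initialised = False

    def step(self) -> None:
        if not self.is_initialised:
            raise ValueError("The agent must be initialised before running the step function")
        for kind in STEP_ORDER:
            # Within one kind, components run in the order they were added.
            for component in self.components:
                if component.kind == kind:
                    component.compute_and_update()


log: List[str] = []
agent = SketchAgent()
agent.components = [
    RecordingComponent("controller", "controller", log),
    RecordingComponent("risk_estimator", "risk_a", log),
    RecordingComponent("perception_system", "perception", log),
    RecordingComponent("risk_estimator", "risk_b", log),
]
agent.is_initialised = True
agent.step()
print(log)  # ['perception', 'risk_a', 'risk_b', 'controller']
```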
- property uncertainty_estimator: UncertaintyEstimator
Uncertainty estimator for the agent. Null component if none is present
- property uncertainty_estimators: tuple[UncertaintyEstimator, ...]
Uncertainty estimators for the agent
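Several singular properties of Agent return a null component when no component of that kind is present, while the plural ones return a tuple. One way to picture this is the null object pattern, sketched below with stand-in classes. `Controller`, `NullController` and `SketchAgent` are hypothetical, and returning the first component added is an assumption rather than documented behaviour.

```python
from typing import List, Tuple


class Controller:
    """Hypothetical stand-in for a controller component."""

    def __init__(self, name: str) -> None:
        self.name = name

    def compute(self) -> str:
        return f"control input from {self.name}"


class NullController(Controller):
    """Null object: same interface, but produces no control input."""

    def __init__(self) -> None:
        super().__init__("null")

    def compute(self) -> str:
        return "no control input"


class SketchAgent:
    """Hypothetical agent exposing singular and plural accessors."""

    def __init__(self) -> None:
        self._controllers: List[Controller] = []

    def add_components(self, *components: Controller) -> None:
        self._controllers.extend(components)

    @property
    def controllers(self) -> Tuple[Controller, ...]:
        return tuple(self._controllers)

    @property
    def controller(self) -> Controller:
        # Assumption: the first controller added, or a null object if none.
        return self._controllers[0] if self._controllers else NullController()


agent = SketchAgent()
before = agent.controller.compute()
agent.add_components(Controller("a"), Controller("b"))
after = agent.controller.compute()
```

With a null object, callers can invoke `agent.controller.compute()` without first checking whether a controller exists.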
symaware.base.agent_coordinator module
- class symaware.base.agent_coordinator.AgentCoordinator(env, *agents, post_init=None, post_stop=None, async_post_init=None, async_post_stop=None)[source]
Bases: Generic[T]
The agent coordinator is responsible for running all agents and the environment in an asynchronous manner. It is advised to use this class instead of running the agents and environment manually.
After loading the agents and the environment, the coordinator can be started by calling the run() or async_run() methods. It is possible to add a callback that will be executed after the initialisation and one after the termination of the agents and the environment.
- Parameters:
env (Environment) – Environment the agents will interact with and the coordinator will run
agents (Agent) – Collection of agents that will be run by the coordinator
post_init (Optional[Callable[[AgentCoordinator], None]], default: None) – Callback that will be executed after the initialisation of the agents and the environment
post_stop (Optional[Callable[[AgentCoordinator], None]], default: None) – Callback that will be executed after the termination of the agents and the environment
async_post_init (Optional[Callable[[AgentCoordinator], Coroutine[Any, Any, None]]], default: None) – Asynchronous callback that will be executed after the initialisation of the agents and the environment
async_post_stop (Optional[Callable[[AgentCoordinator], Coroutine[Any, Any, None]]], default: None) – Asynchronous callback that will be executed after the termination of the agents and the environment
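The difference between the plain and the asynchronous callbacks is only in their type: the former is a regular function taking the coordinator, the latter a coroutine function. The sketch below shows the shape of both with a hypothetical `SketchCoordinator`; the order in which the two callbacks are invoked is an assumption made for illustration.

```python
import asyncio
from typing import Any, Callable, Coroutine, List, Optional


class SketchCoordinator:
    """Hypothetical stand-in; only the callback plumbing is modelled."""

    def __init__(
        self,
        post_init: Optional[Callable[["SketchCoordinator"], None]] = None,
        async_post_init: Optional[
            Callable[["SketchCoordinator"], Coroutine[Any, Any, None]]
        ] = None,
    ) -> None:
        self.post_init = post_init
        self.async_post_init = async_post_init
        self.events: List[str] = []

    async def initialise(self) -> None:
        self.events.append("initialised")
        # Assumed order: the plain callback first, then the coroutine one.
        if self.post_init is not None:
            self.post_init(self)
        if self.async_post_init is not None:
            await self.async_post_init(self)


def log_ready(coordinator: SketchCoordinator) -> None:
    coordinator.events.append("post_init")


async def warm_up(coordinator: SketchCoordinator) -> None:
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    coordinator.events.append("async_post_init")


coordinator = SketchCoordinator(post_init=log_ready, async_post_init=warm_up)
asyncio.run(coordinator.initialise())
```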
- __LOGGER = <Logger symaware.base.AgentCoordinator (WARNING)>
- async _initialise(initialise_info)[source]
Initialise all agents asynchronously. Only the agents that have not been initialised before and are present in the initialise_info will be initialised.
- Parameters:
initialise_info – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector and the knowledge database
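The selection rule stated above (not yet initialised, and present in initialise_info) amounts to a simple filter. A minimal sketch, using a hypothetical `SketchAgent` with only the two attributes the rule needs and placeholder strings in place of real awareness vectors and knowledge databases:

```python
from typing import Any, Dict, List, Tuple


class SketchAgent:
    """Hypothetical agent exposing only an id and an initialisation flag."""

    def __init__(self, agent_id: int, is_initialised: bool = False) -> None:
        self.agent_id = agent_id
        self.is_initialised = is_initialised


def agents_to_initialise(
    agents: List[SketchAgent],
    initialise_info: Dict[int, Tuple[Any, Any]],
) -> List[SketchAgent]:
    # Keep agents that still need initialising and have an entry available.
    return [
        agent
        for agent in agents
        if not agent.is_initialised and agent.agent_id in initialise_info
    ]


agents = [SketchAgent(0), SketchAgent(1, is_initialised=True), SketchAgent(2)]
info = {0: ("awareness_0", "knowledge_0"), 1: ("awareness_1", "knowledge_1")}
selected = [agent.agent_id for agent in agents_to_initialise(agents, info)]
print(selected)  # [0]
```

Agent 1 is skipped because it is already initialised, and agent 2 because no initialisation information was supplied for it.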
- static _raise_system_exit()[source]
Raise a SystemExit exception. Used to stop the event loop gracefully.
- Raises:
SystemExit – A signal was received to stop the event loop
- Return type:
- async _run()[source]
Run all agents and the environment asynchronously. The method will return when all agents and the environment have been stopped.
- _set_loop_stop_signals(stop_signals)[source]
Set the stop signals for the event loop. By default, the signals SIGINT, SIGTERM and SIGABRT are used.
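A rough idea of how such signals could be wired to an asyncio event loop is sketched below. This is not the library's implementation: `set_loop_stop_signals` and `raise_system_exit` are hypothetical counterparts of the private methods documented here, built on the standard `loop.add_signal_handler` call, which is available on Unix event loops only.

```python
import asyncio
import signal
from typing import Iterable, Optional

# The default signals named in the text above.
DEFAULT_STOP_SIGNALS = (signal.SIGINT, signal.SIGTERM, signal.SIGABRT)


def raise_system_exit() -> None:
    """Raise SystemExit so that it propagates out of the running loop."""
    raise SystemExit("stop signal received")


def set_loop_stop_signals(
    loop: asyncio.AbstractEventLoop,
    stop_signals: Optional[Iterable[signal.Signals]] = None,
) -> None:
    """Hypothetical helper: register one handler per stop signal."""
    signals = DEFAULT_STOP_SIGNALS if stop_signals is None else tuple(stop_signals)
    for sig in signals:
        # Unix only, and must be called from the main thread.
        loop.add_signal_handler(sig, raise_system_exit)
```

The sketch only defines the helpers; nothing is registered until `set_loop_stop_signals` is called with a loop.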
- async _stop()[source]
Stop all agents and the environment gracefully. The method will return when all agents and the environment have been stopped.
- add_agents(*agents)[source]
Add agents to the coordinator.
- Parameters:
agents (Agent) – Agents that will be run by the coordinator
- Raises:
ValueError – Agents must be an Agent or an Iterable of Agent
- async_run(initialise_info=None, timeout=-1, loop=None, stop_signals=None, close_loop=True)[source]
Start running the event loop. All agents will begin running their components asynchronously. The frequency at which each Component is executed is determined by their AsyncLoopLock. All agents that have not yet been initialised will be initialised before running, as long as the initialise_info contains the initialisation information for the agent, namely a tuple with the awareness vector and the knowledge database.
Example
The following example shows how to create an environment, agents and a coordinator.
>>> from symaware.base import (
...     AgentCoordinator,
...     Agent,
...     Environment,
...     TimeIntervalAsyncLoopLock,
...     Entity,
...     DynamicalModel,
... )
>>>
>>> class MyEnvironment(Environment):
...     ...
>>> class MyEntity(Entity):
...     ...
>>> class MyModel(DynamicalModel):
...     ...
>>>
>>> # Create the environment
>>> env = MyEnvironment(async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>>
>>> # Create the agents
>>> agents = [Agent(i, MyEntity(i, MyModel(i))) for i in range(1, 5)]
>>>
>>> # Set the components for each agent
>>> for agent in agents:
...     agent.set_components(...)
>>>
>>> # Create the coordinator
>>> coordinator = AgentCoordinator(env, agents)
>>>
>>> # Run the coordinator
>>> coordinator.async_run()
- Parameters:
initialise_info (default: None) – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector, the knowledge database and the time
timeout (default: -1) – Time in seconds after which the event loop will stop. If the timeout is negative, the event loop will run indefinitely
loop (default: None) – event loop. If none is provided, the default one will be used
stop_signals (default: None) – Signals that will cause the event loop to stop gracefully
close_loop (default: True) – Whether to close the event loop upon termination
- Raises:
Exception – An unforeseen exception was raised during execution
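The timeout convention (a negative value means no limit) maps naturally onto asyncio, where "no limit" is expressed as None. The following sketch shows one possible mapping; `effective_timeout` and `run_with_timeout` are hypothetical helpers and say nothing about how the coordinator enforces its timeout internally.

```python
import asyncio
from typing import Any, Coroutine, Optional


def effective_timeout(timeout: float) -> Optional[float]:
    """Map the documented convention (negative = no limit) onto None."""
    return None if timeout < 0 else timeout


async def run_with_timeout(coro: Coroutine[Any, Any, Any], timeout: float = -1) -> bool:
    """Return True if the coroutine finished, False if the timeout stopped it."""
    try:
        await asyncio.wait_for(coro, effective_timeout(timeout))
        return True
    except asyncio.TimeoutError:
        return False


finished = asyncio.run(run_with_timeout(asyncio.sleep(0), timeout=-1))
stopped = asyncio.run(run_with_timeout(asyncio.sleep(5), timeout=0.01))
```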
- property env: Environment
Environment the agents will interact with and the coordinator will run
- run(time_step, timeout=-1, steps_limit=-1, initialise_info=None, pre_agent_iteration_callback=None)[source]
Start running all the agents and the environment. All agents will begin running their components synchronously. The frequency at which each Component is executed is determined by the time_step. All agents that have not yet been initialised will be initialised before running, as long as the initialise_info contains the initialisation information for the agent, namely a tuple with the awareness vector and the knowledge database.
Example
The following example shows how to create an environment, agents and a coordinator.
>>> from symaware.base import (
...     AgentCoordinator,
...     Agent,
...     Environment,
...     TimeIntervalAsyncLoopLock,
...     Entity,
...     DynamicalModel,
... )
>>>
>>> class MyEnvironment(Environment):
...     ...
>>> class MyEntity(Entity):
...     ...
>>> class MyModel(DynamicalModel):
...     ...
>>>
>>> # Create the environment
>>> env = MyEnvironment(async_loop_lock=TimeIntervalAsyncLoopLock(0.1))
>>>
>>> # Create the agents
>>> agents = [Agent(i, MyEntity(i, MyModel(i))) for i in range(1, 5)]
>>>
>>> # Set the components for each agent
>>> for agent in agents:
...     agent.set_components(...)
>>>
>>> # Create the coordinator
>>> coordinator = AgentCoordinator(env, agents)
>>>
>>> # Run the coordinator with a time step of 1 second
>>> coordinator.run(1)
- Parameters:
time_step – Time step representing the interval at which the agents and the environment will be updated
timeout (default: -1) – Time in seconds after which the event loop will stop. If the timeout is negative, the event loop will run indefinitely
steps_limit (default: -1) – Maximum number of steps the event loop will run before stopping. If the limit is negative, the agents will run indefinitely
initialise_info (default: None) – Dictionary containing the initialisation information for each agent. The agent id is the key and the value is a tuple containing the awareness vector, the knowledge database and the time
pre_agent_iteration_callback (default: None) – Callback that will be executed before each agent iteration. The callback will receive the agent, the iteration count and the elapsed time as arguments
- Raises:
Exception – An unforeseen exception was raised during execution
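How timeout, steps_limit and pre_agent_iteration_callback might interact in a synchronous loop is sketched below. The function `run_sync` is hypothetical, agents are represented by plain strings, and measuring elapsed time with a wall clock and sleeping for `time_step` between steps are assumptions, not documented behaviour.

```python
import time
from typing import Callable, List, Optional


def run_sync(
    agents: List[str],
    time_step: float,
    timeout: float = -1,
    steps_limit: int = -1,
    pre_agent_iteration_callback: Optional[Callable[[str, int, float], None]] = None,
) -> int:
    """Hypothetical loop; returns the number of completed steps."""
    start = time.monotonic()
    steps = 0
    # A negative steps_limit means there is no limit on the number of steps.
    while steps_limit < 0 or steps < steps_limit:
        elapsed = time.monotonic() - start
        # A negative timeout means there is no time limit.
        if 0 <= timeout <= elapsed:
            break
        for agent in agents:
            if pre_agent_iteration_callback is not None:
                pre_agent_iteration_callback(agent, steps, elapsed)
            # A real loop would run the agent's step() here.
        steps += 1
        time.sleep(time_step)
    return steps


calls = []
completed = run_sync(
    ["a", "b"],
    time_step=0.0,
    steps_limit=3,
    pre_agent_iteration_callback=lambda agent, i, t: calls.append((agent, i)),
)
```

With both limits left negative, this sketch would loop forever, which mirrors the "run indefinitely" wording above.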
Module contents
Base package for SymAware. It contains the basic components and classes to create an agent and its environment. Most of the classes are abstract and need to be implemented by the user, although some default implementations are provided.
To run a simulation using the package’s own main function, the user needs to create a configuration python file that contains a function called configure_symaware(). The function must return a SymawareConfig object.
Example
Given a configuration file called my_config.py which contains a function like the following:
>>> # Import all the classes that will be used in the configuration
>>> from symaware.base import SymawareConfig # ... other imports
>>> def configure_symaware(num_agents: int, time_step: float, disable_async: bool) -> SymawareConfig:
...
...     def time_loop_lock() -> "TimeIntervalAsyncLoopLock | None":
...         return TimeIntervalAsyncLoopLock(time_step) if not disable_async else None
...
...     #############################################################
...     # 1. Create the environment and add any number of obstacles #
...     #############################################################
...     env = PyBulletEnvironment(real_time_interval=time_step if disable_async else 0, async_loop_lock=time_loop_lock())
...     env.add_entities(PybulletBox(position=np.array([2, 2, 2])), PybulletSphere(position=np.array([-4, -4, 2])))
...
...     #############################################################
...     # 2. Create the agents and add them to the environment      #
...     #############################################################
...     entities = [PyBulletRacecar(i, model=PybulletRacecarModel(i)) for i in range(num_agents)]
...     for i, entity in enumerate(entities):
...         entity.position = np.array([i, i, 0.1])
...     agents = [Agent[MyKnowledgeDatabase](i, entities[i]) for i in range(num_agents)]
...     env.add_agents(agents)
...
...     #############################################################
...     # 3. Create the knowledge database and the awareness vector #
...     #############################################################
...     knowledge_databases = [
...         {
...             i: MyKnowledgeDatabase(
...                 goal_reached=False,
...                 goal_pos=np.array([2, 2]),
...                 tolerance=0.1,
...                 speed=20 + 10 * i,
...                 steering_tolerance=0.01,
...             )
...         }
...         for i in range(num_agents)
...     ]
...     awareness_vectors = [AwarenessVector(i, np.zeros(7)) for i in range(num_agents)]
...
...     #############################################################
...     # 4. Return the configuration                               #
...     #############################################################
...     return SymawareConfig(
...         agent=agents,
...         controller=[MyController(i, time_loop_lock()) for i in range(num_agents)],
...         knowledge_database=knowledge_databases,
...         awareness_vector=awareness_vectors,
...         risk_estimator=[DefaultRiskEstimator(i, time_loop_lock()) for i in range(num_agents)],
...         uncertainty_estimator=[DefaultUncertaintyEstimator(i, time_loop_lock()) for i in range(num_agents)],
...         communication_system=[
...             DefaultCommunicationSystem(i, time_loop_lock(), time_loop_lock()) for i in range(num_agents)
...         ],
...         info_updater=[DefaultInfoUpdater(i, time_loop_lock()) for i in range(num_agents)],
...         perception_system=[DefaultPerceptionSystem(i, env, time_loop_lock()) for i in range(num_agents)],
...         environment=env,
...     )
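For readers curious how a file such as my_config.py could be picked up by a main function, the sketch below loads a Python file by path with the standard importlib machinery and checks that it defines configure_symaware(). The helper `load_config_module` is hypothetical and is not taken from the package; the package's own loader may work differently.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def load_config_module(path: str) -> ModuleType:
    """Hypothetical loader: import a configuration file from its path."""
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    # Execute the file so that its top-level definitions become attributes.
    spec.loader.exec_module(module)
    if not callable(getattr(module, "configure_symaware", None)):
        raise AttributeError(f"{path} does not define configure_symaware()")
    return module
```

A caller would then invoke `load_config_module("my_config.py").configure_symaware(...)` with the arguments its own function expects.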