symaware.base.components package

Submodules

symaware.base.components.communication_receiver module

class symaware.base.components.communication_receiver.CommunicationReceiver(agent_id, async_loop_lock=None)[source]

Bases: Component[Tasynclooplock, ReceivingCommunicationCallback, ReceivedCommunicationCallback]

Generic communication system of a symaware.base.Agent. It is used to communicate with other agents. The information collected is then used to update the knowledge database of the agent.

The internal communication channel is read and any number of messages collected are returned. The messages are then translated into a dictionary of AwarenessVector and KnowledgeDatabase ready to be used to update the agent’s state.

To implement a new communication system, you need to indicate how the new messages update the agent by implementing the _update() method.
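
The receive, decode and update flow described above can be pictured with a small stand-alone sketch. It does not import symaware: StubMessage, StubAgent and StubReceiver are hypothetical stand-ins, their field names are assumptions, and a plain list plays the role of the communication channel.

```python
from dataclasses import dataclass, field


@dataclass
class StubMessage:
    """Stand-in for an InfoMessage; the field names are assumptions."""
    sender_id: int
    awareness: dict
    knowledge: dict


@dataclass
class StubAgent:
    """Stand-in for an Agent holding the two databases to update."""
    awareness_database: dict = field(default_factory=dict)
    knowledge_database: dict = field(default_factory=dict)


class StubReceiver:
    """Mimics the receive -> decode -> update flow; not the symaware class."""

    def __init__(self, agent, channel):
        self._agent = agent
        self._channel = channel  # plain list used as a fake channel

    def _receive_communication_from_channel(self):
        # Drain the channel and return whatever was collected
        messages = list(self._channel)
        self._channel.clear()
        return messages

    def _decode_message(self, messages):
        # Key the decoded payload by sender id
        return {m.sender_id: (m.awareness, m.knowledge) for m in messages}

    def _compute(self):
        return self._decode_message(self._receive_communication_from_channel())

    def _update(self, decoded):
        # Store what each sender shared in the agent's databases
        for sender_id, (awareness, knowledge) in decoded.items():
            self._agent.awareness_database[sender_id] = awareness
            self._agent.knowledge_database[sender_id] = knowledge


channel = [StubMessage(sender_id=2, awareness={"risk": 0.1}, knowledge={"goal": (1, 1)})]
agent = StubAgent()
receiver = StubReceiver(agent, channel)
receiver._update(receiver._compute())
```

After the call, the stub agent holds one entry per sender and the fake channel is empty.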

Example

Create a new communication system by subclassing the CommunicationReceiver and implementing the _update() method.

>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _update(self, *args, **kwargs):
...         # Your implementation here
...         # Example:
...         # Update the awareness database with the messages received
...         for sender_id, (awareness_vector, knowledge_database) in kwargs.items():
...             self._agent.awareness_database[sender_id] = awareness_vector
...             self._agent.knowledge_database[sender_id] = knowledge_database
Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock – Async loop lock to use for the communication system

async _async_compute()[source]

Upon receiving a message, it is decoded and translated into useful information asynchronously.

Note

Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Returns:

dict[int, tuple[dict[int, AwarenessVector], dict[int, TypeVar(T, bound= KnowledgeDatabase)]]] – Messages received from other agents as a dictionary of Awareness vector and Knowledge database

async _async_receive_communication_from_channel()[source]

The internal communication channel is read and any number of messages collected are returned asynchronously.

Note

Check _receive_communication_from_channel() for more information about the method and AsyncLoopLockable for more information about the async loop.

Returns:

Iterable[Message] – Messages received from another agent

_compute()[source]

Upon receiving a message, it is decoded and translated into useful information. The simplest one would be returning directly what you receive.

Returns:

dict[int, tuple[dict[int, AwarenessVector], dict[int, TypeVar(T, bound= KnowledgeDatabase)]]] – Messages received from other agents as a dictionary of Awareness vector and Knowledge database

abstract _decode_message(messages)[source]

The messages received from the communication channel are decoded to produce the output of the _compute() method. The default implementation assumes that the messages are of type InfoMessage, and produces a dictionary of Awareness vector and Knowledge database.

Although the method must be implemented, you may choose to use the super implementation if the messages are of type InfoMessage.

Example

To support a new message type, you need to implement the _decode_message() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message, InfoMessage, Identifier
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _decode_message(self, messages: Iterable[Message]) -> dict[Identifier, tuple]:
...         # Your implementation here
...         # Example:
...         # Decode a message of type InfoMessage
...         res = {}
...         for message in messages:
...             if isinstance(message, InfoMessage):
...                 res[message.sender_id] = (message.awareness_database, message.knowledge_database)
...             else:
...                 raise TypeError(f"Unknown message type {type(message)}")
...         return res
Parameters:

messages (Iterable[Message]) – Iterable of messages to parse

Returns:

dict[int, tuple[dict[int, AwarenessVector], dict[int, TypeVar(T, bound= KnowledgeDatabase)]]] – Dictionary of Awareness vector and Knowledge database. The key is the sender id and the value is a tuple of Awareness vector and Knowledge database

abstract _receive_communication_from_channel()[source]

The internal communication channel is read and any number of messages collected are returned.

Example

Create a new communication system by subclassing the CommunicationReceiver and implementing the _receive_communication_from_channel() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _receive_communication_from_channel(self) -> Iterable[Message]:
...         # Your implementation here
...         # Example:
...         # Return an empty list. No messages are ever received
...         return []
Returns:

Iterable[Message] – Messages received from another agent

class symaware.base.components.communication_receiver.DefaultCommunicationReceiver(agent_id, async_loop_lock=None)[source]

Bases: CommunicationReceiver[Tasynclooplock]

Default implementation of the info updater. It does not send or receive any message.

Parameters:

agent_id (int) – Identifier of the agent this component belongs to

_decode_message(messages)[source]

The messages received from the communication channel are decoded to produce the output of the _compute() method. The default implementation assumes that the messages are of type InfoMessage, and produces a dictionary of Awareness vector and Knowledge database.

Although the method must be implemented, you may choose to use the super implementation if the messages are of type InfoMessage.

Example

To support a new message type, you need to implement the _decode_message() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message, InfoMessage, Identifier
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _decode_message(self, messages: Iterable[Message]) -> dict[Identifier, tuple]:
...         # Your implementation here
...         # Example:
...         # Decode a message of type InfoMessage
...         res = {}
...         for message in messages:
...             if isinstance(message, InfoMessage):
...                 res[message.sender_id] = (message.awareness_database, message.knowledge_database)
...             else:
...                 raise TypeError(f"Unknown message type {type(message)}")
...         return res
Parameters:

messages (Iterable[Message]) – Iterable of messages to parse

Returns:

dict – Dictionary of Awareness vector and Knowledge database. The key is the sender id and the value is a tuple of Awareness vector and Knowledge database

_receive_communication_from_channel()[source]

The internal communication channel is read and any number of messages collected are returned.

Example

Create a new communication system by subclassing the CommunicationReceiver and implementing the _receive_communication_from_channel() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _receive_communication_from_channel(self) -> Iterable[Message]:
...         # Your implementation here
...         # Example:
...         # Return an empty list. No messages are ever received
...         return []
Returns:

Iterable[Message] – Messages received from another agent

_update()[source]

This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.

This method must be implemented in any custom component.

Example

The update() method can be implemented to perform some custom updates.

>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

class symaware.base.components.communication_receiver.NullCommunicationReceiver[source]

Bases: CommunicationReceiver[Tasynclooplock], NullObject

Default communication system used as a placeholder. It is used when no communication system is set for an agent. An exception is raised if this object is used in any way.
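
A placeholder that raises whenever it is used can be sketched as below. This is only an illustration of the idea, not the NullObject implementation of the library: StubNullObject and StubNullReceiver are hypothetical names, and the choice of NotImplementedError as the exception type is an assumption.

```python
class StubNullObject:
    """Hypothetical placeholder base: any unknown attribute access raises."""

    def __getattr__(self, name: str):
        # Only reached when normal attribute lookup fails
        raise NotImplementedError(
            f"{type(self).__name__} is a placeholder: '{name}' cannot be used"
        )


class StubNullReceiver(StubNullObject):
    """Placeholder installed when no real receiver has been configured."""


placeholder = StubNullReceiver()
try:
    placeholder.compute()
except NotImplementedError as error:
    message = str(error)
```

Failing loudly on first use makes a missing component easier to spot than a silent no-op would.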

_decode_message(messages)[source]

The messages received from the communication channel are decoded to produce the output of the _compute() method. The default implementation assumes that the messages are of type InfoMessage, and produces a dictionary of Awareness vector and Knowledge database.

Although the method must be implemented, you may choose to use the super implementation if the messages are of type InfoMessage.

Example

To support a new message type, you need to implement the _decode_message() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message, InfoMessage, Identifier
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _decode_message(self, messages: Iterable[Message]) -> dict[Identifier, tuple]:
...         # Your implementation here
...         # Example:
...         # Decode a message of type InfoMessage
...         res = {}
...         for message in messages:
...             if isinstance(message, InfoMessage):
...                 res[message.sender_id] = (message.awareness_database, message.knowledge_database)
...             else:
...                 raise TypeError(f"Unknown message type {type(message)}")
...         return res
Parameters:

messages (Iterable[Message]) – Iterable of messages to parse

Returns:

Dictionary of Awareness vector and Knowledge database. The key is the sender id and the value is a tuple of Awareness vector and Knowledge database

_receive_communication_from_channel()[source]

The internal communication channel is read and any number of messages collected are returned.

Example

Create a new communication system by subclassing the CommunicationReceiver and implementing the _receive_communication_from_channel() method.

>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _receive_communication_from_channel(self) -> Iterable[Message]:
...         # Your implementation here
...         # Example:
...         # Return an empty list. No messages are ever received
...         return []
Returns:

Messages received from another agent

_update()[source]

This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.

This method must be implemented in any custom component.

Example

The update() method can be implemented to perform some custom updates.

>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

symaware.base.components.communication_sender module

class symaware.base.components.communication_sender.CommunicationSender(agent_id, async_loop_lock=None, send_to_self=False)[source]

Bases: Component[Tasynclooplock, SendingCommunicationCallback, SentCommunicationCallback]

Generic communication system of a symaware.base.Agent. It is used to communicate with other agents. The information collected is then used to update the knowledge database of the agent.

Before sending a message, it will be enqueued in the message queue. On the next iteration, the messages will be sent to the receiver agent(s) through the communication channel.
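
The enqueue-then-send behaviour can be sketched with a stand-alone stub. Nothing here is the symaware implementation: StubSender is hypothetical, messages are modelled as (receiver_id, payload) tuples, and dropping self-addressed messages when send_to_self is False is an assumption about what that flag does.

```python
from collections import deque


class StubSender:
    """Stand-in sender with a message queue; not the symaware class."""

    def __init__(self, agent_id, send_to_self=False):
        self.agent_id = agent_id
        self._send_to_self = send_to_self
        self._queue = deque()
        self.sent_log = []  # records what went through the fake channel

    def enqueue_messages(self, *messages):
        self._queue.extend(messages)

    def dequeue_message(self, message):
        self._queue.remove(message)

    def _send_communication_through_channel(self, message):
        self.sent_log.append(message)

    def _compute(self):
        # Flush the queue, one message at a time, in arrival order
        sent = []
        while self._queue:
            message = self._queue.popleft()
            receiver_id = message[0]
            if receiver_id == self.agent_id and not self._send_to_self:
                continue  # assumption: self-addressed messages are dropped
            self._send_communication_through_channel(message)
            sent.append(message)
        return sent


sender = StubSender(agent_id=1)
sender.enqueue_messages((2, "hello"), (1, "note to self"), (3, "bye"))
sender.dequeue_message((3, "bye"))  # changed our mind before the next iteration
sent = sender._compute()
```

Only the message addressed to agent 2 reaches the fake channel: the one for agent 3 was dequeued and the self-addressed one is skipped.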

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the communication sender

  • send_to_self (bool, default: False) – If True, the agent is able to send messages to itself

async _async_compute()[source]

All the messages in the message queue are sent to the receiver agent(s) through the communication channel asynchronously.

Note

Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Returns:

Iterable[Message] – Message(s) sent to the receiver agent(s)

async _async_send_communication_through_channel(message)[source]

The information the agent wants to share has been encoded into a message, which is sent to another agent using a communication channel asynchronously.

Note

Check _send_communication_through_channel() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:

message (Message) – Message to send to the receiver agent through the communication channel

_compute()[source]

All the messages in the message queue are sent to the receiver agent(s) through the communication channel. Some implementations may put a limit on the number of messages sent per iteration, to avoid flooding the channel.

Returns:

Iterable[Message] – Message(s) sent to the receiver agent(s)
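
One way an implementation could cap the number of messages sent per iteration is sketched below. The helper send_batch and its max_per_iteration parameter are hypothetical and are not part of the documented API.

```python
from collections import deque


def send_batch(queue, send, max_per_iteration):
    """Send at most `max_per_iteration` queued messages; return those sent."""
    sent = []
    for _ in range(min(max_per_iteration, len(queue))):
        message = queue.popleft()
        send(message)
        sent.append(message)
    return sent


queue = deque(["m1", "m2", "m3", "m4", "m5"])
delivered = []  # stands in for the communication channel
first = send_batch(queue, delivered.append, max_per_iteration=2)
second = send_batch(queue, delivered.append, max_per_iteration=2)
```

Messages that exceed the cap stay in the queue and are picked up on a later iteration.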

abstract _send_communication_through_channel(message)[source]

The information the agent wants to share has been encoded into a message, which is sent to another agent using a communication channel.

Example

Create a new communication system by subclassing the CommunicationSender and implementing the _send_communication_through_channel() method.

>>> from symaware.base import CommunicationSender, Message
>>> class MyCommunicationSender(CommunicationSender):
...     def _send_communication_through_channel(self, message: Message):
...         # Your implementation here
...         # Example:
...         # Print the message to the console
...         print(f"Sending message to {message.receiver_id}: {message}")
Parameters:

message (Message) – Message to send to the receiver agent through the communication channel

_update(messages_sent)[source]

This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.

This method must be implemented in any custom component.

Example

The update() method can be implemented to perform some custom updates.

>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

dequeue_message(message)[source]

Dequeue a message from the message queue.

Parameters:

message (Message) – Message to remove from the message queue

enqueue_messages(*messages)[source]

Enqueue one or more messages to be sent to other agents.

Parameters:

messages – Messages to send to the receiver agent(s)

class symaware.base.components.communication_sender.DefaultCommunicationSender(agent_id, async_loop_lock=None, send_to_self=False)[source]

Bases: CommunicationSender[Tasynclooplock]

Default implementation of the info updater. It does not send or receive any message.

Parameters:

agent_id (int) – Identifier of the agent this component belongs to

_send_communication_through_channel(message)[source]

The information the agent wants to share has been encoded into a message, which is sent to another agent using a communication channel.

Example

Create a new communication system by subclassing the CommunicationSender and implementing the _send_communication_through_channel() method.

>>> from symaware.base import CommunicationSender, Message
>>> class MyCommunicationSender(CommunicationSender):
...     def _send_communication_through_channel(self, message: Message):
...         # Your implementation here
...         # Example:
...         # Print the message to the console
...         print(f"Sending message to {message.receiver_id}: {message}")
Parameters:

message (Message) – Message to send to the receiver agent through the communication channel

class symaware.base.components.communication_sender.NullCommunicationSender[source]

Bases: CommunicationSender, NullObject

Default communication system used as a placeholder. It is used when no communication system is set for an agent. An exception is raised if this object is used in any way.

_send_communication_through_channel(message)[source]

The information the agent wants to share has been encoded into a message, which is sent to another agent using a communication channel.

Example

Create a new communication system by subclassing the CommunicationSender and implementing the _send_communication_through_channel() method.

>>> from symaware.base import CommunicationSender, Message
>>> class MyCommunicationSender(CommunicationSender):
...     def _send_communication_through_channel(self, message: Message):
...         # Your implementation here
...         # Example:
...         # Print the message to the console
...         print(f"Sending message to {message.receiver_id}: {message}")
Parameters:

message (Message) – Message to send to the receiver agent through the communication channel

symaware.base.components.component module

class symaware.base.components.component.Component(agent_id, async_loop_lock=None)[source]

Bases: Publisher, AsyncLoopLockable[Tasynclooplock], ABC, Generic[Tasynclooplock, ComputingCallback, ComputedCallback]

Generic component of an Agent. All specialized components should inherit from this class.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use to wait for each loop if the component is used asynchronously

async _async_compute(*args, **kwargs)[source]

Given the state of the agent, compute a value or a set of values asynchronously.

Note

Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Any – Computed value or set of values
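
As a rough illustration of an asynchronous compute that waits for the loop before doing any work, the sketch below uses asyncio.Event in place of an async loop lock. The real AsyncLoopLock interface is not shown in this section, so the stand-in, the StubAsyncComponent class and the delegation to the synchronous _compute() are all assumptions.

```python
import asyncio


class StubAsyncComponent:
    """Stand-in component; not the symaware Component class."""

    def __init__(self, loop_lock: asyncio.Event):
        self._loop_lock = loop_lock

    def _compute(self, x: int) -> int:
        return x + 1

    async def _async_compute(self, x: int) -> int:
        # Assumption: the async variant simply delegates to the sync one
        return self._compute(x)

    async def async_compute(self, x: int) -> int:
        # Wait until the (hypothetical) lock releases the next iteration
        await self._loop_lock.wait()
        return await self._async_compute(x)


async def main() -> tuple[bool, int]:
    tick = asyncio.Event()
    component = StubAsyncComponent(tick)
    task = asyncio.create_task(component.async_compute(1))
    await asyncio.sleep(0)  # give the task a chance to start and block
    was_waiting = not task.done()
    tick.set()  # release one iteration
    return was_waiting, await task


result = asyncio.run(main())
```

The task stays suspended until the event is set, which is the behaviour a loop lock is meant to provide between iterations.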

async _async_update(*args, **kwargs)[source]

Update the agent with some new values, usually computed by the compute() method, asynchronously.

Note

Check _update() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

abstract _compute(*args, **kwargs)[source]

This very generic method is to be implemented by the specialised components. Given the state of the agent, compute a value or a set of values. Those values can be used to update the agent’s state or to compute the control input of the agent.

This method must be implemented in any custom component.

Example

The compute() method can be implemented to perform some custom computation.

>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def _compute(self):
...         state = self._agent.self_state
...         risk = self._agent.self_awareness.risk.get(0, np.zeros(1))
...         return state * np.maximum(0, risk)
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Any – Computed value or set of values

abstract _update(*args, **kwargs)[source]

This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.

This method must be implemented in any custom component.

Example

The update() method can be implemented to perform some custom updates.

>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

add_on_computed(callback)[source]

Add a callback to the event computed

Parameters:

callback (TypeVar(ComputedCallback, bound= Callable[..., Any])) – Callback to add

add_on_computing(callback)[source]

Add a callback to the event computing

Parameters:

callback (TypeVar(ComputingCallback, bound= Callable[..., Any])) – Callback to add

add_on_initialised(callback)[source]

Add a callback to the event initialised

Parameters:

callback (Callable[[Agent, dict[int, AwarenessVector], dict[int, TypeVar(T, bound= KnowledgeDatabase)]], Any]) – Callback to add

add_on_iterated(callback)[source]

Add a callback to the event iterated

Parameters:

callback (Callable[[Agent, int, Any], Any]) – Callback to add

add_on_iterating(callback)[source]

Add a callback to the event iterating

Parameters:

callback (Callable[[Agent, int], Any]) – Callback to add

add_on_updated(callback)[source]

Add a callback to the event updated

Parameters:

callback (Callable[[Agent], Any]) – Callback to add

add_on_updating(callback)[source]

Add a callback to the event updating

Parameters:

callback (TypeVar(ComputedCallback, bound= Callable[..., Any])) – Callback to add

property agent_id: int

Agent identifier

async async_compute(*args, **kwargs)[source]

Given the state of the agent, compute a value or a set of values.

Note

Check compute() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Any – Computed value or set of values

Raises:

RuntimeError – If the component has not been initialised yet

async async_compute_and_update(*args, **kwargs)[source]

Compute and update the component in one go asynchronously. See compute() and update() for more information. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.

Note

Check compute_and_update() for more information about the method and AsyncLoopLockable for more information about the async loop.

Returns:

Any – Computed value or set of values

async async_initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic asynchronously.

Note

Check initialise_component() for more information about the method and AsyncLoopLockable for more information about the async loop.

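Parameters:
  • agent – Agent this component belongs to

  • initial_awareness_database – Initial awareness database of the agent

  • initial_knowledge_database – Initial knowledge database of the agent
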
async async_update(*args, **kwargs)[source]

Given some new values, update the state of the agent asynchronously.

Note

Check update() for more information about the method and AsyncLoopLockable for more information about the async loop.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Raises:

RuntimeError – If the component has not been initialised yet

compute(*args, **kwargs)[source]

Given the state of the agent, compute a value or a set of values. Those values can be used to update the agent’s state or to compute the control input of the agent. Internally, this method calls the _compute() method. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.

Note

Invoking this method will notify the subscribers of the events computing and computed added with add_on_computing() and add_on_computed() respectively.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Returns:

Any – Computed value or set of values

Raises:

RuntimeError – If the component has not been initialised yet
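
The notification of the computing and computed events around _compute() can be pictured with the stand-alone sketch below. StubComponent is a hypothetical stand-in, and the arguments passed to each callback are assumptions made for illustration.

```python
class StubComponent:
    """Stand-in showing callbacks fired around _compute(); not the symaware class."""

    def __init__(self):
        self._computing_callbacks = []
        self._computed_callbacks = []

    def add_on_computing(self, callback):
        self._computing_callbacks.append(callback)

    def add_on_computed(self, callback):
        self._computed_callbacks.append(callback)

    def _compute(self, x):
        return x * 2

    def compute(self, *args, **kwargs):
        # Notify before the computation starts
        for callback in self._computing_callbacks:
            callback(self)
        result = self._compute(*args, **kwargs)
        # Notify once the result is available
        for callback in self._computed_callbacks:
            callback(self, result)
        return result


events = []
component = StubComponent()
component.add_on_computing(lambda c: events.append("computing"))
component.add_on_computed(lambda c, result: events.append(("computed", result)))
value = component.compute(21)
```

Because the public method wraps the private one, subclasses that only implement _compute() keep the notifications without extra code.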

compute_and_update(*args, **kwargs)[source]

Compute and update the component in one go. See compute() and update() for more information. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.

Note

Invoking this method will notify the subscribers of the events iterating and iterated added with add_on_iterating() and add_on_iterated() respectively.

Returns:

Any – Computed value or set of values
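
Overriding the method with a narrower signature while still delegating to the super implementation might look like the sketch below. BaseComponent and TypedComponent are hypothetical stand-ins, and feeding the result of compute() straight into update() is an assumption about how the two steps are chained.

```python
class BaseComponent:
    """Stand-in base class; not the symaware Component class."""

    def __init__(self):
        self.value = 0.0
        self.log = []

    def compute(self, gain):
        return self.value + gain

    def update(self, new_value):
        self.value = new_value

    def compute_and_update(self, *args, **kwargs):
        self.log.append("iterating")
        result = self.compute(*args, **kwargs)
        self.update(result)  # assumption: the computed value is what gets stored
        self.log.append("iterated")
        return result


class TypedComponent(BaseComponent):
    def compute_and_update(self, gain: float) -> float:
        # Narrow the signature, then delegate so the events still fire
        return super().compute_and_update(gain)


component = TypedComponent()
first = component.compute_and_update(1.5)
second = component.compute_and_update(0.5)
```

The override only adds type information; all behaviour, including the notifications, still lives in the base method.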

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overridden to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         # Call the super method last so the component is flagged as initialised
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
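Parameters:
  • agent – Agent this component belongs to

  • initial_awareness_database – Initial awareness database of the agent

  • initial_knowledge_database – Initial knowledge database of the agent
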
property is_initialised: bool

Flag indicating whether the component has been initialised

remove_on_computed(callback)[source]

Remove a callback from the event computed

Parameters:

callback (TypeVar(ComputedCallback, bound= Callable[..., Any])) – Callback to remove

remove_on_computing(callback)[source]

Remove a callback from the event computing

Parameters:

callback (TypeVar(ComputingCallback, bound= Callable[..., Any])) – Callback to remove

remove_on_initialised(callback)[source]

Remove a callback from the event initialised

Parameters:

callback (Callable[[Agent, dict[int, AwarenessVector], dict[int, TypeVar(T, bound= KnowledgeDatabase)]], Any]) – Callback to remove

remove_on_iterated(callback)[source]

Remove a callback from the event iterated

Parameters:

callback (Callable[[Agent, int, Any], Any]) – Callback to remove

remove_on_iterating(callback)[source]

Remove a callback from the event iterating

Parameters:

callback (Callable[[Agent, int], Any]) – Callback to remove

remove_on_updated(callback)[source]

Remove a callback from the event updated

Parameters:

callback (Callable[[Agent], Any]) – Callback to remove

remove_on_updating(callback)[source]

Remove a callback from the event updating

Parameters:

callback (TypeVar(ComputedCallback, bound= Callable[..., Any])) – Callback to remove

update(*args, **kwargs)[source]

Given some new values, update the state of the agent. Internally, this method calls the _update() method. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.

Note

Invoking this method will notify the subscribers of the events updating and updated added with add_on_updating() and add_on_updated() respectively.

Parameters:
  • args – Additional arguments

  • kwargs – Additional keyword arguments

Raises:

RuntimeError – If the component has not been initialised yet

symaware.base.components.controller module

class symaware.base.components.controller.Controller(agent_id, async_loop_lock=None)[source]

Bases: Component[Tasynclooplock, ComputingControlInputCallback, ComputedControlInputCallback]

Abstract class for the controller. Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to compute().

Parameters:
  • agent_id – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the controller

abstract _compute()[source]

Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> tuple[np.ndarray, TimeSeries]:
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: tuple[np.ndarray, TimeSeries]):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

tuple[ndarray, TimeSeries] – Tuple composed of two elements:

  • New state of the agent the controller wants to reach

  • Time series of intents of the controller; can be empty

_update(control_input_and_intent)[source]

Update the agent’s model with the computed control input and store the intent in the agent’s awareness vector.

Example

A new controller could decide to override the default _update() method.

>>> import numpy as np
>>> from symaware.base import Controller, TimeSeries
>>> class MyController(Controller):
...     def _update(self, control_input_and_intent: tuple[np.ndarray, TimeSeries]):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Parameters:

control_input_and_intent (tuple[ndarray, TimeSeries]) – Tuple composed of two elements:

  • new control input to apply to the agent’s model

  • new intent to store in the agent’s awareness vector

property dynamical_model: DynamicalModel

Dynamical system model for the agent

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         # Call the super method last so the subscribers are notified
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
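Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent
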
class symaware.base.components.controller.DefaultController(agent_id, async_loop_lock=None)[source]

Bases: Controller

Default implementation of the controller. It returns a zero vector of the right size as control input.

Parameters:
  • agent_id – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the controller

_compute()[source]

Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> tuple[np.ndarray, TimeSeries]:
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: tuple[np.ndarray, TimeSeries]):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller; can be empty

initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]

Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.

Note

Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().

Warning

The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.

Example

The initialise_component() method can be overwritten to provide some custom initialisation logic.

>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         # Call the super method last so the subscribers are notified
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
Parameters:
  • agent – Agent this component has been attached to

  • initial_awareness_database – Awareness database of the agent

  • initial_knowledge_database – Knowledge database of the agent
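
The reason for calling the super method last can be sketched with stand-in classes. The code below does not use symaware: ComponentSketch only mimics the behaviour described in the warning above (set _agent, set _initialised, notify subscribers), and the callback signature is an assumption.

```python
class ComponentSketch:
    """Hypothetical stand-in for the base Component."""

    def __init__(self):
        self._agent = None
        self._initialised = False
        self._on_initialised = []

    def add_on_initialised(self, callback):
        self._on_initialised.append(callback)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        self._agent = agent
        self._initialised = True
        # Subscribers run here, so they see whatever the subclass set up before
        for callback in self._on_initialised:
            callback(agent, initial_awareness_database, initial_knowledge_database)


class MyComponentSketch(ComponentSketch):
    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        self._my_list = []  # custom attribute created first
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)


seen = []
component = MyComponentSketch()
# The subscriber records whether the custom attribute already exists
component.add_on_initialised(lambda agent, awareness, knowledge: seen.append(hasattr(component, "_my_list")))
component.initialise_component("agent-0", {}, {})
```

If the super call came first, the subscriber in this sketch would run before _my_list exists and record False instead.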

class symaware.base.components.controller.NullController[source]

Bases: Controller, NullObject

Default controller used as a placeholder. It is used when no controller is set for an agent. An exception is raised if this object is used in any way.
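
The placeholder behaviour described above resembles a null object that fails fast. A minimal sketch, assuming that any attribute access should raise; the class and exception names are hypothetical, and the exception type actually raised by symaware may differ.

```python
class NullComponentUsedError(RuntimeError):
    """Hypothetical exception type; symaware may raise a different one."""


class NullControllerSketch:
    """Hypothetical placeholder that fails on any attribute access."""

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails, which here means
        # every attribute, since this class defines none of its own.
        raise NullComponentUsedError(f"No controller has been set: cannot access '{name}'")


placeholder = NullControllerSketch()
try:
    placeholder.compute()
    used_without_error = True
except NullComponentUsedError:
    used_without_error = False
```

Failing on first use makes a missing controller visible immediately instead of letting the agent run with a silent no-op.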

_compute()[source]

Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.

This method must be implemented in any custom controller.

Example

Create a new controller by subclassing the Controller and implementing the _compute() method and the _update() method.

>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> tuple[np.ndarray, TimeSeries]:
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: tuple[np.ndarray, TimeSeries]):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
Returns:

  • New state of the agent the controller wants to reach,

  • Time series of intents of the controller; can be empty

symaware.base.components.perception_system module

class symaware.base.components.perception_system.DefaultPerceptionSystem(agent_id, environment, async_loop_lock=None)[source]

Bases: PerceptionSystem[Tasynclooplock]

Default implementation of perception system. It returns the values collected from the environment.

Parameters:

agent_id (int) – Identifier of the agent this component belongs to

_compute()[source]

Compute the information perceived by the agent. It returns the values collected from the environment.

Returns:

dict[int, StateObservation] – Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.base.components.perception_system.NullPerceptionSystem[source]

Bases: PerceptionSystem[Tasynclooplock], NullObject

Default perception system used as a placeholder. It is used when no perception system is set for an agent. An exception is raised if this object is used in any way.

_compute()[source]

Perceive the state of each Agent in the symaware.base.Environment. It is used to update the knowledge database of the agent.

Example

Create a new perception system by subclassing the PerceptionSystem and implementing the _compute() method.

>>> from symaware.base import PerceptionSystem, StateObservation
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get only the agent's own state
...         if self._agent_id not in self._env.agent_states:
...             return {}
...         return {self._agent_id: StateObservation(self._agent_id, self._env.agent_states[self._agent_id])}
Returns:

Information perceived by the agent. Each agent’s state is identified by its id.

class symaware.base.components.perception_system.PerceptionSystem(agent_id, environment, async_loop_lock=None)[source]

Bases: Component[Tasynclooplock, PerceivingInformationCallback, PerceivedInformationCallback]

Generic perception system of a symaware.base.Agent. It is used to perceive the state of the Agent in the symaware.base.Environment. An implementation could extend this to include more complex interactions, such as sensors or omniscient perception. The information collected is then used to update the awareness vector and knowledge database of the agent.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • environment (Environment) – symaware.base.Environment the perception system will observe

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the perception system

abstract _compute()[source]

Perceive the state of each Agent in the symaware.base.Environment. It is used to update the knowledge database of the agent.

Example

Create a new perception system by subclassing the PerceptionSystem and implementing the _compute() method.

>>> from symaware.base import PerceptionSystem, StateObservation
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get only the agent's own state
...         if self._agent_id not in self._env.agent_states:
...             return {}
...         return {self._agent_id: StateObservation(self._agent_id, self._env.agent_states[self._agent_id])}
Returns:

dict[int, StateObservation] – Information perceived by the agent. Each agent’s state is identified by its id.

_update(perceived_information)[source]

Update the agent’s model with the new perceived information.

Example

A new perception system could decide to override the default _update() method.

>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: dict[Identifier, StateObservation]):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
Parameters:

perceived_information (dict[int, StateObservation]) – Information perceived by the agent. Each agent’s state is identified by its id.
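
The mapping from agent id to observation can be applied to an awareness database roughly as follows. This is a sketch with stand-in dataclasses rather than the symaware types, and creating a new awareness vector for a previously unseen agent is an assumption, not documented behaviour of the default _update().

```python
from dataclasses import dataclass, field


@dataclass
class StateObservationSketch:
    """Hypothetical stand-in for StateObservation."""
    observed_object_id: int
    state: list


@dataclass
class AwarenessVectorSketch:
    """Hypothetical stand-in for AwarenessVector; only the state is modelled."""
    state: list = field(default_factory=list)


def update_sketch(awareness_database, perceived_information):
    """Copy each perceived state into the matching awareness vector."""
    for agent_id, observation in perceived_information.items():
        # Assumption: agents seen for the first time get a fresh vector
        vector = awareness_database.setdefault(agent_id, AwarenessVectorSketch())
        vector.state = observation.state


awareness_database = {0: AwarenessVectorSketch(state=[0.0, 0.0])}
perceived_information = {
    0: StateObservationSketch(0, [1.0, 2.0]),
    1: StateObservationSketch(1, [5.0, 5.0]),
}
update_sketch(awareness_database, perceived_information)
```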

property environment: Environment

Environment

perceive_self_state()[source]

Perceive the state of the agent itself

Returns:

ndarray – State of the agent

Raises:

RuntimeError – The perception system has not been initialised yet
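
A guard of the kind described above might look like the sketch below. PerceptionSketch, its constructor arguments and its initialise() method are hypothetical stand-ins; only the RuntimeError on uninitialised use comes from the documentation.

```python
class PerceptionSketch:
    """Hypothetical stand-in for a perception system with an initialisation flag."""

    def __init__(self, agent_id, agent_states):
        self._agent_id = agent_id
        self._agent_states = agent_states  # stand-in for the environment
        self._initialised = False

    def initialise(self):
        self._initialised = True

    def perceive_self_state(self):
        # Refuse to read the environment before initialisation
        if not self._initialised:
            raise RuntimeError("Perception system has not been initialised yet")
        return self._agent_states[self._agent_id]


perception = PerceptionSketch(agent_id=0, agent_states={0: [1.0, 2.0]})
try:
    perception.perceive_self_state()
    raised = False
except RuntimeError:
    raised = True

perception.initialise()
self_state = perception.perceive_self_state()
```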

symaware.base.components.risk_estimator module

class symaware.base.components.risk_estimator.DefaultRiskEstimator(agent_id, async_loop_lock=None)[source]

Bases: RiskEstimator[Tasynclooplock]

Default implementation of risk estimator. It returns the risk stored in the awareness database of the agent.

Parameters:

agent_id (int) – Identifier of the agent this component belongs to

_compute()[source]

Compute the risk for the agent.

This method must be implemented in any custom risk estimator.

Return type:

TimeSeries

Example

Create a new risk estimator by subclassing the RiskEstimator and implementing the _compute() method.

>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].risk)[-1]
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
class symaware.base.components.risk_estimator.NullRiskEstimator[source]

Bases: RiskEstimator[Tasynclooplock], NullObject

Default risk estimator used as a placeholder. It is used when no risk estimator is set for an agent. An exception is raised if this object is used in any way.

_compute()[source]

Compute the risk for the agent.

This method must be implemented in any custom risk estimator.

Example

Create a new risk estimator by subclassing the RiskEstimator and implementing the _compute() method.

>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].risk)[-1]
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
class symaware.base.components.risk_estimator.RiskEstimator(agent_id, async_loop_lock=None)[source]

Bases: Component[Tasynclooplock, ComputingRiskCallback, ComputedRiskCallback]

Generic risk estimator of a symaware.base.Agent. It is used to compute the risk of an agent. The result of the computation is stored in the AwarenessVector of the agent.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the risk estimator

abstract _compute()[source]

Compute the risk for the agent.

This method must be implemented in any custom risk estimator.

Return type:

TimeSeries

Example

Create a new risk estimator by subclassing the RiskEstimator and implementing the _compute() method.

>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].risk)[-1]
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
_update(risk)[source]

Update the agent’s model with the new risk time series.

Example

A new risk estimator could decide to override the default _update() method.

>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _update(self, risk: TimeSeries):
...         # Your implementation here
...         # Example:
...         # Simply override the risk of the agent
...         self._agent.self_awareness.risk = risk
Parameters:

risk (TimeSeries) – New risk estimate to store in the agent’s awareness vector

symaware.base.components.uncertainty_estimator module

class symaware.base.components.uncertainty_estimator.DefaultUncertaintyEstimator(agent_id, async_loop_lock=None)[source]

Bases: UncertaintyEstimator[Tasynclooplock]

Default implementation of uncertainty estimator. It returns the uncertainty stored in the awareness database of the agent.

Parameters:

agent_id (int) – Identifier of the agent this component belongs to

_compute()[source]

Compute the uncertainty for the agent by returning the uncertainty stored in the awareness database of the agent.

Returns:

Uncertainty of the agent

class symaware.base.components.uncertainty_estimator.NullUncertaintyEstimator[source]

Bases: UncertaintyEstimator[Tasynclooplock], NullObject

Default uncertainty estimator used as a placeholder. It is used when no uncertainty estimator is set for an agent. An exception is raised if this object is used in any way.

_compute()[source]

Compute the uncertainty for the agent

Example

Create a new uncertainty estimator by subclassing the UncertaintyEstimator and implementing the _compute() method.

>>> import numpy as np
>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the uncertainty stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].uncertainty) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].uncertainty)
...         last_value = awareness_database[self._agent_id].uncertainty[last_value_idx[-1]]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
Returns:

Uncertainty of the agent

class symaware.base.components.uncertainty_estimator.UncertaintyEstimator(agent_id, async_loop_lock=None)[source]

Bases: Component[Tasynclooplock, ComputingUncertaintyCallback, ComputedUncertaintyCallback]

Generic uncertainty estimator of a symaware.base.Agent. It is used to compute the uncertainty of an agent. The result of the computation is stored in the AwarenessVector of the agent.

Parameters:
  • agent_id (int) – Identifier of the agent this component belongs to

  • async_loop_lock (Optional[TypeVar(Tasynclooplock, bound= AsyncLoopLock)], default: None) – Async loop lock to use for the uncertainty estimator

abstract _compute()[source]

Compute the uncertainty for the agent

Example

Create a new uncertainty estimator by subclassing the UncertaintyEstimator and implementing the _compute() method.

>>> import numpy as np
>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the uncertainty stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].uncertainty) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].uncertainty)
...         last_value = awareness_database[self._agent_id].uncertainty[last_value_idx[-1]]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
Returns:

TimeSeries – Uncertainty of the agent
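
The "take the latest entry and invert it" step from the example above can be tried in isolation. The sketch below assumes a TimeSeries behaves like a mapping from time step to value, which the example's use of sorted() and key indexing suggests but does not confirm; a plain dict of floats is used as a stand-in and invert_latest is a hypothetical helper.

```python
def invert_latest(series: dict[int, float]) -> dict[int, float]:
    """Return a one-entry series holding 1 minus the most recent value."""
    if len(series) == 0:
        # Nothing recorded yet: fall back to zero, as in the example
        return {0: 0.0}
    latest_step = max(series)  # largest time step is the most recent entry
    return {0: 1 - series[latest_step]}


empty_result = invert_latest({})
latest_result = invert_latest({0: 0.25, 3: 0.75, 1: 0.5})
```

Using max() on the keys avoids sorting the whole series when only the most recent step is needed.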

_update(uncertainty)[source]

Update the uncertainty of the agent in the awareness database.

Example

>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _update(self, uncertainty: TimeSeries):
...         # Your implementation here
...         # Example:
...         # Simply override the uncertainty of the agent
...         self._agent.self_awareness.uncertainty = uncertainty
Parameters:

uncertainty (TimeSeries) – Uncertainty to update in the awareness database

Module contents