symaware.base.components package
Submodules
symaware.base.components.communication_receiver module
- class symaware.base.components.communication_receiver.CommunicationReceiver(agent_id, async_loop_lock=None)[source]
Bases: Component[Tasynclooplock, ReceivingCommunicationCallback, ReceivedCommunicationCallback]
Generic communication system of a symaware.base.Agent. It is used to communicate with other agents. The information collected is then used to update the knowledge database of the agent.
The internal communication channel is read and any number of messages collected are returned. The messages are then translated into a dictionary of AwarenessVector and KnowledgeDatabase, ready to be used to update the agent’s state.
To implement a new communication system, you need to indicate how the new messages update the agent by implementing the _update() method.
Example
Create a new communication system by subclassing CommunicationReceiver and implementing the _update() method.
>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _update(self, *args, **kwargs):
...         # Your implementation here
...         # Example:
...         # Update the awareness database with the messages received
...         for sender_id, (awareness_vector, knowledge_database) in kwargs.items():
...             self._agent.awareness_database[sender_id] = awareness_vector
...             self._agent.knowledge_database[sender_id] = knowledge_database
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
async_loop_lock – Async loop lock to use for the communication system
- async _async_compute()[source]
Upon receiving a message, it is decoded and translated into useful information asynchronously.
Note
Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Returns:
dict[Identifier, tuple[MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase]] – Messages received from other agents as a dictionary of Awareness vector and Knowledge database
- async _async_receive_communication_from_channel()[source]
The internal communication channel is read and any number of messages collected are returned asynchronously.
Note
Check _receive_communication_from_channel() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Returns:
Iterable[Message] – Messages received from other agents
- _compute()[source]
Upon receiving a message, it is decoded and translated into useful information. The simplest one would be returning directly what you receive.
- Returns:
dict[Identifier, tuple[MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase]] – Messages received from other agents as a dictionary of Awareness vector and Knowledge database
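The flow described for this class (read the channel, decode the messages, return a dictionary keyed by sender) can be sketched without the library. Everything in the block is a hypothetical stand-in: ToyMessage, ToyReceiver, and the string payloads are not part of symaware.base; only the method names mirror the ones documented here.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class ToyMessage:
    """Hypothetical stand-in for an InfoMessage."""
    sender_id: int
    awareness: str   # placeholder for an awareness vector
    knowledge: str   # placeholder for a knowledge database


class ToyReceiver:
    """Hypothetical receiver; only the method names mirror the documented ones."""

    def __init__(self, inbox: List[ToyMessage]):
        self._inbox = inbox  # assumed in-memory channel

    def _receive_communication_from_channel(self) -> Iterable[ToyMessage]:
        # Return everything currently in the channel, then empty it
        messages = list(self._inbox)
        self._inbox.clear()
        return messages

    def _decode_message(self, *messages: ToyMessage) -> Dict[int, Tuple[str, str]]:
        # Key by sender id; a later message from the same sender overwrites an earlier one
        return {m.sender_id: (m.awareness, m.knowledge) for m in messages}

    def _compute(self) -> Dict[int, Tuple[str, str]]:
        # Chain the two steps: read the channel, then decode what was read
        return self._decode_message(*self._receive_communication_from_channel())
```

In this sketch a second call to _compute() returns an empty dictionary, because the first call drained the channel.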
- abstract _decode_message(*messages)[source]
The messages received from the communication channel are decoded to produce the output of the _compute() method. The default implementation assumes that the messages are of type InfoMessage, and produces a dictionary of Awareness vector and Knowledge database.
Although the method must be implemented, you may choose to use the super implementation if the messages are of type InfoMessage.
Example
To support a new message type, you need to implement the _decode_message() method.
>>> from symaware.base import CommunicationReceiver, Message, InfoMessage, Identifier
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _decode_message(self, *messages: Message) -> "dict[Identifier, tuple]":
...         # Your implementation here
...         # Example:
...         # Decode a message of type InfoMessage
...         res = {}
...         for message in messages:
...             if isinstance(message, InfoMessage):
...                 res[message.sender_id] = (message.awareness_database, message.knowledge_database)
...             else:
...                 raise TypeError(f"Unknown message type {type(message)}")
...         return res
- Parameters:
messages (Message) – Messages to parse
- Returns:
dict[Identifier, tuple[MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase]] – Dictionary of Awareness vector and Knowledge database. The key is the sender id and the value is a tuple of Awareness vector and Knowledge database
- abstract _receive_communication_from_channel()[source]
The internal communication channel is read and any number of messages collected are returned.
Example
Create a new communication system by subclassing CommunicationReceiver and implementing the _receive_communication_from_channel() method.
>>> from typing import Iterable
>>> from symaware.base import CommunicationReceiver, Message
>>> class MyCommunicationReceiver(CommunicationReceiver):
...     def _receive_communication_from_channel(self) -> Iterable[Message]:
...         # Your implementation here
...         # Example:
...         # Return an empty list. No messages are ever received
...         return []
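A slightly less trivial channel than the empty list is an in-process queue that is drained without blocking, which matches the "any number of messages collected" wording. This is a minimal sketch: the drain() helper and the use of queue.Queue as the channel are assumptions for illustration, not something the library prescribes.

```python
import queue
from typing import List


def drain(channel: "queue.Queue[str]") -> List[str]:
    """Collect every message currently queued, without blocking.

    Hypothetical helper: a real receiver would call something like this
    from its own _receive_communication_from_channel() implementation.
    """
    collected: List[str] = []
    while True:
        try:
            collected.append(channel.get_nowait())
        except queue.Empty:
            # Nothing left right now; return what was gathered (possibly nothing)
            return collected
```

Returning an empty list when the queue is empty keeps the caller's loop simple, since zero messages is a valid result.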
- class symaware.base.components.communication_receiver.DefaultCommunicationReceiver(agent_id, async_loop_lock=None)[source]
Bases: CommunicationReceiver[Tasynclooplock]
Default implementation of the communication receiver. It does not send or receive any message.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
- _decode_message(*messages)[source]
Same description and example as CommunicationReceiver._decode_message() above.
- _receive_communication_from_channel()[source]
Same description and example as CommunicationReceiver._receive_communication_from_channel() above.
- _update()[source]
Same description, example, and parameters as Component._update() in the symaware.base.components.component module.
- class symaware.base.components.communication_receiver.NullCommunicationReceiver[source]
Bases: CommunicationReceiver[Tasynclooplock], NullObject
Default communication receiver used as a placeholder. It is used when no communication receiver is set for an agent. An exception is raised if this object is used in any way.
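The "raise on any use" behaviour of a placeholder can be approximated in a few lines. This is only a sketch of the null-object idea: ToyNullObject is hypothetical, and the exception type raised by the library's NullObject is not stated in this section, so NotImplementedError is an assumption.

```python
class ToyNullObject:
    """Hypothetical placeholder: touching any attribute raises."""

    def __getattr__(self, name: str):
        # __getattr__ runs only when normal lookup fails, which here means
        # every attribute that this otherwise empty class does not define
        raise NotImplementedError(
            f"placeholder object used: '{name}' is not available"
        )
```

Failing loudly at the first access makes a missing component visible immediately instead of letting a silent no-op hide the misconfiguration.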
- _decode_message(*messages)[source]
Same description, example, parameters, and return value as CommunicationReceiver._decode_message() above.
- _receive_communication_from_channel()[source]
Same description and example as CommunicationReceiver._receive_communication_from_channel() above.
- Returns:
Messages received by another agent
- _update()[source]
Same description, example, and parameters as Component._update() in the symaware.base.components.component module.
symaware.base.components.communication_sender module
- class symaware.base.components.communication_sender.CommunicationSender(agent_id, async_loop_lock=None, send_to_self=False)[source]
Bases: Component[Tasynclooplock, SendingCommunicationCallback, SentCommunicationCallback]
Generic communication system of a symaware.base.Agent. It is used to communicate with other agents. The information collected is then used to update the knowledge database of the agent.
Before sending a message, it will be enqueued in the message queue. On the next iteration, the messages will be sent to the receiver agent(s) through the communication channel.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the communication sender
send_to_self (bool, default: False) – If True, the agent is able to send messages to itself
- async _async_compute()[source]
All the messages in the message queue are sent to the receiver agent(s) through the communication channel asynchronously.
Note
Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Returns:
Iterable[Message] – Message(s) sent to the receiver agent(s)
- async _async_send_communication_through_channel(message)[source]
The information the agent wants to share has been encoded into a message, which is sent to another agent through a communication channel asynchronously.
Note
Check _send_communication_through_channel() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
message (Message) – Message to send to the receiver agent through the communication channel
- _compute()[source]
All the messages in the message queue are sent to the receiver agent(s) through the communication channel. Some implementations may put a limit on the number of messages sent per iteration, to avoid flooding the channel.
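The enqueue-then-flush behaviour, including an optional cap on messages per iteration, can be sketched as follows. ToySender, enqueue(), and max_per_iteration are hypothetical names chosen for this illustration; the library's own queueing API is not shown in this section.

```python
from collections import deque
from typing import Deque, List, Optional


class ToySender:
    """Hypothetical sender: enqueue now, flush on the next iteration."""

    def __init__(self, max_per_iteration: Optional[int] = None):
        self._queue: Deque[str] = deque()
        self._max = max_per_iteration   # assumed knob, not a library parameter
        self.channel: List[str] = []    # stands in for the real channel

    def enqueue(self, message: str) -> None:
        self._queue.append(message)

    def _send_communication_through_channel(self, message: str) -> None:
        self.channel.append(message)

    def _compute(self) -> List[str]:
        # Send at most `max_per_iteration` messages, oldest first
        pending = len(self._queue)
        budget = pending if self._max is None else min(self._max, pending)
        sent = [self._queue.popleft() for _ in range(budget)]
        for message in sent:
            self._send_communication_through_channel(message)
        return sent
```

Messages over the budget stay in the queue and go out on a later iteration, so the cap delays traffic rather than dropping it.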
- abstract _send_communication_through_channel(message)[source]
The information the agent wants to share has been encoded into a message, which is sent to another agent through a communication channel.
Example
Create a new communication system by subclassing CommunicationSender and implementing the _send_communication_through_channel() method.
>>> from symaware.base import CommunicationSender, Message
>>> class MyCommunicationSender(CommunicationSender):
...     def _send_communication_through_channel(self, message: Message):
...         # Your implementation here
...         # Example:
...         # Print the message to the console
...         print(f"Sending message to {message.receiver_id}: {message}")
- Parameters:
message (Message) – Message to send to the receiver agent through the communication channel
- _update(messages_sent)[source]
Same description, example, and parameters as Component._update() in the symaware.base.components.component module.
- class symaware.base.components.communication_sender.DefaultCommunicationSender(agent_id, async_loop_lock=None, send_to_self=False)[source]
Bases: CommunicationSender[Tasynclooplock]
Default implementation of the communication sender. It does not send or receive any message.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
- _send_communication_through_channel(message)[source]
Same description, example, and parameters as CommunicationSender._send_communication_through_channel() above.
- class symaware.base.components.communication_sender.NullCommunicationSender[source]
Bases: CommunicationSender, NullObject
Default communication sender used as a placeholder. It is used when no communication sender is set for an agent. An exception is raised if this object is used in any way.
- _send_communication_through_channel(message)[source]
Same description, example, and parameters as CommunicationSender._send_communication_through_channel() above.
symaware.base.components.component module
- class symaware.base.components.component.Component(agent_id, async_loop_lock=None)[source]
Bases: Publisher, AsyncLoopLockable[Tasynclooplock], ABC, Generic[Tasynclooplock, ComputingCallback, ComputedCallback]
Generic component of an Agent. All specialized components should inherit from this class.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use to wait for each loop if the component is used asynchronously
- async _async_compute(*args, **kwargs)[source]
Given the state of the agent, compute a value or a set of values asynchronously.
Note
Check _compute() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Returns:
Any – Computed value or set of values
- async _async_update(*args, **kwargs)[source]
Update the agent with some new values, usually computed by the compute() method, asynchronously.
Note
Check _update() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- abstract _compute(*args, **kwargs)[source]
This very generic method is to be implemented by the specialised components. Given the state of the agent, compute a value or a set of values. Those values can be used to update the agent’s state or to compute the control input of the agent.
This method must be implemented in any custom component.
Example
The compute() method can be implemented to perform some custom computation.
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def _compute(self):
...         state = self._agent.self_state
...         risk = self._agent.self_awareness.risk.get(0, np.zeros(1))
...         return state * np.maximum(0, risk)
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Returns:
Any – Computed value or set of values
- abstract _update(*args, **kwargs)[source]
This very generic method is to be implemented by the specialised components. Update the agent with some new values, usually computed by the compute() method.
This method must be implemented in any custom component.
Example
The update() method can be implemented to perform some custom updates.
>>> import numpy as np
>>> from symaware.base import Component, TimeSeries
>>> class MyComponent(Component):
...     def _update(self, new_risk: TimeSeries, new_state: np.ndarray):
...         self._agent.self_awareness.risk = new_risk
...         self._agent.self_state = new_state
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- add_on_initialised(callback)[source]
Add a callback to the event
initialised
- Parameters:
callback (InitialisedCallback) – Callback to add
- async async_compute(*args, **kwargs)[source]
Given the state of the agent, compute a value or a set of values.
Note
Check compute() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Returns:
Any – Computed value or set of values
- Raises:
RuntimeError – If the component has not been initialised yet
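The relationship between the synchronous and asynchronous entry points can be sketched with plain asyncio. This is an illustration under stated assumptions: ToyAsyncComponent is hypothetical, the default delegation from _async_compute() to _compute() is assumed, and asyncio.sleep() is only a placeholder for whatever waiting AsyncLoopLockable performs, which this section does not describe.

```python
import asyncio


class ToyAsyncComponent:
    """Hypothetical component exposing sync and async compute paths."""

    def __init__(self, period: float):
        self._period = period  # assumed: seconds to wait before each loop

    def _compute(self) -> str:
        return "value"

    async def _async_compute(self) -> str:
        # Assumed default: reuse the synchronous implementation
        return self._compute()

    async def async_compute(self) -> str:
        # Placeholder for waiting on the async loop lock
        await asyncio.sleep(self._period)
        return await self._async_compute()


result = asyncio.run(ToyAsyncComponent(period=0.0).async_compute())
```

A subclass that has genuinely asynchronous work to do would override _async_compute() and leave async_compute() untouched.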
- async async_compute_and_update(*args, **kwargs)[source]
Compute and update the component in one go asynchronously. See compute() and update() for more information. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.
Note
Check compute_and_update() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Returns:
Any – Computed value or set of values
- async async_initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]
Initialize the component with some custom logic asynchronously.
Note
Check initialise_component() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
agent (Agent) – Agent this component has been attached to
initial_awareness_database (dict[Identifier, AwarenessVector]) – Awareness database of the agent
initial_knowledge_database (dict[Identifier, T]) – Knowledge database of the agent
- async async_update(*args, **kwargs)[source]
Given some new values, update the state of the agent asynchronously.
Note
Check update() for more information about the method and AsyncLoopLockable for more information about the async loop.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Raises:
RuntimeError – If the component has not been initialised yet
- compute(*args, **kwargs)[source]
Given the state of the agent, compute a value or a set of values. Those values can be used to update the agent’s state or to compute the control input of the agent. Internally, this method calls the _compute() method. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.
Note
Invoking this method will notify the subscribers of the events computing and computed added with add_on_computing() and add_on_computed() respectively.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Returns:
Any – Computed value or set of values
- Raises:
RuntimeError – If the component has not been initialised yet
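The notification and guard behaviour of compute() can be illustrated with a small stand-alone class. It is a sketch only: ToyPublisherComponent, initialise(), and the callback signatures (no argument for computing, the result for computed) are assumptions made for this example and may differ from the library's.

```python
from typing import Any, Callable, List


class ToyPublisherComponent:
    """Hypothetical component that notifies subscribers around _compute()."""

    def __init__(self) -> None:
        self._initialised = False
        self._on_computing: List[Callable[[], None]] = []
        self._on_computed: List[Callable[[Any], None]] = []

    def add_on_computing(self, callback: Callable[[], None]) -> None:
        self._on_computing.append(callback)

    def add_on_computed(self, callback: Callable[[Any], None]) -> None:
        self._on_computed.append(callback)

    def initialise(self) -> None:
        self._initialised = True

    def _compute(self) -> int:
        return 42

    def compute(self) -> int:
        # Guard first, so no subscriber is notified for an unusable component
        if not self._initialised:
            raise RuntimeError("component has not been initialised yet")
        for callback in self._on_computing:
            callback()
        result = self._compute()
        for callback in self._on_computed:
            callback(result)
        return result
```

Keeping the events in the public method and the logic in _compute() is what lets subclasses override only the latter without losing notifications.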
- compute_and_update(*args, **kwargs)[source]
Compute and update the component in one go. See compute() and update() for more information. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.
Note
Invoking this method will notify the subscribers of the events iterating and iterated added with add_on_iterating() and add_on_iterated() respectively.
- Returns:
Any – Computed value or set of values
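The split between a read-only compute step and a mutating update step can be sketched as below. ToyComponent and Doubler are hypothetical, and forwarding the computed value to _update() as a single positional argument is an assumption of this sketch, not a statement about how the library forwards it.

```python
from abc import ABC, abstractmethod


class ToyComponent(ABC):
    """Hypothetical base class showing the compute/update split."""

    def __init__(self) -> None:
        self.state = 0.0  # stands in for the agent's state

    @abstractmethod
    def _compute(self) -> float:
        """Derive a new value from the current state without changing it."""

    @abstractmethod
    def _update(self, value: float) -> None:
        """Write the computed value back."""

    def compute_and_update(self) -> float:
        value = self._compute()  # read-only step
        self._update(value)      # write step
        return value


class Doubler(ToyComponent):
    def _compute(self) -> float:
        return self.state * 2 + 1

    def _update(self, value: float) -> None:
        self.state = value
```

Starting from a state of 0.0, two consecutive calls yield 1.0 and then 3.0, since each call reads the state written by the previous one.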
- initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]
Initialize the component with some custom logic. For example you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.
Note
Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().
Warning
The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.
Example
The initialise_component() method can be overridden to provide some custom initialisation logic.
>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
- Parameters:
agent (Agent) – Agent this component has been attached to
initial_awareness_database (dict[Identifier, AwarenessVector]) – Awareness database of the agent
initial_knowledge_database (dict[Identifier, T]) – Knowledge database of the agent
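Why the super call belongs at the end can be seen in a reduced model: subscribers of the initialised event run inside the base implementation, so any attribute they read must already exist. ToyBase, ToyChild, and the callback signature (receiving the component) are hypothetical and chosen only for this sketch.

```python
from typing import Callable, List


class ToyBase:
    """Hypothetical base: sets the flag and notifies subscribers."""

    def __init__(self) -> None:
        self._initialised = False
        self._callbacks: List[Callable[["ToyBase"], None]] = []

    def add_on_initialised(self, callback: Callable[["ToyBase"], None]) -> None:
        self._callbacks.append(callback)

    def initialise_component(self, agent: str) -> None:
        self._agent = agent
        self._initialised = True
        for callback in self._callbacks:
            callback(self)  # assumed signature: the component itself


class ToyChild(ToyBase):
    def initialise_component(self, agent: str) -> None:
        self._my_list = [agent]              # custom state first
        super().initialise_component(agent)  # flag and notification last
```

If the super call came first, a subscriber reading _my_list would hit an AttributeError, because the attribute would not exist yet when the event fires.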
- remove_on_initialised(callback)[source]
Remove a callback from the event
initialised
- Parameters:
callback (InitialisedCallback) – Callback to remove
- update(*args, **kwargs)[source]
Given some new values, update the state of the agent. Internally, this method calls the _update() method. Although not necessary, it is possible to override it with the correct signature. Just make sure to call the super method to ensure the correct behaviour.
Note
Invoking this method will notify the subscribers of the events updating and updated added with add_on_updating() and add_on_updated() respectively.
- Parameters:
args – Additional arguments
kwargs – Additional keyword arguments
- Raises:
RuntimeError – If the component has not been initialised yet
symaware.base.components.controller module
- class symaware.base.components.controller.Controller(agent_id, async_loop_lock=None)[source]
Bases: Component[Tasynclooplock, ComputingControlInputCallback, ComputedControlInputCallback]
Abstract class for the controller. Compute the control input for the agent. Normally the agent would use the information available at a certain time to compute a control input. This could translate into a lot of information being passed to compute().
- Parameters:
agent_id – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the controller
- abstract _compute()[source]
Compute the control input for the agent. Normally the agent would use the information available at a certain time to compute a control input. This could translate into a lot of information being passed to this method.
This method must be implemented in any custom controller.
Example
Create a new controller by subclassing Controller and implementing the _compute() method and the _update() method.
>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
- Returns:
tuple[ndarray, TimeSeries] – Tuple composed of two elements:
- New state of the agent the controller wants to reach
- Time series of intents of the controller; can be empty
- _update(control_input_and_intent)[source]
Update the agent’s model with the computed control input and store the intent in the agent’s awareness vector.
Example
A new controller could decide to override the default _update() method.
>>> from symaware.base import Controller, TimeSeries
>>> class MyController(Controller):
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
- Parameters:
control_input_and_intent (tuple[ndarray, TimeSeries]) – Tuple composed of two elements:
- new control input to apply to the agent’s model
- new intent to store in the agent’s awareness vector
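The compute-then-update contract of a controller can be tried out without the library or NumPy. The following sketch uses plain lists; ToyAgent, ToyController, and the attributes goal, control_input, and intent are hypothetical stand-ins for the agent's model and awareness vector.

```python
from typing import List, Tuple


class ToyAgent:
    """Hypothetical agent holding just what the controller touches."""

    def __init__(self, state: List[float], goal: List[float]):
        self.state = state
        self.goal = goal
        self.control_input: List[float] = [0.0] * len(state)
        self.intent: list = []


class ToyController:
    """Hypothetical controller: steer by the difference goal - state."""

    def __init__(self, agent: ToyAgent):
        self._agent = agent

    def _compute(self) -> Tuple[List[float], list]:
        control_input = [g - s for g, s in zip(self._agent.goal, self._agent.state)]
        return control_input, []  # empty intent, like an empty TimeSeries

    def _update(self, control_input_and_intent: Tuple[List[float], list]) -> None:
        # Unpack the pair produced by _compute() and store both parts
        control_input, intent = control_input_and_intent
        self._agent.control_input = control_input
        self._agent.intent = intent
```

Passing the pair returned by _compute() straight into _update() is the pattern the documented signatures suggest; how the library wires the two together is not shown here.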
- property dynamical_model: DynamicalModel
Dynamical system model for the agent
- initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]
Same description, example, and parameters as Component.initialise_component() above.
- class symaware.base.components.controller.DefaultController(agent_id, async_loop_lock=None)[source]
Bases: Controller
Default implementation of the controller. It returns a zero vector of the right size as control input.
- Parameters:
agent_id – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the controller
- _compute()[source]
Same description and example as Controller._compute() above.
- Returns:
New state of the agent the controller wants to reach,
Time series of intents of the controller, Can be empty
- initialise_component(agent, initial_awareness_database, initial_knowledge_database)[source]
Initialize the component with some custom logic. For example, you can instantiate new attributes and properties of the component. This function is called upon initialization by the Agent. To get the agent’s initial awareness and knowledge database, you can use the arguments of this function. Make sure to call the super method at the end of your implementation to notify the subscribers of the event.
Note
Invoking this method will notify the subscribers of the event initialised added with add_on_initialised().
Warning
The implementation of the method in Component notifies the subscribers of the event initialised, sets the attribute _agent to the agent passed as argument and the flag _initialised to True. If you override this method, make sure to call the super method at the end of your implementation.
Example
The initialise_component() method can be overridden to provide some custom initialisation logic.
>>> import time
>>> import numpy as np
>>> from symaware.base import Component
>>> class MyComponent(Component):
...     def initialise_component(
...         self, agent, initial_awareness_database, initial_knowledge_database
...     ):
...         self._my_model = agent.entity.model
...         self._my_state = initial_awareness_database[self.agent_id].state
...         self._my_time = time.time()
...         self._my_list = []
...         # Pass the agent received as argument on to the base implementation
...         super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
- Parameters:
agent – Agent this component has been attached to
initial_awareness_database – Awareness database of the agent
initial_knowledge_database – Knowledge database of the agent
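The reason for calling the super method last can be illustrated with a small self-contained sketch. SketchComponent below is a hypothetical base class that only imitates the behaviour described above (store the agent, set the flag, notify subscribers); the callback signature is an assumption made for the illustration and is not taken from the library.

```python
class SketchComponent:
    """Hypothetical base class; it only imitates the behaviour described above."""

    def __init__(self, agent_id):
        self.agent_id = agent_id
        self._agent = None
        self._initialised = False
        self._on_initialised = []

    def add_on_initialised(self, callback):
        # Register a subscriber for the "initialised" event
        self._on_initialised.append(callback)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        # Store the agent, set the flag, then notify every subscriber
        self._agent = agent
        self._initialised = True
        for callback in self._on_initialised:
            callback(agent, initial_awareness_database, initial_knowledge_database)


class CountingComponent(SketchComponent):
    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        # Custom logic runs first, so subscribers observe a fully prepared component
        self.known_agents = sorted(initial_awareness_database)
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)


events = []
component = CountingComponent(agent_id=0)
component.add_on_initialised(lambda agent, awareness, knowledge: events.append(component.known_agents))
component.initialise_component("agent-0", {1: "a", 0: "b"}, {})
print(events)  # [[0, 1]]
```

If the super call came first in this sketch, the subscriber would run before known_agents exists and fail with an AttributeError.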
- class symaware.base.components.controller.NullController[source]
Bases: Controller, NullObject
Default controller used as a placeholder. It is used when no controller is set for an agent. An exception is raised if this object is used in any way.
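The placeholder behaviour can be sketched as a null object that refuses every use. SketchNullObject and pick_controller are hypothetical names, and the choice of NotImplementedError is an assumption: the text above does not say which exception the library raises.

```python
class SketchNullObject:
    """Hypothetical placeholder that refuses to be used; not symaware's NullObject."""

    def __getattr__(self, name):
        # Called only for attributes that are not found through normal lookup
        raise NotImplementedError(f"placeholder used: attribute {name!r} is not available")


def pick_controller(controller=None):
    # Fall back to the placeholder when no controller is supplied
    return controller if controller is not None else SketchNullObject()


placeholder = pick_controller()
try:
    placeholder.compute()
except NotImplementedError as error:
    message = str(error)
print(message)
```

Failing loudly on first use surfaces a missing component early, rather than letting an agent run silently with no controller.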
- _compute()[source]
Compute the control input for the agent. Normally the agent would use the information available to it at a certain time to compute a control input. This could translate into a lot of information being passed to this method.
This method must be implemented in any custom controller.
Example
Create a new controller by subclassing Controller and implementing the _compute() and _update() methods.
>>> from symaware.base import Controller, MultiAgentAwarenessVector, MultiAgentKnowledgeDatabase, TimeSeries
>>> import numpy as np
>>> class MyController(Controller):
...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
...         # Your implementation here
...         # Example:
...         # Get the state of the agent
...         state = self._agent.self_state
...         # Get the goal position from the knowledge database
...         goal_pos = self._agent.self_knowledge["goal_pos"]
...         # Compute the control input as the difference between the goal position and the current state
...         control_input = goal_pos - state
...         # Return the control input and an empty TimeSeries
...         return control_input, TimeSeries()
...
...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
...         # Your implementation here
...         # Example:
...         # Simply override the control input and intent of the agent
...         control_input, intent = control_input_and_intent
...         self._agent.model.control_input = control_input
...         self._agent.self_awareness.intent = intent
- Returns:
New state of the agent the controller wants to reach
Time series of intents of the controller; can be empty
symaware.base.components.perception_system module
- class symaware.base.components.perception_system.DefaultPerceptionSystem(agent_id, environment, async_loop_lock=None)[source]
Bases: PerceptionSystem[Tasynclooplock]
Default implementation of perception system. It returns the values collected from the environment.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
- _compute()[source]
Compute the information perceived by the agent. It returns the values collected from the environment.
- Returns:
dict[int, StateObservation] – Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.base.components.perception_system.NullPerceptionSystem[source]
Bases: PerceptionSystem[Tasynclooplock], NullObject
Default perception system used as a placeholder. It is used when no perception system is set for an agent. An exception is raised if this object is used in any way.
- _compute()[source]
Perceive the state of each Agent in the symaware.base.Environment. It is used to update the knowledge database of the agent.
Example
Create a new perception system by subclassing PerceptionSystem and implementing the _compute() method.
>>> from symaware.base import PerceptionSystem, StateObservation
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get only the agent's own state
...         if self._agent_id not in self._env.agent_states:
...             return {}
...         return {self._agent_id: StateObservation(self._agent_id, self._env.agent_states[self._agent_id])}
- Returns:
Information perceived by the agent. Each agent’s state is identified by its id.
- class symaware.base.components.perception_system.PerceptionSystem(agent_id, environment, async_loop_lock=None)[source]
Bases: Component[Tasynclooplock, PerceivingInformationCallback, PerceivedInformationCallback]
Generic perception system of a symaware.base.Agent. It is used to perceive the state of the Agent in the symaware.base.Environment. The implementation could extend this and include more complex interaction, like sensors or omniscient perception. The information collected is then used to update the awareness vector and knowledge database of the agent.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
environment (Environment) – symaware.base.Environment the perception system will observe
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the perception system
- abstract _compute()[source]
Perceive the state of each Agent in the symaware.base.Environment. It is used to update the knowledge database of the agent.
Example
Create a new perception system by subclassing PerceptionSystem and implementing the _compute() method.
>>> from symaware.base import PerceptionSystem, StateObservation
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get only the agent's own state
...         if self._agent_id not in self._env.agent_states:
...             return {}
...         return {self._agent_id: StateObservation(self._agent_id, self._env.agent_states[self._agent_id])}
- Returns:
dict[int, StateObservation] – Information perceived by the agent. Each agent’s state is identified by its id.
- _update(perceived_information)[source]
Update the agent’s model with the new perceived information.
Example
A new perception system could decide to override the default _update() method.
>>> from symaware.base import PerceptionSystem, StateObservation, Identifier
>>> class MyPerceptionSystem(PerceptionSystem):
...     def _update(self, perceived_information: "dict[Identifier, StateObservation]"):
...         # Your implementation here
...         # Example:
...         # Simply override the state of the agent
...         self._agent.awareness_database[self._agent_id].state = perceived_information[self._agent_id].state
- Parameters:
perceived_information (dict[int, StateObservation]) – Information perceived by the agent. Each agent’s state is identified by its id.
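How the dictionary returned by _compute() feeds _update() can be sketched with stand-ins. SketchObservation and SketchPerception are hypothetical, and the awareness mapping is reduced to a plain dict keyed by agent id; none of these names come from the library.

```python
from dataclasses import dataclass


@dataclass
class SketchObservation:
    """Hypothetical stand-in for StateObservation: who was observed and its state."""
    observed_object_id: int
    state: list


class SketchPerception:
    """Hypothetical perception flow: read the environment, then update awareness."""

    def __init__(self, agent_id, env_states):
        self._agent_id = agent_id
        self._env_states = env_states  # maps agent id -> state
        self.awareness = {}            # maps agent id -> last perceived state

    def _compute(self):
        # Observe every agent currently present in the environment
        return {i: SketchObservation(i, list(s)) for i, s in self._env_states.items()}

    def _update(self, perceived_information):
        # Copy each perceived state into the awareness mapping, keyed by agent id
        for agent_id, observation in perceived_information.items():
            self.awareness[agent_id] = observation.state


perception = SketchPerception(agent_id=0, env_states={0: [0.0, 1.0], 1: [2.0, 3.0]})
perception._update(perception._compute())
print(perception.awareness)  # {0: [0.0, 1.0], 1: [2.0, 3.0]}
```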
- property environment: Environment
Environment
- perceive_self_state()[source]
Perceive the state of the agent itself
- Returns:
ndarray – State of the agent
- Raises:
RuntimeError – The perception system has not been initialised yet
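The guard behind this RuntimeError can be sketched as follows. SketchPerceptionSystem and its initialise() method are hypothetical and only illustrate the check on an initialisation flag; the actual implementation may differ.

```python
class SketchPerceptionSystem:
    """Hypothetical stand-in showing an initialisation guard."""

    def __init__(self, agent_id):
        self._agent_id = agent_id
        self._initialised = False
        self._states = {}

    def initialise(self, states):
        # Record the observable states and mark the system as ready
        self._states = states
        self._initialised = True

    def perceive_self_state(self):
        # Refuse to answer before initialisation, as described above
        if not self._initialised:
            raise RuntimeError("The perception system has not been initialised yet")
        return self._states[self._agent_id]


system = SketchPerceptionSystem(agent_id=7)
try:
    system.perceive_self_state()
    raised = False
except RuntimeError:
    raised = True
system.initialise({7: [0.5, 0.5]})
print(raised, system.perceive_self_state())  # True [0.5, 0.5]
```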
symaware.base.components.risk_estimator module
- class symaware.base.components.risk_estimator.DefaultRiskEstimator(agent_id, async_loop_lock=None)[source]
Bases: RiskEstimator[Tasynclooplock]
Default implementation of risk estimator. It returns the risk stored in the awareness database of the agent.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
- _compute()[source]
Compute the risk for the agent.
This method must be implemented in any custom risk estimator.
- Return type:
TimeSeries
Example
Create a new risk estimator by subclassing RiskEstimator and implementing the _compute() method.
>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         # The last value is the one stored under the largest time step
...         last_value_idx = max(awareness_database[self._agent_id].risk)
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
- class symaware.base.components.risk_estimator.NullRiskEstimator[source]
Bases: RiskEstimator[Tasynclooplock], NullObject
Default risk estimator used as a placeholder. It is used when no risk estimator is set for an agent. An exception is raised if this object is used in any way.
- _compute()[source]
Compute the risk for the agent.
This method must be implemented in any custom risk estimator.
Example
Create a new risk estimator by subclassing RiskEstimator and implementing the _compute() method.
>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         # The last value is the one stored under the largest time step
...         last_value_idx = max(awareness_database[self._agent_id].risk)
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
- class symaware.base.components.risk_estimator.RiskEstimator(agent_id, async_loop_lock=None)[source]
Bases: Component[Tasynclooplock, ComputingRiskCallback, ComputedRiskCallback]
Generic risk estimator of a symaware.base.Agent. It is used to compute the risk of an agent. The result of the computation is stored in the AwarenessVector of the agent.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the risk estimator
- abstract _compute()[source]
Compute the risk for the agent.
This method must be implemented in any custom risk estimator.
- Return type:
TimeSeries
Example
Create a new risk estimator by subclassing RiskEstimator and implementing the _compute() method.
>>> import numpy as np
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the risk stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].risk) == 0:
...             return TimeSeries({0: np.array([0])})
...         # The last value is the one stored under the largest time step
...         last_value_idx = max(awareness_database[self._agent_id].risk)
...         last_value = awareness_database[self._agent_id].risk[last_value_idx]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
- _update(risk)[source]
Update the agent’s model with the new risk time series.
Example
A new risk estimator could decide to override the default _update() method.
>>> from symaware.base import RiskEstimator, TimeSeries
>>> class MyRiskEstimator(RiskEstimator):
...     def _update(self, risk: TimeSeries):
...         # Your implementation here
...         # Example:
...         # Simply override the risk of the agent
...         self._agent.self_awareness.risk = risk
- Parameters:
risk (TimeSeries) – New risk estimation to store in the agent’s awareness vector
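The "take the last stored value and invert it" logic used in the risk estimator examples can be tried in isolation. The sketch below assumes a time series behaves like a mapping from integer time steps to values and uses a plain dict of floats in its place; inverted_latest is a hypothetical helper, not part of the library.

```python
def inverted_latest(series):
    """Return a one-entry series holding 1 minus the most recent value."""
    # An empty series yields a zero risk stored at step 0
    if not series:
        return {0: 0.0}
    # The most recent value is the one stored under the largest time step
    last_step = max(series)
    return {0: 1 - series[last_step]}


risk_history = {0: 0.2, 3: 0.75, 1: 0.4}
print(inverted_latest(risk_history))  # {0: 0.25}
print(inverted_latest({}))            # {0: 0.0}
```

Selecting the entry with max() does not depend on insertion order, which matters when steps are written out of sequence as in risk_history.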
symaware.base.components.uncertainty_estimator module
- class symaware.base.components.uncertainty_estimator.DefaultUncertaintyEstimator(agent_id, async_loop_lock=None)[source]
Bases: UncertaintyEstimator[Tasynclooplock]
Default implementation of uncertainty estimator. It returns the uncertainty stored in the awareness database of the agent.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
- class symaware.base.components.uncertainty_estimator.NullUncertaintyEstimator[source]
Bases: UncertaintyEstimator[Tasynclooplock], NullObject
Default uncertainty estimator used as a placeholder. It is used when no uncertainty estimator is set for an agent. An exception is raised if this object is used in any way.
- _compute()[source]
Compute the uncertainty for the agent
Example
Create a new uncertainty estimator by subclassing UncertaintyEstimator and implementing the _compute() method.
>>> import numpy as np
>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the uncertainty stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].uncertainty) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].uncertainty)
...         last_value = awareness_database[self._agent_id].uncertainty[last_value_idx[-1]]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
- Returns:
Uncertainty of the agent
- class symaware.base.components.uncertainty_estimator.UncertaintyEstimator(agent_id, async_loop_lock=None)[source]
Bases: Component[Tasynclooplock, ComputingUncertaintyCallback, ComputedUncertaintyCallback]
Generic uncertainty estimator of a symaware.base.Agent. It is used to compute the uncertainty of an agent. The result of the computation is stored in the AwarenessVector of the agent.
- Parameters:
agent_id (int) – Identifier of the agent this component belongs to
async_loop_lock (Optional[TypeVar(Tasynclooplock, bound=AsyncLoopLock)], default: None) – Async loop lock to use for the uncertainty estimator
- abstract _compute()[source]
Compute the uncertainty for the agent
Example
Create a new uncertainty estimator by subclassing UncertaintyEstimator and implementing the _compute() method.
>>> import numpy as np
>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _compute(self):
...         # Your implementation here
...         # Example:
...         # Get the last value of the uncertainty stored in the awareness database
...         awareness_database = self._agent.awareness_database
...         if len(awareness_database[self._agent_id].uncertainty) == 0:
...             return TimeSeries({0: np.array([0])})
...         last_value_idx = sorted(awareness_database[self._agent_id].uncertainty)
...         last_value = awareness_database[self._agent_id].uncertainty[last_value_idx[-1]]
...         # Invert the last value and return it as a TimeSeries
...         return TimeSeries({0: np.array([1 - last_value])})
- Returns:
TimeSeries – Uncertainty of the agent
- _update(uncertainty)[source]
Update the uncertainty of the agent in the awareness database.
Example
>>> from symaware.base import UncertaintyEstimator, TimeSeries
>>> class MyUncertaintyEstimator(UncertaintyEstimator):
...     def _update(self, uncertainty: TimeSeries):
...         # Your implementation here
...         # Example:
...         # Simply override the uncertainty of the agent
...         self._agent.self_awareness.uncertainty = uncertainty
- Parameters:
uncertainty (TimeSeries) – Uncertainty to update in the awareness database