from abc import abstractmethod
from typing import TYPE_CHECKING
import numpy as np
from symaware.base.data import (
MultiAgentAwarenessVector,
MultiAgentKnowledgeDatabase,
TimeSeries,
)
from symaware.base.models import DynamicalModel, NullDynamicalModel
from symaware.base.utils import NullObject, Tasynclooplock
from .component import Component
# Everything in this block is evaluated by static type checkers only.
# At runtime ``TYPE_CHECKING`` is False, so none of these names exist and the
# aliases below can only be referenced through string annotations.
if TYPE_CHECKING:
    import sys
    from typing import Any, Callable

    from symaware.base.agent import Agent

    # ``TypeAlias`` is part of the standard ``typing`` module from Python 3.10;
    # older interpreters take it from the ``typing_extensions`` backport.
    if sys.version_info < (3, 10):
        from typing_extensions import TypeAlias
    else:
        from typing import TypeAlias

    # Callback signature receiving only the agent.
    ComputingControlInputCallback: TypeAlias = Callable[[Agent], Any]
    # Callback signature receiving the agent and a ``(numpy array, TimeSeries)`` pair.
    ComputedControlInputCallback: TypeAlias = Callable[[Agent, tuple[np.ndarray, TimeSeries]], Any]
class Controller(Component[Tasynclooplock, "ComputingControlInputCallback", "ComputedControlInputCallback"]):
    """
    Abstract base class for controllers.

    A controller computes the control input for the agent it belongs to.
    The computation happens in :meth:`_compute`, which every concrete controller must implement,
    while :meth:`_update` applies the result to the agent.
    The agent would normally use the information available to it at a certain time
    to compute the control input.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    async_loop_lock:
        Async loop lock to use for the controller
    """

    def __init__(self, agent_id, async_loop_lock: "Tasynclooplock | None" = None):
        super().__init__(agent_id, async_loop_lock)
        # Placeholder model; replaced by the agent's own model in initialise_component()
        self._dynamical_model: DynamicalModel = NullDynamicalModel.instance()

    @property
    def dynamical_model(self) -> DynamicalModel:
        """
        Dynamical model currently stored by the controller
        """
        return self._dynamical_model

    def initialise_component(
        self,
        agent: "Agent",
        initial_awareness_database: MultiAgentAwarenessVector,
        initial_knowledge_database: MultiAgentKnowledgeDatabase,
    ):
        """
        Store the agent's dynamical model, then delegate to the parent initialisation.

        Args
        ----
        agent:
            Agent this controller is attached to. Its ``model`` becomes the controller's dynamical model
        initial_awareness_database:
            Initial awareness database, forwarded unchanged to the parent class
        initial_knowledge_database:
            Initial knowledge database, forwarded unchanged to the parent class
        """
        # The model is stored before the parent initialisation runs
        self._dynamical_model = agent.model
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)

    @abstractmethod
    def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
        """
        Compute the control input for the agent.

        This method must be implemented by any custom controller.
        The agent would normally use the information available to it at a certain time
        to produce the control input.

        Example
        -------
        Create a new controller by subclassing :class:`.Controller` and implementing
        :meth:`_compute` (and, optionally, :meth:`_update`).

        >>> from symaware.base import Controller, TimeSeries
        >>> import numpy as np
        >>> class MyController(Controller):
        ...     def _compute(self) -> "tuple[np.ndarray, TimeSeries]":
        ...         # Current state of the agent
        ...         state = self._agent.self_state
        ...         # Goal position, read from the knowledge database
        ...         goal_pos = self._agent.self_knowledge["goal_pos"]
        ...         # Control input: difference between the goal position and the current state
        ...         control_input = goal_pos - state
        ...         # Return the control input together with an empty TimeSeries
        ...         return control_input, TimeSeries()
        ...
        ...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
        ...         # Write the control input and the intent into the agent
        ...         control_input, intent = control_input_and_intent
        ...         self._agent.model.control_input = control_input
        ...         self._agent.self_awareness.intent = intent

        Returns
        -------
            - Control input computed for the agent (applied to the agent's model by :meth:`_update`),
            - Time series of intents of the controller. Can be empty
        """

    def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
        """
        Apply the computed control input to the agent's model
        and store the intent in the agent's awareness vector.

        Example
        -------
        A custom controller may override the default :meth:`_update` method.

        >>> from symaware.base import Controller, TimeSeries
        >>> class MyController(Controller):
        ...     def _update(self, control_input_and_intent: "tuple[np.ndarray, TimeSeries]"):
        ...         # Write the control input and the intent into the agent
        ...         control_input, intent = control_input_and_intent
        ...         self._agent.model.control_input = control_input
        ...         self._agent.self_awareness.intent = intent

        Args
        ----
        control_input_and_intent:
            Tuple composed of two elements:

            - new control input to apply to the agent's model
            - new intent to store in the agent's awareness vector
        """
        # Unpacking enforces that exactly two elements were provided
        new_control_input, new_intent = control_input_and_intent
        agent = self._agent
        agent.model.control_input = new_control_input
        agent.self_awareness.intent = new_intent
class NullController(Controller, NullObject):
    """
    Placeholder controller, used when no controller has been set for an agent.

    It is not meant to be used: any use of this object is expected to raise an exception
    (behaviour presumably provided by :class:`NullObject`, which is defined elsewhere).
    """

    def __init__(self):
        # -1 is passed as the agent identifier since this controller belongs to no agent
        super().__init__(-1)

    def _compute(self):
        """
        No-op implementation that satisfies the abstract interface. Returns ``None``.
        """
class DefaultController(Controller):
    """
    Default implementation of the controller.

    The control input it produces is always a zero vector whose shape matches
    the control input shape of the agent's model.

    Args
    ----
    agent_id:
        Identifier of the agent this component belongs to
    async_loop_lock:
        Async loop lock to use for the controller
    """

    def __init__(self, agent_id, async_loop_lock: "Tasynclooplock | None" = None):
        super().__init__(agent_id, async_loop_lock)
        # Empty array until the agent's model is known; resized in initialise_component()
        self._control_input = np.zeros(shape=0)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        """
        Run the parent initialisation, then size the zero control input after the agent's model.

        Args
        ----
        agent:
            Agent this controller is attached to. ``agent.model.control_input_shape`` gives the array shape
        initial_awareness_database:
            Initial awareness database, forwarded unchanged to the parent class
        initial_knowledge_database:
            Initial knowledge database, forwarded unchanged to the parent class
        """
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self._control_input = np.zeros(shape=agent.model.control_input_shape)

    def _compute(self):
        """
        Return the stored zero control input together with a new, empty :class:`TimeSeries`.

        The same array object is returned on every call; only the :class:`TimeSeries` is created anew.
        """
        zero_control_input = self._control_input
        empty_intent = TimeSeries()
        return zero_control_input, empty_intent