Software design

High level goal

The goal is to create a collection python packages an user can easily install on their machine.

The software must allow for different component implementations to be swapped easily.

Agent structure

Agent

Package architecture

All packages will be namespaced under the symaware namespace. From the symaware.base package, each team will then develop their own implementation of one or more elements of the system.

--- title: Package architecture from the prospective of the user --- flowchart RL subgraph symawarep["symaware (namepsace)"] symaware["symaware.symaware"] end classDef background fill:#00000015 class symawarep background;
--- title: The base package provides the abstract interface of the system --- flowchart RL subgraph symawarep["symaware (namepsace)"] base[symaware.base] symaware[symaware.symaware] end symaware --> base classDef background fill:#00000015 class symawarep background;
--- title: | Each team will build their own implementation of some elements of the system for them to be combined together in the public package --- flowchart RL subgraph symawarep["symaware (namepsace)"] base[symaware.] mpi[symaware.mpi] kth[symaware.kth] tue[symaware.tue] uu[symaware.uu] nlr[symaware.nlr] sisw[symaware.sisw] base[symaware.base] symaware[symaware.symaware] end mpi --> base kth --> base tue --> base uu --> base nlr --> base sisw --> base symaware --> mpi symaware --> kth symaware --> tue symaware --> uu symaware --> nlr symaware --> sisw classDef background fill:#00000015 class symawarep background;

Software design of symaware.base

The main elements of the software have been divided in subpackages to enforce a coarse but clear separation of concerns.

--- title: Explicit dependencies --- flowchart TB user{{User}} subgraph base["symaware.base"] direction TB agent([base.Agent]) simulators[base.simulators] components[base.components] models[base.models] utils[base.utils] data[base.data] end user --> agent user --> simulators agent --> models agent --> components agent --> data simulators --> components simulators --> models simulators --> data components --> utils components --> models components --> data models --> utils models --> data classDef background fill:#00000015 classDef yellow stroke:#50623A,stroke-width:1px classDef red stroke:red,stroke-width:1px classDef green stroke:green,stroke-width:1px classDef blue stroke:blue,stroke-width:1px classDef orange stroke:orange,stroke-width:1px classDef magenta stroke:magenta,stroke-width:1px class simulators red; class agent orange; class components green; class models blue; class utils yellow; class data magenta; class base background;
--- title: Assuming transitive dependencies --- flowchart TB user{{User}} subgraph base["symaware.base"] direction TB agent([base.Agent]) simulators[base.simulators] components[base.components] models[base.models] data[base.data] utils[base.utils] end user --> agent user --> simulators agent --> components simulators --> components components --> models models --> data models --> utils classDef background fill:#00000015 classDef yellow stroke:#50623A,stroke-width:1px classDef red stroke:red,stroke-width:1px classDef green stroke:green,stroke-width:1px classDef blue stroke:blue,stroke-width:1px classDef orange stroke:orange,stroke-width:1px classDef magenta stroke:magenta,stroke-width:1px class simulators red; class agent orange; class components green; class models blue; class utils yellow; class data magenta; class base background;
--- title: Explicit dependencies --- flowchart TB user{{User}} subgraph base["symaware.base"] direction TB agent([base.Agent]) subgraph simulators[base.simulators] direction TB simulator_pybullet([simulators.pybullet]) simulator_pymunk([simulators.pymunk]) end subgraph components[base.components] direction TB controller([components.Controller]) perception_system([components.PerceptionSystem]) communication_system([components.CommunicationSystem]) risk_evaluator([components.RiskEvaluator]) uncertainty_evaluator([components.UncertaintyEvaluator]) end subgraph models[base.models] direction TB dynamical_model([models.DynamicModel]) environment([models.Environment]) entity([models.Entity]) end subgraph utils[base.utils] direction TB logger[utils.log] end subgraph data[base.data] direction TB knowledge([data.Knowledge]) awareness_vector([data.AwarenessVector]) end end user --> agent user --> simulators agent --> components simulators --> components components --> models models --> data models --> utils classDef background fill:#00000015 classDef yellow stroke:#50623A,stroke-width:1px classDef red stroke:red,stroke-width:1px classDef green stroke:green,stroke-width:1px classDef blue stroke:blue,stroke-width:1px classDef orange stroke:orange,stroke-width:1px classDef magenta stroke:magenta,stroke-width:1px class simulators red; class agent orange; class components green; class models blue; class utils yellow; class data magenta; class base,simulators,components,models,utils,data background;

Sequence diagram

The following sequence diagram shows the interaction between the different components of the system.

sequenceDiagram participant p as Perception System participant cs as Communication System participant a as Agent participant ru as Risk Evaluator<br>Uncertainty Evaluator participant c as Controller p ->> a: Perceptual Information cs ->> a: Received Communication note over a: InfoUpdater<br>State = Awareness + Knowledge a ->> ru: Current state ru ->> a: Risk/Uncertainty a ->> c: Updated state c ->> a: Chosen action a ->> cs: Updated state

Asynchronous model

Instead of relying in a strict sequence of events, the system is designed to be asynchronous (asyncio is used for this purpose).

Each component is independent and can run concurrently with the others, with its own fire frequency or event trigger.

Most of the added complexity is hidden in the symaware.base. Components can be developed only using standard, synchronous code.

AsyncLoopLock

The AsyncLoopLock class determines how often the component will run.

  • TimeIntervalAsyncLoopLock runs the component at a fixed interval

  • EventAsyncLoopLock runs the component when a specific event is triggered

  • DefaultAsyncLoopLock the component will ruu continuously. Needs to be used in combination with a custom lock mechanism

See the AsyncLoopLock documentation or the example for more information.

Extending the system

The system is designed to be easily extensible.

There are two core aspects to this:

  • Adding new components: components determine the behavior of the agent

  • Adding new models: models simulate the environment and the physical state of the system

Adding new components

To add a new component, you must define a new class that inherits from the specific component they want to extend, which in turns inherits from symaware.base.components.Component.

The new component must implement its specific behavior in the abstract method the superclass provides.

For more information and examples, see the Component documentation or the components subpackage.

Example: adding a new controller

from symaware.base import Controller
class MyController(Controller):
    def __init__(self, agent_id, async_loop_lock = None):
        super().__init__(agent_id, async_loop_lock)
        self._control_input = np.zeros(0)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        # Custom initialisation
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self._control_input = np.zeros(agent.model.control_input_shape)

    def _compute_control_input(self, awareness_database, knowledge_database):
        # Controller specific implementation
        return self._control_input, TimeSeries()

Example: minimal controller

from symaware.base import Controller

class MyController(Controller):

    def _compute_control_input(self, awareness_database, knowledge_database):
        # Controller specific implementation
        return np.zeros(self._agent.model.control_input_shape), TimeSeries()

Adding new models

Adding a new model is slightly more involved

It requires at least tree classes to be defined:

  • Environment: the environment in which the agent operates

  • Entity: the entities that populate the environment

  • DynamicModel: the dynamic model of the entity

For more information and examples, see the Model documentation or the simulators subpackage.

Example: adding a new environment

from symaware.base import Environment
class PyBulletEnvironment(Environment):
    def __init__(self, async_loop_lock = None):
        super().__init__(async_loop_lock)
        self._initialise_pybullet()

    def _initialise_pybullet(self):
        p.connect(p.GUI)
        # ... more initialisation code

    def get_entity_state(self, entity: Entity) -> np.ndarray:
        return np.array(p.getBasePositionAndOrientation(entity.entity_id))

    def _add_entity(self, entity: Entity):
        entity.initialise()

    def step(self):
        for entity in self._agent_entities.values():
            entity.step()
        p.stepSimulation()

Example: adding a new entity

from symaware.base import Entity
@dataclass
class PybulletSphere(Entity):
    model: PybulletDynamicalModel = field(default_factory=NullDynamicalModel)
    pos: np.ndarray
    angle: np.array
    radius: float

    def initialise(self):
        col_id = p.createCollisionShape(p.GEOM_SPHERE, radius=self.radius)
        vis_id = p.createVisualShape(p.GEOM_SPHERE, radius=self.radius)
        entity_id = p.createMultiBody(1, col_id, vis_id, self.pos, self.angle)
        if not isinstance(self.model, NullDynamicalModel):
            self.model.initialise(entity_id)

Example: adding a new dynamic model

from symaware.base import DynamicModel
class PybulletRacecarModel(DynamicModel):
    def __init__(self, ID, max_force):
        super().__init__(ID, control_input=np.zeros(2), state=np.zeros(7))
    @property
    def subinputs_dict(self) -> PybulletRacecarModelSubinputs:
        return {"velocity": self.control_input[0], "angle": self.control_input[1]}

    def initialise(self, entity_id: int):
        self._entity_id = entity_id

    def step(self):
        target_velocity, steering_angle = self._control_input
        # Just steer the front wheels
        for steer in (0, 2):
            p.setJointMotorControl2(self._entity_id, steer, p.POSITION_CONTROL, targetPosition=steering_angle)