Software design
High level goal
The goal is to create a collection python packages an user can easily install on their machine.
The software must allow for different component implementations to be swapped easily.
Agent structure

Package architecture
All packages will be namespaced under the symaware namespace.
From the symaware.base package, each team will then develop their own implementation of one or more elements of the system.
---
title: Package architecture from the prospective of the user
---
flowchart RL
subgraph symawarep["symaware (namepsace)"]
symaware["symaware.symaware"]
end
classDef background fill:#00000015
class symawarep background;
---
title: The base package provides the abstract interface of the system
---
flowchart RL
subgraph symawarep["symaware (namepsace)"]
base[symaware.base]
symaware[symaware.symaware]
end
symaware --> base
classDef background fill:#00000015
class symawarep background;
---
title: Each team will build their own implementation of some elements of the system for them to be combined together in the public package
---
flowchart RL
subgraph symawarep["symaware (namepsace)"]
base[symaware.]
mpi[symaware.mpi]
kth[symaware.kth]
tue[symaware.tue]
uu[symaware.uu]
nlr[symaware.nlr]
sisw[symaware.sisw]
base[symaware.base]
symaware[symaware.symaware]
end
mpi --> base
kth --> base
tue --> base
uu --> base
nlr --> base
sisw --> base
symaware --> mpi
symaware --> kth
symaware --> tue
symaware --> uu
symaware --> nlr
symaware --> sisw
classDef background fill:#00000015
class symawarep background;
Software design of symaware.base
The main elements of the software have been divided in subpackages to enforce a coarse but clear separation of concerns.
---
title: Explicit dependencies
---
flowchart TB
user{{User}}
subgraph base["symaware.base"]
direction TB
agent([base.Agent])
simulators[base.simulators]
components[base.components]
models[base.models]
utils[base.utils]
data[base.data]
end
user --> agent
user --> simulators
agent --> models
agent --> components
agent --> data
simulators --> components
simulators --> models
simulators --> data
components --> utils
components --> models
components --> data
models --> utils
models --> data
classDef background fill:#00000015
classDef yellow stroke:#50623A,stroke-width:1px
classDef red stroke:red,stroke-width:1px
classDef green stroke:green,stroke-width:1px
classDef blue stroke:blue,stroke-width:1px
classDef orange stroke:orange,stroke-width:1px
classDef magenta stroke:magenta,stroke-width:1px
class simulators red;
class agent orange;
class components green;
class models blue;
class utils yellow;
class data magenta;
class base background;
---
title: Assuming transitive dependencies
---
flowchart TB
user{{User}}
subgraph base["symaware.base"]
direction TB
agent([base.Agent])
simulators[base.simulators]
components[base.components]
models[base.models]
data[base.data]
utils[base.utils]
end
user --> agent
user --> simulators
agent --> components
simulators --> components
components --> models
models --> data
models --> utils
classDef background fill:#00000015
classDef yellow stroke:#50623A,stroke-width:1px
classDef red stroke:red,stroke-width:1px
classDef green stroke:green,stroke-width:1px
classDef blue stroke:blue,stroke-width:1px
classDef orange stroke:orange,stroke-width:1px
classDef magenta stroke:magenta,stroke-width:1px
class simulators red;
class agent orange;
class components green;
class models blue;
class utils yellow;
class data magenta;
class base background;
---
title: Explicit dependencies
---
flowchart TB
user{{User}}
subgraph base["symaware.base"]
direction TB
agent([base.Agent])
subgraph simulators[base.simulators]
direction TB
simulator_pybullet([simulators.pybullet])
simulator_pymunk([simulators.pymunk])
end
subgraph components[base.components]
direction TB
controller([components.Controller])
perception_system([components.PerceptionSystem])
communication_system([components.CommunicationSystem])
risk_evaluator([components.RiskEvaluator])
uncertainty_evaluator([components.UncertaintyEvaluator])
end
subgraph models[base.models]
direction TB
dynamical_model([models.DynamicModel])
environment([models.Environment])
entity([models.Entity])
end
subgraph utils[base.utils]
direction TB
logger[utils.log]
end
subgraph data[base.data]
direction TB
knowledge([data.Knowledge])
awareness_vector([data.AwarenessVector])
end
end
user --> agent
user --> simulators
agent --> components
simulators --> components
components --> models
models --> data
models --> utils
classDef background fill:#00000015
classDef yellow stroke:#50623A,stroke-width:1px
classDef red stroke:red,stroke-width:1px
classDef green stroke:green,stroke-width:1px
classDef blue stroke:blue,stroke-width:1px
classDef orange stroke:orange,stroke-width:1px
classDef magenta stroke:magenta,stroke-width:1px
class simulators red;
class agent orange;
class components green;
class models blue;
class utils yellow;
class data magenta;
class base,simulators,components,models,utils,data background;
Sequence diagram
The following sequence diagram shows the interaction between the different components of the system.
sequenceDiagram
participant p as Perception System
participant cs as Communication System
participant a as Agent
participant ru as Risk Evaluator<br>Uncertainty Evaluator
participant c as Controller
p ->> a: Perceptual Information
cs ->> a: Received Communication
note over a: InfoUpdater<br>State = Awareness + Knowledge
a ->> ru: Current state
ru ->> a: Risk/Uncertainty
a ->> c: Updated state
c ->> a: Chosen action
a ->> cs: Updated state
Asynchronous model
Instead of relying in a strict sequence of events, the system is designed to be asynchronous (asyncio is used for this purpose).
Each component is independent and can run concurrently with the others, with its own fire frequency or event trigger.
Most of the added complexity is hidden in the symaware.base.
Components can be developed only using standard, synchronous code.
AsyncLoopLock
The AsyncLoopLock class determines how often the component will run.
TimeIntervalAsyncLoopLockruns the component at a fixed intervalEventAsyncLoopLockruns the component when a specific event is triggeredDefaultAsyncLoopLockthe component will ruu continuously. Needs to be used in combination with a custom lock mechanism
See the AsyncLoopLock documentation or the example for more information.
Extending the system
The system is designed to be easily extensible.
There are two core aspects to this:
Adding new components: components determine the behavior of the agent
Adding new models: models simulate the environment and the physical state of the system
Adding new components
To add a new component, you must define a new class that inherits from the specific component they want to extend, which in turns inherits from symaware.base.components.Component.
The new component must implement its specific behavior in the abstract method the superclass provides.
For more information and examples, see the Component documentation or the components subpackage.
Example: adding a new controller
from symaware.base import Controller
class MyController(Controller):
def __init__(self, agent_id, async_loop_lock = None):
super().__init__(agent_id, async_loop_lock)
self._control_input = np.zeros(0)
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
# Custom initialisation
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
self._control_input = np.zeros(agent.model.control_input_shape)
def _compute_control_input(self, awareness_database, knowledge_database):
# Controller specific implementation
return self._control_input, TimeSeries()
Example: minimal controller
from symaware.base import Controller
class MyController(Controller):
def _compute_control_input(self, awareness_database, knowledge_database):
# Controller specific implementation
return np.zeros(self._agent.model.control_input_shape), TimeSeries()
Adding new models
Adding a new model is slightly more involved
It requires at least tree classes to be defined:
Environment: the environment in which the agent operatesEntity: the entities that populate the environmentDynamicModel: the dynamic model of the entity
For more information and examples, see the Model documentation or the simulators subpackage.
Example: adding a new environment
from symaware.base import Environment
class PyBulletEnvironment(Environment):
def __init__(self, async_loop_lock = None):
super().__init__(async_loop_lock)
self._initialise_pybullet()
def _initialise_pybullet(self):
p.connect(p.GUI)
# ... more initialisation code
def get_entity_state(self, entity: Entity) -> np.ndarray:
return np.array(p.getBasePositionAndOrientation(entity.entity_id))
def _add_entity(self, entity: Entity):
entity.initialise()
def step(self):
for entity in self._agent_entities.values():
entity.step()
p.stepSimulation()
Example: adding a new entity
from symaware.base import Entity
@dataclass
class PybulletSphere(Entity):
model: PybulletDynamicalModel = field(default_factory=NullDynamicalModel)
pos: np.ndarray
angle: np.array
radius: float
def initialise(self):
col_id = p.createCollisionShape(p.GEOM_SPHERE, radius=self.radius)
vis_id = p.createVisualShape(p.GEOM_SPHERE, radius=self.radius)
entity_id = p.createMultiBody(1, col_id, vis_id, self.pos, self.angle)
if not isinstance(self.model, NullDynamicalModel):
self.model.initialise(entity_id)
Example: adding a new dynamic model
from symaware.base import DynamicModel
class PybulletRacecarModel(DynamicModel):
def __init__(self, ID, max_force):
super().__init__(ID, control_input=np.zeros(2), state=np.zeros(7))
@property
def subinputs_dict(self) -> PybulletRacecarModelSubinputs:
return {"velocity": self.control_input[0], "angle": self.control_input[1]}
def initialise(self, entity_id: int):
self._entity_id = entity_id
def step(self):
target_velocity, steering_angle = self._control_input
# Just steer the front wheels
for steer in (0, 2):
p.setJointMotorControl2(self._entity_id, steer, p.POSITION_CONTROL, targetPosition=steering_angle)