Software design
High-level goal
The goal is to create a collection of Python packages that a user can easily install on their machine.
The software must allow for different component implementations to be swapped easily.
Agent structure
Package architecture
All packages will be namespaced under the symaware namespace.
Building on the symaware.base package, each team will then develop their own implementation of one or more elements of the system.
    ---
    title: Package architecture from the perspective of the user
    ---
    flowchart RL
        subgraph symawarep["symaware (namespace)"]
            symaware["symaware.symaware"]
        end
        classDef background fill:#00000015
        class symawarep background;
    ---
    title: The base package provides the abstract interface of the system
    ---
    flowchart RL
        subgraph symawarep["symaware (namespace)"]
            base[symaware.base]
            symaware[symaware.symaware]
        end
        symaware --> base
        classDef background fill:#00000015
        class symawarep background;
    ---
    title: Each team will build their own implementation of some elements of the system, to be combined in the public package
    ---
    flowchart RL
        subgraph symawarep["symaware (namespace)"]
            mpi[symaware.mpi]
            kth[symaware.kth]
            tue[symaware.tue]
            uu[symaware.uu]
            nlr[symaware.nlr]
            sisw[symaware.sisw]
            base[symaware.base]
            symaware[symaware.symaware]
        end
        mpi --> base
        kth --> base
        tue --> base
        uu --> base
        nlr --> base
        sisw --> base
        symaware --> mpi
        symaware --> kth
        symaware --> tue
        symaware --> uu
        symaware --> nlr
        symaware --> sisw
        classDef background fill:#00000015
        class symawarep background;
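The way several independently installed packages can share one namespace can be sketched with the standard library alone. The example below assumes the namespace is an implicit namespace package (PEP 420), which the text above does not state explicitly. The name symaware_demo and the subpackages base and team are hypothetical stand-ins, used so that the sketch cannot clash with a real installation.

```python
import importlib
import sys
import tempfile
from pathlib import Path

NAMESPACE = "symaware_demo"  # hypothetical stand-in for the real namespace


def make_distribution(root: Path, subpackage: str, body: str) -> Path:
    """Create <root>/<NAMESPACE>/<subpackage>/__init__.py and return root.

    The namespace directory itself has no __init__.py: this is what makes it
    an implicit namespace package that several distributions can share.
    """
    package_dir = root / NAMESPACE / subpackage
    package_dir.mkdir(parents=True)
    (package_dir / "__init__.py").write_text(body)
    return root


def load_demo_packages():
    workdir = Path(tempfile.mkdtemp())
    # Two separately "installed" distributions contributing to one namespace
    base_root = make_distribution(
        workdir / "dist_base", "base", "class Controller:\n    pass\n"
    )
    team_root = make_distribution(
        workdir / "dist_team",
        "team",
        f"from {NAMESPACE}.base import Controller\n\n"
        "class TeamController(Controller):\n    pass\n",
    )
    sys.path[:0] = [str(base_root), str(team_root)]
    importlib.invalidate_caches()
    base = importlib.import_module(f"{NAMESPACE}.base")
    team = importlib.import_module(f"{NAMESPACE}.team")
    return base, team


base, team = load_demo_packages()
# The team package extends a class defined in the base package
print(issubclass(team.TeamController, base.Controller))
```

Each distribution only ships its own subpackage, so a team package can be installed, upgraded or removed without touching the base package.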
Software design of symaware.base
The main elements of the software have been divided into subpackages to enforce a coarse but clear separation of concerns.
    ---
    title: Explicit dependencies
    ---
    flowchart TB
        user{{User}}
        subgraph base["symaware.base"]
            direction TB
            agent([base.Agent])
            simulators[base.simulators]
            components[base.components]
            models[base.models]
            utils[base.utils]
            data[base.data]
        end
        user --> agent
        user --> simulators
        agent --> models
        agent --> components
        agent --> data
        simulators --> components
        simulators --> models
        simulators --> data
        components --> utils
        components --> models
        components --> data
        models --> utils
        models --> data
        classDef background fill:#00000015
        classDef yellow stroke:#50623A,stroke-width:1px
        classDef red stroke:red,stroke-width:1px
        classDef green stroke:green,stroke-width:1px
        classDef blue stroke:blue,stroke-width:1px
        classDef orange stroke:orange,stroke-width:1px
        classDef magenta stroke:magenta,stroke-width:1px
        class simulators red;
        class agent orange;
        class components green;
        class models blue;
        class utils yellow;
        class data magenta;
        class base background;
    ---
    title: Assuming transitive dependencies
    ---
    flowchart TB
        user{{User}}
        subgraph base["symaware.base"]
            direction TB
            agent([base.Agent])
            simulators[base.simulators]
            components[base.components]
            models[base.models]
            data[base.data]
            utils[base.utils]
        end
        user --> agent
        user --> simulators
        agent --> components
        simulators --> components
        components --> models
        models --> data
        models --> utils
        classDef background fill:#00000015
        classDef yellow stroke:#50623A,stroke-width:1px
        classDef red stroke:red,stroke-width:1px
        classDef green stroke:green,stroke-width:1px
        classDef blue stroke:blue,stroke-width:1px
        classDef orange stroke:orange,stroke-width:1px
        classDef magenta stroke:magenta,stroke-width:1px
        class simulators red;
        class agent orange;
        class components green;
        class models blue;
        class utils yellow;
        class data magenta;
        class base background;
    ---
    title: Explicit dependencies
    ---
    flowchart TB
        user{{User}}
        subgraph base["symaware.base"]
            direction TB
            agent([base.Agent])
            subgraph simulators[base.simulators]
                direction TB
                simulator_pybullet([simulators.pybullet])
                simulator_pymunk([simulators.pymunk])
            end
            subgraph components[base.components]
                direction TB
                controller([components.Controller])
                perception_system([components.PerceptionSystem])
                communication_system([components.CommunicationSystem])
                risk_evaluator([components.RiskEvaluator])
                uncertainty_evaluator([components.UncertaintyEvaluator])
            end
            subgraph models[base.models]
                direction TB
                dynamical_model([models.DynamicModel])
                environment([models.Environment])
                entity([models.Entity])
            end
            subgraph utils[base.utils]
                direction TB
                logger[utils.log]
            end
            subgraph data[base.data]
                direction TB
                knowledge([data.Knowledge])
                awareness_vector([data.AwarenessVector])
            end
        end
        user --> agent
        user --> simulators
        agent --> components
        simulators --> components
        components --> models
        models --> data
        models --> utils
        classDef background fill:#00000015
        classDef yellow stroke:#50623A,stroke-width:1px
        classDef red stroke:red,stroke-width:1px
        classDef green stroke:green,stroke-width:1px
        classDef blue stroke:blue,stroke-width:1px
        classDef orange stroke:orange,stroke-width:1px
        classDef magenta stroke:magenta,stroke-width:1px
        class simulators red;
        class agent orange;
        class components green;
        class models blue;
        class utils yellow;
        class data magenta;
        class base,simulators,components,models,utils,data background;
Sequence diagram
The following sequence diagram shows the interaction between the different components of the system.
    sequenceDiagram
        participant p as Perception System
        participant cs as Communication System
        participant a as Agent
        participant ru as Risk Evaluator<br>Uncertainty Evaluator
        participant c as Controller
        p ->> a: Perceptual Information
        cs ->> a: Received Communication
        note over a: InfoUpdater<br>State = Awareness + Knowledge
        a ->> ru: Current state
        ru ->> a: Risk/Uncertainty
        a ->> c: Updated state
        c ->> a: Chosen action
        a ->> cs: Updated state
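The message flow in the diagram can be traced in a few lines of plain Python. This is only an illustrative sketch: every name in it (AgentState, perceive, agent_step and so on) is hypothetical, the numbers are toy values, and it runs the steps strictly in order, whereas the real system does not.

```python
from dataclasses import dataclass, field


@dataclass
class AgentState:
    """Hypothetical container mirroring "State = Awareness + Knowledge"."""
    awareness: dict = field(default_factory=dict)
    knowledge: dict = field(default_factory=dict)


def perceive() -> dict:
    # Perception System -> Agent: perceptual information
    return {"position": 1.0}


def receive_messages() -> list:
    # Communication System -> Agent: received communication
    return [{"sender": 2, "position": 4.0}]


def evaluate_risk(state: AgentState) -> float:
    # Risk/Uncertainty Evaluator: toy risk that grows as the neighbour gets closer
    distance = abs(state.knowledge["neighbour_position"] - state.awareness["position"])
    return 1.0 / (1.0 + distance)


def choose_action(state: AgentState) -> float:
    # Controller: move away from the neighbour, scaled by the current risk
    neighbour_ahead = state.knowledge["neighbour_position"] > state.awareness["position"]
    direction = -1.0 if neighbour_ahead else 1.0
    return direction * state.awareness["risk"]


def agent_step(state: AgentState, outbox: list) -> float:
    observation = perceive()
    messages = receive_messages()
    # InfoUpdater: merge the incoming information into the state
    state.awareness.update(observation)
    state.knowledge["neighbour_position"] = messages[-1]["position"]
    state.awareness["risk"] = evaluate_risk(state)
    action = choose_action(state)
    # Agent -> Communication System: share the updated state
    outbox.append(dict(state.awareness))
    return action


state = AgentState()
outbox = []
action = agent_step(state, outbox)
print(action, outbox)
```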
Asynchronous model
Instead of relying on a strict sequence of events, the system is designed to be asynchronous (asyncio is used for this purpose).
Each component is independent and can run concurrently with the others, with its own firing frequency or event trigger.
Most of the added complexity is hidden in the symaware.base package.
Components can be developed using only standard, synchronous code.
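A minimal sketch of this idea, using only asyncio from the standard library: two components written as ordinary synchronous callables are scheduled concurrently, each with its own period. PeriodicComponent is a hypothetical wrapper invented for this illustration and is not the mechanism symaware.base actually uses.

```python
import asyncio


class PeriodicComponent:
    """Hypothetical wrapper: runs a plain synchronous callable at a fixed period."""

    def __init__(self, name, period, compute):
        self.name = name
        self.period = period
        self.compute = compute  # ordinary synchronous function
        self.runs = 0

    async def run(self, stop: asyncio.Event):
        while not stop.is_set():
            self.compute()  # the component code itself never awaits
            self.runs += 1
            await asyncio.sleep(self.period)  # hand control to other components


async def main():
    log = []
    fast = PeriodicComponent("perception", 0.01, lambda: log.append("perception"))
    slow = PeriodicComponent("controller", 0.05, lambda: log.append("controller"))
    stop = asyncio.Event()
    tasks = [asyncio.create_task(component.run(stop)) for component in (fast, slow)]
    await asyncio.sleep(0.2)  # let both components run for a while
    stop.set()
    await asyncio.gather(*tasks)
    return fast.runs, slow.runs


fast_runs, slow_runs = asyncio.run(main())
print(f"perception ran {fast_runs} times, controller ran {slow_runs} times")
```

The component author only supplies the synchronous compute callable; the scheduling loop around it is the part a framework can hide.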
AsyncLoopLock
The AsyncLoopLock class determines how often the component will run.
TimeIntervalAsyncLoopLock: runs the component at a fixed interval
EventAsyncLoopLock: runs the component when a specific event is triggered
DefaultAsyncLoopLock: runs the component continuously. It needs to be used in combination with a custom lock mechanism
See the AsyncLoopLock documentation or the example for more information.
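The difference between a time-based and an event-based lock can be sketched with asyncio primitives. The classes below (LoopLock, IntervalLock, EventLock) and the wait_next method are hypothetical and do not reproduce the AsyncLoopLock API; they only illustrate how a lock object can decide when the next iteration of a component starts. The continuously running variant is left out.

```python
import asyncio
from abc import ABC, abstractmethod


class LoopLock(ABC):
    """Hypothetical stand-in: decides when the next loop iteration may start."""

    @abstractmethod
    async def wait_next(self) -> None: ...


class IntervalLock(LoopLock):
    """Releases the component once every `interval` seconds."""

    def __init__(self, interval: float):
        self._interval = interval

    async def wait_next(self) -> None:
        await asyncio.sleep(self._interval)


class EventLock(LoopLock):
    """Releases the component once each time trigger() is called."""

    def __init__(self):
        self._event = asyncio.Event()

    def trigger(self) -> None:
        self._event.set()

    async def wait_next(self) -> None:
        await self._event.wait()
        self._event.clear()


async def run_component(name, lock, iterations, log):
    for _ in range(iterations):
        await lock.wait_next()
        log.append(name)


async def main():
    log = []
    event_lock = EventLock()  # created inside the running event loop
    timed = asyncio.create_task(run_component("timed", IntervalLock(0.01), 3, log))
    on_event = asyncio.create_task(run_component("on_event", event_lock, 2, log))
    await timed  # the event-driven component is still waiting at this point
    before_trigger = list(log)
    for _ in range(2):
        event_lock.trigger()
        await asyncio.sleep(0.01)  # give the waiting component time to run
    await on_event
    return before_trigger, log


before_trigger, final_log = asyncio.run(main())
print(before_trigger)
print(final_log)
```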
Extending the system
The system is designed to be easily extensible.
There are two core aspects to this:
Adding new components: components determine the behavior of the agent
Adding new models: models simulate the environment and the physical state of the system
Adding new components
To add a new component, you must define a new class that inherits from the specific component you want to extend, which in turn inherits from symaware.base.components.Component.
The new component must implement its specific behavior in the abstract method the superclass provides.
For more information and examples, see the Component documentation or the components subpackage.
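The inheritance chain described above follows the template method pattern: the base class owns the shared plumbing and each subclass fills in one abstract method. The sketch below shows the pattern in isolation. Its Component and Controller classes are simplified, hypothetical stand-ins with invented method names (compute_and_update, _compute), not the symaware.base classes.

```python
from abc import ABC, abstractmethod


class Component(ABC):
    """Hypothetical base class: owns the plumbing shared by all components."""

    def __init__(self, agent_id: int):
        self.agent_id = agent_id
        self.history = []

    def compute_and_update(self, *args):
        # Shared logic lives here; subclasses only provide _compute
        result = self._compute(*args)
        self.history.append(result)
        return result

    @abstractmethod
    def _compute(self, *args): ...


class Controller(Component):
    """Specialised component: narrows the generic hook to a control law."""

    def _compute(self, state):
        return self._compute_control_input(state)

    @abstractmethod
    def _compute_control_input(self, state): ...


class ProportionalController(Controller):
    """User-defined controller: only the control law is written by hand."""

    def __init__(self, agent_id, gain, target):
        super().__init__(agent_id)
        self.gain = gain
        self.target = target

    def _compute_control_input(self, state):
        return self.gain * (self.target - state)


controller = ProportionalController(agent_id=0, gain=0.5, target=10.0)
command = controller.compute_and_update(4.0)
print(command, controller.history)
```

Because the rest of the system only calls the method defined on the base class, one controller can be swapped for another without changing the code that uses it.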
Example: adding a new controller
    import numpy as np

    # TimeSeries is assumed to be exported at the top level of symaware.base
    from symaware.base import Controller, TimeSeries


    class MyController(Controller):
        def __init__(self, agent_id, async_loop_lock=None):
            super().__init__(agent_id, async_loop_lock)
            # Placeholder until the agent's model is known
            self._control_input = np.zeros(0)

        def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
            # Custom initialisation
            super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
            self._control_input = np.zeros(agent.model.control_input_shape)

        def _compute_control_input(self, awareness_database, knowledge_database):
            # Controller specific implementation
            return self._control_input, TimeSeries()
Example: minimal controller
    import numpy as np

    # TimeSeries is assumed to be exported at the top level of symaware.base
    from symaware.base import Controller, TimeSeries


    class MyController(Controller):
        def _compute_control_input(self, awareness_database, knowledge_database):
            # Controller specific implementation
            return np.zeros(self._agent.model.control_input_shape), TimeSeries()
Adding new models
Adding a new model is slightly more involved.
It requires at least three classes to be defined:
Environment: the environment in which the agent operates
Entity: the entities that populate the environment
DynamicModel: the dynamic model of the entity
For more information and examples, see the Model documentation or the simulators subpackage.
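How the three classes relate can be seen in a toy example that needs no physics engine. This is a sketch under simplifying assumptions: the classes below are hypothetical, one-dimensional stand-ins that share only their names with the symaware.base classes, and the model integrates the motion itself rather than delegating to a simulator.

```python
from dataclasses import dataclass
from typing import Optional


class DynamicModel:
    """Hypothetical 1D single integrator: state is a position, input a velocity."""

    def __init__(self, dt: float):
        self.dt = dt
        self.control_input = 0.0

    def step(self, position: float) -> float:
        return position + self.control_input * self.dt


@dataclass
class Entity:
    """Something that lives in the environment; static when it has no model."""
    position: float
    model: Optional[DynamicModel] = None


class Environment:
    """Holds the entities and advances the ones that have a dynamic model."""

    def __init__(self):
        self._entities = {}

    def add_entity(self, name: str, entity: Entity) -> None:
        self._entities[name] = entity

    def get_entity_state(self, name: str) -> float:
        return self._entities[name].position

    def step(self) -> None:
        for entity in self._entities.values():
            if entity.model is not None:
                entity.position = entity.model.step(entity.position)


env = Environment()
robot = Entity(position=0.0, model=DynamicModel(dt=0.5))
wall = Entity(position=5.0)  # no model: never moves
env.add_entity("robot", robot)
env.add_entity("wall", wall)

robot.model.control_input = 2.0  # constant velocity command
env.step()
env.step()
print(env.get_entity_state("robot"), env.get_entity_state("wall"))
```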
Example: adding a new environment
    import numpy as np
    import pybullet as p

    from symaware.base import Entity, Environment


    class PyBulletEnvironment(Environment):
        def __init__(self, async_loop_lock=None):
            super().__init__(async_loop_lock)
            self._initialise_pybullet()

        def _initialise_pybullet(self):
            p.connect(p.GUI)
            # ... more initialisation code

        def get_entity_state(self, entity: Entity) -> np.ndarray:
            # PyBullet returns a (position, orientation) pair:
            # flatten it into a single array of 7 elements
            position, orientation = p.getBasePositionAndOrientation(entity.entity_id)
            return np.concatenate((position, orientation))

        def _add_entity(self, entity: Entity):
            entity.initialise()

        def step(self):
            # Advance every entity, then the physics simulation itself
            for entity in self._agent_entities.values():
                entity.step()
            p.stepSimulation()
Example: adding a new entity
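    from dataclasses import dataclass, field

    import numpy as np
    import pybullet as p

    # NullDynamicalModel is assumed to be exported by symaware.base;
    # PybulletDynamicalModel is assumed to be defined alongside the simulator
    from symaware.base import Entity, NullDynamicalModel


    @dataclass
    class PybulletSphere(Entity):
        # Fields without a default must come before fields with one
        pos: np.ndarray
        angle: np.ndarray
        radius: float
        model: PybulletDynamicalModel = field(default_factory=NullDynamicalModel)

        def initialise(self):
            col_id = p.createCollisionShape(p.GEOM_SPHERE, radius=self.radius)
            vis_id = p.createVisualShape(p.GEOM_SPHERE, radius=self.radius)
            # Stored so that the environment can later query the entity's state
            self.entity_id = p.createMultiBody(1, col_id, vis_id, self.pos, self.angle)
            if not isinstance(self.model, NullDynamicalModel):
                self.model.initialise(self.entity_id)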
Example: adding a new dynamic model
    import numpy as np
    import pybullet as p

    from symaware.base import DynamicModel


    class PybulletRacecarModel(DynamicModel):
        def __init__(self, ID, max_force):
            super().__init__(ID, control_input=np.zeros(2), state=np.zeros(7))
            self._max_force = max_force

        @property
        def subinputs_dict(self) -> "PybulletRacecarModelSubinputs":
            # PybulletRacecarModelSubinputs is assumed to be a typed dictionary defined elsewhere
            return {"velocity": self.control_input[0], "angle": self.control_input[1]}

        def initialise(self, entity_id: int):
            self._entity_id = entity_id

        def step(self):
            # The target velocity is not used in this shortened example
            target_velocity, steering_angle = self._control_input
            # Just steer the front wheels
            for steer in (0, 2):
                p.setJointMotorControl2(self._entity_id, steer, p.POSITION_CONTROL, targetPosition=steering_angle)