from typing import TYPE_CHECKING
import numpy as np
import pygame
import pymunk
import pymunk.pygame_util
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import TimeIntervalAsyncLoopLock, get_logger, log
from .entities import Entity
if TYPE_CHECKING:
# String type hinting to support python 3.9
from symaware.base.utils import AsyncLoopLock
class Environment(BaseEnvironment):
    """
    Environment based on the Pymunk physics engine.

    Args
    ----
    real_time_interval:
        Time step passed to :meth:`pymunk.Space.step` on every call to :func:`step`.
        If it is not strictly positive and ``async_loop_lock`` is a
        :class:`TimeIntervalAsyncLoopLock`, the lock's ``time_interval`` is used instead.
    visualise:
        If True, a pygame window is opened by :func:`initialise`
        and the pymunk space is drawn in it at every :func:`step`.
    async_loop_lock:
        Async loop lock to use for the environment
    """

    __LOGGER = get_logger(__name__, "PymunkEnvironment")

    def __init__(
        self,
        real_time_interval: float = 0,
        visualise: bool = True,
        async_loop_lock: "AsyncLoopLock | None" = None,
    ):
        super().__init__(async_loop_lock)
        self._real_time_interval = real_time_interval
        # Fall back to the period of the loop lock when no usable time step was given
        if self._real_time_interval <= 0 and isinstance(async_loop_lock, TimeIntervalAsyncLoopLock):
            self._real_time_interval = async_loop_lock.time_interval
        self._is_pymunk_initialized = False
        self._visualise = visualise
        self._space = pymunk.Space()
        if self._visualise:
            # Declared here for type checkers only; both are assigned in initialise()
            self._screen: pygame.Surface
            self._draw_options: pymunk.pygame_util.DrawOptions

    @log(__LOGGER)
    def get_entity_state(self, entity: Entity) -> np.ndarray:
        """
        Read the current state of the entity from its pymunk body.

        Args
        ----
        entity:
            Entity whose body is inspected

        Returns
        -------
            Array of 8 elements, in order: position x, position y, velocity x, velocity y,
            force x, force y, angular velocity, angle.

        Raises
        ------
        TypeError
            If ``entity`` is not an instance of :class:`Entity`
        """
        if not isinstance(entity, Entity):
            raise TypeError(f"Expected PymunkSpatialEntity, got {type(entity)}")
        body = entity.body
        return np.array(
            [
                body.position.x,
                body.position.y,
                body.velocity.x,
                body.velocity.y,
                body.force.x,
                body.force.y,
                body.angular_velocity,
                body.angle,
            ]
        )

    @log(__LOGGER)
    def _add_entity(self, entity: Entity):
        """
        Initialise the entity with the pymunk space owned by this environment.

        Args
        ----
        entity:
            Entity to initialise

        Raises
        ------
        TypeError
            If ``entity`` is not an instance of :class:`Entity`
        """
        if not isinstance(entity, Entity):
            raise TypeError(f"Expected PymunkSpatialEntity, got {type(entity)}")
        entity.initialise(self._space)

    @log(__LOGGER)
    def step(self):
        """
        Advance the simulation by one time step.

        The environment is initialised first if needed. Every entity in ``self._agent_entities``
        is stepped, then the pymunk space is stepped by the configured time interval.
        When visualisation is enabled, the space is drawn before the physics update
        and the display is flipped after it.
        """
        if not self._is_pymunk_initialized:
            self.initialise()
        for entity in self._agent_entities.values():
            entity.step()
        if self._visualise:
            # Let pygame process its internal event handling, otherwise the window
            # may be reported as unresponsive by the operating system.
            # pump() does not remove events from the queue.
            pygame.event.pump()
            self._screen.fill(pygame.Color("black"))
            self._space.debug_draw(self._draw_options)
        # NOTE(review): with the default real_time_interval of 0 and no TimeIntervalAsyncLoopLock,
        # the time step passed here is 0 - confirm whether the space advances at all in that case
        self._space.step(self._real_time_interval)
        if self._visualise:
            pygame.display.flip()

    @log(__LOGGER)
    def initialise(self):
        """
        Prepare the environment for stepping. Does nothing if it is already initialised.

        When visualisation is enabled, pygame is initialised, a 600x600 window is opened
        and the pymunk draw options targeting that window are created.
        """
        if self._is_pymunk_initialized:
            return
        if self._visualise:
            pygame.init()
            self._screen = pygame.display.set_mode((600, 600))
            self._draw_options = pymunk.pygame_util.DrawOptions(self._screen)
        # Only mark as initialised once the setup succeeded, so that a failure
        # while opening the window can be retried by a later call
        self._is_pymunk_initialized = True

    def stop(self):
        """
        Mark the environment as not initialised and, when visualisation is enabled, quit pygame.
        """
        self._is_pymunk_initialized = False
        if self._visualise:
            pygame.quit()