Source code for symaware.simulators.pymunk.environment

from typing import TYPE_CHECKING

import numpy as np
import pygame
import pymunk
import pymunk.pygame_util
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import TimeIntervalAsyncLoopLock, get_logger, log

from .entities import Entity

if TYPE_CHECKING:
    # String type hinting to support python 3.9
    from symaware.base.utils import AsyncLoopLock


[docs] class Environment(BaseEnvironment): """ Environment based on the Pymunk physics engine. Args ---- connection_method: Method used to connect to the pymunk server. See the pymunk documentation for more information. real_time_interval: If set to a strictly positive value, pymunk will run the simulation in real time. Otherwise, the simulation will run when :func:`step` is called. async_loop_lock: Async loop lock to use for the environment """ __LOGGER = get_logger(__name__, "PymunkEnvironment") def __init__( self, real_time_interval: float = 0, visualise: bool = True, async_loop_lock: "AsyncLoopLock | None" = None, ): super().__init__(async_loop_lock) self._real_time_interval = real_time_interval if self._real_time_interval <= 0 and isinstance(async_loop_lock, TimeIntervalAsyncLoopLock): self._real_time_interval = async_loop_lock.time_interval self._is_pymunk_initialized = False self._visualise = visualise self._space = pymunk.Space() if self._visualise: self._screen: pygame.Surface self._draw_options: pymunk.pygame_util.DrawOptions
[docs] @log(__LOGGER) def get_entity_state(self, entity: Entity) -> np.ndarray: if not isinstance(entity, Entity): raise TypeError(f"Expected PymunkSpatialEntity, got {type(entity)}") return np.array( [ entity.body.position.x, entity.body.position.y, entity.body.velocity.x, entity.body.velocity.y, entity.body.force.x, entity.body.force.y, entity.body.angular_velocity, entity.body.angle, ] )
[docs] @log(__LOGGER) def _add_entity(self, entity: Entity): if not isinstance(entity, Entity): raise TypeError(f"Expected PymunkSpatialEntity, got {type(entity)}") entity.initialise(self._space)
[docs] @log(__LOGGER) def step(self): if not self._is_pymunk_initialized: self.initialise() for entity in self._agent_entities.values(): entity.step() if self._visualise: self._screen.fill(pygame.Color("black")) self._space.debug_draw(self._draw_options) self._space.step(self._real_time_interval) if self._visualise: pygame.display.flip()
[docs] @log(__LOGGER) def initialise(self): if self._is_pymunk_initialized: return self._is_pymunk_initialized = True if self._visualise: pygame.init() self._screen = pygame.display.set_mode((600, 600)) self._draw_options = pymunk.pygame_util.DrawOptions(self._screen)
[docs] def stop(self): self._is_pymunk_initialized = False if self._visualise: pygame.quit()