Source code for symaware.simulators.pybullet.environment

import os
from typing import TYPE_CHECKING

import numpy as np
import pybullet as p
import pybullet_data
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import get_logger, log

from .entities import Entity

if TYPE_CHECKING:
    # String type hinting to support python 3.9
    from symaware.base.utils import AsyncLoopLock


[docs] class Environment(BaseEnvironment): """ Environment based on the PyBullet physics engine. Args ---- real_time_interval: If set to a strictly positive value, pybullet will run the simulation in real time. Otherwise, the simulation will run when :func:`step` is called. connection_method: Method used to connect to the pybullet server. See the pybullet documentation for more information. plane_scaling: Scale of the plane automatically added to the environment to represent the ground. If set to a non positive value, the plane will not be added. gravity: Set the gravity the physics engine will apply at each simulation step. It can be a single floating point value, in which case it will be applied along the third axis, or a tuple of three values, one for each axis. async_loop_lock: Async loop lock to use for the environment """ __LOGGER = get_logger(__name__, "Pybullet.Environment") def __init__( self, real_time_interval: float = 0, connection_method: int = p.GUI, plane_scaling: float = 1.0, gravity: "float | tuple[float, float, float]" = (0, 0, -9.8), async_loop_lock: "AsyncLoopLock | None" = None, ): super().__init__(async_loop_lock) self._is_pybullet_initialized = False self._real_time_interval = real_time_interval self._connection_method = connection_method self._plane_scaling = plane_scaling self._gravity = gravity if isinstance(gravity, tuple) else (0, 0, gravity) @property def use_real_time(self) -> bool: return self._real_time_interval > 0
[docs] @log(__LOGGER) def get_entity_state(self, entity: Entity) -> np.ndarray: if not isinstance(entity, Entity): raise TypeError(f"Expected SpatialEntity, got {type(entity)}") position, orientation = p.getBasePositionAndOrientation(entity.entity_id) return np.array(position + orientation)
[docs] @log(__LOGGER) def _add_entity(self, entity: Entity): if not isinstance(entity, Entity): raise TypeError(f"Expected SpatialEntity, got {type(entity)}") if not self._is_pybullet_initialized: self.initialise() entity.initialise()
[docs] def initialise(self): if self._is_pybullet_initialized: return self._notify("initialising", self) self._is_pybullet_initialized = True p.connect(self._connection_method) p.resetSimulation() p.setRealTimeSimulation(self.use_real_time) p.setGravity(self._gravity[0], self._gravity[1], self._gravity[2]) if self._plane_scaling > 0: p.loadURDF(os.path.join(pybullet_data.getDataPath(), "plane.urdf"), globalScaling=self._plane_scaling) self._notify("initialised", self)
[docs] def step(self): self._notify("stepping", self) super().step() for entity in self._agent_entities.values(): entity.step() if not self.use_real_time: p.stepSimulation() self._notify("stepped", self)
[docs] def stop(self): self._notify("stopping", self) self._is_pybullet_initialized = False p.disconnect() self._notify("stopped", self)
[docs] def set_debug_camera_position( self, distance: float, yaw: float, pitch: float, position: "tuple[float, float, float]" ): """ Set the position of the debug camera in the pybullet environment. Args ---- distance: Distance from the target yaw: Yaw angle of the camera pitch: Pitch angle of the camera position: Position of the camera """ p.resetDebugVisualizerCamera(distance, yaw, pitch, position)
[docs] def configure_debug_visualizer( self, flag: int, enable: bool, light_position: "tuple[float, float, float]", shadow_map_resolution: int, shadow_map_size: int, shadow_map_intensity: float, rgb_background: "tuple[float, float, float]", ): """ Configure the debug visualizer in the pybullet environment Args ---- flag: Flag to set enable: Enable or disable the debug visualizer light_position: Position of the light shadow_map_resolution: Resolution of the shadow map shadow_map_size: Size of the shadow map shadow_map_intensity: Intensity of the shadow map rgb_background: Background color """ p.configureDebugVisualizer( flag, enable, light_position, shadow_map_resolution, shadow_map_size, shadow_map_intensity, rgb_background, )
[docs] def disable_debug_visualizer(self): """Disable the debug visualizer in the pybullet environment.""" p.configureDebugVisualizer(p.COV_ENABLE_GUI, False)
[docs] def take_screenshot( self, width: int = 1440, height: int = 1120, shadow: bool = False, renderer: int = p.ER_TINY_RENDERER ) -> np.ndarray: """ Take a screenshot of the current view of the camera and return it as a 3-dimensional numpy array of (height x width x rgba). The rgba values are in the interval [0, 1]. An image produced this way can be saved on the disk using the matplotib utility. Example ------- >>> # doctest: +SKIP >>> import pybullet as p >>> import matplotlib.pyplot as plt >>> from symaware.simulators.pybullet import Environment >>> >>> env = Environment(connection_method=p.DIRECT) >>> img = env.take_screenshot(width=1080, height=720) >>> plt.imsave("my_image", img) Args ---- width: Width of the image height: Height of the image shadow: Whether to capture the shadows of the image renderer: Underlying renderer used by pybullet Returns: 3-dimensional numpy array containing the screenshot of the simulation """ _, _, img, _, _ = p.getCameraImage(width=width, height=height, shadow=shadow, renderer=renderer) return np.reshape(img, (height, width, 4)) * 1.0 / 255.0