import os
from typing import TYPE_CHECKING
import numpy as np
import pybullet as p
import pybullet_data
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import get_logger, log
from .entities import Entity
if TYPE_CHECKING:
# String type hinting to support python 3.9
from symaware.base.utils import AsyncLoopLock
[docs]
class Environment(BaseEnvironment):
"""
Environment based on the PyBullet physics engine.
Args
----
real_time_interval:
If set to a strictly positive value, pybullet will run the simulation in real time.
Otherwise, the simulation will run when :func:`step` is called.
connection_method:
Method used to connect to the pybullet server. See the pybullet documentation for more information.
plane_scaling:
Scale of the plane automatically added to the environment to represent the ground.
If set to a non positive value, the plane will not be added.
gravity:
Set the gravity the physics engine will apply at each simulation step.
It can be a single floating point value, in which case it will be applied along the third axis,
or a tuple of three values, one for each axis.
async_loop_lock:
Async loop lock to use for the environment
"""
__LOGGER = get_logger(__name__, "Pybullet.Environment")
def __init__(
self,
real_time_interval: float = 0,
connection_method: int = p.GUI,
plane_scaling: float = 1.0,
gravity: "float | tuple[float, float, float]" = (0, 0, -9.8),
async_loop_lock: "AsyncLoopLock | None" = None,
):
super().__init__(async_loop_lock)
self._is_pybullet_initialized = False
self._real_time_interval = real_time_interval
self._connection_method = connection_method
self._plane_scaling = plane_scaling
self._gravity = gravity if isinstance(gravity, tuple) else (0, 0, gravity)
@property
def use_real_time(self) -> bool:
return self._real_time_interval > 0
[docs]
@log(__LOGGER)
def get_entity_state(self, entity: Entity) -> np.ndarray:
if not isinstance(entity, Entity):
raise TypeError(f"Expected SpatialEntity, got {type(entity)}")
position, orientation = p.getBasePositionAndOrientation(entity.entity_id)
return np.array(position + orientation)
[docs]
@log(__LOGGER)
def _add_entity(self, entity: Entity):
if not isinstance(entity, Entity):
raise TypeError(f"Expected SpatialEntity, got {type(entity)}")
if not self._is_pybullet_initialized:
self.initialise()
entity.initialise()
[docs]
def initialise(self):
if self._is_pybullet_initialized:
return
self._notify("initialising", self)
self._is_pybullet_initialized = True
p.connect(self._connection_method)
p.resetSimulation()
p.setRealTimeSimulation(self.use_real_time)
p.setGravity(self._gravity[0], self._gravity[1], self._gravity[2])
if self._plane_scaling > 0:
p.loadURDF(os.path.join(pybullet_data.getDataPath(), "plane.urdf"), globalScaling=self._plane_scaling)
self._notify("initialised", self)
[docs]
def step(self):
self._notify("stepping", self)
super().step()
for entity in self._agent_entities.values():
entity.step()
if not self.use_real_time:
p.stepSimulation()
self._notify("stepped", self)
[docs]
def stop(self):
self._notify("stopping", self)
self._is_pybullet_initialized = False
p.disconnect()
self._notify("stopped", self)
[docs]
def set_debug_camera_position(
self, distance: float, yaw: float, pitch: float, position: "tuple[float, float, float]"
):
"""
Set the position of the debug camera in the pybullet environment.
Args
----
distance:
Distance from the target
yaw:
Yaw angle of the camera
pitch:
Pitch angle of the camera
position:
Position of the camera
"""
p.resetDebugVisualizerCamera(distance, yaw, pitch, position)
[docs]
def disable_debug_visualizer(self):
"""Disable the debug visualizer in the pybullet environment."""
p.configureDebugVisualizer(p.COV_ENABLE_GUI, False)
[docs]
def take_screenshot(
self, width: int = 1440, height: int = 1120, shadow: bool = False, renderer: int = p.ER_TINY_RENDERER
) -> np.ndarray:
"""
Take a screenshot of the current view of the camera and return it as a 3-dimensional numpy array
of (height x width x rgba).
The rgba values are in the interval [0, 1].
An image produced this way can be saved on the disk using the matplotib utility.
Example
-------
>>> # doctest: +SKIP
>>> import pybullet as p
>>> import matplotlib.pyplot as plt
>>> from symaware.simulators.pybullet import Environment
>>>
>>> env = Environment(connection_method=p.DIRECT)
>>> img = env.take_screenshot(width=1080, height=720)
>>> plt.imsave("my_image", img)
Args
----
width:
Width of the image
height:
Height of the image
shadow:
Whether to capture the shadows of the image
renderer:
Underlying renderer used by pybullet
Returns:
3-dimensional numpy array containing the screenshot of the simulation
"""
_, _, img, _, _ = p.getCameraImage(width=width, height=height, shadow=shadow, renderer=renderer)
return np.reshape(img, (height, width, 4)) * 1.0 / 255.0