Source code for symaware.simulators.carla.environment

from typing import TYPE_CHECKING

import carla
import numpy as np

from symaware.base import Identifier, TimeIntervalAsyncLoopLock
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import get_logger, log

from .entities import Entity
from .gizmo import Gizmo

if TYPE_CHECKING:
    # String type hinting to support python 3.9
    from symaware.base import Agent
    from symaware.base.utils import AsyncLoopLock


[docs] class Environment(BaseEnvironment): """ Environment based on the carla simulator. Args ---- real_time_interval: If set to a strictly positive value, pybullet will run the simulation in real time. Otherwise, the simulation will run when :func:`step` is called. connection_method: Method used to connect to the pybullet server. See the pybullet documentation for more information. plane_scaling: Scale of the plane automatically added to the environment to represent the ground. If set to a non positive value, the plane will not be added. gravity: Set the gravity the physics engine will apply at each simulation step. It can be a single floating point value, in which case it will be applied along the third axis, or a tuple of three values, one for each axis. async_loop_lock: Async loop lock to use for the environment """ __LOGGER = get_logger(__name__, "carla.Environment") def __init__( self, host: str = "127.0.0.1", port: int = 2000, connection_timeout: float = 5000, map: str = "Town01", spectator: bool = False, render: bool = True, time_interval: float = 0.0, async_loop_lock: "AsyncLoopLock | None" = None, ): super().__init__(async_loop_lock) self._is_carla_initialized = False self._client = carla.Client(host=host, port=port) self._client.set_timeout(connection_timeout) self._world_name = map self._original_settings: "carla.WorldSettings" = None self._world: "carla.World" = None self._spectator = spectator self._gizmos: "list[Gizmo]" = [] self._render = render self._dt = ( async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) and time_interval == 0.0 else time_interval )
[docs] def get_waypoints(self, distance: float = 5.0) -> "list[carla.Waypoint]": """ Get all the waypoints in the current map. Returns ------- List of waypoints in the current map """ assert self._is_carla_initialized, "Carla environment is not initialized. Call initialise() first." assert self._world is not None, "World is not initialized. Call initialise() first." assert distance > 0.0, "Distance must be strictly positive." return self._world.get_map().generate_waypoints(distance)
[docs] def get_entity_state(self, entity: Entity) -> np.ndarray: """Get the state of an entity. Args ---- entity: The entity to get the state of Returns ------- State array of the entity """ assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}" return entity.state
[docs] def get_agent_location(self, agent: "Identifier | Agent") -> "carla.Location": """Get the location of an agent. Args ---- agent: Agent identifier or Agent object Returns ------- Location of the agent """ if isinstance(agent, Identifier): return self.get_entity_location(self._agent_entities[agent]) return self.get_entity_location(self._agent_entities[agent.id])
[docs] def get_entity_direction(self, entity: Entity) -> carla.Vector3D: """Get the direction vector of an entity. Args ---- entity: The entity to get the direction of Returns ------- Direction vector of the entity """ assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}" return entity.direction
[docs] def get_agent_direction(self, agent: "Identifier | Agent") -> carla.Vector3D: """ Get the direction of an agent in the environment. The actual implementation should be done in the derived class, based on the simulated environment API. Args ---- agent: agent to get the direction of Returns ------- Direction of the agent within the environment """ if isinstance(agent, Identifier): return self._agent_entities[agent].direction return self._agent_entities[agent.id].direction
[docs] def get_entity_location(self, entity: Entity) -> "carla.Location": """Get the location of an entity. Args ---- entity: The entity to get the location of Returns ------- Location of the entity """ assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}" return entity.location
[docs] def _add_entity(self, entity: Entity): """Add an entity to the environment. Args ---- entity: The entity to add """ assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}" if not self._is_carla_initialized: self.initialise() entity.initialise(self._world)
[docs] def add_gizmo(self, gizmo: Gizmo): """Add a gizmo (visualization) to the environment. Args ---- gizmo: The gizmo to add """ assert isinstance(gizmo, Gizmo), f"Expected Gizmo, got {type(gizmo)}" self._gizmos.append(gizmo) gizmo.initialise(self._world)
[docs] def add_gizmos(self, *gizmos: "list[Gizmo]"): """Add multiple gizmos to the environment. Args ---- gizmos: Variable number of gizmo objects to add """ for gizmo in gizmos: self.add_gizmo(gizmo)
@property def world(self) -> "carla.World": """ Get the carla world. The world is the main object that contains all the actors and the environment. """ if self._world is not None: return self._world raise RuntimeError("World is not initialized. Call initialise() first.") @property def spectator(self) -> "carla.Actor": """ Get the spectator of the world. The spectator is a special actor that can be used to view the world from a different perspective. """ if self._spectator and self._world is not None: return self._world.get_spectator() raise RuntimeError("Spectator is not enabled in this environment or the world is not initialized.")
[docs] def initialise(self): if self._is_carla_initialized: return self._notify("initialising", self) self._is_carla_initialized = True self._client.load_world(self._world_name) self._world = self._client.get_world() self._original_settings = self._world.get_settings() settings = self.world.get_settings() settings.fixed_delta_seconds = self._dt if self._dt > 0.0 else settings.fixed_delta_seconds settings.synchronous_mode = self._dt > 0.0 settings.no_rendering_mode = not self._render self._world.apply_settings(settings) if settings.synchronous_mode: tm = self._client.get_trafficmanager() tm.set_synchronous_mode(True) self._notify("initialised", self)
[docs] def step(self): self._notify("stepping", self) super().step() for entity in self._agent_entities.values(): entity.step() for gizmo in self._gizmos: gizmo.draw(self._world) if self._dt > 0.0: self._world.tick() self._notify("stepped", self)
[docs] def stop(self): self._notify("stopping", self) self._is_carla_initialized = False for entity in self._agent_entities.values(): entity.stop() self._client.apply_batch([carla.command.DestroyActor(x.actor) for x in self._entities if not x.pre_existing]) if self._original_settings is not None: self._world.apply_settings(self._original_settings) self._notify("stopped", self)