from typing import TYPE_CHECKING
import carla
import numpy as np
from symaware.base import Identifier, TimeIntervalAsyncLoopLock
from symaware.base.models import Environment as BaseEnvironment
from symaware.base.utils import get_logger, log
from .entities import Entity
from .gizmo import Gizmo
if TYPE_CHECKING:
# String type hinting to support python 3.9
from symaware.base import Agent
from symaware.base.utils import AsyncLoopLock
[docs]
class Environment(BaseEnvironment):
    """
    Environment based on the carla simulator.

    Args
    ----
    host:
        Address of the carla server the client connects to.
    port:
        Port of the carla server the client connects to.
    connection_timeout:
        Timeout forwarded to ``carla.Client.set_timeout``.
        NOTE(review): the CARLA API expects this value in seconds - confirm the default is intended.
    map:
        Name of the carla map loaded when the environment is initialised.
    spectator:
        Whether the :attr:`spectator` property is enabled.
    render:
        If False, the carla world is switched to no-rendering mode.
    time_interval:
        Fixed simulation step applied to the carla world.
        If strictly positive, the world runs in synchronous mode and is ticked once
        every time :func:`step` is called.
        If left at 0.0 and ``async_loop_lock`` is a :class:`TimeIntervalAsyncLoopLock`,
        the time interval of the lock is used instead.
    async_loop_lock:
        Async loop lock to use for the environment
    """

    __LOGGER = get_logger(__name__, "carla.Environment")

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 2000,
        connection_timeout: float = 5000,
        map: str = "Town01",
        spectator: bool = False,
        render: bool = True,
        time_interval: float = 0.0,
        async_loop_lock: "AsyncLoopLock | None" = None,
    ):
        """Create the carla client. The world itself is only loaded by :func:`initialise`."""
        super().__init__(async_loop_lock)
        self._is_carla_initialized = False
        self._client = carla.Client(host=host, port=port)
        self._client.set_timeout(connection_timeout)
        self._world_name = map
        # Both stay None until initialise() has loaded the world.
        self._original_settings: "carla.WorldSettings | None" = None
        self._world: "carla.World | None" = None
        self._spectator = spectator
        self._gizmos: "list[Gizmo]" = []
        self._render = render
        # An explicit time_interval wins; otherwise fall back to the interval of the loop lock, if it has one.
        if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) and time_interval == 0.0:
            self._dt = async_loop_lock.time_interval
        else:
            self._dt = time_interval

    def get_waypoints(self, distance: float = 5.0) -> "list[carla.Waypoint]":
        """
        Get all the waypoints in the current map.

        Args
        ----
        distance:
            Value forwarded to ``carla.Map.generate_waypoints``. Must be strictly positive.

        Returns
        -------
            List of waypoints in the current map
        """
        assert self._is_carla_initialized, "Carla environment is not initialized. Call initialise() first."
        assert self._world is not None, "World is not initialized. Call initialise() first."
        assert distance > 0.0, "Distance must be strictly positive."
        return self._world.get_map().generate_waypoints(distance)

    def get_entity_state(self, entity: Entity) -> np.ndarray:
        """Get the state of an entity.

        Args
        ----
        entity:
            The entity to get the state of

        Returns
        -------
            State array of the entity
        """
        assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}"
        return entity.state

    def get_agent_location(self, agent: "Identifier | Agent") -> "carla.Location":
        """Get the location of an agent.

        Args
        ----
        agent:
            Agent identifier or Agent object

        Returns
        -------
            Location of the agent
        """
        agent_id = agent if isinstance(agent, Identifier) else agent.id
        return self.get_entity_location(self._agent_entities[agent_id])

    def get_entity_direction(self, entity: Entity) -> carla.Vector3D:
        """Get the direction vector of an entity.

        Args
        ----
        entity:
            The entity to get the direction of

        Returns
        -------
            Direction vector of the entity
        """
        assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}"
        return entity.direction

    def get_agent_direction(self, agent: "Identifier | Agent") -> carla.Vector3D:
        """Get the direction of an agent.

        Args
        ----
        agent:
            Agent identifier or Agent object

        Returns
        -------
            Direction vector of the entity associated with the agent
        """
        # Same lookup as get_agent_location, delegating to the entity-level accessor.
        agent_id = agent if isinstance(agent, Identifier) else agent.id
        return self.get_entity_direction(self._agent_entities[agent_id])

    def get_entity_location(self, entity: Entity) -> "carla.Location":
        """Get the location of an entity.

        Args
        ----
        entity:
            The entity to get the location of

        Returns
        -------
            Location of the entity
        """
        assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}"
        return entity.location

    def _add_entity(self, entity: Entity):
        """Add an entity to the environment.

        The environment is initialised on demand, so the entity always receives a loaded world.

        Args
        ----
        entity:
            The entity to add
        """
        assert isinstance(entity, Entity), f"Expected Entity, got {type(entity)}"
        if not self._is_carla_initialized:
            self.initialise()
        entity.initialise(self._world)

    def add_gizmo(self, gizmo: Gizmo):
        """Add a gizmo (visualization) to the environment.

        The environment is initialised on demand, mirroring :func:`_add_entity`,
        so the gizmo is never initialised with a missing world.

        Args
        ----
        gizmo:
            The gizmo to add
        """
        assert isinstance(gizmo, Gizmo), f"Expected Gizmo, got {type(gizmo)}"
        if not self._is_carla_initialized:
            self.initialise()
        self._gizmos.append(gizmo)
        gizmo.initialise(self._world)

    def add_gizmos(self, *gizmos: Gizmo):
        """Add multiple gizmos to the environment.

        Args
        ----
        gizmos:
            Variable number of gizmo objects to add
        """
        for gizmo in gizmos:
            self.add_gizmo(gizmo)

    @property
    def world(self) -> "carla.World":
        """
        Get the carla world.
        The world is the main object that contains all the actors and the environment.

        Raises
        ------
        RuntimeError
            If the world has not been loaded yet
        """
        if self._world is not None:
            return self._world
        raise RuntimeError("World is not initialized. Call initialise() first.")

    @property
    def spectator(self) -> "carla.Actor":
        """
        Get the spectator of the world.
        The spectator is a special actor that can be used to view the world from a different perspective.

        Raises
        ------
        RuntimeError
            If the spectator was not enabled at construction or the world has not been loaded yet
        """
        if self._spectator and self._world is not None:
            return self._world.get_spectator()
        raise RuntimeError("Spectator is not enabled in this environment or the world is not initialized.")

    def initialise(self):
        """
        Load the configured map and apply the simulation settings.

        The settings found on the server are stored so that :func:`stop` can restore them.
        Calling this method on an already initialised environment does nothing.
        """
        if self._is_carla_initialized:
            return
        self._notify("initialising", self)
        self._client.load_world(self._world_name)
        self._world = self._client.get_world()
        # Fetched twice on purpose: one copy is kept for stop(), the other is the one modified below.
        self._original_settings = self._world.get_settings()
        settings = self._world.get_settings()
        synchronous = self._dt > 0.0
        if synchronous:
            settings.fixed_delta_seconds = self._dt
        settings.synchronous_mode = synchronous
        settings.no_rendering_mode = not self._render
        self._world.apply_settings(settings)
        if settings.synchronous_mode:
            traffic_manager = self._client.get_trafficmanager()
            traffic_manager.set_synchronous_mode(True)
        # Only flag the environment as initialised once the setup succeeded, so a failed
        # connection can be retried instead of leaving a half-initialised environment behind.
        self._is_carla_initialized = True
        self._notify("initialised", self)

    def step(self):
        """
        Advance the environment by one step.

        Every agent entity is stepped and every gizmo is drawn.
        The carla world is ticked only when a strictly positive time interval is configured.
        """
        self._notify("stepping", self)
        super().step()
        for entity in self._agent_entities.values():
            entity.step()
        for gizmo in self._gizmos:
            gizmo.draw(self._world)
        if self._dt > 0.0:
            self._world.tick()
        self._notify("stepped", self)

    def stop(self):
        """
        Stop the environment.

        Every agent entity is stopped, the actors of the entities not flagged as ``pre_existing``
        are destroyed and the world settings stored by :func:`initialise` are restored.
        """
        self._notify("stopping", self)
        self._is_carla_initialized = False
        for entity in self._agent_entities.values():
            entity.stop()
        destroy_commands = [
            carla.command.DestroyActor(entity.actor) for entity in self._entities if not entity.pre_existing
        ]
        self._client.apply_batch(destroy_commands)
        # _original_settings is only set together with _world, so the world is available here.
        if self._original_settings is not None:
            self._world.apply_settings(self._original_settings)
        self._notify("stopped", self)