Source code for symaware.simulators.carla.perception
"""
author Shuhao Qi
"""
import math
from dataclasses import dataclass, field
import carla
import numpy as np
from symaware.base import Identifier, Observation, PerceptionSystem, Tasynclooplock
from .entities import VehicleEntity
from .environment import Environment
from .util import branch_wp, cal_angle, local_wp
[docs]
@dataclass(frozen=True)
class FeatureObservation(Observation):
"""
Observation class for the FeatureExt system.
Args
----
observed_object_id:
id of the agent that is the subject of the observation
state:
state of the observed agent
"""
cur_lane: "carla.Waypoint | None" = None
is_junction: bool = False
current_wp: "list[carla.Waypoint]" = field(default_factory=list)
all_wp: "list[carla.Waypoint]" = field(default_factory=list)
wp_list: "list[carla.Waypoint]" = field(default_factory=list)
lane_width: float = 0.0
junction_lane: " carla.Waypoint | None" = None
lead_car: "tuple[float, float, float, float] | None" = None
[docs]
class FeatureExtractor(PerceptionSystem):
"""Extracts features and observations from the CARLA environment.
Perceives vehicle states, lane information, traffic lights, and nearby vehicles
to build a comprehensive observation of the driving environment.
"""
def __init__(self, agent_id: int, environment: Environment, async_loop_lock: "Tasynclooplock | None" = None):
super().__init__(agent_id, environment, async_loop_lock)
self._env: Environment
self.traffic_light_flag = None
self.wp_list = None
self.current_loc = None
self.max_distance = 70
self.distance_rate = 1.4
self.visible_zombie_cars: "list[carla.Vehicle]" = []
self.show_dt = 1.0
self.is_junction = False
self.junction_lane_flag = False
self.junction_lane = None
self.stop_sign = False
self.start_sign = True
self.stop_wps = None
self.time = 200
self.traffic_light = None
self.traffic_light_flag = False
[docs]
def _compute(self) -> "dict[Identifier, FeatureObservation]":
"""
Compute the information perceived by the agent.
It returns the values collected from the environment.
Returns
-------
Information perceived by the agent. Each agent's state is identified by its id.
"""
current_loc = self._env.get_agent_location(self.agent_id)
cur_lane = self._env.world.get_map().get_waypoint(current_loc)
is_junction = cur_lane.is_junction
current_wp = local_wp(cur_lane)
all_wp = branch_wp(cur_lane)
wp_list = self.wp_list_extract(cur_lane)
lane_width = cur_lane.lane_width
if is_junction:
jun = cur_lane.get_junction()
waypoints = jun.get_waypoints(carla.LaneType.Any)
for waypoint in waypoints:
for wp in waypoint:
if wp.road_id == cur_lane.road_id:
wp_list = self.wp_list_extract(wp)
# self.lane_display(waypoint)
if not self.junction_lane_flag:
self.junction_lane_flag = True
junction_lane = cur_lane
else:
self.junction_lane_flag = False
junction_lane = None
return {
self.agent_id: FeatureObservation(
observed_object_id=self.agent_id,
cur_lane=cur_lane,
is_junction=is_junction,
current_wp=current_wp,
all_wp=all_wp,
wp_list=wp_list,
lane_width=lane_width,
junction_lane=junction_lane,
lead_car=self.find_lead_car(cur_lane),
)
}
[docs]
def _update(self, perceived_information: "dict[Identifier, FeatureObservation]"):
for agent_id, feature_observation in perceived_information.items():
feature_knowledge: dict = self._agent._knowledge_database.setdefault(agent_id, {})
feature_knowledge["features"] = feature_observation
[docs]
def traffic_light_rule(self):
# Check the state of traffic light
if not self.traffic_light_flag:
self.traffic_light = self._env.world.get_traffic_lights_from_waypoint(self.current_wp[0], 50)
if self.traffic_light:
self.traffic_light_flag = True
if self.traffic_light[0].get_state() == carla.TrafficLightState.Red:
self.stop_sign = True
self.start_sign = False
self.stop_wps = self.traffic_light[0].get_stop_waypoints()
elif self.traffic_light[0].get_state() == carla.TrafficLightState.Green:
print("Green Light")
self.stop_sign = False
self.start_sign = True
self.traffic_light_flag = False
else:
self.stop_sign = False
# Set the green light forcefully
if self.stop_sign:
self.time -= 1
print("self.time", self.time)
if self.time < 0:
self.time = 200
self.traffic_light[0].set_state(carla.TrafficLightState.Green)
[docs]
def wp_list_extract(self, cur_wp: carla.Waypoint):
wp_l = [cur_wp]
if cur_wp.is_junction is False:
wp_l = self.wp_side_extract(wp_l, cur_wp, "right")
wp_l = self.wp_side_extract(wp_l, cur_wp, "left")
return wp_l
[docs]
def find_lead_car(self, cur_wp: carla.Waypoint) -> "tuple[float, float, float, float] | None":
forward_cars: "list[tuple[float, int]]" = []
x, y = self._env.get_agent_state(self.agent_id)[:2]
ego_dir = self._env.get_agent_direction(self.agent_id)
ego_vec = [ego_dir.x, ego_dir.y]
for i, car in enumerate(self._env.entities):
if not isinstance(car, VehicleEntity) or car == self._agent.entity:
continue
z_pos = car.location
vec_car = [z_pos.x - x, z_pos.y - y]
dis: float = np.hypot(vec_car[0], vec_car[1])
if dis < 50:
theta = cal_angle(vec_car, ego_vec)
if theta < math.pi / 2 and self.check_onroad(car, cur_wp):
forward_cars.append([dis, i])
if forward_cars:
forward_cars.sort()
lead_car: VehicleEntity = self._env.entities[forward_cars[0][1]]
state = lead_car.state
pos_x, pos_y, vel = state[0], state[1], np.hypot(state[6], state[7])
length = lead_car.actor.bounding_box.extent.x
return [pos_x, pos_y, length, vel]
else:
return None
[docs]
def find_cars_onlane(self, lane):
"""find the forwards and backwards car on the selected lane"""
forwards_cars: "list[tuple[float, int]]" = []
backwards_cars: "list[tuple[float, int]]" = []
ego_pos = [self.vehicle.get_location().x, self.vehicle.get_location().y]
ego_dir = (
self.vehicle.get_physics_control().wheels[0].position
- self.vehicle.get_physics_control().wheels[2].position
)
ego_vec = [ego_dir.x, ego_dir.y]
for i in range(len(self.zombie_cars)):
if self.zombie_cars[i] == self.vehicle:
continue
z_pos = self.zombie_cars[i].get_location()
dis = np.hypot(z_pos.x - self.current_loc.x, z_pos.y - self.current_loc.y)
if dis < 50:
vec_car = [z_pos.x - ego_pos[0], z_pos.y - ego_pos[1]]
theta = common.cal_angle(vec_car, ego_vec)
if self.check_onroad(self.zombie_cars[i], lane):
if theta <= math.pi / 2:
forwards_cars.append([dis, i])
else:
backwards_cars.append([dis, i])
if forwards_cars:
forwards_cars.sort()
lead_car = self.zombie_cars[forwards_cars[0][1]]
pos_x = lead_car.get_transform().location.x
pos_y = lead_car.get_transform().location.y
length = lead_car.bounding_box.extent.x
vel = np.hypot(lead_car.get_velocity().x, lead_car.get_velocity().y)
lead_info = [pos_x, pos_y, length, vel]
# self.bbox_display(lead_car, "blue")
else:
lead_info = None
if backwards_cars:
backwards_cars.sort()
fol_car = self.zombie_cars[backwards_cars[0][1]]
pos_x = fol_car.get_transform().location.x
pos_y = fol_car.get_transform().location.y
length = fol_car.bounding_box.extent.x
vel = np.hypot(fol_car.get_velocity().x, fol_car.get_velocity().y)
acc = np.hypot(fol_car.get_acceleration().x, fol_car.get_acceleration().y)
fol_info = [pos_x, pos_y, length, vel, acc]
# self.bbox_display(fol_car, "green")
else:
fol_info = None
return lead_info, fol_info
[docs]
def check_onroad(self, vehicle: VehicleEntity, lane: carla.Waypoint):
for i in range(4):
p = vehicle.actor.get_physics_control().wheels[i].position / 100
wp = self._env.world.get_map().get_waypoint(p)
if wp.lane_id == lane.lane_id:
return True
return False
[docs]
def lane_display(self, wp_list: "list[carla.Waypoint]"):
# Legal Lane
for wp in wp_list:
if wp.lane_change == carla.LaneChange.Right:
legal_pos_list_left = self.generate_position_list(local_wp(wp), "left")
self.draw_lane_line(legal_pos_list_left, "left")
elif wp.lane_change == carla.LaneChange.Left:
legal_pos_list_right = self.generate_position_list(local_wp(wp), "right")
self.draw_lane_line(legal_pos_list_right, "right")
elif wp.lane_change == carla.LaneChange.NONE:
legal_pos_list_left = self.generate_position_list(local_wp(wp), "left")
legal_pos_list_right = self.generate_position_list(local_wp(wp), "right")
self.draw_lane_line(legal_pos_list_left, "left")
self.draw_lane_line(legal_pos_list_right, "right")
[docs]
def point_display(self, my_color):
# Legal Lane
for wp in self.all_wp:
self._env.world.debug.draw_point(wp.transform.location, size=0.1, color=my_color, life_time=self.show_dt)
[docs]
def bbox_display(self, vehicle: carla.Vehicle, color: str):
if color == "red":
_color = carla.Color(60, 10, 10, 0)
elif color == "green":
_color = carla.Color(10, 60, 10, 0)
elif color == "blue":
_color = carla.Color(10, 10, 60, 0)
# Bounding Box
bounding_box = vehicle.bounding_box
bounding_box.location = vehicle.get_transform().location
self._env.world.debug.draw_box(
bounding_box, vehicle.get_transform().rotation, color=_color, life_time=self.show_dt
)
[docs]
def ref_display(self, ref_x, ref_y):
for n in range(0, len(ref_x), max(int(len(ref_x) / 20), 1)):
p = carla.Location()
p.x = ref_x[n]
p.y = ref_y[n]
self._env.world.debug.draw_point(p, size=0.08, color=carla.Color(0, 0, 255), life_time=self.show_dt)
[docs]
def wp_side_extract(self, wp_list: "list[carla.Waypoint]", wp: carla.Waypoint, side: str):
if side == "right":
while True:
if wp.lane_change == carla.LaneChange.Right or wp.lane_change == carla.LaneChange.Both:
wp = wp.get_right_lane()
wp_list.append(wp)
else:
break
if side == "left":
while True:
if wp.lane_change == carla.LaneChange.Left or wp.lane_change == carla.LaneChange.Both:
wp = wp.get_left_lane()
wp_list.append(wp)
else:
break
return wp_list
[docs]
def find_lanepoint_right(self, wp: carla.Waypoint):
location_drift = carla.Location(
x=-np.sin(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2,
y=np.cos(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2,
z=0.2,
)
lp = carla.Location(wp.transform.location + location_drift)
return lp
[docs]
def find_lanepoint_left(self, wp: carla.Waypoint):
location_drift = carla.Location(
x=np.sin(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2,
y=-np.cos(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2,
z=0.2,
)
lp = carla.Location(wp.transform.location + location_drift)
return lp
[docs]
def generate_position_list(self, wp_l: "list[carla.Waypoint]", side: str = "right"):
if wp_l is None:
return None
else:
pos_list = []
if side == "right":
for i in range(len(wp_l)):
pos_list.append(self.find_lanepoint_right(wp_l[i]))
elif side == "left":
for i in range(len(wp_l)):
pos_list.append(self.find_lanepoint_left(wp_l[i]))
else:
return None
return pos_list
[docs]
def draw_lane_points(self, wp_l: "list[carla.Waypoint]", side: str = "right"):
if wp_l and side == "middle":
for i in range(len(wp_l)):
self._env.world.debug.draw_point(wp_l[i].transform.location + carla.Location(0, 0, 0.2), life_time=0.2)
if wp_l and side == "right":
for i in range(len(wp_l)):
self._env.world.debug.draw_point(self.find_lanepoint_right(wp_l[i]), life_time=0.2)
elif wp_l and side == "left":
for i in range(len(wp_l)):
self._env.world.debug.draw_point(self.find_lanepoint_left(wp_l[i]), life_time=0.2)
return
[docs]
def draw_lane_line(self, pos_list: "list[carla.Location]", side="right"):
if pos_list and side == "right":
for _loop in range(len(pos_list) // 2 - 1):
self._env.world.debug.draw_line(
pos_list[2 * _loop],
pos_list[2 * _loop + 1],
thickness=0.2,
color=carla.Color(50, 0, 100, 0),
life_time=0.2,
)
if pos_list and side == "left":
for _loop in range(len(pos_list) // 2 - 1):
self._env.world.debug.draw_line(
pos_list[2 * _loop],
pos_list[2 * _loop + 1],
thickness=0.2,
color=carla.Color(50, 0, 100, 0),
life_time=0.2,
)