Source code for symaware.simulators.carla.perception

"""
author Shuhao Qi
"""

import math
from dataclasses import dataclass, field

import carla
import numpy as np

from symaware.base import Identifier, Observation, PerceptionSystem, Tasynclooplock

from .entities import VehicleEntity
from .environment import Environment
from .util import branch_wp, cal_angle, local_wp


[docs] @dataclass(frozen=True) class FeatureObservation(Observation): """ Observation class for the FeatureExt system. Args ---- observed_object_id: id of the agent that is the subject of the observation state: state of the observed agent """ cur_lane: "carla.Waypoint | None" = None is_junction: bool = False current_wp: "list[carla.Waypoint]" = field(default_factory=list) all_wp: "list[carla.Waypoint]" = field(default_factory=list) wp_list: "list[carla.Waypoint]" = field(default_factory=list) lane_width: float = 0.0 junction_lane: " carla.Waypoint | None" = None lead_car: "tuple[float, float, float, float] | None" = None
[docs] class FeatureExtractor(PerceptionSystem): """Extracts features and observations from the CARLA environment. Perceives vehicle states, lane information, traffic lights, and nearby vehicles to build a comprehensive observation of the driving environment. """ def __init__(self, agent_id: int, environment: Environment, async_loop_lock: "Tasynclooplock | None" = None): super().__init__(agent_id, environment, async_loop_lock) self._env: Environment self.traffic_light_flag = None self.wp_list = None self.current_loc = None self.max_distance = 70 self.distance_rate = 1.4 self.visible_zombie_cars: "list[carla.Vehicle]" = [] self.show_dt = 1.0 self.is_junction = False self.junction_lane_flag = False self.junction_lane = None self.stop_sign = False self.start_sign = True self.stop_wps = None self.time = 200 self.traffic_light = None self.traffic_light_flag = False
[docs] def _compute(self) -> "dict[Identifier, FeatureObservation]": """ Compute the information perceived by the agent. It returns the values collected from the environment. Returns ------- Information perceived by the agent. Each agent's state is identified by its id. """ current_loc = self._env.get_agent_location(self.agent_id) cur_lane = self._env.world.get_map().get_waypoint(current_loc) is_junction = cur_lane.is_junction current_wp = local_wp(cur_lane) all_wp = branch_wp(cur_lane) wp_list = self.wp_list_extract(cur_lane) lane_width = cur_lane.lane_width if is_junction: jun = cur_lane.get_junction() waypoints = jun.get_waypoints(carla.LaneType.Any) for waypoint in waypoints: for wp in waypoint: if wp.road_id == cur_lane.road_id: wp_list = self.wp_list_extract(wp) # self.lane_display(waypoint) if not self.junction_lane_flag: self.junction_lane_flag = True junction_lane = cur_lane else: self.junction_lane_flag = False junction_lane = None return { self.agent_id: FeatureObservation( observed_object_id=self.agent_id, cur_lane=cur_lane, is_junction=is_junction, current_wp=current_wp, all_wp=all_wp, wp_list=wp_list, lane_width=lane_width, junction_lane=junction_lane, lead_car=self.find_lead_car(cur_lane), ) }
[docs] def _update(self, perceived_information: "dict[Identifier, FeatureObservation]"): for agent_id, feature_observation in perceived_information.items(): feature_knowledge: dict = self._agent._knowledge_database.setdefault(agent_id, {}) feature_knowledge["features"] = feature_observation
[docs] def traffic_light_rule(self): # Check the state of traffic light if not self.traffic_light_flag: self.traffic_light = self._env.world.get_traffic_lights_from_waypoint(self.current_wp[0], 50) if self.traffic_light: self.traffic_light_flag = True if self.traffic_light[0].get_state() == carla.TrafficLightState.Red: self.stop_sign = True self.start_sign = False self.stop_wps = self.traffic_light[0].get_stop_waypoints() elif self.traffic_light[0].get_state() == carla.TrafficLightState.Green: print("Green Light") self.stop_sign = False self.start_sign = True self.traffic_light_flag = False else: self.stop_sign = False # Set the green light forcefully if self.stop_sign: self.time -= 1 print("self.time", self.time) if self.time < 0: self.time = 200 self.traffic_light[0].set_state(carla.TrafficLightState.Green)
[docs] def wp_list_extract(self, cur_wp: carla.Waypoint): wp_l = [cur_wp] if cur_wp.is_junction is False: wp_l = self.wp_side_extract(wp_l, cur_wp, "right") wp_l = self.wp_side_extract(wp_l, cur_wp, "left") return wp_l
[docs] def find_lead_car(self, cur_wp: carla.Waypoint) -> "tuple[float, float, float, float] | None": forward_cars: "list[tuple[float, int]]" = [] x, y = self._env.get_agent_state(self.agent_id)[:2] ego_dir = self._env.get_agent_direction(self.agent_id) ego_vec = [ego_dir.x, ego_dir.y] for i, car in enumerate(self._env.entities): if not isinstance(car, VehicleEntity) or car == self._agent.entity: continue z_pos = car.location vec_car = [z_pos.x - x, z_pos.y - y] dis: float = np.hypot(vec_car[0], vec_car[1]) if dis < 50: theta = cal_angle(vec_car, ego_vec) if theta < math.pi / 2 and self.check_onroad(car, cur_wp): forward_cars.append([dis, i]) if forward_cars: forward_cars.sort() lead_car: VehicleEntity = self._env.entities[forward_cars[0][1]] state = lead_car.state pos_x, pos_y, vel = state[0], state[1], np.hypot(state[6], state[7]) length = lead_car.actor.bounding_box.extent.x return [pos_x, pos_y, length, vel] else: return None
[docs] def find_cars_onlane(self, lane): """find the forwards and backwards car on the selected lane""" forwards_cars: "list[tuple[float, int]]" = [] backwards_cars: "list[tuple[float, int]]" = [] ego_pos = [self.vehicle.get_location().x, self.vehicle.get_location().y] ego_dir = ( self.vehicle.get_physics_control().wheels[0].position - self.vehicle.get_physics_control().wheels[2].position ) ego_vec = [ego_dir.x, ego_dir.y] for i in range(len(self.zombie_cars)): if self.zombie_cars[i] == self.vehicle: continue z_pos = self.zombie_cars[i].get_location() dis = np.hypot(z_pos.x - self.current_loc.x, z_pos.y - self.current_loc.y) if dis < 50: vec_car = [z_pos.x - ego_pos[0], z_pos.y - ego_pos[1]] theta = common.cal_angle(vec_car, ego_vec) if self.check_onroad(self.zombie_cars[i], lane): if theta <= math.pi / 2: forwards_cars.append([dis, i]) else: backwards_cars.append([dis, i]) if forwards_cars: forwards_cars.sort() lead_car = self.zombie_cars[forwards_cars[0][1]] pos_x = lead_car.get_transform().location.x pos_y = lead_car.get_transform().location.y length = lead_car.bounding_box.extent.x vel = np.hypot(lead_car.get_velocity().x, lead_car.get_velocity().y) lead_info = [pos_x, pos_y, length, vel] # self.bbox_display(lead_car, "blue") else: lead_info = None if backwards_cars: backwards_cars.sort() fol_car = self.zombie_cars[backwards_cars[0][1]] pos_x = fol_car.get_transform().location.x pos_y = fol_car.get_transform().location.y length = fol_car.bounding_box.extent.x vel = np.hypot(fol_car.get_velocity().x, fol_car.get_velocity().y) acc = np.hypot(fol_car.get_acceleration().x, fol_car.get_acceleration().y) fol_info = [pos_x, pos_y, length, vel, acc] # self.bbox_display(fol_car, "green") else: fol_info = None return lead_info, fol_info
[docs] def check_onroad(self, vehicle: VehicleEntity, lane: carla.Waypoint): for i in range(4): p = vehicle.actor.get_physics_control().wheels[i].position / 100 wp = self._env.world.get_map().get_waypoint(p) if wp.lane_id == lane.lane_id: return True return False
[docs] def lane_display(self, wp_list: "list[carla.Waypoint]"): # Legal Lane for wp in wp_list: if wp.lane_change == carla.LaneChange.Right: legal_pos_list_left = self.generate_position_list(local_wp(wp), "left") self.draw_lane_line(legal_pos_list_left, "left") elif wp.lane_change == carla.LaneChange.Left: legal_pos_list_right = self.generate_position_list(local_wp(wp), "right") self.draw_lane_line(legal_pos_list_right, "right") elif wp.lane_change == carla.LaneChange.NONE: legal_pos_list_left = self.generate_position_list(local_wp(wp), "left") legal_pos_list_right = self.generate_position_list(local_wp(wp), "right") self.draw_lane_line(legal_pos_list_left, "left") self.draw_lane_line(legal_pos_list_right, "right")
[docs] def point_display(self, my_color): # Legal Lane for wp in self.all_wp: self._env.world.debug.draw_point(wp.transform.location, size=0.1, color=my_color, life_time=self.show_dt)
[docs] def bbox_display(self, vehicle: carla.Vehicle, color: str): if color == "red": _color = carla.Color(60, 10, 10, 0) elif color == "green": _color = carla.Color(10, 60, 10, 0) elif color == "blue": _color = carla.Color(10, 10, 60, 0) # Bounding Box bounding_box = vehicle.bounding_box bounding_box.location = vehicle.get_transform().location self._env.world.debug.draw_box( bounding_box, vehicle.get_transform().rotation, color=_color, life_time=self.show_dt )
[docs] def ref_display(self, ref_x, ref_y): for n in range(0, len(ref_x), max(int(len(ref_x) / 20), 1)): p = carla.Location() p.x = ref_x[n] p.y = ref_y[n] self._env.world.debug.draw_point(p, size=0.08, color=carla.Color(0, 0, 255), life_time=self.show_dt)
[docs] def wp_side_extract(self, wp_list: "list[carla.Waypoint]", wp: carla.Waypoint, side: str): if side == "right": while True: if wp.lane_change == carla.LaneChange.Right or wp.lane_change == carla.LaneChange.Both: wp = wp.get_right_lane() wp_list.append(wp) else: break if side == "left": while True: if wp.lane_change == carla.LaneChange.Left or wp.lane_change == carla.LaneChange.Both: wp = wp.get_left_lane() wp_list.append(wp) else: break return wp_list
[docs] def find_lanepoint_right(self, wp: carla.Waypoint): location_drift = carla.Location( x=-np.sin(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2, y=np.cos(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2, z=0.2, ) lp = carla.Location(wp.transform.location + location_drift) return lp
[docs] def find_lanepoint_left(self, wp: carla.Waypoint): location_drift = carla.Location( x=np.sin(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2, y=-np.cos(wp.transform.rotation.yaw / 180 * np.pi) * wp.lane_width / 2, z=0.2, ) lp = carla.Location(wp.transform.location + location_drift) return lp
[docs] def generate_position_list(self, wp_l: "list[carla.Waypoint]", side: str = "right"): if wp_l is None: return None else: pos_list = [] if side == "right": for i in range(len(wp_l)): pos_list.append(self.find_lanepoint_right(wp_l[i])) elif side == "left": for i in range(len(wp_l)): pos_list.append(self.find_lanepoint_left(wp_l[i])) else: return None return pos_list
[docs] def draw_lane_points(self, wp_l: "list[carla.Waypoint]", side: str = "right"): if wp_l and side == "middle": for i in range(len(wp_l)): self._env.world.debug.draw_point(wp_l[i].transform.location + carla.Location(0, 0, 0.2), life_time=0.2) if wp_l and side == "right": for i in range(len(wp_l)): self._env.world.debug.draw_point(self.find_lanepoint_right(wp_l[i]), life_time=0.2) elif wp_l and side == "left": for i in range(len(wp_l)): self._env.world.debug.draw_point(self.find_lanepoint_left(wp_l[i]), life_time=0.2) return
[docs] def draw_lane_line(self, pos_list: "list[carla.Location]", side="right"): if pos_list and side == "right": for _loop in range(len(pos_list) // 2 - 1): self._env.world.debug.draw_line( pos_list[2 * _loop], pos_list[2 * _loop + 1], thickness=0.2, color=carla.Color(50, 0, 100, 0), life_time=0.2, ) if pos_list and side == "left": for _loop in range(len(pos_list) // 2 - 1): self._env.world.debug.draw_line( pos_list[2 * _loop], pos_list[2 * _loop + 1], thickness=0.2, color=carla.Color(50, 0, 100, 0), life_time=0.2, )