Source code for symaware.simulators.carla.path_planner

"""
author Shuhao Qi
"""

import math
from enum import Enum
from typing import TYPE_CHECKING

import carla
import numpy as np

from symaware.base import Component, Identifier, Tasynclooplock

from .entities import VehicleEntity
from .util import Spline2D, calc_path, local_wp, pi_2_pi, ref_waypoint

if TYPE_CHECKING:
    from .perception import FeatureObservation


[docs] class Status(Enum): """Enumeration of path planning statuses. Defines possible states during path planning: following lane, lane changing, and stopping scenarios. """ FOLLOWING = 0 START_LANE_CHANGE_L = 1 LANE_CHANGING_L = 2 START_LANE_CHANGE_R = 3 LANE_CHANGING_R = 4 STOPPING = 5
[docs] class PathPlanner(Component): """Path planning component for vehicle navigation in CARLA. Computes safe and feasible paths for vehicles considering lane changes, junctions, and stopping scenarios. Uses spline-based path generation with behavioral state management. """ def __init__( self, agent_id: "Identifier", world: "carla.World", async_loop_lock: "Tasynclooplock | None" = None, ): super().__init__(agent_id, async_loop_lock) self.d_dist: float = 0.1 self._world = world self.merge_point = None self.target_lane = None self.junction_wps = None self.merge_length: float = 0.0
[docs] def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): assert isinstance(agent.entity, VehicleEntity), "PathPlanner requires agent.entity to be a VehicleEntity" return super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
[docs] def _compute(self): status = self._agent.self_knowledge["status"] features: FeatureObservation = self._agent.self_knowledge["features"] v: float = np.hypot(self._agent.self_state[6], self._agent.self_state[7]) merge_dist = max(4.0 * v, 12.0) if status == Status.FOLLOWING: print("Current status: following") follow_lane = local_wp(features.wp_list[0]) if features.is_junction: follow_lane = self.junction_waypoints(features.junction_lane) return self.following_path(follow_lane) if status == Status.STOPPING: print("Current status: stopping") follow_lane = local_wp(features.wp_list[0]) return self.following_path(follow_lane) elif status == Status.START_LANE_CHANGE_L: print("Current status: Start Left changing") left_lane = features.wp_list[0].get_left_lane() merge_point = self.merge_point_calcu(left_lane, merge_dist) self.target_lane = self._world.get_map().get_waypoint(merge_point) self._agent.self_knowledge["status"] = Status.LANE_CHANGING_L return self.lane_change_path(self.target_lane, merge_point) if status == Status.LANE_CHANGING_L: print("Current status: Left changing") return self.lane_change_path(self.target_lane, self.merge_point) if status == Status.START_LANE_CHANGE_R: print("Current status: Start Right changing") right_lane = features.wp_list[0].get_right_lane() merge_point = self.merge_point_calcu(right_lane, merge_dist) self.target_lane = self._world.get_map().get_waypoint(self.merge_point) self._agent.self_knowledge["status"] = Status.LANE_CHANGING_R return self.lane_change_path(self.target_lane, merge_point) if status == Status.LANE_CHANGING_R: print("Current status: Right changing") return self.lane_change_path(self.target_lane, merge_point)
[docs] def _update(self, args: "tuple[float, float, float, float]"): rx, ry, ryaw, s_sum = args print(f"{args = }")
[docs] def following_path(self, waypoints: "list[carla.Waypoint]"): wp_x, wp_y = [], [] for point in waypoints: wp_x.append(point.transform.location.x) wp_y.append(point.transform.location.y) cubicspline = Spline2D(wp_x, wp_y) s = np.arange(0, cubicspline.s[-1], self.d_dist) rx, ry, ryaw, rk = [], [], [], [] for i_s in s: ix, iy = cubicspline.calc_position(i_s) rx.append(ix) ry.append(iy) ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s))) rk.append(cubicspline.calc_curvature(i_s)) return rx, ry, ryaw, cubicspline.s[-1]
[docs] def lane_change_path(self, lane_target: carla.Waypoint, merge_point: carla.Location): x, y = self._agent.self_state[:2] yaw = self._agent.self_state[5] # generate reference line t_x = merge_point.x t_y = merge_point.y t_dx = lane_target.next(0.1)[0].transform.location.x - t_x t_dy = lane_target.next(0.1)[0].transform.location.y - t_y t_yaw = math.atan2(t_dy, t_dx) rx, ry, ryaw, s = calc_path(x, y, yaw, t_x, t_y, t_yaw, 3, self.d_dist) waypoints = ref_waypoint(lane_target) wp_x, wp_y = [], [] for point in waypoints: wp_x.append(point.transform.location.x) wp_y.append(point.transform.location.y) cubicspline = Spline2D(wp_x, wp_y) s = np.arange(0, cubicspline.s[-1], self.d_dist) for i_s in s: ix, iy = cubicspline.calc_position(i_s) rx.append(ix) ry.append(iy) ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s))) if np.hypot(x - t_x, y - t_y) < 1.0: self._agent.self_knowledge["status"] = Status.FOLLOWING return rx, ry, ryaw, s[-1]
[docs] def merge_point_calcu(self, lane_target: carla.Waypoint, merge_dist: float): return lane_target.next(merge_dist)[0].transform.location
[docs] def junction_waypoints(self, fixed_lane: carla.Waypoint): x, y = self._agent.self_state[:2] self.junction_wps = fixed_lane.next_until_lane_end(1.0) next_lane = self.fea_extract.map.get_waypoint(self.junction_wps[-1].transform.location) self.junction_wps.extend(self.extra_wp(next_lane)) new_point, nearest_dist, index = None, 1000, 0 for i in range(len(self.junction_wps)): pos = self.junction_wps[i].transform.location _dist = np.hypot(pos.x - x, pos.y - y) if _dist < nearest_dist: nearest_dist = _dist new_point = pos index = i return self.junction_wps[index:]
[docs] def stopping_path(self, waypoints: "list[carla.Waypoint]", vehicle, stop_point: carla.Location): wp_x, wp_y = [], [] for point in waypoints: wp_x.append(point.transform.location.x) wp_y.append(point.transform.location.y) cubicspline = Spline2D(wp_x, wp_y) stop_dist = np.hypot(vehicle.x - stop_point.x, vehicle.y - stop_point.y) s = np.arange(0, stop_dist, self.d_dist) rx, ry, ryaw, rk = [], [], [], [] for i_s in s: ix, iy = cubicspline.calc_position(i_s) rx.append(ix) ry.append(iy) ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s))) rk.append(cubicspline.calc_curvature(i_s)) return rx, ry, ryaw, stop_dist
[docs] def extra_wp(self, wp: carla.Waypoint, max_distance: float = 30.0): seq = 1.0 wp_l = [wp.next(0.01)[0]] while True: wp_l.append(wp.next(seq)[0]) seq += 1 if seq > max_distance: break return wp_l