"""
author Shuhao Qi
"""
import math
from enum import Enum
from typing import TYPE_CHECKING
import carla
import numpy as np
from symaware.base import Component, Identifier, Tasynclooplock
from .entities import VehicleEntity
from .util import Spline2D, calc_path, local_wp, pi_2_pi, ref_waypoint
if TYPE_CHECKING:
from .perception import FeatureObservation
[docs]
class Status(Enum):
"""Enumeration of path planning statuses.
Defines possible states during path planning: following lane, lane changing,
and stopping scenarios.
"""
FOLLOWING = 0
START_LANE_CHANGE_L = 1
LANE_CHANGING_L = 2
START_LANE_CHANGE_R = 3
LANE_CHANGING_R = 4
STOPPING = 5
[docs]
class PathPlanner(Component):
"""Path planning component for vehicle navigation in CARLA.
Computes safe and feasible paths for vehicles considering lane changes,
junctions, and stopping scenarios. Uses spline-based path generation
with behavioral state management.
"""
def __init__(
self,
agent_id: "Identifier",
world: "carla.World",
async_loop_lock: "Tasynclooplock | None" = None,
):
super().__init__(agent_id, async_loop_lock)
self.d_dist: float = 0.1
self._world = world
self.merge_point = None
self.target_lane = None
self.junction_wps = None
self.merge_length: float = 0.0
[docs]
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
assert isinstance(agent.entity, VehicleEntity), "PathPlanner requires agent.entity to be a VehicleEntity"
return super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
[docs]
def _compute(self):
status = self._agent.self_knowledge["status"]
features: FeatureObservation = self._agent.self_knowledge["features"]
v: float = np.hypot(self._agent.self_state[6], self._agent.self_state[7])
merge_dist = max(4.0 * v, 12.0)
if status == Status.FOLLOWING:
print("Current status: following")
follow_lane = local_wp(features.wp_list[0])
if features.is_junction:
follow_lane = self.junction_waypoints(features.junction_lane)
return self.following_path(follow_lane)
if status == Status.STOPPING:
print("Current status: stopping")
follow_lane = local_wp(features.wp_list[0])
return self.following_path(follow_lane)
elif status == Status.START_LANE_CHANGE_L:
print("Current status: Start Left changing")
left_lane = features.wp_list[0].get_left_lane()
merge_point = self.merge_point_calcu(left_lane, merge_dist)
self.target_lane = self._world.get_map().get_waypoint(merge_point)
self._agent.self_knowledge["status"] = Status.LANE_CHANGING_L
return self.lane_change_path(self.target_lane, merge_point)
if status == Status.LANE_CHANGING_L:
print("Current status: Left changing")
return self.lane_change_path(self.target_lane, self.merge_point)
if status == Status.START_LANE_CHANGE_R:
print("Current status: Start Right changing")
right_lane = features.wp_list[0].get_right_lane()
merge_point = self.merge_point_calcu(right_lane, merge_dist)
self.target_lane = self._world.get_map().get_waypoint(self.merge_point)
self._agent.self_knowledge["status"] = Status.LANE_CHANGING_R
return self.lane_change_path(self.target_lane, merge_point)
if status == Status.LANE_CHANGING_R:
print("Current status: Right changing")
return self.lane_change_path(self.target_lane, merge_point)
[docs]
def _update(self, args: "tuple[float, float, float, float]"):
rx, ry, ryaw, s_sum = args
print(f"{args = }")
[docs]
def following_path(self, waypoints: "list[carla.Waypoint]"):
wp_x, wp_y = [], []
for point in waypoints:
wp_x.append(point.transform.location.x)
wp_y.append(point.transform.location.y)
cubicspline = Spline2D(wp_x, wp_y)
s = np.arange(0, cubicspline.s[-1], self.d_dist)
rx, ry, ryaw, rk = [], [], [], []
for i_s in s:
ix, iy = cubicspline.calc_position(i_s)
rx.append(ix)
ry.append(iy)
ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s)))
rk.append(cubicspline.calc_curvature(i_s))
return rx, ry, ryaw, cubicspline.s[-1]
[docs]
def lane_change_path(self, lane_target: carla.Waypoint, merge_point: carla.Location):
x, y = self._agent.self_state[:2]
yaw = self._agent.self_state[5]
# generate reference line
t_x = merge_point.x
t_y = merge_point.y
t_dx = lane_target.next(0.1)[0].transform.location.x - t_x
t_dy = lane_target.next(0.1)[0].transform.location.y - t_y
t_yaw = math.atan2(t_dy, t_dx)
rx, ry, ryaw, s = calc_path(x, y, yaw, t_x, t_y, t_yaw, 3, self.d_dist)
waypoints = ref_waypoint(lane_target)
wp_x, wp_y = [], []
for point in waypoints:
wp_x.append(point.transform.location.x)
wp_y.append(point.transform.location.y)
cubicspline = Spline2D(wp_x, wp_y)
s = np.arange(0, cubicspline.s[-1], self.d_dist)
for i_s in s:
ix, iy = cubicspline.calc_position(i_s)
rx.append(ix)
ry.append(iy)
ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s)))
if np.hypot(x - t_x, y - t_y) < 1.0:
self._agent.self_knowledge["status"] = Status.FOLLOWING
return rx, ry, ryaw, s[-1]
[docs]
def merge_point_calcu(self, lane_target: carla.Waypoint, merge_dist: float):
return lane_target.next(merge_dist)[0].transform.location
[docs]
def junction_waypoints(self, fixed_lane: carla.Waypoint):
x, y = self._agent.self_state[:2]
self.junction_wps = fixed_lane.next_until_lane_end(1.0)
next_lane = self.fea_extract.map.get_waypoint(self.junction_wps[-1].transform.location)
self.junction_wps.extend(self.extra_wp(next_lane))
new_point, nearest_dist, index = None, 1000, 0
for i in range(len(self.junction_wps)):
pos = self.junction_wps[i].transform.location
_dist = np.hypot(pos.x - x, pos.y - y)
if _dist < nearest_dist:
nearest_dist = _dist
new_point = pos
index = i
return self.junction_wps[index:]
[docs]
def stopping_path(self, waypoints: "list[carla.Waypoint]", vehicle, stop_point: carla.Location):
wp_x, wp_y = [], []
for point in waypoints:
wp_x.append(point.transform.location.x)
wp_y.append(point.transform.location.y)
cubicspline = Spline2D(wp_x, wp_y)
stop_dist = np.hypot(vehicle.x - stop_point.x, vehicle.y - stop_point.y)
s = np.arange(0, stop_dist, self.d_dist)
rx, ry, ryaw, rk = [], [], [], []
for i_s in s:
ix, iy = cubicspline.calc_position(i_s)
rx.append(ix)
ry.append(iy)
ryaw.append(pi_2_pi(cubicspline.calc_yaw(i_s)))
rk.append(cubicspline.calc_curvature(i_s))
return rx, ry, ryaw, stop_dist