from typing import TYPE_CHECKING
import math
import numpy as np
import roslibpy
from symaware.base import Controller as BaseController
from symaware.base import (
Identifier,
Tasynclooplock,
TimeIntervalAsyncLoopLock,
TimeSeries,
get_logger,
)
from symaware.extra.ros.models import AckermannDriveModel, TwistDriveModel
from symaware.extra.ros.utils import dist, quaternion_to_euler, wrap_angle
if TYPE_CHECKING:
from symaware.extra.ros.roslib import AirSensorOutput, Pose, PoseStamped, Vector3
[docs]
class Controller(BaseController[Tasynclooplock]):
"""
Base class for ROS controllers.
Args
----
agent_id:
Identifier of the agent this component belongs to
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(self, agent_id: Identifier, async_loop_lock: Tasynclooplock = None):
super().__init__(agent_id, async_loop_lock)
self._model: "type[AckermannDriveModel] | type[TwistDriveModel]"
[docs]
def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
self._model = type(self._agent.model)
[docs]
def get_speed_and_steering_angle(self, control_input: np.ndarray) -> "tuple[float, float]":
"""
Extract speed and steering angle from a model-specific control input.
Args
----
control_input:
Control input array from the active drive model.
Returns
-------
Tuple of (speed, steering_angle).
"""
if issubclass(self._model, TwistDriveModel):
return control_input[0], control_input[5]
assert issubclass(self._model, AckermannDriveModel)
return control_input[2], control_input[0]
[docs]
class StraightController(Controller[Tasynclooplock]):
"""
Naive controller that just drives straight forward at a constant speed.
It is compatible with agents using a AckermannDriveModel.
This controller needs the following entries in the knowledge database:
- 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed`
Args
----
agent_id:
Identifier of the agent this component belongs to
default_max_speed:
Default speed if its value is not present in the max_speed entry of the knowledge database
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(self, agent_id: Identifier, default_max_speed: float, async_loop_lock: Tasynclooplock = None):
super().__init__(agent_id, async_loop_lock)
self._default_max_speed = default_max_speed
[docs]
def _compute(self):
speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed)
return self._to_control_input(speed=speed, steering_angle=0), TimeSeries()
[docs]
class AirStraightController(StraightController[Tasynclooplock]):
"""
Naive controller that just drives straight forward at a constant speed.
If the air sensor finds something in front of the vehicle, the latter will suddenly stop.
It is compatible with agents using a AckermannDriveModel.
This controller needs the following entries in the knowledge database:
- 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed`
- 'air_sensor_output': information collected from the AIR sensor. Ignored if absent
Args
----
agent_id:
Identifier of the agent this component belongs to
default_max_speed:
Default speed if its value is not present in the max_speed entry of the knowledge database
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(
self,
agent_id: Identifier,
default_max_speed: float,
min_distance: float,
async_loop_lock: Tasynclooplock = None,
):
super().__init__(agent_id, default_max_speed, async_loop_lock)
self._min_distance = min_distance
[docs]
def _compute(self):
distance = 99999
air_sensor: "AirSensorOutput | None" = None
if (air_sensor := self._agent.self_knowledge.get("air_sensor_output", None)) is not None:
distance = air_sensor.range
speed = (
self._agent.self_knowledge.get("max_speed", self._default_max_speed) if distance > self._min_distance else 0
)
return self._to_control_input(speed=speed, steering_angle=0), TimeSeries()
[docs]
class PurePursuitPointController(Controller[Tasynclooplock]):
"""
Pure Pursuit controller that drives the vehicle
to a target point in front of the vehicle.
It is compatible with agents using a AckermannDriveModel.
This controller needs the following entries in the knowledge database:
- 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed`
Args
----
agent_id:
Identifier of the agent this component belongs to
default_max_speed:
Default speed if its value is not present in the 'max_speed' entry of the knowledge database
wheelbase:
The distance between the front and rear axles of the agent, used for steering calculations.
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(
self,
agent_id: Identifier,
target_position: "tuple[float, float]",
default_max_speed: float = 0,
wheel_base: float = 0.25,
async_loop_lock: "Tasynclooplock| None" = None,
):
super().__init__(agent_id, async_loop_lock)
self._target_position = target_position
self._default_max_speed = default_max_speed
self._wheel_base = wheel_base
[docs]
def _compute(self):
speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed)
tx, ty = self._target_position
x, y = self._agent.self_state[:2]
yaw = wrap_angle(quaternion_to_euler(*self._agent.self_state[3:7])[2])
alpha: float = np.arctan2(ty - y, tx - x) - yaw
steering_angle: float = -np.arctan(2 * self._wheel_base * np.sin(alpha) / dist((x, y), (tx, ty)))
return self._to_control_input(speed=speed, steering_angle=steering_angle), TimeSeries()
[docs]
class PurePursuitPathController(Controller[Tasynclooplock]):
"""
Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints.
It is compatible with agents using a AckermannDriveModel.
This controller needs the following entries in the knowledge database:
- 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed`
- 'path': Path to follow. Mandatory
Args
----
agent_id:
Identifier of the agent this component belongs to
default_max_speed:
Default speed if its value is not present in the 'max_speed' entry of the knowledge database
wheel_base:
The distance between the front and rear axles of the agent, used for steering calculations.
waypoint_complete_dist:
Distance threshold to consider a waypoint as passed.
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(
self,
agent_id,
default_max_speed: float = 0,
wheel_base: float = 0.33,
waypoint_complete_dist: float = 0.5,
async_loop_lock=None,
):
super().__init__(agent_id, async_loop_lock)
self._default_max_speed = default_max_speed
self._wheel_base = wheel_base
self._waypoint_complete_dist = waypoint_complete_dist
self._waypoint_idx = 0
@property
def next_waypoint(self) -> "tuple[float, float, float]":
"""
Get the next waypoint as a (x, y, z) tuple.
Returns
-------
Next waypoint position, or (0, 0, 0) when no waypoints remain.
"""
if self._waypoint_idx >= len(self._agent.self_knowledge["path"].poses):
return (0, 0, 0)
waypoint: "PoseStamped | Pose" = self._agent.self_knowledge["path"].poses[self._waypoint_idx]
pose = waypoint.pose if hasattr(waypoint, "pose") else waypoint
tx, ty = pose.position.x, pose.position.y
return (tx, ty, 0)
[docs]
def _compute(self):
speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed)
x, y = self._agent.self_state[:2]
yaw = wrap_angle(quaternion_to_euler(*self._agent.self_state[3:7])[2])
while self._waypoint_idx < len(self._agent.self_knowledge["path"].poses):
waypoint: "PoseStamped | Pose" = self._agent.self_knowledge["path"].poses[self._waypoint_idx]
pose = waypoint.pose if hasattr(waypoint, "pose") else waypoint
tx, ty = pose.position.x, pose.position.y
distance = dist((x, y), (tx, ty))
if distance > self._waypoint_complete_dist:
break
self._waypoint_idx += 1
if self._waypoint_idx >= len(self._agent.self_knowledge["path"].poses):
return self._to_control_input(speed=0, steering_angle=0), TimeSeries()
alpha: float = np.arctan2(ty - y, tx - x) - yaw
steering_angle: float = np.arctan(2 * self._wheel_base * np.sin(alpha) / distance)
return self._to_control_input(speed=speed, steering_angle=steering_angle), TimeSeries()
[docs]
class ACCPurePursuitPathController(PurePursuitPathController):
"""
Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints.
It also uses Adaptive Cruise Control to avoid collisions if it equipped with an AIR sensor.
It is compatible with agents using a AckermannDriveModel.
This controller needs the following entries in the knowledge database:
- 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed`
- 'path': Path to follow. Mandatory
- 'air_sensor_output': information collected from the AIR sensor. Ignored if absent
Args
----
agent_id:
Identifier of the agent this component belongs to
dt:
Time step
ilimit:
Upper and lower bound for the integration in the PID controller
Kp:
Coefficient for the proportional part of the PID controller
Ki:
Coefficient for the integration part of the PID controller
Kd:
Coefficient for the derivation part of the PID controller
max_angle:
Maximum angle in radiants from the heading of the car for the obstacle to be properly detected.
min_headway_break:
Distance we want to keep from the obstacles in front of the car.
sensor_linger_time:
Linger time for readings coming from the sensor to still be considered in the computation of the control input. [s]
Readings older than this value will be ignored.
Setting the value too low may cause the controller to drop readings that should be considered.
default_max_speed:
Default speed if its value is not present in the 'max_speed' entry of the knowledge database
wheel_base:
The distance between the front and rear axles of the agent, used for steering calculations.
waypoint_complete_dist:
Distance threshold to consider a waypoint as passed.
async_loop_lock:
Async loop lock to use for the communication sender
"""
def __init__(
self,
agent_id,
dt: float,
ilimit: float = 200,
Kp: float = 1.0,
Ki: float = 0.05,
Kd: float = 0.05,
max_angle: float = np.deg2rad(90) / 3,
min_headway_break: float = 0.7,
min_headway_acc: float = 1.4,
sensor_linger_time: float = 1.0,
emergency_break_distance: float = 0.25,
default_max_speed: float = 0,
wheel_base: float = 0.25,
waypoint_complete_dist: float = 0.5,
async_loop_lock=None,
):
super().__init__(agent_id, default_max_speed, wheel_base, waypoint_complete_dist, async_loop_lock)
self._max_angle = abs(max_angle)
self._min_headway_break = min_headway_break
self._min_headway_acc = min_headway_acc
self._emergency_break_distance = emergency_break_distance
self._Kp = Kp
self._Ki = Ki
self._Kd = Kd
self._dt = dt
self._ilimit = ilimit
self._integ = 0.0
self._prev_meas = 0.0
self._sensor_linger_time = max(sensor_linger_time, 0.0)
[docs]
def _compute(self):
control_input, intent = super()._compute()
# Extract speed and steering angle
desired_speed, steering_angle = self.get_speed_and_steering_angle(control_input)
# Filter cars with the same direction as ego vehicle
air: "AirSensorOutput | None" = self._agent.self_knowledge.get("air_sensor_output", None)
if (
air is None
or (air.header.stamp.sec + self._sensor_linger_time) * 1e9 + air.header.stamp.nanosec
< roslibpy.Time.now().secs * 1e9 + roslibpy.Time.now().nsecs
):
return control_input, intent
dist_target = 999999
if abs(np.deg2rad(air.azimuth)) <= self._max_angle and air.range < dist_target:
dist_target = air.range
if dist_target > self._min_headway_acc: # control speed
return control_input, intent
if dist_target < self._emergency_break_distance:
get_logger("controller", "ACCPurePursuitPathController").info("Emergency break")
return self._to_control_input(speed=0, steering_angle=steering_angle), intent
# control spacing
max_dist_error = self._min_headway_acc - self._min_headway_break
dist_error = np.clip(dist_target - self._min_headway_break, 0, max_dist_error)
assert max_dist_error > 0
assert dist_error <= max_dist_error
speed = np.clip(self._Kp * desired_speed * dist_error / max_dist_error, 0, desired_speed)
return (self._to_control_input(speed=speed, steering_angle=steering_angle), intent)
[docs]
class KnowledgeAwareACCPurePursuitPathController(ACCPurePursuitPathController):
"""
ACC Pure Pursuit controller that adapts speed when entering a school zone.
It uses traffic sign knowledge to detect school zones and
limits the speed accordingly.
"""
def __init__(
self,
agent_id,
dt: float,
ilimit: float = 200,
Kp: float = 3.0,
Ki: float = 0.05,
Kd: float = 0.04,
max_angle: float = np.deg2rad(90) / 3,
min_headway_break: float = 0.7,
min_headway_acc: float = 10.0,
sensor_linger_time: float = 1.0,
emergency_break_distance: float = 0.2,
default_max_speed: float = 0,
wheel_base: float = 0.25,
waypoint_complete_dist: float = 0.5,
async_loop_lock=None,
):
super().__init__(
agent_id,
dt,
ilimit,
Kp,
Ki,
Kd,
max_angle,
min_headway_break,
min_headway_acc,
sensor_linger_time,
emergency_break_distance,
default_max_speed,
wheel_base,
waypoint_complete_dist,
async_loop_lock,
)
self._is_in_school_zone = False
@property
def is_in_school_zone(self):
"""Return True if the agent is currently in a school zone."""
return self._is_in_school_zone
[docs]
def _check_is_school_zone(self):
"""
Update school zone status based on the latest traffic sign knowledge.
"""
sign: "Vector3 | None" = self._agent.self_knowledge["traffic_sign"].get(301, None)
if sign is None or self._is_in_school_zone: # This way it is impossible to leave the school zone
return
if sign.x < 1 and not self._is_in_school_zone:
self._is_in_school_zone = True
self._notify("on_enter_school_zone", self._agent)
[docs]
def _compute(self):
control_input, intent = super()._compute()
self._check_is_school_zone()
if not self._is_in_school_zone:
return control_input, intent
control_input[0] = np.clip(
control_input[0],
-self._agent.self_knowledge["max_school_zone_speed"],
self._agent.self_knowledge["max_school_zone_speed"],
)
return control_input, intent
[docs]
class PIDController(Controller[Tasynclooplock]):
"""
A PID controller for vehicle waypoint navigation.
Tracks error history and supports customizable PID coefficients for both lateral and longitudinal control.
"""
def __init__(
self,
agent_id: int,
max_steering: float = 0.5,
max_throttle: float = 0.8,
lateral_pid: "tuple[float, float, float] | None" = None,
longitudinal_pid: "tuple[float, float, float] | None" = None,
async_loop_lock=None,
):
"""
Initialize the PID controller.
Args
----
dt:
Time step in seconds
max_steering:
Maximum steering magnitude
max_throttle:
Maximum throttle magnitude
lateral_pid:
Tuple of (Kp, Ki, Kd) for lateral (yaw) control
longitudinal_pid:
Tuple of (Kp, Ki, Kd) for longitudinal (speed) control
"""
super().__init__(agent_id=agent_id, async_loop_lock=async_loop_lock)
self.dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.04
self.max_steering = max_steering
self.max_throttle = max_throttle
# Lateral (yaw) PID coefficients
self.lateral_kp, self.lateral_ki, self.lateral_kd = lateral_pid or (1.5, 0.0, 0.0)
# Longitudinal (speed) PID coefficients
self.longitudinal_kp, self.longitudinal_ki, self.longitudinal_kd = longitudinal_pid or (0.5, 0.0, 0.0)
# Error history for PID enhancement
self.yaw_error_history = []
self.speed_error_history = []
self.position_error_history = []
self.max_history_length = 100
# Integral accumulation
self.yaw_integral = 0.0
self.speed_integral = 0.0
# Previous errors for derivative calculation
self.prev_yaw_error = 0.0
self.prev_speed_error = 0.0
[docs]
def set_lateral_pid(self, kp: float, ki: float, kd: float):
"""Set lateral (yaw) PID coefficients."""
self.lateral_kp = kp
self.lateral_ki = ki
self.lateral_kd = kd
[docs]
def set_longitudinal_pid(self, kp: float, ki: float, kd: float):
"""Set longitudinal (speed) PID coefficients."""
self.longitudinal_kp = kp
self.longitudinal_ki = ki
self.longitudinal_kd = kd
[docs]
def compute_control(self, target_pos: "tuple[float, float]", target_speed: float) -> "tuple[float, float]":
"""
Compute the control input needed to reach the desired target and speed using PID control.
The state is a 10 dimensional vector containing:
- x position
- y position
- z position (unused, can be ignored)
- x orientation (quaternion)
- y orientation (quaternion)
- z orientation (quaternion)
- w orientation (quaternion)
- x linear velocity
- y linear velocity
- z linear velocity
Args
----
state: The current state of the vehicle.
target_pos: The target position to reach, as a tuple of (x, y) coordinates.
target_speed: The target speed to reach in m/s.
Returns
-------
The control input needed to reach the target position and speed, as (throttle, steering).
"""
state = self._agent.self_state
# Extract current position and velocity
current_pos = np.array(state[:2])
current_vel = np.array(state[7:9])
current_speed = np.linalg.norm(current_vel)
# Extract yaw from quaternion (state[3:7] = [qx, qy, qz, qw])
qx, qy, qz, qw = state[3:7]
sin_yaw = 2.0 * (qw * qz + qx * qy)
cos_yaw = 1.0 - 2.0 * (qy * qy + qz * qz)
current_yaw = math.atan2(sin_yaw, cos_yaw)
# Compute position error
pos_error = target_pos - current_pos
distance_to_target = np.linalg.norm(pos_error)
self._record_error(self.position_error_history, distance_to_target)
# ===== LATERAL (YAW) CONTROL =====
desired_yaw = math.atan2(pos_error[1], pos_error[0])
yaw_error = desired_yaw - current_yaw
yaw_error = math.atan2(math.sin(yaw_error), math.cos(yaw_error)) # Wrap to [-pi, pi]
self._record_error(self.yaw_error_history, yaw_error)
# Proportional term
yaw_p = self.lateral_kp * yaw_error
# Integral term
self.yaw_integral += yaw_error * self.dt
yaw_i = self.lateral_ki * self.yaw_integral
# Derivative term
yaw_d = self.lateral_kd * (yaw_error - self.prev_yaw_error) / self.dt if self.dt > 0 else 0.0
self.prev_yaw_error = yaw_error
steering_command = np.clip(yaw_p + yaw_i + yaw_d, -self.max_steering, self.max_steering)
# ===== LONGITUDINAL (SPEED) CONTROL =====
speed_error = target_speed - current_speed
self._record_error(self.speed_error_history, speed_error)
# Proportional term
speed_p = self.longitudinal_kp * speed_error
# Integral term
self.speed_integral += speed_error * self.dt
speed_i = self.longitudinal_ki * self.speed_integral
# Derivative term
speed_d = self.longitudinal_kd * (speed_error - self.prev_speed_error) / self.dt if self.dt > 0 else 0.0
self.prev_speed_error = speed_error
throttle_command = np.clip(speed_p + speed_i + speed_d, -self.max_throttle, self.max_throttle)
# If very close to target, reduce speed to avoid overshooting
if distance_to_target < 0.5:
throttle_command = min(throttle_command, 0.1)
return (throttle_command, steering_command)
[docs]
def _record_error(self, error_list: "list[float]", error: float):
"""Record an error value in the history, maintaining max length."""
error_list.append(error)
if len(error_list) > self.max_history_length:
error_list.pop(0)
[docs]
def get_error_stats(self) -> "dict[str, dict[str, float]]":
"""Get statistics about error history."""
stats = {}
for name, history in [
("yaw", self.yaw_error_history),
("speed", self.speed_error_history),
("position", self.position_error_history),
]:
if len(history) > 0:
stats[name] = {
"mean": float(np.mean(history)),
"std": float(np.std(history)),
"min": float(np.min(history)),
"max": float(np.max(history)),
"latest": float(history[-1]),
}
else:
stats[name] = {"mean": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "latest": 0.0}
return stats
[docs]
def reset_history(self):
"""Reset all error history and integral terms."""
self.yaw_error_history.clear()
self.speed_error_history.clear()
self.position_error_history.clear()
self.yaw_integral = 0.0
self.speed_integral = 0.0
self.prev_yaw_error = 0.0
self.prev_speed_error = 0.0
[docs]
class PurePursuitController(Controller[Tasynclooplock]):
"""
A Pure Pursuit controller for vehicle waypoint navigation.
Uses geometric path tracking (lookahead-based steering) for lateral control
and a simple proportional controller for longitudinal (speed) control.
"""
def __init__(
self,
agent_id: int,
wheelbase: float = 0.32,
lookahead_distance: float = 2.0,
min_lookahead: float = 0.5,
max_lookahead: float = 5.0,
lookahead_speed_gain: float = 0.5,
max_steering: float = 0.5,
max_throttle: float = 0.8,
speed_kp: float = 0.5,
speed_ki: float = 0.0,
speed_kd: float = 0.0,
async_loop_lock=None,
):
"""
Initialize the Pure Pursuit controller.
Args
----
agent_id:
Identifier of the agent.
wheelbase:
Distance between front and rear axles in metres (default: 0.32 for F1tenth).
lookahead_distance:
Base lookahead distance in metres (default: 2.0).
min_lookahead:
Minimum lookahead distance in metres (default: 0.5).
max_lookahead:
Maximum lookahead distance in metres (default: 5.0).
lookahead_speed_gain:
Gain that scales speed to adjust the lookahead distance (default: 0.5).
max_steering:
Maximum steering magnitude (default: 0.5).
max_throttle:
Maximum throttle magnitude (default: 0.8).
speed_kp:
Proportional gain for longitudinal (speed) control (default: 0.5).
speed_ki:
Integral gain for longitudinal (speed) control (default: 0.0).
speed_kd:
Derivative gain for longitudinal (speed) control (default: 0.0).
async_loop_lock:
Optional async loop lock.
"""
super().__init__(agent_id=agent_id, async_loop_lock=async_loop_lock)
self.dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.04
# Pure Pursuit parameters
self.wheelbase = wheelbase
self.lookahead_distance = lookahead_distance
self.min_lookahead = min_lookahead
self.max_lookahead = max_lookahead
self.lookahead_speed_gain = lookahead_speed_gain
self.max_steering = max_steering
self.max_throttle = max_throttle
# Longitudinal (speed) PID coefficients
self.speed_kp = speed_kp
self.speed_ki = speed_ki
self.speed_kd = speed_kd
# Error history
self.yaw_error_history = []
self.speed_error_history = []
self.position_error_history = []
self.max_history_length = 100
# Longitudinal integral / derivative state
self.speed_integral = 0.0
self.prev_speed_error = 0.0
[docs]
def set_lookahead(self, base: float, min_ld: float, max_ld: float, speed_gain: float):
"""Set Pure Pursuit lookahead parameters."""
self.lookahead_distance = base
self.min_lookahead = min_ld
self.max_lookahead = max_ld
self.lookahead_speed_gain = speed_gain
[docs]
def set_longitudinal_pid(self, kp: float, ki: float, kd: float):
"""Set longitudinal (speed) PID coefficients."""
self.speed_kp = kp
self.speed_ki = ki
self.speed_kd = kd
[docs]
def compute_control(self, target_pos: "tuple[float, float]", target_speed: float) -> "tuple[float, float]":
"""
Compute the control input using the Pure Pursuit algorithm for lateral
control and PID for longitudinal control.
The state is a 10-dimensional vector containing:
- x position
- y position
- z position (unused)
- x orientation (quaternion)
- y orientation (quaternion)
- z orientation (quaternion)
- w orientation (quaternion)
- x linear velocity
- y linear velocity
- z linear velocity
Args
----
target_pos: The target (lookahead) position as (x, y).
target_speed: The desired speed in m/s.
Returns
-------
(throttle, steering) control commands.
"""
state = self._agent.self_state
# Current pose
current_pos = np.array(state[:2])
current_vel = np.array(state[7:9])
current_speed = np.linalg.norm(current_vel)
# Yaw from quaternion
qx, qy, qz, qw = state[3:7]
sin_yaw = 2.0 * (qw * qz + qx * qy)
cos_yaw = 1.0 - 2.0 * (qy * qy + qz * qz)
current_yaw = np.arctan2(sin_yaw, cos_yaw)
# Position error
pos_error = target_pos - current_pos
distance_to_target = np.linalg.norm(pos_error)
self._record_error(self.position_error_history, distance_to_target)
# LATERAL (STEERING) - Pure Pursuit
# Adaptive lookahead distance based on current speed
ld = np.clip(
self.lookahead_distance + self.lookahead_speed_gain * current_speed,
self.min_lookahead,
self.max_lookahead,
)
# Alpha: angle between the vehicle heading and the vector toward
# the target point
desired_yaw = np.arctan2(pos_error[1], pos_error[0])
alpha = desired_yaw - current_yaw
alpha = np.arctan2(np.sin(alpha), np.cos(alpha)) # wrap to [-pi, pi]
self._record_error(self.yaw_error_history, alpha)
# Pure Pursuit steering law:
# curvature = 2 * sin(alpha) / ld
# steering = atan(wheelbase * curvature)
# When the target is closer than ld we use the actual distance so
# the curvature grows appropriately.
effective_ld = min(ld, distance_to_target) if distance_to_target > 1e-6 else ld
curvature = 2.0 * np.sin(alpha) / effective_ld
steering_command = np.arctan(self.wheelbase * curvature)
steering_command = float(np.clip(steering_command, -self.max_steering, self.max_steering))
# LONGITUDINAL (SPEED) — PID
speed_error = target_speed - current_speed
self._record_error(self.speed_error_history, speed_error)
speed_p = self.speed_kp * speed_error
self.speed_integral += speed_error * self.dt
speed_i = self.speed_ki * self.speed_integral
speed_d = self.speed_kd * (speed_error - self.prev_speed_error) / self.dt if self.dt > 0 else 0.0
self.prev_speed_error = speed_error
throttle_command = float(np.clip(speed_p + speed_i + speed_d, -self.max_throttle, self.max_throttle))
# Slow down when very close to the target to avoid overshooting
if distance_to_target < 0.5:
throttle_command = min(throttle_command, 0.1)
return (throttle_command, steering_command)
[docs]
def _record_error(self, error_list: "list[float]", error: float):
"""Record an error value in the history, maintaining max length."""
error_list.append(error)
if len(error_list) > self.max_history_length:
error_list.pop(0)
[docs]
def get_error_stats(self) -> "dict[str, dict[str, float]]":
"""Get statistics about error history."""
stats = {}
for name, history in [
("yaw", self.yaw_error_history),
("speed", self.speed_error_history),
("position", self.position_error_history),
]:
if len(history) > 0:
stats[name] = {
"mean": float(np.mean(history)),
"std": float(np.std(history)),
"min": float(np.min(history)),
"max": float(np.max(history)),
"latest": float(history[-1]),
}
else:
stats[name] = {"mean": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "latest": 0.0}
return stats
[docs]
def reset_history(self):
"""Reset all error history and integral terms."""
self.yaw_error_history.clear()
self.speed_error_history.clear()
self.position_error_history.clear()
self.speed_integral = 0.0
self.prev_speed_error = 0.0