Source code for symaware.extra.ros.components.controller

from typing import TYPE_CHECKING
import math

import numpy as np
import roslibpy
from symaware.base import Controller as BaseController
from symaware.base import (
    Identifier,
    Tasynclooplock,
    TimeIntervalAsyncLoopLock,
    TimeSeries,
    get_logger,
)

from symaware.extra.ros.models import AckermannDriveModel, TwistDriveModel
from symaware.extra.ros.utils import dist, quaternion_to_euler, wrap_angle

if TYPE_CHECKING:
    from symaware.extra.ros.roslib import AirSensorOutput, Pose, PoseStamped, Vector3


[docs] class Controller(BaseController[Tasynclooplock]): """ Base class for ROS controllers. Args ---- agent_id: Identifier of the agent this component belongs to async_loop_lock: Async loop lock to use for the communication sender """ def __init__(self, agent_id: Identifier, async_loop_lock: Tasynclooplock = None): super().__init__(agent_id, async_loop_lock) self._model: "type[AckermannDriveModel] | type[TwistDriveModel]"
[docs] def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database): super().initialise_component(agent, initial_awareness_database, initial_knowledge_database) self._model = type(self._agent.model)
[docs] def _to_control_input(self, speed: float, steering_angle: float): """ Convert speed and steering angle into a model-specific control input. Args ---- speed: Desired longitudinal speed. steering_angle: Desired steering angle. Returns ------- Control input array for the active drive model. """ if issubclass(self._model, TwistDriveModel): return TwistDriveModel.control_input_to_array(linear_x=speed, angular_z=steering_angle) assert issubclass(self._model, AckermannDriveModel) return AckermannDriveModel.control_input_to_array(speed=speed, steering_angle=steering_angle)
[docs] def get_speed_and_steering_angle(self, control_input: np.ndarray) -> "tuple[float, float]": """ Extract speed and steering angle from a model-specific control input. Args ---- control_input: Control input array from the active drive model. Returns ------- Tuple of (speed, steering_angle). """ if issubclass(self._model, TwistDriveModel): return control_input[0], control_input[5] assert issubclass(self._model, AckermannDriveModel) return control_input[2], control_input[0]
[docs] class StraightController(Controller[Tasynclooplock]): """ Naive controller that just drives straight forward at a constant speed. It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed` Args ---- agent_id: Identifier of the agent this component belongs to default_max_speed: Default speed if its value is not present in the max_speed entry of the knowledge database async_loop_lock: Async loop lock to use for the communication sender """ def __init__(self, agent_id: Identifier, default_max_speed: float, async_loop_lock: Tasynclooplock = None): super().__init__(agent_id, async_loop_lock) self._default_max_speed = default_max_speed
[docs] def _compute(self): speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed) return self._to_control_input(speed=speed, steering_angle=0), TimeSeries()
[docs] class AirStraightController(StraightController[Tasynclooplock]): """ Naive controller that just drives straight forward at a constant speed. If the air sensor finds something in front of the vehicle, the latter will suddenly stop. It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed` - 'air_sensor_output': information collected from the AIR sensor. Ignored if absent Args ---- agent_id: Identifier of the agent this component belongs to default_max_speed: Default speed if its value is not present in the max_speed entry of the knowledge database async_loop_lock: Async loop lock to use for the communication sender """ def __init__( self, agent_id: Identifier, default_max_speed: float, min_distance: float, async_loop_lock: Tasynclooplock = None, ): super().__init__(agent_id, default_max_speed, async_loop_lock) self._min_distance = min_distance
[docs] def _compute(self): distance = 99999 air_sensor: "AirSensorOutput | None" = None if (air_sensor := self._agent.self_knowledge.get("air_sensor_output", None)) is not None: distance = air_sensor.range speed = ( self._agent.self_knowledge.get("max_speed", self._default_max_speed) if distance > self._min_distance else 0 ) return self._to_control_input(speed=speed, steering_angle=0), TimeSeries()
[docs] class PurePursuitPointController(Controller[Tasynclooplock]): """ Pure Pursuit controller that drives the vehicle to a target point in front of the vehicle. It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed` Args ---- agent_id: Identifier of the agent this component belongs to default_max_speed: Default speed if its value is not present in the 'max_speed' entry of the knowledge database wheelbase: The distance between the front and rear axles of the agent, used for steering calculations. async_loop_lock: Async loop lock to use for the communication sender """ def __init__( self, agent_id: Identifier, target_position: "tuple[float, float]", default_max_speed: float = 0, wheel_base: float = 0.25, async_loop_lock: "Tasynclooplock| None" = None, ): super().__init__(agent_id, async_loop_lock) self._target_position = target_position self._default_max_speed = default_max_speed self._wheel_base = wheel_base
[docs] def _compute(self): speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed) tx, ty = self._target_position x, y = self._agent.self_state[:2] yaw = wrap_angle(quaternion_to_euler(*self._agent.self_state[3:7])[2]) alpha: float = np.arctan2(ty - y, tx - x) - yaw steering_angle: float = -np.arctan(2 * self._wheel_base * np.sin(alpha) / dist((x, y), (tx, ty))) return self._to_control_input(speed=speed, steering_angle=steering_angle), TimeSeries()
[docs] class PurePursuitPathController(Controller[Tasynclooplock]): """ Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints. It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed` - 'path': Path to follow. Mandatory Args ---- agent_id: Identifier of the agent this component belongs to default_max_speed: Default speed if its value is not present in the 'max_speed' entry of the knowledge database wheel_base: The distance between the front and rear axles of the agent, used for steering calculations. waypoint_complete_dist: Distance threshold to consider a waypoint as passed. async_loop_lock: Async loop lock to use for the communication sender """ def __init__( self, agent_id, default_max_speed: float = 0, wheel_base: float = 0.33, waypoint_complete_dist: float = 0.5, async_loop_lock=None, ): super().__init__(agent_id, async_loop_lock) self._default_max_speed = default_max_speed self._wheel_base = wheel_base self._waypoint_complete_dist = waypoint_complete_dist self._waypoint_idx = 0 @property def next_waypoint(self) -> "tuple[float, float, float]": """ Get the next waypoint as a (x, y, z) tuple. Returns ------- Next waypoint position, or (0, 0, 0) when no waypoints remain. """ if self._waypoint_idx >= len(self._agent.self_knowledge["path"].poses): return (0, 0, 0) waypoint: "PoseStamped | Pose" = self._agent.self_knowledge["path"].poses[self._waypoint_idx] pose = waypoint.pose if hasattr(waypoint, "pose") else waypoint tx, ty = pose.position.x, pose.position.y return (tx, ty, 0)
[docs] def _compute(self): speed = self._agent.self_knowledge.get("max_speed", self._default_max_speed) x, y = self._agent.self_state[:2] yaw = wrap_angle(quaternion_to_euler(*self._agent.self_state[3:7])[2]) while self._waypoint_idx < len(self._agent.self_knowledge["path"].poses): waypoint: "PoseStamped | Pose" = self._agent.self_knowledge["path"].poses[self._waypoint_idx] pose = waypoint.pose if hasattr(waypoint, "pose") else waypoint tx, ty = pose.position.x, pose.position.y distance = dist((x, y), (tx, ty)) if distance > self._waypoint_complete_dist: break self._waypoint_idx += 1 if self._waypoint_idx >= len(self._agent.self_knowledge["path"].poses): return self._to_control_input(speed=0, steering_angle=0), TimeSeries() alpha: float = np.arctan2(ty - y, tx - x) - yaw steering_angle: float = np.arctan(2 * self._wheel_base * np.sin(alpha) / distance) return self._to_control_input(speed=speed, steering_angle=steering_angle), TimeSeries()
[docs] class ACCPurePursuitPathController(PurePursuitPathController): """ Pure Pursuit controller that drives the vehicle following a path composed by an arbitrary amount of waypoints. It also uses Adaptive Cruise Control to avoid collisions if it equipped with an AIR sensor. It is compatible with agents using a AckermannDriveModel. This controller needs the following entries in the knowledge database: - 'max_speed': Maximum speed the vehicle can reach. Defaults to `default_max_speed` - 'path': Path to follow. Mandatory - 'air_sensor_output': information collected from the AIR sensor. Ignored if absent Args ---- agent_id: Identifier of the agent this component belongs to dt: Time step ilimit: Upper and lower bound for the integration in the PID controller Kp: Coefficient for the proportional part of the PID controller Ki: Coefficient for the integration part of the PID controller Kd: Coefficient for the derivation part of the PID controller max_angle: Maximum angle in radiants from the heading of the car for the obstacle to be properly detected. min_headway_break: Distance we want to keep from the obstacles in front of the car. sensor_linger_time: Linger time for readings coming from the sensor to still be considered in the computation of the control input. [s] Readings older than this value will be ignored. Setting the value too low may cause the controller to drop readings that should be considered. default_max_speed: Default speed if its value is not present in the 'max_speed' entry of the knowledge database wheel_base: The distance between the front and rear axles of the agent, used for steering calculations. waypoint_complete_dist: Distance threshold to consider a waypoint as passed. async_loop_lock: Async loop lock to use for the communication sender """ def __init__( self, agent_id, dt: float, ilimit: float = 200, Kp: float = 1.0, Ki: float = 0.05, Kd: float = 0.05, max_angle: float = np.deg2rad(90) / 3, min_headway_break: float = 0.7, min_headway_acc: float = 1.4, sensor_linger_time: float = 1.0, emergency_break_distance: float = 0.25, default_max_speed: float = 0, wheel_base: float = 0.25, waypoint_complete_dist: float = 0.5, async_loop_lock=None, ): super().__init__(agent_id, default_max_speed, wheel_base, waypoint_complete_dist, async_loop_lock) self._max_angle = abs(max_angle) self._min_headway_break = min_headway_break self._min_headway_acc = min_headway_acc self._emergency_break_distance = emergency_break_distance self._Kp = Kp self._Ki = Ki self._Kd = Kd self._dt = dt self._ilimit = ilimit self._integ = 0.0 self._prev_meas = 0.0 self._sensor_linger_time = max(sensor_linger_time, 0.0)
[docs] def _compute(self): control_input, intent = super()._compute() # Extract speed and steering angle desired_speed, steering_angle = self.get_speed_and_steering_angle(control_input) # Filter cars with the same direction as ego vehicle air: "AirSensorOutput | None" = self._agent.self_knowledge.get("air_sensor_output", None) if ( air is None or (air.header.stamp.sec + self._sensor_linger_time) * 1e9 + air.header.stamp.nanosec < roslibpy.Time.now().secs * 1e9 + roslibpy.Time.now().nsecs ): return control_input, intent dist_target = 999999 if abs(np.deg2rad(air.azimuth)) <= self._max_angle and air.range < dist_target: dist_target = air.range if dist_target > self._min_headway_acc: # control speed return control_input, intent if dist_target < self._emergency_break_distance: get_logger("controller", "ACCPurePursuitPathController").info("Emergency break") return self._to_control_input(speed=0, steering_angle=steering_angle), intent # control spacing max_dist_error = self._min_headway_acc - self._min_headway_break dist_error = np.clip(dist_target - self._min_headway_break, 0, max_dist_error) assert max_dist_error > 0 assert dist_error <= max_dist_error speed = np.clip(self._Kp * desired_speed * dist_error / max_dist_error, 0, desired_speed) return (self._to_control_input(speed=speed, steering_angle=steering_angle), intent)
[docs] class KnowledgeAwareACCPurePursuitPathController(ACCPurePursuitPathController): """ ACC Pure Pursuit controller that adapts speed when entering a school zone. It uses traffic sign knowledge to detect school zones and limits the speed accordingly. """ def __init__( self, agent_id, dt: float, ilimit: float = 200, Kp: float = 3.0, Ki: float = 0.05, Kd: float = 0.04, max_angle: float = np.deg2rad(90) / 3, min_headway_break: float = 0.7, min_headway_acc: float = 10.0, sensor_linger_time: float = 1.0, emergency_break_distance: float = 0.2, default_max_speed: float = 0, wheel_base: float = 0.25, waypoint_complete_dist: float = 0.5, async_loop_lock=None, ): super().__init__( agent_id, dt, ilimit, Kp, Ki, Kd, max_angle, min_headway_break, min_headway_acc, sensor_linger_time, emergency_break_distance, default_max_speed, wheel_base, waypoint_complete_dist, async_loop_lock, ) self._is_in_school_zone = False @property def is_in_school_zone(self): """Return True if the agent is currently in a school zone.""" return self._is_in_school_zone
[docs] def _check_is_school_zone(self): """ Update school zone status based on the latest traffic sign knowledge. """ sign: "Vector3 | None" = self._agent.self_knowledge["traffic_sign"].get(301, None) if sign is None or self._is_in_school_zone: # This way it is impossible to leave the school zone return if sign.x < 1 and not self._is_in_school_zone: self._is_in_school_zone = True self._notify("on_enter_school_zone", self._agent)
[docs] def _compute(self): control_input, intent = super()._compute() self._check_is_school_zone() if not self._is_in_school_zone: return control_input, intent control_input[0] = np.clip( control_input[0], -self._agent.self_knowledge["max_school_zone_speed"], self._agent.self_knowledge["max_school_zone_speed"], ) return control_input, intent
[docs] class PIDController(Controller[Tasynclooplock]): """ A PID controller for vehicle waypoint navigation. Tracks error history and supports customizable PID coefficients for both lateral and longitudinal control. """ def __init__( self, agent_id: int, max_steering: float = 0.5, max_throttle: float = 0.8, lateral_pid: "tuple[float, float, float] | None" = None, longitudinal_pid: "tuple[float, float, float] | None" = None, async_loop_lock=None, ): """ Initialize the PID controller. Args ---- dt: Time step in seconds max_steering: Maximum steering magnitude max_throttle: Maximum throttle magnitude lateral_pid: Tuple of (Kp, Ki, Kd) for lateral (yaw) control longitudinal_pid: Tuple of (Kp, Ki, Kd) for longitudinal (speed) control """ super().__init__(agent_id=agent_id, async_loop_lock=async_loop_lock) self.dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.04 self.max_steering = max_steering self.max_throttle = max_throttle # Lateral (yaw) PID coefficients self.lateral_kp, self.lateral_ki, self.lateral_kd = lateral_pid or (1.5, 0.0, 0.0) # Longitudinal (speed) PID coefficients self.longitudinal_kp, self.longitudinal_ki, self.longitudinal_kd = longitudinal_pid or (0.5, 0.0, 0.0) # Error history for PID enhancement self.yaw_error_history = [] self.speed_error_history = [] self.position_error_history = [] self.max_history_length = 100 # Integral accumulation self.yaw_integral = 0.0 self.speed_integral = 0.0 # Previous errors for derivative calculation self.prev_yaw_error = 0.0 self.prev_speed_error = 0.0
[docs] def set_lateral_pid(self, kp: float, ki: float, kd: float): """Set lateral (yaw) PID coefficients.""" self.lateral_kp = kp self.lateral_ki = ki self.lateral_kd = kd
[docs] def set_longitudinal_pid(self, kp: float, ki: float, kd: float): """Set longitudinal (speed) PID coefficients.""" self.longitudinal_kp = kp self.longitudinal_ki = ki self.longitudinal_kd = kd
[docs] def compute_control(self, target_pos: "tuple[float, float]", target_speed: float) -> "tuple[float, float]": """ Compute the control input needed to reach the desired target and speed using PID control. The state is a 10 dimensional vector containing: - x position - y position - z position (unused, can be ignored) - x orientation (quaternion) - y orientation (quaternion) - z orientation (quaternion) - w orientation (quaternion) - x linear velocity - y linear velocity - z linear velocity Args ---- state: The current state of the vehicle. target_pos: The target position to reach, as a tuple of (x, y) coordinates. target_speed: The target speed to reach in m/s. Returns ------- The control input needed to reach the target position and speed, as (throttle, steering). """ state = self._agent.self_state # Extract current position and velocity current_pos = np.array(state[:2]) current_vel = np.array(state[7:9]) current_speed = np.linalg.norm(current_vel) # Extract yaw from quaternion (state[3:7] = [qx, qy, qz, qw]) qx, qy, qz, qw = state[3:7] sin_yaw = 2.0 * (qw * qz + qx * qy) cos_yaw = 1.0 - 2.0 * (qy * qy + qz * qz) current_yaw = math.atan2(sin_yaw, cos_yaw) # Compute position error pos_error = target_pos - current_pos distance_to_target = np.linalg.norm(pos_error) self._record_error(self.position_error_history, distance_to_target) # ===== LATERAL (YAW) CONTROL ===== desired_yaw = math.atan2(pos_error[1], pos_error[0]) yaw_error = desired_yaw - current_yaw yaw_error = math.atan2(math.sin(yaw_error), math.cos(yaw_error)) # Wrap to [-pi, pi] self._record_error(self.yaw_error_history, yaw_error) # Proportional term yaw_p = self.lateral_kp * yaw_error # Integral term self.yaw_integral += yaw_error * self.dt yaw_i = self.lateral_ki * self.yaw_integral # Derivative term yaw_d = self.lateral_kd * (yaw_error - self.prev_yaw_error) / self.dt if self.dt > 0 else 0.0 self.prev_yaw_error = yaw_error steering_command = np.clip(yaw_p + yaw_i + yaw_d, -self.max_steering, self.max_steering) # ===== LONGITUDINAL (SPEED) CONTROL ===== speed_error = target_speed - current_speed self._record_error(self.speed_error_history, speed_error) # Proportional term speed_p = self.longitudinal_kp * speed_error # Integral term self.speed_integral += speed_error * self.dt speed_i = self.longitudinal_ki * self.speed_integral # Derivative term speed_d = self.longitudinal_kd * (speed_error - self.prev_speed_error) / self.dt if self.dt > 0 else 0.0 self.prev_speed_error = speed_error throttle_command = np.clip(speed_p + speed_i + speed_d, -self.max_throttle, self.max_throttle) # If very close to target, reduce speed to avoid overshooting if distance_to_target < 0.5: throttle_command = min(throttle_command, 0.1) return (throttle_command, steering_command)
[docs] def _record_error(self, error_list: "list[float]", error: float): """Record an error value in the history, maintaining max length.""" error_list.append(error) if len(error_list) > self.max_history_length: error_list.pop(0)
[docs] def get_error_stats(self) -> "dict[str, dict[str, float]]": """Get statistics about error history.""" stats = {} for name, history in [ ("yaw", self.yaw_error_history), ("speed", self.speed_error_history), ("position", self.position_error_history), ]: if len(history) > 0: stats[name] = { "mean": float(np.mean(history)), "std": float(np.std(history)), "min": float(np.min(history)), "max": float(np.max(history)), "latest": float(history[-1]), } else: stats[name] = {"mean": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "latest": 0.0} return stats
[docs] def reset_history(self): """Reset all error history and integral terms.""" self.yaw_error_history.clear() self.speed_error_history.clear() self.position_error_history.clear() self.yaw_integral = 0.0 self.speed_integral = 0.0 self.prev_yaw_error = 0.0 self.prev_speed_error = 0.0
[docs] class PurePursuitController(Controller[Tasynclooplock]): """ A Pure Pursuit controller for vehicle waypoint navigation. Uses geometric path tracking (lookahead-based steering) for lateral control and a simple proportional controller for longitudinal (speed) control. """ def __init__( self, agent_id: int, wheelbase: float = 0.32, lookahead_distance: float = 2.0, min_lookahead: float = 0.5, max_lookahead: float = 5.0, lookahead_speed_gain: float = 0.5, max_steering: float = 0.5, max_throttle: float = 0.8, speed_kp: float = 0.5, speed_ki: float = 0.0, speed_kd: float = 0.0, async_loop_lock=None, ): """ Initialize the Pure Pursuit controller. Args ---- agent_id: Identifier of the agent. wheelbase: Distance between front and rear axles in metres (default: 0.32 for F1tenth). lookahead_distance: Base lookahead distance in metres (default: 2.0). min_lookahead: Minimum lookahead distance in metres (default: 0.5). max_lookahead: Maximum lookahead distance in metres (default: 5.0). lookahead_speed_gain: Gain that scales speed to adjust the lookahead distance (default: 0.5). max_steering: Maximum steering magnitude (default: 0.5). max_throttle: Maximum throttle magnitude (default: 0.8). speed_kp: Proportional gain for longitudinal (speed) control (default: 0.5). speed_ki: Integral gain for longitudinal (speed) control (default: 0.0). speed_kd: Derivative gain for longitudinal (speed) control (default: 0.0). async_loop_lock: Optional async loop lock. """ super().__init__(agent_id=agent_id, async_loop_lock=async_loop_lock) self.dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.04 # Pure Pursuit parameters self.wheelbase = wheelbase self.lookahead_distance = lookahead_distance self.min_lookahead = min_lookahead self.max_lookahead = max_lookahead self.lookahead_speed_gain = lookahead_speed_gain self.max_steering = max_steering self.max_throttle = max_throttle # Longitudinal (speed) PID coefficients self.speed_kp = speed_kp self.speed_ki = speed_ki self.speed_kd = speed_kd # Error history self.yaw_error_history = [] self.speed_error_history = [] self.position_error_history = [] self.max_history_length = 100 # Longitudinal integral / derivative state self.speed_integral = 0.0 self.prev_speed_error = 0.0
[docs] def set_lookahead(self, base: float, min_ld: float, max_ld: float, speed_gain: float): """Set Pure Pursuit lookahead parameters.""" self.lookahead_distance = base self.min_lookahead = min_ld self.max_lookahead = max_ld self.lookahead_speed_gain = speed_gain
[docs] def set_longitudinal_pid(self, kp: float, ki: float, kd: float): """Set longitudinal (speed) PID coefficients.""" self.speed_kp = kp self.speed_ki = ki self.speed_kd = kd
[docs] def compute_control(self, target_pos: "tuple[float, float]", target_speed: float) -> "tuple[float, float]": """ Compute the control input using the Pure Pursuit algorithm for lateral control and PID for longitudinal control. The state is a 10-dimensional vector containing: - x position - y position - z position (unused) - x orientation (quaternion) - y orientation (quaternion) - z orientation (quaternion) - w orientation (quaternion) - x linear velocity - y linear velocity - z linear velocity Args ---- target_pos: The target (lookahead) position as (x, y). target_speed: The desired speed in m/s. Returns ------- (throttle, steering) control commands. """ state = self._agent.self_state # Current pose current_pos = np.array(state[:2]) current_vel = np.array(state[7:9]) current_speed = np.linalg.norm(current_vel) # Yaw from quaternion qx, qy, qz, qw = state[3:7] sin_yaw = 2.0 * (qw * qz + qx * qy) cos_yaw = 1.0 - 2.0 * (qy * qy + qz * qz) current_yaw = np.arctan2(sin_yaw, cos_yaw) # Position error pos_error = target_pos - current_pos distance_to_target = np.linalg.norm(pos_error) self._record_error(self.position_error_history, distance_to_target) # LATERAL (STEERING) - Pure Pursuit # Adaptive lookahead distance based on current speed ld = np.clip( self.lookahead_distance + self.lookahead_speed_gain * current_speed, self.min_lookahead, self.max_lookahead, ) # Alpha: angle between the vehicle heading and the vector toward # the target point desired_yaw = np.arctan2(pos_error[1], pos_error[0]) alpha = desired_yaw - current_yaw alpha = np.arctan2(np.sin(alpha), np.cos(alpha)) # wrap to [-pi, pi] self._record_error(self.yaw_error_history, alpha) # Pure Pursuit steering law: # curvature = 2 * sin(alpha) / ld # steering = atan(wheelbase * curvature) # When the target is closer than ld we use the actual distance so # the curvature grows appropriately. effective_ld = min(ld, distance_to_target) if distance_to_target > 1e-6 else ld curvature = 2.0 * np.sin(alpha) / effective_ld steering_command = np.arctan(self.wheelbase * curvature) steering_command = float(np.clip(steering_command, -self.max_steering, self.max_steering)) # LONGITUDINAL (SPEED) — PID speed_error = target_speed - current_speed self._record_error(self.speed_error_history, speed_error) speed_p = self.speed_kp * speed_error self.speed_integral += speed_error * self.dt speed_i = self.speed_ki * self.speed_integral speed_d = self.speed_kd * (speed_error - self.prev_speed_error) / self.dt if self.dt > 0 else 0.0 self.prev_speed_error = speed_error throttle_command = float(np.clip(speed_p + speed_i + speed_d, -self.max_throttle, self.max_throttle)) # Slow down when very close to the target to avoid overshooting if distance_to_target < 0.5: throttle_command = min(throttle_command, 0.1) return (throttle_command, steering_command)
[docs] def _record_error(self, error_list: "list[float]", error: float): """Record an error value in the history, maintaining max length.""" error_list.append(error) if len(error_list) > self.max_history_length: error_list.pop(0)
[docs] def get_error_stats(self) -> "dict[str, dict[str, float]]": """Get statistics about error history.""" stats = {} for name, history in [ ("yaw", self.yaw_error_history), ("speed", self.speed_error_history), ("position", self.position_error_history), ]: if len(history) > 0: stats[name] = { "mean": float(np.mean(history)), "std": float(np.std(history)), "min": float(np.min(history)), "max": float(np.max(history)), "latest": float(history[-1]), } else: stats[name] = {"mean": 0.0, "std": 0.0, "min": 0.0, "max": 0.0, "latest": 0.0} return stats
[docs] def reset_history(self): """Reset all error history and integral terms.""" self.yaw_error_history.clear() self.speed_error_history.clear() self.position_error_history.clear() self.speed_integral = 0.0 self.prev_speed_error = 0.0