Source code for symaware.simulators.carla.controllers

import math
import os
from collections import deque
from typing import TYPE_CHECKING

import carla
import casadi as ca
import numpy as np
from scipy.spatial.transform import Rotation
from symaware.extra.ros import AckermannDriveModel, TwistDriveModel

from symaware.base import Agent
from symaware.base import Controller as BaseController
from symaware.base import TimeIntervalAsyncLoopLock, TimeSeries, get_logger

from .abstraction import GridMap, PathGridAbstraction
from .dynamical_model import VehicleControlModel
from .ltl import DFA
from .risk import VelocityLPRiskLTL

# Type-checking-only imports: this branch never executes at runtime, so every
# annotation that uses these names elsewhere in the file must be a string.
if TYPE_CHECKING:
    from typing import Callable, TypedDict

    from symaware.base import MultiAgentKnowledgeDatabase

    from .abstraction import SpeedAction, SteerAction

    class PathGridAbstractionKnowledgeDatabase(TypedDict):
        """Schema of the per-agent knowledge database read by
        ``PathGridAbstractionController.initialise_component``."""

        max_speed: float  # passed to PathGridAbstraction as the speed upper bound
        speed_res: float  # passed to PathGridAbstraction as the speed resolution
        label_func: "Callable[[int, PathGridAbstraction], str]"  # labeling function for abstract states
        cost_func: "Callable[[int, str, PathGridAbstraction], float]"  # cost function handed to VelocityLPRiskLTL
        safety: "DFA | str"  # safety spec: a DFA, or a spec string parsed with DFA.from_spec
        cosafety: "DFA | str"  # co-safety spec, same convention as `safety`

class GridAbstractionController(BaseController):
    """Controller operating on a :class:`GridMap` abstraction.

    NOTE(review): both methods below only forward to the superclass; the
    ``grid_map`` and ``policy_file`` arguments are accepted but never stored.
    This strongly suggests the method bodies were truncated when this file was
    extracted from the generated documentation — confirm against the original
    repository source before relying on this class.
    """

    def __init__(self, agent_id: int, grid_map: GridMap, policy_file: str = "", async_loop_lock=None):
        super().__init__(agent_id, async_loop_lock)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
class CarlaController(BaseController):
    """Common base for controllers of a Carla vehicle.

    On initialisation it records the concrete drive-model class of the agent,
    and offers ``_to_control_input`` to translate a generic (speed, steering)
    command into the control-input array that model expects.
    """

    def __init__(self, agent_id: int, async_loop_lock=None):
        super().__init__(agent_id, async_loop_lock)
        # Populated in initialise_component from the agent's dynamical model.
        self._model: "type[AckermannDriveModel] | type[TwistDriveModel] | type[VehicleControlModel]"

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self._model = type(self._agent.model)

    def _to_control_input(self, speed: float, steering_angle: float):
        """Translate (speed, steering_angle) into the array layout of the agent's drive model."""
        dispatch = (
            (TwistDriveModel, lambda: TwistDriveModel.control_input_to_array(linear_x=speed, angular_z=steering_angle)),
            (
                VehicleControlModel,
                lambda: VehicleControlModel.control_input_to_array(throttle=speed, brake=0.0, steer=steering_angle),
            ),
        )
        for model_cls, build in dispatch:
            if issubclass(self._model, model_cls):
                return build()
        # The only remaining supported drive model.
        assert issubclass(self._model, AckermannDriveModel)
        return AckermannDriveModel.control_input_to_array(speed=speed, steering_angle=steering_angle)
class VehiclePIDController(CarlaController):
    """
    VehiclePIDController is the combination of two PID controllers
    (lateral and longitudinal) to perform the low level control a vehicle from client side
    """

    def __init__(
        self,
        agent_id: int,
        args_lateral=None,
        args_longitudinal=None,
        offset=0,
        max_throttle=0.75,
        max_brake=0.3,
        max_steering=0.8,
        async_loop_lock=None,
    ):
        """
        Constructor method.

        :param agent_id: ID of the controlled agent
        :param args_lateral: dictionary of arguments to set the lateral PID controller
            using the following semantics:
            K_P -- Proportional term
            K_D -- Differential term
            K_I -- Integral term
            Defaults to ``{"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}``.
        :param args_longitudinal: dictionary of arguments to set the longitudinal
            PID controller, with the same semantics (and default) as ``args_lateral``.
        :param offset: If different than zero, the vehicle will drive displaced from the center line.
            Positive values imply a right offset while negative ones mean a left one. Numbers high enough
            to cause the vehicle to drive through other lanes might break the controller.
        :param max_throttle: upper bound applied to the throttle command
        :param max_brake: upper bound applied to the brake command
        :param max_steering: upper bound applied to the (absolute) steering command
        :param async_loop_lock: lock for async operations; its time interval, when
            available, is used as the PID sampling period (otherwise 0.1 s)
        """
        super().__init__(agent_id, async_loop_lock)
        # Fix: dict literals as default arguments are shared between every call
        # (classic mutable-default pitfall). Use None sentinels instead.
        if args_lateral is None:
            args_lateral = {"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}
        if args_longitudinal is None:
            args_longitudinal = {"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}

        self.max_brake = max_brake
        self.max_throt = max_throttle
        self.max_steer = max_steering
        self.past_steering = 0
        # Derive the control period from the loop lock when available, else 100 ms.
        dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.1
        self._lon_controller = PIDLongitudinalController(dt=dt, **args_longitudinal)
        self._lat_controller = PIDLateralController(offset, dt=dt, **args_lateral)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self.past_steering = 0
        # Consistency: use the public set_agent() the PID classes expose instead
        # of reaching into their private _agent attribute.
        self._lon_controller.set_agent(self._agent)
        self._lat_controller.set_agent(self._agent)

    def _compute(self):
        """Read the target speed/waypoint from the agent's knowledge and emit a control input."""
        target_speed = self._agent.self_knowledge["target_speed"]
        waypoint = self._agent.self_knowledge["target"]
        cmd = self.run_step(target_speed, waypoint)
        return (
            self._to_control_input(speed=cmd.throttle - cmd.brake, steering_angle=cmd.steer),
            TimeSeries(),
        )

    def run_step(self, target_speed: float, waypoint: "tuple[float, float]"):
        """
        Execute one step of control invoking both lateral and longitudinal
        PID controllers to reach a target waypoint at a given target_speed.

        :param target_speed: desired vehicle speed
        :param waypoint: target location encoded as a waypoint
        :return: carla.VehicleControl with throttle, brake and steer applied
            (fix: the old docstring wrongly claimed a distance in meters)
        """
        acceleration = self._lon_controller.run_step(target_speed)
        current_steering = self._lat_controller.run_step(waypoint)
        control = carla.VehicleControl()
        # Positive PID output accelerates, negative output brakes.
        if acceleration >= 0.0:
            control.throttle = min(acceleration, self.max_throt)
            control.brake = 0.0
        else:
            control.throttle = 0.0
            control.brake = min(abs(acceleration), self.max_brake)

        # Steering regulation: changes cannot happen abruptly, can't steer too much.
        if current_steering > self.past_steering + 0.1:
            current_steering = self.past_steering + 0.1
        elif current_steering < self.past_steering - 0.1:
            current_steering = self.past_steering - 0.1

        if current_steering >= 0:
            steering = min(self.max_steer, current_steering)
        else:
            steering = max(-self.max_steer, current_steering)

        control.steer = steering
        control.hand_brake = False
        control.manual_gear_shift = False
        self.past_steering = steering
        return control

    def change_longitudinal_PID(self, args_longitudinal):
        """Changes the parameters of the PIDLongitudinalController"""
        self._lon_controller.change_parameters(**args_longitudinal)

    def change_lateral_PID(self, args_lateral):
        """Changes the parameters of the PIDLateralController"""
        self._lat_controller.change_parameters(**args_lateral)

    def set_offset(self, offset):
        """Changes the offset"""
        self._lat_controller.set_offset(offset)
class PIDLongitudinalController:
    """Longitudinal (speed) control of a vehicle via a discrete PID loop."""

    def __init__(self, K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03):
        """
        Constructor method.

        :param K_P: Proportional term
        :param K_I: Integral term
        :param K_D: Differential term
        :param dt: time differential in seconds
        """
        self._agent: Agent
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
        # Only the 10 most recent errors are kept, which bounds the integral term.
        self._error_buffer = deque(maxlen=10)

    def set_agent(self, agent):
        """Attach the agent whose state is sampled by run_step."""
        self._agent = agent

    def run_step(self, target_speed: float, debug: bool = False):
        """
        Execute one step of longitudinal control to reach a given target speed.

        :param target_speed: desired speed
        :param debug: when True, print the measured speed
        :return: throttle/brake command in [-1, 1]
        """
        # Planar speed from the two velocity components stored at self_state[7:9].
        vx, vy = self._agent.self_state[7:9]
        speed = math.sqrt(vx**2 + vy**2)

        if debug:
            print("Current speed = {}".format(speed))

        return self._pid_control(target_speed, speed)

    def _pid_control(self, target_speed, current_speed):
        """
        Estimate the throttle/brake of the vehicle based on the PID equations.

        :param target_speed: desired speed
        :param current_speed: measured speed
        :return: throttle/brake command clipped to [-1, 1]
        """
        error = target_speed - current_speed
        self._error_buffer.append(error)

        if len(self._error_buffer) < 2:
            # Not enough history for finite differences yet.
            derivative, integral = 0.0, 0.0
        else:
            derivative = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt
            integral = sum(self._error_buffer) * self._dt

        output = (self._k_p * error) + (self._k_d * derivative) + (self._k_i * integral)
        return np.clip(output, -1.0, 1.0)

    def change_parameters(self, K_P, K_I, K_D, dt):
        """Changes the PID parameters"""
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
class PIDLateralController:
    """
    PIDLateralController implements lateral control using a PID.
    """

    def __init__(self, offset=0, K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03):
        """
        Constructor method.

        :param offset: distance to the center line. It might cause issues if the value
            is large enough to make the vehicle invade other lanes.
        :param K_P: Proportional term
        :param K_D: Differential term
        :param K_I: Integral term
        :param dt: time differential in seconds
        """
        self._agent: Agent
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
        self._offset = offset
        # Bounded history of heading errors; caps the integral contribution.
        self._e_buffer = deque(maxlen=10)

    def set_agent(self, agent):
        # Attach the agent whose pose is sampled by run_step.
        self._agent = agent

    def run_step(self, waypoint: "tuple[float, float]"):
        """
        Execute one step of lateral control to steer the vehicle towards a certain waypoint.

        :param waypoint: target waypoint
        :return: steering control in the range [-1, 1] where:
            -1 maximum steering to left
            +1 maximum steering to right
        """
        try:
            # Orientation is stored as a quaternion at self_state[3:7]; convert to Euler degrees.
            rot = Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz", degrees=True)
            # NOTE(review): the last two Euler components are swapped before building the
            # carla.Rotation — presumably to bridge the simulator's axis convention and
            # carla's (pitch, yaw, roll) ordering. Confirm against the state layout.
            rot[2], rot[1] = rot[1], rot[2]
        except ValueError:
            # Degenerate quaternion (e.g. all-zero state before the first update): do not steer.
            return 0.0
        transform = carla.Transform(carla.Location(*self._agent.self_state[0:3]), carla.Rotation(*rot))
        if transform.location.x == transform.location.y == 0:
            # Position still at the origin — treat as uninitialised and do not steer.
            return 0.0
        return self._pid_control(waypoint, transform)

    def set_offset(self, offset):
        """Changes the offset"""
        self._offset = offset

    def _pid_control(self, waypoint: "tuple[float, float]", vehicle_transform: "carla.Transform"):
        """
        Estimate the steering angle of the vehicle based on the PID equations

        :param waypoint: target waypoint
        :param vehicle_transform: current transform of the vehicle
        :return: steering control in the range [-1, 1]
        """
        # Get the ego's location and forward vector
        ego_loc = vehicle_transform.location
        v_vec = vehicle_transform.get_forward_vector()
        v_vec = np.array([v_vec.x, v_vec.y, 0.0])

        # Get the vector vehicle-target_wp
        if self._offset != 0:
            raise NotImplementedError("Lateral offset control not implemented")
            # Displace the wp to the side
            # w_tran = waypoint.transform
            # r_vec = w_tran.get_right_vector()
            # w_loc = w_tran.location + carla.Location(x=self._offset * r_vec.x, y=self._offset * r_vec.y)
        else:
            w_loc = carla.Location(*waypoint)  # waypoint.transform.location

        w_vec = np.array([w_loc.x - ego_loc.x, w_loc.y - ego_loc.y, 0.0])

        # Unsigned angle between the forward vector and the vector to the waypoint.
        wv_linalg = np.linalg.norm(w_vec) * np.linalg.norm(v_vec)
        if wv_linalg == 0:
            _dot = 1
        else:
            _dot = math.acos(np.clip(np.dot(w_vec, v_vec) / (wv_linalg), -1.0, 1.0))
        # Sign of the z component of the cross product selects the turning direction.
        _cross = np.cross(v_vec, w_vec)
        if _cross[2] < 0:
            _dot *= -1.0

        self._e_buffer.append(_dot)
        if len(self._e_buffer) >= 2:
            _de = (self._e_buffer[-1] - self._e_buffer[-2]) / self._dt
            _ie = sum(self._e_buffer) * self._dt
        else:
            _de = 0.0
            _ie = 0.0

        return np.clip((self._k_p * _dot) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0)

    def change_parameters(self, K_P, K_I, K_D, dt):
        """Changes the PID parameters"""
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
class PPController(CarlaController):
    """
    PPController implements a Pure Pursuit controller for path following.
    It computes the necessary speed and steering angle to follow a path.
    """

    def __init__(
        self,
        agent_id: int,
        lookahead_distance: float = 2.0,
        wheel_base: float = 2.5,
        default_speed: float = 1.0,
        max_steering: float = 0.8,
        async_loop_lock=None,
    ):
        """
        Constructor method.

        :param agent_id: ID of the agent
        :param lookahead_distance: distance ahead to look for the target point
        :param wheel_base: distance between front and rear axles
        :param default_speed: default speed when no target speed is specified
        :param max_steering: maximum steering angle in radians
        :param async_loop_lock: lock for async operations
        """
        super().__init__(agent_id, async_loop_lock)
        self._lookahead_distance = lookahead_distance
        self._wheel_base = wheel_base
        self._default_speed = default_speed
        self._max_steering = max_steering

    def _waypoint_compute(self, target_waypoint: "tuple[float, float] | None"):
        """
        Compute the control input using the pure pursuit algorithm.

        :param target_waypoint: next waypoint to chase, or None to stop the vehicle
        :return: control input array and time series
        """
        # Get target speed from knowledge database, or use default
        target_speed = self._agent.self_knowledge.get("target_speed", self._default_speed)

        if target_waypoint is None:
            # No target: stop the vehicle with full brake. VehicleControlModel is used
            # directly because _to_control_input cannot express a brake command.
            return (
                VehicleControlModel.control_input_to_array(throttle=0.0, brake=1.0, steer=0.0),
                TimeSeries(),
            )

        # Current position and orientation
        x, y = self._agent.self_state[0:2]
        quaternion = self._agent.self_state[3:7]
        # NOTE(review): `to_euler` is not defined in this module — it must be provided
        # elsewhere in the package; index 1 is taken as the yaw angle. Confirm.
        yaw = to_euler(quaternion)[1]  # Extract yaw from quaternion

        # Lateral command from pure pursuit
        steering_angle = self._pure_pursuit(x, y, yaw, target_waypoint)

        # Simple longitudinal control: positive target speed -> throttle, otherwise coast.
        throttle = min(target_speed, 1.0) if target_speed > 0 else 0.0

        # Fix: this branch previously returned a bare (throttle, steering) pair while the
        # stop branch returned (control-input array, TimeSeries()) — the contract the
        # docstring documents. Both branches now honour that contract. A leftover debug
        # print of the pose was also removed.
        return (
            self._to_control_input(speed=throttle, steering_angle=steering_angle),
            TimeSeries(),
        )

    def _pure_pursuit(self, x: float, y: float, yaw: float, target: "tuple[float, float]") -> float:
        """
        Pure pursuit algorithm to compute steering angle.

        :param x: current x position
        :param y: current y position
        :param yaw: current yaw angle in radians
        :param target: target waypoint as (x, y) tuple
        :return: steering angle in range [-1, 1]
        """
        tx, ty = target
        # Vector from the vehicle to the target
        dx = tx - x
        dy = ty - y
        # Transform the target into the vehicle frame (rotate by -yaw)
        local_x = dx * np.cos(-yaw) - dy * np.sin(-yaw)
        local_y = dx * np.sin(-yaw) + dy * np.cos(-yaw)

        # Actual distance to the target, capped by the configured lookahead distance
        lookahead = np.sqrt(local_x**2 + local_y**2)
        effective_lookahead = min(lookahead, self._lookahead_distance)

        if effective_lookahead < 0.1:
            # Too close to target, no steering needed
            return 0.0

        # Pure pursuit curvature: 2 * lateral offset / lookahead^2
        curvature = 2.0 * local_y / (effective_lookahead**2)
        # Bicycle model: steering angle = atan(wheel_base * curvature)
        steering_angle = np.arctan(self._wheel_base * curvature)
        # Normalize to [-1, 1] with respect to the maximum steering angle
        return np.clip(steering_angle / self._max_steering, -1.0, 1.0)

    def set_lookahead_distance(self, distance: float):
        """Set the lookahead distance."""
        self._lookahead_distance = distance

    def set_default_speed(self, speed: float):
        """Set the default speed."""
        self._default_speed = speed
class MPController(CarlaController):
    """Model-predictive controller built on a kinematic bicycle model in
    path coordinates (r, ey, ephi, v), solved with CasADi/IPOPT."""

    def __init__(
        self, agent_id, time_step: float = -1, wheel_base: float = 1.5, horizon_steps: int = 5, async_loop_lock=None
    ):
        """
        :param agent_id: ID of the controlled agent
        :param time_step: discretisation step; if <= 0 it is taken from the async loop lock
        :param wheel_base: distance between the front and rear axles
        :param horizon_steps: number of steps in the prediction horizon
        :param async_loop_lock: lock for async operations
        """
        super().__init__(agent_id, async_loop_lock)
        self.x_dim = 4  # state: (r, ey, ephi, v)
        self.u_dim = 2  # control: (a, deltaphi)
        self.v_bound = [0, 8]
        self.ephi_bound = [-np.pi / 2, np.pi / 2]
        self.a_bound = [-2, 2]
        self.deltaphi_bound = [-1, 1]
        self.dt = (
            time_step
            if time_step > 0
            else (async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else -1)
        )
        assert self.dt > 0, "Time step must be positive either via argument or async loop lock."
        self.WB = wheel_base
        self.horizon = horizon_steps
        # The optimisation problem (opti, X, U, X_ref, X_0) is (re)built by prob_init.
        self.opti = None
        self.X = None
        self.U = None
        self.X_ref = None
        self.X_0 = None
        self.prob_init()

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        # Rebuild the optimisation problem once the agent is attached.
        self.prob_init()

    def prob_init(self):
        """Build (or rebuild) the MPC optimisation problem from scratch.

        Fix: this method used to add variables and constraints to the existing
        ``self.opti`` instance, so the second call (from ``initialise_component``)
        stacked a duplicate set of variables/constraints onto the same solver.
        A fresh ``ca.Opti`` is now created on every call.
        """
        print("Initializing MPC problem...")
        self.opti = ca.Opti()
        self.X_ref = self.opti.parameter(3)  # (r_ref, ey_ref, v_ref)
        self.X_0 = self.opti.parameter(self.x_dim)

        # Define the state and control variables
        r = ca.SX.sym("r")
        ey = ca.SX.sym("ey")
        ephi = ca.SX.sym("ephi")
        v = ca.SX.sym("v")
        a = ca.SX.sym("a")
        deltaphi = ca.SX.sym("deltaphi")
        states = ca.vertcat(r, ey, ephi, v)
        controls = ca.vertcat(a, deltaphi)

        # Bicycle abstraction dynamics
        # Assume road curvature κ(t) = 0 for straight road (can be parameterized later)
        kappa = 0  # road curvature
        rdot = v * ca.cos(ephi) / (1 - ey * kappa)
        eydot = v * ca.sin(ephi)
        ephidot = v * (ca.tan(deltaphi) / self.WB - kappa * ca.cos(ephi) / (1 - kappa * ey))
        vdot = a
        state_dot = ca.vertcat(rdot, eydot, ephidot, vdot)
        # Function evaluating the state derivative (used with forward Euler below)
        integrate_f = ca.Function("integrate_f", [states, controls], [state_dot])

        # Decision variables: state and control trajectories over the horizon
        self.X = self.opti.variable(self.x_dim, self.horizon + 1)  # state trajectory
        self.U = self.opti.variable(self.u_dim, self.horizon)  # control trajectory

        cost = 0
        for k in range(self.horizon):
            # State tracking cost (on r, ey and v; heading error is unpenalised)
            Q = ca.DM([1.0, 1.0, 2.0])
            Q_rate = 5
            Q *= Q_rate
            cost = (
                cost
                + Q[0] * (self.X[0, k] - self.X_ref[0]) ** 2
                + Q[1] * (self.X[1, k] - self.X_ref[1]) ** 2
                + Q[2] * (self.X[3, k] - self.X_ref[2]) ** 2
            )
            # Control input cost
            R = ca.DM([1.0, 5.0])
            R_rate = 1
            R *= R_rate
            cost = cost + R[0] * self.U[0, k] ** 2 + R[1] * self.U[1, k] ** 2
            # System dynamics as constraints (forward Euler)
            st_next = self.X[:, k] + integrate_f(self.X[:, k], self.U[:, k]) * self.dt
            self.opti.subject_to(self.X[:, k + 1] == st_next)
            # Control bounds
            self.opti.subject_to(self.U[0, k] > self.a_bound[0])
            self.opti.subject_to(self.U[0, k] < self.a_bound[1])
            self.opti.subject_to(self.U[1, k] > self.deltaphi_bound[0])
            self.opti.subject_to(self.U[1, k] < self.deltaphi_bound[1])
            # State bounds
            self.opti.subject_to(self.X[2, k + 1] > self.ephi_bound[0])
            self.opti.subject_to(self.X[2, k + 1] < self.ephi_bound[1])
            self.opti.subject_to(self.X[3, k + 1] > self.v_bound[0])
            self.opti.subject_to(self.X[3, k + 1] < self.v_bound[1])

        # Boundary conditions
        self.opti.subject_to(self.X[:, 0] == self.X_0)  # initial condition

        # Solver configuration
        self.opti.minimize(cost)
        opts = {"ipopt": {"print_level": 0}, "print_time": 0, "verbose": False}
        self.opti.solver("ipopt", opts)

    def run_step(self, current_state: np.ndarray, target_ref: np.ndarray):
        """Solve the MPC optimization problem.

        The current state is composed as follows: $$ [x, y, phi, v] $$
        where $x, y$ are the position coordinates, $phi$ is the heading angle,
        and $v$ is the velocity. On the other hand, the target reference is composed
        as follows: $$ [x_t, y_t, v_t] $$ where $x_t, y_t$ are the target position
        coordinates, and $v_t$ is the target velocity.

        :return: the first optimal control input ``[a, deltaphi]``, or zeros when
            the solver fails
        """
        self.opti.set_value(self.X_0, current_state)
        self.opti.set_value(self.X_ref, target_ref)
        try:
            sol = self.opti.solve()
        except RuntimeError:
            # Fix: removed a stray breakpoint() left over from debugging; on failure
            # fall back to a zero control input.
            print("MPC solver failed, using previous control.")
            return np.array([0.0, 0.0])
        # Apply only the first control of the optimal sequence (receding horizon).
        return sol.value(self.U)[:, 0]

    def get_state(self) -> np.ndarray:
        """Return the current (x, y, yaw, speed) of the agent."""
        state = self._agent.self_state
        return np.array(
            (
                state[0],
                state[1],
                Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz")[2],
                math.sqrt(state[7] ** 2 + state[8] ** 2),
            )
        )

    def _compute(self):
        """Assemble the current state, solve one MPC step and emit a control input."""
        xy = self._agent.self_state[:2]
        try:
            yaw = Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz")[2]
        except ValueError:
            # Degenerate quaternion (state not initialised yet): command a halt.
            return self._to_control_input(speed=0, steering_angle=0), TimeSeries()
        speed = math.sqrt(self._agent.self_state[7] ** 2 + self._agent.self_state[8] ** 2)
        state = np.array([xy[0], xy[1], yaw, speed])
        # NOTE(review): the reference (x+2, y-20, v+1) is hard-coded — presumably a
        # placeholder target; confirm before relying on this controller.
        speed, angle = self.run_step(state, np.array([xy[0] + 2, xy[1] - 20, min(speed + 1, self.v_bound[1])]))
        return (
            self._to_control_input(speed=speed, steering_angle=angle),
            TimeSeries(),
        )
class PathGridAbstractionController(BaseController):
    """Controller that follows an LP-based risk policy over a path-grid abstraction.

    The continuous (position, speed) state is mapped onto a finite abstraction,
    composed with a safety DFA and a co-safety DFA, and a policy over the
    product is either loaded from a cached ``<hash>_policy.npy`` file or solved
    online via :class:`VelocityLPRiskLTL`.
    """

    __LOGGER = get_logger(__name__, "PathGridAbstractionController")

    def __init__(self, agent_id: int, grid_map: GridMap, async_loop_lock=None):
        super().__init__(agent_id, async_loop_lock)
        self._grid_map = grid_map
        # Flat policy array; filled in initialise_component (or _update_policy).
        self._policy = np.array([])
        self._abstraction: "PathGridAbstraction | None" = None
        self._lp: "VelocityLPRiskLTL | None" = None
        self._last_prod_state_idx: int = 0  # Index of the last product state, initialized to the initial state
        self._last_target: "tuple[tuple[float, float], float] | None" = None  # Last target position and speed
        self._last_path: "int" = -1  # Last path index
        self._force_recompute: bool = True  # Flag to force recomputation of the path. Used when the policy is updated.
        self._initial_pos_speed: "tuple[float, float, float]" = (0.0, 0.0, 0.0)
        self._last_policy_file: str = ""
        # Matplotlib handles used by the (currently disabled) _show_updated_policy.
        self._fig = None
        self._ax = None

    def initialise_component(
        self,
        agent: Agent,
        initial_awareness_database,
        initial_knowledge_database: "MultiAgentKnowledgeDatabase[PathGridAbstractionKnowledgeDatabase]",
    ):
        """Build the abstraction and the product LP, then load or compute the policy.

        The policy is cached in ``<hash>_policy.npy`` keyed by the LP's hash; the
        cache is bypassed when the agent's knowledge sets ``no_cache``.
        """
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self_knowledge_database = initial_knowledge_database[agent.id]
        self._initial_pos_speed = (agent.self_state[0], agent.self_state[1], 0.0)
        # Abstraction
        self._abstraction = PathGridAbstraction(
            self._grid_map,
            max_speed=self_knowledge_database["max_speed"],
            speed_res=self_knowledge_database["speed_res"],
            initial_state=(agent.self_state[0], agent.self_state[1], 0.0),
            label_function=self_knowledge_database["label_func"],
        )
        # Load the specifications
        safety = self_knowledge_database["safety"]
        safe_dfa = safety if isinstance(safety, DFA) else DFA.from_spec(safety)
        cosafety = self_knowledge_database["cosafety"]
        cosafe_dfa = cosafety if isinstance(cosafety, DFA) else DFA.from_spec(cosafety)
        # Computing the policy
        cost_func = self_knowledge_database["cost_func"]
        self._lp = VelocityLPRiskLTL(self._abstraction, DFA_safe=safe_dfa, DFA_sc=cosafe_dfa, cost_func=cost_func)
        policy_file = f"{self._lp.hash}_policy.npy"
        self._last_policy_file = policy_file
        self._agent.self_knowledge["policy_file"] = policy_file
        if policy_file != "" and os.path.exists(policy_file) and not self._agent.self_knowledge.get("no_cache", False):
            self.__LOGGER.info("Offline policy file %s found, loading policy...", policy_file)
            self._policy = np.load(policy_file)
        else:
            self.__LOGGER.info("No offline policy file found, computing policy online...")
            self._policy, _ = self._lp.extract(self._lp.solve())
            np.save(policy_file, self._policy)
        # import matplotlib.pyplot as plt
        # self._abstraction.plot_labels()
        # self._abstraction.plot_labels(additional_dimension_to_plot=1)
        # self._grid_map.plot_grid_projection()
        # self._lp.plot_policy(self._policy)
        # plt.show()
        self._agent.self_knowledge["outdated_policy"] = False

    def _update_policy(self, pos: "tuple[float, float]", speed: float):
        """Recompute (or reload) the policy when the knowledge marks it as outdated.

        NOTE(review): the ``pos`` and ``speed`` parameters are currently unused —
        the LP is updated from ``self._initial_pos_speed``. Confirm this is intended.
        """
        if self._agent.self_knowledge["outdated_policy"] is False:
            return  # No need to update the policy
        self.__LOGGER.info("Updating policy based on new knowledge...")
        self._lp.update(
            pos_speed=(self._initial_pos_speed),
            label_function=self._agent.self_knowledge["label_func"],
            cost_func=self._agent.self_knowledge["cost_func"],
        )
        policy_file = f"{self._lp.hash}_policy.npy"
        if policy_file == self._last_policy_file:
            # Same hash => the update did not change the problem; keep the current policy.
            self.__LOGGER.info("Policy file hash unchanged, no need to update policy file.")
            self._agent.self_knowledge["outdated_policy"] = False
            return
        self._last_policy_file = policy_file
        self._agent.self_knowledge["policy_file"] = policy_file
        if policy_file != "" and os.path.exists(policy_file) and not self._agent.self_knowledge.get("no_cache", False):
            self.__LOGGER.info("Offline policy file %s found, loading policy...", policy_file)
            self._policy = np.load(policy_file)
        else:
            self.__LOGGER.info("No offline policy file found, computing policy online...")
            self._policy, _ = self._lp.extract(self._lp.solve())
            np.save(policy_file, self._policy)
        self._agent.self_knowledge["outdated_policy"] = False
        self._last_path = -1  # Reset last path to force recomputation
        self._force_recompute = True  # Force recomputation of the path
        self._show_updated_policy()

    def _show_updated_policy(self):
        # NOTE(review): the early return below disables the live matplotlib preview
        # that follows — everything after it is dead code, kept as-is. Confirm whether
        # this is a deliberate kill-switch or a leftover.
        return
        import matplotlib.pyplot as plt

        if self._fig is None or self._ax is None:
            self._fig, self._ax = plt.subplots()
            plt.show(block=False)
        self.abstraction.plot_labels(2)
        self._fig.canvas.draw()
        self._fig.canvas.flush_events()
        plt.show(block=False)
        self.__LOGGER.info("Policy updated and plotted.")

    @property
    def policy(self) -> "np.ndarray":
        # Flat policy array (empty until initialise_component has run).
        return self._policy

    @property
    def abstraction(self) -> "PathGridAbstraction":
        return self._abstraction

    @property
    def lp(self) -> "VelocityLPRiskLTL":
        return self._lp

    def _run_step(self, pos: "tuple[float, float]", speed: float) -> "tuple[tuple[float, float], float] | None":
        """Advance the product automaton with the current (pos, speed) and return
        the next target (position, speed), or None when the policy says to stop."""
        # With the current position and speed, get the current abstract state
        current_state = self._abstraction.pos_speed_to_state(pos, speed)
        if current_state is None:
            # print(f"Current position {pos} and speed {speed} out of abstraction bounds.")
            return self._last_target  # Out of the abstraction bounds, keep last target
        # Then, compute the next product state based on the last product state and current abstract state
        new_prod_state_idx, new_prod_state = self._lp.product.get_next_prod_state(
            current_state, self._last_prod_state_idx
        )
        if new_prod_state_idx == self._last_prod_state_idx and not self._force_recompute:
            return self._last_target  # No change in product state, no need to update action
        self._force_recompute = False
        self._last_prod_state_idx = new_prod_state_idx
        # Since the product state has changed, get the new action from the policy
        action = self._abs_state_to_action(new_prod_state_idx)
        # If no action is available for the current speed, try all other speeds
        if action is None:
            # Reshape the flat policy into (abstract state, co-safety DFA state, safety DFA state).
            actions = self._policy.reshape(
                self.abstraction.n_states, len(self._lp.product.DFA_cs.states), len(self._lp.product.DFA_safe.states)
            )
            # Search from the highest speed index downwards for a state with a valid action.
            for i in range(self.abstraction.speed_range - 1, -1, -1):
                state = self._abstraction.pos_speed_to_state(pos, i * self._abstraction.speed_resolution)
                print(
                    f"Trying to find action for speed index {i}... with speed {i * self._abstraction.speed_resolution} at position {pos} (from {new_prod_state} to {state})."
                )
                if state is None:
                    continue
                action = actions[state, new_prod_state[1] - 1, new_prod_state[2] - 1]
                print(f"Found action {action} for state {state} and prod state {new_prod_state_idx}.")
                if action >= 0:
                    self._last_target = self._abstraction.state_action_to_next_state(state, action)
                    self._last_path = self._abstraction.action_names[action][0]
                    print(f"Last path updated to {self._last_path} for speed {i * self._abstraction.speed_resolution}.")
                    return self._last_target
        # If still no action is available, keep the last target
        if action is None or action == -1:
            print(f"No action available for product state {new_prod_state_idx}.")
            if self._last_target is not None:
                self._last_target = (self._last_target[0], 0.0)  # Set speed to 0 if no action is available
            return self._last_target  # No action available, keep last target
        if action == -2:
            # Sentinel -2 means the policy commands a full stop.
            print(f"Action indicates to stop for product state {new_prod_state_idx}.")
            self._last_target = None
            self._last_path = -1
            return self._last_target
        # Finally, convert the action to target position and speed and update the product state
        self._last_path = action[0]
        print(f"Last path updated to {self._last_path} for product state {new_prod_state_idx}.")
        self._last_target = self._abstraction.state_action_to_next_state(current_state, action)
        print(
            f"Computed new target: {self._last_target} using action: {action} => from prod state {new_prod_state_idx} {(pos, speed)}"
        )
        return self._last_target

    def _abs_state_to_action(self, prod_state_idx: int) -> "tuple[SteerAction, SpeedAction] | None":
        """Look up the named action for a product-state index in the flat policy.

        :raises RuntimeError: if the policy has not been computed yet
        :return: the named action, or None when the policy entry is negative
        """
        if self._policy.size == 0:
            raise RuntimeError("Policy not computed yet.")
        action_idx = int(self._policy[prod_state_idx])
        if action_idx < 0:
            return None  # No action available
        self.__LOGGER.debug(
            "From prod state %d computed action %s", prod_state_idx, self._abstraction.action_names[action_idx]
        )
        return self._abstraction.action_names[action_idx]