import math
import os
from collections import deque
from typing import TYPE_CHECKING
import carla
import casadi as ca
import numpy as np
from scipy.spatial.transform import Rotation
from symaware.extra.ros import AckermannDriveModel, TwistDriveModel
from symaware.base import Agent
from symaware.base import Controller as BaseController
from symaware.base import TimeIntervalAsyncLoopLock, TimeSeries, get_logger
from .abstraction import GridMap, PathGridAbstraction
from .dynamical_model import VehicleControlModel
from .ltl import DFA
from .risk import VelocityLPRiskLTL
if TYPE_CHECKING:
from typing import Callable, TypedDict
from symaware.base import MultiAgentKnowledgeDatabase
from .abstraction import SpeedAction, SteerAction
class PathGridAbstractionKnowledgeDatabase(TypedDict):
    """Knowledge-database schema read by ``PathGridAbstractionController.initialise_component``."""

    max_speed: float  # maximum speed of the velocity abstraction
    speed_res: float  # speed discretisation resolution of the abstraction
    label_func: "Callable[[int, PathGridAbstraction], str]"  # labelling function passed to PathGridAbstraction
    cost_func: "Callable[[int, str, PathGridAbstraction], float]"  # cost function passed to VelocityLPRiskLTL
    safety: "DFA | str"  # safety specification: a DFA or a textual spec (parsed via DFA.from_spec)
    cosafety: "DFA | str"  # co-safety specification: a DFA or a textual spec (parsed via DFA.from_spec)
[docs]
class GridAbstractionController(BaseController):
    """Controller skeleton built around a grid abstraction.

    NOTE(review): ``grid_map`` and ``policy_file`` are accepted but never stored
    or used by the visible code — confirm whether this class is a stub.
    """

    def __init__(self, agent_id: int, grid_map: GridMap, policy_file: str = "", async_loop_lock=None):
        """
        :param agent_id: identifier of the agent this controller belongs to
        :param grid_map: grid map used to build the abstraction
        :param policy_file: optional path to a pre-computed policy file
        :param async_loop_lock: optional lock driving the asynchronous loop
        """
        super().__init__(agent_id, async_loop_lock)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        """Initialise the underlying BaseController component."""
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
[docs]
class CarlaController(BaseController):
    """Base class for controllers driving a CARLA-backed agent.

    Records the concrete drive-model class of the agent at initialisation time.
    """

    def __init__(self, agent_id: int, async_loop_lock=None):
        """
        :param agent_id: identifier of the agent this controller belongs to
        :param async_loop_lock: optional lock driving the asynchronous loop
        """
        super().__init__(agent_id, async_loop_lock)
        # Declared here, assigned in initialise_component: the agent's model class.
        self._model: "type[AckermannDriveModel] | type[TwistDriveModel] | type[VehicleControlModel]"

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        """Initialise the base component and record the class of the agent's model."""
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self._model = type(self._agent.model)
[docs]
class VehiclePIDController(CarlaController):
    """
    VehiclePIDController is the combination of two PID controllers
    (lateral and longitudinal) to perform the
    low level control of a vehicle from the client side.
    """

    def __init__(
        self,
        agent_id: int,
        args_lateral=None,
        args_longitudinal=None,
        offset=0,
        max_throttle=0.75,
        max_brake=0.3,
        max_steering=0.8,
        async_loop_lock=None,
    ):
        """
        Constructor method.

        :param agent_id: identifier of the agent this controller belongs to
        :param args_lateral: dictionary of arguments to set the lateral PID controller
            using the following semantics:
            K_P -- Proportional term
            K_D -- Differential term
            K_I -- Integral term
            Defaults to ``{"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}``.
        :param args_longitudinal: dictionary of arguments to set the longitudinal
            PID controller using the same semantics.
            Defaults to ``{"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}``.
        :param offset: If different than zero, the vehicle will drive displaced from the center line.
            Positive values imply a right offset while negative ones mean a left one. Numbers high enough
            to cause the vehicle to drive through other lanes might break the controller.
        :param max_throttle: upper bound applied to the throttle command
        :param max_brake: upper bound applied to the brake command
        :param max_steering: symmetric bound applied to the steering command
        :param async_loop_lock: optional loop lock; when it is a
            TimeIntervalAsyncLoopLock its interval is used as the PID sampling time
        """
        super().__init__(agent_id, async_loop_lock)
        # Use None defaults to avoid sharing mutable dicts across instances.
        if args_lateral is None:
            args_lateral = {"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}
        if args_longitudinal is None:
            args_longitudinal = {"K_P": 2.0, "K_D": 0.0, "K_I": 0.0}
        self.max_brake = max_brake
        self.max_throt = max_throttle
        self.max_steer = max_steering
        self.past_steering = 0
        # Fall back to a 0.1 s sampling time when no timed loop lock is provided.
        dt = async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else 0.1
        self._lon_controller = PIDLongitudinalController(dt=dt, **args_longitudinal)
        self._lat_controller = PIDLateralController(offset, dt=dt, **args_lateral)

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        """Reset the steering memory and attach the agent to both PID sub-controllers."""
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self.past_steering = 0
        # Use the public setter instead of writing the private attribute directly.
        self._lon_controller.set_agent(self._agent)
        self._lat_controller.set_agent(self._agent)

    def _compute(self):
        """Run one PID step towards the target stored in the agent's knowledge database."""
        target_speed = self._agent.self_knowledge["target_speed"]
        waypoint = self._agent.self_knowledge["target"]
        cmd = self.run_step(target_speed, waypoint)
        return (
            self._to_control_input(speed=cmd.throttle - cmd.brake, steering_angle=cmd.steer),
            TimeSeries(),
        )

    def run_step(self, target_speed: float, waypoint: "tuple[float, float]"):
        """
        Execute one step of control invoking both lateral and longitudinal
        PID controllers to reach a target waypoint
        at a given target_speed.

        :param target_speed: desired vehicle speed
        :param waypoint: target location encoded as a waypoint
        :return: a ``carla.VehicleControl`` with throttle/brake/steer set
        """
        acceleration = self._lon_controller.run_step(target_speed)
        current_steering = self._lat_controller.run_step(waypoint)
        control = carla.VehicleControl()
        # Positive longitudinal PID output -> throttle, negative -> brake (both saturated).
        if acceleration >= 0.0:
            control.throttle = min(acceleration, self.max_throt)
            control.brake = 0.0
        else:
            control.throttle = 0.0
            control.brake = min(abs(acceleration), self.max_brake)
        # Steering regulation: changes cannot happen abruptly, can't steer too much.
        if current_steering > self.past_steering + 0.1:
            current_steering = self.past_steering + 0.1
        elif current_steering < self.past_steering - 0.1:
            current_steering = self.past_steering - 0.1
        if current_steering >= 0:
            steering = min(self.max_steer, current_steering)
        else:
            steering = max(-self.max_steer, current_steering)
        control.steer = steering
        control.hand_brake = False
        control.manual_gear_shift = False
        self.past_steering = steering
        return control

    def change_longitudinal_PID(self, args_longitudinal):
        """Changes the parameters of the PIDLongitudinalController"""
        self._lon_controller.change_parameters(**args_longitudinal)

    def change_lateral_PID(self, args_lateral):
        """Changes the parameters of the PIDLateralController"""
        self._lat_controller.change_parameters(**args_lateral)

    def set_offset(self, offset):
        """Changes the offset"""
        self._lat_controller.set_offset(offset)
[docs]
class PIDLongitudinalController:
    """
    PIDLongitudinalController implements longitudinal control using a PID.
    """

    def __init__(self, K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03):
        """
        Constructor method.

        :param K_P: Proportional term
        :param K_D: Differential term
        :param K_I: Integral term
        :param dt: time differential in seconds
        """
        self._agent: Agent  # attached later via set_agent / VehiclePIDController
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
        # Bounded error history; the maxlen also implicitly bounds the integral term.
        self._error_buffer = deque(maxlen=10)

    def set_agent(self, agent):
        """Attach the agent whose state is sampled in run_step."""
        self._agent = agent

    def run_step(self, target_speed: float, debug: bool = False):
        """
        Execute one step of longitudinal control to reach a given target speed.

        :param target_speed: target speed in Km/h
        :param debug: boolean for debugging
        :return: throttle control
        """
        # self_state[7:9] holds the planar velocity components.
        vel = self._agent.self_state[7:9]
        speed = math.sqrt(vel[0] ** 2 + vel[1] ** 2)
        if debug:
            print("Current speed = {}".format(speed))
        return self._pid_control(target_speed, speed)

    def _pid_control(self, target_speed, current_speed):
        """
        Estimate the throttle/brake of the vehicle based on the PID equations.

        :param target_speed: target speed in Km/h
        :param current_speed: current speed of the vehicle in Km/h
        :return: throttle/brake control clipped to [-1, 1]
        """
        error = target_speed - current_speed
        self._error_buffer.append(error)
        if len(self._error_buffer) >= 2:
            _de = (self._error_buffer[-1] - self._error_buffer[-2]) / self._dt
            _ie = sum(self._error_buffer) * self._dt
        else:
            # Not enough history yet for derivative/integral terms.
            _de = 0.0
            _ie = 0.0
        return np.clip((self._k_p * error) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0)

    def change_parameters(self, K_P, K_I, K_D, dt):
        """Changes the PID parameters"""
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
[docs]
class PIDLateralController:
    """
    PIDLateralController implements lateral control using a PID.
    """

    def __init__(self, offset=0, K_P=1.0, K_I=0.1, K_D=0.1, dt=0.03):
        """
        Constructor method.

        :param offset: distance to the center line. It might cause issues if the value
            is large enough to make the vehicle invade other lanes.
        :param K_P: Proportional term
        :param K_D: Differential term
        :param K_I: Integral term
        :param dt: time differential in seconds
        """
        self._agent: Agent  # attached later via set_agent / VehiclePIDController
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
        self._offset = offset
        # Bounded heading-error history; the maxlen also bounds the integral term.
        self._e_buffer = deque(maxlen=10)

    def set_agent(self, agent):
        """Attach the agent whose state is sampled in run_step."""
        self._agent = agent

    def run_step(self, waypoint: "tuple[float, float]"):
        """
        Execute one step of lateral control to steer
        the vehicle towards a certain waypoint.

        :param waypoint: target waypoint
        :return: steering control in the range [-1, 1] where:
            -1 maximum steering to left
            +1 maximum steering to right
        """
        try:
            rot = Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz", degrees=True)
            # NOTE(review): swaps the last two Euler angles before building
            # carla.Rotation — verify against carla.Rotation(pitch, yaw, roll)
            # argument order.
            rot[2], rot[1] = rot[1], rot[2]
        except ValueError:
            # Invalid (e.g. zero-norm) quaternion: no steering command.
            return 0.0
        transform = carla.Transform(carla.Location(*self._agent.self_state[0:3]), carla.Rotation(*rot))
        if transform.location.x == transform.location.y == 0:
            # Agent state not populated yet.
            return 0.0
        return self._pid_control(waypoint, transform)

    def set_offset(self, offset):
        """Changes the offset"""
        self._offset = offset

    def _pid_control(self, waypoint: "tuple[float, float]", vehicle_transform: "carla.Transform"):
        """
        Estimate the steering angle of the vehicle based on the PID equations.

        :param waypoint: target waypoint
        :param vehicle_transform: current transform of the vehicle
        :return: steering control in the range [-1, 1]
        """
        # Get the ego's location and forward vector
        ego_loc = vehicle_transform.location
        v_vec = vehicle_transform.get_forward_vector()
        v_vec = np.array([v_vec.x, v_vec.y, 0.0])
        # Get the vector vehicle-target_wp
        if self._offset != 0:
            raise NotImplementedError("Lateral offset control not implemented")
            # Displace the wp to the side
            # w_tran = waypoint.transform
            # r_vec = w_tran.get_right_vector()
            # w_loc = w_tran.location + carla.Location(x=self._offset * r_vec.x, y=self._offset * r_vec.y)
        else:
            w_loc = carla.Location(*waypoint)  # waypoint.transform.location
        w_vec = np.array([w_loc.x - ego_loc.x, w_loc.y - ego_loc.y, 0.0])
        wv_linalg = np.linalg.norm(w_vec) * np.linalg.norm(v_vec)
        if wv_linalg == 0:
            # NOTE(review): degenerate vectors yield a fixed error of 1 rad — confirm intent.
            _dot = 1
        else:
            # Angle between heading and waypoint direction.
            _dot = math.acos(np.clip(np.dot(w_vec, v_vec) / (wv_linalg), -1.0, 1.0))
        # Sign of the angle from the z-component of the cross product.
        _cross = np.cross(v_vec, w_vec)
        if _cross[2] < 0:
            _dot *= -1.0
        self._e_buffer.append(_dot)
        if len(self._e_buffer) >= 2:
            _de = (self._e_buffer[-1] - self._e_buffer[-2]) / self._dt
            _ie = sum(self._e_buffer) * self._dt
        else:
            _de = 0.0
            _ie = 0.0
        return np.clip((self._k_p * _dot) + (self._k_d * _de) + (self._k_i * _ie), -1.0, 1.0)

    def change_parameters(self, K_P, K_I, K_D, dt):
        """Changes the PID parameters"""
        self._k_p = K_P
        self._k_i = K_I
        self._k_d = K_D
        self._dt = dt
[docs]
class PPController(CarlaController):
    """
    PPController implements a Pure Pursuit controller for path following.
    It computes the necessary speed and steering angle to follow a path.
    """

    def __init__(
        self,
        agent_id: int,
        lookahead_distance: float = 2.0,
        wheel_base: float = 2.5,
        default_speed: float = 1.0,
        max_steering: float = 0.8,
        async_loop_lock=None,
    ):
        """
        Constructor method.

        :param agent_id: ID of the agent
        :param lookahead_distance: distance ahead to look for the target point
        :param wheel_base: distance between front and rear axles
        :param default_speed: default speed when no target speed is specified
        :param max_steering: maximum steering angle in radians
        :param async_loop_lock: lock for async operations
        """
        super().__init__(agent_id, async_loop_lock)
        self._lookahead_distance = lookahead_distance
        self._wheel_base = wheel_base
        self._default_speed = default_speed
        self._max_steering = max_steering

    def _waypoint_compute(self, target_waypoint: "tuple[float, float] | None"):
        """
        Compute the control towards *target_waypoint* using pure pursuit.

        NOTE(review): the two branches return different shapes — the stop
        branch returns ``(control_array, TimeSeries)`` while the normal branch
        returns a bare ``(throttle, steering_angle)`` tuple. Confirm which one
        callers expect.
        """
        # Get target speed from knowledge database, or use default
        target_speed = self._agent.self_knowledge.get("target_speed", self._default_speed)
        if target_waypoint is None:
            # No target, stop the vehicle
            return (
                VehicleControlModel.control_input_to_array(throttle=0.0, brake=1.0, steer=0.0),
                TimeSeries(),
            )
        # Get current position and orientation
        x, y = self._agent.self_state[0:2]
        quaternion = self._agent.self_state[3:7]
        # NOTE(review): `to_euler` is not defined or imported in this module's
        # visible code — confirm it exists elsewhere, or derive yaw with
        # scipy's Rotation as the other controllers in this file do.
        yaw = to_euler(quaternion)[1]  # Extract yaw from quaternion
        print(f"Current position: ({x}, {y}), yaw: {yaw}, target waypoint: {target_waypoint}")  # debug output
        # Compute steering angle using pure pursuit
        steering_angle = self._pure_pursuit(x, y, yaw, target_waypoint)
        # Convert to throttle/brake commands
        # Simple control: if we have a target speed, use throttle, otherwise brake
        if target_speed > 0:
            throttle = min(target_speed, 1.0)
        else:
            throttle = 0.0
        return throttle, steering_angle

    def _pure_pursuit(self, x: float, y: float, yaw: float, target: "tuple[float, float]") -> float:
        """
        Pure pursuit algorithm to compute steering angle.

        :param x: current x position
        :param y: current y position
        :param yaw: current yaw angle in radians
        :param target: target waypoint as (x, y) tuple
        :return: steering angle in range [-1, 1]
        """
        tx, ty = target
        # Calculate the vector from vehicle to target
        dx = tx - x
        dy = ty - y
        # Transform target to vehicle's coordinate frame
        # Rotate by -yaw to get local coordinates
        local_x = dx * np.cos(-yaw) - dy * np.sin(-yaw)
        local_y = dx * np.sin(-yaw) + dy * np.cos(-yaw)
        # Calculate the lookahead distance (actual distance to target)
        lookahead = np.sqrt(local_x**2 + local_y**2)
        # Use the configured lookahead distance or actual distance, whichever is smaller
        effective_lookahead = min(lookahead, self._lookahead_distance)
        if effective_lookahead < 0.1:
            # Too close to target, no steering needed
            return 0.0
        # Calculate curvature using pure pursuit formula
        # curvature = 2 * local_y / lookahead^2
        curvature = 2.0 * local_y / (effective_lookahead**2)
        # Calculate steering angle using bicycle model
        # steering_angle = atan(wheel_base * curvature)
        steering_angle = np.arctan(self._wheel_base * curvature)
        # Normalize to [-1, 1] range
        normalized_steering = np.clip(steering_angle / self._max_steering, -1.0, 1.0)
        return normalized_steering

    def set_lookahead_distance(self, distance: float):
        """Set the lookahead distance."""
        self._lookahead_distance = distance

    def set_default_speed(self, speed: float):
        """Set the default speed."""
        self._default_speed = speed
[docs]
class MPController(CarlaController):
    """Model-predictive controller on a kinematic bicycle model in road-aligned
    error coordinates ``(r, ey, ephi, v)``, solved with CasADi/Ipopt."""

    def __init__(
        self, agent_id, time_step: float = -1, wheel_base: float = 1.5, horizon_steps: int = 5, async_loop_lock=None
    ):
        """
        :param agent_id: identifier of the agent this controller belongs to
        :param time_step: integration step; when <= 0 it is taken from the loop lock
        :param wheel_base: vehicle wheel base used in the bicycle model
        :param horizon_steps: MPC prediction horizon length
        :param async_loop_lock: optional lock driving the asynchronous loop
        """
        super().__init__(agent_id, async_loop_lock)
        self.x_dim = 4  # state: (r, ey, ephi, v)
        self.u_dim = 2  # input: (a, deltaphi)
        self.v_bound = [0, 8]
        self.ephi_bound = [-np.pi / 2, np.pi / 2]
        self.a_bound = [-2, 2]
        self.deltaphi_bound = [-1, 1]
        self.dt = (
            time_step
            if time_step > 0
            else (async_loop_lock.time_interval if isinstance(async_loop_lock, TimeIntervalAsyncLoopLock) else -1)
        )
        assert self.dt > 0, "Time step must be positive either via argument or async loop lock."
        self.WB = wheel_base
        self.horizon = horizon_steps
        self.X = None
        self.U = None
        self.prob_init()

    def initialise_component(self, agent, initial_awareness_database, initial_knowledge_database):
        """Re-build the optimization problem when the component is (re)initialised."""
        super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
        self.prob_init()

    def prob_init(self):
        """(Re)build the MPC problem from scratch.

        A fresh ``Opti`` instance (and fresh parameters) is created on every
        call so that calling this again from ``initialise_component`` does not
        accumulate duplicate variables and constraints on the same solver.
        """
        print("Initializing MPC problem...")
        self.opti = ca.Opti()
        self.X_ref = self.opti.parameter(3)  # (r_ref, ey_ref, v_ref)
        self.X_0 = self.opti.parameter(self.x_dim)
        # Define the state and control variables
        r = ca.SX.sym("r")
        ey = ca.SX.sym("ey")
        ephi = ca.SX.sym("ephi")
        v = ca.SX.sym("v")
        a = ca.SX.sym("a")
        deltaphi = ca.SX.sym("deltaphi")
        states = ca.vertcat(r, ey, ephi, v)
        controls = ca.vertcat(a, deltaphi)
        # Bicycle abstraction dynamics
        # Assume road curvature κ(t) = 0 for straight road (can be parameterized later)
        kappa = 0  # road curvature
        rdot = v * ca.cos(ephi) / (1 - ey * kappa)
        eydot = v * ca.sin(ephi)
        ephidot = v * (ca.tan(deltaphi) / self.WB - kappa * ca.cos(ephi) / (1 - kappa * ey))
        vdot = a
        state_dot = ca.vertcat(rdot, eydot, ephidot, vdot)
        # Function evaluating the continuous-time dynamics (used for the Euler step below)
        integrate_f = ca.Function("integrate_f", [states, controls], [state_dot])
        # Decision variables over the horizon
        self.X = self.opti.variable(self.x_dim, self.horizon + 1)  # state trajectory
        self.U = self.opti.variable(self.u_dim, self.horizon)  # control trajectory
        cost = 0
        # Loop-invariant weight vectors, hoisted out of the horizon loop.
        Q = ca.DM([1.0, 1.0, 2.0]) * 5  # tracking weights for (r, ey, v)
        R = ca.DM([1.0, 5.0])  # input weights for (a, deltaphi)
        for k in range(self.horizon):
            # State tracking cost (tracks the speed X[3] against X_ref[2])
            cost = (
                cost
                + Q[0] * (self.X[0, k] - self.X_ref[0]) ** 2
                + Q[1] * (self.X[1, k] - self.X_ref[1]) ** 2
                + Q[2] * (self.X[3, k] - self.X_ref[2]) ** 2
            )
            # Control input cost
            cost = cost + R[0] * self.U[0, k] ** 2 + R[1] * self.U[1, k] ** 2
            # System dynamics as constraints (forward-Euler discretisation)
            st_next = self.X[:, k] + integrate_f(self.X[:, k], self.U[:, k]) * self.dt
            self.opti.subject_to(self.X[:, k + 1] == st_next)
            # Control bounds
            self.opti.subject_to(self.U[0, k] > self.a_bound[0])
            self.opti.subject_to(self.U[0, k] < self.a_bound[1])
            self.opti.subject_to(self.U[1, k] > self.deltaphi_bound[0])
            self.opti.subject_to(self.U[1, k] < self.deltaphi_bound[1])
            # State bounds
            self.opti.subject_to(self.X[2, k + 1] > self.ephi_bound[0])
            self.opti.subject_to(self.X[2, k + 1] < self.ephi_bound[1])
            self.opti.subject_to(self.X[3, k + 1] > self.v_bound[0])
            self.opti.subject_to(self.X[3, k + 1] < self.v_bound[1])
        # Boundary conditions
        self.opti.subject_to(self.X[:, 0] == self.X_0)  # initial condition
        # Solver configuration
        self.opti.minimize(cost)
        opts = {"ipopt": {"print_level": 0}, "print_time": 0, "verbose": False}
        self.opti.solver("ipopt", opts)
[docs]
def run_step(self, current_state: np.ndarray, target_ref: np.ndarray):
"""Solve the MPC optimization problem.
The current state is composed as follows:
$$
[x, y, phi, v]
$$
where $x, y$ are the position coordinates, $phi$ is the heading angle, and $v$ is the velocity.
On the other hand, the target reference is composed as follows:
$$
[x_t, y_t, v_t]
$$
where $x_t, y_t$ are the target position coordinates, and $v_t$ is the target velocity.
"""
self.opti.set_value(self.X_0, current_state)
self.opti.set_value(self.X_ref, target_ref)
try:
sol = self.opti.solve()
except RuntimeError:
print("MPC solver failed, using previous control.")
breakpoint()
# If solver fails, return zero control
return np.array([0.0, 0.0])
optimal_states = sol.value(self.X)
optimal_controls = sol.value(self.U)
return optimal_controls[:, 0]
[docs]
def get_state(self) -> np.ndarray:
"""Get the current optimized state trajectory."""
state = self._agent.self_state
return np.array(
(
state[0],
state[1],
Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz")[2],
math.sqrt(state[7] ** 2 + state[8] ** 2),
)
)
[docs]
    def _compute(self):
        """Run one MPC step from the current agent state and return the control input.

        NOTE(review): the reference passed to run_step is the hard-coded offset
        ``(x + 2, y - 20)`` from the current position — looks like a leftover
        test target; confirm before relying on this controller. Also, run_step
        returns ``[a, deltaphi]`` (acceleration, steering) which is forwarded
        here as ``speed=`` — verify the unit/semantics match.
        """
        xy = self._agent.self_state[:2]
        try:
            yaw = Rotation.from_quat(self._agent.self_state[3:7]).as_euler("xyz")[2]
        except ValueError:
            # Invalid/zero quaternion (e.g. before the first state update): emit a null command.
            return self._to_control_input(speed=0, steering_angle=0), TimeSeries()
        speed = math.sqrt(self._agent.self_state[7] ** 2 + self._agent.self_state[8] ** 2)
        print(f"Current position: {xy}, yaw: {yaw}, speed: {speed}")  # debug output
        state = np.array([xy[0], xy[1], yaw, speed])
        speed, angle = self.run_step(state, np.array([xy[0] + 2, xy[1] - 20, min(speed + 1, self.v_bound[1])]))
        print(f"Computed control: speed={speed}, angle={angle}")  # debug output
        return (
            self._to_control_input(speed=speed, steering_angle=angle),
            TimeSeries(),
        )
[docs]
class PathGridAbstractionController(BaseController):
    """Controller that follows a policy computed on a path-grid abstraction
    against safety/co-safety DFA specifications (built in initialise_component)."""

    __LOGGER = get_logger(__name__, "PathGridAbstractionController")

    def __init__(self, agent_id: int, grid_map: GridMap, async_loop_lock=None):
        """
        :param agent_id: identifier of the agent this controller belongs to
        :param grid_map: grid map the abstraction is built from
        :param async_loop_lock: optional lock driving the asynchronous loop
        """
        super().__init__(agent_id, async_loop_lock)
        self._grid_map = grid_map
        self._policy: "np.ndarray" = np.array([])  # empty until loaded or computed
        self._abstraction: "PathGridAbstraction | None" = None
        self._lp: "VelocityLPRiskLTL | None" = None
        self._last_prod_state_idx: int = 0  # Index of the last product state, initialized to the initial state
        self._last_target: "tuple[tuple[float, float], float] | None" = None  # Last target position and speed
        self._last_path: "int" = -1  # Last path index
        self._force_recompute: bool = True  # Flag to force recomputation of the path. Used when the policy is updated.
        self._initial_pos_speed: "tuple[float, float, float]" = (0.0, 0.0, 0.0)
        self._last_policy_file: str = ""
        self._fig = None  # matplotlib figure used by _show_updated_policy
        self._ax = None  # matplotlib axes used by _show_updated_policy
[docs]
def initialise_component(
self,
agent: Agent,
initial_awareness_database,
initial_knowledge_database: "MultiAgentKnowledgeDatabase[PathGridAbstractionKnowledgeDatabase]",
):
super().initialise_component(agent, initial_awareness_database, initial_knowledge_database)
self_knowledge_database = initial_knowledge_database[agent.id]
self._initial_pos_speed = (agent.self_state[0], agent.self_state[1], 0.0)
# Abstraction
self._abstraction = PathGridAbstraction(
self._grid_map,
max_speed=self_knowledge_database["max_speed"],
speed_res=self_knowledge_database["speed_res"],
initial_state=(agent.self_state[0], agent.self_state[1], 0.0),
label_function=self_knowledge_database["label_func"],
)
# Load the specifications
safety = self_knowledge_database["safety"]
safe_dfa = safety if isinstance(safety, DFA) else DFA.from_spec(safety)
cosafety = self_knowledge_database["cosafety"]
cosafe_dfa = cosafety if isinstance(cosafety, DFA) else DFA.from_spec(cosafety)
# Computing the policy
cost_func = self_knowledge_database["cost_func"]
self._lp = VelocityLPRiskLTL(self._abstraction, DFA_safe=safe_dfa, DFA_sc=cosafe_dfa, cost_func=cost_func)
policy_file = f"{self._lp.hash}_policy.npy"
self._last_policy_file = policy_file
self._agent.self_knowledge["policy_file"] = policy_file
if policy_file != "" and os.path.exists(policy_file) and not self._agent.self_knowledge.get("no_cache", False):
self.__LOGGER.info("Offline policy file %s found, loading policy...", policy_file)
self._policy = np.load(policy_file)
else:
self.__LOGGER.info("No offline policy file found, computing policy online...")
self._policy, _ = self._lp.extract(self._lp.solve())
np.save(policy_file, self._policy)
# import matplotlib.pyplot as plt
# self._abstraction.plot_labels()
# self._abstraction.plot_labels(additional_dimension_to_plot=1)
# self._grid_map.plot_grid_projection()
# self._lp.plot_policy(self._policy)
# plt.show()
self._agent.self_knowledge["outdated_policy"] = False
[docs]
def _update_policy(self, pos: "tuple[float, float]", speed: float):
if self._agent.self_knowledge["outdated_policy"] is False:
return # No need to update the policy
self.__LOGGER.info("Updating policy based on new knowledge...")
self._lp.update(
pos_speed=(self._initial_pos_speed),
label_function=self._agent.self_knowledge["label_func"],
cost_func=self._agent.self_knowledge["cost_func"],
)
policy_file = f"{self._lp.hash}_policy.npy"
if policy_file == self._last_policy_file:
self.__LOGGER.info("Policy file hash unchanged, no need to update policy file.")
self._agent.self_knowledge["outdated_policy"] = False
return
self._last_policy_file = policy_file
self._agent.self_knowledge["policy_file"] = policy_file
if policy_file != "" and os.path.exists(policy_file) and not self._agent.self_knowledge.get("no_cache", False):
self.__LOGGER.info("Offline policy file %s found, loading policy...", policy_file)
self._policy = np.load(policy_file)
else:
self.__LOGGER.info("No offline policy file found, computing policy online...")
self._policy, _ = self._lp.extract(self._lp.solve())
np.save(policy_file, self._policy)
self._agent.self_knowledge["outdated_policy"] = False
self._last_path = -1 # Reset last path to force recomputation
self._force_recompute = True # Force recomputation of the path
self._show_updated_policy()
[docs]
def _show_updated_policy(self):
return
import matplotlib.pyplot as plt
if self._fig is None or self._ax is None:
self._fig, self._ax = plt.subplots()
plt.show(block=False)
self.abstraction.plot_labels(2)
self._fig.canvas.draw()
self._fig.canvas.flush_events()
plt.show(block=False)
self.__LOGGER.info("Policy updated and plotted.")
    @property
    def policy(self) -> "np.ndarray":
        """Flat policy array mapping a product-state index to an action index."""
        return self._policy

    @property
    def abstraction(self) -> "PathGridAbstraction":
        """The path-grid abstraction built in initialise_component."""
        return self._abstraction

    @property
    def lp(self) -> "VelocityLPRiskLTL":
        """The LP risk/LTL problem built in initialise_component."""
        return self._lp
[docs]
    def _run_step(self, pos: "tuple[float, float]", speed: float) -> "tuple[tuple[float, float], float] | None":
        """Advance the high-level plan one step and return the target to track.

        Maps the continuous (pos, speed) to an abstract state, advances the
        product automaton, and looks up the policy action — falling back to
        other speed levels when no action exists for the current one.

        :return: the next (position, speed) target, or None to stop.
        """
        # With the current position and speed, get the current abstract state
        current_state = self._abstraction.pos_speed_to_state(pos, speed)
        if current_state is None:
            # print(f"Current position {pos} and speed {speed} out of abstraction bounds.")
            return self._last_target  # Out of the abstraction bounds, keep last target
        # Then, compute the next product state based on the last product state and current abstract state
        new_prod_state_idx, new_prod_state = self._lp.product.get_next_prod_state(
            current_state, self._last_prod_state_idx
        )
        if new_prod_state_idx == self._last_prod_state_idx and not self._force_recompute:
            return self._last_target  # No change in product state, no need to update action
        self._force_recompute = False
        self._last_prod_state_idx = new_prod_state_idx
        # Since the product state has changed, get the new action from the policy
        action = self._abs_state_to_action(new_prod_state_idx)
        # If no action is available for the current speed, try all other speeds
        if action is None:
            # Reshape the flat policy to (abstract state, co-safety DFA state, safety DFA state)
            actions = self._policy.reshape(
                self.abstraction.n_states, len(self._lp.product.DFA_cs.states), len(self._lp.product.DFA_safe.states)
            )
            # Scan speed levels from fastest to slowest for a state with a valid action
            for i in range(self.abstraction.speed_range - 1, -1, -1):
                state = self._abstraction.pos_speed_to_state(pos, i * self._abstraction.speed_resolution)
                print(
                    f"Trying to find action for speed index {i}... with speed {i * self._abstraction.speed_resolution} at position {pos} (from {new_prod_state} to {state})."
                )
                if state is None:
                    continue
                # NOTE(review): the DFA components are offset by -1 here — confirm the indexing convention.
                action = actions[state, new_prod_state[1] - 1, new_prod_state[2] - 1]
                print(f"Found action {action} for state {state} and prod state {new_prod_state_idx}.")
                if action >= 0:
                    self._last_target = self._abstraction.state_action_to_next_state(state, action)
                    self._last_path = self._abstraction.action_names[action][0]
                    print(f"Last path updated to {self._last_path} for speed {i * self._abstraction.speed_resolution}.")
                    return self._last_target
        # If still no action is available, keep the last target (-1 encodes "no action")
        if action is None or action == -1:
            print(f"No action available for product state {new_prod_state_idx}.")
            if self._last_target is not None:
                self._last_target = (self._last_target[0], 0.0)  # Set speed to 0 if no action is available
            return self._last_target  # No action available, keep last target
        # -2 encodes an explicit "stop": clear target and path
        if action == -2:
            print(f"Action indicates to stop for product state {new_prod_state_idx}.")
            self._last_target = None
            self._last_path = -1
            return self._last_target
        # Finally, convert the action to target position and speed and update the product state
        self._last_path = action[0]
        print(f"Last path updated to {self._last_path} for product state {new_prod_state_idx}.")
        self._last_target = self._abstraction.state_action_to_next_state(current_state, action)
        print(
            f"Computed new target: {self._last_target} using action: {action} => from prod state {new_prod_state_idx} {(pos, speed)}"
        )
        return self._last_target
[docs]
def _abs_state_to_action(self, prod_state_idx: int) -> "tuple[SteerAction, SpeedAction] | None":
if self._policy.size == 0:
raise RuntimeError("Policy not computed yet.")
action_idx = int(self._policy[prod_state_idx])
if action_idx < 0:
return None # No action available
self.__LOGGER.debug(
"From prod state %d computed action %s", prod_state_idx, self._abstraction.action_names[action_idx]
)
return self._abstraction.action_names[action_idx]