Source code for symaware.simulators.carla.abstraction

from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from itertools import product
from numbers import Integral
from typing import TYPE_CHECKING, Generic, TypeVar

import matplotlib.pyplot as plt
import numpy as np

from .grid import GridMap, PathsGridMap, WaypointGridMap
from .mdp import LightMDP

# The names below exist only for static type checkers. The annotations in this module
# that use them are written as strings, so they are not evaluated at runtime.
if TYPE_CHECKING:
    from typing import Callable, Literal

    from .grid import _Cell, _Position

    _State = int  # Discrete state identifier
    LabelMap = dict["Bounds", str]  # Grid region -> label string
    PositionMap = dict[tuple[float, float], str]  # (x, y) position -> label string
    IdxMap = dict[int, str]  # Integer index -> label string
    SteerAction = Literal["l", "f", "r"]  # 'l' for 'left', 'f' for 'forward', 'r' for 'right'
    SpeedAction = Literal["d", "c", "a"]  # 'd' for 'decelerate', 'c' for 'cruise', 'a' for 'accelerate'
    MoveAction = Literal[
        "ul", "u", "ur", "l", "s", "r", "dl", "d", "dr"
    ]  # 'ul'/'u'/'ur' for up-left/up/up-right, 'l'/'s'/'r' for left/stay/right, 'dl'/'d'/'dr' for down-left/down/down-right
    Mode = Literal["center", "lower-left-corner", "default"]  # NOTE(review): not used in this part of the file
    # Callable receiving (state, abstraction) and returning the label of that state
    GridLabelFunction = Callable[["_State", "GridAbstraction"], str]

# Type variable for the action type carried by the generic abstractions below
_A = TypeVar("_A")


@dataclass(frozen=True)
class Bounds:
    """
    Immutable rectangular region of a grid, described by column and row bounds.

    All four bounds default to 0. Instances are hashable, so they can be used as dictionary keys.
    NOTE(review): whether the upper bounds are inclusive is not defined here; confirm with the code using them.
    """

    # Lower and upper column bound
    col_lb: int = 0
    col_ub: int = 0
    # Lower and upper row bound
    row_lb: int = 0
    row_ub: int = 0
class PathAbstraction(Generic[_A]):
    """
    Create an abstraction (a path) over a continuous space.

    The path is defined by a sequence of waypoints, each representing a discrete state in the continuous space.
    An MDP transition matrix is associated to the path, describing the probability of moving
    from one waypoint to the neighbouring ones based on the action taken.
    """

    # Transition kernels keyed by action. Entry ``i`` of a kernel is the probability
    # of landing on waypoint ``idx - 2 + i``, so the middle entry means "stay". Every kernel sums to 1.
    _TRANSITION_KERNELS = {
        -2: (0.4, 0.6, 0.0, 0.0, 0.0),
        -1: (0.0, 0.7, 0.3, 0.0, 0.0),
        0: (0.0, 0.1, 0.8, 0.1, 0.0),
        1: (0.0, 0.0, 0.3, 0.7, 0.0),
        2: (0.0, 0.0, 0.0, 0.6, 0.4),
    }

    def __init__(self, waypoints: "list[tuple[float, float]]", actions: "tuple[_A]", radius: float = 2.0):
        """
        Initialize the path abstraction over a continuous space.

        Args
        ----
        waypoints:
            List of waypoints defining the path as (x, y) coordinates
        actions:
            Tuple of available actions for the MDP
        radius:
            Radius around each waypoint to consider as "at" the waypoint

        Raises
        ------
        AssertionError: If waypoints list is empty
        """
        assert len(waypoints) > 0, "Waypoints list is empty"
        self._radius = radius
        self._state_set = np.array(waypoints)
        self._action_set = np.array(actions)
        self._trans_matrix = self.gen_transitions()

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = len(self._state_set)
        n_actions = len(self._action_set)
        # Preallocating keeps the result 3-dimensional even when there is a single action
        P = np.zeros((n_states, n_actions, n_states))
        for i in range(n_states):
            for j, action in enumerate(self._action_set):
                P[i, j, :] = self.trans_func(i, action)
        return P

    def gen_labels(self, label_function: "PositionMap"):
        """
        Generate state labels based on the provided label function.

        A waypoint receives a label when it lies within ``radius`` of the labelled position.
        When several positions match the same waypoint, their labels are concatenated
        in the iteration order of the dictionary.

        Args
        ----
        label_function:
            Dictionary mapping (x, y) positions to label strings

        Returns
        -------
        List of labels for each state, with "_" as default for unlabeled states
        """
        label_map = ["_"] * len(self._state_set)
        for position, label in label_function.items():
            distances = np.linalg.norm(self._state_set - np.array(position), axis=1)
            for n in np.flatnonzero(distances <= self._radius):
                label_map[n] = label if label_map[n] == "_" else label_map[n] + label
        return label_map

    def trans_func(self, idx: int, action: _A):
        """
        Compute transition probabilities from a given waypoint and action.

        The probability mass that would land before the first or after the last waypoint
        is reassigned to the current waypoint. Unknown actions leave the agent in place.

        Args
        ----
        idx:
            Index of the current waypoint in the path
        action:
            Action to take, typically -2, -1, 0, 1, or 2

        Returns
        -------
        Array of transition probabilities to all states
        """
        n_states = len(self._state_set)
        next_state = np.zeros(n_states)

        kernel = self._TRANSITION_KERNELS.get(action, ())
        for offset, value in enumerate(kernel, start=-2):
            target_idx = idx + offset
            if 0 <= target_idx < n_states:
                next_state[target_idx] = value

        # Ensure probabilities sum to 1
        tot = np.sum(next_state)
        if tot < 1.0:
            next_state[idx] += 1.0 - tot
        assert np.isclose(np.sum(next_state), 1.0), "Transition probabilities do not sum to 1"
        return next_state

    def closest_waypoint_idx(self, position: "_Position") -> int:
        """
        Find the index of the closest waypoint to a given position.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Index of the closest waypoint in the path
        """
        dists = np.linalg.norm(self._state_set - np.array(position), axis=1)
        return int(np.argmin(dists))

    def closest_waypoint(self, position: "_Position") -> "_Position":
        """
        Find the closest waypoint to a given position.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Coordinates of the closest waypoint in the path
        """
        return tuple(self._state_set[self.closest_waypoint_idx(position)])

    def is_at_waypoint(self, position: "_Position") -> bool:
        """
        Check if a given position is within the radius of the closest waypoint.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        True if the position is within the radius of the closest waypoint, False otherwise
        """
        waypoint = self._state_set[self.closest_waypoint_idx(position)]
        dist = np.linalg.norm(np.array(position) - waypoint)
        return bool(dist <= self._radius)
class GridAbstraction(Generic[_A], ABC):
    """
    A simple grid-based abstraction over a continuous space.

    Every occupied cell of the grid map becomes a discrete state of an MDP. The state space can be
    repeated ``state_repetition`` times to model an additional discrete dimension (e.g. speed):
    state ``s`` then refers to the occupied cell ``s % num_occupied`` in layer ``s // num_occupied``.
    Subclasses provide the movement model by implementing ``transition_func``.
    """

    def __init__(self, grid_map: "GridMap", actions: "tuple[_A]", state_repetition: int = 1):
        """
        Build the state maps, the transition matrix and the MDP.

        Args
        ----
        grid_map:
            Grid map whose occupied cells define the states
        actions:
            Tuple of action names. The position of a name in the tuple is its action identifier
        state_repetition:
            Number of copies (layers) of the occupied cells in the state space
        """
        self._grid_map: "GridMap"  # Will get initialized in set_gridmap
        self._cell_to_states_map: "np.ndarray"  # Will get initialized in set_gridmap
        self._states_to_cell_map: "np.ndarray"  # Will get initialized in set_gridmap
        # Stored before calling set_gridmap, which tiles the state map by this factor
        self._state_repetition = state_repetition
        self.set_gridmap(grid_map)
        self._action_to_name = actions
        self._name_to_action = {a: i for i, a in enumerate(actions)}
        self._transition_matrix = self.gen_transitions()
        n_states = len(self._states_to_cell_map)
        self._MDP = LightMDP(
            num_states=n_states,
            initial_state=0,
            num_actions=len(self._action_to_name),
            transitions=self._transition_matrix,
            labels=["_"] * n_states,
        )

    @property
    def MDP(self) -> "LightMDP":
        """
        Get the MDP associated with the grid abstraction.

        Returns
        -------
        The LightMDP instance representing the MDP
        """
        return self._MDP

    @property
    def action_set(self):
        """
        Get the set of available actions in the MDP.

        Returns
        -------
        Array of action identifiers
        """
        return np.arange(len(self._action_to_name))

    @property
    def state_set(self):
        """
        Get the set of discrete states in the grid abstraction.

        Returns
        -------
        Array of state identifiers
        """
        return np.arange(len(self._states_to_cell_map))

    @property
    def n_states(self) -> int:
        """
        Get the number of discrete states in the grid abstraction.

        Returns
        -------
        Number of states
        """
        return len(self._states_to_cell_map)

    @property
    def n_actions(self) -> int:
        """
        Get the number of available actions in the grid abstraction.

        Returns
        -------
        Number of actions
        """
        return len(self._action_to_name)

    @property
    def action_names(self) -> "tuple[_A]":
        """
        Get the action names in the grid abstraction.

        Returns
        -------
        Tuple of action names, indexed by action identifier
        """
        return self._action_to_name

    @property
    def transition_matrix(self) -> np.ndarray:
        """
        Get the transition probability matrix for the MDP.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        return self._transition_matrix

    @property
    def grid_map(self) -> "GridMap":
        """
        Get the current grid map used in the abstraction.

        Returns
        -------
        The GridMap instance representing the grid
        """
        return self._grid_map

    @grid_map.setter
    def grid_map(self, grid_map: "GridMap"):
        """
        Set the grid map used in the abstraction.
        This will update the internal state/cell mappings accordingly.
        The transition matrix and the MDP are NOT regenerated.

        Args
        ----
        grid_map:
            The GridMap instance to set
        """
        self.set_gridmap(grid_map)

    def set_gridmap(self, grid_map: "GridMap"):
        """
        Replace the grid map and rebuild the mappings between cells and states.

        States are numbered column by column over the occupied cells, and the resulting
        state-to-cell map is repeated once per layer of the state space.

        Args
        ----
        grid_map:
            The GridMap instance to use

        Raises
        ------
        AssertionError: If the grid map has no occupied cells
        """
        # Clear all cached functions that depend on the grid map.
        # They can be defined on any class of the hierarchy, not only on the most derived one.
        for klass in type(self).__mro__:
            for attribute in vars(klass).values():
                if hasattr(attribute, "cache_clear"):
                    attribute.cache_clear()

        self._grid_map = grid_map
        filled_cells = grid_map.num_occupied
        assert filled_cells > 0, "Grid map has no filled cells"
        n_rows, n_cols = grid_map._grid.shape[:2]
        self._cell_to_states_map = np.full((n_rows, n_cols), -1, dtype=int)
        states_to_cell = np.empty(filled_cells, dtype=int)
        counter = 0
        for c in range(n_cols):
            for r in range(n_rows):  # TODO: r first or c first?
                if grid_map.is_occupied(r, c):
                    self._cell_to_states_map[r, c] = counter
                    # Flat row-major index, decoded back by _state_to_cell
                    states_to_cell[counter] = r * n_cols + c
                    counter += 1
        # Keep one copy of the map per layer, also when the grid map is replaced after construction
        self._states_to_cell_map = np.tile(states_to_cell, getattr(self, "_state_repetition", 1))

    def gen_labels(self, label_function: "GridLabelFunction"):
        """
        Generate state labels based on the provided label function.

        Args
        ----
        label_function:
            Function that takes a state and the GridAbstraction, and returns a label string

        Returns
        -------
        List with the label returned by the label function for each state
        """
        return [label_function(state, self) for state in range(len(self._states_to_cell_map))]

    def pos_to_state(self, position: "_Position") -> "int | None":
        """
        Convert a continuous position to a discrete grid state.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Discrete state identifier (in the first layer of the state space) corresponding to the grid cell
        containing the position, or None if the position is outside the grid or in an unoccupied cell
        """
        r, c = self._grid_map.pos_to_cell(position)
        n_rows, n_cols = self._grid_map._grid.shape[:2]
        if not (0 <= r < n_rows and 0 <= c < n_cols):
            return None
        state_idx = self._cell_to_states_map[r, c]
        # Plain int, so that the result can be fed back to methods expecting a state identifier
        return None if state_idx == -1 else int(state_idx)

    @lru_cache
    def state_to_pos(self, state: int) -> "_Position":
        """
        Convert a discrete grid state to a continuous position.
        The result is cached; the cache is cleared whenever the grid map is replaced.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Continuous position as (x, y) coordinates, as returned by the grid map for the cell of the state

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        return self._grid_map.cell_to_pos(self._state_to_cell(state))

    def _state_to_cell(self, state: int) -> "_Cell":
        """
        Convert a discrete grid state to grid cell coordinates.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Grid cell coordinates as (row, column) indices

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        n_cols = self._grid_map._grid.shape[1]
        r, c = divmod(self._states_to_cell_map[state], n_cols)
        return (r, c)

    def update(self, position: "_Position | int | None" = None, label_function: "GridLabelFunction | None" = None):
        """
        Update the MDP with a new initial state and labels.

        Args
        ----
        position:
            New initial position as (x, y) coordinates or state index. Ignored if not provided
        label_function:
            Function that takes a state and the GridAbstraction, and returns a label string.
            Ignored if not provided
        """
        # Integral also accepts the NumPy integers produced when indexing the state maps
        if isinstance(position, Integral):
            self._MDP.initial_state = position
        elif position is not None:
            self._MDP.initial_state = self.pos_to_state(position)
        if label_function is not None:
            self._MDP.labels = self.gen_labels(label_function)

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.
        Calls ``transition_func`` for every state-action pair.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = self._states_to_cell_map.size
        n_actions = len(self._action_to_name)
        P = np.empty((n_states, n_actions, n_states))
        for state in range(n_states):
            for action in range(n_actions):
                P[state, action, :] = self.transition_func(state, action)
        return P

    def additional_dimension_to_name(self, additional_dimension_idx: int, additional_dimension_value: int = 0) -> "str":
        """
        Return the name corresponding to an additional dimension value.
        It is appended to the title of the transition plots. The base implementation returns an empty string.

        Args
        ----
        additional_dimension_idx:
            Index of the additional dimension (which additional dimension we are referring to)
        additional_dimension_value:
            Value of the additional dimension (index within that dimension).
            Optional, since the plotting code calls this method with a single argument
        """
        return ""

    @abstractmethod
    def transition_func(self, state: "int", action: "int") -> np.ndarray:
        """
        Compute transition probabilities from a given state and action.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array should equal the number of states.
        """

    def plot_transitions_state_action(
        self, state: "int", action: "int", additional_dimension_to_plot: int = 0, axes: "plt.Axes | None" = None
    ):
        """
        Visualize the transition probabilities from a given state and action.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier
        additional_dimension_to_plot:
            Layer of the state space whose arrival probabilities are shown
        axes:
            Axes to draw on. If not provided, a new figure with its own colorbar is created

        Returns
        -------
        Matplotlib figure and axes containing the plot

        Raises
        ------
        AssertionError: If the requested layer does not exist
        """
        num_full_cells = self._grid_map.num_occupied
        assert (
            0 <= additional_dimension_to_plot < (len(self._states_to_cell_map) // num_full_cells)
        ), "Invalid additional dimension to plot"

        # Only the states of the requested layer contribute to the image
        prob_map = np.zeros(self._grid_map._grid.shape[:2])
        P_s_a = self._transition_matrix[state, action, :]
        first_state = additional_dimension_to_plot * num_full_cells
        for next_state in range(first_state, first_state + num_full_cells):
            r, c = self._state_to_cell(next_state)
            prob_map[r, c] = P_s_a[next_state]

        if axes is None:
            fig, ax = plt.subplots()
        else:
            ax = axes
            fig = ax.get_figure()
        im = ax.imshow(prob_map.T, cmap="hot", origin="lower", interpolation="nearest", vmin=0, vmax=1)
        if axes is None:
            fig.colorbar(im)
        r, c = self._state_to_cell(state)
        ax.scatter(r, c, color="green", marker="o", s=100, label="Current State")
        x, y = self.state_to_pos(state)
        ax.set_title(
            f"P(s'=k | s=({x:.2f}, {y:.2f}{self.additional_dimension_to_name(additional_dimension_to_plot)}), a={self._action_to_name[action]})"
        )
        ax.legend()
        return fig, ax

    def plot_transition_state(self, state: "int", additional_dimension_to_plot: int = 0):
        """
        Visualize all the transition probabilities from a given state, across all actions.

        Args
        ----
        state:
            Current discrete state identifier
        additional_dimension_to_plot:
            Layer of the state space whose arrival probabilities are shown

        Returns
        -------
        Matplotlib figure and array of axes, one per action (at most 4 per row)
        """
        n_actions = len(self._action_to_name)
        max_fig_cols = 4
        n_rows = (n_actions + max_fig_cols - 1) // max_fig_cols
        # squeeze=False always yields a 2D array, so a single action is handled like any other case
        fig, axes_grid = plt.subplots(n_rows, min(n_actions, max_fig_cols), squeeze=False)
        for action in range(n_actions):
            ax = axes_grid[action // max_fig_cols, action % max_fig_cols]
            self.plot_transitions_state_action(
                state, action, additional_dimension_to_plot=additional_dimension_to_plot, axes=ax
            )
        fig.colorbar(ax.images[0], ax=axes_grid.ravel().tolist())
        # A single row is returned as a 1D array, several rows as a 2D array
        return fig, (axes_grid if n_rows > 1 else axes_grid[0])

    def plot_labels(self, additional_dimension_to_plot: int = 0, ax: "plt.Axes | None" = None):
        """
        Visualize the labels assigned to each state in the grid.
        Cells that are not a state are shown as "█".

        Args
        ----
        additional_dimension_to_plot:
            Layer of the state space whose labels are shown
        ax:
            Axes to draw on. If not provided, a new figure is created

        Returns
        -------
        Matplotlib figure and axes containing the label visualization
        """
        # Object dtype: a fixed-width string array would truncate every label to one character
        label_map = np.full(self._grid_map._grid.shape[:2], "█", dtype=object)
        num_full_cells = self._grid_map.num_occupied
        n_cols = self._grid_map._grid.shape[1]
        first_state = additional_dimension_to_plot * num_full_cells
        last_state = min(first_state + num_full_cells, len(self._states_to_cell_map))
        for state in range(first_state, last_state):
            r, c = self._state_to_cell(state)
            label_map[r, n_cols - c - 1] = self._MDP.labels[state]  # invert column for plotting
        fig, ax = plt.subplots() if ax is None else (ax.get_figure(), ax)
        ax.table(cellText=label_map.T.tolist(), loc="center", cellLoc="center")
        ax.axis("off")
        fig.tight_layout()
        return fig, ax
class KingGridAbstraction(GridAbstraction["tuple[int, int]"]):
    """
    Implementation of the King grid movement model.

    The actions are the tuples (delta_x, delta_y), where each component can be -1, 0, or 1,
    so the agent can be asked to move horizontally, vertically, diagonally, or to stay in place.

    Along each axis, a commanded step of one cell is performed with probability 0.7,
    it overshoots to two cells with probability 0.1 and it fails (no movement along that axis)
    with probability 0.2. A commanded 0 keeps the coordinate unchanged with probability 1.0.
    The two axes are independent: the joint distribution is the outer product of the two.
    Probability mass falling outside the grid or on unoccupied cells is discarded,
    hence the returned distributions may sum to less than 1.

    .. svgbob::

          ↖ ↑ ↗
        y ← ● →
          ↙ ↓ ↘
            x
    """

    def __init__(self, grid_map: "GridMap"):
        """
        Args
        ----
        grid_map:
            Grid map whose occupied cells define the states
        """
        all_moves = tuple(product((-1, 0, 1), repeat=2))
        super().__init__(grid_map, all_moves)

    def _get_prob_vector(self, delta: "int") -> np.ndarray:
        """
        Return the distribution of the displacement along one axis.

        Args
        ----
        delta:
            Commanded displacement along the axis: -1, 0 or 1

        Returns
        -------
        Array of 5 probabilities for the displacements -2, -1, 0, 1, 2

        Raises
        ------
        ValueError: If delta is not -1, 0 or 1
        """
        kernels = (
            (-1, [0.1, 0.7, 0.2, 0.0, 0.0]),
            (0, [0.0, 0.0, 1.0, 0.0, 0.0]),
            (1, [0.0, 0.0, 0.2, 0.7, 0.1]),
        )
        for commanded, kernel in kernels:
            if delta == commanded:
                return np.array(kernel)
        raise ValueError("Invalid action for King movement model")

    def transition_func(self, state: "int", action: "int") -> np.ndarray:
        """
        Compute transition probabilities from a given state and action.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier, i.e. index of the (delta_x, delta_y) tuple in the action names

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array equals the number of states.
        """
        cell_x, cell_y = self._state_to_cell(state)
        delta_x, delta_y = self._action_to_name[action]
        prob_x = self._get_prob_vector(delta_x)
        prob_y = self._get_prob_vector(delta_y)
        assert len(prob_x) % 2 == 1 and len(prob_y) % 2 == 1, "Probability vectors must have odd length"

        joint = np.outer(prob_x, prob_y)
        half_x = len(prob_x) // 2
        half_y = len(prob_y) // 2
        limit_x, limit_y = self._grid_map.grid.shape[0], self._grid_map.grid.shape[1]

        distribution = np.zeros(len(self.state_set))
        for m, n in product(range(len(prob_x)), range(len(prob_y))):
            target_x = cell_x + m - half_x
            target_y = cell_y + n - half_y
            if not (0 <= target_x < limit_x and 0 <= target_y < limit_y):
                continue  # Outside of the grid
            target_state = self._cell_to_states_map[target_x, target_y]
            if target_state != -1:  # Unoccupied cells are not states
                distribution[target_state] = joint[m, n]
        return distribution
class SpeedGridAbstraction(GridAbstraction[_A]):
    """
    A grid-based abstraction that incorporates speed into the state representation.

    The state space contains one copy (layer) of the occupied cells for each discrete speed level.
    The state identifier of a cell at a given speed level is
    ``base_state + num_occupied * speed_idx``, where ``base_state`` is the state of the cell
    in the first layer and ``speed_idx * speed_res`` is the speed represented by the layer.
    Subclasses define the actions and the movement model.
    """

    def __init__(
        self,
        grid_map: "GridMap",
        max_speed: float,
        actions: "tuple[_A]",
        speed_res: float = 1.0,
        initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0),
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Args
        ----
        grid_map:
            Grid map whose occupied cells define the positions
        max_speed:
            Maximum speed. The number of speed levels is ``int(max_speed // speed_res) + 1``
        actions:
            Tuple of action names
        speed_res:
            Difference in speed between two consecutive speed levels
        initial_state:
            Initial (x, y, speed) of the MDP
        label_function:
            Function that takes a state and the abstraction, and returns a label string. Ignored if not provided
        """
        # These attributes are read by transition_func, which runs inside super().__init__
        self._num_active_pos = grid_map.num_occupied
        self._speed_range = int(max_speed // speed_res) + 1
        self._speed_res = speed_res
        super().__init__(grid_map, actions, state_repetition=self._speed_range)
        self.update(initial_state, label_function)

    @property
    def speed_range(self) -> int:
        """
        Get the number of discrete speed levels in the abstraction.

        Returns
        -------
        Number of speed levels
        """
        return self._speed_range

    @property
    def speed_resolution(self) -> float:
        """
        Get the speed resolution used in the abstraction.

        Returns
        -------
        Speed resolution value
        """
        return self._speed_res

    @property
    def max_speed(self) -> float:
        """
        Get the maximum speed represented in the abstraction.

        Returns
        -------
        Speed of the highest speed level
        """
        return (self._speed_range - 1) * self._speed_res

    def _state_to_speed_idx(self, state: int) -> "int":
        """
        Convert a discrete state to its corresponding speed index.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Index of the speed level (layer) the state belongs to

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        return state // self._num_active_pos

    def _cell_speed_to_state(self, cell: "_Cell", speed_idx: int) -> "int":
        """
        Convert grid cell coordinates and speed index to a discrete state identifier.

        Args
        ----
        cell:
            Grid cell coordinates as (row, column) indices
        speed_idx:
            Speed index

        Returns
        -------
        Unique state identifier for the cell and speed, or -1 if the cell is not occupied

        Raises
        ------
        AssertionError: If cell coordinates or speed index are invalid
        """
        r, c = cell
        assert (
            0 <= r < self._grid_map._grid.shape[0] and 0 <= c < self._grid_map._grid.shape[1]
        ), "Invalid cell coordinates"
        assert 0 <= speed_idx < self._speed_range, "Invalid speed index"
        base_state = self._cell_to_states_map[r, c]
        if base_state == -1:
            # Without this guard the layer offset would turn the "no state" marker into a valid-looking state
            return -1
        return int(base_state + self._num_active_pos * speed_idx)

    def state_to_speed(self, state: int) -> "float":
        """
        Convert a discrete grid state to a continuous speed.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Continuous speed value

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        return self._state_to_speed_idx(state) * self._speed_res

    def cell_speed_idx_to_pos_speed(self, cell: "_Cell", speed_idx: int) -> "tuple[_Position, float]":
        """
        Convert grid cell coordinates and speed index to continuous position and speed.

        Args
        ----
        cell:
            Grid cell coordinates as (row, column) indices
        speed_idx:
            Speed index

        Returns
        -------
        Tuple containing continuous position as (x, y) coordinates and speed value

        Raises
        ------
        AssertionError: If cell coordinates or speed index are invalid
        """
        r, c = cell
        assert (
            0 <= r < self._grid_map._grid.shape[0] and 0 <= c < self._grid_map._grid.shape[1]
        ), "Invalid cell coordinates"
        assert 0 <= speed_idx < self._speed_range, "Invalid speed index"
        return self._grid_map.cell_to_pos((r, c)), speed_idx * self._speed_res

    def pos_speed_to_state(self, position: "_Position", speed: float) -> "int | None":
        """
        Convert a continuous position and speed to the corresponding abstract state.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates
        speed:
            Continuous speed value

        Returns
        -------
        Discrete state identifier corresponding to the grid cell and speed, or None if the position
        is outside the grid, in an unoccupied cell, or if the speed is outside the represented range
        """
        base_state = self.pos_to_state(position)
        if base_state is None:
            return None
        speed_idx = int(speed // self._speed_res)
        if not (0 <= speed_idx < self._speed_range):
            return None
        return base_state + self._num_active_pos * speed_idx

    def state_to_pos_speed(self, state: "int") -> "tuple[_Position, float]":
        """
        Convert a discrete state identifier to continuous position and speed.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Tuple containing continuous position as (x, y) coordinates and speed value

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        return self.state_to_pos(state), self.state_to_speed(state)

    def update(
        self,
        pos_speed: "tuple[float, float, float] | int | None" = None,
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Update the MDP with a new initial state and labels.

        Args
        ----
        pos_speed:
            New initial position as (x, y) coordinates and speed (z) or direct state index. Ignored if not provided
        label_function:
            Function that takes a state and the abstraction, and returns a label string. Ignored if not provided
        """
        # Integral also accepts NumPy integers, consistently with PathGridAbstraction
        if isinstance(pos_speed, Integral):
            self._MDP.initial_state = pos_speed
        elif pos_speed is not None:
            self._MDP.initial_state = self.pos_speed_to_state(pos_speed[0:2], pos_speed[2])
        if label_function is not None:
            self._MDP.labels = self.gen_labels(label_function)

    def additional_dimension_to_name(
        self, additional_dimension_idx: int, additional_dimension_value: "int | None" = None
    ) -> str:
        """
        Return the text describing a speed level, to be appended to the title of the plots.

        Args
        ----
        additional_dimension_idx:
            Index of the speed level when called with a single argument (as the plotting code does).
            Otherwise, index of the additional dimension, which is ignored since speed is the only one
        additional_dimension_value:
            Index of the speed level. Optional

        Returns
        -------
        String in the form ", s: <speed>"
        """
        speed_idx = additional_dimension_idx if additional_dimension_value is None else additional_dimension_value
        return f", s: {speed_idx * self._speed_res}"
class PathGridAbstraction(SpeedGridAbstraction["tuple[int, int]"]):
    """
    Abstraction where the agent moves along the paths of a PathsGridMap, with a discrete speed level.

    The actions are the tuples (path_idx, delta_speed), with delta_speed in -1, 0, 1.
    The next cell is the one the edges of the grid map associate to the current cell and the chosen path,
    while the change of speed level is stochastic.
    """

    def __init__(
        self,
        grid_map: "GridMap",
        max_speed: float,
        speed_res: float = 1.0,
        initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0),
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Args
        ----
        grid_map:
            Grid map. It must be a PathsGridMap
        max_speed:
            Maximum speed represented in the abstraction
        speed_res:
            Difference in speed between two consecutive speed levels
        initial_state:
            Initial (x, y, speed) of the MDP
        label_function:
            Function that takes a state and the abstraction, and returns a label string. Ignored if not provided

        Raises
        ------
        AssertionError: If the grid map is not a PathsGridMap
        """
        assert isinstance(grid_map, PathsGridMap), "PathGridAbstraction requires a PathsGridMap"
        self._grid_map: "PathsGridMap"
        path_speed_actions = tuple(product(range(grid_map.num_paths), (-1, 0, 1)))
        super().__init__(
            grid_map=grid_map,
            max_speed=max_speed,
            actions=path_speed_actions,
            speed_res=speed_res,
            initial_state=initial_state,
            label_function=label_function,
        )

    def _get_prob_vector(self, delta: "int") -> np.ndarray:
        """
        Return the distribution of the change of speed level.

        Args
        ----
        delta:
            Commanded change of speed level: -1, 0 or 1

        Returns
        -------
        Array of 5 probabilities for the changes -2, -1, 0, 1, 2

        Raises
        ------
        ValueError: If delta is not -1, 0 or 1
        """
        kernels = (
            (-1, [0.0, 0.8, 0.2, 0.0, 0.0]),
            (0, [0.0, 0.0, 1.0, 0.0, 0.0]),
            (1, [0.0, 0.0, 0.2, 0.8, 0.0]),
        )
        for commanded, kernel in kernels:
            if delta == commanded:
                return np.array(kernel)
        raise ValueError("Invalid action")

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.

        Only the state-action pairs for which the grid map defines an edge are filled.
        All the other rows are left to zero.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = self._states_to_cell_map.size
        P = np.zeros((n_states, len(self._action_to_name), n_states))
        for (source_cell, path_idx), target_cell in self._grid_map.edges.items():
            for speed_idx, speed_delta in product(range(self._speed_range), (-1, 0, 1)):
                state = self._cell_speed_to_state(source_cell, speed_idx)
                action = self._name_to_action[(path_idx, speed_delta)]
                P[state, action, :] = self.transition_func(state, action, target_cell=target_cell)
        return P

    def transition_func(self, state: "int", action: "int", target_cell: "_Cell") -> "np.ndarray":
        """
        Compute transition probabilities from a given state and action.

        The arrival cell is always ``target_cell``. Only the arrival speed level is uncertain.
        The probability of the speed levels outside the valid range is redistributed
        proportionally over the valid ones.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier, i.e. index of the (path_idx, delta_speed) tuple in the action names
        target_cell:
            Cell reached by following the path selected by the action

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array equals the number of states.

        Raises
        ------
        AssertionError: If the target cell is not a state or the probabilities do not sum to 1
        """
        current_speed_idx = self._state_to_speed_idx(state)
        delta_speed = self._action_to_name[action][1]
        prob_speed = self._get_prob_vector(delta_speed)
        assert len(prob_speed) % 2 == 1, "Probability vectors must have odd length"

        distribution = np.zeros(len(self.state_set))
        center = len(prob_speed) // 2
        for offset, probability in enumerate(prob_speed, start=-center):
            new_speed_idx = current_speed_idx + offset
            if new_speed_idx < 0 or new_speed_idx >= self._speed_range:
                continue  # Out of speed bounds
            new_state = self._cell_speed_to_state(target_cell, new_speed_idx)
            assert new_state != -1, "State should be valid"
            distribution[new_state] = probability

        total = distribution.sum()
        missing = 1.0 - total
        if missing > 0 and total > 0:
            distribution += distribution / total * missing  # Scale existing probabilities
        assert np.isclose(distribution.sum(), 1.0), "Transition probabilities must sum to 1"
        return distribution

    def state_action_to_next_state(
        self, state: "int | tuple[float, float, float]", action: "tuple[int, int] | int"
    ) -> "tuple[tuple[float, float], float]":
        """
        Compute the nominal (deterministic) outcome of applying an action in a state.

        Args
        ----
        state:
            Discrete state identifier, or tuple whose values are used as (cell x, cell y, speed index)
        action:
            Action identifier, or the (path_idx, delta_speed) tuple itself

        Returns
        -------
        Tuple containing the continuous position of the next cell as (x, y) coordinates and the next speed value.
        If the grid map has no edge for the cell and path, the agent remains in the same cell.
        The speed index is clipped to the valid range.

        Raises
        ------
        AssertionError: If the next cell is missing or out of bounds
        """
        if isinstance(action, Integral):
            action = self._action_to_name[action]
        path_idx, delta_speed = action

        if isinstance(state, Integral):
            x, y = self._state_to_cell(state)
            s = self._state_to_speed_idx(state)
        else:
            x, y, s = state

        current_cell = (x, y)
        next_cell: "_Cell" = self._grid_map.edges.get((current_cell, path_idx), current_cell)
        assert next_cell is not None, "No valid next cell for the given state and action"
        row_ok = 0 <= next_cell[0] < self.grid_map.rows
        assert row_ok and 0 <= next_cell[1] < self.grid_map.cols, "Next cell out of bounds"
        next_s = np.clip(s + delta_speed, 0, self.speed_range - 1)
        return self.cell_speed_idx_to_pos_speed(next_cell, next_s)
[docs] class VelocityGridAbstraction(SpeedGridAbstraction["tuple[SteerAction, SpeedAction]"]): """ A grid-based abstraction that incorporates velocity into the state representation. This class extends the GridAbstraction to include velocity components in the state representation. The actions are represented as tuples (delta_movement, delta_speed), where each component can be -1, 0, or 1. Movement Model: - The agent can move left (-1), forward (0), or right (1) with specified probabilities. - The speed can decrease (-1), remain the same (0), or increase (1) with specified probabilities. In this case, the state representation is assumed to be a combination of position and velocity. """ def __init__( self, grid_map: "GridMap", max_speed: float, speed_res: float = 1.0, initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0), label_function: "GridLabelFunction | None" = None, ): move_set = np.array( ["ul", "u", "ur", "l", "s", "r", "dl", "d", "dr"] ) # 'ur' for 'turn right', 'u' for 'go straight', 'ul' for 'turn left' # speed_set = np.array(["d", "c", "a"]) # 'd' for 'decelerate', 'c' for 'cruise', 'a' for 'accelerate' speed_set = np.array(["c"]) # 'd' for 'decelerate', 'c' for 'cruise', 'a' for 'accelerate' A, B = np.meshgrid(move_set, speed_set) actions = tuple((m, s) for m, s in np.array([A.flatten(), B.flatten()]).T) super().__init__( grid_map=grid_map, max_speed=max_speed, actions=actions, speed_res=speed_res, initial_state=initial_state, label_function=label_function, )
[docs] def _get_prob_spatial(self, delta: "MoveAction") -> np.ndarray: prob_spatial = np.zeros((5, 5)) # TODO: make 3x3 if delta == "ul": # up left # Primary transition: r+1, ey+1 (forward and left) prob_spatial[1, 3] = 0.8 # r+1, ey+1 # Some uncertainty around the main transition prob_spatial[2, 3] = 0.1 # r+1, ey+0 (forward only) prob_spatial[1, 2] = 0.1 # r+0, ey+1 (left only) elif delta == "u": # up # Primary transition: r+1, ey+0 (forward only) prob_spatial[2, 3] = 0.9 # r+1, ey+0 # Some uncertainty prob_spatial[2, 2] = 0.1 # r+0, ey+0 (no movement) elif delta == "ur": # up right # Primary transition: r+1, ey-1 (forward and right) prob_spatial[3, 3] = 0.8 # r+1, ey-1 # Some uncertainty around the main transition prob_spatial[2, 3] = 0.1 # r+1, ey+0 (forward only) prob_spatial[3, 2] = 0.1 # r+0, ey-1 (right only) elif delta == "l": # left # Primary transition: r+0, ey+1 (left only) prob_spatial[1, 2] = 0.8 # r+0, ey+1 # Some uncertainty around the main transition prob_spatial[1, 1] = 0.1 # r+1, ey+1 (forward and left) prob_spatial[1, 3] = 0.1 # r-1, ey+1 (backward and left) elif delta == "s": # stay prob_spatial[2, 2] = 1.0 # r+0, ey+0 (no movement) elif delta == "r": # right # Primary transition: r+0, ey-1 (right only) prob_spatial[3, 2] = 0.8 # r+0, ey-1 # Some uncertainty around the main transition prob_spatial[3, 1] = 0.1 # r+1, ey-1 (forward and right) prob_spatial[3, 3] = 0.1 # r-1, ey-1 (backward and right) elif delta == "dl": # down left # Primary transition: r-1, ey+1 (backward and left) prob_spatial[1, 1] = 0.8 # r-1, ey+1 # Some uncertainty around the main transition prob_spatial[2, 1] = 0.1 # r-1, ey+0 (backward only) prob_spatial[1, 2] = 0.1 # r+0, ey+1 (left only) elif delta == "d": # down # Primary transition: r-1, ey+0 (backward only) prob_spatial[2, 1] = 0.9 # r-1, ey+0 # Some uncertainty prob_spatial[2, 2] = 0.1 # r+0, ey+0 (no movement) elif delta == "dr": # down right # Primary transition: r-1, ey-1 (backward and right) prob_spatial[3, 1] = 
0.8 # r-1, ey-1 # Some uncertainty around the main transition prob_spatial[2, 1] = 0.1 # r-1, ey+0 (backward only) prob_spatial[3, 2] = 0.1 # r+0, ey-1 (right only) return prob_spatial
[docs] def _get_prob_speed(self, delta: "SpeedAction") -> np.ndarray: prob_v = np.zeros(5) # Index 2 represents no change (δv=0) if delta == "a": prob_v[3] = 0.9 # v+1 prob_v[2] = 0.1 # v+0 (failed acceleration) elif delta == "c": prob_v[2] = 1.0 # v+0 (no velocity change) elif delta == "d": prob_v[1] = 0.9 # v-1 prob_v[2] = 0.1 # v+0 (failed deceleration) return prob_v
def transition_func(self, state: "int", action: "int") -> np.ndarray:
    """
    Compute the transition probabilities out of a state when an action is applied.

    The action is resolved through ``self._action_to_name`` into a (movement, speed) pair of names.
    The movement kernel and the speed kernel are combined into a joint distribution
    over (cell displacement, speed change), and every outcome that lands on an occupied cell
    with a speed index inside ``[0, self._speed_range)`` is written to the matching successor state.

    Outcomes that land on a non-occupied cell or outside the speed range are discarded,
    so the returned values may sum to less than 1.

    Args
    ----
    state:
        Current discrete state identifier
    action:
        Action identifier, used as a key of ``self._action_to_name``

    Returns
    -------
        Array of transition probabilities to all states.
        Its length equals ``len(self._states_to_cell_map)``.
    """
    cell_x, cell_y = self._state_to_cell(state)
    current_speed = self._state_to_speed_idx(state)
    next_state_probs = np.zeros(len(self._states_to_cell_map))

    move_name, speed_name = self._action_to_name[action]
    move_kernel = self._get_prob_spatial(move_name)
    speed_kernel = self._get_prob_speed(speed_name)
    assert len(move_kernel) % 2 == 1 and len(speed_kernel) % 2 == 1, "Probability vectors must have odd length"

    # Outer product of the 2D movement kernel with the 1D speed kernel:
    # joint[i, j, k] = move_kernel[i, j] * speed_kernel[k]
    joint = np.einsum("ij,k->ijk", move_kernel, speed_kernel)
    n_rows, n_cols = move_kernel.shape
    assert joint.shape == (n_rows, n_cols, speed_kernel.shape[0])

    # The centre of each kernel corresponds to "no change"
    row_centre = n_rows // 2
    col_centre = n_cols // 2
    speed_centre = len(speed_kernel) // 2

    for m, n in product(range(n_rows), range(n_cols)):
        if move_kernel[m, n] == 0.0:
            continue  # This displacement can never happen
        target_cell = (cell_x + m - row_centre, cell_y + n - col_centre)
        if not self._grid_map.is_occupied(*target_cell):
            continue  # The destination cell is not part of the grid
        for v, probability in enumerate(joint[m, n]):
            if probability == 0.0:
                continue  # This speed change can never happen
            target_speed = current_speed + v - speed_centre
            if not 0 <= target_speed < self._speed_range:
                continue  # The destination speed is out of bounds
            target_state = self._cell_speed_to_state(target_cell, target_speed)
            assert target_state != -1, "State should be valid"
            next_state_probs[target_state] = probability

    # TODO: the output distribution may not sum to 1 because discarded outcomes are not redistributed.
    # Is it a problem? The missing probability could be spread over the valid successor states
    # proportionally to their current probability.
    return next_state_probs
def state_action_to_next_state(
    self, state: "int | tuple[float, float, float]", action: "tuple[MoveAction, SpeedAction] | int"
) -> "tuple[tuple[float, float], float]":
    """
    Compute the successor reached when an action has exactly its intended effect.

    The movement and speed deltas of the action are added to the current cell and speed index,
    and the result is clipped to the grid size and to the speed range.
    No uncertainty is taken into account.

    Args
    ----
    state:
        Either a discrete state identifier or an explicit ``(x, y, speed_idx)`` triple of indices
    action:
        Either an action identifier, used as a key of ``self._action_to_name``,
        or an explicit ``(movement, speed)`` pair of action names.
        Movement names are ``"ul"``, ``"u"``, ``"ur"``, ``"l"``, ``"s"``, ``"r"``, ``"dl"``, ``"d"``, ``"dr"``.
        Speed names are ``"a"``, ``"c"``, ``"d"``

    Returns
    -------
        Whatever ``self.cell_speed_idx_to_pos_speed`` returns for the target cell and target speed index

    Raises
    ------
    ValueError: If the movement or the speed action name is not one of the supported names
    """
    if isinstance(action, Integral):
        action = self._action_to_name[action]
    delta_mov, delta_speed = action

    if isinstance(state, Integral):
        x, y = self._state_to_cell(state)
        s = self._state_to_speed_idx(state)
    else:
        x, y, s = state

    # Movement name -> (delta_x, delta_y)
    move_deltas = {
        "ul": (-1, 1),
        "u": (0, 1),
        "ur": (1, 1),
        "l": (-1, 0),
        "s": (0, 0),
        "r": (1, 0),
        "dl": (-1, -1),
        "d": (0, -1),
        "dr": (1, -1),
    }
    # Speed name -> delta of the speed index
    speed_deltas = {"a": 1, "c": 0, "d": -1}

    # Fail with an explicit error instead of hitting an unbound local variable further down
    try:
        delta_x, delta_y = move_deltas[delta_mov]
    except (KeyError, TypeError):
        raise ValueError(f"Unknown movement action {delta_mov!r}, expected one of {sorted(move_deltas)}") from None
    try:
        delta_s = speed_deltas[delta_speed]
    except (KeyError, TypeError):
        raise ValueError(f"Unknown speed action {delta_speed!r}, expected one of {sorted(speed_deltas)}") from None

    # Keep the target inside the grid and inside the valid speed range
    target_x = np.clip(x + delta_x, 0, self.grid_map.rows - 1)
    target_y = np.clip(y + delta_y, 0, self.grid_map.cols - 1)
    target_s = np.clip(s + delta_s, 0, self.speed_range - 1)
    return self.cell_speed_idx_to_pos_speed((target_x, target_y), target_s)
def _main():
    """
    Manual visual check of the three ways of building a ``GridMap``.

    A grid is created with the plain constructor, with ``GridMap.from_bounds`` and with ``GridMap.from_corner``,
    all using the same 30 degrees rotation.
    The same cells and positions are added to each grid, then each grid is plotted
    both directly and through its projection.
    """
    from matplotlib import pyplot as plt

    ROTATION = np.pi / 6  # 30 degrees
    pivot = 10.0  # The point (pivot, pivot) is the centre of the rotation applied below
    corner = (32.0, 18.0)

    # Rotate the corner by ROTATION around (pivot, pivot)
    cos_r, sin_r = np.cos(ROTATION), np.sin(ROTATION)
    offset_x, offset_y = corner[0] - pivot, corner[1] - pivot
    rotated_point = (
        offset_x * cos_r - offset_y * sin_r + pivot,
        offset_x * sin_r + offset_y * cos_r + pivot,
    )

    grids = (
        GridMap((10, 4), (2.0, 2.0), (12.0, 10.0), rotation=ROTATION),
        GridMap.from_bounds((12.0, 10.0), rotated_point, (2.0, 2.0), rotation=ROTATION),
        GridMap.from_corner((12.0, 10.0), (20.0, 8.0), (2.0, 2.0), rotation=ROTATION),
    )
    for grid in grids:
        for cell in ((2, 3), (8, 0), (9, 1)):
            grid.add_cell(*cell)
        grid.add_pos((18, 17))
        grid.add_pos_range((10, 14), (18, 14))
        grid.plot_grid()
        grid.plot_grid_projection()
    plt.show()
def plotting():
    """
    Manual visual check of a ``VelocityGridAbstraction`` built on top of a rotated grid.

    A grid is created with ``GridMap.from_corner``, a range of cells is added and one cell is removed.
    The grid is plotted, then an abstraction is created over it
    and the transitions of state 5 are plotted.
    """
    ROTATION = np.pi / 6  # 30 degrees
    grid = GridMap.from_corner((12.0, 10.0), (20.0, 8.0), (2.0, 2.0), rotation=ROTATION)

    grid.add_cell_range(0, 10, 0, 4)
    for cell in ((2, 1), (1, 0)):
        grid.add_cell(*cell)
    grid.remove_cell(9, 3)
    grid.plot_grid()

    abstraction = VelocityGridAbstraction(grid, max_speed=1.0, speed_res=0.5, initial_state=(12.0, 10.0, 0.0))
    abstraction.plot_transition_state(5, additional_dimension_to_plot=1)
    plt.show()
# Script entry point: when the module is executed directly (not imported), run the plotting demo.
if __name__ == "__main__": plotting()