from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import lru_cache
from itertools import product
from numbers import Integral
from typing import TYPE_CHECKING, Generic, TypeVar
import matplotlib.pyplot as plt
import numpy as np
from .grid import GridMap, PathsGridMap, WaypointGridMap
from .mdp import LightMDP
# Everything in this guard exists for static type checkers only; at runtime the
# names are referenced exclusively through quoted annotations.
if TYPE_CHECKING:
    from typing import Callable, Literal

    from .grid import _Cell, _Position

    # Discrete state identifier (an index into an abstraction's state set).
    _State = int
    LabelMap = dict["Bounds", str]
    PositionMap = dict[tuple[float, float], str]
    IdxMap = dict[int, str]
    SteerAction = Literal["l", "f", "r"]  # 'l' for 'left', 'f' for 'forward', 'r' for 'right'
    SpeedAction = Literal["d", "c", "a"]  # 'd' for 'decelerate', 'c' for 'cruise', 'a' for 'accelerate'
    # 'u'/'d' for up/down, 'l'/'r' for left/right, two-letter codes for the
    # diagonals (e.g. 'ul' is up-left) and 's' for stay.
    MoveAction = Literal["ul", "u", "ur", "l", "s", "r", "dl", "d", "dr"]
    Mode = Literal["center", "lower-left-corner", "default"]
    GridLabelFunction = Callable[["_State", "GridAbstraction"], str]

# Needed at runtime: the abstractions below subclass ``Generic[_A]``.
_A = TypeVar("_A")
@dataclass(frozen=True)
class Bounds:
    """
    Immutable, hashable set of four integer bounds on grid columns and rows.

    The class only stores the values; whether the upper bounds are inclusive is
    decided by whatever code consumes the region.
    """

    col_lb: int = 0  # column bound, presumably the lower one ("lb")
    col_ub: int = 0  # column bound, presumably the upper one ("ub")
    row_lb: int = 0  # row bound, presumably the lower one
    row_ub: int = 0  # row bound, presumably the upper one
class PathAbstraction(Generic[_A]):
    """
    Create an abstraction (a path) over a continuous space.

    The path is defined by a sequence of waypoints, each representing a discrete state in the continuous space.
    A transition matrix is associated to the path, describing the probability of moving
    from one waypoint to a neighbouring one based on the action taken.
    """

    # Probability kernels per action over the waypoint offsets (-2, -1, 0, +1, +2)
    # relative to the current waypoint index. Every kernel sums to 1.
    _TRANSITION_KERNELS = {
        -2: (0.4, 0.6, 0.0, 0.0, 0.0),
        -1: (0.0, 0.7, 0.3, 0.0, 0.0),
        0: (0.0, 0.1, 0.8, 0.1, 0.0),
        1: (0.0, 0.0, 0.3, 0.7, 0.0),
        2: (0.0, 0.0, 0.0, 0.6, 0.4),
    }

    def __init__(self, waypoints: "list[tuple[float, float]]", actions: "tuple[_A]", radius: float = 2.0):
        """
        Initialize the path abstraction over a continuous space.

        Args
        ----
        waypoints:
            List of waypoints defining the path as (x, y) coordinates
        actions:
            Tuple of available actions for the MDP
        radius:
            Radius around each waypoint to consider as "at" the waypoint

        Raises
        ------
        AssertionError: If waypoints list is empty
        """
        assert len(waypoints) > 0, "Waypoints list is empty"
        self._radius = radius
        self._state_set = np.array(waypoints)
        self._action_set = np.array(actions)
        self._trans_matrix = self.gen_transitions()

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.

        Calls :meth:`trans_func` once for every (waypoint, action) pair.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = len(self._state_set)
        n_actions = len(self._action_set)
        # Pre-allocating keeps the result 3D even when there is a single action
        # (stacking 1D rows would collapse the action axis in that case).
        P = np.empty((n_states, n_actions, n_states))
        for i in range(n_states):
            for j, action in enumerate(self._action_set):
                P[i, j, :] = self.trans_func(i, action)
        return P

    def gen_labels(self, label_function: "PositionMap"):
        """
        Generate state labels based on the provided label function.

        A waypoint receives a label when it lies within ``radius`` of the labelled
        position. Labels of several matching positions are concatenated in the
        iteration order of the mapping.

        Args
        ----
        label_function:
            Dictionary mapping (x, y) positions to label strings

        Returns
        -------
        List of labels for each state, with "_" as default for unlabeled states
        """
        label_map = ["_"] * len(self._state_set)
        for position, label in label_function.items():
            anchor = np.array(position)
            for n, state in enumerate(self._state_set):
                if np.linalg.norm(state - anchor) <= self._radius:
                    label_map[n] = label if label_map[n] == "_" else label_map[n] + label
        return label_map

    def trans_func(self, idx: int, action: _A):
        """
        Compute transition probabilities from a given waypoint and action.

        The action selects a kernel over the waypoints at offsets -2..+2 from
        ``idx``. Probability aimed at indices beyond either end of the path is
        reassigned to the current waypoint, as is all of it for an action
        without a kernel.

        Args
        ----
        idx:
            Index of the current waypoint in the path
        action:
            Action to take; kernels exist for -2, -1, 0, 1 and 2

        Returns
        -------
        Array of transition probabilities to all states

        Raises
        ------
        AssertionError: If the resulting probabilities do not sum to 1
        """
        n_states = len(self._state_set)
        next_state = np.zeros(n_states)
        kernel = self._TRANSITION_KERNELS.get(action, ())
        for offset, prob in zip(range(-2, 3), kernel):
            target_idx = idx + offset
            if 0 <= target_idx < n_states:
                next_state[target_idx] = prob
        # Ensure probabilities sum to 1: whatever was dropped stays in place.
        remainder = 1.0 - np.sum(next_state)
        if remainder > 0.0:
            next_state[idx] += remainder
        assert np.isclose(np.sum(next_state), 1.0), "Transition probabilities do not sum to 1"
        return next_state

    def closest_waypoint_idx(self, position: "_Position") -> int:
        """
        Find the index of the closest waypoint to a given position.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Index of the closest waypoint in the path (the first one on ties)
        """
        dists = np.linalg.norm(self._state_set - np.array(position), axis=1)
        return int(np.argmin(dists))

    def closest_waypoint(self, position: "_Position") -> "_Position":
        """
        Find the closest waypoint to a given position.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Coordinates of the closest waypoint in the path
        """
        return tuple(self._state_set[self.closest_waypoint_idx(position)])

    def is_at_waypoint(self, position: "_Position") -> bool:
        """
        Check if a given position is within the radius of any waypoint.

        Only the closest waypoint needs checking: if it is out of reach, so is
        every other one.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        True if the position is within the radius of a waypoint, False otherwise
        """
        waypoint = self._state_set[self.closest_waypoint_idx(position)]
        dist = np.linalg.norm(np.array(position) - waypoint)
        # Convert the NumPy boolean so the annotated return type actually holds.
        return bool(dist <= self._radius)
class GridAbstraction(Generic[_A], ABC):
    """
    A simple grid-based abstraction over a continuous space.

    Every occupied cell of a ``GridMap`` becomes one discrete state. The list of
    states can be repeated ``state_repetition`` times so that subclasses can
    stack an additional dimension on top of the position. An MDP is built over
    those states from the subclass-provided :meth:`transition_func`.
    """

    def __init__(self, grid_map: "GridMap", actions: "tuple[_A]", state_repetition: int = 1):
        """
        Build the cell/state mappings, the transition matrix and the MDP.

        Args
        ----
        grid_map:
            Grid whose occupied cells define the states
        actions:
            Tuple of action names; an action's identifier is its index in this tuple
        state_repetition:
            Number of times the per-cell state list is repeated
        """
        self._grid_map: "GridMap"  # Will get initialized in set_gridmap
        self._cell_to_states_map: "np.ndarray"  # Will get initialized in set_gridmap
        self._states_to_cell_map: "np.ndarray"  # Will get initialized in set_gridmap
        self.set_gridmap(grid_map)
        # One copy of the cell list per value of the additional dimension.
        self._states_to_cell_map = np.tile(self._states_to_cell_map, state_repetition)
        self._action_to_name = actions
        self._name_to_action = {name: idx for idx, name in enumerate(actions)}
        self._transition_matrix = self.gen_transitions()
        n_states = len(self._states_to_cell_map)
        self._MDP = LightMDP(
            num_states=n_states,
            initial_state=0,
            num_actions=len(self._action_to_name),
            transitions=self._transition_matrix,
            labels=["_"] * n_states,
        )

    @property
    def MDP(self) -> "LightMDP":
        """
        Get the MDP associated with the grid abstraction.

        Returns
        -------
        The LightMDP instance representing the MDP
        """
        return self._MDP

    @property
    def action_set(self):
        """
        Get the set of available actions in the MDP.

        Returns
        -------
        Array of action identifiers
        """
        return np.arange(len(self._action_to_name))

    @property
    def state_set(self):
        """
        Get the set of discrete states in the grid abstraction.

        Returns
        -------
        Array of state identifiers
        """
        return np.arange(len(self._states_to_cell_map))

    @property
    def n_states(self) -> int:
        """
        Get the number of discrete states in the grid abstraction.

        Returns
        -------
        Number of states
        """
        return len(self._states_to_cell_map)

    @property
    def n_actions(self) -> int:
        """
        Get the number of available actions in the grid abstraction.

        Returns
        -------
        Number of actions
        """
        return len(self._action_to_name)

    @property
    def action_names(self) -> "tuple[_A]":
        """
        Get the action names in the grid abstraction.

        Returns
        -------
        Tuple of action names, indexed by action identifier
        """
        return self._action_to_name

    @property
    def transition_matrix(self) -> np.ndarray:
        """
        Get the transition probability matrix for the MDP.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        return self._transition_matrix

    @property
    def grid_map(self) -> "GridMap":
        """
        Get the current grid map used in the abstraction.

        Returns
        -------
        The GridMap instance representing the grid
        """
        return self._grid_map

    @grid_map.setter
    def grid_map(self, grid_map: "GridMap"):
        """
        Set the grid map used in the abstraction.

        This rebuilds the cell/state mappings through :meth:`set_gridmap`.

        Args
        ----
        grid_map:
            The GridMap instance to set
        """
        self.set_gridmap(grid_map)

    def set_gridmap(self, grid_map: "GridMap"):
        """
        Install a grid map and rebuild the cell <-> state mappings.

        States are numbered over the occupied cells column by column. Only the
        mappings are rebuilt here: the state repetition, the transition matrix
        and the MDP are set up by ``__init__`` and are not refreshed.

        Args
        ----
        grid_map:
            The GridMap instance to set

        Raises
        ------
        AssertionError:
            If the grid map has no occupied cell
        """
        filled_cells = grid_map.num_occupied
        assert filled_cells > 0, "Grid map has no filled cells"
        # Clear all cached functions that depend on the grid map. The whole MRO is
        # walked because cached methods defined on a base class (e.g. state_to_pos)
        # do not appear in the concrete subclass' own __dict__.
        for klass in type(self).__mro__:
            for attr in vars(klass).values():
                if hasattr(attr, "cache_clear"):
                    attr.cache_clear()
        self._grid_map = grid_map
        n_rows, n_cols = grid_map._grid.shape[:2]
        self._cell_to_states_map = np.full((n_rows, n_cols), -1, dtype=int)
        self._states_to_cell_map = np.empty(filled_cells, dtype=int)
        counter = 0
        for c in range(n_cols):
            for r in range(n_rows):
                if not grid_map.is_occupied(r, c):
                    continue
                self._cell_to_states_map[r, c] = counter
                # Row-major flat cell index; decoded again by _state_to_cell.
                self._states_to_cell_map[counter] = r * n_cols + c
                counter += 1

    def gen_labels(self, label_function: "GridLabelFunction"):
        """
        Generate state labels based on the provided label function.

        Args
        ----
        label_function:
            Function that takes a state and the GridAbstraction, and returns a label string

        Returns
        -------
        List with the label returned by ``label_function`` for each state
        """
        return [label_function(state, self) for state in range(len(self._states_to_cell_map))]

    def pos_to_state(self, position: "_Position") -> "int | None":
        """
        Convert a continuous position to a discrete grid state.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates

        Returns
        -------
        Discrete state identifier corresponding to the grid cell containing the position,
        or None if the position is outside the grid or in an unoccupied cell
        """
        r, c = self._grid_map.pos_to_cell(position)
        n_rows, n_cols = self._grid_map._grid.shape[:2]
        if not (0 <= r < n_rows and 0 <= c < n_cols):
            return None
        state_idx = self._cell_to_states_map[r, c]
        if state_idx == -1:
            return None
        # Hand back a plain int rather than a NumPy scalar.
        return int(state_idx)

    # NOTE(review): lru_cache on a method keys on ``self`` and keeps instances
    # alive for the lifetime of the cache; set_gridmap clears it.
    @lru_cache
    def state_to_pos(self, state: int) -> "_Position":
        """
        Convert a discrete grid state to a continuous position.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Continuous position as (x, y) coordinates, as returned by the grid map's
        ``cell_to_pos`` for the state's cell

        Raises
        ------
        AssertionError:
            If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        r, c = self._state_to_cell(state)
        return self._grid_map.cell_to_pos((r, c))

    def _state_to_cell(self, state: int) -> "_Cell":
        """
        Convert a discrete grid state to grid cell coordinates.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Grid cell coordinates as (row, column) indices

        Raises
        ------
        AssertionError:
            If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        r, c = divmod(self._states_to_cell_map[state], self._grid_map._grid.shape[1])
        return (r, c)

    def update(self, position: "_Position | int | None" = None, label_function: "GridLabelFunction | None" = None):
        """
        Update the MDP with a new initial state and labels.

        Args
        ----
        position:
            New initial position as (x, y) coordinates or state index. Ignored if not provided
        label_function:
            Function that takes a state and the GridAbstraction, and returns a label string.
            Ignored if not provided
        """
        # Integral also accepts NumPy integers such as those returned by pos_to_state.
        if isinstance(position, Integral):
            self._MDP.initial_state = int(position)
        elif position is not None:
            self._MDP.initial_state = self.pos_to_state(position)
        if label_function is not None:
            self._MDP.labels = self.gen_labels(label_function)

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.

        Calls :meth:`transition_func` once for every state-action pair.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = self._states_to_cell_map.size
        n_actions = len(self._action_to_name)
        P = np.empty((n_states, n_actions, n_states))
        for state in range(n_states):
            for action in range(n_actions):
                P[state, action, :] = self.transition_func(state, action)
        return P

    def additional_dimension_to_name(self, additional_dimension_idx: int, additional_dimension_value: int = 0) -> "str":
        """
        Return the name corresponding to an additional dimension value.

        The base abstraction has no additional dimension, so the name is empty.

        Args
        ----
        additional_dimension_idx:
            Index of the additional dimension (which additional dimension we are referring to)
        additional_dimension_value:
            Value of the additional dimension (index within that dimension). Optional so
            the single-argument call made when plotting works
        """
        return ""

    @abstractmethod
    def transition_func(self, state: "int", action: "int") -> np.ndarray:
        """
        Compute transition probabilities from a given state and action.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array should equal the number of states.
        """
        pass

    def plot_transitions_state_action(
        self, state: "int", action: "int", additional_dimension_to_plot: int = 0, axes: "plt.Axes | None" = None
    ):
        """
        Visualize the transition probabilities from a given state and action.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier
        additional_dimension_to_plot:
            Which repetition of the grid states to show as target states
        axes:
            Axes to draw on. A new figure with a colorbar is created if not provided

        Returns
        -------
        Matplotlib figure and axes containing the visualization
        """
        num_full_cells = self._grid_map.num_occupied
        n_layers = len(self._states_to_cell_map) // num_full_cells
        assert 0 <= additional_dimension_to_plot < n_layers, "Invalid additional dimension to plot"
        # Collect the probabilities of the target states in the requested layer.
        prob_map = np.zeros(self._grid_map._grid.shape[:2])
        first = additional_dimension_to_plot * num_full_cells
        for target in range(first, first + num_full_cells):
            r, c = self._state_to_cell(target)
            prob_map[r, c] = self._transition_matrix[state, action, target]
        add_bar = axes is None
        if axes is None:
            fig, ax = plt.subplots()
        else:
            ax = axes
            fig = ax.get_figure()
        im = ax.imshow(prob_map.T, cmap="hot", origin="lower", interpolation="nearest", vmin=0, vmax=1)
        if add_bar:
            fig.colorbar(im)
        r, c = self._state_to_cell(state)
        ax.scatter(r, c, color="green", marker="o", s=100, label="Current State")
        x, y = self.state_to_pos(state)
        ax.set_title(
            f"P(s'=k | s=({x:.2f}, {y:.2f}{self.additional_dimension_to_name(additional_dimension_to_plot)}), a={self._action_to_name[action]})"
        )
        ax.legend()
        return fig, ax

    def plot_transition_state(self, state: "int", additional_dimension_to_plot: int = 0):
        """
        Visualize all the transition probabilities from a given state, across all actions.

        Args
        ----
        state:
            Current discrete state identifier
        additional_dimension_to_plot:
            Which repetition of the grid states to show as target states

        Returns
        -------
        Matplotlib figure and the axes (a single Axes, a 1D array or a 2D array
        depending on the number of actions)
        """
        n_actions = len(self._action_to_name)
        max_fig_cols = 4
        n_rows = (n_actions + max_fig_cols - 1) // max_fig_cols
        # squeeze=False keeps a 2D array of axes, so indexing also works with one action.
        fig, axes = plt.subplots(n_rows, min(n_actions, max_fig_cols), squeeze=False)
        for action in range(n_actions):
            ax = axes[action // max_fig_cols, action % max_fig_cols]
            self.plot_transitions_state_action(
                state, action, additional_dimension_to_plot=additional_dimension_to_plot, axes=ax
            )
        fig.colorbar(ax.images[0], ax=axes.ravel().tolist())
        # Return the shape plt.subplots would give by default.
        if axes.size == 1:
            return fig, axes[0, 0]
        return fig, axes.squeeze()

    def plot_labels(self, additional_dimension_to_plot: int = 0, ax: "plt.Axes | None" = None):
        """
        Visualize the labels assigned to each state in the grid.

        Args
        ----
        additional_dimension_to_plot:
            Which repetition of the grid states to show
        ax:
            Axes to draw on. A new figure is created if not provided

        Returns
        -------
        Matplotlib figure and axes containing the label visualization
        """
        # An object array keeps labels intact (dtype=str would store one character per cell).
        label_map = np.full(self._grid_map._grid.shape[:2], "█", dtype=object)
        num_full_cells = self._grid_map.num_occupied
        n_cols = self._grid_map._grid.shape[1]
        for state in range(len(self._states_to_cell_map)):
            if state // num_full_cells != additional_dimension_to_plot:
                continue
            r, c = self._state_to_cell(state)
            ic = n_cols - c - 1  # invert column for plotting
            label_map[r, ic] = self._MDP.labels[state]
        fig, ax = plt.subplots() if ax is None else (ax.get_figure(), ax)
        ax.table(cellText=label_map.T.tolist(), loc="center", cellLoc="center")
        ax.axis("off")
        fig.tight_layout()
        return fig, ax
class KingGridAbstraction(GridAbstraction["tuple[int, int]"]):
    """
    Implementation of the King grid movement model.

    The King can move one square in any direction (horizontally, vertically, or diagonally).
    The actions are represented as tuples (delta_x, delta_y), where each component can be -1, 0, or 1.
    The two axes are sampled independently. Along an axis with a non-zero delta the agent moves
    one cell in that direction with probability 0.7, does not move with probability 0.2 and
    overshoots by a second cell with probability 0.1. An axis with delta 0 never moves, so the
    action (0, 0) stays in place with probability 1.0.

    .. svgbob::

          ↖ ↑ ↗
        y ← ● →
          ↙ ↓ ↘
            x
    """

    def __init__(self, grid_map: "GridMap"):
        """
        Create the abstraction with the nine (delta_x, delta_y) actions.

        Args
        ----
        grid_map:
            Grid whose occupied cells define the states
        """
        super().__init__(grid_map, tuple(product((-1, 0, 1), repeat=2)))

    def _get_prob_vector(self, delta: "int") -> np.ndarray:
        """
        Return the per-axis displacement distribution for one action component.

        Args
        ----
        delta:
            Requested displacement along the axis: -1, 0 or 1

        Returns
        -------
        Probabilities of the displacements (-2, -1, 0, +1, +2) along the axis

        Raises
        ------
        ValueError:
            If ``delta`` is not -1, 0 or 1
        """
        if delta == -1:
            return np.array([0.1, 0.7, 0.2, 0.0, 0.0])
        if delta == 0:
            return np.array([0.0, 0.0, 1.0, 0.0, 0.0])
        if delta == 1:
            return np.array([0.0, 0.0, 0.2, 0.7, 0.1])
        raise ValueError("Invalid action for King movement model")

    def transition_func(self, state: "int", action: "int") -> np.ndarray:
        """
        Compute transition probabilities from a given state and action.

        Probability aimed at cells that are outside the grid or unoccupied is
        reassigned to the current state, so the returned vector sums to 1.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier, i.e. the index of a (delta_x, delta_y) tuple

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array should equal the number of states.
        """
        x, y = self._state_to_cell(state)
        P_s_a = np.zeros(len(self.state_set))
        delta_x, delta_y = self._action_to_name[action]
        prob_x = self._get_prob_vector(delta_x)
        prob_y = self._get_prob_vector(delta_y)
        assert len(prob_x) % 2 == 1 and len(prob_y) % 2 == 1, "Probability vectors must have odd length"
        # Joint distribution over the two independent per-axis displacements.
        prob = np.outer(prob_x, prob_y)
        half_x = len(prob_x) // 2
        half_y = len(prob_y) // 2
        n_x, n_y = self._grid_map.grid.shape[:2]
        for m in range(len(prob_x)):
            new_x = x + m - half_x
            if not 0 <= new_x < n_x:
                continue
            for n in range(len(prob_y)):
                new_y = y + n - half_y
                if not 0 <= new_y < n_y:
                    continue
                target = self._cell_to_states_map[new_x, new_y]
                if target == -1:
                    continue  # Unoccupied cell
                P_s_a[target] = prob[m, n]
        # Moves that would leave the occupied grid keep the agent where it is.
        blocked = 1.0 - P_s_a.sum()
        if blocked > 0.0:
            P_s_a[state] += blocked
        return P_s_a
class SpeedGridAbstraction(GridAbstraction[_A]):
    """
    A grid-based abstraction that incorporates speed into the state representation.

    The states of the underlying grid abstraction are repeated once per discrete
    speed level: state ``cell_state + num_occupied_cells * speed_idx`` is the
    grid cell ``cell_state`` at speed level ``speed_idx``. The action set and
    the movement model are supplied by the subclasses.
    """

    def __init__(
        self,
        grid_map: "GridMap",
        max_speed: float,
        actions: "tuple[_A]",
        speed_res: float = 1.0,
        initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0),
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Build the abstraction and set its initial state and labels.

        Args
        ----
        grid_map:
            Grid whose occupied cells define the positions
        max_speed:
            Largest speed to represent
        actions:
            Tuple of action names
        speed_res:
            Width of one discrete speed level
        initial_state:
            Initial (x, y, speed) or state index, forwarded to :meth:`update`
        label_function:
            Optional labelling function, forwarded to :meth:`update`
        """
        self._num_active_pos = grid_map.num_occupied
        self._speed_res = speed_res
        self._speed_range = self._speed_to_idx(max_speed) + 1
        super().__init__(grid_map, actions, state_repetition=self._speed_range)
        self.update(initial_state, label_function)

    @property
    def speed_range(self) -> int:
        """
        Get the number of discrete speed levels in the abstraction.

        Returns
        -------
        Number of speed levels
        """
        return self._speed_range

    @property
    def speed_resolution(self) -> float:
        """
        Get the speed resolution used in the abstraction.

        Returns
        -------
        Speed resolution value
        """
        return self._speed_res

    @property
    def max_speed(self) -> float:
        """
        Get the maximum speed represented in the abstraction.

        Returns
        -------
        Maximum speed value
        """
        return (self._speed_range - 1) * self._speed_res

    def _speed_to_idx(self, speed: float) -> int:
        """
        Quantize a continuous speed to its speed level (floor of speed / resolution).

        A tiny tolerance is added before flooring because plain float floor
        division loses a level on exact multiples (``0.3 // 0.1`` is ``2.0``).
        """
        return int(np.floor(speed / self._speed_res + 1e-9))

    @lru_cache
    def _state_to_speed_idx(self, state: int) -> "int":
        """
        Convert a discrete state to its corresponding speed index.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Speed index of the state

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        return state // self._num_active_pos

    def _cell_speed_to_state(self, cell: "_Cell", speed_idx: int) -> "int":
        """
        Convert grid cell coordinates and speed index to a discrete state identifier.

        Args
        ----
        cell:
            Grid cell coordinates as (row, column) indices
        speed_idx:
            Speed index

        Returns
        -------
        Unique state identifier for the cell and speed, or -1 if the cell is unoccupied

        Raises
        ------
        AssertionError: If cell coordinates or speed index are invalid
        """
        r, c = cell
        assert (
            0 <= r < self._grid_map._grid.shape[0] and 0 <= c < self._grid_map._grid.shape[1]
        ), "Invalid cell coordinates"
        assert 0 <= speed_idx < self._speed_range, "Invalid speed index"
        base_state = self._cell_to_states_map[r, c]
        if base_state == -1:
            # Unoccupied cell: keep the -1 marker instead of offsetting it into a
            # number that would look like a valid state.
            return -1
        return base_state + self._num_active_pos * speed_idx

    @lru_cache
    def state_to_speed(self, state: int) -> "float":
        """
        Convert a discrete grid state to a continuous speed.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Continuous speed value

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        assert 0 <= state < len(self._states_to_cell_map), "Invalid state identifier"
        return self._state_to_speed_idx(state) * self._speed_res

    def cell_speed_idx_to_pos_speed(self, cell: "_Cell", speed_idx: int) -> "tuple[_Position, float]":
        """
        Convert grid cell coordinates and speed index to continuous position and speed.

        Args
        ----
        cell:
            Grid cell coordinates as (row, column) indices
        speed_idx:
            Speed index

        Returns
        -------
        Tuple containing continuous position as (x, y) coordinates and speed value

        Raises
        ------
        AssertionError: If cell coordinates or speed index are invalid
        """
        r, c = cell
        assert (
            0 <= r < self._grid_map._grid.shape[0] and 0 <= c < self._grid_map._grid.shape[1]
        ), "Invalid cell coordinates"
        assert 0 <= speed_idx < self._speed_range, "Invalid speed index"
        return self._grid_map.cell_to_pos((r, c)), speed_idx * self._speed_res

    def pos_speed_to_state(self, position: "_Position", speed: float) -> "int | None":
        """
        Convert a continuous position and speed to the corresponding abstract state.

        Args
        ----
        position:
            Continuous position as (x, y) coordinates
        speed:
            Continuous speed value

        Returns
        -------
        Discrete state identifier corresponding to the grid cell and speed,
        or None if the position is outside the grid, in an unoccupied cell,
        or the speed is outside the represented range
        """
        base_state = self.pos_to_state(position)
        if base_state is None:
            return None
        speed_idx = self._speed_to_idx(speed)
        if not 0 <= speed_idx < self._speed_range:
            return None
        return int(base_state + self._num_active_pos * speed_idx)

    def state_to_pos_speed(self, state: "int") -> "tuple[_Position, float]":
        """
        Convert a discrete state identifier to continuous position and speed.

        Args
        ----
        state:
            Discrete state identifier

        Returns
        -------
        Tuple containing continuous position as (x, y) coordinates and speed value

        Raises
        ------
        AssertionError: If the state identifier is invalid
        """
        return self.state_to_pos(state), self.state_to_speed(state)

    def update(
        self,
        pos_speed: "tuple[float, float, float] | int | None" = None,
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Update the MDP with a new initial state and labels.

        Args
        ----
        pos_speed:
            New initial position as (x, y) coordinates and speed (z) or direct state index. Ignored if not provided
        label_function:
            Function that takes a state and the GridAbstraction, and returns a label string.
            Ignored if not provided
        """
        # Integral also accepts NumPy integer state indices.
        if isinstance(pos_speed, Integral):
            self._MDP.initial_state = int(pos_speed)
        elif pos_speed is not None:
            self._MDP.initial_state = self.pos_speed_to_state(pos_speed[0:2], pos_speed[2])
        if label_function is not None:
            self._MDP.labels = self.gen_labels(label_function)

    def additional_dimension_to_name(
        self, additional_dimension_idx: int, additional_dimension_value: "int | None" = None
    ) -> str:
        """
        Return the title suffix naming a speed level.

        Args
        ----
        additional_dimension_idx:
            Speed index when called with a single argument (as the plotting code does)
        additional_dimension_value:
            Speed index when the two-argument form of the base class is used; it then
            takes precedence over ``additional_dimension_idx``
        """
        speed_idx = additional_dimension_idx if additional_dimension_value is None else additional_dimension_value
        return f", s: {speed_idx * self._speed_res}"
class PathGridAbstraction(SpeedGridAbstraction["tuple[int, int]"]):
    """
    Speed-aware grid abstraction whose movement follows the edges of a ``PathsGridMap``.

    Actions are tuples (path_idx, delta_speed): the path to follow out of the
    current cell and the requested change of speed level (-1, 0 or 1).
    """

    def __init__(
        self,
        grid_map: "GridMap",
        max_speed: float,
        speed_res: float = 1.0,
        initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0),
        label_function: "GridLabelFunction | None" = None,
    ):
        """
        Build the abstraction with one action per (path, speed change) pair.

        Raises
        ------
        AssertionError: If ``grid_map`` is not a PathsGridMap
        """
        assert isinstance(grid_map, PathsGridMap), "PathGridAbstraction requires a PathsGridMap"
        self._grid_map: "PathsGridMap"
        super().__init__(
            grid_map=grid_map,
            max_speed=max_speed,
            actions=tuple(product(range(grid_map.num_paths), (-1, 0, 1))),
            speed_res=speed_res,
            initial_state=initial_state,
            label_function=label_function,
        )

    def _get_prob_vector(self, delta: "int") -> np.ndarray:
        """
        Return the distribution over speed-level changes (-2, -1, 0, +1, +2).

        Args
        ----
        delta:
            Requested speed change: -1, 0 or 1

        Raises
        ------
        ValueError: If ``delta`` is not -1, 0 or 1
        """
        if delta == -1:
            return np.array([0.0, 0.8, 0.2, 0.0, 0.0])
        if delta == 0:
            return np.array([0.0, 0.0, 1.0, 0.0, 0.0])
        if delta == 1:
            return np.array([0.0, 0.0, 0.2, 0.8, 0.0])
        raise ValueError("Invalid action")

    def gen_transitions(self):
        """
        Generate the transition probability matrix for the MDP.

        Only (cell, path) pairs listed in the grid map's edges are filled in;
        the rows of every other state-action pair stay all zero.

        Returns
        -------
        3D transition matrix with shape (n_states, n_actions, n_states)
        where entry [i,j,k] represents P(s'=k | s=i, a=j)
        """
        n_states = self._states_to_cell_map.size
        P = np.zeros((n_states, len(self._action_to_name), n_states))
        for (current_cell, path_idx), target_cell in self._grid_map.edges.items():
            for speed_idx in range(self._speed_range):
                state = self._cell_speed_to_state(current_cell, speed_idx)
                for speed_delta in (-1, 0, 1):
                    action = self._name_to_action[(path_idx, speed_delta)]
                    P[state, action, :] = self.transition_func(state, action, target_cell=target_cell)
        return P

    def transition_func(self, state: "int", action: "int", target_cell: "_Cell") -> "np.ndarray":
        """
        Compute transition probabilities from a given state and action.

        The position always moves to ``target_cell``; only the speed level is
        uncertain. Probability of speed levels outside the valid range is
        redistributed proportionally over the reachable ones.

        Args
        ----
        state:
            Current discrete state identifier
        action:
            Action identifier, i.e. the index of a (path_idx, delta_speed) tuple
        target_cell:
            Cell reached by following the action's path from the current cell.
            Required here, unlike in the base class signature

        Returns
        -------
        Array of transition probabilities to all states.
        The length of the array should equal the number of states.

        Raises
        ------
        AssertionError: If the target state is invalid or the probabilities do not sum to 1
        """
        speed_idx = self._state_to_speed_idx(state)
        P_s_a = np.zeros(len(self.state_set))
        _, delta_speed = self._action_to_name[action]
        prob_speed = self._get_prob_vector(delta_speed)
        assert len(prob_speed) % 2 == 1, "Probability vectors must have odd length"
        half = len(prob_speed) // 2
        for v, p in enumerate(prob_speed):
            new_speed_idx = speed_idx + v - half
            if not (0 <= new_speed_idx < self._speed_range):
                continue  # Out of speed bounds
            new_state = self._cell_speed_to_state(target_cell, new_speed_idx)
            assert new_state != -1, "State should be valid"
            P_s_a[new_state] = p
        prob = P_s_a.sum()
        if 0 < prob < 1.0:
            P_s_a /= prob  # Scale existing probabilities
        assert np.isclose(P_s_a.sum(), 1.0), "Transition probabilities must sum to 1"
        return P_s_a

    def state_action_to_next_state(
        self, state: "int | tuple[float, float, float]", action: "tuple[int, int] | int"
    ) -> "tuple[tuple[float, float], float]":
        """
        Return the nominal (noise-free) successor of a state under an action.

        Args
        ----
        state:
            State identifier, or an explicit triple whose first two entries are used as the
            cell key in the grid map's edges and whose third is the speed index
        action:
            Action identifier or the (path_idx, delta_speed) tuple itself

        Returns
        -------
        Continuous (x, y) position of the next cell and the next speed value. The agent
        stays in its cell when the path has no edge from it, and the speed index is
        clipped to the valid range

        Raises
        ------
        AssertionError: If the next cell is missing or lies outside the grid
        """
        if isinstance(action, Integral):
            action = self._action_to_name[action]
        path_idx, delta_speed = action
        if isinstance(state, Integral):
            x, y = self._state_to_cell(state)
            s = self._state_to_speed_idx(state)
        else:
            x, y, s = state
        next_cell: "_Cell" = self._grid_map.edges.get(((x, y), path_idx), (x, y))
        assert next_cell is not None, "No valid next cell for the given state and action"
        assert (
            0 <= next_cell[0] < self.grid_map.rows and 0 <= next_cell[1] < self.grid_map.cols
        ), "Next cell out of bounds"
        next_s = np.clip(s + delta_speed, 0, self.speed_range - 1)
        return self.cell_speed_idx_to_pos_speed(next_cell, next_s)
[docs]
class VelocityGridAbstraction(SpeedGridAbstraction["tuple[SteerAction, SpeedAction]"]):
"""
A grid-based abstraction that incorporates velocity into the state representation.
This class extends the GridAbstraction to include velocity components in the state representation.
The actions are represented as tuples (delta_movement, delta_speed), where each component can be -1, 0, or 1.
Movement Model:
- The agent can move left (-1), forward (0), or right (1) with specified probabilities.
- The speed can decrease (-1), remain the same (0), or increase (1) with specified probabilities.
In this case, the state representation is assumed to be a combination of position and velocity.
"""
def __init__(
    self,
    grid_map: "GridMap",
    max_speed: float,
    speed_res: float = 1.0,
    initial_state: "tuple[float, float, float]" = (0.0, 0.0, 0.0),
    label_function: "GridLabelFunction | None" = None,
):
    """
    Build the abstraction with one action per (move, speed action) pair.

    Args
    ----
    grid_map:
        Grid whose occupied cells define the positions
    max_speed:
        Largest speed to represent
    speed_res:
        Width of one discrete speed level
    initial_state:
        Initial (x, y, speed) or state index
    label_function:
        Optional labelling function
    """
    # 'u'/'d' for up/down, 'l'/'r' for left/right, two letters for the diagonals, 's' for stay.
    move_set = ("ul", "u", "ur", "l", "s", "r", "dl", "d", "dr")
    # Only 'c' (cruise) is enabled; 'd' (decelerate) and 'a' (accelerate) are known to
    # _get_prob_speed but are left out of the action set.
    speed_set = ("c",)
    # Plain Python strings (not NumPy string scalars) keep action names readable when
    # formatted. Ordering: all moves for the first speed action, then the next, and so on.
    actions = tuple((move, speed) for speed in speed_set for move in move_set)
    super().__init__(
        grid_map=grid_map,
        max_speed=max_speed,
        actions=actions,
        speed_res=speed_res,
        initial_state=initial_state,
        label_function=label_function,
    )
def _get_prob_spatial(self, delta: "MoveAction") -> np.ndarray:
    """
    Return the distribution over cell displacements for a move action.

    Args
    ----
    delta:
        Move action: one of "ul", "u", "ur", "l", "s", "r", "dl", "d", "dr"

    Returns
    -------
    5x5 array whose entry [2 + i, 2 + j] is the probability of moving by i along
    the first cell index and by j along the second one

    Raises
    ------
    ValueError: If ``delta`` is not a known move action
    """
    # Displacement (first index, second index) -> probability. "left"/"right" is
    # -1/+1 on the first index, "down"/"up" is -1/+1 on the second index.
    kernels = {
        "ul": {(-1, 1): 0.8, (0, 1): 0.1, (-1, 0): 0.1},
        "u": {(0, 1): 0.9, (0, 0): 0.1},
        "ur": {(1, 1): 0.8, (0, 1): 0.1, (1, 0): 0.1},
        "l": {(-1, 0): 0.8, (-1, -1): 0.1, (-1, 1): 0.1},
        "s": {(0, 0): 1.0},
        "r": {(1, 0): 0.8, (1, -1): 0.1, (1, 1): 0.1},
        "dl": {(-1, -1): 0.8, (0, -1): 0.1, (-1, 0): 0.1},
        "d": {(0, -1): 0.9, (0, 0): 0.1},
        "dr": {(1, -1): 0.8, (0, -1): 0.1, (1, 0): 0.1},
    }
    kernel = kernels.get(delta)
    if kernel is None:
        # An all-zero "distribution" would silently produce invalid transitions.
        raise ValueError("Invalid action")
    prob_spatial = np.zeros((5, 5))  # TODO: make 3x3
    center = prob_spatial.shape[0] // 2
    for (di, dj), p in kernel.items():
        prob_spatial[center + di, center + dj] = p
    return prob_spatial
[docs]
def _get_prob_speed(self, delta: "SpeedAction") -> np.ndarray:
    """
    Build the probability distribution over speed-index changes for a speed action.

    The result has 5 entries centred on index 2, which stands for "no change";
    index 3 is one step faster and index 1 is one step slower.

    Args
    ----
    delta:
        Speed action: 'a' (accelerate), 'c' (cruise) or 'd' (decelerate)

    Returns
    -------
    Array of length 5 with the probability of each speed-index change.
    Any other value of ``delta`` yields an all-zero array.
    """
    center = 2  # Position of the "speed unchanged" outcome
    prob_v = np.zeros(5)
    if delta == "c":
        # Cruising always keeps the current speed
        prob_v[center] = 1.0
    elif delta in ("a", "d"):
        # The requested change succeeds with 0.9, otherwise the speed stays the same
        step = 1 if delta == "a" else -1
        prob_v[center + step] = 0.9
        prob_v[center] = 0.1
    return prob_v
[docs]
def transition_func(self, state: "int", action: "int") -> np.ndarray:
    """
    Build the vector of transition probabilities for a (state, action) pair.

    The action identifier is resolved into a (movement, speed) pair of action names.
    The movement distribution (2D, centred on "no displacement") and the speed
    distribution (1D, centred on "no change") are combined into a joint distribution,
    and every non-zero outcome is mapped onto the state it leads to.

    Args
    ----
    state:
        Current discrete state identifier
    action:
        Discrete action identifier, looked up to obtain the (movement, speed) action names

    Returns
    -------
    Array with one entry per state, holding the probability of reaching that state.
    Outcomes that land on a cell that is not occupied in the grid map, or outside
    the valid speed range, are dropped, so the entries may sum to less than 1.
    """
    cell_x, cell_y = self._state_to_cell(state)
    current_speed_idx = self._state_to_speed_idx(state)
    P_s_a = np.zeros(len(self._states_to_cell_map))

    mov_name, speed_name = self._action_to_name[action]
    prob_mov = self._get_prob_spatial(mov_name)
    prob_speed = self._get_prob_speed(speed_name)
    assert len(prob_mov) % 2 == 1 and len(prob_speed) % 2 == 1, "Probability vectors must have odd length"

    # Joint distribution: prob[i, j, k] = prob_mov[i, j] * prob_speed[k].
    # The 2D movement grid is stacked along a third axis, one layer per speed change,
    # each layer weighted by the probability of that speed change.
    prob = prob_mov[:, :, np.newaxis] * prob_speed
    assert prob.shape == (prob_mov.shape[0], prob_mov.shape[1], prob_speed.shape[0])

    n_rows, n_cols = prob_mov.shape[0], prob_mov.shape[1]
    n_speeds = len(prob_speed)
    for m, n in product(range(n_rows), range(n_cols)):
        if prob_mov[m, n] == 0.0:
            continue  # This displacement never happens
        # Indices are offsets relative to the centre of the movement grid
        new_x = cell_x + m - n_rows // 2
        new_y = cell_y + n - n_cols // 2
        if not self._grid_map.is_occupied(new_x, new_y):
            continue  # Target cell is not part of the map
        for v in range(n_speeds):
            p = prob[m, n, v]
            if p == 0.0:
                continue  # This speed change never happens
            new_speed_idx = current_speed_idx + v - n_speeds // 2
            if new_speed_idx < 0 or new_speed_idx >= self._speed_range:
                continue  # Outside the representable speeds
            new_state = self._cell_speed_to_state((new_x, new_y), new_speed_idx)
            assert new_state != -1, "State should be valid"
            P_s_a[new_state] = p

    # TODO: the returned distribution may not sum to 1 because invalid outcomes are dropped.
    # Is it a problem? One option would be to redistribute the missing probability
    # proportionally over the valid successor states.
    return P_s_a
[docs]
def state_action_to_next_state(
    self, state: "int | tuple[float, float, float]", action: "tuple[MoveAction, SpeedAction] | int"
) -> "tuple[tuple[float, float], float]":
    """
    Compute the nominal successor of a state when an action has its intended effect.

    The intended cell displacement and speed-index change of the action are added to
    the current cell and speed index. The result is clamped to the grid extent
    (``[0, rows - 1]`` for x, ``[0, cols - 1]`` for y) and to the valid speed indices
    (``[0, speed_range - 1]``). Cell occupancy is not checked here.

    Args
    ----
    state:
        Either a discrete state identifier, or an explicit ``(x, y, s)`` triple that is
        used directly as cell coordinates and speed index
    action:
        Either a discrete action identifier, or an explicit ``(movement, speed)`` pair
        of action names

    Returns
    -------
    The value produced by ``cell_speed_idx_to_pos_speed`` for the target cell and speed
    index (annotated as a ``((x, y), speed)`` pair).

    Raises
    ------
    ValueError: If the movement or speed action name is not recognised
    """
    # Intended (delta_x, delta_y) cell displacement of each movement action
    move_offsets = {
        "ul": (-1, 1),
        "u": (0, 1),
        "ur": (1, 1),
        "l": (-1, 0),
        "s": (0, 0),
        "r": (1, 0),
        "dl": (-1, -1),
        "d": (0, -1),
        "dr": (1, -1),
    }
    # Intended speed-index change of each speed action
    speed_offsets = {"a": 1, "c": 0, "d": -1}

    if isinstance(action, Integral):
        action = self._action_to_name[action]
    delta_mov, delta_speed = action
    # Fail early with a clear message instead of hitting an unbound local further down
    if delta_mov not in move_offsets:
        raise ValueError(f"Unknown movement action {delta_mov!r}; expected one of {sorted(move_offsets)}")
    if delta_speed not in speed_offsets:
        raise ValueError(f"Unknown speed action {delta_speed!r}; expected one of {sorted(speed_offsets)}")

    if isinstance(state, Integral):
        x, y = self._state_to_cell(state)
        s = self._state_to_speed_idx(state)
    else:
        x, y, s = state

    delta_x, delta_y = move_offsets[delta_mov]
    delta_s = speed_offsets[delta_speed]

    # Clamp so that moving against a border or a speed limit keeps the boundary value
    target_x = np.clip(x + delta_x, 0, self.grid_map.rows - 1)
    target_y = np.clip(y + delta_y, 0, self.grid_map.cols - 1)
    target_s = np.clip(s + delta_s, 0, self.speed_range - 1)
    return self.cell_speed_idx_to_pos_speed((target_x, target_y), target_s)
[docs]
def _main():
# Manual demo: build a rotated grid map in three different ways, add the same
# cells to each one and plot the result.
from matplotlib import pyplot as plt
ROTATION = np.pi / 6 # 30 degrees
# Rotate the point (32, 18) counter-clockwise by ROTATION about the pivot (10, 10):
# translate the pivot to the origin, apply the 2D rotation matrix, translate back.
# NOTE(review): the pivot (10, 10) differs from the (12, 10) point passed to the
# grid constructors below -- confirm this is intended.
point = (32.0, 18.0)
cos_angle = np.cos(ROTATION)
sin_angle = np.sin(ROTATION)
translated_x = point[0] - 10.0
translated_y = point[1] - 10.0
rotated_x = translated_x * cos_angle - translated_y * sin_angle
rotated_y = translated_x * sin_angle + translated_y * cos_angle
rotated_point = (rotated_x + 10.0, rotated_y + 10.0)
# Three constructions of the grid: the plain constructor, from_bounds (which uses
# the rotated point computed above) and from_corner, all with the same rotation.
for grid in (
GridMap((10, 4), (2.0, 2.0), (12.0, 10.0), rotation=ROTATION),
GridMap.from_bounds((12.0, 10.0), rotated_point, (2.0, 2.0), rotation=ROTATION),
GridMap.from_corner((12.0, 10.0), (20.0, 8.0), (2.0, 2.0), rotation=ROTATION),
):
# Populate the grid through each of the available entry points:
# single cells by index, a single position and a range of positions.
grid.add_cell(2, 3)
grid.add_cell(8, 0)
grid.add_cell(9, 1)
grid.add_pos((18, 17))
grid.add_pos_range((10, 14), (18, 14))
# The returned figure/axes handles are not used afterwards.
fig1, ax1 = grid.plot_grid()
fig2, ax2 = grid.plot_grid_projection()
# Display the figures created above.
plt.show()
[docs]
def plotting():
    """
    Demo: plot a rotated grid map and the transitions of one abstraction state.

    A grid map rotated by 30 degrees is created and filled with a range of cells
    (with one cell removed again), then plotted. A ``VelocityGridAbstraction`` is
    built on top of it and the transitions of state 5 are plotted before the
    figures are shown.
    """
    ROTATION = np.pi / 6  # 30 degrees
    grid = GridMap.from_corner((12.0, 10.0), (20.0, 8.0), (2.0, 2.0), rotation=ROTATION)
    grid.add_cell_range(0, 10, 0, 4)
    grid.add_cell(2, 1)
    grid.add_cell(1, 0)
    grid.remove_cell(9, 3)
    grid.plot_grid()

    # Named `abstraction` rather than `abs` so the builtin `abs` is not shadowed
    abstraction = VelocityGridAbstraction(grid, max_speed=1.0, speed_res=0.5, initial_state=(12.0, 10.0, 0.0))
    abstraction.plot_transition_state(5, additional_dimension_to_plot=1)
    plt.show()
# Script entry point: run the plotting demo when this module is executed directly.
if __name__ == "__main__":
plotting()