Skip to content

Commit

Permalink
Release v1.1.0 (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjschlapbach authored Feb 24, 2024
2 parents 7e818c8 + d2f45f8 commit 39ea472
Show file tree
Hide file tree
Showing 21 changed files with 1,033 additions and 342 deletions.
2 changes: 1 addition & 1 deletion src/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .algorithm import *
from .algorithms import *
from .envs import *
from .obstacles import *
from .examples import *
Expand Down
1 change: 1 addition & 0 deletions src/algorithm/__init__.py → src/algorithms/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .graph import Graph
from .timed_edge import TimedEdge
from .ta_prm import TAPRM
from .rrt import RRT
192 changes: 83 additions & 109 deletions src/algorithm/graph.py → src/algorithms/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from scipy.stats import qmc

from src.envs.environment_instance import EnvironmentInstance
from src.algorithm.timed_edge import TimedEdge
from src.algorithms.timed_edge import TimedEdge


class Graph:
Expand All @@ -19,7 +19,6 @@ class Graph:
Args:
num_samples (int): The number of samples to generate as vertices in the graph.
neighbour_distance (float): The maximum distance between a vertex and its neighbor.
env (EnvironmentInstance): An instance of the environment in which the graph is constructed.
Attributes:
Expand All @@ -28,25 +27,22 @@ class Graph:
edges (dict): A dictionary that stores the edges between vertices.
connections (dict): A dictionary that stores the connections between vertices for faster access.
num_vertices (int): The number of vertices in the graph.
neighbour_distance (float): The maximum distance between a vertex and its neighbor.
max_connections (int): The maximum number of connections for each vertex.
start (int): The index of the start vertex.
goal (int): The index of the goal vertex.
Methods:
__init__: Initializes a Graph object.
__sample_nodes: Generates random points as vertices in the graph.
connect_vertices: Connects vertices in the graph.
__connect_neighbours: Connects the given vertex with its neighboring vertices within a specified distance.
plot: Plots the graph, including all vertices and edges.
connect_start: Connects the start node to the graph.
connect_goal: Connects the goal node to the graph.
path_cost: Calculates the cost of a given solution path.
"""

def __init__(
self,
env: EnvironmentInstance,
num_samples: int = 1000,
neighbour_distance: float = 10.0,
max_connections: int = 10,
seed: int = None,
quiet: bool = False,
):
Expand All @@ -56,8 +52,6 @@ def __init__(
Args:
env (EnvironmentInstance): The environment instance to use for sampling and collision checking.
num_samples (int): The number of samples to generate.
neighbour_distance (float): The maximum distance between neighboring vertices.
max_connections (int): The maximum number of connections for each vertex.
seed (int): The seed to use for random number generation.
quiet (bool): If True, disables verbose print statements / progress bars.
"""
Expand All @@ -66,20 +60,68 @@ def __init__(

# save other parameters
self.num_vertices = num_samples
self.neighbour_distance = neighbour_distance
self.max_connections = max_connections

# sample random vertices
self.__sample_nodes(num_samples=num_samples, seed=seed)

# connect vertices
self.connect_vertices(quiet=quiet)

# initialize empty start and goal vertex indices
self.start = None
self.goal = None

def connect_start(self, coords: Tuple[float, float]):
# initialize parameter required for sample-dependent connection distance
d = 2
obs_free_volume = env.get_static_obs_free_volume()
unit_ball_volume = np.pi
self.gammaPRM = (
2
* ((1 + 1 / d) ** (1 / d))
* ((obs_free_volume / unit_ball_volume) ** (1 / d))
+ 1e-10
)

# sample random vertices and connect them to all neighbors in gammaPRM-dependent distance
self.vertices = {}
vertex_idx = 0

# initialize edges, connections and heuristic dictionaries
# format {"edge_id": EdgeWithTemporalAvailability}
self.edges = {}
# format {"node_id": [("neighbor_id", "edge_id"), ...]}, initialize empty
self.connections = {}
# format {"node_id": float}, initialize infinite
self.heuristic = {}

# initialize edge index
next_edge_idx = 0

if not quiet:
print("Connecting vertices in the graph...")

while vertex_idx < num_samples:
# draw sample from random uniform distribution
x_candidate = np.random.uniform(env.dim_x[0], env.dim_x[1])
y_candidate = np.random.uniform(env.dim_y[0], env.dim_y[1])
pt = ShapelyPoint(x_candidate, y_candidate)

if not self.env.static_collision_free(pt):
continue
else:
self.vertices[vertex_idx] = pt
self.connections[vertex_idx] = []
self.heuristic[vertex_idx] = np.inf

# connect to all neighbors in vertex-set size-dependent distance
n = vertex_idx + 1
neighbor_distance = self.gammaPRM * (np.log(n) / n) ** (1 / 2)
success, next_edge_idx = self.__connect_neighbours(
vertex_idx=vertex_idx,
neighbor_distance=neighbor_distance,
next_edge_idx=next_edge_idx,
)

if success or vertex_idx == 0:
vertex_idx += 1

def connect_start(
self, coords: Tuple[float, float], override_distance: float = None
):
"""
Connects the start node to the graph.
Expand All @@ -106,20 +148,29 @@ def connect_start(self, coords: Tuple[float, float]):
# add start node to vertices and create connect it to the graph
self.vertices[self.start] = start_pt
self.connections[self.start] = []
n = self.start + 1
neighbor_distance = self.gammaPRM * (np.log(n) / n) ** (1 / 2)
success, _ = self.__connect_neighbours(
self.start, next_edge_idx=len(self.edges), ignore_max_connections=True
vertex_idx=self.start,
neighbor_distance=(
neighbor_distance if override_distance is None else override_distance
),
next_edge_idx=len(self.edges),
)

# check if the start node was connected to any other node
if not success or len(self.connections[self.start]) == 0:
raise ValueError("Start node could not be connected to any other node.")

def connect_goal(self, coords: ShapelyPoint, quiet: bool = False):
def connect_goal(
self, coords: ShapelyPoint, quiet: bool = False, override_distance: float = None
):
"""
Connects the goal node to the graph.
Args:
coords (ShapelyPoint): The coordinates of the goal node.
quiet (bool): If True, disables verbose print statements / progress bars.
Raises:
ValueError: If the goal node is not collision-free or could not be connected to any other node.
Expand All @@ -141,8 +192,14 @@ def connect_goal(self, coords: ShapelyPoint, quiet: bool = False):
# add goal node to vertices and create connect it to the graph
self.vertices[self.goal] = goal_pt
self.connections[self.goal] = []
n = self.goal + 1
neighbor_distance = self.gammaPRM * (np.log(n) / n) ** (1 / 2)
success, _ = self.__connect_neighbours(
self.goal, next_edge_idx=len(self.edges), ignore_max_connections=True
vertex_idx=self.goal,
neighbor_distance=(
neighbor_distance if override_distance is None else override_distance
),
next_edge_idx=len(self.edges),
)

# check if the goal node was connected to any other node
Expand All @@ -156,75 +213,16 @@ def connect_goal(self, coords: ShapelyPoint, quiet: bool = False):
for key in tqdm(self.vertices, disable=quiet):
self.heuristic[key] = self.vertices[key].distance(self.vertices[self.goal])

def __sample_nodes(self, num_samples: int, seed: int = None):
"""
Generates random points as vertices in the graph.
Args:
num_samples (int): The number of samples to generate.
env (EnvironmentInstance): The environment instance to use for sampling and collision checking.
"""
# set up Halton sequence and draw samples
lower_bounds = [self.env.dim_x[0], self.env.dim_y[0]]
upper_bounds = [self.env.dim_x[1], self.env.dim_y[1]]
sampler = qmc.Halton(d=2, scramble=False, seed=seed)

self.vertices = {}
vertex_idx = 0

while vertex_idx < num_samples:
# draw sample
candidate = sampler.random(n=1)
candidate = qmc.scale(candidate, lower_bounds, upper_bounds)

pt = ShapelyPoint(candidate[0][0], candidate[0][1])

if not self.env.static_collision_free(pt):
continue
else:
self.vertices[vertex_idx] = pt
vertex_idx += 1

def connect_vertices(self, quiet: bool = False):
"""
Connects the vertices in the graph by creating edges between neighboring vertices.
This method initializes the edges and connections dictionaries, and then iterates over each vertex in the graph.
For each vertex, it finds all neighboring vertices within the specified distance and creates an edge between them.
The edge is checked for collisions with static obstacles, and if it is collision-free, it is added to the graph.
The corresponding availability with respect to dynamic obstacles is checked and influnces the availability of the edge.
Returns:
None
"""

# initialize edges, connections and heuristic dictionaries
# format {"edge_id": EdgeWithTemporalAvailability}
self.edges = {}
# format {"node_id": [("neighbor_id", "edge_id"), ...]}
self.connections = {key: [] for key in self.vertices}
# format {"node_id": float}
self.heuristic = {key: np.inf for key in self.vertices}

# initialize edge index
next_edge_idx = 0

if not quiet:
print("Connecting vertices in the graph...")

for key in tqdm(self.vertices, disable=quiet):
success, next_edge_idx = self.__connect_neighbours(
key, next_edge_idx=next_edge_idx
)

def __connect_neighbours(
self, vertex_idx: int, next_edge_idx: int, ignore_max_connections: bool = False
self, vertex_idx: int, neighbor_distance: float, next_edge_idx: int
):
"""
Connects the given vertex with its neighboring vertices within a specified distance.
Args:
vertex_idx (int): The index of the vertex to connect.
neighbor_distance (float): The maximum distance at which to connect neighboring vertices.
next_edge_idx (int): The index of the next edge to be added to the graph.
Returns:
bool: True if the vertex was successfully connected to at least one other vertex, False otherwise.
Expand All @@ -237,24 +235,14 @@ def __connect_neighbours(
# initialize neighbours list
neighbours = []

# if the maximum number of connections is reached, skip
if (
len(self.connections[vertex_idx]) >= self.max_connections
and not ignore_max_connections
):
return False, next_edge_idx

# find all neighbours within the specified distance
for other_key in self.vertices:
other_vertex = self.vertices[other_key]
distance_to_other = vertex.distance(other_vertex)

if distance_to_other <= self.neighbour_distance:
if distance_to_other <= neighbor_distance:
neighbours.append((other_key, distance_to_other))

# sort neighbours by distance (ascending)
neighbours = sorted(neighbours, key=lambda x: x[1])

# track if node was successfully connected to any other node
valid_connection = False

Expand All @@ -268,20 +256,6 @@ def __connect_neighbours(
if nkey in [x[0] for x in self.connections[vertex_idx]]:
continue

# if the current key has reached the maximum number of connections, skip
if (
len(self.connections[vertex_idx]) >= self.max_connections
and not ignore_max_connections
):
return True, next_edge_idx

# if the neighbour node has reached the maximum number of connections, skip
if (
len(self.connections[nkey]) >= self.max_connections
and not ignore_max_connections
):
continue

# extract neighbour key and shapely coordinates
nnode = self.vertices[nkey]

Expand Down
Loading

0 comments on commit 39ea472

Please sign in to comment.