Source code for pybullet_planning.motion_planners.lazy_prm

from scipy.spatial.kdtree import KDTree
from heapq import heappush, heappop
from collections import namedtuple

from pybullet_planning.utils.iter_utils import get_pairs
from .utils import INF, elapsed_time
from .utils import random_selector, default_selector
from .smoothing import smooth_path

import time
import numpy as np

__all__ = [
    'lazy_prm',
    'replan_loop',
    ]

Node = namedtuple('Node', ['g', 'parent'])
unit_cost_fn = lambda v1, v2: 1.
zero_heuristic_fn = lambda v: 0

def retrace_path(visited, vertex):
    if vertex is None:
        return []
    return retrace_path(visited, visited[vertex].parent) + [vertex]

def dijkstra(start_v, neighbors_fn, cost_fn=unit_cost_fn):
    # Update the heuristic over time
    # TODO: overlap with discrete
    start_g = 0
    visited = {start_v: Node(start_g, None)}
    queue = [(start_g, start_v)]
    while queue:
        current_g, current_v = heappop(queue)
        if visited[current_v].g < current_g:
            continue
        for next_v in neighbors_fn(current_v):
            next_g = current_g + cost_fn(current_v, next_v)
            if (next_v not in visited) or (next_g < visited[next_v].g):
                visited[next_v] = Node(next_g, current_v)
                heappush(queue, (next_g, next_v))
    return visited

def wastar_search(start_v, end_v, neighbors_fn, cost_fn=unit_cost_fn,
                  heuristic_fn=zero_heuristic_fn, w=1, max_cost=INF, max_time=INF):
    """WA* uses evaluation function f(n) = g(n) + W*h(n).
    See: https://www.cs.cmu.edu/~aplatzer/orbital/Orbital-doc/api/orbital/algorithm/template/WAStar.html
    """
    # TODO: lazy wastar to get different paths
    #heuristic_fn = lambda v: cost_fn(v, end_v)
    priority_fn = lambda g, h: g + w*h
    goal_test = lambda v: v == end_v

    start_time = time.time()
    start_g, start_h = 0, heuristic_fn(start_v)
    visited = {start_v: Node(start_g, None)}
    queue = [(priority_fn(start_g, start_h), start_g, start_v)]
    while queue and (elapsed_time(start_time) < max_time):
        _, current_g, current_v = heappop(queue)
        if visited[current_v].g < current_g:
            continue
        if goal_test(current_v):
            return retrace_path(visited, current_v)
        for next_v in neighbors_fn(current_v):
            next_g = current_g + cost_fn(current_v, next_v)
            if (next_v not in visited) or (next_g < visited[next_v].g):
                visited[next_v] = Node(next_g, current_v)
                next_h = heuristic_fn(next_v)
                if priority_fn(next_g, next_h) < max_cost:
                    heappush(queue, (priority_fn(next_g, next_h), next_g, next_v))
    return None

##################################################

def get_embed_fn(weights):
    """get embedding function handle that apply dot(weights, q)
    """
    return lambda q: weights * q

def get_distance_fn(weights, p_norm=2):
    embed_fn = get_embed_fn(weights)
    return lambda q1, q2: np.linalg.norm(embed_fn(q2) - embed_fn(q1), ord=p_norm)

##################################################

def check_vertex(v, samples, colliding_vertices, collision_fn):
    if v not in colliding_vertices:
        colliding_vertices[v] = collision_fn(samples[v])
    return not colliding_vertices[v]

def check_edge(v1, v2, samples, colliding_edges, collision_fn, extend_fn):
    if (v1, v2) not in colliding_edges:
        segment = default_selector(extend_fn(samples[v1], samples[v2]))
        colliding_edges[v1, v2] = any(map(collision_fn, segment))
        colliding_edges[v2, v1] = colliding_edges[v1, v2]
    return not colliding_edges[v1, v2]

def check_path(path, colliding_vertices, colliding_edges, samples, extend_fn, collision_fn):
    for v in random_selector(path):
        if not check_vertex(v, samples, colliding_vertices, collision_fn):
            return False
    for v1, v2 in default_selector(get_pairs(path)):
        if not check_edge(v1, v2, samples, colliding_edges, collision_fn, extend_fn):
            return False
    return True

##################################################

def compute_graph(samples, weights=None, p_norm=2, max_degree=10, max_distance=INF, approximate_eps=0.):
    """Build a graph based on samples. We use ``scipy.spatial.kdtree.KDTree`` for now.

    Parameters
    ----------
    samples : list
        list of sampled configurations
    weights : list, optional
        joint weights, by default None, using np.ones(dimension)
    p_norm : int, optional
        p norm used for computing distance, by default 2
    max_degree : int, optional
        max vertex degree in the graph, by default 10
    max_distance : [type], optional
        [description], by default INF
    approximate_eps : [type], optional
        [description], by default 0.

    Returns
    -------
    vertices : list
        list of graph vertex indices
    edges : list
        list of graph edges (vertex index pairs)
    """
    vertices = list(range(len(samples)))
    edges = set()
    if not vertices:
        return vertices, edges
    if weights is None:
        weights = np.ones(len(samples[0]))
    embed_fn = get_embed_fn(weights)
    embedded = list(map(embed_fn, samples))
    kd_tree = KDTree(embedded)
    for v1 in vertices:
        # TODO: could dynamically compute distances
        distances, neighbors = kd_tree.query(embedded[v1], k=max_degree + 1, eps=approximate_eps,
                                             p=p_norm, distance_upper_bound=max_distance)
        for d, v2 in zip(distances, neighbors):
            if (d < max_distance) and (v1 != v2):
                edges.update([(v1, v2), (v2, v1)])
    # print(time.time() - start_time, len(edges), float(len(edges))/len(samples))
    return vertices, edges

##################################################

[docs]def lazy_prm(start, goal, sample_fn, extend_fn, collision_fn, num_samples=100, weights=None, p_norm=2, lazy=False, max_cost=INF, max_time=INF, verbose=False, draw_fn=None, **kwargs): #, max_paths=INF): """ :param start: Start configuration - conf :param goal: End configuration - conf :param sample_fn: Sample function - sample_fn()->conf :param extend_fn: Extension function - extend_fn(q1, q2)->[q', ..., q"] :param collision_fn: Collision function - collision_fn(q)->bool :param max_time: Maximum runtime - float :param kwargs: Keyword arguments :return: Path [q', ..., q"] or None if unable to find a solution """ # TODO: compute parameters using start, goal, and sample_fn statistics # TODO: multi-query motion planning start_time = time.time() # TODO: can embed pose and/or points on the robot for other distances if weights is None: weights = np.ones(len(start)) distance_fn = get_distance_fn(weights, p_norm=p_norm) # TODO: can compute cost between waypoints from extend_fn samples = [] while len(samples) < num_samples: conf = sample_fn() if (distance_fn(start, conf) + distance_fn(conf, goal)) < max_cost: samples.append(conf) start_index, end_index = 0, 1 samples[start_index] = start samples[end_index] = goal cost_fn = lambda v1, v2: distance_fn(samples[v1], samples[v2]) vertices, edges = compute_graph(samples, p_norm=p_norm, **kwargs) neighbors_from_index = {v: set() for v in vertices} for v1, v2 in edges: neighbors_from_index[v1].add(v2) if draw_fn: draw_fn(samples, edges) colliding_vertices, colliding_edges = {}, {} def neighbors_fn(v1): for v2 in neighbors_from_index[v1]: if not colliding_vertices.get(v2, False) and not colliding_edges.get((v1, v2), False): yield v2 if not lazy: for vertex in vertices: check_vertex(vertex, samples, colliding_vertices, collision_fn) for vertex1, vertex2 in edges: check_edge(vertex1, vertex2, samples, colliding_edges, collision_fn, extend_fn) visited = dijkstra(end_index, neighbors_fn, cost_fn) heuristic_fn = lambda v: visited[v].g if v in visited else INF path = None while (elapsed_time(start_time) < max_time) and (path is None): # TODO: max_attempts # TODO: extra cost to prioritize reusing checked edges lazy_path = wastar_search(start_index, end_index, neighbors_fn=neighbors_fn, cost_fn=cost_fn, heuristic_fn=heuristic_fn, max_cost=max_cost, max_time=max_time-elapsed_time(start_time)) if lazy_path is None: break cost = sum(cost_fn(v1, v2) for v1, v2 in get_pairs(lazy_path)) if verbose: print('Length: {} | Cost: {:.3f} | Vertices: {} | Edges: {} | Time: {:.3f}'.format( len(lazy_path), cost, len(colliding_vertices), len(colliding_edges), elapsed_time(start_time))) if check_path(lazy_path, colliding_vertices, colliding_edges, samples, extend_fn, collision_fn): path = lazy_path if path is None: return path, edges, colliding_vertices, colliding_edges solution = [start] for q1, q2 in get_pairs(path): solution.extend(extend_fn(samples[q1], samples[q2])) return solution, samples, edges, colliding_vertices, colliding_edges
################################################## def replan_loop(start_conf, end_conf, sample_fn, extend_fn, collision_fn, params_list=[100], smooth=0, **kwargs): if collision_fn(start_conf) or collision_fn(end_conf): return None from .meta import direct_path path = direct_path(start_conf, end_conf, extend_fn, collision_fn, **kwargs) if path is not None: return path for num_samples in params_list: path, _, _, _ = lazy_prm(start_conf, end_conf, sample_fn, extend_fn, collision_fn, num_samples=num_samples, **kwargs) if path is not None: return smooth_path(path, extend_fn, collision_fn, max_smooth_iterations=smooth, **kwargs) return None