"""General utilities for creating and operating with C-spaces.
"""
import math
import time
from ..math import vectorops
from .cspace import CSpace,MotionPlan
import warnings
import itertools
import random
[docs]def default_sampleneighborhood(c,r):
return [ci + random.uniform(-r,r) for ci in c]
[docs]def default_visible(a,b):
raise RuntimeError("Can't check visibility")
[docs]def default_distance(a,b):
return vectorops.distance(a,b)
[docs]def default_interpolate(a,b,u):
return vectorops.interpolate(a,b,u)
[docs]def make_default(space):
"""Helper: makes a space's callbacks perform the default Cartesian space
operations."""
space.sampleneighborhood = default_sampleneighborhood
space.visible = default_visible
space.distance = default_distance
space.interpolate = default_interpolate
[docs]class CompositeCSpace(CSpace):
"""A cartesian product of multiple spaces, given as a list upon
construction. The feasible method can be overloaded to include
interaction tests."""
def __init__(self,spaces):
CSpace.__init__(self)
self.spaces = spaces
#construct optional methods
def sampleneighborhood(c,r):
return self.join(s.sampleneighborhood(cs,r) for (s,cs) in zip(self.spaces,self.split(c)))
def visible(a,b):
return all(s.visible(ai,bi) for (s,ai,bi) in zip(self.spaces,self.split(a),self.split(b)))
def distance(a,b):
return sum(s.distance(ai,bi) for (s,ai,bi) in zip(self.spaces,self.split(a),self.split(b)))
def interpolate(a,b,u):
return self.join(s.interpolate(ai,bi,u) for (s,ai,bi) in zip(self.spaces,self.split(a),self.split(b)))
if any(hasattr(s,'sampleneighborhood') for s in spaces):
for s in self.spaces:
if not hasattr(s,'sampleneighborhood'):
s.sampleneighborhood = default_sampleneighborhood
self.sampleneighborhood = sampleneighborhood
if any(hasattr(s,'visible') for s in spaces):
for s in self.spaces:
if not hasattr(s,'visible'):
s.visible = default_visible
self.visible = visible
if any(hasattr(s,'distance') for s in spaces):
for s in self.spaces:
if not hasattr(s,'distance'):
s.distance = default_distance
self.distance = distance
if any(hasattr(s,'interpolate') for s in spaces):
for s in self.spaces:
if not hasattr(s,'interpolate'):
s.interpolate = default_interpolate
self.interpolate = interpolate
#TODO: should add feasibility tests for subspaces -- this will allow the planning module to optimize
#constraint testing order.
[docs] def subDims(self):
return [len(s.sample()) for s in self.spaces]
[docs] def split(self,x):
d = self.subDims()
res = []
pos = 0
for di in d:
res.append(x[pos:pos+di])
pos += di
return res
[docs] def join(self,xs):
res = []
for x in xs:
res += x
return res
[docs] def feasible(self,x):
for (xi,si) in zip(self.split(x),self.spaces):
if not si.feasible(xi):
return False
return True
[docs] def sample(self):
return self.join(s.sample() for s in self.spaces)
[docs]class EmbeddedCSpace(CSpace):
"""A subspace of an ambient space, with the active DOFs given by a list
of DOF indices of that ambient space.
.. note ::
A MotionPlan constructed on this object operates in the embedded space,
NOT the ambient space. To push endpoints to the embedded space you
will need to call :meth:`EmbeddedCSpace.project`, and to pull a plan
back to the ambient space, use the :meth:`EmbeddedCSpace.liftPlan`
method.
To make this more convenient, the :class:`SubsetMotionPlan` class is
provided for you. :class:`EmbeddedMotionPlan` does the same thing.
.. note ::
Sampling does not work directly when the ambient space has implicit
manifold constraints, e.g., closed-loop constraints. ``sample``
and ``sampleneighborhood`` will need to be customized so that the
constraint solving is done without perturbing the seed configuration
except for the dofs in the moving subset.
Attributes:
ambientspace (CSpace): the ambient configuration space
mapping (list): the list of active indices into the ambient configuration
space
xinit (list, optional): the initial configuration in the ambient space
(by default, 0 vector)
"""
def __init__(self,ambientspace,subset,xinit=None):
CSpace.__init__(self)
self.ambientspace = ambientspace
n = len(ambientspace.sample())
self.mapping = subset
#start at the zero config if no initial configuration is given
if xinit==None:
self.xinit = [0.0]*n
else:
self.xinit = xinit
#construct optional methods
def sampleneighborhood(c,r):
return self.project(self.ambientspace.sampleneighborhood(self.lift(c),r))
def visible(a,b):
return self.ambientspace.visible(self.lift(a),self.lift(b))
def distance(a,b):
return self.ambientspace.distance(self.lift(a),self.lift(b))
def interpolate(a,b,u):
return self.project(self.ambientspace.interpolate(self.lift(a),self.lift(b),u))
if hasattr(ambientspace,'sampleneighborhood'):
self.sampleneighborhood = sampleneighborhood
if hasattr(ambientspace,'visible'):
self.visible = visible
if hasattr(ambientspace,'distance'):
self.distance = distance
if hasattr(ambientspace,'interpolate'):
self.interpolate = interpolate
self.eps = self.ambientspace.eps
self.bound = [self.ambientspace.bound[i] for i in self.mapping]
self.properties = self.ambientspace.properties
if self.ambientspace.feasibilityTests is not None:
self.feasibilityTests = [(lambda x,f=f:f(self.lift(x))) for f in self.ambientspace.feasibilityTests]
self.feasibilityTestNames = self.ambientspace.feasibilityTestNames[:]
self.feasibilityTestDependencies = self.ambientspace.feasibilityTestDependencies[:]
[docs] def project(self,xamb):
"""Ambient space -> embedded space"""
if len(xamb) != len(self.xinit):
raise ValueError("Invalid length of ambient space vector: %d should be %d"%(len(xamb),len(self.xinit)))
return [xamb[i] for i in self.mapping]
[docs] def lift(self,xemb):
"""Embedded space -> ambient space"""
if len(xemb) != len(self.mapping):
raise ValueError("Invalid length of embedded space vector: %d should be %d"%(len(xemb),len(self.mapping)))
xamb = self.xinit[:]
for (i,j) in enumerate(self.mapping):
xamb[j] = xemb[i]
return xamb
[docs] def liftPath(self,path):
"""Given a CSpace path, lifts it to a full ambient C-space path"""
return [self.lift(q) for q in path]
[docs] def projectPath(self,path_amb):
"""Given an ambient C-space path, projects it to a C-space path"""
return [self.project(q) for q in path_amb]
[docs] def feasible(self,x):
return self.ambientspace.feasible(self.lift(x))
[docs] def sample(self):
return self.project(self.ambientspace.sample())
[docs]class AffineEmbeddedCSpace(CSpace):
"""A subspace of an ambient space, with the active DOFs given by variables x
that are transformed to the ambient space by the transform x_amb = A*x+b.
Here A is a sparse matrix.
.. note ::
A MotionPlan constructed on this object operates in the embedded space,
NOT the ambient space. To push endpoints to the embedded space you
will need to call :meth:`AffineEmbeddedCSpace.project`, and to pull a
plan back to the ambient space, use the
:meth:`AffineEmbeddedCSpace.liftPlan` method.
To make this more convenient, the :class:`EmbeddedMotionPlan` class is
provided for you.
.. note ::
Sampling does not work directly when the ambient space has implicit
manifold constraints, e.g., closed-loop constraints. ``sample``
and ``sampleneighborhood`` will need to be customized so that the
constraint solving is done without perturbing the seed configuration
except for the dofs in the moving subset.
Attributes:
ambientspace (CSpace): the ambient configuration space
A (list of dict, numpy array, or scipy sparse matrix): the map from
embedded space to ambient space
b (list, optional): the offset in the ambient space (by default, the
0 vector)
"""
def __init__(self,ambientspace,A,b=None):
CSpace.__init__(self)
self.ambientspace = ambientspace
n = len(ambientspace.sample())
self.A = A
if hasattr(A,'shape'): #numpy array or scipy.sparse matrix
m = A.shape[1]
if self.A.shape[0] != n:
raise ValueError("Coefficient matrix must have n rows")
else:
if len(self.A) != n:
raise ValueError("Coefficient matrix must have n rows")
m = 0
for row in A:
if not isinstance(row,dict):
raise ValueError("Coefficient matrix must be a list of dicts, numpy array, or scipy matrix")
for j in row:
if j < 0:
raise ValueError("Invalid entry {} in coeffcient matrix".format(j))
m = max(j,m)
if m >= n:
warnings.warn("Embedded space has more DOFs than ambient space?")
self.m = m
self.n = n
#start at the zero config if no offset is given
if b==None:
self.b = [0.0]*n
else:
self.b = b
if len(b) != n:
raise ValueError("Offset matrix must have n entries")
#construct optional methods
def sampleneighborhood(c,r):
return self.project(self.ambientspace.sampleneighborhood(self.lift(c),r))
def visible(a,b):
return self.ambientspace.visible(self.lift(a),self.lift(b))
def distance(a,b):
return self.ambientspace.distance(self.lift(a),self.lift(b))
def interpolate(a,b,u):
return self.project(self.ambientspace.interpolate(self.lift(a),self.lift(b),u))
if hasattr(ambientspace,'sampleneighborhood'):
self.sampleneighborhood = sampleneighborhood
if hasattr(ambientspace,'visible'):
self.visible = visible
if hasattr(ambientspace,'distance'):
self.distance = distance
if hasattr(ambientspace,'interpolate'):
self.interpolate = interpolate
self.eps = self.ambientspace.eps
inf = float('inf')
self.bound = [[-inf,inf] for i in range(m)]
#set naive bounds
Aentries = []
if hasattr(A,'tocoo'):
Acoo = A.tocoo()
Aentries = zip(Acoo.row,Acoo.col,Acoo.data)
elif hasattr(A,'dot'):
import numpy as np
Aentries = [(i,j,v) for (i,j),v in np.ndenumerate(A) if v != 0]
else:
Aentries = [(i,j,v) for (j,v) in row.items() for i,row in enumerate(A)]
for (i,j,v) in Aentries:
if v < 0:
if self.bound[j][0]*v > self.ambientspace.bound[i][1]:
self.bound[j][0] = self.ambientspace.bound[i][1]/v
if self.bound[j][1]*v < self.ambientspace.bound[i][0]:
self.bound[j][1] = self.ambientspace.bound[i][0]/v
elif v > 0:
if self.bound[j][0]*v < self.ambientspace.bound[i][1]:
self.bound[j][0] = self.ambientspace.bound[i][1]/v
if self.bound[j][1]*v > self.ambientspace.bound[i][0]:
self.bound[j][1] = self.ambientspace.bound[i][0]/v
self.properties = self.ambientspace.properties
if self.ambientspace.feasibilityTests is not None:
self.feasibilityTests = [(lambda x,f=f:f(self.lift(x))) for f in self.ambientspace.feasibilityTests]
self.feasibilityTestNames = self.ambientspace.feasibilityTestNames[:]
self.feasibilityTestDependencies = self.ambientspace.feasibilityTestDependencies[:]
print("AffineEmbeddedCSpace projects from dimension",self.m,"to",self.n)
[docs] @staticmethod
def fromRobotDrivers(robot,ambientspace,drivers=None):
"""Creates an AffineEmbeddedCSpace for a robot's drivers.
Args:
robot (RobotModel): the robot that you'd like to move
ambientspace (CSpace): an ambient space on the robot's full DOFs,
e.g., RobotCSpace.
drivers (list of int or list of str, optional): if provided, gives
a list of robot's active drivers.
"""
if drivers is None:
drivers = range(robot.numDrivers())
m = robot.numDrivers()
else:
m = len(drivers)
try:
import scipy.sparse
data = []
rows = []
cols = []
b = [0]*robot.numLinks()
for i,dind in enumerate(drivers):
d = robot.driver(dind)
links = d.getAffectedLinks()
if d.getType() != 'affine':
data.append(1)
rows.append(links[0])
cols.append(i)
else:
scale,offset = d.getAffineCoeffs()
for j,s,o in zip(links,scale,offset):
data.append(s)
rows.append(j)
cols.append(i)
b[j] = o
A = scipy.sparse.coo_matrix((data,(rows,cols)),shape=(robot.numLinks(),m))
except ImportError:
A = [dict() for i in robot.numLinks()]
b = [0]*robot.numLinks()
for i,dind in enumerate(drivers):
d = robot.driver(dind)
links = d.getAffectedLinks()
if d.getType() != 'affine':
A[links[0]][i] = 1
else:
scale,offset = d.getAffineCoeffs()
for j,s,o in zip(links,scale,offset):
A[j][i] = s
b[j] = o
return AffineEmbeddedCSpace(ambientspace,A,b)
[docs] def project(self,xamb):
"""Ambient space -> embedded space"""
if len(xamb) != len(self.b):
raise ValueError("Invalid length of ambient space vector: %d should be %d"%(len(xamb),len(self.b)))
import numpy as np
#xamb = A*xemb + b
if hasattr(self.A,'todense'):
import scipy.sparse.linalg
xemb = scipy.sparse.linalg.lsqr(self.A,np.asarray(xamb)-np.asarray(self.b))[0].tolist()
elif hasattr(self.A,'dot'):
xemb = np.linalg.lstsq(self.A,np.asarray(xamb)-np.asarray(self.b))[0].tolist()
else:
import scipy.sparse
A = scipy.sparse.lil_matrix((self.A.shape[0],self.m))
for i,row in enumerate(self.A):
for j,v in row.items():
A.rows[i].append(j)
A.data[i].append(v)
xemb = scipy.sparse.linalg.lsqr(A.tocsr(),np.asarray(xamb)-np.asarray(self.b))[0].tolist()
assert len(xemb)==self.m
return xemb
[docs] def lift(self,xemb):
"""Embedded space -> ambient space"""
if len(xemb) != self.m:
raise ValueError("Invalid length of embedded space vector: %d should be %d"%(len(xemb),self.m))
if hasattr(self.A,'dot'): #numpy array or scipy.sparse matrix
xamb = vectorops.add(self.b,self.A.dot(xemb))
else:
xamb = self.b[:]
#list of dicts
for (i,dofs) in enumerate(self.A):
for j,c in dofs.items():
xamb[j] += c*xemb[i]
return xamb
[docs] def liftPath(self,path):
"""Given a CSpace path, lifts it to a full ambient C-space path"""
return [self.lift(q) for q in path]
[docs] def projectPath(self,path_amb):
"""Given an ambient C-space path, projects it to a C-space path"""
return [self.project(q) for q in path_amb]
[docs] def feasible(self,x):
return self.ambientspace.feasible(self.lift(x))
[docs] def sample(self):
return self.project(self.ambientspace.sample())
[docs]class SubsetMotionPlan (MotionPlan):
"""An adaptor that "lifts" a motion planner in an :class:`EmbeddedCSpace`
to a higher dimensional ambient space. Used for planning in subsets of
robot DOFs.
"""
def __init__(self,space,subset,q0,type=None,**options):
MotionPlan.__init__(self,space,type,**options)
self.subset = subset
self.q0 = q0
[docs] def project(self,xamb):
"""Ambient space -> embedded space"""
if len(xamb) != len(self.q0):
raise ValueError("Invalid length of ambient space vector: %d should be %d"%(len(xamb),len(self.q0)))
return [xamb[i] for i in self.subset]
[docs] def lift(self,xemb):
"""Embedded space -> ambient space"""
if len(xemb) != len(self.subset):
raise ValueError("Invalid length of embedded space vector: %d should be %d"%(len(xemb),len(self.subset)))
xamb = self.q0[:]
for (i,j) in enumerate(self.subset):
xamb[j] = xemb[i]
return xamb
[docs] def setEndpoints(self,start,goal):
"""Takes care of projecting the start and goal (represented in the ambient
space) down to the subset. Works with both config and set goals.
"""
#take care of the moving subset -- make sure to lift MP configurations back to
#space configurations
embstart = self.project(start)
if hasattr(goal,'__iter__'):
if len(goal)==2 and callable(goal[0]) and callable(goal[1]):
#it's a (test,sample) pair
def goaltest(x,test=goal[0]):
return test(self.lift(x))
def goalsample(sample=goal[1]):
qamb = sample()
qproj = self.project(qamb)
return qproj
embgoal = [goaltest,goalsample]
else:
#it's a configuration
embgoal = self.project(goal)
elif callable(goal):
def goaltest(x,goal=goal):
return goal(self.lift(x))
embgoal = goaltest
MotionPlan.setEndpoints(self,embstart,embgoal)
[docs] def addMilestone(self,x):
"""Manually adds a milestone from the ambient space, and returns its index"""
return MotionPlan.addMilestone(self,self.project(x))
[docs] def getPath(self,milestone1=None,milestone2=None):
"""Lifts the motion planner's lower-dimensional path back to the ambient space"""
spath = MotionPlan.getPath(self,milestone1,milestone2)
if spath == None: return None
return [self.lift(q) for q in spath]
[docs] def getRoadmap(self):
"""Lifts the motion planner's lower-dimensional roadmap back to the ambient space"""
V,E = MotionPlan.getRoadmap(self)
return [self.lift(v) for v in V],E
[docs]class EmbeddedMotionPlan (MotionPlan):
"""An adaptor that "lifts" a motion planner in an :class:`EmbeddedCSpace`
or :class:`AffineEmbeddedCSpace` to a higher dimensional ambient space.
Used for planning in robots with frozen DOFs (EmbeddedCSpace) or
"affine" drivers (AffineEmbeddedCSpace).
"""
def __init__(self,space,q0,type=None,**options):
if not isinstance(space,(EmbeddedCSpace,AffineEmbeddedCSpace)):
if not hasattr(space,'project') or not hasattr(space,'lift'):
raise ValueError("space argument must have the project and lift methods")
else:
warnings.warn("EmbeddedMotionPlan has not been tested with classes other than EmbeddedCSpace and AffineEmbeddedCSpace")
MotionPlan.__init__(self,space,type,**options)
[docs] def setEndpoints(self,start,goal):
"""Takes care of projecting the start and goal (represented in the ambient
space) down to the subset. Works with both config and set goals.
"""
#take care of the moving subset -- make sure to lift MP configurations back to
#space configurations
embstart = self.space.project(start)
if hasattr(goal,'__iter__'):
if len(goal)==2 and callable(goal[0]) and callable(goal[1]):
#it's a (test,sample) pair
def goaltest(x,test=goal[0]):
return test(self.space.lift(x))
def goalsample(sample=goal[1]):
qamb = sample()
qproj = self.space.project(qamb)
return qproj
embgoal = [goaltest,goalsample]
else:
#it's a configuration
embgoal = self.space.project(goal)
elif callable(goal):
def goaltest(x,goal=goal):
return goal(self.space.lift(x))
embgoal = goaltest
MotionPlan.setEndpoints(self,embstart,embgoal)
[docs] def addMilestone(self,x):
"""Manually adds a milestone from the ambient space, and returns its index"""
return MotionPlan.addMilestone(self,self.space.project(x))
[docs] def getPath(self,milestone1=None,milestone2=None):
"""Lifts the motion planner's lower-dimensional path back to the ambient space"""
spath = MotionPlan.getPath(self,milestone1,milestone2)
if spath == None: return None
return [self.space.lift(q) for q in spath]
[docs] def getRoadmap(self):
"""Lifts the motion planner's lower-dimensional roadmap back to the ambient space"""
V,E = MotionPlan.getRoadmap(self)
return [self.space.lift(v) for v in V],E
[docs] def setCostFunction(self, edgeCost=None, terminalCost=None):
"""Sets edge / terminal costs from functions in the ambient space"""
if edgeCost:
projectedEdgeCost = lambda a,b:edgeCost(self.space.lift(a),self.space.lift(b))
else:
projectedEdgeCost = None
if terminalCost:
projectedTerminalCost = lambda a:terminalCost(self.space.lift(a))
else:
projectedTerminalCost = None
MotionPlan.setCostFunction(self,projectedEdgeCost,projectedTerminalCost)
[docs] def pathCost(self,p):
"""Calculates the cost of a path in the ambient space."""
return MotionPlan.pathCost(self,[self.space.project(x) for x in p])