"""Klampt trajectory AD functions:
===================== ============= ========================================
Function Derivative Notes
===================== ============= ========================================
hermite Y hermite(x1,v1,x2,v2,u) -> R^n
GeodesicInterpolate 1(u) ginterp[geodesic](a,b,u) -> R^n
GeodesicDistance 1 gdist[geodesic](a,b) -> R
TrajectoryEval 1 eval[Trajectory,endBehavior](t) -> R^n
Trajectory 1 trajectory[n,end](ts,xs,t) -> R^n
===================== ============= ========================================
Module summary
~~~~~~~~~~~~~~
.. autosummary::
hermite
GeodesicInterpolate
GeodesicDistance
TrajectoryEval
Trajectory
"""
import numpy as np
from .ad import ADFunctionInterface,ADFunctionCall,ADTerminal,function,_scalar
from ...model import trajectory
from .. import spline
from ..geodesic import GeodesicSpace
from klampt import RobotModel
def _hermite(x1,v1,x2,v2,u):
assert len(x1)==len(v1)
assert len(x1)==len(x2)
assert len(x1)==len(v2)
u2 = u*u
u3 = u*u*u
cx1 = 2.0*u3-3.0*u2+1.0
cx2 = -2.0*u3+3.0*u2
cv1 = (u3-2.0*u2+u)
cv2 = (u3-u2)
return cx1*x1 + cx2*x2 + cv1*v1 + cv2*v2
def _hermite_jvp(arg,darg,x1,v1,x2,v2,u):
if arg==4:
return np.array(spline.hermite_deriv(x1,v1,x2,v2,u))*darg[0]
u2 = u*u
u3 = u*u*u
if arg==0:
cx1 = 2.0*u3-3.0*u2+1.0
return cx1*darg
elif arg==1:
cv1 = (u3-2.0*u2+u)
return cv1*darg
elif arg==2:
cx2 = -2.0*u3+3.0*u2
return cx2*darg
else:
cv2 = (u3-u2)
return cv2*darg
def _hermite_gen_deriv(args,x1,v1,x2,v2,u):
assert len(args) >= 2
numu = len([a for a in args if a==4])
if numu==len(args):
#only deriv w.r.t. u
res = np.array(spline.hermite_deriv(x1,v1,x2,v2,u,order=len(args)))
#resize to the right tensor dimension
for a in args:
res=res[:,np.newaxis]
return res
if len(args)-numu > 1:
#bilinear-ish
return 0
if numu == 1:
u2 = u*u
dcx1 = (6.0*u2-6.0*u)
dcx2 = (-6.0*u2+6.0*u)
dcv1 = 3.0*u2-4.0*u+1.0
dcv2 = 3.0*u2-2.0*u
coeffs = [dcx1,dcv1,dcx2,dcv2]
assert len(args)==2
if args[0] != 4:
res = np.diag([coeffs[args[0]]]*len(x1))
return res[:,:,np.newaxis]
elif args[1] != 4:
res = np.diag([coeffs[args[1]]]*len(x1))
return res[:,np.newaxis,:]
assert False,"This code should not be reached"
elif numu == 2:
ddcx1 = 12*u
ddcx2 = -12.0*u
ddcv1 = 6.0*u-4.0
ddcv2 = 6.0*u-2.0
coeffs = [ddcx1,ddcv1,ddcx2,ddcv2]
res = None
for a in args:
if a!=4:
res = np.diag([coeffs[a]]*len(x1))
assert res is not None,"This code should not be reached"
uhit = False
for a in args:
if a==4:
#TODO: figure how to extend axis properly
if uhit:
res = res[:,np.newaxis]
else:
res = res[:,np.newaxis,:]
return res
elif numu == 3:
cx1 = 12
cx2 = -12.0
cv1 = 6.0
cv2 = 6.0
coeffs = [cx1,cv1,cx2,cv2]
res = None
for a in args:
if a!=4:
res = np.diag([coeffs[a]]*len(x1))
assert res is not None,"This code should not be reached"
uhit = False
for a in args:
if a==4:
#TODO: figure how to extend axis properly
if uhit:
res = res[:,np.newaxis]
else:
res = res[:,np.newaxis,:]
return res
else:
return 0
hermite = function(_hermite,'trajectory.hermite',[-1,-1,-1,-1,1],-1,['x1','v1','x2','v2','u'],jvp=_hermite_jvp,gen_derivative=_hermite_gen_deriv)
"""Autodiff'ed version of the :func:`klampt.math.spline.hermite` function. Takes
Arguments (x1,v1,x2,v2,u) defining a Hermite curve with endpoints x1,x2 and
velocities v1,v2, and returns the interpolant at parameter u in [0,1]."""
[docs]class GeodesicInterpolate(ADFunctionInterface):
"""Autodiff wrapper of geodesic.interpolate(a,b,u)."""
def __init__(self,geodesic):
self.geodesic = geodesic
def __str__(self):
return "trajectory.GeodesicInterpolate[%s]"%(self.geodesic.__class__.__name__)
[docs] def n_args(self):
return 3
[docs] def n_in(self,arg):
if arg==0 or arg==1:
return self.geodesic.extrinsicDimension()
return 1
[docs] def n_out(self):
return self.geodesic.extrinsicDimension()
[docs] def eval(self,a,b,u):
x = self.geodesic.interpolate(a.tolist(),b.tolist(),u)
return np.array(x)
[docs] def jvp(self,arg,darg,a,b,u):
if arg==2:
x = self.geodesic.interpolate(a.tolist(),b.tolist(),u)
return (np.array(self.geodesic.difference(b,x))-np.array(self.geodesic.difference(a,x)))*darg[0]
raise NotImplementedError()
[docs]class GeodesicDistance(ADFunctionInterface):
"""Autodiff wrapper of geodesic.distance(a,b)."""
def __init__(self,geodesic):
self.geodesic = geodesic
def __str__(self):
return "trajectory.GeodesicDistance[%s]"%(self.geodesic.__class__.__name__)
[docs] def n_args(self):
return 2
[docs] def n_in(self,arg):
return self.geodesic.extrinsicDimension()
[docs] def n_out(self):
return 1
[docs] def eval(self,a,b):
return self.geodesic.distance(a.tolist(),b.tolist())
[docs] def jvp(self,arg,darg,a,b):
d = self.eval(a,b)
if arg==0:
return np.array(self.geodesic.difference(b,a)).dot(darg)/d
else:
return np.array(self.geodesic.difference(a,b)).dot(darg)/d
[docs]class TrajectoryEval(ADFunctionInterface):
"""Autodiff wrapper of traj.eval(t).
Args:
traj (Trajectory): the trajectory to evaluate. Can be any Trajectory
subclass.
endBehavior (string): how to evaluate out-of-domain times. Can be
'halt' or 'loop'.
"""
def __init__(self,traj,endBehavior='halt'):
self.traj = traj
if isinstance(self.traj,trajectory.SE3Trajectory) or isinstance(self.traj,trajectory.SE3HermiteTrajectory):
self.flatten=lambda x:x[0]+x[1]
else:
self.flatten=lambda x:x
self.endBehavior = endBehavior
assert len(self.traj.milestones) > 0
def __str__(self):
if self.endBehavior == 'halt':
return "trajectory.TrajectoryEval[%s,%d segments,%f,%f]"%(self.traj.__class__.__name__,len(self.traj.times)-1,self.traj.startTime(),self.traj.endTime())
else:
return "trajectory.TrajectoryEval[%s,%d segments,%s]"%(self.traj.__class__.__name__,len(self.traj.times)-1,self.endBehavior)
[docs] def n_args(self):
return 1
[docs] def n_in(self,arg):
return 1
[docs] def n_out(self):
return len(self.flatten(self.traj.waypoint(self.traj.milestones[0])))
[docs] def eval(self,t):
x = self.traj.eval(t,self.endBehavior)
return np.array(self.flatten(x))
[docs] def derivative(self,arg,t):
assert arg == 0
return np.array(self.flatten(self.traj.deriv(t,self.endBehavior)))[:,np.newaxis]
[docs] def jvp(self,arg,darg,t):
assert arg == 0
return np.array(self.flatten(self.traj.deriv(t,self.endBehavior)))*darg
[docs]class Trajectory(ADFunctionInterface):
"""Autodiff piecewise linear interpolation traj(times,milestone_stack,t).
The dimensionality or a robot must be provided on initialization.
``milestone_stack`` is a flattened array of the trajectory's milestones.
Also, ``t`` can be either a scalar or an array of times. Providing an
array of times is faster than calling this repeatedly for multiple times.
Args:
n_or_robot (int or RobotModel): the dimension of the state, or the
robot. In this case the milestones must be robot configurations.
endBehavior (string): how to evaluate out-of-domain times. Can be
'halt' or 'loop'.
"""
def __init__(self,n_or_robot,endBehavior='halt'):
if isinstance(n_or_robot,RobotModel):
self.robot = n_or_robot
self.n = self.robot.numLinks()
else:
self.n = n_or_robot
self.robot = None
self.endBehavior = endBehavior
def __str__(self):
if self.robot is None:
return "trajectory.Trajectory[%d]"%(self.n,)
else:
return "trajectory.Trajectory[%s]"%(self.robot.getName(),)
[docs] def n_args(self):
return 3
[docs] def n_in(self,arg):
return -1
[docs] def n_out(self):
return -1
[docs] def eval(self,ts,xs,t):
if xs.shape != (len(ts)*self.n,):
raise ValueError("Invalid size of the xs array, must be a stacked vector of milestones of size %d x %d"%(len(ts),self.n))
if self.robot is None:
traj = trajectory.Trajectory(ts,xs.reshape(len(ts),self.n))
else:
traj = trajectory.RobotTrajectory(self.robot,ts,xs.reshape(len(ts),self.n))
if _scalar(t):
return np.array(traj.eval(t),self.endBehavior)
else:
return np.array([traj.eval(v,self.endBehavior) for v in t])
[docs] def jvp(self,arg,darg,ts,xs,t):
if xs.shape != (len(ts)*self.n,):
raise ValueError("Invalid size of the xs array, must be a stacked vector of milestones of size %d x %d"%(len(ts),self.n))
if self.robot is None:
traj = trajectory.Trajectory(ts,xs.reshape(len(ts),self.n))
else:
traj = trajectory.RobotTrajectory(self.robot,ts,xs.reshape(len(ts),self.n))
if arg==2:
if _scalar(t):
return np.array(traj.deriv(t,self.endBehavior))
else:
return np.array([traj.deriv(v,self.endBehavior) for v in t])
else:
if darg.shape != (self.n,):
raise ValueError("Invalid size of the darg array, must have size %d"%(self.n,))
if _scalar(t):
segs = [traj.getSegment(t,self.endBehavior)]
else:
segs = [traj.getSegment(v,self.endBehavior) for v in t]
derivs = []
for index,(i,u) in enumerate(segs):
if i<0 or i+1 >= len(ts): derivs.append(np.zeros(self.n))
ta = ts[i]
tb = ts[i+1]
a = xs[i*self.n:(i+1)*self.n]
b = xs[(i+1)*self.n:(i+2)*self.n]
if arg==0:
#u = (t-ta)/(tb-ta)
#x = a+u*(b-a)
#du = [-dta*(tb-ta) - (t-ta)*(dtb-dta)]/(tb-ta)^2 = [dta*(t-tb) + (ta-t)*dtb]/(tb-ta)^2
if ta==tb:
derivs.append(np.zeros(self.n))
else:
dta = darg[i]
dtb = darg[i+1]
du = -(dta*(1-u) + dtb*u) /(tb-ta)
derivs.append(du*(b-a))
else:
#x = (1-u)a+ub
da = darg[i*self.n:(i+1)*self.n]
db = darg[(i+1)*self.n:(i+2)*self.n]
derivs.append((1-u)*da + u*db)
if _scalar(t):
return derivs[0]
return np.array(derivs)