try:
#version 0.9+
from klampt.plan.motionplanning import interpolate_nd_min_time,brake_nd
from klampt.plan.motionplanning import combine_nd_cubic
except ImportError:
#version 0.8.x
from klampt.plan.motionplanning import interpolateNDMinTime as interpolate_nd_min_time
from klampt.plan.motionplanning import combineNDCubic as combine_nd_cubic
brake_nd = None
from klampt.math.spline import hermite_eval,hermite_deriv
from klampt.math import vectorops
from klampt.model.trajectory import Trajectory,HermiteTrajectory
import math
import warnings
[docs]class VelocityBoundedMotionGeneration:
"""A motion generator that limits the element-wise velocities for
continuously generated position targets. Note that the target
is reached simultaneously on each dimension.
Usage::
from klampt.control.utils import TimedLooper
x0 = robot.getPosition()
generator = VelocityBoundedMotionGeneration(x0,vmax)
dt = 1.0/rate #some control rate
looper = TimedLooper(dt)
while looper:
#TODO: send commands, e.g., generator.setTarget(xtgt)
(x,v) = generator.update(dt)
robot.setPosition(x)
Args:
x0 (list of floats): the start position
vmax (list of floats, optional): the velocity limits
"""
def __init__(self,x0,vmax):
if len(x0) != len(vmax):
raise ValueError("Invalid length of vmax")
n = len(x0)
self.x = [v for v in x0]
self.v = [0]*n
self.vmax = vmax
self.times = [0]
self.milestones = [self.x]
self.curTime = 0
self.trajTime = 0
[docs] def remainingTime(self):
"""Returns the remaining time to finish the path, in s. Returns 0
if the path is done executing.
"""
return max(0,self.times[-1]-self.curTime)
[docs] def reset(self,x0):
"""Resets the motion generator to the start position x0."""
self.x = x0
self.v = [0]*len(x0)
self.times = [0]
self.milestones = [self.x]
self.curTime = 0
self.trajTime = 0
[docs] def update(self,dt,xtarget=None):
"""Updates the motion generator. Optionally sets the new target.
Returns:
tuple: (x,v) giving the new state
"""
if dt <= 0:
raise ValueError("Invalid dt")
if xtarget is not None:
self.setTarget(xtarget)
self.curTime += dt
self.trajTime += dt
xnew,vnew = self.predict(0)
self.x = xnew
self.v = vnew
return xnew,vnew
[docs] def setTarget(self,xtarget,append=False):
"""Sets a new target. If append=True, appends the target
to the motion queue.
"""
assert len(xtarget) == len(self.x)
self._trim(append)
xlast = self.milestones[-1]
tmax = 0
for (x,xt,vmax) in zip(xlast,xtarget,self.vmax):
dx = xt-x
tmax = max(tmax, abs(dx) / vmax)
self.times.append(self.times[-1]+tmax)
self.milestones.append(xtarget)
[docs] def setVelocity(self,vtarget,duration=1,append=False):
"""Sets a velocity command to vtarget. Moves along this
speed for duration seconds and then stops.
"""
assert len(vtarget) == len(self.x)
assert duration >= 0
self._trim(append)
xlast = self.milestones[-1]
self.times.append(self.times[-1]+duration)
self.milestones.append(vectorops.madd(xlast,vtarget,duration))
[docs] def brake(self):
"""Stops as quickly as possible. Since acceleration is unbounded,
this just stops immediately."""
self.setTarget(self.x)
def _trim(self,append):
newtimes = [0]
newmilestones = [self.x]
if append:
#cut prior trajectory to trajTime
for j,t in enumerate(self.times):
if t > self.trajTime:
newtimes.append(t-self.trajTime)
newmilestones.append(self.milestones[j])
else:
#just reset prior trajectory
pass
self.times = newtimes
self.milestones = newmilestones
self.trajTime = 0
[docs] def duration(self):
"""Returns the time remaining in the trajectory"""
return max(self.times[-1]-self.trajTime,0)
[docs] def target(self):
"""Returns the final position on the trajectory"""
return self.milestones[-1]
[docs] def predict(self,t):
"""Predicts the state that the system will be in at time t>=0 in the
future, assuming no changes to the target.
Returns:
tuple: (x,v) giving the new state
"""
t = t + self.trajTime
if t < self.times[0]:
return self.x,self.v
j = 0
while j+1 < len(self.times):
if t < self.times[j+1]:
u = (t - self.times[j])/(self.times[j+1]-self.times[j])
speed = 1.0/(self.times[j+1]-self.times[j])
x = vectorops.interpolate(self.milestones[j],self.milestones[j+1],u)
v = vectorops.mul(vectorops.sub(self.milestones[j+1],self.milestones[j],speed))
return x,v
j += 1
return self.milestones[-1],[0]*len(self.x)
[docs] def trajectory(self):
"""Returns the future trajectory as a Trajectory.
"""
times = [0]
milestones = [self.x]
for j in range(len(self.times)):
if self.trajTime < self.times[j]:
times.append(self.times[j]-self.trajTime)
milestones.append(self.milestones[j])
return Trajectory(times,milestones)
[docs]class AccelerationBoundedMotionGeneration:
"""A motion generator similar to the Reflexxes library, which
provides acceleration- and velocity-bounded smooth trajectories
for arbitrary targets.
Usage::
from klampt.control.utils import TimedLooper
x0 = robot.getPosition()
generator = AccelerationBoundedMotionGeneration(x0,vmax)
dt = 1.0/rate #some control rate
looper = TimedLooper(dt)
while looper:
#TODO: send commands, e.g., generator.setTarget(xtgt)
(x,v) = generator.update(dt)
robot.setPosition(x)
Args:
x0 (list of floats): the start position
xmin (list of floats, optional): the lower position joint limits
xmax (list of floats, optional): the upper position joint limits
vmax (list of floats, optional): the velocity limits
amax (list of floats): the acceleration limits. Non-optional (for now.)
"""
def __init__(self,x0,xmin=None,xmax=None,vmax=None,amax=None):
if len(x0) != len(vmax):
raise ValueError("Invalid length of vmax")
if len(x0) != len(amax):
raise ValueError("Invalid length of amax")
n = len(x0)
self.x = [v for v in x0]
self.v = [0]*n
self.xmin = xmin if xmin is not None else [-float('inf')]*n
self.xmax = xmax if xmax is not None else [float('inf')]*n
self.vmax = vmax if vmax is not None else [float('inf')]*n
self.amax = amax if amax is not None else [float('inf')]*n
if amax is None:
raise ValueError("amax needs to be specified")
self.times = [[0] for v in x0]
self.milestones = [[v] for v in x0]
self.dmilestones = [[0] for v in x0]
self.trajTime = 0
self.curTime = 0
[docs] def remainingTime(self):
"""Returns the remaining time to finish the path, in s. Returns 0
if the path is done executing.
"""
return max(0,max(t[-1]-self.trajTime for t in self.times))
[docs] def reset(self,x0):
"""Resets the motion generator to the start position x0."""
if len(x0) != len(self.amax):
raise ValueError("Invalid length of the configuration")
self.x = x0
self.v = [0]*len(x0)
self.times = [[0] for v in x0]
self.milestones = [[v] for v in x0]
self.dmilestones = [[0] for v in x0]
self.trajTime = 0
[docs] def update(self,dt,xtarget=None,vtarget=None):
"""Updates the motion generator. Optionally sets the new target and velocity.
If velocity is None, ends in 0 velocity.
Returns:
tuple: (x,v) giving the new state
"""
if dt <= 0:
raise ValueError("Invalid dt")
if xtarget is not None:
self.setTarget(xtarget,vtarget)
self.trajTime += dt
self.curTime += dt
x,v = self.predict(0)
self.x = x
self.v = v
return x,v
[docs] def setTarget(self,xtarget,vtarget=None,append=False):
"""Sets a new target position xtarget and optional velocity vtarget.
If append=True, appends the target to the motion queue.
"""
assert len(xtarget) == len(self.x)
if vtarget is None:
vtarget = [0]*len(xtarget)
assert len(vtarget) == len(self.x)
self._trim(append)
t,m,v = interpolate_nd_min_time(self.x,self.v,xtarget,vtarget,
self.xmin,self.xmax,self.vmax,self.amax)
if len(t)==0:
if self.x != xtarget or self.v != vtarget:
warnings.warn("Cannot solve for path from {}, {} to target {}, {}".format(self.x,self.v,xtarget,vtarget))
else:
assert len(t) == len(m)
assert len(t) == len(v)
assert len(t) == len(self.x)
for i,vi in enumerate(v):
for j,vj in enumerate(vi):
if abs(vj) > self.vmax[i]:
vj = min(max(-self.vmax[i],vj),self.vmax[i])
if abs(vj) > self.vmax[i]*1.001:
warnings.warn("Solved velocity {} is larger than vmax {}".format(vi,self.vmax[i]))
vi[j] = vj
for i in range(len(self.x)):
self.times[i] += [x+self.trajTime for x in t[i][1:]]
self.milestones[i] += m[i][1:]
self.dmilestones[i] += v[i][1:]
self._checkValid()
[docs] def setVelocity(self,vtarget,duration=1,append=False):
"""Sets a velocity command to vtarget. Moves along this
speed for duration seconds and then stops.
"""
assert len(vtarget)==len(self.x)
assert duration >= 0
self._trim(append)
self._append_ramp_velocity(vtarget)
#go straight for a duration
for i in range(len(self.x)):
self.times[i].append(self.times[i][-1]+duration)
self.milestones[i].append(self.milestones[-1][i]+vtarget[i]*duration)
self.dmilestones[i].append(vtarget)
#ramp down
self._append_ramp_velocity([0]*len(self.x))
self._checkValid()
[docs] def brake(self):
"""Stops as quickly as possible under the acceleration bounds.
"""
self._trim(False)
if brake_nd is None:
#0.8.x Klampt
raise NotImplementedError("brake() not available in Klampt 0.8.x")
t,m,v = brake_nd(self.x,self.v,self.xmin,self.xmax,self.amax)
for i in range(len(self.x)):
assert t[i][0] == 0
self.times[i] += [x+self.trajTime for x in t[i][1:]]
self.milestones[i] += m[i][1:]
self.dmilestones[i] += v[i][1:]
self._checkValid()
def _append_ramp_velocity(self,vtarget):
vlast = self.dmilestones[-1]
#ramp up to vtarget
tmax = 0
for i in range(len(self.x)):
dv = vtarget[i] - vlast[i]
tmax = max(tmax,abs(dv/self.amax[i]))
if tmax > 0:
#ramp up to tmax with parabolic curve
for i in range(len(self.x)):
self.times[i].append(self.times[i][-1]+tmax)
dv = vtarget[i] - vlast[i]
a = dv / tmax
self.milestones[i].append(self.milestones[-1][i]+self.dmilestones[-1][i]*tmax + a*0.5*tmax**2)
self.dmilestones[i].append(vtarget[i])
self._checkValid()
def _trim(self,append):
newtimes = [[0] for v in self.x]
newmilestones = [[v] for v in self.x]
newdmilestones = [[v] for v in self.v]
if append:
#cut prior trajectory to trajTime
for i in range(len(self.x)):
for j,t in enumerate(self.times[i]):
if t > self.trajTime:
newtimes[i].append(t-self.trajTime)
newmilestones[i].append(self.milestones[i][j])
newdmilestones[i].append(self.dmilestones[i][j])
else:
#reset prior path
pass
self.times = newtimes
self.milestones = newmilestones
self.dmilestones = newdmilestones
self.trajTime = 0
self._checkValid()
[docs] def duration(self):
"""Returns the time remaining in the trajectory"""
return max(self.times[0][-1]-self.trajTime,0)
[docs] def target(self):
"""Returns the final position on the trajectory"""
return [mi[-1] for mi in self.milestones]
[docs] def predict(self,t):
"""Predicts the state that the system will be in at time t>=0 in the
future, assuming no changes to the target.
Args:
t (float): the time in the future. Should be >=0.
Returns:
tuple: (x,v) giving the predicted state
"""
t = t + self.trajTime
x = []
v = []
for i in range(len(self.times)):
ti,mi,dmi = self.times[i],self.milestones[i],self.dmilestones[i]
#evaluate trajectory
j = 0
xi = mi[-1]
vi = dmi[-1]
if t < ti[-1]:
while j+1 < len(ti):
if t < ti[j+1]:
assert t >= ti[j]
dt = (ti[j+1]-ti[j])
u = (t-ti[j])/dt
xi = hermite_eval([mi[j]],[dmi[j]*dt],[mi[j+1]],[dmi[j+1]*dt],u)[0]
vi = hermite_deriv([mi[j]],[dmi[j]*dt],[mi[j+1]],[dmi[j+1]*dt],u)[0]/dt
if abs(vi) > self.vmax[i]:
if abs(vi) > self.vmax[i]*1.001:
warnings.warn("{} {} -> {} {} at u={}, dt={}".format(mi[j],dmi[j],mi[j+1],dmi[j+1],u,(ti[j+1]-ti[j])))
warnings.warn("Evaluated velocity {} is larger than vmax {}".format(vi,self.vmax[i]))
vi = min(max(-self.vmax[i],vi),self.vmax[i])
break
j += 1
x.append(xi)
v.append(vi)
return x,v
[docs] def trajectory(self):
"""Returns the future trajectory as a HermiteTrajectory.
"""
self._checkValid()
times,milestones,dmilestones = combine_nd_cubic(self.times,self.milestones,self.dmilestones)
prefix,suffix = HermiteTrajectory(times,milestones,dmilestones).split(self.trajTime)
suffix.times = [t-self.trajTime for t in suffix.times]
return suffix
def _checkValid(self):
assert len(self.x) == len(self.v)
assert len(self.x) == len(self.times)
assert len(self.times) == len(self.milestones)
assert len(self.times) == len(self.dmilestones)
for i in range(len(self.times)):
assert len(self.times[i]) == len(self.milestones[i])
assert len(self.times[i]) == len(self.dmilestones[i])
for j in range(len(self.times[i])-1):
assert self.times[i][j+1] >= self.times[i][j]