"""This module defines the MultiPath class, and methods for loading and
saving multipaths from xml files.
"""
from ..model.contact import Hold
from ..math import vectorops
from . import trajectory
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape
from xml.dom import minidom
import math
import bisect
import warnings
from typing import Union,List,Optional,Tuple
from .typing import Vector
[docs]class MultiPath:
"""A sophisticated path representation that allows timed/untimed paths, attached
velocity information, as well as making and breaking contact.
Primarily, a MultiPath consists of a list of Sections, each of which is a path or
timed trajectory along a fixed stance.
A Section can either contain the Holds defining its stance, or its stance could be
defined by indices into the holdSet member of MultiPath. If all Sections have the
latter structure it is known as an "aggregated" MultiPath.
Attributes:
sections (list of Sections): the segments of the multipath, each operating over a
fixed stance.
settings (dict mapping str to str): an unstructured propery map 'settings' which
can contain metadata about which robot this path applies to, how the path
was created, etc.
holdSet (dict mapping int to Hold): a set of indexed Holds, which can be referred
to inside Section objects.
"""
[docs] class Section:
"""Contains a path or time-parameterized trajectory, as well as a
list of holds and ik constraints
If the times member is set, this is time parameterized. Otherwise,
it is just a path.
Attributes:
settings (dict mapping str to str): an unstructured property map.
configs (list of lists of floats): a list of N configurations along the
path section.
velocities (list of lists of floats, optional): a list of N joint
velocities along the path section.
times (list of floats, optional) a list of N times for each configuration
along the path section.
holds (list of Holds, optional): the set of Holds that this section
is required to meet.
holdIndices (list of ints, optional): the set of Holds that this section
is required to meet, indexed into MultiPath.holdSet.
ikObjectives (list of IKObjectives, optional): the set of extra IKObjectives
that this section is required to meet.
"""
def __init__(self):
self.settings = {} # dict
self.configs = [] # List[Vector]
self.velocities = None #Optional[List[Vector]]
self.times = None #Optional[List[Vector]]
self.holds = [] #list
self.holdIndices = [] #list
self.ikObjectives = [] #list
def __init__(self):
self.sections = [] #List[MultiPath.Section]
self.settings = {} #dict
self.holdSet = dict() #dict
[docs] def numSections(self) -> int:
return len(self.sections)
[docs] def startConfig(self) -> Vector:
return self.sections[0].configs[0]
[docs] def endConfig(self) -> Vector:
return self.sections[-1].configs[-1]
[docs] def startTime(self) -> float:
if len(self.sections)==0 or self.sections[0].times==None: return 0
return self.sections[0].times[0]
[docs] def endTime(self) -> float:
"""Returns the final time parameter"""
if len(self.sections)==0: return 0
if self.sections[-1].times==None: return sum(len(s.configs)-1 for s in self.sections)
return self.sections[-1].times[-1]
[docs] def duration(self) -> float:
return self.endTime()-self.startTime()
[docs] def hasTiming(self) -> bool:
"""Returns true if the multipath is timed"""
return self.sections[0].times is not None
[docs] def checkValid(self):
"""Checks for validity of the path"""
if self.hasTiming():
t0 = self.startTime()
for i,s in enumerate(self.sections):
if s.times is None:
raise ValueError("MultiPath section 0 is timed but section "+str(i)+" is not timed")
if len(s.times) != len(s.configs):
raise ValueError("MultiPath section "+str(i)+" has invalid number of times")
for t in s.times:
if t < t0:
raise ValueError("MultiPath section "+str(i)+" times are not monotonically increasing")
t0 = t
else:
for i,s in enumerate(self.sections):
if s.times is not None:
raise ValueError("MultiPath section 0 is not timed but section "+str(i)+" is timed")
for i,s in enumerate(self.sections):
if len(s.configs) <= 1:
raise ValueError("Section "+str(i)+" has 0 or 1 configuration, timing may be messed up")
return True
[docs] def isContinuous(self) -> bool:
"""Returns true if all the sections are continuous (i.e., the last config of each
section matches the start config of the next)."""
for i in range(len(self.sections)-1):
if self.sections[i].configs[-1] != self.sections[i+1].configs[0]:
return False
return True
[docs] def getSectionTiming(self,section) -> Tuple[float,float]:
"""Returns a pair (tstart,tend) giving the timing of the section"""
assert section >= 0 and section < len(self.sections)
if self.hasTiming():
return self.sections[section].times[0],self.sections[section].times[-1]
t0 = 0
for i in range(section):
t0 += len(self.sections[i].configs)-1
return (t0,t0+len(self.sections[section].configs))
[docs] def getStance(self,section):
"""Returns the list of Holds that the section should satisfy"""
res = self.sections[section].holds[:]
res+=[self.holdSet[ind] for ind in self.sections[section].holdIndices]
for g in self.sections[section].ikObjectives:
h=Hold()
h.link = g.link()
h.ikConstraint = g
res.append(h)
return res
[docs] def getIKProblem(self,section):
"""Returns the set of IKObjectives that the section should satisfy"""
res = [h.ikConstraint for h in self.sections[section].holds]
res += [self.holdSet[ind].ikConstraint for ind in self.sections[section].holdIndices]
res += self.sections[section].ikObjectives
return res
[docs] def aggregateHolds(self,holdSimilarityThreshold=None):
"""Aggregates holds from all sections to the global holdSet variable, and converts
sections to use holdIndices. If holdSimilarityThreshold is not None, then sections'
holds that are the same within the given tolerance are combined into one hold."""
def sameHold(h1,h2,tol):
from ..math import vectorops,so3
if h1.link != h2.link: return False
if len(h1.contacts) != len(h2.contacts): return False
if h1.ikConstraint.numPosDims() != h2.ikConstraint.numPosDims(): return False
if h1.ikConstraint.numRotDims() != h2.ikConstraint.numRotDims(): return False
if h1.ikConstraint.numPosDims() == 3:
xl1,xw1 = h1.ikConstraint.getPosition()
xl2,xw2 = h2.ikConstraint.getPosition()
if vectorops.distanceSquared(xl1,xl2) > tol**2 or vectorops.distanceSquared(xw1,xw2) > tol**2:
return False
elif h1.ikConstraint.numPosDims() != 0:
raise NotImplementedError("Distance detection for non-point or free holds")
if h1.ikConstraint.numPosDims() == 3:
R1 = h1.ikConstraint.getRotation()
R2 = h2.ikConstraint.getRotation()
if so3.distance(R1,R2) > tol:
return False
for (c1,c2) in zip(h1.contacts,h2.contacts):
if vectorops.distanceSquared(c1,c2) > tol**2:
return False
return True
holdsByLink = dict()
for s in self.sections:
for h in s.holds:
found = False
if holdSimilarityThreshold is not None:
for oldHold,index in holdsByLink.get(h.link,[]):
if sameHold(h,oldHold,holdSimilarityThreshold):
s.holdIndices.append(index)
found = True
break
if not found:
s.holdIndices.append(len(self.holdSet))
self.holdSet[len(self.holdSet)] = h
if holdSimilarityThreshold is not None:
holdsByLink.setdefault(h.link,[]).append((h,s.holdIndices[-1]))
s.holds = []
[docs] def deaggregateHolds(self):
"""De-aggregates holds from the global holdSet variable into the sections' holds."""
for s in self.sections:
for h in s.holdIndices:
s.holds.append(self.holdSet[h])
s.holdIndices = []
self.holdSet = dict()
[docs] def setConfig(self,section,configIndex,q,v=None,t=None,maintainContinuity=True):
"""Sets a configuration along the path, maintaining continuity if maintainContinuity is true.
Equivalent to self.sections[section].configs[configIndex] = q except that adjacent sections'
configurations are also modified."""
assert section >= 0 and section < len(self.sections)
if configIndex < 0:
configIndex = len(self.sections[section].configs)-1
self.sections[section].configs[configIndex] = q
if v is not None:
assert self.sections[section].velocities is not None
self.sections[section].velocities[configIndex] = v
if t is not None:
assert self.sections[section].times is not None
self.sections[section].times[configIndex] = t
if maintainContinuity:
section0 = section
configIndex0 = configIndex
while section > 0 and configIndex == 0:
section = section - 1
configIndex = len(self.sections[section].configs)-1
self.setConfig(section,configIndex,q,v,t,False)
section = section0
configIndex = configIndex0
while section + 1 < len(self.sections) and configIndex + 1 == len(self.sections[section].configs):
section = section + 1
configIndex = 0
self.setConfig(section,configIndex,q,v,t,False)
[docs] def concat(self,path):
"""Appends the path, making sure times and holds are appropriately set"""
newSections = path.sections[:]
dt = 0.0
if path.hasTiming() and len(self.sections)>0:
#shift timing
assert(self.hasTiming())
dt = self.endTime()
for s in newSections:
s.times = [t+dt for t in s.times]
#rename global hold set
newholds = path.holdSet.copy()
namemap = dict()
for (name,h) in path.holdSet.items():
if name in self.holdSet:
found = False
for k in range(2,1000):
newname = str(name)+"("+str(k)+")"
if newname not in self.holdSet:
namemap[name] = newname
found = True
if not found:
raise ValueError("Unable to merge hold name "+name)
newholds[newname] = newholds[name]
del newholds[name]
for s in newSections:
inds = []
for h in s.holdIndices:
if h in namemap:
inds.append(namemap[h])
else:
inds.append(h)
s.holdIndices = inds
self.sections += newSections
self.holdSet.update(newholds)
[docs] def save(self,fn):
"""Saves this multipath to an xml file."""
tree = self.saveXML()
f = open(fn,'w')
f.write('<?xml version="1.0"?>\n')
f.write(_prettify(tree.getroot()))
#tree.write(fn,pretty_print=True)
f.close()
[docs] def load(self,fn):
"""Loads this multipath from a multipath xml file."""
tree = ET.parse(fn)
return self.loadXML(tree)
[docs] def saveXML(self):
"""Saves this multipath to a multipath xml tree (ElementTree)"""
from ..io import loader
root = ET.Element("multipath")
root.attrib = self.settings
for sec in self.sections:
xs = ET.Element("section")
root.append(xs)
xs.attrib = sec.settings
for ikgoal in sec.ikObjectives:
xik = ET.SubElement(xs,"ikgoal")
if hasattr(ikgoal,'text'):
xik.text = ikgoal.text
else:
xik.text = loader.write_IKObjective(ikgoal)
for h in sec.holds:
xh = ET.SubElement(xs,"hold")
xh.text = loader.write_Hold(h)
for h in sec.holdIndices:
xh = ET.SubElement(xs,"hold")
if isinstance(h,int):
xh.set("index",str(h))
else:
xh.set("name",str(h))
for i in range(len(sec.configs)):
xm = ET.Element("milestone")
xs.append(xm)
xm.set("config",loader.write_Vector(sec.configs[i]))
if sec.times != None:
xm.set("time",str(sec.times[i]))
if sec.velocities != None:
xm.set("velocity",loader.write_Vector(sec.velocities[i]))
for hkey,h in self.holdSet.items():
xh = ET.Element("hold")
root.append(xh)
if not isinstance(hkey,int):
xh.set('name',str(hkey))
xh.text = loader.write_Hold(h)
return ET.ElementTree(root)
[docs] def loadXML(self,tree):
"""Loads a multipath from a multipath xml tree (ElementTree)."""
from ..io import loader
self.sections = []
self.holdSet = dict()
self.settings = dict()
root = tree.getroot()
for k,v in root.attrib.items():
self.settings[k]=v
for sec in root.findall('section'):
s = MultiPath.Section()
for k,v in sec.attrib.items():
s.settings[k]=v
milestones = sec.findall('milestone')
for m in milestones:
if 'config' not in m.attrib:
raise ValueError("Milestone does not contain config attribute")
s.configs.append(loader.read_Vector(m.attrib['config']))
if 'time' in m.attrib:
if s.times==None: s.times = []
s.times.append(float(m.attrib['time']))
if 'velocity' in m.attrib:
if s.velocities==None: s.velocities = []
s.velocities.append(loader.read_Vector(m.attrib['velocity']))
for obj in sec.findall('ikgoal'):
s.ikObjectives.append(loader.read_IKObjective(obj.text))
s.ikObjectives[-1].text = obj.text
for h in sec.findall('hold'):
if 'index' in h.attrib:
s.holdIndices.append(int(h.attrib['index']))
elif 'name' in h.attrib:
s.holdIndices.append(h.attrib['name'])
else:
s.holds.append(loader.read_Hold(h.text))
self.sections.append(s)
#read global hold set
for h in root.findall('hold'):
hold = loader.read_Hold(h.text)
if 'name' in h.attrib:
self.holdSet[h.attrib['name']] = hold
else:
self.holdSet[len(self.holdSet)] = hold
return
[docs] def timeToSection(self,t):
"""Returns the section corresponding to the time parameter t"""
if not self.hasTiming():
return int(math.floor(t*len(self.sections)))
else:
if t < self.startTime(): return -1
for i,s in enumerate(self.sections):
if t < s.times[-1]:
return i
return len(self.sections)
[docs] def timeToSegment(self,t):
""" Returns a (section index,milestone index,param) tuple such that
interpolation between the section's milestone and its successor
corresponds to time t."""
s = self.timeToSection(t)
if s < 0: return (-1,0,0)
elif s >= len(self.sections): return (s,0,0)
sec = self.sections[s]
if len(sec.times)==0:
usec = t*len(self.sections)-s
tsec = (len(sec.milestones)-1)*usec
i = int(math.floor(tsec))
u = tsec - i
return (s,i,u)
else:
i = bisect.bisect_left(sec.times,t)
p = i-1
u=(t-sec.times[p])/(sec.times[i]-sec.times[p])
if i==0:
return (s,0,0)
return (s,p,u)
[docs] def eval(self,t):
"""Evaluates the MultiPath at time t."""
(s,i,u) = self.getSegment(t)
if s < 0: return self.startConfig()
elif s >= len(self.sections): return self.endConfig()
if u==0: return self.sections[s].milestones[i]
return vectorops.interpolate(self.sections[s].milestones[i],self.sections[s].milestones[i+1],u)
[docs] def setTrajectory(self,trajectory: Union[trajectory.Trajectory,List]):
"""Set this MultiPath to a single-section trajectory. If trajectory
is a HermiteTrajectory, velocities will be extracted.
"""
self.holdSet = {}
self.settings = {'program':'setTrajectory'}
self.sections = [MultiPath.Section()]
if isinstance(trajectory,trajectory.Trajectory):
self.sections[0].times = trajectory.times
self.sections[0].configs = trajectory.milestones
if isinstance(trajectory,trajectory.HermiteTrajectory):
n = len(trajectory.milestones[0])//2
self.sections[0].configs = [v[:n] for v in trajectory.milestones]
self.sections[0].velocities = [v[n:] for v in trajectory.milestones]
else:
self.sections[0].configs = trajectory
[docs] def getTrajectory(self,robot=None,eps=None) -> trajectory.Trajectory:
"""Returns a trajectory representation of this MultiPath. If robot is provided, then a RobotTrajectory
is returned. Otherwise, if velocity information is given, then a HermiteTrajectory is returned.
Otherwise, a Trajectory is returned.
If robot and eps is given, then the IK constraints along the trajectory are solved and the path is
discretized at resolution eps.
"""
res = trajectory.Trajectory()
if robot is not None:
res = trajectory.RobotTrajectory(robot)
if self.sections[0].velocities is not None:
warnings.warn("MultiPath.getTrajectory: can't discretize IK constraints with velocities specified")
elif self.sections[0].velocities is not None:
res = trajectory.HermiteTrajectory()
if robot is not None and eps is not None:
from ..plan.robotcspace import ClosedLoopRobotCSpace
hastiming = self.hasTiming()
for i,s in enumerate(self.sections):
space = ClosedLoopRobotCSpace(robot,self.getIKProblem(i))
for j in range(len(s.configs)-1):
ikpath = space.interpolationPath(s.configs[j],s.configs[j+1],eps)
if hastiming:
t0 = s.times[j]
t1 = s.times[j+1]
else:
t0 = len(res.milestones)
t1 = t0 + 1
iktimes = [t0 + float(k)/float(len(ikpath)-1)*(t1-t0) for k in range(len(ikpath))]
res.milestones += ikpath[:-1]
res.times += iktimes[:-1]
res.milestones.append(self.sections[-1].configs[-1])
else:
for s in self.sections:
res.milestones += s.configs[:-1]
res.milestones.append(self.sections[-1].configs[-1])
if self.sections[0].velocities is not None:
vels = []
for s in self.sections:
assert s.velocities is not None,"Some sections have velocities, some don't?"
vels += s.velocities[:-1]
vels.append(self.sections[-1].velocities[-1])
for i,q in enumerate(res.milestones):
assert len(vels[i]) == len(q),"Velocities don't have the right size?"
res.milestones[i] = q + vels[i]
if not self.hasTiming():
res.times = list(range(len(res.milestones)))
else:
for s in self.sections:
res.times += s.times[:-1]
res.times.append(self.sections[-1].times[-1])
return res
def _escape_nl(text):
return escape(text).replace('\n','
')
def _prettify(elem,indent_level=0):
"""Return a pretty-printed XML string for the Element.
"""
indent = " "
res = indent_level*indent + '<'+elem.tag.encode('utf-8')
for k in elem.keys():
res += " "+k.encode('utf-8')+'="'+_escape_nl(elem.get(k)).encode('utf-8')+'"'
children = elem.getchildren()
if len(children)==0 and not elem.text:
res += ' />'
return res
res += '>'
if elem.text:
res += _escape_nl(elem.text).encode('utf-8')
for c in children:
res += '\n'+_prettify(c,indent_level+1)
if len(children)>0:
res += '\n'+indent_level*indent
res += '</'+elem.tag.encode('utf-8')+'>'
return res