"""Contains utilities for the Klampt Robot Interface Layer.
The :class:`RobotInterfaceCompleter` class is extremely widely used to
standardize the capabilities of
:class:`~.robotinterface.RobotInterfaceBase` to handle
advanced functions. For example, if you have a position controlled robot
(e.g., an Arduino-controlled set of motors) and want to do more
sophisticated work with it, you can create a completer and get access to
motion queues, Cartesian control, collision filters, etc.
You can also assemble unified interfaces to robots composed of separate
independent parts via :class:`OmniRobotInterface`. For example, if you
have a robot arm + gripper, each working on separate physical interfaces,
creating an OmniRobotInterface will let you simultaneously control both.
Helper classes:
* :class:`StepContext`: uses a Python context to guard beginStep()/endStep()
pairs.
* :class:`ThreadedRobotInterface`: adapts a synchronous interface to an
asynchronous one via threading.
* :class:`MultiprocessingRobotInterface`: adapts a synchronous interface to an
asynchronous one via multiprocessing.
* :class:`RobotInterfaceRecorder`: records or plays back commands to a
RIL object.
Hints on wrapping to get the best performance:
* RobotInterfaceCompleter sends immediate state queries and commands to the
base interface, so it is more efficient than OmniRobotInterface when the
base interface implements the given commands and queries.
* OmniRobotInterface makes all state queries at the start of the step and
delays sending the actual commands at the end of the step.
* For asynchronous or client-server interfaces, the user has the choice of
completing the base interface then wrapping it (e.g.,
``ThreadedRobotInterface(RobotInterfaceCompleter(MyInterface)))``, or
wrapping the base interface then completing it (e.g.,
``RobotInterfaceCompleter(ThreadedRobotInterface(MyInterface)))``. In the
former case, more information is transmitted over the network, but the
caller thread does less work. In the latter case, less information is
transmitted, but the caller thread must do more work (including, for
example, Cartesian control and collision detection.)
* Collision detection is usually the most computationally complex step.
Configure your klamptModel's geometries to be as simple as possible and
set the trajectory collision checking to... TODO
This module also provides :func:`make_from_file` which can either take a
.py / .pyc file, or a module string naming a Python file with a `make`
method.
"""
"""
TODO: Multi-part controllers that run at different rates
TODO: figure out how multi-part FILTERS will work
- should command filters be checked "caller side" (i.e., right when command is issued)
- pros: less likelihood to induce timing overruns inside server, easier multithreaded / distributed implementation because the filter doesn't need to be transferred
- cons: less information available because commands have not been collected
- should filters be set per part and only affect commands / states on that part, or should they propagate?
Resolved: only affect the part. It is up to user to create multi-part filters on parent.
- should filters check a whole commanded trajectory, or just run in real time on the current command?
TODO: figure out how emulated gravity compensation will work -- need full-body sensed configuration
Complex example:
* robot: OmniRobotInterface
* base - MobileBaseInterface
* left - OmniRobotInterface #TODO: figure how nested multi-robot interfaces should work
- arm - ArmInterface1
- gripper - GripperInterface1
* right - GripperArmInterface
- arm: standard part
- gripper - standard part
* head - HeadInterface
left_arm = robot.partInterface("left").partInterface("arm") # wouldn't work, only one layer of parts
right_arm = robot.partInterface("right").partInterface("arm") # wouldn't work, only one layer of parts
left_arm.configFromKlampt() # world work
left_arm.moveToCartesian(T_target,frame='base') # works, frame='base' is accessible in part interface
left_arm.moveToCartesian(T_target,frame='world') # works, frame='world' is accessible in state thanks to emulator
robot.setFilter("self_collision",SelfCollisionFilter(robot.klamptModel())) #does not work, filters not working properly
right_arm.moveToPosition(colliding_right_arm_config) # would not raise collision, left_arm and right_arm are not filtered
left_arm.moveToPosition(colliding_left_arm_config)
right_arm.moveToPosition(colliding_right_arm_config) # TODO: should this raise a collision error? Or should it wait
left_arm.moveToPosition(non_colliding_left_arm_config)
OmniRobotInterface:
'part' means 'a legitimate part corresponding to a physical RIL'
'subpart' means some collection of indices corresponding to a whole
Parts must have disjoint set of indices whose union is the whole robot.
The klamptModel for a part/sub-part is a SubRobotModel of the whole robot.
beginStep():
for all parts:
part.beginStep()
gather state / status information into unified state / status
emulate parts of the state that are unavailable
for all parts / subparts:
distribute unified state
emulate base transforms and cartesian transforms
store a prevstatus structure
run filters affected by state -- TODO: do we go through tree of interfaces to find part and sub-part filters?
whole, part, and sub-part queries:
retrieve items from distributed state / settings
for cartesian queries w.r.t. frame='world', klamptModel is set to whole robot's sensed, commanded, or destination position
whole, part, and sub-part commands:
status change updates immediately
run filters affected by command -- TODO: how do we run filter for a part or a sub-part?
command goes into queue
internally, can do some immediate changes of unified state e.g., for commanded position and joint control mode
endStep():
distribute commands to emulators
for emulated joints and multi-axis controllers:
update emulated joint state
update multi-axis controller state
distribute emulator commands to parts
if any item in status was changed by user (status.x!=prevstatus.x), distribute to relevant part(s)
for all parts:
part.endStep()
Multi-part Cartesian commands and queries w.r.t. base/world work because each
interface is aware of the parent and its Cartesian transform.
"""
from .robotinterface import RobotInterfaceBase
from ..math import vectorops,so2,so3,se3,spline
from ..plan import motionplanning
from ..model.trajectory import Trajectory,HermiteTrajectory
from ..model.robotinfo import RobotInfo
from ..robotsim import WorldModel,RobotModel
from ..model.subrobot import SubRobotModel
from .cartesian_drive import CartesianDriveSolver
from . import blocks
from .utils import Promise
import bisect
import math
import warnings
import time
from functools import wraps
from ..model.typing import Vector,Vector3,RigidTransform
from typing import Dict,Any,Union,Optional,List,Tuple,Sequence,Callable,TextIO
import copy
import numpy as np
PART_NAME_SEPARATOR = '__'
SETTING_COMMAND_METHODS = ['reset','enableSensor','setPIDGains','setToolCoordinates','setGravityCompensation']
MOTION_COMMAND_METHODS = ['estop','softStop',
'setPosition','setVelocity','setTorque','setPID',
'moveToPosition','setPiecewiseLinear','setPiecewiseCubic',
'setCartesianPosition','moveToCartesianPosition','setCartesianVelocity','setCartesianForce',
'setControlMode']
STRUCTURE_QUERY_METHODS = ['numJoints','parts','controlRate','jointName','sensors','hasSensor','klamptModel']
SETTING_QUERY_METHODS = ['status','clock','getPIDGains','getToolCoordinates','getGravityCompensation']
SENSOR_QUERY_METHODS = ['enabledSensors','sensorMeasurements','sensorUpdateTime']
MOTION_QUERY_METHODS = ['isMoving',
'sensedPosition','sensedVelocity','sensedTorque',
'commandedPosition','commandedVelocity','commandedTorque',
'destinationPosition','destinationVelocity','destinationTime',
'queuedTrajectory',
'sensedCartesianPosition','sensedCartesianVelocity','sensedCartesianForce',
'commandedCartesianPosition','commandedCartesianVelocity','commandedCartesianForce',
'destinationCartesianPosition','destinationCartesianVelocity',
'queuedCartesianTrajectory']
UTILITY_METHODS = ['indices','cartesianPosition','cartesianVelocity','cartesianForce',
'partToRobotConfig','robotToPartConfig',
'configFromKlampt','velocityFromKlampt','configToKlampt','velocityToKlampt']
COMMAND_METHODS = SETTING_COMMAND_METHODS+MOTION_COMMAND_METHODS
QUERY_METHODS = STRUCTURE_QUERY_METHODS+SETTING_QUERY_METHODS+SENSOR_QUERY_METHODS+MOTION_QUERY_METHODS
CUSTOM_METHODS = ['functionCall','setSetting','getSetting','setFilter','stateValue']
ALL_METHODS = COMMAND_METHODS + QUERY_METHODS + CUSTOM_METHODS
[docs]class StepContext:
"""Makes it easier to handle exceptions in RIL control code. Flags allow
you to handle exceptions manually, print exceptions, or silence them
completely.
Args:
interface (RobotInterfaceBase): the interface to step.
ignore (bool): if False, raises the exception. Otherwise, prints
an error message.
silent (bool): if True, doesn't print error messages if ignore=True.
Members:
numExceptions (int): counts the # of exceptions raised
Usage::
interface = MyRobotInterface()
interface.initialize()
looper = TimedLooper(1.0/interface.controlRate())
#declare context outside of loop so numExceptions can be counted
context = StepContext(interface,ignore=True)
while looper:
with context:
interface.setCartesianPosition([0.4]) #incorrect arguments, exceptions will be printed
if context.numExceptions > 10:
print("Breaking loop, too many exceptions.")
looper.stop()
"""
def __init__(self, interface : RobotInterfaceBase, ignore=False, silent=False):
self.interface = interface
self.ignore = ignore
self.silent = silent
self.numExceptions = 0
self._beginStep_exception = None
self._beginStep_traceback = None
def __enter__(self):
try:
self.interface.beginStep()
except Exception as e:
if self.ignore:
import traceback
self._beginStep_exception = e
self._beginStep_traceback = traceback.extract_tb()
self.numExceptions += 1
else:
raise
return self
def __exit__(self, type, value, tb):
if type is not None:
self.numExceptions += 1
if self.ignore and not self.silent:
if self._beginStep_exception is not None:
print("Exception during {}.beginStep()".format(str(self.interface)))
print(" Value:", self._beginStep_exception)
print(" Traceback:", self._beginStep_traceback)
self._beginStep_exception = None
self._beginStep_traceback = None
elif type is not None:
print("Exception between {}.beginStep/endStep()".format(str(self.interface)))
print(" Type:", type)
print(" Value:", value)
print(" Traceback:", tb)
try:
self.interface.endStep()
except Exception as e:
self.numExceptions += 1
if self.ignore:
import traceback
print("Exception during {}.endStep()".format(str(self.interface)))
print(" Value:", e)
print(" Traceback:", traceback.extract_tb())
else:
raise
if self.ignore:
return True
[docs]def make_from_file(fn : str, robotModel : RobotModel, *args,**kwargs) -> RobotInterfaceBase:
"""Create a RobotInterfaceBase from a Python file or module containing the
``make()`` function.
args and kwargs will be passed to ``make``.
Example::
iface = make_from_file('klampt.control.simrobotcontroller', robot)
"""
import importlib
if fn.endswith('py') or fn.endswith('pyc'):
import os
import sys
path,base = os.path.split(fn)
mod_name,file_ext = os.path.splitext(base)
sys.path.append(os.path.abspath(path))
mod = importlib.import_module(mod_name,base)
sys.path.pop(-1)
else:
mod = importlib.import_module(fn)
try:
maker = mod.make
except AttributeError:
print("Module",mod.__name__,"must have a make() method")
raise
return maker(robotModel,*args,**kwargs)
class _Struct:
"""Base class for stateful items.
If the struct has a member ``children`` it is assumed to be hierarchical.
Fields in class variable DO_NOT_SERIALIZE are not serialized
"""
DO_NOT_SERIALIZE = []
def __init__(self,rhs=None):
if rhs is not None: #copy constructor
for k in self.__dict__:
if k=='children' and rhs.children is not None:
self.children = dict()
for (name,value) in rhs.children.items():
assert isinstance(value,_Struct)
self.children[name] = self.__class__(value)
else:
setattr(self,k,copy.copy(getattr(rhs,k)))
def to_json(self):
def _to_json(obj,name):
if isinstance(obj,np.ndarray):
return obj.tolist()
elif isinstance(obj,_Struct):
return obj.to_json()
elif isinstance(obj,dict):
assert all(isinstance(k,str) for k in obj),"All dictionary keys of {} must be str".format(name)
return dict((k,_to_json(v,name)) for (k,v) in obj.items())
elif isinstance(obj,(tuple,list)):
return [_to_json(v,name) for v in obj]
return obj
res = dict()
for (k,v) in self.__dict__.items():
if v is not None and k not in self.__class__.DO_NOT_SERIALIZE:
res[k] = _to_json(v,k)
return res
def from_json(self,jsonobj):
for k in self.__dict__:
if k not in self.__class__.DO_NOT_SERIALIZE:
self.__dict__[k] = None
for (k,v) in jsonobj.items():
self.__dict__[k] = v
if 'children' in jsonobj and jsonobj['children'] is not None:
for (k,v) in self.children.items():
self.children[k] = self.__class__()
self.children[k].from_json(v)
def complete(self) -> bool:
"""Returns true if all items are present in state"""
if not all(v is not None for k,v in self.__dict__.items()):
return False
if hasattr(self,'children') and self.children is not None:
for (k,v) in self.children.items():
if not v.complete():
return False
return True
def update(self,rhs) -> None:
"""Adds any non-None items from rhs to self."""
for k,v in rhs.__dict__.items():
if v is None: continue
if k == 'children':
if self.children is None:
self.children = dict()
for ck,cv in v.items():
if ck not in self.children:
self.children[ck] = cv
else:
self.children[ck].update(cv)
else:
self.__dict__[k] = v
class _RobotInterfaceStructure(_Struct):
"""Constant information over the life of an interface."""
DO_NOT_SERIALIZE = ["robotModel"]
def __init__(self):
self.controlRate = None #type: float
self.numJoints = None #type: int
self.jointNames = None #type: List[str]
self.parts = None #type: Dict[Union[None,int],List[int]]
self.sensors = None #type: List[str]
self.klamptModel = None #type: Union[RobotModel,SubRobotModel]
def to_json(self):
#can't serialize None
if self.parts is not None and None in self.parts:
ROOT_PART_TOKEN = '__'
rootpart = self.parts[None]
self.parts[ROOT_PART_TOKEN] = rootpart
del self.parts[None]
res = _Struct.to_json(self)
del self.parts[ROOT_PART_TOKEN]
self.parts[None] = rootpart
return res
return _Struct.to_json(self)
def from_json(self,jsonobj):
_Struct.from_json(self,jsonobj)
ROOT_PART_TOKEN = '__'
if self.parts is not None and ROOT_PART_TOKEN in self.parts:
rootpart = self.parts[ROOT_PART_TOKEN]
self.parts[None] = rootpart
del self.parts[ROOT_PART_TOKEN]
class _RobotInterfacePartSettings(_Struct):
"""Per-part settings structure, with nesting.
Includes settings for Cartesian control, gravity compensation,
sensors, and custom settings.
"""
def __init__(self, rhs : '_RobotInterfacePartSettings' = None):
self.toolCoordinates = None #type: Vector3
self.gravity = None #type: Vector3
self.load = None #type: float
self.load_com = None #type: Vector3
self.settings = None #type: Dict[str,Any]
self.sensorEnabled = None #type: Dict[str,bool]
self.children = None #type: Dict[str,_RobotInterfacePartSettings]
_Struct.__init__(self,rhs)
class _RobotInterfaceSettings(_RobotInterfacePartSettings):
"""Global settings structure, treated as read/write. Inherits per-part
settings, adds PID gains.
"""
def __init__(self):
_RobotInterfacePartSettings.__init__(self)
self.kP = None #type: Vector
self.kI = None #type: Vector
self.kD = None #type: Vector
class _RobotInterfaceCartesianState(_Struct):
"""Cartesian control state, represented as coordinates in the
part's base frame.
If this is a sub-part interface, base elements should be given.
These map from the part's base to the world frame.
"""
def __init__(self):
self.sensedPosition = None #type: RigidTransform
self.sensedVelocity = None #type: Tuple[Vector3,Vector3]
self.sensedForce = None #type: Tuple[Vector3,Vector3]
self.commandedPosition = None #type: RigidTransform
self.commandedVelocity = None #type: Tuple[Vector3,Vector3]
self.commandedForce = None #type: Tuple[Vector3,Vector3]
self.destinationPosition = None #type: RigidTransform
self.destinationVelocity = None #type: Tuple[Vector3,Vector3]
self.baseSensedPosition = None #type: RigidTransform
self.baseSensedVelocity = None #type: Tuple[Vector3,Vector3]
self.baseCommandedPosition = None #type: RigidTransform
self.baseCommandedVelocity = None #type: Tuple[Vector3,Vector3]
self.baseDestinationPosition = None #type: RigidTransform
self.baseDestinationVelocity = None #type: Tuple[Vector3,Vector3]
class _RobotInterfacePartState(_Struct):
"""State for individual parts and sub-parts"""
def __init__(self, rhs : '_RobotInterfacePartState' = None):
self.cartesianState = None #type: _RobotInterfaceCartesianState
self.sensorState = None #type: Dict[str,Tuple[float,Vector]]
self.children = None #type: Dict[str,_RobotInterfacePartState]
_Struct.__init__(self,rhs)
def from_json(self,jsonobj):
_Struct.from_json(self,jsonobj)
if self.cartesianState is not None:
cartesian_obj = self.cartesianState
self.cartesianState = _RobotInterfaceCartesianState()
self.cartesianState.from_json(cartesian_obj)
class _RobotInterfaceState(_RobotInterfacePartState):
"""Internal state of the robot, treated as read-only. Inherits per-part
state, adds joint-wise values."""
def __init__(self):
_RobotInterfacePartState.__init__(self)
self.clock = None #type: float
self.rate = None #type: float
self.status = None #type: str
self.isMoving = None #type: bool
self.sensedPosition = None #type: Vector
self.sensedVelocity = None #type: Vector
self.sensedTorque = None #type: Vector
self.commandedPosition = None #type: Vector
self.commandedVelocity = None #type: Vector
self.commandedTorque = None #type: Vector
self.destinationPosition = None #type: Vector
self.destinationVelocity = None #type: Vector
self.destinationTime = None #type: float
self.queuedTrajectory = None #type: List[Tuple[List[float],List[float],List[float]]]
self.jointControlModes = None #type: List[str]
self.stateValue = {} #type: Dict[str,Vector]
class _RobotInterfaceCommand(_Struct):
"""Command from the user to the robot."""
DO_NOT_SERIALIZE = ["promise"]
def __init__(self):
self.time = None #type: float
self.indices = None #type: List[int]
self.func = None #type: str
self.args = None #type: Tuple[Any]
self.promise = None #type: Promise
def _valid_vector(v,length=None) -> bool:
if v is None: return False
if length is not None and len(v) != length: return False
if all(x is not None for x in v):
if any(x is None for x in v):
print("WARNING: vector has mized values and None: {}".format(v))
return False
return True
return False
def _gather_state_var(res,state,state_inds,attr,n):
stateattr = getattr(state,attr)
if stateattr is not None:
resattr = getattr(res,attr)
if resattr is None:
resattr = [None]*n
setattr(res,attr,resattr)
for i,j in enumerate(state_inds):
#assert stateattr[i] is not None,"Part reported vector of None for state attribute for {}".format(attr)
resattr[j] = stateattr[i]
def _split_state_var(unified,state,state_inds,attr):
unifiedattr = getattr(unified,attr)
if unifiedattr is not None:
state_vals = [None]*len(state_inds)
for i,j in enumerate(state_inds):
state_vals[i] = unifiedattr[j]
if not all(v is None for v in state_vals):
setattr(state,attr,state_vals)
def _gather_state(parts : Dict[str,Sequence[int]],
part_states : Dict[str,_RobotInterfaceState],
unified_state: _RobotInterfaceState) -> None:
"""Merges sub-parts' states into a macro state"""
assert len(parts) == len(part_states)
assert len(parts) > 0
n = max(max(inds) for inds in parts.values())+1
for (name,inds) in parts.items():
if name is None: continue
state = part_states[name]
if state.clock is not None:
if unified_state.clock is None:
unified_state.clock = state.clock
if unified_state.rate is not None:
if unified_state.rate is None:
unified_state.rate = state.rate
else:
unified_state.rate = max(state.rate,unified_state.rate)
if state.status is not None:
if unified_state.status is None:
unified_state.status = state.status
else:
if state.status == 'ok':
pass
elif unified_state.status == 'ok':
unified_state.status = state.status
else:
unified_state.status = unified_state.status + ", " + state.status
if state.isMoving is not None:
if unified_state.isMoving is None:
unified_state.isMoving = state.isMoving
else:
unified_state.isMoving = unified_state.isMoving or state.isMoving
_gather_state_var(unified_state,state,inds,'sensedPosition',n)
_gather_state_var(unified_state,state,inds,'sensedVelocity',n)
_gather_state_var(unified_state,state,inds,'commandedPosition',n)
_gather_state_var(unified_state,state,inds,'commandedVelocity',n)
_gather_state_var(unified_state,state,inds,'commandedTorque',n)
_gather_state_var(unified_state,state,inds,'destinationPosition',n)
_gather_state_var(unified_state,state,inds,'destinationVelocity',n)
if state.destinationTime is not None:
if unified_state.destinationTime is None:
unified_state.destinationTime = state.destinationTime
else:
unified_state.destinationTime = max(unified_state.destinationTime,state.destinationTime)
if state.queuedTrajectory is not None:
if unified_state.queuedTrajectory is None:
unified_state.queuedTrajectory = [(None,None,None)]*n
for i,j in enumerate(inds):
unified_state.queuedTrajectory[j] = state.queuedTrajectory[i]
_gather_state_var(unified_state,state,inds,'jointControlModes',n)
for k,v in state.stateValue.items():
if k not in unified_state.stateValue:
unified_state.stateValue[k] = [None]*n
for i,j in enumerate(inds):
unified_state.stateValue[k][j] = v[i]
if state.cartesianState is not None or state.sensorState is not None or state.children is not None:
if unified_state.children is None:
unified_state.children = dict()
unified_state.children[name] = _RobotInterfacePartState(state)
def _split_state(unified_state: _RobotInterfaceState,
parts : Dict[str,List[int]]) -> Dict[str,_RobotInterfaceState]:
"""Splits a macro-state into a list of sub-parts' states"""
res = dict()
for (name,inds) in parts.items():
if name is None: continue
state = _RobotInterfaceState()
res[name] = state
state.clock = unified_state.clock
state.rate = unified_state.rate
state.status = unified_state.status
state.isMoving = unified_state.isMoving
_split_state_var(unified_state,state,inds,'sensedPosition')
_split_state_var(unified_state,state,inds,'sensedPosition')
_split_state_var(unified_state,state,inds,'sensedVelocity')
_split_state_var(unified_state,state,inds,'commandedPosition')
_split_state_var(unified_state,state,inds,'commandedVelocity')
_split_state_var(unified_state,state,inds,'commandedTorque')
_split_state_var(unified_state,state,inds,'destinationPosition')
_split_state_var(unified_state,state,inds,'destinationVelocity')
state.destinationTime = unified_state.destinationTime
if unified_state.queuedTrajectory is not None:
state.queuedTrajectory = [unified_state.queuedTrajectory[j] for j in inds]
_split_state_var(unified_state,state,inds,'jointControlModes')
for k,v in unified_state.stateValue.items():
state.stateValue[k] = [v[j] for j in inds]
if name in unified_state.children:
partState = unified_state.children[name]
state.sensorState = copy.copy(partState.sensorState)
state.cartesianState = copy.copy(partState.cartesianState)
state.children = copy.deepcopy(partState.children)
return res
def _gather_settings(parts : Dict[str,List[int]],
part_settings : Dict[str,_RobotInterfaceSettings],
unified_settings : _RobotInterfaceSettings) -> None:
if None in parts:
assert len(part_settings)+1 == len(parts),"part settings isn't the right size? {} vs {}".format(list(part_settings.keys()),list(parts.keys()))
else:
assert len(part_settings) == len(parts),"part settings isn't the right size? {} vs {}".format(list(part_settings.keys()),list(parts.keys()))
assert len(parts) > 0
n = max(max(inds) for inds in parts.values())+1
for (partname,inds) in parts.items():
if partname is None: continue
settings = part_settings[partname]
_gather_state_var(unified_settings,settings,inds,'kP',n)
_gather_state_var(unified_settings,settings,inds,'kI',n)
_gather_state_var(unified_settings,settings,inds,'kD',n)
if any(getattr(settings,n) is not None for n in ['toolCoordinates','gravity','settings','sensorEnabled','children']):
if unified_settings.children is None:
unified_settings.children = dict()
unified_settings.children[partname] = _RobotInterfacePartSettings(settings)
def _split_settings(unified_settings : _RobotInterfaceSettings,
parts : Dict[str,List[int]]) -> Dict[str,_RobotInterfaceSettings]:
res = dict()
for (partname,inds) in parts.items():
if partname is None: continue
settings = _RobotInterfaceSettings()
res[partname] = settings
_split_state_var(unified_settings,settings,inds,'kP')
_split_state_var(unified_settings,settings,inds,'kI')
_split_state_var(unified_settings,settings,inds,'kD')
if unified_settings.children is not None and partname in unified_settings.children:
partsetting = unified_settings.children[partname]
for n in ['toolCoordinates','gravity','load','load_com','settings','sensorEnabled','children']:
setattr(settings,n,copy.copy(getattr(partsetting,n)))
return res
def _gather_commands(part_commands : Sequence[Sequence[_RobotInterfaceCommand]],
part_indices : Sequence[Sequence[int]]) -> List[_RobotInterfaceCommand]:
assert len(part_commands)==len(part_indices)
res = dict()
for (cmdqueue,inds) in zip(part_commands,part_indices):
for command in cmdqueue:
if command.indices is None: whole_inds = inds
else: whole_inds = [inds[i] for i in command.indices]
if command.func not in res:
cmd = _RobotInterfaceCommand()
cmd.time = command.time
cmd.indices = whole_inds
cmd.func = command.func
cmd.args = [copy.copy(a) for a in command.args]
res[command.func] = cmd
else:
#func already exists, now add to it
cmd = res[command.func]
cmd.time = max(cmd.time,command.time)
if cmd.indices == whole_inds:
#more up-to-date version of same indices
cmd.args = [copy.copy(a) for a in command.args]
else:
#merge
if cmd.indices is not None:
for i in cmd.indices:
if i in whole_inds:
raise NotImplementedError("TODO: command index clashes")
else:
cmd.indices = inds
for i,(a,a2) in enumerate(zip(cmd.args,command.args)):
if hasattr(a2,'__iter__') and len(a2)==len(inds):
assert len(a) == len(cmd.indices)
cmd.args[i].extend(a2)
cmd.indices += whole_inds
return sorted(list(res.values()), key = lambda x:x.time)
def _update_from_part_settings(interface : RobotInterfaceBase, settings : _RobotInterfacePartSettings, delta_settings : _RobotInterfacePartSettings):
if delta_settings.toolCoordinates is not None:
if interface is not None:
try:
interface.setToolCoordinates(delta_settings.toolCoordinates)
except NotImplementedError as e:
return e
settings.toolCoordinates = delta_settings.toolCoordinates
if delta_settings.gravity is not None:
assert delta_settings.load is not None
assert delta_settings.load_com is not None
if interface is not None:
try:
interface.setGravityCompensation(delta_settings.gravity,delta_settings.load,delta_settings.load_com)
except NotImplementedError as e:
return e
settings.gravity = delta_settings.gravity
settings.load = delta_settings.load
settings.load_com = delta_settings.load_com
if delta_settings.settings is not None:
if settings.settings is None:
settings.settings = dict()
for k,val in delta_settings.settings.items():
if k not in settings.settings or val != settings.settings[k]:
if interface is not None:
try:
interface.setSetting(k,val)
except NotImplementedError as e:
return e
settings.settings[k] = val
if delta_settings.sensorEnabled is not None:
if settings.sensorEnabled is None:
settings.sensorEnabled = dict()
for k,val in delta_settings.sensorEnabled.items():
if k not in settings.sensorEnabled or val != settings.sensorEnabled[k]:
if interface is not None:
interface.enableSensor(k,val)
settings.sensorEnabled[k] = val
def _update_from_settings(interface : RobotInterfaceBase,
settings : _RobotInterfaceSettings,
delta_settings : _RobotInterfaceSettings):
errors = []
if delta_settings.kP is not None:
assert delta_settings.kI is not None
assert delta_settings.kD is not None
#fill with prior settings
if settings.kP is not None:
for i,v in enumerate(delta_settings.kP):
if v is not None: settings.kP[i] = v
for i,v in enumerate(delta_settings.kI):
if v is not None: settings.kI[i] = v
for i,v in enumerate(delta_settings.kD):
if v is not None: settings.kD[i] = v
else:
settings.kP = delta_settings.kP
settings.kI = delta_settings.kI
settings.kD = delta_settings.kD
if any(v is None for v in settings.kP):
raise ValueError("kP has a null value")
if any(v is None for v in settings.kI):
raise ValueError("kI has a null value")
if any(v is None for v in settings.kD):
raise ValueError("kD has a null value")
if interface is not None:
try:
interface.setPIDGains(settings.kP,settings.kD,settings.kI)
except NotImplementedError as e:
errors.append(e)
res = _update_from_part_settings(interface,settings,delta_settings)
if res is not None:
errors.append(res)
return errors
def _update_parts_from_settings(part_interfaces : Dict[str,RobotInterfaceBase],
part_indices : Dict[str,List[int]],
settings : _RobotInterfaceSettings,
delta_settings : _RobotInterfaceSettings,
full_update = False):
errors = []
for part,iface in part_interfaces.items():
if delta_settings.children is None or part not in delta_settings.children: continue
assert part is not None,"Shouldn't have None as a partInterface"
res = _update_from_part_settings(iface,settings.get(part,_RobotInterfacePartSettings()),delta_settings.children[part])
errors += res
if full_update:
delta_sub_settings = _split_settings(delta_settings,[part_indices[part] for part in part_interfaces])
sub_settings = _split_settings(settings,[part_indices[part] for part in part_interfaces])
for ((part,iface),sub_setting,delta_sub_setting) in zip(part_interfaces.items(),sub_settings,delta_sub_settings):
_update_from_settings(iface, sub_setting, delta_sub_setting)
return errors
def _try_methods(obj, fn: Union[str,List[str]], args, fallback : Callable = None, cache : Dict = None):
"""Tries calling a method implemented in ``obj``, with potential
alternatives.
Returns the result of calling ``getattr(obj,fn)(*args)``, or if that raises a
``NotImplementedError``, returns ``fallback(*args)``.
Also, fn and args can be lists. In this case, fallback is ignored, and
instead each function and each argument is tried in turn. The i'th arg
can be a callable (e.g., a lambda function), which produces the argument
tuple for the i'th function.
If cache is not None, it must be a dict, and will speed up the overhead of
evaluation by about 5x or so on multiple calls. If the item is cached, the
function will not call the method and test for a NotImplementedError to
determine if the method is implemented. By passing the same cache to
multiple calls, this function will only explore the object's methods once.
Returns:
multiple: If fn is a list, returns a pair (res,fn) where res is the
result of a successful call, and fn is the function name.
If fn is a str, returns the result of a successful call or the
fallback.
"""
if isinstance(fn,list):
#go through functions
assert len(fn) == len(args)
for i,(f,a) in enumerate(zip(fn,args)):
docall = False
testcall = True
if cache is not None:
try:
if not cache[f]: continue
docall = True
testcall = False
except KeyError:
#test whether it exists
testcall = True
if docall or testcall:
try:
resolved = getattr(obj,f)
except AttributeError:
if cache is not None:
cache[f] = False
continue
if callable(a):
try:
a = a()
except NotImplementedError:
#used to mark something is currently invalid and should cause a fallback
continue
except Exception:
print("Exception raised while evaluating argument of method",f,"of object",str(obj))
raise
args[i] = a
assert isinstance(args[i],(list,tuple)),"Invalid type of argument list, must be tuple: "+str(args[i])
if testcall:
try:
res = resolved(*args[i])
if cache is not None:
cache[f] = True
return res,f
except NotImplementedError:
if cache is not None:
cache[f] = False
except Exception:
print("Exception raised while evaluating method",f,"of object",str(obj))
raise
continue
if docall: #known to exist
return resolved(*args[i]),f
#fall through to next iterations
pass
raise NotImplementedError("Functions "+','.join(fn)+" are not implemented by base interface")
#one function only
docall = False
trycall = True
if cache is not None:
#print("Trying to get native",fn)
try:
docall = cache[fn]
trycall = False
except KeyError:
trycall = True
if trycall:
try:
resolved = getattr(obj,fn)
except AttributeError:
if cache is not None:
cache[fn] = False
if fallback is None:
raise NotImplementedError("Function {} is not defined by base interface {}, no fallback available".format(fn,obj))
if callable(fallback):
return fallback(*args)
return fallback
try:
#print("...first time...")
res = resolved(*args)
#print("...available")
if cache is not None:
cache[fn] = True
return res
except NotImplementedError:
if cache is not None:
cache[fn] = False
except Exception as e:
print("Error",e,"received during call of",fn,"of base",str(obj))
raise
if docall:
try:
res = getattr(obj,fn)(*args)
return res
except Exception as e:
print("Error",e,"received during later call of",fn,"of base",str(obj))
raise
if fallback is None:
raise NotImplementedError("Method {} is not defined by object {}, no fallback available".format(fn,obj))
if callable(fallback):
return fallback(*args)
return fallback
class _LockedCallable:
"""Helper"""
def __init__(self,func,lock):
self._func = func
self._lock = lock
def __call__(self,*args,**kwargs):
with self._lock:
return self._func(*args,**kwargs)
class _RobotInterfaceStatefulBase(RobotInterfaceBase):
"""A helper class that accesses as much as possible from settings / state.
Used to avoid lots of boilerplate code when building more useful wrappers.
Child classes must implement initialize(), beginStep(), and endStep().
Child classes should also clear out _delta_settings at beginStep(), then
check it at endStep() to see settings that have changed. The
:func:`_update_from_settings` helper function propagates changes to the
new settings and any associated robot interface.
"""
def __init__(self):
RobotInterfaceBase.__init__(self)
self._structure = _RobotInterfaceStructure()
self._settings = _RobotInterfaceSettings()
self._delta_settings = None #type: _RobotInterfaceSettings
self._commands = [] #type: List[_RobotInterfaceCommand]
self._state = _RobotInterfaceState()
self._shortcut_update = True #if True, the corresponding query to a setX command will be updated immediately
self._partInterfaces = dict() #type: Dict[str,'_RobotInterfaceStatefulBase']
self._has = dict() #type: Dict[str,bool]
COMPLEX_METHODS = ['_queueCommand','setSetting','getSetting',
'sensors','enabledSensors','hasSensor','enableSensor',
'getGravityCompensation','setGravityCompensation',
'getPIDGains','setPIDGains',
'sensedCartesianPosition','sensedCartesianVelocity','sensedCartesianForce',
'commandedCartesianPosition','commandedCartesianVelocity','commandedCartesianForce',
'destinationCartesianPosition','destinationCartesianVelocity']
def _queueCommand(self,func,args=()):
if not self._has.get(func,True):
raise NotImplementedError("Interface does not implement {}".format(func))
cmd = _RobotInterfaceCommand()
cmd.time = time.time()
cmd.func = func
cmd.args = args
cmd.promise = Promise()
self._commands.append(cmd)
if len(self._commands) > 1000:
raise RuntimeError("Queue has grown abnormally large, must not be correctly processed?")
return cmd.promise
def _init_part_interfaces(self):
for (k,inds) in self._structure.parts.items():
if k is None: continue
partInterface = _RobotInterfaceStatefulBase()
partInterface._structure.controlRate = self._structure.controlRate
partInterface._structure.numJoints = len(inds)
if self._structure.jointNames is not None:
partInterface._structure.jointNames = [self._structure.jointNames[i] for i in inds]
if self._structure.klamptModel is not None:
link_inds = []
for i in inds:
link_inds += self._structure.klamptModel.driver(i).getAffectedLinks()
partInterface._structure.klamptModel = SubRobotModel(self._structure.klamptModel,link_inds)
partInterface.properties['name'] = k
partInterface.properties['part'] = True
self._partInterfaces[k] = partInterface
def _split_settings(self):
for part,iface in self._partInterfaces.items(): #initial time
assert part is not None,"Shouldn't have None as a partInterface"
if part not in self._settings.children:
self._settings.children[part] = _RobotInterfacePartSettings(iface._settings)
sub_settings = _split_settings(self._settings,self._structure.parts)
for part,iface in self._partInterfaces.items():
self._partInterfaces[part]._settings = sub_settings[part]
def _split_state(self):
sub_states = _split_state(self._state, self._structure.parts)
for (part,iface) in self._partInterfaces.items():
iface._state = sub_states[part]
def _clear_deltas(self):
self._delta_settings = _RobotInterfaceSettings()
for (part,iface) in self._partInterfaces.items():
iface._delta_settings = _RobotInterfaceSettings()
def _advance_settings_and_commands(self) -> Tuple[_RobotInterfaceSettings,List[_RobotInterfaceCommand]]:
"""Returns the settings delta and queue of new commands.
Clears the settings delta and command queue.
"""
_gather_settings(self._structure.parts,dict((k,v._delta_settings) for (k,v) in self._partInterfaces.items()),self._delta_settings)
_update_from_settings(None,self._settings,self._delta_settings)
settings = _split_settings(self._settings,self._structure.parts)
for (k,setting) in settings.items():
self._partInterfaces[k]._settings = setting
cmds = _gather_commands([self._commands]+[iface._commands for iface in self._partInterfaces.values()],
[self.indices()]+list(v for (k,v) in self._structure.parts.items() if k is not None))
res = self._delta_settings
#clear settings deltas queues
self._delta_settings = _RobotInterfaceSettings()
for (k,iface) in self._partInterfaces.items():
iface._delta_settings = _RobotInterfaceSettings()
#clear part command queues
for (k,iface) in self._partInterfaces.items():
iface._commands = []
self._commands = []
return res,cmds
def controlRate(self):
if self._structure.controlRate is not None: return self._structure.controlRate
if self._state.rate is None: raise NotImplementedError()
return self._state.rate
def parts(self):
if self._structure.parts is None: raise NotImplementedError()
return self._structure.parts
def partInterface(self,part,joint_idx=None):
if joint_idx is not None:
raise NotImplementedError("TODO: part interfaces for individual joint indices")
return self._partInterfaces[part]
def numJoints(self,part = None):
if part is None:
return self._structure.numJoints
return len(self._structure.parts[part])
def jointName(self,joint_idx):
if self._structure.jointNames is None:
raise NotImplementedError()
return self._structure.jointNames[joint_idx]
def clock(self):
if self._state.clock is None: raise NotImplementedError("clock() not implemented by base or beginStep() not called")
return self._state.clock
def estop(self):
return self._queueCommand('estop')
def softStop(self):
return self._queueCommand('softStop')
def reset(self):
return self._queueCommand('reset')
def setSetting(self,key,value):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.settings is None: self._delta_settings.settings = dict()
self._delta_settings.settings[key] = value
def getSetting(self,key):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.settings is not None and key in self._delta_settings.settings:
return self._delta_settings.settings[key]
if self._settings.settings is None: raise KeyError("Settings has not been initialized")
return copy.copy(self._settings.settings[key])
def sensors(self):
if self._settings.sensorEnabled is None: raise NotImplementedError()
return list(self._settings.sensorEnabled.keys())
def enabledSensors(self):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.sensorEnabled is not None:
if self._settings.sensorEnabled is not None:
#return union of previously and recently enabled sensors
return [s for (s,e) in self._settings.sensorEnabled.items() if e or self._delta_settings.sensorEnabled[s]]
return [s for (s,e) in self._delta_settings.sensorEnabled.items() if e]
if self._settings.sensorEnabled is None: return []
return [s for (s,e) in self._settings.sensorEnabled.items() if e]
def hasSensor(self,sensor):
return self._settings.sensorEnabled and sensor in self._settings.sensorEnabled
def enableSensor(self,sensor,enabled=True):
if self._settings.sensorEnabled is None: raise NotImplementedError()
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.sensorEnabled is None:
self._delta_settings.sensorEnabled = dict((s,False) for s in self._settings.sensorEnabled)
self._delta_settings.sensorEnabled[sensor] = enabled
def sensorMeasurements(self,sensor):
if self._state.sensorState is None: raise NotImplementedError()
assert sensor in self._state.sensorState
return self._state.sensorState[sensor][1]
def sensorUpdateTime(self,sensor):
if self._state.sensorState is None: raise NotImplementedError()
assert sensor in self._state.sensorState
return self._state.sensorState[sensor][0]
def setPosition(self,q):
assert len(q)==self._structure.numJoints,"Invalid size of position command: {} != {}".format(len(q),self._structure.numJoints)
res = self._queueCommand('setPosition',(q,))
if self._shortcut_update:
self._state.commandedPosition = copy.copy(q)
return res
def setVelocity(self,v,ttl=None):
assert len(v)==self._structure.numJoints,"Invalid size of velocity command: {} != {}".format(len(v),self._structure.numJoints)
res = self._queueCommand('setVelocity',(v,ttl))
if self._shortcut_update:
self._state.commandedVelocity = copy.copy(v)
return res
def setTorque(self,t,ttl=None):
assert len(t)==self._structure.numJoints,"Invalid size of torque command: {} != {}".format(len(t),self._structure.numJoints)
res = self._queueCommand('setTorque',(t,ttl))
if self._shortcut_update:
self._state.commandedTorque = copy.copy(t)
return res
def setPID(self,q,dq,t=None):
assert len(q)==self._structure.numJoints,"Invalid size of position command: {} != {}".format(len(q),self._structure.numJoints)
assert len(dq)==self._structure.numJoints,"Invalid size of velocity command: {} != {}".format(len(dq),self._structure.numJoints)
if t is not None:
assert len(t)==self._structure.numJoints,"Invalid size of torque command: {} != {}".format(len(t),self._structure.numJoints)
res = self._queueCommand('setPID',(q,dq,t))
if self._shortcut_update:
self._state.commandedPosition = copy.copy(q)
self._state.commandedVelocity = copy.copy(dq)
return res
def setPIDGains(self,kP,kI,kD):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if not hasattr(kP,'__iter__'):
kP = [kP]*len(self._structure.numJoints)
if not hasattr(kD,'__iter__'):
kD = [kD]*len(self._structure.numJoints)
if not hasattr(kI,'__iter__'):
kI = [kI]*len(self._structure.numJoints)
assert len(kP)==self._structure.numJoints,"Invalid size of kP: {} != {}".format(len(kP),self._structure.numJoints)
assert len(kI)==self._structure.numJoints,"Invalid size of kI: {} != {}".format(len(kI),self._structure.numJoints)
assert len(kD)==self._structure.numJoints,"Invalid size of kD: {} != {}".format(len(kD),self._structure.numJoints)
assert all(v >= 0 for v in kP),"kP must be nonnegative"
assert all(v >= 0 for v in kI),"kI must be nonnegative"
assert all(v >= 0 for v in kD),"kD must be nonnegative"
self._delta_settings.kP = kP
self._delta_settings.kI = kI
self._delta_settings.kD = kD
def getPIDGains(self):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.kP is not None:
return (copy.copy(self._delta_settings.kP),copy.copy(self._delta_settings.kI),copy.copy(self._delta_settings.kD))
if self._settings.kP is None: raise NotImplementedError()
return (copy.copy(self._settings.kP),copy.copy(self._settings.kI),copy.copy(self._settings.kD))
def moveToPosition(self,q,speed=1):
assert len(q)==self._structure.numJoints,"Invalid size of position command: {} != {}".format(len(q),self._structure.numJoints)
res = self._queueCommand('moveToPosition',(q,speed))
if self._shortcut_update:
self._state.destinationPosition = copy.copy(q)
return res
def setPiecewiseLinear(self,ts,qs,relative=True):
for q in qs:
assert len(q)==self._structure.numJoints,"Invalid size of position milestone: {} != {}".format(len(q),self._structure.numJoints)
assert len(qs) == len(ts)
res = self._queueCommand('setPiecewiseLinear',(ts,qs,relative))
if self._shortcut_update and len(ts):
self._state.destinationPosition = copy.copy(qs[-1])
return res
def setPiecewiseCubic(self,ts,qs,vs,relative=True):
for q in qs:
assert len(q)==self._structure.numJoints,"Invalid size of position milestone: {} != {}".format(len(q),self._structure.numJoints)
for v in vs:
assert len(v)==self._structure.numJoints,"Invalid size of velocity milestone: {} != {}".format(len(v),self._structure.numJoints)
assert len(qs) == len(ts)
assert len(vs) == len(ts)
res = self._queueCommand('setPiecewiseCubic',(ts,qs,vs,relative))
if self._shortcut_update and len(ts):
self._state.destinationPosition = copy.copy(qs[-1])
self._state.destinationVelocity = copy.copy(vs[-1])
return res
def setToolCoordinates(self,xtool_local):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
self._delta_settings.toolCoordinates = xtool_local
def getToolCoordinates(self):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.toolCoordinates:
return self._delta_settings.toolCoordinates
if self._settings.toolCoordinates is None: raise NotImplementedError()
return self._settings.toolCoordinates
def setGravityCompensation(self,gravity=[0,0,-9.8],load=0.0,load_com=[0,0,0]):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
self._delta_settings.gravity = gravity
self._delta_settings.load = load
self._delta_settings.load_com = load_com
def getGravityCompensation(self):
if self._delta_settings is None: raise RuntimeError("Settings query called outside of beginStep()/endStep()")
if self._delta_settings.gravity:
return (copy.copy(self._delta_settings.gravity),self._delta_settings.load,copy.copy(self._delta_settings.load_com))
if self._settings.gravity is None: raise NotImplementedError()
return (copy.copy(self._settings.gravity),self._settings.load,copy.copy(self._settings.load_com))
def setCartesianPosition(self,xparams,frame='world'):
res = self._queueCommand('setCartesianPosition',(xparams,frame))
if self._shortcut_update and frame == 'base':
self._state.cartesianState.commandedPosition = xparams
return res
def moveToCartesianPosition(self,xparams,speed=1.0,frame='world'):
res = self._queueCommand('moveToCartesianPosition',(xparams,speed,frame))
if self._shortcut_update and frame == 'base':
self._state.cartesianState.destinationPosition = xparams
return res
def moveToCartesianPositionLinear(self,xparams,speed=1.0,frame='world'):
res = self._queueCommand('moveToCartesianPositionLinear',(xparams,speed,frame))
if self._shortcut_update and frame == 'base':
self._state.cartesianState.destinationPosition = xparams
return res
def setCartesianVelocity(self,dxparams,ttl=None,frame='world'):
res = self._queueCommand('setCartesianVelocity',(dxparams,ttl,frame))
if self._shortcut_update and frame == 'base':
self._state.cartesianState.commandedVelocity = dxparams
return res
def setCartesianForce(self,fparams,ttl=None,frame='world'):
res = self._queueCommand('setCartesianForce',(fparams,ttl,frame))
if self._shortcut_update and frame == 'base':
self._state.cartesianState.commandedForce = fparams
return res
def status(self,joint_idx=None):
if self._state.status is None: raise NotImplementedError()
return self._state.status
def isMoving(self,joint_idx=None):
if self._state.isMoving is None: raise NotImplementedError()
return self._state.isMoving
def sensedPosition(self):
if self._state.sensedPosition is None: raise NotImplementedError()
return copy.copy(self._state.sensedPosition)
def sensedVelocity(self):
if self._state.sensedVelocity is None: raise NotImplementedError()
return copy.copy(self._state.sensedVelocity)
def sensedTorque(self):
if self._state.sensedTorque is None: raise NotImplementedError()
return copy.copy(self._state.sensedTorque)
def commandedPosition(self):
if self._state.commandedPosition is None: raise NotImplementedError()
return copy.copy(self._state.commandedPosition)
def commandedVelocity(self):
if self._state.commandedVelocity is None: raise NotImplementedError()
return copy.copy(self._state.commandedVelocity)
def commandedTorque(self):
if self._state.commandedTorque is None: raise NotImplementedError()
return copy.copy(self._state.commandedTorque)
def destinationPosition(self):
if self._state.destinationPosition is None: raise NotImplementedError()
return copy.copy(self._state.destinationPosition)
def destinationVelocity(self):
if self._state.destinationVelocity is None: raise NotImplementedError()
return copy.copy(self._state.destinationVelocity)
def destinationTime(self):
if self._state.destinationTime is None: raise NotImplementedError()
return copy.copy(self._state.destinationTime)
def stateValue(self,name):
if self._state.stateValue is None: raise NotImplementedError()
return copy.copy(self._state.stateValue.get(name,None))
def cartesianPosition(self,q,frame='world'):
assert len(q) == self._structure.numJoints,"Invalid length of configuration: {} != {}".format(len(q),self._structure.numJoints)
toolCoords = self.getToolCoordinates()
model = self.klamptModel()
if toolCoords is None or model is None: raise NotImplementedError()
indices = list(range(model.numDrivers()))
assert(self._structure.numJoints == model.numDrivers())
if frame == 'world' and isinstance(model,SubRobotModel):
raise NotImplementedError("cartesianPosition can't calculate kinematics because it doesn't know whole-robot configuration")
return klamptCartesianPosition(model,q,indices,toolCoords,frame)
def cartesianVelocity(self,q,dq,frame='world'):
assert len(q) == self._structure.numJoints,"Invalid length of configuration: {} != {}".format(len(q),self._structure.numJoints)
assert len(dq) == self._structure.numJoints,"Invalid length of velocity: {} != {}".format(len(dq),self._structure.numJoints)
toolCoords = self.getToolCoordinates()
model = self.klamptModel()
if toolCoords is None or model is None: raise NotImplementedError()
indices = list(range(model.numDrivers()))
assert(self._structure.numJoints == model.numDrivers())
if frame == 'world' and isinstance(model,SubRobotModel):
raise NotImplementedError("cartesianVelocity can't calculate kinematics because it doesn't know whole-robot configuration")
return klamptCartesianVelocity(model,q,dq,indices,toolCoords,frame)
def cartesianForce(self,q,t,frame='world'):
assert len(q) == self._structure.numJoints,"Invalid length of configuration: {} != {}".format(len(q),self._structure.numJoints)
assert len(t) == self._structure.numJoints,"Invalid length of torques: {} != {}".format(len(t),self._structure.numJoints)
toolCoords = self.getToolCoordinates()
model = self.klamptModel()
if toolCoords is None or model is None: raise NotImplementedError()
indices = list(range(model.numDrivers()))
assert(self._structure.numJoints == model.numDrivers())
if frame == 'world' and isinstance(model,SubRobotModel):
raise NotImplementedError("cartesianForce can't calculate kinematics because it doesn't know whole-robot configuration")
return klamptCartesianForce(model,q,t,indices,toolCoords,frame)
def sensedCartesianPosition(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.sensedPosition is None:
raise NotImplementedError("Cartesian control not available")
if frame=='base': return copy.copy(self._state.cartesianState.sensedPosition)
elif frame == 'tool': return se3.identity()
elif frame == 'end effector': return (so3.identity(),copy.copy(toolCoordinates))
else: #base -> world
Tbase = self._state.cartesianState.baseSensedPosition
if Tbase is None: return self._state.cartesianState.sensedPosition
return se3.mul(Tbase,self._state.cartesianState.sensedPosition)
def sensedCartesianVelocity(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.sensedVelocity is None:
raise NotImplementedError("Cartesian control not available")
wtool_base,vtool_base = self._state.cartesianState.sensedVelocity
Ttool_base = self._state.cartesianState.sensedPosition
if frame=='base': return copy.copy(self._state.cartesianState.sensedVelocity)
elif frame == 'tool':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
return (so3.apply(Rbase_tool,wtool_base),so3.apply(Rbase_tool,vtool_base))
elif frame == 'end effector':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
wtool_tool = so3.apply(Rbase_tool,wtool_base)
vtool_tool = so3.apply(Rbase_tool,vtool_base)
vtool_ee = vectorops.cross(wtool_tool,toolCoordinates)
return (wtool_tool,vectorops.add(vtool_tool,vtool_ee))
else: #base -> world
Tbase = self._state.cartesianState.baseSensedPosition
if Tbase is None: return copy.copy(self._state.cartesianState.sensedVelocity)
Rbase = Tbase[0]
wtool = so3.apply(Rbase,wtool_base)
vtool = so3.apply(Rbase,vtool_base)
if self._state.cartesianState.sensedVelocity is None:
return (wtool,vtool)
else:
wbase,vbase = self._state.cartesianState.sensedVelocity
#add on base angular velocity x relative position of tool
return (vectorops.add(wbase,wtool),vectorops.add(vbase,vtool,vectorops.cross(wbase,Ttool_base[1])))
def sensedCartesianForce(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.sensedForce is None:
raise NotImplementedError("Cartesian control not available")
ttool_base,ftool_base = self._state.cartesianState.sensedForce
Ttool_base = self._state.cartesianState.sensedPosition
if frame=='base': return copy.copy(self._state.cartesianState.sensedForce)
elif frame == 'tool':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
return (so3.apply(Rbase_tool,ttool_base),so3.apply(Rbase_tool,ftool_base))
elif frame == 'end effector':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
ttool_tool = so3.apply(Rbase_tool,ttool_base)
ftool_tool = so3.apply(Rbase_tool,ftool_base)
ftool_ee = vectorops.cross(ttool_tool,toolCoordinates)
return (ttool_tool,vectorops.add(ftool_tool,ftool_ee))
else: #base -> world
Tbase = self._state.cartesianState.baseSensedPosition
if Tbase is None: return copy.copy(self._state.cartesianState.sensedForce)
Rbase = Tbase[0]
ttool = so3.apply(Rbase,ttool_base)
ftool = so3.apply(Rbase,ftool_base)
return (ttool,ftool)
def commandedCartesianPosition(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.commandedPosition is None:
raise NotImplementedError("Cartesian control not available")
if frame=='base': return copy.copy(self._state.cartesianState.commandedPosition)
elif frame == 'tool': return se3.identity()
elif frame == 'end effector': return (so3.identity(),toolCoordinates)
else: #base -> world
Tbase = self._state.cartesianState.baseCommandedPosition
if Tbase is None:
return copy.copy(self._state.cartesianState.commandedPosition)
return se3.mul(Tbase,self._state.cartesianState.commandedPosition)
def commandedCartesianVelocity(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.commandedVelocity is None:
raise NotImplementedError("Cartesian control not available")
wtool_base,vtool_base = self._state.cartesianState.commandedVelocity
Ttool_base = self._state.cartesianState.commandedPosition
if frame=='base': return copy.copy(self._state.cartesianState.commandedVelocity)
elif frame == 'tool':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
return (so3.apply(Rbase_tool,wtool_base),so3.apply(Rbase_tool,vtool_base))
elif frame == 'end effector':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
wtool_tool = so3.apply(Rbase_tool,wtool_base)
vtool_tool = so3.apply(Rbase_tool,vtool_base)
vtool_ee = vectorops.cross(wtool_tool,toolCoordinates)
return (wtool_tool,vectorops.add(vtool_tool,vtool_ee))
else: #base -> world
Tbase = copy.copy(self._state.cartesianState.baseCommandedPosition)
if Tbase is None: return self._state.cartesianState.commandedVelocity
Rbase = Tbase[0]
wtool = so3.apply(Rbase,wtool_base)
vtool = so3.apply(Rbase,vtool_base)
if self._state.cartesianState.commandedVelocity is None:
return (wtool,vtool)
else:
wbase,vbase = self._state.cartesianState.commandedVelocity
#add on base angular velocity x relative position of tool
return (vectorops.add(wbase,wtool),vectorops.add(vbase,vtool,vectorops.cross(wbase,Ttool_base[1])))
def commandedCartesianForce(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.commandedForce is None:
raise NotImplementedError("Cartesian control not available")
ttool_base,ftool_base = self._state.cartesianState.commandedForce
Ttool_base = self._state.cartesianState.commandedPosition
if frame=='base': return copy.copy(self._state.cartesianState.commandedForce)
elif frame == 'tool':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
return (so3.apply(Rbase_tool,ttool_base),so3.apply(Rbase_tool,ftool_base))
elif frame == 'end effector':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
ttool_tool = so3.apply(Rbase_tool,ttool_base)
ftool_tool = so3.apply(Rbase_tool,ftool_base)
ftool_ee = vectorops.cross(ttool_tool,toolCoordinates)
return (ttool_tool,vectorops.add(ftool_tool,ftool_ee))
else: #base -> world
Tbase = self._state.cartesianState.baseCommandedPosition
if Tbase is None: return copy.copy(self._state.cartesianState.commandedForce)
Rbase = Tbase[0]
ttool = so3.apply(Rbase,ttool_base)
ftool = so3.apply(Rbase,ftool_base)
return (ttool,ftool)
def destinationCartesianPosition(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.destinationPosition is None:
raise NotImplementedError("Cartesian control not available")
if frame=='base': return copy.copy(self._state.cartesianState.destinationPosition)
elif frame == 'tool': return se3.identity()
elif frame == 'end effector': return (so3.identity(),toolCoordinates)
else: #base -> world
Tbase = self._state.cartesianState.baseDestinationPosition
if Tbase is None: return self._state.cartesianState.destinationPosition
return se3.mul(Tbase,self._state.cartesianState.destinationPosition)
def destinationCartesianVelocity(self,frame='world'):
toolCoordinates = self.getToolCoordinates()
if toolCoordinates is None: raise NotImplementedError("Cartesian control not available")
if self._state.cartesianState is None or self._state.cartesianState.destinationVelocity is None:
raise NotImplementedError("Cartesian control not available")
wtool_base,vtool_base = self._state.cartesianState.destinationVelocity
Ttool_base = self._state.cartesianState.destinationPosition
if frame=='base': return copy.copy(self._state.cartesianState.destinationVelocity)
elif frame == 'tool':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
return (so3.apply(Rbase_tool,wtool_base),so3.apply(Rbase_tool,vtool_base))
elif frame == 'end effector':
if Ttool_base is None: Rbase_tool = so3.identity()
else: Rbase_tool = so3.inv(Ttool_base[0])
wtool_tool = so3.apply(Rbase_tool,wtool_base)
vtool_tool = so3.apply(Rbase_tool,vtool_base)
vtool_ee = vectorops.cross(wtool_tool,toolCoordinates)
return (wtool_tool,vectorops.add(vtool_tool,vtool_ee))
else: #base -> world
Tbase = self._state.cartesianState.baseDestinationPosition
if Tbase is None: return copy.copy(self._state.cartesianState.destinationVelocity)
Rbase = Tbase[0]
wtool = so3.apply(Rbase,wtool_base)
vtool = so3.apply(Rbase,vtool_base)
if self._state.cartesianState.destinationVelocity is None:
return (wtool,vtool)
else:
wbase,vbase = self._state.cartesianState.destinationVelocity
#add on base angular velocity x relative position of tool
return (vectorops.add(wbase,wtool),vectorops.add(vbase,vtool,vectorops.cross(wbase,Ttool_base[1])))
def klamptModel(self):
if self._structure.klamptModel is None:
res = RobotInterfaceBase.klamptModel(self)
self._structure.klamptModel = res
return self._structure.klamptModel
class _RobotInterfaceStatefulWrapper(_RobotInterfaceStatefulBase):
"""A helper class that wraps another RobotInterfaceBase. It follows
the paradigm:
0. Query the base's settings.
1. beginStep(): queries the base's state and stores it.
2. User can
* Change the settings.
* Query state (latched on step 1 and incurs no overhead).
* Issue commands (recorded).
3. endStep():
* If the settings changed, pass them onto the base.
* If commands were issued, pass them to the base.
The main use of this class is so that managing interfaces can
read state/commmands and modify commands before endStep() is called.
It is also helpful for threaded / multiprocessing interfaces because
all settings/state/commands are stored in one place.
"""
def __init__(self, base: RobotInterfaceBase, base_initialized:bool = False):
_RobotInterfaceStatefulBase.__init__(self)
self._base = base #type: RobotInterfaceBase
self._base_initialized = base_initialized
self._has = dict()
self._first_step = True
self._partInterfaces = dict() #type: Dict[str,'_RobotInterfaceStatefulWrapper']
self._in_step = False
self.properties = copy.deepcopy(base.properties)
def _try(self,fn,args,defaultValue=None,strict=True):
try:
if self._has[fn]:
return getattr(self._base,fn)(*args)
return defaultValue
except KeyError: #first time
try:
res = getattr(self._base,fn)(*args)
#print("...available")
self._has[fn] = True
return res
except NotImplementedError:
self._has[fn] = False
return defaultValue
except Exception as e:
if strict:
print("Error",e,"received during first call of",fn,"of base",str(self._base))
raise
else:
self._has[fn] = False
return defaultValue
except Exception as e:
print("Error",e,"received during later call of",fn,"of base",str(self._base))
raise
def __str__(self):
return "StatefulWrapper("+str(self._base)+')'
def initialize(self):
if not self._base_initialized:
if not self._base.initialize(): return False
self._base_initialized = True
self._try('beginStep',())
try:
self._structure.controlRate = self._base.controlRate()
except Exception:
self._structure.controlRate = None
self._structure.parts = self._base.parts()
self._structure.numJoints = len(self._structure.parts[None])
self._structure.klamptModel = self._base.klamptModel()
try:
self._structure.jointNames = [None]*self._structure.numJoints
for i in range(self._structure.numJoints):
self._structure.jointNames[i] = self._base.jointName(i)
except Exception:
self._structure.jointNames = None
for p in self._structure.parts:
if p is None: continue
self._partInterfaces[p] = _RobotInterfaceStatefulWrapper(self._base.partInterface(p),base_initialized=True)
self._partInterfaces[p].properties['name'] = p
self._partInterfaces[p].properties['part'] = True
self._partInterfaces[p].initialize()
if len(self._partInterfaces[p].parts()) > 1:
raise NotImplementedError("TODO: parts that have sub-parts")
#read settings
sensors = self._try('sensors',(),[])
self._structure.sensors = sensors if sensors else None
if sensors:
self._settings.sensorEnabled = dict()
for s in sensors:
self._settings.sensorEnabled[s] = False
for s in self._try('enabledSensors',(),[]):
self._settings.sensorEnabled[s] = True
res = self._try('getPIDGains',(),strict=False)
if res is not None:
self._settings.kP,self._settings.kI,self._settings.kD = res
res = self._try('getToolCoordinates',(),strict=False)
if res is not None:
self._settings.toolCoordinates = res
res = self._try('getGravityCompensation',(),strict=False)
if res is not None:
self._settings.gravity = res[0]
self._settings.load = res[1]
self._settings.load_com = res[2]
if len(self._structure.parts) > 1:
self._settings.children = dict()
res = self._try('getSettings',(),strict=False)
if res is not None:
self._settings.settings = res
self._split_settings()
#discover capabilities by commanding no-ops
self._state.commandedPosition = self._try('commandedPosition',(),self._state.commandedPosition)
self._state.commandedVelocity = self._try('commandedVelocity',(),self._state.commandedVelocity,strict=False)
self._state.commandedTorque = self._try('commandedTorque',(),self._state.commandedTorque,strict=False)
if _valid_vector(self._state.commandedTorque):
self._try('setTorque',(self._state.commandedTorque,))
if _valid_vector(self._state.commandedVelocity):
self._try('setVelocity',(self._state.commandedVelocity,))
self._try('setPiecewiseCubic',([0],[self._state.commandedPosition],[self._state.commandedVelocity]))
self._try('setPID',(self._state.commandedPosition,self._state.commandedVelocity))
else:
vel = [0]*self._structure.numJoints
self._try('setVelocity',(vel,))
self._try('setPiecewiseCubic',([0],[self._state.commandedPosition],[vel]),strict=False)
self._try('setPID',(self._state.commandedPosition,vel),strict=False)
if _valid_vector(self._state.commandedPosition):
self._try('setPosition',(self._state.commandedPosition,))
self._try('moveToPosition',(self._state.commandedPosition,),strict=False)
self._try('setPiecewiseLinear',([0],[self._state.commandedPosition]))
else:
print("strange commanded position?",self._state.commandedPosition)
input()
#print("My base class",self._base,"implements",self._has)
#input("Press enter to continue")
self._try('endStep',())
return True
def close(self):
return self._base.close()
def beginStep(self):
if self._in_step:
raise RuntimeError("beginStep called twice without endStep")
self._try('beginStep',())
self._state.rate = self._try('controlRate',())
self._state.clock = self._try('clock',())
self._state.status = self._try('status',())
self._state.sensedPosition = self._try('sensedPosition',(),self._state.sensedPosition)
self._state.sensedVelocity = self._try('sensedVelocity',(),self._state.sensedVelocity)
self._state.sensedTorque = self._try('sensedTorque',(),self._state.sensedTorque)
self._state.commandedPosition = self._try('commandedPosition',(),self._state.commandedPosition)
self._state.commandedVelocity = self._try('commandedVelocity',(),self._state.commandedVelocity)
self._state.commandedTorque = self._try('commandedTorque',(),self._state.commandedTorque)
self._state.destinationPosition = self._try('destinationPosition',(),self._state.destinationPosition)
self._state.destinationVelocity = self._try('destinationVelocity',(),self._state.destinationVelocity)
self._state.destinationTime = self._try('destinationTime',(),self._state.destinationTime)
self._state.queuedTrajectory = self._try('queuedTrajectory',(),self._state.queuedTrajectory)
if self._settings.toolCoordinates is not None:
if self._state.cartesianState is None:
self._state.cartesianState = _RobotInterfaceCartesianState()
self._state.cartesianState.sensedPosition = self._try('sensedCartesianPosition',('base',),self._state.cartesianState.sensedPosition,strict=False)
self._state.cartesianState.sensedVelocity = self._try('sensedCartesianVelocity',('base',),self._state.cartesianState.sensedVelocity,strict=False)
self._state.cartesianState.sensedForce = self._try('sensedCartesianForce',('base',),self._state.cartesianState.sensedForce,strict=False)
self._state.cartesianState.commandedPosition = self._try('commandedCartesianPosition',('base',),self._state.cartesianState.commandedPosition,strict=False)
self._state.cartesianState.commandedVelocity = self._try('commandedCartesianVelocity',('base',),self._state.cartesianState.commandedVelocity,strict=False)
self._state.cartesianState.commandedForce = self._try('commandedCartesianForce',('base',),self._state.cartesianState.commandedForce,strict=False)
self._state.cartesianState.destinationPosition = self._try('destinationCartesianPosition',('base',),self._state.cartesianState.destinationPosition,strict=False)
self._state.cartesianState.destinationVelocity = self._try('destinationCartesianVelocity',('base',),self._state.cartesianState.destinationVelocity,strict=False)
for sensor in self._try('enabledSensors',(),[]):
if self._state.sensorState is None:
self._state.sensorState = dict()
t_update = self._try('sensorUpdateTime',(sensor,),0)
if sensor not in self._state.sensorState or t_update > self._state.sensorState[sensor][0]:
measurements = self._try('sensorMeasurements',(sensor,),[])
self._state.sensorState[sensor] = (t_update,measurements)
self._split_state()
self._clear_deltas()
for (part,iface) in self._partInterfaces.items():
iface.beginStep()
self._in_step = True
def endStep(self):
if not self._in_step:
raise RuntimeError("endStep called without corresponding beginStep")
errors = []
#update settings from deltas
_gather_settings(self._structure.parts,dict((k,iface._delta_settings) for k,iface in self._partInterfaces.items()),self._delta_settings)
_update_from_settings(self._base,self._settings,self._delta_settings)
_update_parts_from_settings(self._partInterfaces,self._structure.parts,self._settings,self._delta_settings)
self._delta_settings = None
#update child settings
sub_settings = _split_settings(self._settings,self._structure.parts)
for (part,iface) in self._partInterfaces.items():
iface._settings = sub_settings[part]
#read through command queue
for part,iface in self._partInterfaces.items():
for cmd in iface._commands:
try:
getattr(iface,cmd.func)(*cmd.args)
except Exception as e:
if cmd.func in ['reset','estop','softStop']: #bubble-up
getattr(self._base,cmd.func)(*cmd.args)
else:
cmd.promise.errback(e) #TODO: errbacks can add to command queue
errors.append(e)
iface._commands = []
iface._in_step = False
for cmd in self._commands:
if cmd.indices is not None and len(cmd.indices) != self._structure.numJoints:
errors.append("Invalid number of indices")
try:
getattr(self._base,cmd.func)(*cmd.args)
except Exception as e:
cmd.promise.errback(e) #TODO: errbacks can add to command queue
errors.append(e)
#finish
self._commands = []
self._try('endStep',())
self._in_step = False
if errors:
raise RuntimeError("Runtime errors: {}".format(','.join(str(e) for e in errors)))
def partInterface(self,part,joint_idx=None):
if joint_idx is None:
if part not in self._partInterfaces:
raise RuntimeError("Tried to retrieve part interface without initializing")
return self._partInterfaces[part]
raise NotImplementedError("TODO: single-joint interfaces")
def jointName(self,joint_idx):
res = self._try('jointName',[joint_idx])
if res is not None: return res
robot = self.klamptModel()
if robot is None:
return 'Joint '+str(joint_idx)
return robot.link(robot.driver(joint_idx).getAffectedLink()).getName()
def setControlMode(self,mode,*args,**kwargs):
self._queueCommand('setControlMode',[mode,args,kwargs])
def functionCall(self,proc,*args,**kwargs):
self._base.functionCall(proc,*args,**kwargs)
def cartesianPosition(self,q,frame='world'):
return self._try('cartesianPosition',[q,frame])
def cartesianVelocity(self,q,dq,frame='world'):
return self._try('cartesianVelocity',[q,dq,frame])
def cartesianForce(self,q,t,frame='world'):
return self._try('cartesianPosition',[q,t,frame])
def klamptModel(self):
return self._base.klamptModel()
def configFromKlampt(self,klamptConfig,part=None,joint_idx=None):
return self._base.configFromKlampt(klamptConfig,part=None,joint_idx=None)
def velocityFromKlampt(self,klamptVelocity,part=None,joint_idx=None):
return self._base.velocityFromKlampt(klamptVelocity,part=None,joint_idx=None)
def configToKlampt(self,config,klamptConfig=None,part=None,joint_idx=None):
return self._base.configToKlampt(config,klamptConfig,part=None,joint_idx=None)
def velocityToKlampt(self,velocity,klamptVelocity=None,part=None,joint_idx=None):
return self._base.velocityToKlampt(velocity,klamptVelocity=None,part=None,joint_idx=None)
[docs]class OmniRobotInterface(_RobotInterfaceStatefulBase):
"""A RobotInterfaceBase that aggregates one or more parts, which are
communicated with separately. For example, a mobile manipulator can
consist of a base, arms, and grippers.
On startup, call ``addPhysicalPart(name,driver_indices,interface)`` for
each part of the robot. ``driver_indices`` must correspond to the indices
of drivers corresponding to that part in the Klamp't model.
The total configuration for the robot is a list of floats, segmented into
physical parts. The ordering of the parts' configurations, velocities, etc
is given by the driver_indices as they were added.
Allows for complex arrangements like a physical part that has sub-parts,
such as an arm with an integrated gripper handled with the same physical
interface. Sub-parts are named 'PARENT__CHILD'.
TODO: handle more than one layer of nesting.
For example, if you want to assemble a unified interface / klamptModel
from two interfaces, with part2 (e.g., a gripper) mounted on part1
(e.g., an arm), you'd run::
voltron_controller = OmniRobotInterface(klampt_model)
voltron_controller.addPhysicalPart("part1",part1_interface,part1_indices)
voltron_controller.addPhysicalPart("part2",part2_interface,part2_indices)
if not voltron_controller.initailize():
print("hmm... error on initialize?")
... do stuff, treating voltron_controller as a unified robot ...
If you have separate parts with their own Klamp't models, and wish to
dynamically construct a unified model, you may use the ``mount`` command
as follows::
world = WorldModel()
world.readFile(part1_interface.properties['klamptModelFile'])
world.readFile(part2_interface.properties['klamptModelFile'])
part1_model = world.robot(0)
part2_model = world.robot(1)
ee_link = part1_model.numLinks()-1
mount_rotation = ... some so3 element...
mount_translation = ... some translation vector...
part1_model.mount(ee_link,part2_model,mount_rotation,mount_translation) #part1 model is now the whole robot
world.remove(part2_model) #not strictly necessary to remove the model from the world...
klampt_model = world.robot(0)
part1_indices = list(range(part1_model.numDrivers()))
part2_indices = list(range(part1_model.numDrivers(),part1_model.numDrivers()+part2_model.numDrivers()))
#code above should follow
If your robot has a RobotInfo structure associated with it, you'd point its
``controllerFile`` attribute to a file containing code like::
from klampt.control.robotinterfaceutils import OmniRobotInterface
#import Part1RobotInterface and Part2RobotInterface from somewhere
def make(robotModel):
voltron_controller = OmniRobotInterface(robotModel)
part1_controller = Part1RobotInterface()
part2_controller = Part2RobotInterface()
n1 = part1_controller.numJoints()
n2 = part2_controller.numJoints()
assert n1 + n2 == robotModel.numDrivers()
voltron_controller.addPhysicalPart("part1",part1_controller,list(range(n1)))
voltron_controller.addPhysicalPart("part2",part2_controller,list(range(n1,n1+n2)))
#add any virtual parts if your RobotInfo defines them
return voltron_controller
"""
def __init__(self, klamptModel : RobotModel=None):
_RobotInterfaceStatefulBase.__init__(self)
self._structure.parts = dict()
if klamptModel is None:
self._structure.numJoints = 0
self._structure.parts[None] = []
else:
self._structure.klamptModel = klamptModel
self._structure.numJoints = klamptModel.numDrivers()
self._structure.parts[None] = list(range(self._structure.numJoints))
self._physicalParts = dict() # type: Dict[str,_RobotInterfaceStatefulWrapper]
self._physicalPartIndices = dict() # type: Dict[str,List[int]]
self._physicalPartUpdateTimes = dict() # type: Dict[str,float]
self._childPhysicalParts = dict() # type: Dict[str,_RobotInterfaceStatefulWrapper]
self._childPhysicalPartIndices = dict() # type: Dict[str,List[int]]
self._virtualParts = dict() # type: Dict[str,_RobotInterfaceStatefulBase]
self._virtualPartIndices = dict() # type: Dict[str,List[int]]
self._emulator = None
self._filters = dict() # type: Dict[str,Dict[str,Callable]]
self._in_step = False
del self._partInterfaces
self.properties['complete'] = True
[docs] def addPhysicalPart(self, name : str, interface : RobotInterfaceBase, indices : Sequence[int]=None) -> None:
"""Adds a new part corresponding to a physical RIL implementation.
If klamptModel was not provided to the initializer, then this part will
be appended to the list of parts, and indices=None is allowed.
Otherwise, indices needs to be a list of driver indices corresponding
to the part in klamptModel.
"""
if name in self._structure.parts:
raise ValueError("Part named {} already exists".format(name))
n = self.numJoints()
if indices is None:
if self._structure.klamptModel is not None:
raise ValueError("Must specify indices for a physical part when a unified klamptModel has been defined")
try:
ni = interface.numJoints()
except NotImplementedError:
raise ValueError("When building up an OmniRobotInterface, the physical interface must implement numJoints()")
indices = list(range(n,n+ni))
self._structure.numJoints += ni
self._structure.parts[None] = list(range(self._structure.numJoints))
else:
if self._structure.klamptModel is None:
if any(i < n for i in indices):
raise ValueError("When building up an OmniRobotInterface, all indices must be contiguous and added in order")
else:
for i in indices:
assert i >= 0 and i < n,"Invalid index {} must be in range [0,{}]".format(i,n-1)
if not isinstance(interface,_RobotInterfaceStatefulWrapper):
interface = _RobotInterfaceStatefulWrapper(interface)
self._physicalParts[name] = interface
interface.properties['name'] = name
interface.properties['part'] = True
self._physicalPartIndices[name] = indices
self._physicalPartUpdateTimes[name] = 0
self._structure.parts[name] = self._physicalPartIndices[name]
[docs] def addVirtualPart(self, name : str, indices : Sequence[int]) -> None:
if name in self._structure.parts:
raise ValueError("Part named {} already exists".format(name))
n = self.numJoints()
for i in indices:
assert i >= 0 and i < n,"Invalid index {} must be in range [0,{}]".format(i,n-1)
interface = _RobotInterfaceStatefulBase()
interface.properties['name'] = name
interface.properties['part'] = True
interface._structure.controlRate = self._structure.controlRate
interface._structure.numJoints = len(indices)
interface._structure.parts = dict()
interface._structure.parts[None] = list(range(interface._structure.numJoints))
self._virtualParts[name] = interface
self._virtualPartIndices[name] = indices
self._structure.parts[name] = self._virtualPartIndices[name]
[docs] def setFilter(self, name : str, filter : Optional[Callable], input='auto', output='auto') -> None:
"""Enables or disables a filter. The filter is attached to the given
name, which can be a standard filter item or sensor.
Standard filters include:
* 'joint_limit': adjusts position commands
* 'velocity_limit': adjusts velocity commands
* 'acceleration_limit': adjusts acceleration commands
* 'torque_limit': adjusts torque commands
* 'tracking_monitor': monitors sensed vs commanded positions
* 'self_collision': adjusts position commands to avoid collisions
* 'obstacle_collision': adjusts position commands to avoid collisions
* SENSOR_NAME: some sensor for which ``hasSensor(SENSOR_NAME)=True``.
Valid operands include:
* 'sensedPosition', 'sensedVelocity', 'sensedAcceleration', and
'sensedTorque': updated on every time step
* 'commandedPosition', 'commandedVelocity', 'commandedAcceleration',
and 'commandedTorque': updated on every time step
* 'positionCommand', 'velocityCommand', 'torqueCommand': updated when a
position, velocity, or torque command are scheduled for sending to the
robot.
* SENSOR_NAME: some sensor for which ``hasSensor(SENSOR_NAME)=True``.
* 'status': the input / output status
Args:
name (str): the filter identifier
filter (Callable, or None): the filter operator. If None, disables
the filter.
Otherwise, the filter is callable and takes #inputs arguments
(usually 1) and returns #outputs arguments to be stored in the
output (a tuple if #outputs > 1).
Note that to implement a stateful filter, you would create a
class that updates internal state on __call__.
RobotInterfaceCompleter also accepts
:class:`klampt.control.blocks.Block` objects.
input (str or list of str): one or more input operands. If 'auto',
the input operands are auto-determined from the name.
output (str or list of str): one or more output operands. If
'auto', the output operands are auto-determined from the name.
"""
self._emulator.setFilter(self.indices(), name, filter, input, output)
[docs] def setPartFilter(self, part : str, name : str, filter : Optional[Callable], input='auto', output='auto') -> None:
self._emulator.setFilter(self.indices(part), name, filter, input, output)
[docs] def setJointLimits(self, qmin='auto', qmax='auto', op='clamp'):
"""Activates a joint limit filter.
If qmin/qmax are 'auto', these are read from the klampt robot model
or the properties.
If op is...
* 'clamp' then commands are silently clamped to their limits.
* 'stop' a soft-stop is raised.
* 'warn' a warning is printed and the robot silently ignores the
command.
"""
#need to limit to hardware values
hw_qmin,hw_qmax = self.properties.get('joint_limits',(None,None))
self._emulator.setJointLimits(self.indices(),qmin,qmax,op,hw_qmin,hw_qmax)
[docs] def setPartJointLimits(self, part : str, qmin='auto', qmax='auto', op='clamp'):
"""Activates a joint limit filter.
If qmin/qmax are 'auto', these are read from the klampt robot model
or the properties.
If op is...
* 'clamp' then commands are silently clamped to their limits.
* 'stop' a soft-stop is raised.
* 'warn' a warning is printed and the robot silently ignores the
command.
"""
indices = self.indices(part)
#need to limit to hardware values
hw_qmin,hw_qmax = self.properties.get('joint_limits',(None,None))
if hw_qmin is not None:
hw_qmin = [hw_qmin[i] for i in indices]
hw_qmax = [hw_qmax[i] for i in indices]
self._emulator.setJointLimits(indices,qmin,qmax,op,hw_qmin,hw_qmax)
[docs] def setCollisionFilter(self, world : WorldModel=None, op = 'warn'):
"""Activates a collision filter.
If world=None, only does self collision. Otherwise, does self and
world collision.
If op = 'stop', a soft stop is raised if a collision is predicted.
If op = 'warn', a warning is printed and the robot silently ignores
the command.
"""
self._emulator.setCollisionFilter(world,op)
[docs] def initialize(self) -> bool:
if len(self._physicalParts) == 0:
warnings.warn("No physical parts defined")
return False
if self._structure.klamptModel is not None:
for k,iface in self._virtualParts.items():
link_inds = []
for i in self._virtualPartIndices[k]:
link_inds += self._structure.klamptModel.driver(i).getAffectedLinks()
iface._structure.klamptModel = SubRobotModel(self._structure.klamptModel,link_inds)
for k,iface in self._physicalParts.items():
link_inds = []
for i in self._physicalPartIndices[k]:
link_inds += self._structure.klamptModel.driver(i).getAffectedLinks()
iface._structure.klamptModel = SubRobotModel(self._structure.klamptModel,link_inds)
n = self._structure.klamptModel.numDrivers()
else:
n = 0
for k,iface in self._physicalParts.items():
n = max(n,max(self._physicalPartIndices[k])+1)
self._structure.numJoints = n
self._structure.jointNames = [None]*n
#initialize physical interfaces
controlled = [False]*n
for (k,indices) in self._physicalPartIndices.items():
for i in indices:
controlled[i] = True
if not all(controlled):
warnings.error("Not all joints are covered by a physical interface")
return False
for (k,iface) in self._physicalParts.items():
if not iface.initialize():
return False
#update the structure
pindices = self._physicalPartIndices[k]
ni = len(pindices)
for i in range(ni):
try:
ji = iface.jointName(i)
except NotImplementedError:
ji = self._structure.klamptModel.link(self._structure.klamptModel.driver(pindices[i]).getAffectedLink()).getName()
self._structure.jointNames[pindices[i]] = ji
#take the max control rate
if self._structure.controlRate is None:
self._structure.controlRate = iface._structure.controlRate
elif iface._structure.controlRate is not None:
self._structure.controlRate = max(self._structure.controlRate,iface._structure.controlRate)
#discover child physical interfaces
for sub_part,sub_indices in iface.parts().items():
if sub_part is not None:
sub_iface = iface.partInterface(sub_part)
my_indices = [indices[i] for i in sub_indices]
self._childPhysicalParts[k+PART_NAME_SEPARATOR+sub_part] = sub_iface
self._childPhysicalPartIndices[k+PART_NAME_SEPARATOR+sub_part] = my_indices
link_indices = []
for i in my_indices:
link_indices += self._structure.klamptModel.driver(i).getAffectedIndices()
sub_iface.klamptModel = SubRobotModel(self._structure.klamptModel,link_indices)
indices = dict()
indices.update(self._physicalPartIndices)
indices.update(self._childPhysicalPartIndices)
indices.update(self._virtualPartIndices)
settings = dict()
for (k,iface) in self._physicalParts.items():
settings[k] = iface._settings
for (k,iface) in self._childPhysicalParts.items():
settings[k] = iface._settings
for (k,iface) in self._virtualParts.items():
settings[k] = iface._settings
_gather_settings(indices,settings,self._settings)
#initialize emulator with settings
self._emulator = RobotInterfaceEmulator(n,self._structure.klamptModel)
if self._settings.kP is not None and self._settings.kI is not None and self._settings.kD is not None:
self._emulator.setPIDGains(self._settings.kP,self._settings.kI,self._settings.kD)
all_indices = list(range(self.numJoints()))
if self._settings.toolCoordinates is not None:
self._emulator.setToolCoordinates(all_indices,self._settings.toolCoordinates)
if self._settings.gravity is not None:
self._emulator.setGravityCompensation(all_indices,self._settings.gravity,self._settings.load,self._settings.load_com)
for (k,settings) in self._settings.children.items():
assert k in self._structure.parts
if settings.toolCoordinates is not None:
self._emulator.setToolCoordinates(self._structure.parts[k],settings.toolCoordinates)
if settings.gravity is not None:
self._emulator.setGravityCompensation(self._structure.parts[k],settings.gravity,settings.load,settings.load_com)
#can only check filters at this point
for (k,iface) in self._physicalParts.items():
self._emulator._validateFilters(iface,self._physicalPartIndices[k])
for (k,iface) in self._childPhysicalParts.items():
self._emulator._validateFilters(iface,self._childPhysicalPartIndices[k])
for (k,iface) in self._virtualParts.items():
self._emulator._validateFilters(iface,self._virtualPartIndices[k])
#distribute structure and settings across virtual parts -- physical and child physical parts are already updated
settings = _split_settings(self._settings,self._virtualPartIndices)
for (k,iface) in self._virtualParts.items():
iface._settings = settings[k]
iface._structure.jointNames = [self._structure.jointNames[i] for i in self._virtualPartIndices[k]]
link_indices = []
for i in self._virtualPartIndices[k]:
link_indices += self._structure.klamptModel.driver(i).getAffectedLinks()
iface._structure.klamptModel = SubRobotModel(self._structure.klamptModel,link_indices)
return True
[docs] def beginStep(self):
if self._in_step:
raise RuntimeError("beginStep() called without prior endStep()")
for (k,iface) in self._physicalParts.items():
iface.beginStep()
try:
prev_state = self._state
self._state = _RobotInterfaceState()
_gather_state(self._physicalPartIndices,dict((k,iface._state) for (k,iface) in self._physicalParts.items()),self._state)
#initialize emulated state on first try
assert self._structure.controlRate is not None or self._state.rate is not None or self._state.clock is not None,"None of the physical parts implement a clock or rate"
assert self._state.sensedPosition is not None or self._state.commandedPosition is not None
if self._emulator.numUpdates < 0:
if self._state.sensedPosition is None:
self._state.sensedPosition = copy.copy(self._state.commandedPosition)
if self._state.commandedPosition is None:
self._state.commandedPosition = copy.copy(self._state.sensedPosition)
if self._state.commandedVelocity is None:
self._state.commandedVelocity = copy.copy(self._state.sensedVelocity)
if self._state.commandedTorque is None:
self._state.commandedTorque = copy.copy(self._state.sensedTorque)
self._emulator.initialize(self._state.sensedPosition,self._state.sensedVelocity,self._state.sensedTorque,
self._state.commandedPosition,self._state.commandedVelocity,self._state.commandedTorque)
else:
#on later steps, fill parts of state that are not updated by physical parts with emulated state
self._updateStateFromEmulator('commandedPosition')
self._updateStateFromEmulator('commandedVelocity')
self._updateStateFromEmulator('commandedTorque')
self._updateStateFromEmulator('destinationPosition')
self._updateStateFromEmulator('destinationVelocity')
if self._state.children is not None:
for (k,substate) in self._state.children.items():
substate.cartesianState = None
for (k,indices) in self._structure.parts.items():
if tuple(indices) in self._emulator.cartesianInterfaces:
if self._state.children is None:
self._state.children = dict()
if k not in self._state.children:
self._state.children[k] = _RobotInterfacePartState()
self._state.children[k].cartesianState = self._updateCartesianStateFromEmulator(k)
rate = self._state.rate
if rate is None: rate = self._structure.controlRate
self._emulator.advanceClock(self._state.clock,rate)
if self._state.status == 'ok':
self._state.status = self._emulator.status
else:
self._emulator.status = self._state.status
if self._state.clock is None:
self._state.clock = self._emulator.curClock
if self._state.rate is None:
self._state.rate = 1.0/self._emulator.dt
#distribute state across physical parts
states = _split_state(self._state,self._physicalPartIndices)
for (k,iface) in self._physicalParts.items():
iface._state = states[k]
#distribute states across child physical parts
states = _split_state(self._state,self._childPhysicalPartIndices)
for (k,iface) in self._childPhysicalParts.items():
iface._state = states[k]
#distribute state across virtual parts
states = _split_state(self._state,self._virtualPartIndices)
for (k,iface) in self._virtualParts.items():
iface._state = states[k]
#initialize _delta_settings for this and all parts
self._delta_settings = _RobotInterfaceSettings()
for (k,iface) in self._physicalParts.items():
iface._delta_settings = _RobotInterfaceSettings()
for (k,iface) in self._childPhysicalParts.items():
iface._delta_settings = _RobotInterfaceSettings()
for (k,iface) in self._virtualParts.items():
iface._delta_settings = _RobotInterfaceSettings()
except Exception:
for (k,iface) in self._physicalParts.items():
iface.endStep()
raise
self._in_step = True
[docs] def endStep(self):
if not self._in_step:
raise RuntimeError("endStep() called without corresponding beginStep()")
#check settings
indices = dict()
indices.update(self._physicalPartIndices)
indices.update(self._childPhysicalPartIndices)
indices.update(self._virtualPartIndices)
delta_settings = dict()
for (k,iface) in self._physicalParts.items():
delta_settings[k] = iface._delta_settings
for (k,iface) in self._childPhysicalParts.items():
delta_settings[k] = iface._delta_settings
for (k,iface) in self._virtualParts.items():
delta_settings[k] = iface._delta_settings
_gather_settings(indices,delta_settings,self._delta_settings)
#parse settings for virtual parts to determine whether new cartesian controllers have been created
#update emulator settings to match all parts as well
all_indices = list(range(self.numJoints()))
if self._delta_settings.toolCoordinates is not None:
self._emulator.setToolCoordinates(all_indices,self._delta_settings.toolCoordinates)
if self._delta_settings.gravity is not None:
self._emulator.setGravityCompensation(all_indices,self._delta_settings.gravity,self._delta_settings.load,self._delta_settings.load_com)
if self._delta_settings.children is not None:
for k,settings in self._delta_settings.children.items():
if settings.toolCoordinates is not None:
self._emulator.setToolCoordinates(self._structure.parts[k],settings.toolCoordinates)
if settings.gravity is not None:
self._emulator.setGravityCompensation(self._structure.parts[k],settings.gravity,settings.load,settings.load_com)
#TODO: gather prev / current changes of PID
#distribute changed settings across physical interfaces
delta_settings = _split_settings(self._delta_settings,self._physicalPartIndices)
for (k,iface) in self._physicalParts.items():
_update_from_settings(iface,iface._settings,delta_settings[k])
delta_settings = _split_settings(self._delta_settings,self._childPhysicalPartIndices)
for (k,iface) in self._childPhysicalParts.items():
_update_from_settings(None,iface._settings,delta_settings[k])
delta_settings = _split_settings(self._delta_settings,self._virtualPartIndices)
for (k,iface) in self._virtualParts.items():
_update_from_settings(None,iface._settings,delta_settings[k])
self._delta_settings = None
#gather commands
names = [None]
indices = [self.indices()]
commands = [self._commands]
for (k,iface) in self._physicalParts.items():
names.append(k)
indices.append(self._physicalPartIndices[k])
commands.append([])
for cmd in iface._commands:
if cmd.func in ['setControlMode','functionCall']: #custom commands
getattr(iface,cmd.func)(cmd.args[0],*cmd.args[1],**cmd.args[2])
if cmd.func == 'setControlMode':
self._emulator.promote(cmd.indices,None)
else:
commands[-1].append(cmd)
iface._commands = [] #erase primary commands
for (k,iface) in self._childPhysicalParts.items():
names.append(k)
indices.append(self._childPhysicalPartIndices[k])
commands.append([])
for cmd in iface._commands:
if cmd.func in ['setControlMode','functionCall']: #custom commands
getattr(iface,cmd.func)(cmd.args[0],*cmd.args[1],**cmd.args[2])
if cmd.func == 'setControlMode':
self._emulator.promote(cmd.indices,None)
else:
commands[-1].append(cmd)
iface._commands = [] #erase primary commands
for (k,iface) in self._virtualParts.items():
names.append(k)
indices.append(self._virtualPartIndices[k])
commands.append(iface._commands)
iface._commands = [] #erase primary commands
self._commands = _gather_commands(commands,indices)
#set commands to emulator, advance
for cmd in self._commands:
if cmd.func in ['reset']:
getattr(self._emulator,cmd.func)()
else:
if cmd.indices is None: #command to everything
getattr(self._emulator,cmd.func)(all_indices,*cmd.args)
else:
getattr(self._emulator,cmd.func)(cmd.indices,*cmd.args)
self._emulator.update(self._state.sensedPosition,self._state.sensedVelocity)
#distribute to parts
for (k,iface) in self._physicalParts.items():
inds = self._physicalPartIndices[k]
#figure out commands to actually send to the physical parts based on the desired control
#modes of the joints
(desiredControlMode,baseControlMode) = self._emulator.sendToInterface(iface,inds)
#if baseControlMode is not None:
# print("Controller",k,iface,"desired control mode",desiredControlMode,"chosen",baseControlMode)
#TODO: when a value is emulated, set a flag to use emulated values rather than base
for (k,iface) in self._physicalParts.items():
iface.endStep()
iface._in_step = False
for (k,iface) in self._childPhysicalParts.items():
iface._commands = []
iface._in_step = False
for (k,iface) in self._virtualParts.items():
iface._commands = []
self._commands = []
self._in_step = False
def _updateStateFromEmulator(self,attr):
stateval = getattr(self._state,attr)
if stateval is None:
value = getattr(self._emulator,attr)()
if value is None: return
if all(v is None for v in value): return
setattr(self._state,attr,value)
#assert all(v is not None for v in value),"Emulator returned a vector with some Nones for attribute {}: {}?".format(attr,value)
elif any(v is None for v in stateval):
value = getattr(self._emulator,attr)()
if value is None: return
if all(v is None for v in value): return
#assert all(v is not None for v in value),"Emulator returned a vector with some Nones for attribute {}: {}?".format(attr,value)
for i,v in enumerate(stateval):
if v is None:
stateval[i] = value[i]
def _updateCartesianStateFromEmulator(self,part):
state = _RobotInterfaceCartesianState()
indices = self._structure.parts[part]
state.baseSensedPosition = se3.identity()
state.baseCommandedPosition = se3.identity()
state.baseDestinationPosition = se3.identity()
state.baseSensedVelocity = ([0]*3,[0]*3)
state.baseCommandedVelocity = ([0]*3,[0]*3)
state.baseDestinationVelocity = ([0]*3,[0]*3)
first_index = self.klamptModel().driver(indices[0]).getAffectedLink()
base_index = self.klamptModel().link(first_index).getParent()
if base_index >= 0:
if self._state.sensedPosition is not None:
state.baseSensedPosition = self._emulator.cartesianPosition(base_index,self._state.sensedPosition,'world')
if self._state.sensedVelocity is not None:
state.baseSensedVelocity = self._emulator.cartesianVelocity(base_index,self._state.sensedPosition,self._state.sensedVelocity,'world')
if self._state.commandedPosition is not None:
state.baseCommandedPosition = self._emulator.cartesianPosition(base_index,self._state.commandedPosition,'world')
if self._state.commandedVelocity is not None:
state.baseCommandedVelocity = self._emulator.cartesianVelocity(base_index,self._state.commandedPosition,self._state.commandedVelocity,'world')
if self._state.destinationPosition is not None:
state.baseDestinationPosition = self._emulator.cartesianPosition(base_index,self._state.destinationPosition,'world')
if self._state.destinationVelocity is not None:
state.baseDestinationVelocity = self._emulator.cartesianVelocity(base_index,self._state.destinationPosition,self._state.destinationVelocity,'world')
assert self._state.sensedPosition is not None
assert self._state.commandedPosition is not None
state.sensedPosition = self._emulator.cartesianPosition(indices,self._state.sensedPosition,'base')
if self._state.sensedVelocity is not None:
state.sensedVelocity = self._emulator.cartesianVelocity(indices,self._state.sensedPosition,self._state.sensedVelocity,'base')
if self._state.sensedTorque is not None:
state.sensedForce = self._emulator.cartesianForce(indices,self._state.sensedPosition,self._state.sensedTorque,'base')
state.commandedPosition = self._emulator.cartesianPosition(indices,self._state.commandedPosition,'base')
if self._state.commandedVelocity is not None:
state.commandedVelocity = self._emulator.cartesianVelocity(indices,self._state.commandedPosition,self._state.commandedVelocity,'base')
if self._state.commandedTorque is not None:
state.commandedForce = self._emulator.cartesianForce(indices,self._state.commandedPosition,self._state.commandedTorque,'base')
if self._state.destinationPosition is not None:
state.destinationPosition = self._emulator.cartesianPosition(indices,self._state.destinationPosition,'base')
if self._state.destinationVelocity is not None:
state.destinationVelocity = self._emulator.cartesianVelocity(indices,self._state.destinationPosition,self._state.destinationVelocity,'base')
# print("Tool coordinates",self._emulator.getToolCoordinates(indices))
# print("Commanded base",state.baseCommandedPosition)
# print("Commanded position w.r.t. base",state.commandedPosition)
return state
[docs] def partInterface(self,part,joint_idx = None):
if part in self._physicalParts:
iface = self._physicalParts[part]
elif part in self._childPhysicalParts:
iface = self._childPhysicalParts[part]
elif part in self._virtualParts:
iface = self._virtualParts[part]
elif part is None:
if joint_idx is not None:
raise NotImplementedError("Single-joint interfaces")
return self
else:
raise ValueError("Invalid part name {}".format(part))
if joint_idx is not None:
return iface.partInterface(None,joint_idx)
return iface
[docs] def reset(self):
for part,iface in self._physicalParts.items():
if not iface.reset():
return False
return True
[docs]class RobotInterfaceCompleter(RobotInterfaceBase):
"""Completes as much of the RobotInterfaceBase API as possible from a
partially specified RobotInterfaceBase. For example, if your
RobotInterfaceBase implements position control, then the Completer
will provide velocity, piecewise linear, and piecewise cubic, and Cartesian
control commands.
At a minimum, a base must implement :meth:`sensedPosition` and one of the
following:
- :meth:`setPosition`
- :meth:`moveToPosition`
- :meth:`setVelocity`
- :meth:`setPID`
- :meth:`setTorque`
This class will *not* emulate:
- :meth:`status`
- :meth:`sensedTorque`
- Any sensors
- :meth:`functionCall`
- :meth:`setSetting` and `getSetting`
- :meth:`stateValue`
- :meth:`klamptModel`
It will emulate:
- Clock time from control rate using integration
- Control rate from clock time using differences
- Joint velocities from joint positions using finite differences
- Joint positions from joint velocities using integration
- PID control from torque commands
- Torque commands from PID control
- Piecewise linear / piecewise cubic control from position / velocity
/ PID control.
- Cartesian positions / velocities / forces from joint positions /
velocities / torques using the Klamp't model.
- Cartesian control using the Klamp't model and IK
- Optional safety checks (joint limits, velocity limits, discontinuous
commands, and tracking errors)
- E-stopping and soft stopping requiring reset.
.. note::
The base interface's klamptModel() method must be implemented for
Cartesian control and acceleration-bounded control to work properly.
Args:
base_interface (RobotInterfaceBase): the partially-implemented
interface.
base_initialized (bool, optional): if True, initialize() will not
initialize the base.
"""
def __init__(self,base_interface : RobotInterfaceBase, base_initialized=False):
RobotInterfaceBase.__init__(self)
self._base = base_interface
self._baseInitialized = base_initialized
self._parent = None
self._subRobot = False
self._parts = dict()
self._baseParts = None
self._has = dict()
self._indices = None
self._emulator = None #type: RobotInterfaceEmulator
self._emulatorControlMode = None
self._baseControlMode = None
self._inStep = False
self.properties = base_interface.properties.copy()
self.properties['complete'] = True
def __str__(self):
return "Completer("+str(self._base)+')'
[docs] def numJoints(self,part=None):
if self._parts is not None and part in self._parts:
return len(self._parts[part])
return self._base.numJoints(part)
[docs] def parts(self):
if self._parts is None: #pre-initialization
return self._base.parts()
return self._parts
[docs] def addPart(self,name,indices):
"""Can add new parts, e.g., for Cartesian control.
As an example, suppose we've created a RobotInterfaceCompleter for a
6DOF robot with 1DOF gripper, with the base controller using position
control of all 7 DOFs. We can then create an arm and a gripper via::
robotinterface.addPart("arm",list(range(6)))
robotinterface.addPart("gripper",[6])
#start moving the arm upward
robotinterface.partInterface("arm").setCartesianVelocity([0,0,0],[0,0,0.1])
"""
assert name not in self._parts
self._parts[name] = indices
def _try(self,fn,args,fallback=None):
return _try_methods(self._base,fn,args,fallback,self._has)
[docs] def initialize(self):
if self._indices is not None:
warnings.warn("RobotInterfaceCompleter(%s): Should only call initialize() once."%(str(self._base),))
return True
if not self._baseInitialized:
if not self._base.initialize():
warnings.warn("RobotInterfaceCompleter(%s): base controller did not initialize"%(str(self._base),))
return False
self._baseInitialized = True
#discover capabilities
n = self._base.numJoints()
for (part,indices) in self._parts.items():
if any(i<0 or i >= n for i in indices):
warnings.warn("RobotInterfaceCompleter(%s): part has invalid index %d (#joints=%d)"%(str(self._base),max(indices),n))
self._baseParts = self._base.parts()
self._parts.update(self._baseParts)
self._indices = self.indices()
self._try('beginStep',(),0)
self._try('controlRate',(),0)
curclock = self._try('clock',(),0)
self._try('sensedPosition',(),0)
self._try('sensedVelocity',(),0)
self._try('sensedTorque',(),0)
self._try('commandedPosition',(),0)
self._try('commandedVelocity',(),0)
self._try('commandedTorque',(),0)
self._try('destinationPosition',(),0)
self._try('destinationVelocity',(),0)
self._try('destinationTime',(),0)
self._try('queuedTrajectory',(),0)
self._try('endStep',(),0)
if not self._has['controlRate'] and not self._has['clock']:
warnings.warn("RobotInterfaceCompleter(%s): Need at least one of controlRate() and clock() to be implemented"%(str(self._base),))
return False
if not self._has['sensedPosition'] and not self._has['commandedPosition']:
warnings.warn("RobotInterfaceCompleter(%s): Need at least one of sensedPosition() and commandedPosition() to be implemented"%(str(self._base),))
return False
self._emulator = RobotInterfaceEmulator(self._base.numJoints(),self._base.klamptModel())
self._emulator.curClock = curclock
print("RobotInterfaceCompleter({}): Starting with clock {}".format(self._base,curclock))
assert curclock is not None
self._warned = False
return True
[docs] def close(self):
self._base_initialized=False
return self._base.close()
[docs] def startStep(self):
if not self._warned:
warnings.warn("startStep is deprecated, use beginStep",DeprecationWarning)
self._warned = True
self.beginStep()
[docs] def beginStep(self):
assert self._indices is not None,"RobotInterface not initialized yet"
assert not self._subRobot,"Can't do beginStep on a sub-interface"
assert not self._inStep,"beginStep called twice in a row?"
self._inStep = True
self._try('beginStep',(),0)
if self._emulator.numUpdates < 0:
qsns = self._try('sensedPosition',(),lambda *args:None)
vsns = self._try('sensedVelocity',(),lambda *args:None)
tsns = self._try('sensedTorque',(),lambda *args:None)
qcmd = self._try('commandedPosition',(),lambda *args:None)
vcmd = self._try('commandedVelocity',(),lambda *args:None)
tcmd = self._try('commandedTorque',(),lambda *args:None)
self._emulator.initialize(qsns,vsns,tsns,qcmd,vcmd,tcmd)
if not self._has['controlRate']:
self._emulator.advanceClock(self.clock(),None)
elif not self._has['clock']:
self._emulator.advanceClock(None,self.controlRate())
else:
self._emulator.advanceClock(self.clock(),self.controlRate())
[docs] def endStep(self):
assert not self._subRobot,"Can't do endStep on a sub-interface"
assert self._inStep,"endStep called outside of a step?"
self._inStep = False
q = self.sensedPosition()
v = self.sensedVelocity()
try:
self._emulator.update(q,v)
except Exception as e:
self.softStop()
raise e
self._emulatorControlMode,self._baseControlMode = self._emulator.sendToInterface(self._base)
self._try('endStep',(),0)
[docs] def controlRate(self):
def _controlRate_backup(self):
if self._emulator.dt is None:
raise RuntimeError("Base interface doesn't have controlRate, and need to have at least one time step before determining backup controlRate")
return 1.0/self._emulator.dt
return self._try('controlRate',(),lambda :_controlRate_backup(self))
[docs] def clock(self):
return self._try('clock',(),self._emulator.curClock)
[docs] def estop(self):
"""Requires reset"""
self._base.estop()
self._emulator.estop(self._indices)
[docs] def softStop(self):
"""Requires reset"""
self._base.softStop()
self._emulator.softStop(self._indices)
[docs] def reset(self):
if self._emulator.status != 'ok':
self._emulator.reset()
if self._base.status() != 'ok':
return self._base.reset()
return True
[docs] def setSetting(self,key,value):
self._base.setSetting(key,value)
[docs] def getSetting(self,key):
return self._base.getSetting(key)
[docs] def partInterface(self,part,joint_idx=None):
return _SubRobotInterfaceCompleter(self,part,joint_idx)
[docs] def jointName(self,joint_idx):
def modelJointName(idx):
robot = self.klamptModel()
if robot is None:
return 'Joint '+str(idx)
return robot.link(robot.driver(idx).getAffectedLink()).getName()
return self._try('jointName',[joint_idx],modelJointName)
[docs] def setControlMode(self,mode,*args,**kwargs):
if self._emulator.status == 'ok':
self._emulator.promote(self._indices,None) #stop
self._base.setControlMode(mode,*args,**kwargs)
[docs] def functionCall(self,proc,*args,**kwargs):
return self._base.functionCall(proc,*args,**kwargs)
[docs] def setFilter(self, name : str, filter : Optional[Callable], input='auto', output='auto') -> None:
"""Enables or disables a filter. The filter is attached to the given
name, which can be a standard filter item or sensor.
Standard filters include:
* 'joint_limit': adjusts position commands
* 'velocity_limit': adjusts velocity commands
* 'acceleration_limit': adjusts acceleration commands
* 'torque_limit': adjusts torque commands
* 'tracking_monitor': monitors sensed vs commanded positions
* 'self_collision': adjusts position commands to avoid collisions
* 'obstacle_collision': adjusts position commands to avoid collisions
* SENSOR_NAME: some sensor for which ``hasSensor(SENSOR_NAME)=True``.
Valid operands include:
* 'sensedPosition', 'sensedVelocity', 'sensedAcceleration', and
'sensedTorque': updated on every time step
* 'commandedPosition', 'commandedVelocity', 'commandedAcceleration',
and 'commandedTorque': updated on every time step
* 'positionCommand', 'velocityCommand', 'torqueCommand': updated when a
position, velocity, or torque command are scheduled for sending to the
robot.
* SENSOR_NAME: some sensor for which ``hasSensor(SENSOR_NAME)=True``.
* 'status': the input / output status
Args:
name (str): the filter identifier
filter (Callable, or None): the filter operator. If None, disables
the filter.
Otherwise, the filter is callable and takes #inputs arguments
(usually 1) and returns #outputs arguments to be stored in the
output (a tuple if #outputs > 1).
Note that to implement a stateful filter, you would create a
class that updates internal state on __call__.
RobotInterfaceCompleter also accepts
:class:`klampt.control.blocks.Block` objects.
input (str or list of str): one or more input operands. If 'auto',
the input operands are auto-determined from the name.
output (str or list of str): one or more output operands. If
'auto', the output operands are auto-determined from the name.
"""
self._emulator.setFilter(self._indices, name, filter, input, output)
[docs] def setJointLimits(self, qmin='auto', qmax='auto', op='clamp'):
"""Activates a joint limit filter.
If qmin/qmax are 'auto', these are read from the klampt robot model
or the properties.
If op is...
* 'clamp' then commands are silently clamped to their limits.
* 'stop' a soft-stop is raised.
* 'warn' a warning is printed and the robot silently ignores the
command.
"""
#need to limit to hardware values
hw_qmin,hw_qmax = self.properties.get('joint_limits',(None,None))
self._emulator.setJointLimits(self._indices,qmin,qmax,op,hw_qmin,hw_qmax)
[docs] def setCollisionFilter(self, world : WorldModel=None, op = 'warn'):
"""Activates a collision filter.
If world=None, only does self collision. Otherwise, does self and
world collision.
If op = 'stop', a soft stop is raised if a collision is predicted.
If op = 'warn', a warning is printed and the robot silently ignores
the command.
"""
self._emulator.setCollisionFilter(world,op)
[docs] def sensors(self):
return self._try('sensors',[],[])+list(self._emulator.virtualSensorMeasurements.keys())
[docs] def enabledSensors(self):
return self._try('enabledSensors',[],[])+list(self._emulator.virtualSensorMeasurements.keys())
[docs] def hasSensor(self,sensor):
if sensor in self._emulator.virtualSensorMeasurements: return True
return self._try('hasSensor',[sensor],lambda sensor:sensor in self.sensors())
[docs] def enableSensor(self,sensor):
return self._try('enableSensor',[sensor],False)
[docs] def sensorMeasurements(self,sensor):
return self._base.sensorMeasurements(sensor)
[docs] def sensorUpdateTime(self,sensor):
return self._base.sensorUpdateTime(sensor)
[docs] def setPosition(self,q):
assert len(q) == len(self._indices),"Position vector has incorrect length: {} != {}".format(len(q),len(self._indices))
q = self._emulator.commandFilter(self._indices,'positionCommand',q)
self._emulator.setPosition(self._indices,q)
[docs] def setVelocity(self,v,ttl=None):
assert len(v) == len(self._indices),"Velocity vector has incorrect length: {} != {}".format(len(v),len(self._indices))
v = self._emulator.commandFilter(self._indices,'velocityCommand',v)
self._emulator.setVelocity(self._indices,v,ttl)
[docs] def setTorque(self,t,ttl=None):
assert len(t) == len(self._indices),"Torque vector has incorrect length: {} != {}".format(len(t),len(self._indices))
assert ttl is None or isinstance(ttl,(float,int)),"ttl must be a number"
t = self._emulator.commandFilter(self._indices,'torqueCommand',t)
self._emulator.setTorque(self._indices,t,ttl)
[docs] def setPID(self,q,dq,t=None):
assert len(q) == len(self._indices),"Position vector has incorrect length: {} != {}".format(len(q),len(self._indices))
assert len(dq) == len(self._indices),"Velocity vector has incorrect length: {} != {}".format(len(dq),len(self._indices))
q = self._emulator.commandFilter(self._indices,'positionCommand',q)
dq = self._emulator.commandFilter(self._indices,'velocityCommand',dq)
self._emulator.setPID(self._indices,q,dq,t)
[docs] def setPIDGains(self,kP,kI,kD):
assert len(kP) == len(self._indices),"kP vector has incorrect length: {} != {}".format(len(kP),len(self._indices))
assert len(kI) == len(self._indices),"kI vector has incorrect length: {} != {}".format(len(kI),len(self._indices))
assert len(kD) == len(self._indices),"kD vector has incorrect length: {} != {}".format(len(kD),len(self._indices))
assert all(v >= 0 for v in kP),"kP must be nonnegative"
assert all(v >= 0 for v in kI),"kI must be nonnegative"
assert all(v >= 0 for v in kD),"kD must be nonnegative"
try:
self._base.setPIDGains(kP,kI,kD)
except NotImplementedError:
pass
if not hasattr(kP,'__iter__'):
kP = [kP]*len(self._indices)
if not hasattr(kD,'__iter__'):
kD = [kD]*len(self._indices)
if not hasattr(kI,'__iter__'):
kI = [kI]*len(self._indices)
self._emulator.setPIDGains(self._indices,kP,kI,kD)
[docs] def getPIDGains(self):
try:
return self._base.getPIDGains()
except NotImplementedError:
pass
return self._emulator.getPIDGains(self._indices)
[docs] def moveToPosition(self,q,speed=1):
assert len(q) == len(self._indices),"Position vector has incorrect length: {} != {}".format(len(q),len(self._indices))
assert isinstance(speed,(float,int))
q = self._emulator.commandFilter(self._indices,'positionCommand',q)
self._emulator.moveToPosition(self._indices,q,speed)
[docs] def setPiecewiseLinear(self,ts,qs,relative=True):
assert len(ts) == len(qs),"Piecewise linear path must have equal numbers of times and configurations"
for i,q in enumerate(qs):
qs[i] = self._emulator.commandFilter(self._indices,'positionCommand',q)
self._emulator.setPiecewiseLinear(self._indices,ts,qs,relative)
[docs] def setPiecewiseCubic(self,ts,qs,vs,relative=True):
assert len(ts) == len(qs),"Piecewise cubic path must have equal numbers of times and configurations"
assert len(ts) == len(vs),"Piecewise cubic path must have equal numbers of times and velocities"
for i,q in enumerate(qs):
qs[i] = self._emulator.commandFilter(self._indices,'positionCommand',q)
vs[i] = self._emulator.commandFilter(self._indices,'velocityCommand',vs[i])
self._emulator.setPiecewiseCubic(self._indices,ts,qs,vs,relative)
[docs] def setGravityCompensation(self,gravity=[0,0,-9.8],load=0.0,load_com=[0,0,0]):
try:
self._base.setGravityCompensation(gravity,load,load_com)
except NotImplementedError:
pass
self._emulator.setGravityCompensation(self._indices,gravity,load,load_com)
[docs] def getGravityCompensation(self):
try:
return self._base.getGravityCompensation()
except NotImplementedError:
pass
self._emulator.getGravityCompensation(self._indices)
[docs] def setCartesianPosition(self,xparams,frame='world'):
assert len(xparams)==2 or len(xparams)==3, "Cartesian target must be a point or (R,t) transform"
self._emulator.setCartesianPosition(self._indices,xparams,frame)
[docs] def moveToCartesianPosition(self,xparams,speed=1.0,frame='world'):
assert len(xparams)==2 or len(xparams)==3, "Cartesian target must be a point or (R,t) transform"
assert isinstance(speed,(float,int)),"Speed must be a number"
self._emulator.moveToCartesianPosition(self._indices,xparams,speed,frame)
[docs] def moveToCartesianPositionLinear(self,xparams,speed=1.0,frame='world'):
assert len(xparams)==2 or len(xparams)==3, "Cartesian target must be a point or (R,t) transform"
assert isinstance(speed,(float,int)),"Speed must be a number"
self._emulator.moveToCartesianPositionLinear(self._indices,xparams,speed,frame)
[docs] def setCartesianVelocity(self,dxparams,ttl=None,frame='world'):
assert len(dxparams)==2 or len(dxparams)==3, "Cartesian velocity must be a velocity or (angularVelocity,velocity) pair"
assert ttl is None or isinstance(ttl,(int,float)), "ttl must be None or a number"
self._emulator.setCartesianVelocity(self._indices,dxparams,ttl,frame)
[docs] def setCartesianForce(self,fparams,ttl=None,frame='world'):
assert len(fparams)==2 or len(fparams)==3, "Cartesian force must be a force or (torque,force) pair"
assert ttl is None or isinstance(ttl,(int,float)), "ttl must be None or a number"
self._emulator.setCartesianForce(self._indices,fparams,ttl,frame)
[docs] def status(self,joint_idx=None):
if self._emulator.status != 'ok': return self._emulator.status
return self._base.status(joint_idx)
[docs] def isMoving(self,joint_idx=None):
if self._emulator.isMoving(self._indices):
return True
try:
return self._base.isMoving(joint_idx)
except NotImplementedError:
return False
[docs] def sensedPosition(self):
#need to have this implemented on base
return self._base.sensedPosition()
[docs] def sensedVelocity(self):
return self._try('sensedVelocity',[],self._emulator.sensedVelocity)
[docs] def sensedTorque(self):
#need to have this implemented on base
return self._base.sensedTorque()
[docs] def commandedPosition(self):
return self._try('commandedPosition',[],self._emulator.commandedPosition)
[docs] def commandedVelocity(self):
return self._try('commandedVelocity',[],self._emulator.commandedVelocity)
[docs] def commandedTorque(self):
return self._try('commandedTorque',[],self._emulator.commandedTorque)
[docs] def destinationPosition(self):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('destinationPosition',[],lambda : self._emulator.destinationPosition())
else:
return self._emulator.destinationPosition()
[docs] def destinationVelocity(self):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('destinationVelocity',[],lambda : self._emulator.destinationVelocity())
else:
return self._emulator.destinationVelocity()
[docs] def destinationTime(self):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('destinationTime',[],lambda : self._emulator.destinationTime())
else:
return self._emulator.destinationTime()
[docs] def stateValue(self,name):
return self._base.stateValue(name)
[docs] def cartesianPosition(self,q,frame='world'):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('cartesianPosition',[q,frame],lambda q,frame: self._emulator.cartesianPosition(self._indices,q,frame))
else:
return self._emulator.cartesianPosition(self._indices,q,frame)
[docs] def cartesianVelocity(self,q,dq,frame='world'):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('cartesianVelocity',[q,dq,frame],lambda q,dq,frame: self._emulator.cartesianVelocity(self._indices,q,dq,frame))
else:
return self._emulator.cartesianVelocity(self._indices,q,dq,frame)
[docs] def cartesianForce(self,q,t,frame='world'):
if self._baseControlMode == self._emulatorControlMode:
#using base interface
return self._try('cartesianForce',[q,t,frame],lambda q,t,frame: self._emulator.cartesianForce(self._indices,q,t,frame))
else:
return self._emulator.cartesianForce(self._indices,q,t)
[docs] def sensedCartesianPosition(self,frame='world'):
return self._try('sensedCartesianPosition',[frame],lambda frame: RobotInterfaceBase.sensedCartesianPosition(self,frame))
[docs] def sensedCartesianVelocity(self,frame='world'):
return self._try('sensedCartesianVelocity',[frame],lambda frame: RobotInterfaceBase.sensedCartesianVelocity(self,frame))
[docs] def sensedCartesianForce(self,frame='world'):
return self._try('sensedCartesianForce',[frame],lambda frame: RobotInterfaceBase.sensedCartesianForce(self,frame))
[docs] def commandedCartesianPosition(self,frame='world'):
return self._try('commandedCartesianPosition',[frame],lambda frame: RobotInterfaceBase.commandedCartesianPosition(self,frame))
[docs] def commandedCartesianVelocity(self,frame='world'):
return self._try('commandedCartesianVelocity',[frame],lambda frame: RobotInterfaceBase.commandedCartesianVelocity(self,frame))
[docs] def commandedCartesianForce(self,frame='world'):
return self._try('commandedCartesianForce',[frame],lambda frame: RobotInterfaceBase.commandedCartesianForce(self,frame))
[docs] def destinationCartesianPosition(self,frame='world'):
return self._try('destinationCartesianPosition',[frame],lambda frame: RobotInterfaceBase.destinationCartesianPosition(self,frame))
[docs] def destinationCartesianVelocity(self,frame='world'):
return self._try('destinationCartesianVelocity',[frame],lambda frame: RobotInterfaceBase.destinationCartesianVelocity(self,frame))
[docs] def partToRobotConfig(self,pconfig,part,robotConfig):
return self._base.partToRobotConfig(pconfig,part,robotConfig)
[docs] def klamptModel(self):
if self._klamptModel is not None: return self._klamptModel
return self._base.klamptModel()
[docs] def configFromKlampt(self,klamptConfig,part=None,joint_idx=None):
#TODO: this doesn't work when this is a sub-model
return self._base.configFromKlampt(klamptConfig,part=None,joint_idx=None)
[docs] def velocityFromKlampt(self,klamptVelocity,part=None,joint_idx=None):
#TODO: this doesn't work when this is a sub-model
return self._base.velocityFromKlampt(klamptVelocity,part=None,joint_idx=None)
[docs] def configToKlampt(self,config,klamptConfig=None,part=None,joint_idx=None):
#TODO: this doesn't work when this is a sub-model
return self._base.configToKlampt(config,klamptConfig,part=None,joint_idx=None)
[docs] def velocityToKlampt(self,velocity,klamptVelocity=None,part=None,joint_idx=None):
#TODO: this doesn't work when this is a sub-model
return self._base.velocityToKlampt(velocity,klamptVelocity=None,part=None,joint_idx=None)
[docs] def printStatus(self):
print("****** Status of RobotInterfaceCompleter *********")
print("Base interface",str(self._base))
if self._indices is None:
print("Not initialized")
print("**************************************************")
return
print("Status",self.status())
if len(self._parts) != len(self._baseParts):
newparts = [k for k in self._parts.keys() if k not in self._baseParts]
print("Added parts:",','.join(newparts))
if self._baseControlMode == self._emulatorControlMode:
print("Using base control mode",self._baseControlMode)
else:
print("Using emulator control mode",self._emulatorControlMode,"and base mode",self._baseControlMode)
self._emulator.printStatus()
print("**************************************************")
[docs]class LimitFilter:
"""Checks if the argument is in the range [qmin,qmax]. Can either
clamp to range silently, warn, or perform a stop.
"""
def __init__(self,op,qmin,qmax):
self.qmin = qmin
self.qmax = qmax
self.op = op
if op not in ['clamp','stop','warn']:
raise ValueError("Only supports clamp, stop, and warn")
if op == 'clamp':
clamper = blocks.utils.Clamp()
self.subblock = clamper
else:
checker = blocks.utils.LimitExceeded()
self.subblock = checker
def __call__(self,q):
if self.op == 'clamp':
return self.subblock.advance(q,self.qmin,self.qmax)
else:
if self.subblock.advance(q,self.qmin,self.qmax):
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','limit reached')
else:
raise blocks.utils.BlockSignal('warn','limit reached')
return q
[docs]class BrakeLimitFilter:
"""Checks if the argument (q,dq) can brake in time to stay within in the
range [qmin,qmax]. Can either clamp dq to range silently, warn, or
perform a stop.
"""
def __init__(self,op,qmin,qmax,amin,amax):
assert len(qmin)==len(amax)
assert len(qmax)==len(amax)
assert len(amin)==len(amax)
self.qmin = qmin
self.qmax = qmax
self.amin = amin
self.amax = amax
for i in range(qmin):
assert qmin[i] <= qmax[i]
assert amin[i] <= 0 and amax[i] >= 0
self.op = op
if op not in ['clamp','stop','warn']:
raise ValueError("Only supports clamp, stop, and warn")
def __call__(self,q,dq):
assert len(q)==len(dq)
assert len(q)==len(self.amax)
qres = []
vres = []
for i,x in enumerate(q):
v = dq[i]
a = self.amax[i] if v < 0 else self.amin[i]
t = -v/a
xn = x - 0.5*v**2/a
if xn < self.qmin[i] or xn > self.qmax[i]:
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','limit reached')
elif self.op == 'warn':
raise blocks.utils.BlockSignal('warn','limit reached')
else:
if x < self.qmin[i]:
x = self.qmin[i]
v = 0
elif x > self.qmax[i]:
x = self.qmax[i]
v = 0
else:
if v < 0:
#x - 0.5*v**2/a = qmin => 2a(x - qmin) = v**2
v = -math.sqrt((x - self.qmin[i])*2*a)
else:
#x - 0.5*v**2/a = qmax => 2a(x - qmax) = v**2
v = math.sqrt((x - self.qmax[i])*2*a)
qres.append(x)
vres.append(v)
return (qres,vres)
[docs]class CartesianLimitFilter:
"""Checks if the rotation is within some range and the translation is
in the range [xmin,xmax]. The rotation range is so3.rpy(Rref^T R) in
[rpymin,rpymax].
Can set Rref,rpymin,rpymax to None to turn off rotation range checking,
and set xmin,xmax to None to turn off translation range checking.
Can either clamp to range silently, warn, or perform a stop.
"""
def __init__(self,op,Rref,rpymin,rpymax,xmin,xmax):
self.Rref = Rref
self.rpymin = rpymin
self.rpymax = rpymax
self.xmin = xmin
self.xmax = xmax
self.op = op
if op not in ['clamp','stop','warn']:
raise ValueError("Only supports clamp, stop, and warn")
def __call__(self,T):
R,t = T
assert len(R)==9
assert len(t)==3
if self.Rref is not None:
Rrel = so3.mul(so3.inv(self.Rref),R)
rpy = so3.rpy(Rrel)
rpynew = [0]*3
for i in range(3):
rpynew[i] = min(max(rpy[i],self.rpymin[i]),self.rpymax[i])
if rpy != rpynew:
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','limit reached')
elif self.op == 'warn':
raise blocks.utils.BlockSignal('warn','limit reached')
R = so3.mul(self.Rref,so3.from_rpy(rpynew))
if self.xmin is not None:
tnew = [0]*3
for i in range(3):
tnew[i] = min(max(t[i],self.xmin[i]),self.xmax[i])
if t != tnew:
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','limit reached')
elif self.op == 'warn':
raise blocks.utils.BlockSignal('warn','limit reached')
return (R,t)
[docs]class SelfCollisionFilter:
"""Soft-stops if the robot at configuration q is in collision."""
def __init__(self, op:str, robot : 'RobotModel'):
if op not in ['stop','warn']:
raise ValueError("Only supports stop and warn")
self.op = op
self.robot = robot
def __call__(self,q):
self.robot.setConfig(self.robot.configFromDrivers(q))
if self.robot.selfCollides():
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','collision')
else:
raise blocks.utils.BlockSignal('warn','collision')
return q
[docs]class CollisionFilter:
"""Soft-stops if the robot at configuration q is in collision."""
def __init__(self, op:str, robot:'RobotModel', obstacles):
if op not in ['stop','warn']:
raise ValueError("Only supports stop and warn")
self.op = op
self.robot = robot
self.robotGeoms = []
for i in range(robot.numLinks()):
g = robot.link(i).geometry()
if not g.empty():
self.robotGeoms.append(g)
self.obstacles = obstacles
def __call__(self,q):
self.robot.setConfig(self.robot.configFromDrivers(q))
for o in self.obstacles:
for g in self.robotGeoms:
if o.collides(g):
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','collision')
else:
raise blocks.utils.BlockSignal('warn','collision')
return q
[docs]class DifferenceFilter:
"""Soft-stops if the difference between the two arguments exceed a
threshold. Returns 0 arguments.
"""
def __init__(self,op,threshold,norm=float('inf')):
self.threshold = threshold
self.op = op
if op not in ['stop','warn']:
raise ValueError("Only supports stop and warn")
self.subblock = blocks.utils.Distance(norm)
[docs] def advance(self,q1,q2):
d = self.subblock.advance(q1,q2)
if d > self.threshod:
if self.op == 'stop':
raise blocks.utils.BlockSignal('softStop','difference threshold exceeded')
else:
raise blocks.utils.BlockSignal('warn','difference threshold exceeded')
[docs]class LoggingFilter(blocks.Block):
"""Logs all the input items to fn, which is assumed by default to be a
CSV file.
"""
def __init__(self,fn,items,delim=',',header=True):
super().__init__(items,0)
self.items = items
self.fn = fn
self.file = None
self.delim = delim
self.header = header
[docs] def advance(self,*args):
assert len(args) == len(self.items)
import numpy as np
import os
if self.file is None:
if os.path.exists(self.fn):
self.file = open(self.fn,'a')
else:
self.file = open(self.fn,'w')
#write header
if self.header:
argnames = []
for n,a in zip(self.items,args):
if hasattr(a,'__iter__'):
for i in range(len(a)):
argnames.append(n+'['+str(i)+']')
self.file.write(self.delim.join(argnames))
self.file.write('\n')
channels = np.hstack(args).tolist()
self.file.write(self.delim.join(str(v) for v in channels))
self.file.write('\n')
[docs]def klamptCartesianPosition(model : RobotModel,
q: Vector,
part_indices : List[int],
tool_coordinates : Vector3,
frame :str) -> RigidTransform:
"""Helper for implementers: an implementation of cartesianPosition
that uses the Klampt model.
Args:
robot (RobotModel): the Klampt model
q: configuration for whole robot (#drivers)
part_indices: the driver indices of the part being queried.
tool_coordinates: the local coordinates of the tool point
frame: either 'world' or 'base'
"""
if frame == 'tool':
return se3.identity()
elif frame == 'end effector':
return (so3.identity(),tool_coordinates)
assert len(q) == model.numDrivers()
for i,v in enumerate(q):
if i > part_indices[-1]: break
model.driver(i).setValue(v)
model.setConfig(model.getConfig())
ee_index = model.driver(part_indices[-1]).getAffectedLink()
ee_xform = model.link(ee_index).getTransform()
tool_xform = ee_xform[0],se3.apply(ee_xform,tool_coordinates)
if frame == 'base':
first_link_index = model.driver(part_indices[0]).getAffectedLink()
base_link_index = model.link(first_link_index).getParent()
if base_link_index < 0:
return tool_xform
base_xform = model.link(base_link_index).getTransform()
return se3.mul(se3.inv(base_xform),tool_xform)
elif frame == 'world':
return tool_xform
else:
raise ValueError("Invalid frame specified")
[docs]def klamptCartesianVelocity(model : RobotModel,
q : Vector,
dq : Vector,
part_indices : Sequence[int],
tool_coordinates : Vector3,
frame : str) -> Tuple[Vector3,Vector3]:
"""Helper for implementers: an implementation of cartesianVelocity
that uses the Klampt model.
Args:
q: configuration for whole robot (#drivers)
dq: velocity for whole robot (#drivers)
part_indices: the driver indices of the part being queried.
tool_coordinates: the local coordinates of the tool point
frame: either 'world' or 'base'
"""
assert len(q) == model.numDrivers()
assert len(dq) == model.numDrivers()
for i,v in enumerate(q):
if i > part_indices[-1]: break
model.driver(i).setValue(v)
if frame == 'base' and i < part_indices[0]:
model.driver(i).setVelocity(0) #get relative velocity to base
else:
model.driver(i).setVelocity(dq[i])
model.setConfig(model.getConfig())
ee_index = model.driver(part_indices[-1]).getAffectedLink()
ee = model.link(ee_index)
v = ee.getPointVelocity(tool_coordinates)
w = ee.getAngularVelocity()
if frame == 'base':
first_link_index = model.driver(part_indices[0]).getAffectedLink()
base_link_index = model.link(first_link_index).getParent()
if base_link_index < 0: return (w,v)
Tbase = model.link(base_link_index).getTransform()
Rworld_base = so3.inv(Tbase[0])
return (so3.apply(Rworld_base,w),so3.apply(Rworld_base,v))
elif frame == 'end effector' or frame == 'tool':
Tee = ee.getTransform()
Rworld_ee = so3.inv(Tee[0])
return (so3.apply(Rworld_ee,w),so3.apply(Rworld_ee,v))
elif frame == 'world':
return (w,v)
else:
raise ValueError("Invalid frame specified")
[docs]def klamptCartesianForce(model : RobotModel,
q: Vector,
t: Vector,
part_indices : List[int],
tool_coordinates : Vector3,
frame : str):
"""Helper for implementers: an implementation of cartesianForce
that uses the Klampt model.
Args:
q: configuration for whole robot (#drivers)
t: torques for whole robot (#drivers)
part_indices: the driver indices of the part being queried.
tool_coordinates: the local coordinates of the tool point
frame: either 'world' or 'base'
"""
import numpy as np
assert len(q) == model.numDrivers()
for i,v in enumerate(q):
if i > part_indices[-1]: break
model.driver(i).setValue(v)
model.setConfig(model.getConfig())
ee_index = model.driver(part_indices[-1]).getAffectedLink()
ee = model.link(ee_index)
J = ee.getJacobian(tool_coordinates)
wrench = np.dot(J,model.velocityFromDrivers(t))
torque,force = wrench[:3].tolist(),wrench[3:].tolist()
if frame == 'world':
return (torque,force)
elif frame == 'base':
first_link_index = model.driver(part_indices[0]).getAffectedLink()
base_link_index = model.link(first_link_index).getParent()
if base_link_index < 0: return (torque,force)
Tbase = model.link(base_link_index).getTransform()
Rworld_base = so3.inv(Tbase[0])
return (so3.apply(Rworld_base,torque),so3.apply(Rworld_base,force))
elif frame == 'end effector' or frame == 'tool':
Tee = ee.getTransform()
Rworld_ee = so3.inv(Tee[0])
return (so3.apply(Rworld_ee,torque),so3.apply(Rworld_ee,force))
raise ValueError("Invalid frame specified")
[docs]class ThreadedRobotInterface(_RobotInterfaceStatefulBase):
"""Adapts a synchronous robot interface to an asynchronous robot interface
that runs in a separate thread.
There is no need to run beginStep() and endStep() on this interface.
All query functions will return the results from the prior time step.
All commands are queued, with execution delayed until the next time step.
"""
def __init__(self, interface : RobotInterfaceBase):
_RobotInterfaceStatefulBase.__init__(self)
from threading import Thread, RLock
if not isinstance(interface,_RobotInterfaceStatefulBase):
interface = _RobotInterfaceStatefulWrapper(interface)
self._interface = interface
self._partInterfaces = dict() #type: Dict[str,_RobotInterfaceStatefulBase]
self._thread = Thread(target=self._threadFunc)
self._thread.daemon = True
self._lock = RLock()
self._initialized = None
self.properties = copy.deepcopy(self._interface.properties)
if self.properties.get('asynchronous',False):
warnings.warn("ThreadedRobotInterface: interface is already asynchronous? Suggest not threading it")
self.properties['asynchronous'] = True
self._shortcut_update = False #changes will only be reflected on next time step
for method in _RobotInterfaceStatefulBase.COMPLEX_METHODS:
#lock all complex method accesses
setattr(self,method,_LockedCallable(getattr(self,method),self._lock))
def __str__(self):
return "Threaded({})".format(str(self._interface))
[docs] def log(self,msg):
"""Override this to change thread warning prints"""
print(msg)
[docs] def initialize(self):
if self._initialized is not None:
raise RuntimeError("initialize() can only be called once")
self._quit = False
self._thread.start()
t0 = time.time()
while self._initialized is None:
time.sleep(0.01)
t1 = time.time()
if t1 - t0 > 5:
print("Initialization is taking a long time...")
t0 = t1
#lock all complex method accesses for child interfaces
for part,iface in self._partInterfaces.items():
for method in _RobotInterfaceStatefulBase.COMPLEX_METHODS:
setattr(iface,method,_LockedCallable(getattr(iface,method),self._lock))
return self._initialized
[docs] def close(self):
if self._quit:
warnings.warn("close() called before initialize()?")
elif self._thread is not None:
self._quit = True
self._thread.join()
self._thread = None
else:
warnings.warn("close() called twice?")
def _threadFunc(self):
assert self._initialized is None
from .utils import TimedLooper
try:
if not self._interface.initialize():
self._initialized = False
return
except Exception as e:
self._initialized = False
raise
with self._lock:
#note: can't copy klampt models
robot_model = self._interface._structure.klamptModel
self._interface._structure.klamptModel = None
self._structure = copy.deepcopy(self._interface._structure)
self._interface._structure.klamptModel = robot_model
self._settings = copy.deepcopy(self._interface._settings)
self._structure.klamptModel = None #load a new klampt model to avoid thread clashes
self._structure.klamptModel = self.klamptModel()
if self._structure.klamptModel is None:
warnings.warn("Robot didn't have 'klamptModel' property -- reusing RobotModel object in thread")
self._structure.klamptModel = robot_model
self._init_part_interfaces()
for part,iface in self._partInterfaces.items():
iface.properties['asynchronous'] = True
self._split_settings()
self._clear_deltas()
try:
dt = 1.0/self._interface.controlRate()
except Exception as e:
print("Error calling controlRate()",e)
self._initialized = False
raise
self._initialized = True #tell calling thread that it can continue with initialize()
#begin control loop
looper = TimedLooper(dt=dt,name='RobotInterfaceServer({})'.format(str(self._interface)),warning_printer=self.log)
while looper:
with StepContext(self._interface,ignore=True):
if self._interface._commands:
self.quit = True
raise RuntimeError("Interface didn't clear command queue?")
#write state changes
with self._lock:
self._state = copy.deepcopy(self._interface._state)
self._split_state()
with self._lock:
#read settings changes, command changes and pass to interface
settings,cmds = self._advance_settings_and_commands()
self._interface._delta_settings = settings
self._interface._commands = cmds
if self._quit:
break
self._interface.close()
[docs]class MultiprocessingRobotInterface(_RobotInterfaceStatefulBase):
"""Adapts a synchronous robot interface to an asynchronous robot interface
that runs in a separate process.
There is no need to run beginStep() and endStep() on this interface.
All query functions will return the results from the prior time step.
All commands are queued, with execution delayed until the next time step.
"""
def __init__(self, interface : RobotInterfaceBase):
_RobotInterfaceStatefulBase.__init__(self)
from threading import Thread, RLock
from multiprocessing import Process,Queue
self._interfaceName = str(interface)
self._partInterfaces = dict() #type: Dict[str,_RobotInterfaceStatefulBase]
self._thread = Thread(target=self._threadFunc,args=[interface])
self._thread.daemon = True
self._lock = RLock()
self._initialized = None
sendQueue = Queue()
receiveQueue = Queue()
self._process = Process(target=self._processFunc,args=[interface,sendQueue,receiveQueue])
self._sendQueue = sendQueue
self._receiveQueue = receiveQueue
self.properties = copy.deepcopy(interface.properties)
if self.properties.get('asynchronous',False):
warnings.warn("ThreadedRobotInterface: interface is already asynchronous? Suggest not threading it")
self.properties['asynchronous'] = True
self._shortcut_update = False #changes will only be reflected on next time step
for method in _RobotInterfaceStatefulBase.COMPLEX_METHODS:
#lock all complex method accesses
setattr(self,method,_LockedCallable(getattr(self,method),self._lock))
def __str__(self):
return "Multiprocessing({})".format(self._interfaceName)
[docs] def log(self,msg):
"""Override this to change thread warning prints"""
print(msg)
[docs] def initialize(self):
if self._initialized is not None:
raise RuntimeError("initialize() can only be called once")
self._quit = False
self._thread.start()
self._process.start()
t0 = time.time()
while self._initialized is None:
time.sleep(0.01)
t1 = time.time()
if t1 - t0 > 5:
print("Initialization is taking a long time...")
t0 = t1
return self._initialized
[docs] def close(self):
if self._quit:
warnings.warn("close() called before initialize()?")
elif self._thread is not None:
self._sendQueue.put(('quit',()))
self._process.join()
self._process.close()
self._quit = True
self._thread.join()
self._thread = None
else:
warnings.warn("close() called twice?")
def _threadFunc(self, interface):
assert self._initialized is None
from .utils import TimedLooper
with self._lock:
initial_message = self._receiveQueue.get()
if initial_message is None:
errorMessage = self._receiveQueue.get()
print(" Error message:",errorMessage)
self._initialized = False
return
import pickle
data = pickle.loads(initial_message)
self._structure,self._settings,self._state = data
assert self._state.sensedPosition is not None,"Initial state gave empty sensedPosition?"
#self._structure,self._settings,self._state = initial_message
self._structure.klamptModel = None #use the interface's klampt model -- no risk of thread clashes because the other process has a copy
self._structure.klamptModel = interface.klamptModel()
self._init_part_interfaces()
#mark as asynchronous, lock complex method accesses
for (k,iface) in self._partInterfaces.items():
for method in _RobotInterfaceStatefulBase.COMPLEX_METHODS:
setattr(iface,method,_LockedCallable(getattr(iface,method),self._lock))
iface.properties['asynchronous'] = True
self._split_settings()
self._clear_deltas()
self._initialized = True #tell calling thread that it can continue with initialize()
while True:
with self._lock:
delta_settings,commands = self._advance_settings_and_commands()
self._sendQueue.put(('updateSettingsAndCommands',(delta_settings,commands)))
newState = self._receiveQueue.get()
if newState is None:
print("Controller encountered an error, breaking out of thread.")
errorMessage = self._receiveQueue.get()
print(" Error message:",errorMessage)
break
with self._lock:
self._state = newState
self._split_state()
if self._quit:
break
def _processFunc(self, interface : RobotInterfaceBase, receiveQueue, sendQueue):
assert self._initialized is None
from .utils import TimedLooper
if not isinstance(interface,_RobotInterfaceStatefulBase):
interface = _RobotInterfaceStatefulWrapper(interface)
if not interface.initialize():
sendQueue.put(None) #marks an error
sendQueue.put("Interface {} failed to initialize".format(str(interface._base)))
return
interface.beginStep()
interface.endStep()
dt = 1.0/interface.controlRate()
looper = TimedLooper(dt=dt,name='RobotInterfaceServer({})'.format(self._interfaceName),warning_printer=self.log)
#can't serialize RobotModel
robot_model = interface._structure.klamptModel
interface._structure.klamptModel = None
import pickle
data = pickle.dumps((interface._structure,interface._settings,interface._state))
sendQueue.put(data)
#sendQueue.put((interface._structure,interface._settings,interface._state))
interface._structure.klamptModel = robot_model
firstTime = True
while looper:
try:
with StepContext(interface):
#send state after first loop
if firstTime:
firstTime = False
else:
sendQueue.put(interface._state)
msg = receiveQueue.get()
if len(msg) != 2 or msg[0] != 'updateSettingsAndCommands':
sendQueue.put(None) #marks an error
sendQueue.put("Invalid message from thread, must be a tuple")
interface.endStep()
break
delta_settings,commands = msg[1]
interface._delta_settings = delta_settings
interface._commands = commands
except Exception as e:
sendQueue.put(None) #marks an error
sendQueue.put("Exception from interface: {}".format(str(e)))
break
print("Ending controller process")
class _JointInterfaceEmulatorData:
CONTROL_MODE_PRECEDENCE = {'setPID':0,'setVelocity':1,'setPosition':2,'setPiecewiseLinear':3,'setPiecewiseCubic':4}
def __init__(self,name):
self.name = name
self.dt = None
self.controlMode = None
self.sensedPosition = None
self.sensedVelocity = None
self.commandedPosition = None
self.commandedVelocity = None
self.commandedTorque = None
self.commandTTL = None
self.continuousRotation = False
self.lastCommandedPosition = None
self.commandParametersChanged = False
self.pidCmd = None
self.pidGains = None
self.pidIntegralError = None
self.trajectoryTimes = None
self.trajectoryMilestones = None
self.trajectoryVelocities = None
self.externalController = None
self.commandSent = False
def _positionDifference(self,a,b):
if self.continuousRotation:
return so2.diff(a,b)
else:
return a-b
def update(self,t,q,v,dt):
if v is None:
if self.sensedPosition is None:
self.sensedVelocity = 0
else:
self.sensedVelocity = self._positionDifference(q,self.sensedPosition)/dt
v = self.sensedVelocity
else:
self.sensedVelocity = v
self.sensedPosition = q
if self.commandTTL is not None:
self.commandTTL -= dt
if self.controlMode == 'setPID':
self.commandedPosition = self.pidCmd[0]
self.commandedVelocity = self.pidCmd[1]
self.commandedTorque = self.pidCmd[2]
self.pidIntegralError += self._positionDifference(self.commandedPosition,q)*dt
self.commandTTL = dt*5
elif self.controlMode == 'setPiecewiseLinear' or self.controlMode == 'setPiecewiseCubic':
self.commandedPosition,self.commandedVelocity = self.evalTrajectory(t+dt)
self.commandTTL = dt*5
self.updateTrajectory(t)
self.commandedTorque = 0
elif self.controlMode == 'setPosition' or self.controlMode == 'moveToPosition':
if self.lastCommandedPosition is None:
self.commandedVelocity = 0
else:
self.commandedVelocity = (self.commandedPosition-self.lastCommandedPosition)/dt
self.lastCommandedPosition = self.commandedPosition
elif self.controlMode == 'setVelocity':
self.commandedPosition += self.commandedVelocity*dt
if self.commandTTL is not None and self.commandTTL <= 0:
self.commandedVelocity = 0
self.commandTTL = None
self.controlMode = None
elif self.controlMode == 'setTorque':
if self.commandTTL is not None and self.commandTTL <= 0:
self.commandedTorque = 0
self.commandTTL = None
self.controlMode = None
else:
raise RuntimeError("Invalid control mode, joint {}?".format(self.name))
def getCommand(self,commandType):
if self.controlMode is None:
#getting a command for a joint that hasn't had any commands sent yet
if self.commandedPosition is None:
self.commandedPosition = self.sensedPosition
self.commandedVelocity = 0
if self.sensedPosition is None:
raise ValueError("Trying to get a command for joint {} before sensed position was entered?".format(self.name))
self.controlMode = 'setPosition'
if commandType == 'setPiecewiseCubic':
if self.controlMode == 'setPosition':
if self.commandedVelocity == 0:
return [0],[self.commandedPosition],[0]
else:
dt = self._positionDifference(self.commandedPosition,self.lastCommandedPosition)/self.commandedVelocity
return [dt],[self.commandedPosition],[self.commandedVelocity]
elif self.trajectoryTimes is not None:
return self.trajectoryTimes,self.trajectoryMilestones,self.trajectoryVelocities
else:
print("TODO: convert other control modes to setPiecewiseCubic?")
elif commandType == 'setPiecewiseLinear':
if self.controlMode == 'setPiecewiseCubic':
traj = HermiteTrajectory(self.trajectoryTimes,[[m] for m in self.trajectoryMilestones],[[v] for v in self.trajectoryVelocities])
assert self.dt is not None
configTraj = traj.discretize(self.dt)
return configTraj.times,[m[0] for m in configTraj.milestones]
if self.controlMode == 'setPiecewiseLinear':
return self.trajectoryTimes,self.trajectoryMilestones
elif self.controlMode == 'setPosition':
#construct interpolant... should we do it in 1 time step or stretch it out?
if self.commandedVelocity == 0:
return [0],[self.commandedPosition]
dt = self._positionDifference(self.commandedPosition,self.lastCommandedPosition)/self.commandedVelocity
return [dt],[self.commandedPosition]
elif self.controlMode == 'setVelocity':
#construct interpolant
raise NotImplementedError("Shouldn't ever be in v control model")
else:
raise NotImplementedError("TODO: convert other control types to linear path")
if commandType == 'setPID':
return self.commandedPosition,self.commandedVelocity,self.commandedTorque
elif commandType == 'setTorque':
if self.controlMode == 'setPID':
if self.pidGains is None:
raise RuntimeError("Can't emulate PID control for joint {} using torque control, no gains are set".format(self.name))
qdes,vdes,tdes = self.pidCmd
kp,ki,kd = self.pidGains
t_pid = kp*self._positionDifference(qdes,self.sensedPosition) + kd*(vdes-self.sensedVelocity) + ki*self.pidIntegralError + tdes
#if abs(self.pidIntegralError[i]*ki) > tmax:
#cap integral error to prevent wind-up
return t_pid,self.commandTTL
else:
assert self.controlMode == 'setTorque',"Can't emulate torque control with any command type except for PID and torque control"
return self.commandedTorque,self.commandTTL
elif commandType == 'setPosition' or commandType == 'moveToPosition':
return self.commandedPosition,
elif commandType == 'setVelocity':
assert self.controlMode != 'setPID',"Can't emulate PID control mode with velocity control"
if self.controlMode == 'setVelocity':
return self.commandedVelocity,self.commandTTL
else:
#TODO: make these tunable per-joint?
kFeedforward = 0.8
kTrack = 1.0
vel = kFeedforward*self.commandedVelocity + kTrack*self._positionDifference(self.commandedPosition,self.sensedPosition)
return vel,self.commandTTL
def promote(self,controlType):
if self.controlMode == controlType:
return
if controlType == 'setPID':
self.pidIntegralError = 0.0
self.pidCmd = (self.commandedPosition,(0 if self.commandedVelocity is None else self.commandedVelocity),0.0)
elif controlType == 'setPiecewiseCubic':
if controlType == 'setPiecewiseLinear':
self.trajectoryVelocities = [0]*len(self.trajectoryTimes)
elif self.controlMode in ['setPosition','setVelocity','setTorque']:
if self.commandedPosition is None:
q = self.sensedPosition
else:
q = self.commandedPosition
if self.commandedVelocity is None:
if self.sensedVelocity is None:
v = 0
else:
v = self.sensedVelocity
else:
v = self.commandedVelocity
self.trajectoryTimes = [0]
self.trajectoryMilestones = [q]
self.trajectoryVelocities = [v]
elif self.controlMode == 'setPID':
self.trajectoryTimes = [0]
self.trajectoryMilestones = [self.pidCmd[0]]
self.trajectoryVelocities = [0]
elif controlType == 'setPiecewiseLinear':
if self.controlMode in ['setPosition','setVelocity','setTorque']:
if self.commandedPosition is None:
q = self.sensedPosition
else:
q = self.commandedPosition
self.trajectoryTimes = [0]
self.trajectoryMilestones = [q]
elif self.controlMode == 'setPID':
self.trajectoryTimes = [0]
self.trajectoryMilestones = [self.pidCmd[0]]
self.trajectoryVelocities = None
self.controlMode = controlType
def destinationTime(self,t):
if self.controlMode not in ['setPiecewiseLinear','setPiecewiseCubic']:
return t
return self.trajectoryTimes[-1]
def destinationPosition(self):
if self.controlMode not in ['setPiecewiseLinear','setPiecewiseCubic']:
return self.commandedPosition
return self.trajectoryMilestones[-1]
def destinationVelocity(self):
if self.controlMode not in ['setPiecewiseLinear','setPiecewiseCubic']:
return self.commandedVelocity
if self.controlMode == 'setPiecewiseLinear':
return 0
return self.trajectoryVelocities[-1]
def _getSegment(self,t):
"""Returns the index and interpolation parameter for the
segment at time t.
Running time is O(log n) time where n is the number of segments.
"""
if len(self.trajectoryTimes)==0:
raise ValueError("Empty trajectory")
if len(self.trajectoryTimes)==1:
return (-1,0)
if t >= self.trajectoryTimes[-1]:
return (len(self.trajectoryTimes)-1,0)
if t <= self.trajectoryTimes[0]:
return (0,0)
i = bisect.bisect_right(self.trajectoryTimes,t)
p=i-1
assert i > 0 and i < len(self.trajectoryTimes),"Invalid time index "+str(t)+" in "+str(self.trajectoryTimes)
u=(t-self.trajectoryTimes[p])/(self.trajectoryTimes[i]-self.trajectoryTimes[p])
if i==0:
return (-1,0)
assert u >= 0 and u <= 1
return (p,u)
def evalTrajectory(self,t):
"""Returns (position,velocity) tuple"""
assert len(self.trajectoryTimes) == len(self.trajectoryMilestones)
i,u = self._getSegment(t)
if i<0:
return self.trajectoryMilestones[0],0
if i+1 >= len(self.trajectoryTimes):
return self.trajectoryMilestones[-1],0
dt = self.trajectoryTimes[i+1]-self.trajectoryTimes[i]
if self.trajectoryVelocities is None:
#piecewise linear
dp = self._positionDifference(self.trajectoryMilestones[i+1],self.trajectoryMilestones[i])
pos = self.trajectoryMilestones[i] + u*dp
if dt == 0:
#discontinuity?
vel = 0
else:
vel = dp/dt
return pos,vel
else:
#piecewise cubic
assert len(self.trajectoryTimes) == len(self.trajectoryVelocities)
x1,v1 = [self.trajectoryMilestones[i]],[self.trajectoryVelocities[i]*dt]
x2,v2 = [self.trajectoryMilestones[i+1]],[self.trajectoryVelocities[i+1]*dt]
x2[0] = x1[0] + self._positionDifference(x2[0],x1[0]) #handle continuous rotation joints
x = spline.hermite_eval(x1,v1,x2,v2,u)
dx = vectorops.mul(spline.hermite_deriv(x1,v1,x2,v2,u),1.0/dt)
return x[0],dx[0]
def updateTrajectory(self,t):
i,u = self._getSegment(t)
if i < 0:
return
if i+1 >= len(self.trajectoryTimes):
if len(self.trajectoryTimes) > 0:
self.trajectoryTimes = self.trajectoryTimes[-1:]
self.trajectoryMilestones = self.trajectoryMilestones[-1:]
if self.trajectoryVelocities is not None:
self.trajectoryVelocities = self.trajectoryVelocities[-1:]
#print("STOP",self.trajectoryTimes,self.trajectoryMilestones,self.trajectoryVelocities)
return
#math.log2 is available only in Python 3... convert to math.log(x,2) in Python 2
if i > math.log2(len(self.trajectoryTimes)):
self.trajectoryTimes = self.trajectoryTimes[i:]
self.trajectoryMilestones = self.trajectoryMilestones[i:]
if self.trajectoryVelocities is not None:
self.trajectoryVelocities = self.trajectoryVelocities[i:]
#print("PWL now have",len(self.trajectoryTimes),"milestnoes ending in",self.trajectoryMilestones[-1])
class _CartesianEmulatorData:
def __init__(self,robot,indices):
self.robot = robot
self.indices = indices
self.robotIndices = [robot.driver(i).getAffectedLink() for i in indices]
self.driver = CartesianDriveSolver(robot)
assert indices[-1] == max(indices),"Indices must be in sorted order"
self.eeLink = robot.link(self.robotIndices[-1])
self.toolCoordinates = [0,0,0]
self.t = None
self.active = False
self.mode = None
self.driveCommand = None,None
self.endDriveTime = None
def setToolCoordinates(self,xtool_local):
if self.active:
raise RuntimeError("Can't set tool coordinates while a cartesian velocity command is active")
self.toolCoordinates = xtool_local
def getToolCoordinates(self):
return self.toolCoordinates
def solveIK(self,xparams,frame='world'):
if frame != 'world':
raise NotImplementedError("TODO: Can only handle world frame, for now")
qorig = self.robot.getConfig()
if not self.active:
self.driver.start(qorig,self.robotIndices[-1],endEffectorPositions=self.toolCoordinates)
goal = self.driver.ikGoals[0]
if len(xparams) == 3:
#point-to-point constraint
goal.setFixedPosConstraint(self.toolCoordinates,xparams)
goal.setFreeRotConstraint()
elif len(xparams) == 2:
if len(xparams[0]) != 9 or len(xparams[1]) != 3:
raise ValueError("Invalid IK parameters, must be a point or se3 element")
goal.setFixedPosConstraint(self.toolCoordinates,xparams[1])
goal.setFixedRotConstraint(xparams[0]);
else:
raise ValueError("Invalid IK parameters, must be a point or se3 element")
self.driver.solver.set(0,goal)
err = self.driver.solver.getResidual()
res = self.driver.solver.solve()
if res:
return self.robot.getConfig()
err2 = self.driver.solver.getResidual()
if vectorops.normSquared(err2) < vectorops.normSquared(err):
return self.robot.getConfig()
else:
return qorig
def setCartesianVelocity(self,qcur,dxparams,ttl,frame='world'):
assert len(qcur) == self.robot.numDrivers(),"Invalid length of current configuration: {}!={}".format(len(qcur),self.robot.numDrivers())
assert ttl is None or isinstance(ttl,(int,float)),"Invalid value for ttl: {}".format(ttl)
if frame!='world':
raise NotImplementedError("TODO: Can only handle world frame, for now")
for i,v in enumerate(qcur):
self.robot.driver(i).setValue(v)
qcur = self.robot.getConfig()
if not self.active or self.mode != 'setCartesianVelocity':
self.driver.start(qcur,self.robotIndices[-1],endEffectorPositions=self.toolCoordinates)
self.active = True
self.mode = 'setCartesianVelocity'
if self.t is None:
self.endDriveTime = ttl
elif ttl is not None:
self.endDriveTime = self.t + ttl
else:
self.endDriveTime = None
if len(dxparams) == 2:
if len(dxparams[0]) != 3 or len(dxparams[1]) != 3:
raise ValueError("Invalid IK parameters, must be a 3 vector or a pair of angular velocity / velocity vectors")
self.driveCommand = dxparams
else:
if len(dxparams) != 3:
raise ValueError("Invalid IK parameters, must be a 3 vector or a pair of angular velocity / velocity vectors")
self.driveCommand = None,dxparams
def moveToCartesianPositionLinear(self,qcur,xparams,speed,frame='world'):
assert len(qcur) == self.robot.numDrivers(),"Invalid length of current configuration: {}!={}".format(len(qcur),self.robot.numDrivers())
assert speed is None or speed >= 0,"Invalid speed, must be positive"
if len(xparams) == 2:
if len(xparams[0]) != 9 or len(xparams[1]) != 3:
raise ValueError("Invalid IK parameters, must be a 3 vector or a RigidTransform")
else:
if len(xparams) != 3:
raise ValueError("Invalid IK parameters, must be a 3 vector or a RigidTransform")
if frame!='world':
raise NotImplementedError("TODO: Can only handle world frame, for now")
for i,v in enumerate(qcur):
self.robot.driver(i).setValue(v)
qcur = self.robot.getConfig()
if not self.active or self.mode != 'moveToCartesianPositionLinear':
self.driver.start(qcur,self.robotIndices[-1],endEffectorPositions=self.toolCoordinates)
self.active = True
self.mode = 'moveToCartesianPositionLinear'
if speed is None:
speed = 1.0 #TODO: determine a good speed from the distance traveled
self.driveCommand = (xparams,1.0/speed)
self.endDriveTime = None
def update(self,qcur,t,dt):
if self.t is None:
if self.endDriveTime is not None:
self.endDriveTime = t + self.endDriveTime
self.t = t
if not self.active:
return
#print("CartesianEmulatorData update",t)
if self.endDriveTime is not None and t > self.endDriveTime:
self.active = False
return
assert len(qcur) == self.robot.numDrivers()
for i,v in enumerate(qcur):
self.robot.driver(i).setValue(v)
qcur_rob = self.robot.getConfig()
if self.mode == 'setCartesianVelocity':
#print("Drive command",self.driveCommand,"dt",dt)
(amt,q) = self.driver.drive(qcur_rob,self.driveCommand[0],self.driveCommand[1],dt)
self.robot.setConfig(q)
#print("Result",amt,q)
else: #moveToCartesianLinear
xdesired,remainingTime = self.driveCommand
if remainingTime <= dt:
remainingTime = dt
delta = 0
xcur = self.cartesianPosition(qcur)
assert len(xcur) == len(xdesired),"Invalid result from cartesianPosition or args to moveToCartesianPositionLinear"
if len(xdesired)==3:
v = vectorops.mul(vectorops.sub(xdesired,xcur),1.0/remainingTime)
(amt,q) = self.driver.drive(qcur_rob,None,v,dt)
delta = vectorops.norm(v)
else:
assert len(xdesired)==2
assert len(xdesired[0])==9
assert len(xdesired[1])==3
wv = vectorops.mul(se3.error(self.driveCommand[0],xcur),1.0/remainingTime)
w = wv[:3]
v = wv[3:]
delta = vectorops.norm(wv)
(amt,q) = self.driver.drive(qcur_rob,w,v,dt)
if amt*delta < 1e-8: #done
self.active = False
self.robot.setConfig(q)
remainingTime = self.driveCommand[1]-dt*amt
self.driveCommand = (self.driveCommand[0],remainingTime)
return [self.robot.driver(i).getValue() for i in self.indices]
def cartesianPosition(self,q,frame='world'):
assert len(q) == self.robot.numDrivers()
return klamptCartesianPosition(self.robot,q,self.indices,self.toolCoordinates,frame)
def cartesianVelocity(self,q,dq,frame='world'):
assert len(q) == self.robot.numDrivers()
assert len(dq) == self.robot.numDrivers()
return klamptCartesianVelocity(self.robot,q,dq,self.indices,self.toolCoordinates,frame)
def cartesianForce(self,q,t,frame='world'):
assert len(q) == self.robot.numDrivers()
assert len(t) == self.robot.numDrivers()
return klamptCartesianForce(self.robot,q,t,self.indices,self.toolCoordinates,frame)
[docs]class RobotInterfaceEmulator:
"""Emulates a fully-functional motion controller in software. All motion
commands and queries are available, but the caller must always specify
which indices are being used. Most prominently used in
:class:`RobotInterfaceCompleter` and :class:`OmniRobotInterface`
Joint control usage::
interface = MyRobotInterfaceBase()
emulator = RobotInterfaceEmulator(interface.numJoints(),klampt_model)
interface.beginStep()
qcmd = interface.commandedPosition() #query current configuration
interface.endStep()
emulator.initCommand(qcmd,[0]*len(qcmd),None) #initialize with current commanded values
move_to_sent = False
while True:
t = interface.clock()
emulator.advanceClock(t,None) #can also advance by rate
q,v = interface.sensedPosition(),interface.sensedVelocity()
emulator.update(q,v)
if not move_to_sent:
part1_indices = [0,1,4,5] # whatever indices are available
part1_config = [0.4,0.3,-0.2,0.5]
emulator.moveTo(part1_indices,part1_config) #begins a move-to-command
part2_indices = [2,3] # some other indices
part2_velocity = [0.1,0.3]
emulator.setVelocity(part2_indices,part2_velocity,1.5) #begins a velocity command for 1.5s
move_to_sent = True
#after the first step, the emulator will generate position commands that execute the move-to
q = emulator.getCommand('setPosition') #argument is the desired command type
interface.setPosition(q)
emulator.onBaseCommand('setPosition','setPosition') #indicates that we successfully executed that command type
To minimize the number of instructions sent to the interface or to dispatch
to different command types in the base interface::
dcm = emulator.desiredControlMode()
if dcm is not None: #desiredControlMode() returns None if done with sending,
q = emulator.getCommand('setPosition') #argument is the desired command type
interface.setPosition(q) #send to the actual controller
emulator.onBaseCommand(dcm,'setPosition') #indicates that we successfully executed that command type
Note that setPosition and setVelocity are always available as command
types.
To run Cartesian control::
#setup
six_dof_indices = [0,1,2,3,4,5]
tool_coords = (0,0,0.1)
emulator.setToolCoordinates(six_dof_indices,tool_coords)
while True:
... #do standard emulator setup
if do_cartesian_move:
#read transform and send a move to
target_shift = [0.2,0,0]
Tcmd = emulator.commandedCartesianPosition(six_dof_indices)
Ttarget = (Tcmd[0],vectorops.add(Tcmd[0],target_shift)
emulator.moveToCartesianPosition(six_dof_indices,Ttarget)
do_cartesian_move = False
... #do standard emulator command generation. The emulator will solve for IK automatically.
"""
CONTROL_MODES = ['setPID','setVelocity','setPosition','moveToPosition','setPiecewiseLinear','setPiecewiseCubic']
def __init__(self, nd : int, klamptModel : RobotModel):
self.klamptModel = klamptModel
self.curClock = None
self.dt = None
self.numUpdates = -1
self.status = 'ok'
self.jointData = [_JointInterfaceEmulatorData('Joint '+str(i)) for i in range(nd)]
if klamptModel is not None:
#find any continuous rotation joints
for i in range(nd):
d = klamptModel.driver(i)
if len(d.getAffectedLinks())==1:
link = d.getAffectedLinks()[0]
if klamptModel.getJointType(link)=='spin':
#print("RobotInterfaceEmulator: Interpreting joint",i," as a spin joint")
self.jointData[i].continuousRotation = True
self.cartesianInterfaces = dict() #type: Dict[str,_CartesianEmulatorData]
self.stateFilters = dict() #type: Dict[Sequence[int],Dict[str,Callable]]
self.commandFilters = dict() #type: Dict[Sequence[int],Dict[str,Callable]]
self.virtualSensorMeasurements = dict() #type: Dict[str,Tuple[float,Vector]]
[docs] def initialize(self,qsns,vsns,tsns,qcmd,vcmd,tcmd):
"""Could be called before the emulator starts running to initialize the
commanded joint positions before the emulator takes over.
"""
assert qcmd is None or len(qcmd) == len(self.jointData)
assert vcmd is None or len(vcmd) == len(self.jointData)
assert tcmd is None or len(tcmd) == len(self.jointData)
self.numUpdates = 0
for i,j in enumerate(self.jointData):
j.sensedPosition = qsns[i]
j.sensedVelocity = 0 if vsns is None else vsns[i]
#j.sensedTorque = None if tsns is None else tsns[i]
if qcmd is not None:
j.commandedPosition = qcmd[i]
else:
j.commandedPosition = qsns[i]
if vcmd is not None:
j.commandedVelocity = vcmd[i]
else:
j.commandedVelocity = 0
if tcmd is not None and tcmd[i] is not None:
j.commandedTorque = tcmd[i]
else:
j.commandedTorque = 0
[docs] def advanceClock(self,newClock,rate):
if self.numUpdates < 0:
raise RuntimeError("Need to call initialize before advanceClock")
lastClock = self.curClock
if rate is None:
self.curClock = newClock
if lastClock is not None and lastClock != self.curClock:
self.dt = self.curClock - lastClock
elif newClock is None:
self.dt = 1.0/rate
if self.curClock is None: self.curClock = 0
self.curClock = self.curClock + self.dt
else:
self.dt = 1.0/rate
if abs(self.curClock + self.dt - newClock) > 1e-3:
warnings.warn("Time advance doesn't seem consistent? Last clock {}, dt {}, predicted clock {}, reported clock {}".format(lastClock,self.dt,lastClock+self.dt,newClock))
lastClock = newClock - self.dt
self.curClock = newClock
[docs] def update(self,q,v):
"""Advances the interface with the sensed config q and sensed velocity v.
"""
assert q is not None,"Must pass a valid sensed config to update"
if self.numUpdates < 0:
raise RuntimeError("Need to call initialize before update")
self.numUpdates += 1
if v is None: #can accept null velocities
v = [None]*len(q)
qcmd = q
try:
qcmd = self.getCommand('setPosition')[0]
except Exception:
pass
for inds,c in self.cartesianInterfaces.items():
qdes = c.update(qcmd,self.curClock,self.dt)
if qdes is not None: #cartesian controller is active -- change joints to setPosition
assert len(qdes) == len(c.indices)
for i,x in zip(inds,qdes):
self.jointData[i].promote('setPosition')
self.jointData[i].commandSent = False
self.jointData[i].commandedVelocity = (x-self.jointData[i].commandedPosition)/self.dt
self.jointData[i].commandedPosition = x
self.jointData[i].commandTTL = 5.0*self.dt
for i,j in enumerate(self.jointData):
j.dt = self.dt
j.update(self.curClock,q[i],v[i],self.dt)
[docs] def sendToInterface(self,interface: RobotInterfaceBase, indices = None) -> Tuple[str,str]:
"""Sends a command to the interface using the highest level interface
as possible.
Returns:
tuple: (desiredControlMode,baseControlMode) indicating which mode was
desired and which was actually achieved.
"""
desiredControlMode = self.desiredControlMode(indices)
baseControlMode = None
if desiredControlMode is None:
#no control needed
return None,None
if not hasattr(interface,'_control_mode_map'): #initialize method cache
interface._control_mode_map = dict()
if desiredControlMode in interface._control_mode_map:
baseControlMode = interface._control_mode_map[desiredControlMode]
args = self.getCommand(baseControlMode,indices)
getattr(interface,baseControlMode)(*args)
else:
if desiredControlMode == 'setPiecewiseCubic':
res=_try_methods(interface,['setPiecewiseCubic','setPiecewiseLinear','setPID','setPosition','setVelocity','moveToPosition','setTorque'],
[lambda:self.getCommand('setPiecewiseCubic',indices),
lambda :self.getCommand('setPiecewiseLinear',indices),
lambda :self.getCommand('setPID',indices),
lambda :self.getCommand('setPosition',indices),
lambda :self.getCommand('setVelocity',indices),
lambda :self.getCommand('moveToPosition',indices),
lambda :self.getCommand('setTorque',indices)])
baseControlMode = res[1]
elif desiredControlMode == 'setPiecewiseLinear':
res=_try_methods(interface,['setPiecewiseLinear','setPID','setPosition','setVelocity','moveToPosition','setTorque'],
[lambda:self.getCommand('setPiecewiseLinear',indices),
lambda :self.getCommand('setPID',indices),
lambda :self.getCommand('setPosition',indices),
lambda :self.getCommand('setVelocity',indices),
lambda :self.getCommand('moveToPosition',indices),
lambda :self.getCommand('setTorque',indices)])
baseControlMode = res[1]
elif desiredControlMode == 'setPID':
res=_try_methods(interface,['setPID','setTorque','setPosition','setVelocity','moveToPosition'],
[lambda:self.getCommand('setPID',indices),
lambda :self.getCommand('setTorque',indices),
lambda :self.getCommand('setPosition',indices),
lambda :self.getCommand('setVelocity',indices),
lambda :self.getCommand('moveToPosition',indices)])
baseControlMode = res[1]
elif desiredControlMode == 'setPosition':
res=_try_methods(interface,['setPosition','setVelocity','setPID','moveToPosition','setPiecewiseLinear','setTorque'],
[lambda:self.getCommand('setPosition',indices),
lambda :self.getCommand('setVelocity',indices),
lambda :self.getCommand('setPID',indices),
lambda :self.getCommand('moveToPosition',indices),
lambda :self.getCommand('setPiecewiseLinear',indices),
lambda :self.getCommand('setTorque',indices)])
baseControlMode = res[1]
elif desiredControlMode == 'setVelocity':
res=_try_methods(interface,['setVelocity','setPosition','setPID','moveToPosition','setPiecewiseLinear','setTorque'],
[lambda:self.getCommand('setVelocity',indices),
lambda :self.getCommand('setPosition',indices),
lambda :self.getCommand('setPID',indices),
lambda :self.getCommand('moveToPosition',indices),
lambda :self.getCommand('setPiecewiseLinear',indices),
lambda :self.getCommand('setTorque',indices)])
baseControlMode = res[1]
elif desiredControlMode == 'setTorque':
res=_try_methods(interface,['setTorque','setPID'],
[lambda:self.getCommand('setTorque',indices),
lambda :self.getCommand('setPID',indices)])
baseControlMode = res[1]
else:
raise RuntimeError("Invalid emulator control type? "+','.join(str(j.controlMode) for j in self.jointData))
interface._control_mode_map[desiredControlMode] = baseControlMode
if isinstance(indices,list):
indices = tuple(indices)
stateFilters = self.stateFilters.get(indices,None)
if stateFilters is not None:
#process filters
input_variables = {}
#query the inputs as needed
for k,(filter,inputs,outputs) in stateFilters.items():
skip = False
for v in inputs:
if v in input_variables: continue
try:
input_variables[v] = self._getFilterInput(indices,v)
except ValueError:
if interface.hasSensor(v):
#acts on a sensor, but the sensor is not enabled
skip = True
else:
raise
if skip: continue
args = [input_variables[v] for v in inputs]
results = self._runFilter(k,filter,args)
for k,r in enumerate(outputs,results):
if k in self.virtualSensorMeasurements:
self.virtualSensorMeasurements[k] = (self.curClock,r)
else:
raise RuntimeError("Invalid output item {}".format(k))
self.onBaseCommand(desiredControlMode, baseControlMode, indices)
return (desiredControlMode,baseControlMode)
[docs] def desiredControlMode(self,indices=None):
"""Returns the highest level control mode requested of the joints.
Returns None if no command is active.
if indices is not None, then it returns the desired control mode for
the joints referred to by indices.
"""
activeJoints = self.jointData
if indices is not None:
activeJoints = [self.jointData[j] for j in indices]
if all(j.commandSent for j in activeJoints):
return None
commonMode = None
precedence = 10000
for j in activeJoints:
if j.controlMode != commonMode:
if commonMode is None:
commonMode = j.controlMode
elif j.controlMode is None:
pass
elif _JointInterfaceEmulatorData.CONTROL_MODE_PRECEDENCE[j.controlMode] < precedence:
precedence = _JointInterfaceEmulatorData.CONTROL_MODE_PRECEDENCE[j.controlMode]
commonMode = j.controlMode
if commonMode is not None and any(j.controlMode is None for j in activeJoints):
#some joint controller is off but others are on.
#shouldn't promote to piecewise linear or piecewise cubic
if precedence > _JointInterfaceEmulatorData.CONTROL_MODE_PRECEDENCE['setPosition']:
commonMode = 'setPosition'
return commonMode
[docs] def getCommand(self,commandType,indices=None):
"""Retrieves the args for the command to be sent to the actual
interface on this time step.
if indices is not None, then it returns the command for
the joints referred to by indices.
"""
activeJoints = self.jointData
if indices is not None:
activeJoints = [self.jointData[j] for j in indices]
if isinstance(indices,list):
indices = tuple(indices)
res = [j.getCommand(commandType) for j in activeJoints]
commandFilters = self.commandFilters.get(indices,None)
if commandFilters and self.desiredControlMode(indices) != commandType:
#if command items have been emulated, run the appropriate command filters here
underlyingCommandTypes = []
if commandType in ['setPosition','moveToPosition']: underlyingCommandTypes = ['positionCommand']
elif commandType == 'setVelocity': underlyingCommandTypes = ['velocityCommand']
elif commandType == 'setTorque': underlyingCommandTypes = ['torqueCommand']
elif commandType == 'setPiecewiseLinear': underlyingCommandTypes = ['positionCommand']
elif commandType == 'setPiecewiseCubic': underlyingCommandTypes = ['velocityCommand']
#process filters
input_variables = {}
for (name,(filter,inputs,outputs)) in commandFilters.items():
if any(c in outputs for c in underlyingCommandTypes):
args = []
for i in inputs:
if i not in input_variables:
input_variables[i] = self._getFilterInput(indices,i)
args.append(input_variables[i])
#print("Extracting",commandType,":",len(res),"values for", len(indices) if indices else 'all',"joints")
if commandType in ['setVelocity','setTorque']:
#extract out the TTL, take the max
ttl = None
if any(x[1] is not None for x in res):
ttl = max(x[1] for x in res if x[1] is not None)
cmd = list(x[0] for x in res)
return cmd,ttl
if commandType == 'setPiecewiseLinear':
unifiedTimes = None
nonuniform = False
for times,positions in res:
if unifiedTimes is None:
unifiedTimes = times
elif unifiedTimes != times:
nonuniform = True
unifiedTimes = list(sorted(set(unifiedTimes)|set(times)))
unifiedMilestones = []
joint_trajs = []
if nonuniform:
for traj in res:
remeshed = Trajectory(traj[0],[[v] for v in traj[1]]).remesh(unifiedTimes)[0]
joint_trajs.append([m[0] for m in remeshed.milestones])
assert len(joint_trajs[-1])==len(unifiedTimes)
else:
joint_trajs = [traj[1] for traj in res]
assert all(len(traj)==len(unifiedTimes) for traj in joint_trajs)
for i in range(len(unifiedTimes)):
unifiedMilestones.append([traj[i] for traj in joint_trajs])
t0 = self.curClock if self.curClock is not None else 0
futureTimes = []
futureMilestones = []
for i,t in enumerate(unifiedTimes):
if t > t0:
futureTimes.append(t-t0)
futureMilestones.append(unifiedMilestones[i])
# if len(futureTimes)==0:
# warnings.warn("getCommand is returning empty command because no times are after current time???")
return futureTimes,futureMilestones
if commandType == 'setPiecewiseCubic':
unifiedTimes = None
for times,positions,velocities in res:
if unifiedTimes is None:
unifiedTimes = times
elif unifiedTimes != times:
raise NotImplementedError("TODO: convert nonuniform piecewise cubic times to position control")
unifiedMilestones = []
for i in range(len(unifiedTimes)):
unifiedMilestones.append([traj[1][i] for traj in res])
unifiedVelocities = []
for i in range(len(unifiedTimes)):
unifiedVelocities.append([traj[2][i] for traj in res])
t0 = self.curClock if self.curClock is not None else 0
futureTimes = []
futureMilestones = []
futureVelocities = []
for i,t in enumerate(unifiedTimes):
if t > t0:
futureTimes.append(t-t0)
futureMilestones.append(unifiedMilestones[i])
futureVelocities.append(unifiedVelocities[i])
# if len(futureTimes)==0:
# warnings.warn("getCommand is returning empty command because no times are after current time???")
return futureTimes,futureMilestones,futureVelocities
return list(zip(*res))
[docs] def onBaseCommand(self,desiredControlMode,sentControlMode,indices=None):
"""To be called when the completer has decided which base control mode to use."""
if desiredControlMode is None:
return
activeJoints = self.jointData
if indices is not None:
activeJoints = [self.jointData[j] for j in indices]
if desiredControlMode in ['setPiecewiseCubic','setPiecewiseLinear']:
if sentControlMode in ['setPiecewiseCubic','setPiecewiseLinear']:
for j in activeJoints: j.commandSent = True
elif desiredControlMode == 'setPID':
if sentControlMode in ['setPID','setPosition','moveToPosition']:
for j in activeJoints: j.commandSent = True
elif desiredControlMode == 'setPosition':
if sentControlMode in ['setPosition','moveToPosition','setPiecewiseLinear']:
for j in activeJoints: j.commandSent = True
elif desiredControlMode == 'setVelocity':
if sentControlMode == ['setVelocity','setPiecewiseLinear']:
for j in activeJoints: j.commandSent = True
elif desiredControlMode == 'setTorque':
if sentControlMode in ['setTorque','setPID']:
for j in activeJoints: j.commandSent = True
if desiredControlMode not in ['setPiecewiseCubic','setPiecewiseLinear']:
for j in self.jointData: #continue with trajectory if any joint wants it
if j.controlMode in ['setPiecewiseCubic','setPiecewiseLinear']:
j.commandSent = False
[docs] def setFilter(self,indices, name:str, filter:Callable, input='auto', output='auto') -> None:
if isinstance(indices,list):
indices = tuple(indices)
assert isinstance(indices,tuple),"Don't accept None as indices"
commandFilters = self.commandFilters.setdefault(indices,dict())
stateFilters = self.stateFilters.setdefault(indices,dict())
if filter is None:
if name in commandFilters:
del commandFilters[name]
if name in stateFilters:
del stateFilters[name]
return
if input == 'auto':
if name in ['joint_limit','self_collision','obstacle_collision']:
input = 'positionCommand'
elif name == 'velocity_limit':
input = 'velocityCommand'
elif name == 'acceleration_limit':
input = 'accelerationCommand'
elif name == 'torque_limit':
input = 'torqueCommand'
elif name == 'tracking_monitor':
input = ['commandedPosition','sensedPosition']
else:
input = name
if output == 'auto':
if name in ['joint_limit','self_collision','obstacle_collision']:
output = 'positionCommand'
elif name == 'velocity_limit':
output = 'velocityCommand'
elif name == 'acceleration_limit':
output = 'accelerationCommand'
elif name == 'torque_limit':
output = 'torqueCommand'
elif name == 'tracking_monitor':
output = []
else:
output = name
if not isinstance(input,(list,tuple)):
input = [input]
COMMANDS = ['positionCommand','velocityCommand','accelerationCommand','torqueCommand']
STATES = MOTION_QUERY_METHODS
#validate inputs
for i in input:
if i in COMMANDS:
continue
try:
meth = getattr(self,i)
except AttributeError:
try:
meth = getattr(self,'get'+i[0].upper()+i[1:])
except AttributeError:
raise ValueError("Input {} isn't recognized".format(i))
if output is None:
output = []
elif not isinstance(output,(list,tuple)):
output = [output]
for o in output:
if o in STATES:
raise ValueError("Can't output an existing state variable")
if o not in COMMANDS:
self.virtualSensorMeasurements[o] = (None,None)
if isinstance(filter,blocks.Block):
block = filter
filter = lambda *args:block.advance(*args)
if all(i in COMMANDS for i in input):
if all(o in COMMANDS for o in output):
commandFilters[name] = (filter,input,output)
elif any(o in COMMANDS for o in output):
raise NotImplementedError("TODO: mixed command / state filters")
else:
stateFilters[name] = (filter,input,output)
else:
if any(o in COMMANDS for o in output):
raise NotImplementedError("TODO: mixed command / state filters")
stateFilters[name] = (filter,input,output)
def _validateFilters(self, interface : RobotInterfaceBase, indices):
COMMANDS = ['positionCommand','velocityCommand','accelerationCommand','torqueCommand']
STATES = MOTION_QUERY_METHODS
if isinstance(indices,list):
indices = tuple(indices)
stateFilters = self.stateFilters.get(indices,dict())
commandFilters = self.commandFilters.get(indices,dict())
for (name,(filter,input,output)) in stateFilters.items():
if not callable(filter):
raise ValueError("Invalid filter, not callable")
isSensor = False
for i in input:
if i not in COMMANDS and i not in STATES and i not in self.virtualSensorMeasurements:
if interface.hasSensor(i):
isSensor = True
else:
raise ValueError("Invalid input to filter {}, input {} isn't a sensed or commanded value".format(name,i))
for o in output:
if o not in self.virtualSensorMeasurements:
raise ValueError("Invalid output of filter {}, output {} isn't a virtual sensor".format(name,i))
for (name,(filter,input,output)) in commandFilters.items():
if not callable(filter):
raise ValueError("Invalid filter, not callable")
isSensor = False
for i in input:
if i not in COMMANDS and i not in STATES and i not in self.virtualSensorMeasurements:
if interface.hasSensor(i):
isSensor = True
else:
raise ValueError("Invalid input to filter {}, input {} isn't a sensed or commanded value".format(name,i))
for o in output:
if o not in COMMANDS:
raise ValueError("Invalid output of filter {}, output {} isn't a commanded value".format(name,i))
[docs] def setJointLimits(self,indices,qmin,qmax,op,hw_qmin=None,hw_qmax=None):
if qmin == 'auto' or qmax == 'auto':
model = self.klamptModel
if model is None:
if hw_qmin is None:
raise ValueError("Can't set a joint limit filter with 'auto' limits when klamptModel is not defined")
else:
if qmin == 'auto': qmin = hw_qmin
if qmax == 'auto': qmax = hw_qmax
else:
mqmin,mqmax = [],[]
for i in range(model.numDrivers()):
a,b = model.driver(i).getLimits()
if model.getJointType(model.driver(i).getAffectedLink()) == 'spin':
a = 0
b = math.pi*2
mqmin.append(a)
mqmax.append(b)
if qmin == 'auto': qmin = [mqmin[i] for i in indices]
if qmax == 'auto': qmax = [mqmax[i] for i in indices]
if hw_qmin is not None:
qmin = vectorops.maximum(qmin,hw_qmin)
qmax = vectorops.minimum(qmax,hw_qmax)
assert len(qmin) == len(indices)
assert len(qmax) == len(indices)
self.setFilter(indices,"joint_limit",LimitFilter(op,qmin,qmax))
[docs] def setCollisionFilter(self,world,op):
from ..model.collide import WorldCollider
robot = self.klamptModel
indices = list(range(len(self.jointData)))
if robot is None:
raise ValueError("Can't set a collision filter when klamptModel is not defined")
if world is None:
self.setFilter(indices,'self_collision',SelfCollisionFilter(op,robot))
else:
obs = []
for i in range(world.numTerrains()):
obs.append(world.terrain(i).geometry())
for i in range(world.numRigidObjects()):
obs.append(world.rigidObject(i).geometry())
self.setFilter(indices,'obstacle_collision',CollisionFilter(op,robot,obs))
def _getFilterInput(self,indices,value):
#TODO: calculate accelerations
if value == 'status': return self.status
elif value.endswith('Command'): return getattr('commanded'+value[:-7].capitalize())(indices)
try:
return getattr(self,value)(indices)
except AttributeError:
try:
return getattr(self,'get'+value[0].upper()+value[1:])(indices)
except AttributeError:
raise ValueError("Invalid filter input {}".format(value))
def _runFilter(self,indices,name,func,args):
try:
results = func(*args)
return results
except blocks.utils.BlockSignal as e:
if e.signal == 'softStop':
self.softStop(indices)
return
elif e.signal == 'warn':
print("Filter",name,"produced warning",e)
else:
print("Filter",name,"raised an unknown signal type?",e.type,e)
[docs] def commandFilter(self,indices,item,value):
if isinstance(indices,list):
indices = tuple(indices)
commandFilters = self.commandFilters.get(indices,dict())
for (k,f) in commandFilters.items():
(func,inputs,outputs) = f
if item in inputs:
args =[]
for i in inputs:
if i==item: args.append(value)
else:
args.append(self._getFilterInput(indices,i))
res = self._runFilter(indices,k,func,args)
if res is not None:
assert len(res)==len(value),"Invalid result from filter {}".format(k)
value = res
return value
[docs] def reset(self):
self.status = 'ok'
[docs] def softStop(self,indices):
self.status = 'softStop'
self.promote(range(len(self.jointData)),None) #clear pending commands
qcmd = self.commandedPosition()
qcmd = [qcmd[i] for i in indices]
self.setPosition(indices,qcmd)
[docs] def estop(self,indices):
self.status = 'estop'
self.promote(range(len(self.jointData)),None) #clear pending commands
qcmd = self.commandedPosition()
qcmd = [qcmd[i] for i in indices]
self.setPosition(indices,qcmd)
[docs] def setPosition(self,indices,q):
"""Backup: runs an immediate position command"""
if self.status != 'ok': return
self.promote(indices,'setPosition')
for (i,v) in zip(indices,q):
self.jointData[i].commandedPosition = v
[docs] def moveToPosition(self,indices,q,speed):
"""Backup: runs a position command using a piecewise linear trajectory"""
if self.status != 'ok': return
model = self.klamptModel
if model is None:
#TODO: warn that move-to is unavailable?
warnings.warn("moveToPosition is not available because base controller's klamptModel() is not implemented.")
self.promote(indices,'setPosition')
for (i,v) in zip(indices,q):
self.jointData[i].commandedPosition = v
else:
#move to command emulation using bounded velocity curve
xmin, xmax = [], []
vmax, amax = [], []
for i in indices:
a,b = model.driver(i).getLimits()
xmin.append(a)
xmax.append(b)
a,b = model.driver(i).getVelocityLimits()
vmax.append(min(-a,b)*speed)
a,b = model.driver(i).getAccelerationLimits()
amax.append(min(-a,b)*(speed**2))
assert all(v >= 0 for v in vmax)
assert all(v >= 0 for v in amax)
assert all(not math.isinf(v) for v in amax),"Infinite acceleration limits not supported"
qcmd = [(self.jointData[i].commandedPosition if self.jointData[i].commandedPosition is not None else self.jointData[i].sensedPosition) for i in indices]
dqcmd = [(self.jointData[i].commandedVelocity if self.jointData[i].commandedVelocity is not None else self.jointData[i].sensedVelocity) for i in indices]
#handle continuous rotations...
if any(self.jointData[j].continuousRotation for j in indices):
q = q[:]
for i,j in enumerate(indices):
if self.jointData[j].continuousRotation:
q[i] = qcmd[i] + self.jointData[j]._positionDifference(q[i],qcmd[i])
for i in range(len(dqcmd)):
if qcmd[i] < xmin[i]:
xmin[i] = qcmd[i]
if qcmd[i] > xmax[i]:
xmax[i] = qcmd[i]
#TODO: warn if this is way out of bounds
if abs(dqcmd[i]) > vmax[i]:
vmax[i] = abs(dqcmd[i])
try:
ts,xs,vs = motionplanning.interpolate_nd_min_time(qcmd,dqcmd,q,[0]*len(q),xmin,xmax,vmax,amax)
if any(len(t)==0 for t in ts):
raise ValueError("Unable to solve for min-time interpolation")
except Exception as e:
print("Couldn't solve for move to?")
print(e)
if "velocity" in str(e):
print("vcmd:",dqcmd,"vmax:",vmax)
else:
print("qcmd",qcmd,"q:",q)
print("qmin:",xmin,"qmax:",xmax)
print("vcmd:",dqcmd,"vmax:",vmax)
print("amax:",amax)
raise
try:
ts,xs,vs = motionplanning.combine_nd_cubic(ts,xs,vs)
except Exception as e:
print("Couldn't solve for combine?")
print("qcmd",qcmd,"q:",q)
print("qmin:",xmin,"qmax:",xmax)
print("vcmd:",dqcmd,"vmax:",vmax)
print("amax:",amax)
print(ts)
print(xs)
print(vs)
raise
self.setPiecewiseCubic(indices,ts,xs,vs,True)
[docs] def setVelocity(self,indices,v,ttl):
"""Backup: runs a velocity command for ttl seconds using a piecewise linear
trajectory. If ttl is not specified, uses ttl=1."""
if self.status != 'ok': return
qcmd = [self.jointData[i].commandedPosition if self.jointData[i].commandedPosition is not None else self.jointData[i].sensedPosition for i in indices]
if ttl is None:
ttl = 1.0
model = self.klamptModel
if model is not None:
limits = [model.driver(i).getLimits() for i in indices]
xmin,xmax = zip(*limits)
#stop when the first joint limit is hit
ttl = 1.0
for i in range(len(indices)):
if qcmd[i] < xmin[i] or qcmd[i] > xmax[i]:
raise ValueError("Current position %d is out of joint limits: %f <= %f <= %f"%(i,xmin[i],qcmd[i],xmax[i]))
if qcmd[i] + ttl*v[i] < xmin[i]:
ttl = (xmin[i]+1e-3-qcmd[i])/v[i]
if qcmd[i] + ttl*v[i] > xmax[i]:
ttl = (xmax[i]-1e-3-qcmd[i])/v[i]
q0 = qcmd
q = vectorops.madd(q0,v,ttl)
self.setPiecewiseLinear(indices,[ttl],[q],True)
[docs] def setTorque(self,indices,t,ttl):
if self.status != 'ok': return
self.promote(indices,'setTorque')
for i,v in zip(indices,t):
self.jointData[i].commandedTorque = v
self.jointData[i].commandTTL = ttl
[docs] def setPID(self,indices,q,dq,t):
if self.status != 'ok': return
if t is None:
t = [0.0]*len(q)
self.promote(indices,'setPID')
for i,qi,dqi,ti in zip(indices,q,dq,t):
self.jointData[i].pidCmd = (qi,dqi,ti)
[docs] def setPIDGains(self,indices,kP,kI,kD):
if self.status != 'ok': return
for i,P,I,D in zip(indices,kP,kI,kD):
self.jointData[i].pidGains = (kP,kI,kD)
[docs] def getPIDGains(self,indices):
if self.status != 'ok': return
kP,kI,kD = [],[],[]
for i in indices:
if self.jointData[i].pidGains is None:
if kP:
raise RuntimeError("Some joints have PID gains and others do not?")
else:
P,I,D = self.jointData[i].pidGains
kP.append(P)
kI.append(I)
kD.append(D)
if not kP:
return None,None,None
return kP,kI,kD
[docs] def setPiecewiseLinear(self,indices,ts,qs,relative):
assert self.curClock is not None
assert len(ts) == len(qs)
if self.status != 'ok': return
if relative:
if any(t < 0 for t in ts):
raise ValueError("Can't set a trajectory with negative relative times")
ts = [t+self.curClock for t in ts]
else:
if any(t < self.curClock for t in ts):
raise ValueError("Can't set a trajectory with absolute times < clock time")
for i,q in enumerate(qs):
assert len(q) == len(indices),"Trajectory milestone %d is of incorrect size %d != %d"%(i,len(q),len(indices))
if ts[0] > self.curClock:
ts = [self.curClock] + ts
q0 = []
for i in indices:
if self.jointData[i].commandedPosition is None:
if self.jointData[i].sensedPosition is None:
raise RuntimeError("Can't set a trajectory before first controller clock cycle")
q0.append(self.jointData[i].sensedPosition)
else:
q0.append(self.jointData[i].commandedPosition)
qs = [q0] + qs
self.promote(indices,'setPiecewiseLinear')
for k,i in enumerate(indices):
self.jointData[i].trajectoryTimes = ts
self.jointData[i].trajectoryMilestones = [q[k] for q in qs]
[docs] def setPiecewiseCubic(self,indices,ts,qs,vs,relative):
assert self.curClock is not None
assert len(ts) == len(qs)
assert len(ts) == len(vs)
if self.status != 'ok': return
if len(ts)==0:
return
if relative:
if any(t < 0 for t in ts):
raise ValueError("Can't set a trajectory with negative relative times")
ts = [t+self.curClock for t in ts]
else:
if any(t < self.curClock for t in ts):
raise ValueError("Can't set a trajectory with absolute times < clock time")
for q in qs:
assert len(q) == len(indices),"Trajectory milestone is of incorrect size"
for v in vs:
assert len(v) == len(indices),"Trajectory milestone velocity is of incorrect size"
if ts[0] > self.curClock:
ts = [self.curClock] + ts
qs = [[]] + qs
vs = [[]] + vs
for i in indices:
if self.jointData[i].commandedPosition is None:
if self.jointData[i].sensedPosition is None:
raise RuntimeError("Can't set a trajectory before first controller clock cycle")
qs[0].append(self.jointData[i].sensedPosition)
else:
qs[0].append(self.jointData[i].commandedPosition)
if self.jointData[i].commandedVelocity is None:
if self.jointData[i].sensedVelocity is None:
vs[0].append(0)
else:
vs[0].append(self.jointData[i].sensedVelocity)
else:
vs[0].append(self.jointData[i].commandedVelocity)
self.promote(indices,'setPiecewiseCubic')
for k,i in enumerate(indices):
self.jointData[i].trajectoryTimes = ts
self.jointData[i].trajectoryMilestones = [q[k] for q in qs]
self.jointData[i].trajectoryVelocities = [v[k] for v in vs]
[docs] def isMoving(self,indices):
for i in indices:
j = self.jointData[i]
if j.controlMode == 'setVelocity':
if j.commandedVelocity != 0:
return True
elif j.controlMode == 'setPosition':
if abs(j.commandedPosition - j.sensedPosition) > 1e-3:
return True
elif j.controlMode == 'setPID':
if j.commandedVelocity != 0:
return True
if abs(j.commandedPosition - j.sensedPosition) > 1e-3:
return True
elif j.controlMode in ['setPiecewiseLinear','setPiecewiseCubic']:
if self.curClock < j.destinationTime(self.curClock):
return True
return False
[docs] def commandedPosition(self):
res = [j.commandedPosition for j in self.jointData]
if not _valid_vector(res): raise RuntimeError("commandedPosition queried before a step could be taken?")
return res
[docs] def commandedVelocity(self):
res = [j.commandedVelocity for j in self.jointData]
if not _valid_vector(res): raise RuntimeError("commandedVelocity queried before a step could be taken?")
return res
[docs] def commandedTorque(self):
res = [j.commandedTorque for j in self.jointData]
if not _valid_vector(res): raise RuntimeError("no commanded torque sent to emulator")
return res
[docs] def sensedPosition(self):
res = [j.sensedPosition for j in self.jointData]
if not _valid_vector(res): raise RuntimeError("sensedPosition queried before a step could be taken?")
return res
[docs] def sensedVelocity(self):
return [j.sensedVelocity for j in self.jointData]
[docs] def destinationPosition(self):
return [j.destinationPosition() for j in self.jointData]
[docs] def destinationVelocity(self):
return [j.destinationVelocity() for j in self.jointData]
[docs] def destinationTime(self):
return max(j.destinationTime(self.curClock) for j in self.jointData)
[docs] def setGravityCompensation(self,indices,gravity,load,load_com):
raise NotImplementedError("TODO: implement gravity compensation?")
[docs] def getGravityCompensation(self,indices):
raise NotImplementedError("TODO: implement gravity compensation?")
[docs] def setCartesianPosition(self,indices,xparams,frame):
if self.status != 'ok': return
try:
c = self.cartesianInterfaces[tuple(indices)]
except KeyError:
c = _CartesianEmulatorData(self.klamptModel,indices)
self.cartesianInterfaces[tuple(indices)] = c
for i,j in enumerate(self.jointData):
self.klamptModel.driver(i).setValue(j.sensedPosition if j.commandedPosition is None else j.commandedPosition)
c.active = False
qKlamptNext = c.solveIK(xparams,frame)
self.klamptModel.setConfig(qKlamptNext)
self.setPosition(indices,[self.klamptModel.driver(i).getValue() for i in indices])
[docs] def moveToCartesianPosition(self,indices,xparams,speed,frame):
assert speed > 0
if self.status != 'ok': return
try:
c = self.cartesianInterfaces[tuple(indices)]
except KeyError:
c = _CartesianEmulatorData(self.klamptModel,indices)
self.cartesianInterfaces[tuple(indices)] = c
for i,j in enumerate(self.jointData):
self.klamptModel.driver(i).setValue(j.sensedPosition if j.commandedPosition is None else j.commandedPosition)
c.active = False
qKlamptNext = c.solveIK(xparams,frame)
self.klamptModel.setConfig(qKlamptNext)
self.moveToPosition(indices,[self.klamptModel.driver(i).getValue() for i in indices],speed)
[docs] def moveToCartesianPositionLinear(self,indices,xparams,speed,frame):
assert speed > 0
if self.status != 'ok': return
try:
c = self.cartesianInterfaces[tuple(indices)]
except KeyError:
c = _CartesianEmulatorData(self.klamptModel,indices)
self.cartesianInterfaces[tuple(indices)] = c
for i,j in enumerate(self.jointData):
self.klamptModel.driver(i).setValue(j.sensedPosition if j.commandedPosition is None else j.commandedPosition)
qstart = [j.sensedPosition if j.commandedPosition is None else j.commandedPosition for j in self.jointData]
c.moveToCartesianPositionLinear(qstart,xparams,speed,frame)
for i in indices:
self.jointData[i].externalController = c
[docs] def setCartesianVelocity(self,indices,dxparams,ttl,frame):
if self.status != 'ok': return
try:
c = self.cartesianInterfaces[tuple(indices)]
except KeyError:
c = _CartesianEmulatorData(self.klamptModel,indices)
self.cartesianInterfaces[tuple(indices)] = c
qstart = [j.sensedPosition if j.commandedPosition is None else j.commandedPosition for j in self.jointData]
c.setCartesianVelocity(qstart,dxparams,ttl,frame)
for i in indices:
self.jointData[i].externalController = c
[docs] def setCartesianForce(self,indices,fparams,ttl,frame):
if self.status != 'ok': return
try:
c = self.cartesianInterfaces[tuple(indices)]
except KeyError:
c = _CartesianEmulatorData(self.klamptModel,indices)
self.cartesianInterfaces[tuple(indices)] = c
raise NotImplementedError("TODO: implement cartesian force?")
qstart = [j.sensedPosition if j.commandedPosition is None else j.commandedPosition for j in self.jointData]
c.setCartesianForce(qstart,fparams,ttl,frame)
[docs] def cartesianPosition(self,indices,q,frame):
"""
Emulates cartesianPosition. Can also get link transforms.
Args:
indices (str, int, or tuple): if str or int, retrieves a link
transform, and ``frame`` must be "world". If a tuple,
retrieves the position of the cartesian controller for the
given indices, in the given frame.
q (Vector): the configuration of the full body
frame (str): either "world", "base", "end effector", or "tool"
"""
assert len(q) == len(self.jointData),"cartesianPosition: must use full-body configuration, {} != {}".format(len(q),len(self.jointData))
if any(v is None for v in q):
q = [0.0 if v is None else v for v in q]
if isinstance(indices,(str,int)): #single link
assert frame=='world','Can only retrieve world coordinates for single link'
self.klamptModel.setConfig(self.klamptModel.configFromDrivers(q))
return self.klamptModel.link(indices).getTransform()
try:
c = self.cartesianInterfaces[tuple(indices)]
return c.cartesianPosition(q,frame)
except KeyError:
raise ValueError("Invalid Cartesian index set for emulator: no command currently set")
[docs] def cartesianVelocity(self,indices,q,dq,frame):
"""
Emulates cartesianVelocity. Can also get link velocities.
Args:
indices (str, int, or tuple): if str or int, retrieves a link
velocity, and ``frame`` must be "world". If a tuple,
retrieves the velocity of the cartesian controller for the
given indices, in the given frame.
q (Vector): the configuration of the full body
dq (Vector): the joint velocities of the full body
frame (str): either "world", "base", "end effector", or "tool"
"""
assert len(q) == len(self.jointData),"cartesianVelocity: must use full-body configuration, {} != {}".format(len(q),len(self.jointData))
assert len(dq) == len(self.jointData),"cartesianVelocity: must use full-body configuration, {} != {}".format(len(dq),len(self.jointData))
if any(v is None for v in q):
q = [0.0 if v is None else v for v in q]
if any(v is None for v in dq):
dq = [0.0 if v is None else v for v in dq]
if isinstance(indices,(str,int)): #single link
assert frame=='world','Can only retrieve world coordinates for single link'
self.klamptModel.setConfig(self.klamptModel.configFromDrivers(q))
self.klamptModel.setVelocity(self.klamptModel.velocityFromDrivers(dq))
link = self.klamptModel.link(indices)
return link.getAngularVelocity(),link.getVelocity()
try:
c = self.cartesianInterfaces[tuple(indices)]
return c.cartesianVelocity(q,dq,frame)
except KeyError:
raise ValueError("Invalid Cartesian index set for emulator: no command currently set")
[docs] def cartesianForce(self,indices,q,t,frame):
assert len(q) == len(self.jointData),"cartesianForce: must use full-body configuration, {} != {}".format(len(q),len(self.jointData))
assert len(t) == len(self.jointData),"cartesianForce: must use full-body configuration, {} != {}".format(len(t),len(self.jointData))
if any(v is None for v in q):
q = [0.0 if v is None else v for v in q]
if any(v is None for v in t):
t = [0.0 if v is None else v for v in t]
try:
c = self.cartesianInterfaces[tuple(indices)]
return c.cartesianForce(q,t,frame)
except KeyError:
raise ValueError("Invalid Cartesian index set for emulator: no command currently set")
[docs] def printStatus(self):
joint_control = [True]*len(self.jointData)
for k,c in self.cartesianInterfaces.items():
if c.active:
print("Cartesian interface active on joints",k)
print(" Tool coordinates",c.toolCoordinates)
print(" Drive command",c.driveCommand)
for j in k:
joint_control[j] = False
for i,j in enumerate(self.jointData):
if not joint_control[i]: continue
print("Joint",i,"control mode",j.controlMode)
class _SubRobotInterfaceCompleter(RobotInterfaceCompleter):
def __init__(self,interface,part,joint_idx):
RobotInterfaceCompleter.__init__(self,interface._base)
#it's a sub-interface
assert isinstance(interface,RobotInterfaceCompleter)
assert not isinstance(interface._base,RobotInterfaceCompleter),"Can't do nested sub-interfaces"
assert interface._emulator.curClock is not None,"Interface needs to be initialized before accessing a part"
self._parent = interface
self._base_part_interface = None
try:
self._base_part_interface = interface._base.partInterface(part,joint_idx)
except Exception:
pass
self.properties = interface.properties.copy()
if part is not None:
if joint_idx is None:
self.properties['name'] = part
else:
self.properties['name'] = part + '[' + str(joint_idx) + ']'
else:
self.properties['name'] = str(joint_idx)
self.properties['part'] = True
self._klamptModel = interface._klamptModel
self._worldModel = interface._worldModel
self._subRobot = True
#self._indices = interface._base.indices(part,joint_idx)
self._indices = interface.indices(part,joint_idx)
self._parts = dict()
self._parts[None] = self._indices
self._has = interface._has
self._emulator = interface._emulator
def __str__(self):
return str(self._parent)+'['+self.properties['name']+']'
def status(self,joint_idx=None):
if self._base_part_interface is not None:
return self._base_part_interface.status(joint_idx)
return RobotInterfaceCompleter.status(self,joint_idx)
def estop(self):
#if self._base_part_interface is not None:
# self._base_part_interface.estop()
#Do we want the whole robot to stop, or just the part?
return RobotInterfaceCompleter.estop(self)
def reset(self):
if self._base_part_interface is not None:
return self._base_part_interface.reset()
return RobotInterfaceCompleter.reset(self)
def addPart(self,name,indices):
raise RuntimeError("Can't add sub-parts to part")
def initialize(self):
raise RuntimeError("Can't call initialize() on a sub-robot interface.")
def beginStep(self):
raise RuntimeError("Can't call beginStep() on a sub-robot interface.")
def endStep(self):
raise RuntimeError("Can't call endStep() on a sub-robot interface.")
def _toPart(self,q):
return [q[i] for i in self._indices]
def sensedPosition(self):
return self._toPart(self._parent.sensedPosition())
def sensedVelocity(self):
return self._toPart(self._parent.sensedVelocity())
def sensedTorque(self):
return self._toPart(self._parent.sensedTorque())
def commandedPosition(self):
return self._toPart(self._parent.commandedPosition())
def commandedVelocity(self):
return self._toPart(self._parent.commandedPosition())
def commandedTorque(self):
return self._toPart(self._parent.commandedTorque())
def destinationPosition(self):
return self._toPart(self._parent.destinationPosition())
def destinationVelocity(self):
return self._toPart(self._parent.destinationVelocity())
def sensedCartesianPosition(self,frame='world'):
return self.cartesianPosition(self._parent.sensedPosition(),frame)
def sensedCartesianVelocity(self,frame='world'):
return self.cartesianVelocity(self._parent.sensedPosition(),self._parent.sensedVelocity(),frame)
def sensedCartesianForce(self,frame='world'):
return self.cartesianForce(self._parent.sensedPosition(),self._parent.sensedTorque(),frame)
def commandedCartesianPosition(self,frame='world'):
return self.cartesianPosition(self._parent.commandedPosition(),frame)
def commandedCartesianVelocity(self,frame='world'):
return self.cartesianVelocity(self._parent.commandedPosition(),self._parent.commandedVelocity(),frame)
def commandedCartesianForce(self,frame='world'):
return self.cartesianForce(self._parent.commandedPosition(),self._parent.commandedTorque(),frame)
def destinationCartesianPosition(self,frame='world'):
return self.cartesianPosition(self._parent.destinationPosition(),frame)
def destinationCartesianVelocity(self,frame='world'):
return self.cartesianVelocity(self._parent.destinationPosition(),self._parent.destinationVelocity())
def queuedCartesianTrajectory(self,frame='world'):
res = self._parent.queuedTrajectory()
if len(res) == 2:
ts,qs = res
return ts,[self.cartesianPosition(q,frame) for q in qs]
elif len(res) == 3:
ts,qs,vs = res
return ts,[self.cartesianPosition(q,frame) for q in qs],[self.cartesianVelocity(q,dq,frame) for q,dq in zip(qs,vs)]
else:
raise RuntimeError("Invalid result from queuedTrajectory")
[docs]class RobotInterfaceLogger:
"""A logger that will write parts of the RIL state to a CSV file.
TODO: make this a filter.
"""
TEST_PROPERTIES = ['controlRate','parts','sensors','enabledSensors','numJoints','indices']
TEST_METHODS = ['clock','status','isMoving',
'sensedPosition','sensedVelocity','sensedTorque','commandedPosition','commandedVelocity','commandedTorque',
'destinationPosition','destinationVelocity','destinationTime']
def __init__(self, interface : RobotInterfaceBase, file : Union[TextIO,str]):
import csv
self._interface = interface
if isinstance(file,str):
self._file = open(file,'w')
else:
self._file = file
self._writer = csv.writer(self._file)
self._first_time = True
[docs] def step(self):
if self._file is None:
raise RuntimeError("step() called after stop()")
if self._first_time:
results = []
for prop in RobotInterfaceLogger.TEST_PROPERTIES:
try:
results.append(str(getattr(self._interface,prop)()))
except Exception as e:
results.append('Error '+str(e))
self._writer.writerow(RobotInterfaceLogger.TEST_PROPERTIES)
self._writer.writerow(results)
self._writer.writerow(RobotInterfaceLogger.TEST_METHODS)
for func in RobotInterfaceLogger.TEST_METHODS:
try:
results.append(str(getattr(self._interface,func)()))
except Exception as e:
results.append('Error '+str(e))
self._writer.writerow(results)
[docs] def stop(self):
self._file.close()
self._file = None
self._writer = None
[docs]class RobotInterfaceRecorder:
"""Records / playbacks all RIL commands to disk.
Record usage::
interface = MyRobotInterface(...)
recorder = RobotInterfaceRecorder(interface,'recording.txt')
while True:
... issue commands
recorder.step() #this does the work
interface.endStep()
... sleep, visualize, etc
recorder.stop()
Playback usage::
interface = MyRobotInterface(...)
recorder = RobotInterfaceRecorder(interface,'recording.txt',playback=True)
while not recorder.done():
recorder.step() #this does the work
interface.endStep()
... sleep, visualize, etc
Args:
interface (RobotInterfaceBase): NOTE: must be stateful.
file (str or file-like): the file to save to / load from
playback (bool): whether to use record or playback mode
playback_delay (float, optional): when in playback mode, how long to
wait before the first command is issued. If None, uses the same
timing as in the original recording (assumes clocks start at 0).
"""
def __init__(self, interface : RobotInterfaceBase, file = Union[TextIO,str], playback=False, playback_delay=None):
self._interface = interface
if not hasattr(interface,'_commands'):
raise ValueError("The interface needs to be stateful. Try wrapping in a RobotInterfaceCompleter or _RobotInterfaceStatefulWrapper")
if isinstance(file,str):
if playback:
self._file = open(file,'r')
else:
self._file = open(file,'w')
else:
self._file = file
self._playback = playback
if playback:
import json
self._commands = []
while True:
try:
jsonobj = json.load(self._file)
except IOError:
break
time = float(jsonobj['time'])
cmd = _RobotInterfaceCommand(jsonobj['command'])
self._commands.append((time,cmd))
self._timewarp = 0
self._command_index = 0
if self._commands and playback_delay is not None:
self._timewarp = playback_delay - self._commands[0][0]
if isinstance(file,str):
self._file.close()
[docs] def step(self) -> None:
"""Call this after commands have been issued to ``interface`` but
before ``interface.endStep()``.
"""
if self._file is None:
return
if self._playback:
t = self._interface.clock()
while self._command_index < len(self._commands):
if t >= self._commands[self._command_index][0] + self._timewarp:
cmd = self._commands[self._command_index][1]
self._interface._commands.append(cmd)
else:
break
self._command_index += 1
else:
import json
t = self._interface.clock()
cmds = self._interface._commands
for cmd in cmds:
cmd_json = cmd.to_json()
json.dump([t,cmd_json],self._file)
[docs] def stop(self) -> None:
if self._file is None:
return
if not self._playback:
self._file.close()
self._file = None
[docs] def done(self) -> bool:
if self._file is None:
return True
if self._playback:
return self._command_index >= len(self._commands)
else:
return False