Source code for klampt.control.robotinterface

"""The main module for Klampt's Robot Interface Layer.
"""

from ..robotsim import WorldModel
import functools


[docs]class RobotInterfaceBase(object): """The main class for the Klamp't Robot Interface Layer. Defines a unifying API to interface with a robot's motor controller, whether it's simulated or a real robot. .. note:: The API may look intimidating, but a subclass implementer is free to set up as many or as few of the given methods as the robot's motor controller truly implements. The :class:`RobotInterfaceCompleter` class will fill in the remainder of derived methods. See the Functionalities section for more details. Each of these methods should be synchronous calls, called at a single time step. The calling convention is:: interface = MyRobotInterface(...args...) if not interface.initialize(): #should be called first raise RuntimeError("There was some problem initializing interface "+str(interface)) dt = 1.0/interface.controlRate() while interface.status() == 'ok': #no error handling done here... t0 = time.time() interface.startStep() [any getXXX or setXXX commands here comprising the control loop] interface.endStep() t1 = time.time() telapsed = t1 - t0 [wait for time max(dt - telapsed,0)] interface.close() #cleanly shut down the interface To accept asynchronous commands, a :class:`RobotInterfaceBase` subclass can be passed to :class:`AsynchronousRobotInterface` or :class:`RobotInterfaceServer`. **DOFs and Parts** The number of DOFs is assumed equal to the number of joint actuators / encoders. If the robot has fewer actuators than encoders, the commands for unactuated joints should just be ignored. If the robot corresponds to a Klampt model (typical), then the number of DOFs should be ``model.numDrivers()`` A robot can have "parts", which are named groups of DOFs. For example, a robot with a gripper can have parts "arm" and "gripper", which can be controlled separately. You may retrieve part names using :meth:`parts`, part indices using :meth:`indices`, and access a RIL interface to a part using :meth:`partController`. It is suggested that these parts correspond with parts in the robot's :class:`~klampt.model.robotinfo.RobotInfo` structure. **Functionalities** There are a few functions your subclass will need to fill out: * :meth:`numJoints` or :meth:`klamptModel` * Either :meth:`clock` or :meth:`controlRate` * Either :meth:`setPosition`, :meth:`moveToPosition`, :meth:`setVelocity`, :meth:`setTorque`, or :meth:`setPID` * Either :meth:`sensedPosition` or :meth:`commandedPosition` Pass your RobotInterfaceBase subclass to :class:`RobotInterfaceCompleter` to complete the implementation of as many of the remaining items as possible. See the :class:`SimPositionControlInterface` class for an example that passes commands to a Klamp't physics simulator. Attributes: properties (dict): a dict from string key to property value. Application dependent. Examples may include: * 'name': str * 'version': str * 'simulated': bool * 'klamptModelFile': str """ def __init__(self,**properties): self.properties = properties self._worldModel = None self._klamptModel = None self._klamptDriverIndices = None def __str__(self): inner = [] if 'name' in self.properties: inner.append(self.properties['name']) if 'version' in self.properties: inner.append(self.properties['version']) return "RobotInterface("+','.join(inner)+')'
[docs] def initialize(self): """Tries to connect to the robot. Returns true if ready to send commands. This should probably be the first method called. """ return True
[docs] def close(self): """Cleanly shuts down any resources acquired using initialize().""" return True
[docs] def startStep(self): """This is called before the commands sent at each time step""" pass
[docs] def endStep(self): """This is called after the commands sent at each time step""" pass
[docs] def numJoints(self,part=None): """Returns the number of joints of the given part. By default, this returns the number of actuated DOFs in the Klamp't model. """ if part is None: m = self.klamptModel() if m is None: raise NotImplementedError() return m.numDrivers() return len(self.parts()[part])
[docs] @functools.lru_cache(maxsize=None) def parts(self): """Returns a dictionary of (part-name,configuration index list) pairs defining the named parts of the robot. Since this will be used a lot, make sure to declare your override with @functools.lru_cache. """ return {None:list(range(self.numJoints()))}
[docs] def indices(self,part=None,joint_idx=None): """Helper: returns a list of indices for the given part / joint index""" plist = self.parts()[part] if joint_idx is None: return plist assert joint_idx >= 0 and joint_idx < len(plist),"Invalid joint index for part "+str(part) return [plist[joint_idx]]
[docs] def partInterface(self,part=None,joint_idx=None): """Returns a RobotInterfaceBase that allows control of the given part/joint. If no such controller exists, raises a NotImplementedError. The part/joint controller should operate on exactly the DOFs specified by self.indices(part,joint_idx). """ if part is None and joint_idx is None: return self raise NotImplementedError()
[docs] def controlRate(self): """Returns the control rate, in Hz""" raise NotImplementedError()
[docs] def clock(self): """Returns the current time on the robot's clock, in seconds""" raise NotImplementedError()
[docs] def estop(self): """Calls an emergency stop on the robot""" self.softStop()
[docs] def softStop(self): """Calls a software E-stop on the robot (braking as quickly as possible). Default implementation stops robot at current position; a better solution would slow the robot down. """ self.setPosition(self.commandedPosition())
[docs] def reset(self): """If the robot has a non-normal status code, attempt to reset it to normal operation. Returns true on success, false on failure. """ return False
[docs] def jointName(self,joint_idx): """Returns a string naming the given joint""" raise NotImplementedError()
[docs] def sensors(self): """Returns a list of names of possible sensors on this robot.""" raise NotImplementedError()
[docs] def enabledSensors(self): """Returns a list of names of enabled sensors on this robot.""" raise NotImplementedError()
[docs] def hasSensor(self,sensor): """Returns true if the given sensor can be enabled. """ return sensor in self.sensors()
[docs] def enableSensor(self,sensor,enabled=True): """Enables / disables a sensor. Returns true if successful. """ raise NotImplementedError()
[docs] def sensorMeasurements(self,name): """Returns the latest measurements from a sensor. Interpretation of the result is sensor-dependent. """ raise NotImplementedError()
[docs] def sensorUpdateTime(self,name): """Returns the clock time of the last sensor update.""" raise NotImplementedError()
[docs] def setPosition(self,q): """Sets an instantaneous position command. Args: q (list of floats): A list of floats giving the desired configuration of the robot. """ raise NotImplementedError()
[docs] def setVelocity(self,v,ttl=None): """Sets an instantaneous velocity command. Args: v (list of floats): A list of floats giving the desired velocity of each joint. ttl (float, optional): A time-to-live for this command. """ raise NotImplementedError()
[docs] def setTorque(self,t,ttl=None): """Sets a instantaneous torque command. Args: t (list of floats): A list of floats giving the desired torques at each joint. ttl (float, optional): A time-to-live for this command. """ raise NotImplementedError()
[docs] def setPID(self,q,dq,t=None): """Sets a PID command to configuration q, velocity dq, and feedforward torque t. """ raise NotImplementedError()
[docs] def setPIDGains(self,kP,kI,kD): """Sets the PID gains. Some controllers might not implement this even if they implement setPID... """ raise NotImplementedError()
[docs] def moveToPosition(self,q,speed=1.0): """Sets a move-to position command. The trajectory that the robot will take on should be extractable through getMoveToTrajectory(q). Args: q (list of floats): A list of floats giving the desired configuration of the robot. speed (float, optional): The speed at which the position should be reached. """ raise NotImplementedError()
[docs] def setPiecewiseLinear(self,ts,qs,relative=True): """Tells the robot to start a piecewise linear trajectory command. The first milestone will be interpolated from the current commanded configuration. Args: ts (list of floats): times of the trajectory's milestones qs (list of floats, or list of list of floats): list of the trajectory's milestones relative (bool): if true, the times in `ts` are assumed to start at current time 0. Otherwise, they must all be greater than the current time retrieved by clock(). """ raise NotImplementedError()
[docs] def setPiecewiseCubic(self,ts,qs,vs): """Tells the robot to start a piecewise cubic trajectory command. The first milestone will be interpolated from the current commanded configuration / velocity. Args: ts (list of floats): times of the trajectory's milestones qs (list of floats, or list of list of floats): list of the trajectory's milestones vs (list of floats, or list of list of floats): list of the trajectory's derivatives at the milestones. relative (bool): if true, the times in `ts` are assumed to start at current time 0. Otherwise, they must all be greater than the current time retrieved by clock(). """ raise NotImplementedError()
[docs] def setToolCoordinates(self,xtool_local): """Sets the tool coordinates of this robot relative to its end effector link.""" raise NotImplementedError()
[docs] def getToolCoordinates(self): """Gets the tool coordinates of this robot relative to its end effector link.""" raise NotImplementedError()
[docs] def setGravityCompensation(self,gravity=[0,0,-9.8],load=0.0,load_com=[0,0,0]): """Sets up gravity compensation with a given gravity vector and end effector load. Args: gravity (list of 3 floats, optional): the gravity vector in the base frame, in m/s^2. load (float, optional): a weight attached to the end effector, in kg. load_com (list of 3 floats, optional): the COM of the load, expressed relative to the end-effector link frame. """ raise NotImplementedError()
[docs] def setCartesianPosition(self,xparams): """Sets a Cartesian position command. Like setPosition, this command is sent in immediate mode. Args: xparams: typically a klampt.math.se3 object for position / orientation commands, or a 3-vector for position-only. """ raise NotImplementedError()
[docs] def moveToCartesianPosition(self,xparams,speed=1.0): """Sets a Cartesian move-to-position command. Args: xparams: typically a klampt.math.se3 object for position / orientation commands, or a 3-vector for position-only. speed (float, optional): The speed at which the position should be reached. """ raise NotImplementedError()
[docs] def setCartesianVelocity(self,dxparams,ttl=None): """Sets a Cartesian velocity command. Args: dxparams: typically an (angular velocity, translational velocity) pair for position / orientation commands. A 3-vector for position-only. ttl (float, optional): A time-to-live for this command. """ raise NotImplementedError()
[docs] def setCartesianForce(self,fparams,ttl=None): """Sets a Cartesian torque command. Args: fparams: typically an (torque, force) pair for position / orientation commands. A 3-vector for position-only. ttl (float, optional): A time-to-live for this command. """ raise NotImplementedError()
[docs] def status(self,part=None,joint_idx=None): """Returns a status string for the given part / joint. 'ok' means everything is OK.""" if part is not None or joint_idx is not None: return self.getPartController(part,joint_idx).status() return 'ok'
[docs] def isMoving(self,part=None,joint_idx=None): """Returns true if the part / joint are currently moving""" if part is not None or joint_idx is not None: return self.getPartController(part,joint_idx).isMoving() raise NotImplementedError()
[docs] def sensedPosition(self): """Retrieves the currently sensed joint position. """ raise NotImplementedError()
[docs] def sensedVelocity(self): """Retrieves the currently sensed joint velocity. """ raise NotImplementedError()
[docs] def sensedTorque(self): """Retrieves the currently sensed joint torque. """ raise NotImplementedError()
[docs] def commandedPosition(self): """Retrieves the currently commanded joint position. """ raise NotImplementedError()
[docs] def commandedVelocity(self): """Retrieves the currently commanded joint velocity. """ raise NotImplementedError()
[docs] def commandedTorque(self): """Retrieves the currently commanded joint torque. """ raise NotImplementedError()
[docs] def destinationPosition(self): """Retrieves the destination of a motion queue controller. """ raise NotImplementedError()
[docs] def destinationVelocity(self): """Retrieves the final velocity of a motion queue controller. """ raise NotImplementedError()
[docs] def destinationTime(self): """Retrieves the final clock time of a motion queue controller. """ raise NotImplementedError()
[docs] def queuedTrajectory(self): """Returns a trajectory starting from the current time representing all commands in a motion queue controller. Returns: tuple: either (ts,qs) or (ts,qs,vs) representing a piecewise linear or a piecewise-cubic trajectory. """ raise NotImplementedError()
[docs] def cartesianPosition(self,q): """Converts from a joint position vector to a cartesian position""" raise NotImplementedError()
[docs] def cartesianVelocity(self,q,dq): """Converts from a joint position / velocity vector to a cartesian velocity""" raise NotImplementedError()
[docs] def cartesianForce(self,q,t): """Converts from a joint position / torque vector to a cartesian force""" raise NotImplementedError()
[docs] def sensedCartesianPosition(self): return self.cartesianPosition(self.sensedPosition())
[docs] def sensedCartesianVelocity(self): return self.cartesianVelocity(self.sensedPosition(),self.sensedVelocity())
[docs] def sensedCartesianForce(self): return self.cartesianForce(self.sensedPosition(),self.sensedTorque())
[docs] def commandedCartesianPosition(self): return self.cartesianPosition(self.commandedPosition())
[docs] def commandedCartesianVelocity(self): return self.cartesianVelocity(self.commandedPosition(),self.commandedVelocity())
[docs] def commandedCartesianForce(self): return self.cartesianForce(self.commandedPosition(),self.commandedTorque())
[docs] def destinationCartesianPosition(self): """Retrieves the Cartesian destination of a motion queue controller""" return self.cartesianPosition(self.destinationPosition())
[docs] def destinationCartesianVelocity(self): """Retrieves the final Cartesian velocity of a motion queue controller""" return self.cartesianVelocity(self.destinationPosition(),self.desinationVelocity())
[docs] def queuedCartesianTrajectory(self): """Returns the Cartesian trajectory starting from the current time representing all commands in a motion queue controller. Returns: tuple: either (ts,Ts) or (ts,Ts,dTs) representing a piecewise linear or a piecewise-cubic trajectory. """ res = self.queuedTrajectory() if len(res) == 2: ts,qs = res return ts,[self.cartesianPosition(q) for q in qs] elif len(res) == 3: ts,qs,vs = res return ts,[self.cartesianPosition(q) for q in qs],[self.cartesianVelocity(q,dq) for q,dq in zip(qs,vs)] else: raise RuntimeError("Invalid result from queuedTrajectory")
[docs] def partToRobotConfig(self,pconfig,part,robotConfig,joint_idx=None): """Fills a configuration vector for the whole robot given the configuration `pconfig` for a part""" pindices = self.indices(part,joint_idx) q = [v for v in robotConfig] assert len(pindices) == len(pconfig) for (i,x) in zip(pindices,pconfig): q[i] = x return q
[docs] def robotToPartConfig(self,robotConfig,part,joint_idx=None): """Retrieves a part's configuration from a robot configuration""" pindices = self.indices(part,joint_idx) return [robotConfig[i] for i in pindices]
[docs] def klamptModel(self): """If applicable, returns the Klamp't RobotModel associated with this controller. Default tries to load from properties['klamptModelFile']. Note: the result of the default implementation is cached, so this can be called multiple times without a performance hit. """ if self._klamptModel is not None: return self._klamptModel if 'klamptModelFile' in self.properties: if hasattr(self,'_worldModel'): if self._worldModel is not None: raise RuntimeError("Hmm... caching doesn't seem to be working right?") self._worldModel = WorldModel() if not self._worldModel.loadFile(self.properties['klamptModelFile']): self._worldModel = None else: self._klamptModel = self._worldModel.robot(0) return self._klamptModel return None
[docs] def configFromKlampt(self,klamptConfig,part=None,joint_idx=None): """Extracts a RobotInterfaceBase configuration from a configuration of the Klampt model. Note: the configuration of the model in self.klamptModel() is overridden. """ model = self.klamptModel() if model is None: return klamptConfig if len(klamptConfig) != model.numLinks(): raise ValueError("Length of klamptConfig is invalid for "+str(self)) qdrivers = model.configToDrivers(klamptConfig) if part is None and joint_idx is None: return qdrivers return [qdrivers[i] for i in self.indices(part,joint_idx)]
[docs] def velocityFromKlampt(self,klamptVelocity,part=None,joint_idx=None): """Extracts a RobotInterfaceBase velocity from a velocity of the Klampt model.""" model = self.klamptModel() if model is None: return klamptVelocity if len(klamptVelocity) != model.numLinks(): raise ValueError("Length of klamptVelocity is invalid for "+str(self)) vdrivers = model.velocityToDrivers(klamptVelocity) if part is None and joint_idx is None: return vdrivers return [vdrivers[i] for i in self.indices(part,joint_idx)]
[docs] def configToKlampt(self,config,klamptConfig=None,part=None,joint_idx=None): """Creates a configuration vector for the Klamp't model using the RobotInterfaceBase configuration. If klamptConfig is given, then these values are used for the non-part configuration values. Otherwise, the robot's current configuration from self.klamptModel() is used. Note: the configuration of the model in self.klamptModel() is overridden. """ model = self.klamptModel() if model is None: return config dofs = self.indices(part,joint_idx) if len(dofs) != len(config): raise ValueError("Length of config is invalid for "+str(self)) if klamptConfig is not None: model.setConfig(klamptConfig) if part is None and joint_idx is None: return model.configFromDrivers(config) else: for (i,x) in zip(dofs,config): model.driver(i).setValue(x) return model.getConfig()
[docs] def velocityToKlampt(self,velocity,klamptVelocity=None,part=None,joint_idx=None): """Creates a velocity vector for a Klamp't model using the joint velocity. If klamptVelocity is given, then these values are used for the non-part configuration values. Otherwise, the robot's current velocity from self.klamptModel() is used. Note: the velocity of the model in self.klamptModel() is overridden. """ model = self.klamptModel() if model is None: return velocity dofs = self.indices(part,joint_idx) if len(dofs) != len(velocity): raise ValueError("Length of velocity is invalid for "+str(self)) if klamptVelocity is not None: model.setVelocity(klamptVelocity) if part is None and joint_idx is None: return model.velocityFromDrivers(velocity) else: for (i,x) in zip(dofs,velocity): model.driver(i).setVelocity(x) return model.getVelocity()