Source code for klampt.vis.ipython.widgets

"""A Jupyter Notebook interface to Klampt.

Examples:
    Basic usage::

        from klampt import *
        from klampt.vis.ipython import KlamptWidget
        from IPython.display import display

        world = WorldModel()
        ... #set up the world...
        kvis = KlamptWidget(world,width=800,height=640)
        display(kvis)   # This pops up a window in Jupyter

    Immedate changes can be made using the methods in KlamptWidget::

        kvis.addText(name="text_id",text="hello",position=(10,10))
        kvis.addSphere(x=0,y=1.5,z=0,r=0.4)

    Change the configuration of things in the world, and then call update() to see the changes::

        robot = world.robot(0)
        q = robot.getConfig()
        q[2] += 1.0
        robot.setConfig(q)
        kvis.update()   # The previous changes are not made until this is called

    If you completely change the number of objects in the world, or their underlying geometries,
    you will need to call w.setWorld(world) again.  This is relatively expensive, so try not to
    do it too often::

        world.readElement(...)
        kvis.setWorld(world)

"""

from klampt import threejs_get_scene,threejs_get_transforms
from klampt.math import vectorops,so3,se3
from klampt.model import types
from klampt.model.trajectory import Trajectory,RobotTrajectory,SE3Trajectory
from klampt import RobotModel,RobotModelLink
import json
import time
import math
import ipywidgets as widgets
from ipywidgets import interact, interactive, fixed, interact_manual
from traitlets import Unicode, Dict, List, Int, validate, observe
import traitlets
import threading
import warnings

DEFAULT_POINT_RADIUS = 0.05
DEFAULT_AXIS_LENGTH = 0.2
DEFAULT_AXIS_WIDTH = 1
VALID_ITEM_TYPES = set(['Config','Configs','Vector3','RigidTransform','Trajectory','Geometry3D','TriangleMesh','WorldModel'])

[docs]class KlamptWidget(widgets.DOMWidget): """ A Python interface with the Jupyter notebook frontend. The API is similar to the vis module, but has a reduced and slightly modified set of hooks. Attributes: width (Int): the width of the view in pixels (public property) height (Int): the height of the view in pixels (public property) scene (Dict): the scene JSON message (private) transforms (Dict): the transforms JSON message (private) rpc (Dict): the rpc JSON message (private) _camera (Dict): the incoming camera JSON message from the frontend (private) camera (Dict): the outgoing camera JSON message (private) drawn (Int): the incoming drawn message from the frontend (private) events (List): incoming events from the frontend (private) world (WorldModel): the WorldModel isinstance _extras (dict): a dict mapping extra item names to (type,threejs_items) pairs _rpc_calls (list): a list of pending RPC calls between beginRpc() and endRpc() _aggregating_rpc (int): non-zero if between beginRpc and endRpc """ _model_name = Unicode('KlamptModel').tag(sync=True) _view_name = Unicode('KlamptView').tag(sync=True) _model_module = Unicode('klampt-jupyter-widget').tag(sync=True) _view_module = Unicode('klampt-jupyter-widget').tag(sync=True) _model_module_version = Unicode('0.1.2').tag(sync=True) _view_module_version = Unicode('0.1.2').tag(sync=True) width = Int(800).tag(sync=True) height = Int(600).tag(sync=True) scene = Dict().tag(sync=True) transforms = Dict().tag(sync=True) rpc = Dict().tag(sync=True) _camera = Dict().tag(sync=True) events = List().tag(sync=True) drawn = Int(0).tag(sync=True) def __init__(self,world=None,*args,**kwargs): widgets.DOMWidget.__init__(self,*args,**kwargs) self.world = world self._extras = dict() self._aggregating_rpc = 0 self._rpc_calls = [] if world is not None: self.setWorld(world) self.rpc = {} self.displayed = False self.beginRpc(True) return def __repr__(self): if not self.displayed: self.endRpc(True) return widgets.DOMWidget.__repr__(self)
[docs] def setWorld(self,world): """Resets the world to a new WorldModel object. """ self.world = world self._extras = dict() self._aggregating_rpc = 0 self._rpc_calls = [] s = threejs_get_scene(self.world) self.scene = json.loads(s)
[docs] def update(self): """Updates the view with changes to the world. Unlike setWorld(), this only pushes the geometry transforms, so it's much faster.""" if self.world: s = threejs_get_transforms(self.world) self.transforms = json.loads(s)
[docs] def clear(self): """Clears everything from the visualization, including the world.""" self._extras = dict() self._aggregating_rpc = 0 self._rpc_calls = [] self._do_rpc({'type':'reset_scene'}) self.drawn = 0 self.displayed = False self.beginRpc(True) self.world = None
[docs] def clearExtras(self): """Erases all ghosts, lines, points, text, etc from the visualization, but keeps the world.""" self._extras = dict() self._do_rpc({'type':'clear_extras'})
#TODO: implement this to be more similar to the vis API #def clearText(self):
[docs] def add(self,name,item,type='auto',**kwargs): """Adds the item to the world, and returns a list of identifiers associated with it. Args: name (str): the name of the item, which will be used to refer to it from now on item: the item data type (str, optional): either 'auto' (default) or a string describing the type of ``item``, which can help disambiguate some types like 'Config' vs 'Vector3' (see below) kwargs: possible attributes. Examples include color, size, length, and width Supports items of type: * Config, as a ghost (list, same size as robot) * Configs, as a set of ghosts (list of lists, same size as robot) * Vector3, drawn as a sphere (3-list) * RigidTransform, drawn as an xform (pair of 9-list and 3-list) * Configs, drawn as a polyline (list of 3-lists) * Trajectory, drawn either as: * a polyline (3D Trajectory objects), * set of milestones (Trajectory or RobotTrajectory objects) * a polyline + set of rigid transform milestones (SE3Trajectory objects) * WorldModel, but only one world at once is supported (same as setWorld). """ if type == 'auto': try: candidates = types.object_to_types(item,self.world) except Exception: raise ValueError("Invalid item, not a known Klamp't type") if isinstance(candidates,(list,tuple)): #print("KlamptWidget.add: multiple matching types:",candidates) if 'Config' in candidates: if self.world is None: candidates.remove('Config') else: match = any(len(item) == self.world.robot(i).numLinks() for i in range(self.world.numRobots())) if not match: candidates.remove('Config') new_candidates = [v for v in candidates if v in VALID_ITEM_TYPES] if len(new_candidates)==0: raise ValueError("Invalid item, types %s not supported by IPython widget"%(str(candidates),)) type = new_candidates[0] else: type = candidates if type == 'Config': res = self.addGhost(name) self.setGhostConfig(item,name=name) if 'color' in kwargs: KlamptWidget.setColor(self,res,*kwargs['color']) return [res] elif type == 'Configs': if len(item[0]) == 3: #it's a polyline self.addPolyline(name,item) if 'color' in kwargs: KlamptWidget.setColor(self,name,*kwargs['color']) return [name] else: #it's a set of configurations names = [] for i,q in enumerate(item): iname = name+'_'+str(i) self.addGhost(iname) self.setGhostConfig(q,name=iname) names.append(iname) self._extras[name] = ('Configs',names) if 'color' in kwargs: KlamptWidget.setColor(self,name,*kwargs['color']) return names elif type == 'Vector3': self.addSphere(name,item[0],item[1],item[2],kwargs.get('size',DEFAULT_POINT_RADIUS)) if 'color' in kwargs: KlamptWidget.setColor(self,name,*kwargs['color']) return [name] elif type == 'RigidTransform': self.addXform(name,length=kwargs.get('length',DEFAULT_AXIS_LENGTH),width=kwargs.get('width',DEFAULT_AXIS_WIDTH)) self.setTransform(name,R=item[0],t=item[1]) return [name] elif type == 'Trajectory': if isinstance(item,SE3Trajectory): res = [] ttraj = [] for i in item.milestones: T = item.to_se3(item.milestones[i]) res += self.add(name+"_milestone_"+str(i),T) ttraj.append(T[1]) res += self.add(name,ttraj,**kwargs) self._extras[name] = ('Trajectory',res) return res elif isinstance(item,RobotTrajectory): #it's a set of configurations rindex = item.robot.index names = [] for i,q in enumerate(item.milestones): iname = name+'_'+str(i) self.addGhost(iname,rindex) self.setGhostConfig(q,iname,rindex) names.append(iname) self._extras[name] = ('Configs',names) if 'color' in kwargs: for name in names: KlamptWidget.setColor(self,name,*kwargs['color']) return names else: return self.add(name,item.milestones,**kwargs) elif type == 'Geometry3D': if item.type() == 'PointCloud': pc = item.getPointCloud() res = self.add(name,pc,'PointCloud',**kwargs) self.setTransform(name,*item.getCurrentTransform()) return res else: g = item.convert('TriangleMesh') tris = g.getTriangleMesh() res = self.add(name,tris,'TriangleMesh',**kwargs) self.setTransform(name,*item.getCurrentTransform()) return res elif type == 'TriangleMesh': tris = item data = ([v for v in tris.vertices],[i for i in tris.indices]) self._extras[name] = ('Trilist',data) self._do_rpc({'type':'add_trimesh','name':name,'verts':data[0],'tris':data[1]} ) if 'color' in kwargs: KlamptWidget.setColor(self,name,*kwargs['color']) return [name] elif type == 'PointCloud': pc = item from klampt.model import geometry colors = geometry.point_cloud_colors(pc,'rgb') data = ([v for v in pc.vertices],colors) self._extras[name] = ('Points',data) msg = {'type':'add_points','name':name,'verts':data[0],'size':kwargs.get('size',0.01)} if colors is not None: msg['colors'] = colors self._do_rpc(msg) if 'color' in kwargs: KlamptWidget.setColor(self,name,*kwargs['color']) return [name] elif type == 'WorldModel': if name != 'world' or self.world is not None: warnings.warn("KlamptWidget.add: only one world is supported, and should be added as world") self.world = item s = threejs_get_scene(self.world) self.scene = json.loads(s) else: raise ValueError("KlamptWidget can't handle objects of type "+type+" yet")
[docs] def remove(self,name): """Removes a certain named target, e.g. a ghost, line, text, etc.""" self._do_rpc({'type':'remove','object':name})
def hide(self,name,hidden=True): """Hides/shows named target, e.g. a ghost, line, text, etc.""" self._do_rpc({'type':'set_visible','object':name,'value':(not hidden)})
[docs] def resetCamera(self): """Resets the camera to the original view""" self._do_rpc({'type':'reset_camera'})
[docs] def getCamera(self): """Returns a data structure representing the current camera view""" res = dict(self._camera).copy() if 'r' in res: del res['r'] return res
[docs] def setCamera(self,cam): """Sets the current camera view""" msg = dict(cam).copy() msg['type'] = 'set_camera' self._do_rpc(msg) marked = dict(cam).copy() marked['r'] = 1 self._camera = marked
[docs] def hide(self,name,value=False): """Changes the visibility status of a certain named target""" target_name = name if name in self._extras: type,data = self._extras[name] if type == 'Config': target_name = data elif type == 'Configs' or type == 'Trajectory': self.beginRpc(strict=False) for subitem in data: self._do_rpc({'type':'set_visible','object':subitem,'value':value}) self.endRpc(strict=False) return self._do_rpc({'type':'set_visible','object':target_name,'value':value})
[docs] def setColor(self,target,r,g,b,a=1.0): """Sets the given RobotModel, RobotModelLink, named link, indexed link, or object name to some RGBA color (each channel in the range [0,1]).""" recursive=False target_name = None if isinstance(target, (int, float, complex)): robot = self.world.robot(0) target_as_link = robot.link(target) target_name=target_as_link.getName() elif isinstance(target,RobotModelLink): target_name=target.getName() elif isinstance(target,RobotModel): target_name=target.getName() recursive = True elif isinstance(target, str): target_name=target if target in self._extras: type,data = self._extras[target] if type == 'Config': target_name = data recursive = True elif type == 'Configs' or type == 'Trajectory': #it's a group set everything under the group self.beginRpc(strict=False) for subitem in data: KlamptWidget.setColor(self,subitem,r,g,b,a) self.endRpc(strict=False) return else: #see if it's the name of a robot try: self.world.robot(target).index recursive = True except Exception: found = False for i in range(self.world.numRobots()): if self.world.robot(i).link(target).index >= 0: found = True break if not found: raise ValueError("ERROR: setColor requires target of either robot, link, index, or string name of object!") else: raise ValueError("ERROR: setColor requires target of either robot, link, index, or string name of object!") rgba_color = [r,g,b,a] if recursive: self._do_rpc({'type':'set_color','object':target_name,'rgba':rgba_color,'recursive':True}) else: self._do_rpc({'type':'set_color','object':target_name,'rgba':rgba_color})
#print "Setting link color!",('object',target_name,'rgba'),rgba_color
[docs] def setTransform(self,name,R=so3.identity(),t=[0]*3,matrix=None): """Sets the transform of the target object. If matrix is given, it's a 16-element array giving the 4x4 homogeneous transform matrix, in row-major format. Otherwise, R and t are the 9-element klampt.so3 rotation and 3-element translation.""" if matrix is not None: self._do_rpc({'type':'set_transform','object':name,'matrix':matrix}) else: self._do_rpc({'type':'set_transform','object':name,'matrix':[R[0],R[3],R[6],t[0],R[1],R[4],R[7],t[1],R[2],R[5],R[8],t[2],0,0,0,1]})
[docs] def addGhost(self,name="ghost",robot=0): """Adds a ghost configuration of the robot that can be posed independently. name can be set to identify multiple ghosts. The identifier of the ghost in the three.js scene is prefixname + robot.getName(), and all the links are identified by prefixname + link name.""" if robot < 0 or robot >= self.world.numRobots(): raise ValueError("Invalid robot specified") target_name=self.world.robot(robot).getName() self._do_rpc({'type':'add_ghost','object':target_name,'prefix_name':name}) self._extras[name] = ('Config',name+target_name) return name
[docs] def getRobotConfig(self,robot=0): """A convenience function. Gets the robot's configuration in the visualization world.""" if robot < 0 or robot >= self.world.numRobots(): raise ValueError("Invalid robot specified") robot = self.world.robot(robot) q = robot.getConfig() return q
[docs] def setGhostConfig(self,q,name="ghost",robot=0): """Sets the configuration of the ghost to q. If the ghost is named, place its name in prefixname.""" if robot < 0 or robot >= self.world.numRobots(): raise ValueError("Invalid robot specified") robot = self.world.robot(robot) q_original = robot.getConfig() if len(q) != robot.numLinks(): raise ValueError("Config must be correct size: %d != %d"%(len(q),robot.numLinks())) robot.setConfig(q) self.beginRpc(strict=False) rpcs = [] for i in range(robot.numLinks()): T = robot.link(i).getTransform() p = robot.link(i).getParent() if p>=0: Tp = robot.link(p).getTransform() T = se3.mul(se3.inv(Tp),T) mat = se3.homogeneous(T) #mat is now a 4x4 homogeneous matrix linkname = name+robot.link(i).getName() #send to the ghost link with name "name"... self._do_rpc({'type':'set_transform','object':linkname,'matrix':[mat[0][0],mat[0][1],mat[0][2],mat[0][3],mat[1][0],mat[1][1],mat[1][2],mat[1][3],mat[2][0],mat[2][1],mat[2][2],mat[2][3],mat[3][0],mat[3][1],mat[3][2],mat[3][3]]}) self.endRpc(strict=False) robot.setConfig(q_original) #restore original config
[docs] def addText(self,name="HUD_Text1",text="",position=None): """Adds a new piece of text displayed on the screen. name is a unique identifier of the text, and position=(x,y) are the coordinates of upper left corner of the the text, in percent. """ if position is None: x,y = None,None else: x,y = position self._extras[name] = ('Text',(x,y,text)) self._do_rpc({'type':'add_text','name':name,'x':x,'y':y,'text':text})
[docs] def addSphere(self,name="Sphere1",x=0,y=0,z=0,r=1): """Adds a new sphere to the world with the given x,y,z position and radius r.""" self._extras[name] = ('Sphere',(x,y,z,r)) self._do_rpc({'type':'add_sphere','name':name,'x':x,'y':y,'z':z,'r':r})
[docs] def addLine(self,name="Line1",x1=0,y1=0,z1=0,x2=1,y2=1,z2=1): """Adds a new line segment to the world connecting point (x1,y1,z1) to (x2,y2,z2)""" verts = [x1,y1,z1,x2,y2,z2] self._extras[name] = ('Line',verts) self._do_rpc({'type':'add_line','name':name,'verts':verts})
[docs] def addXform(self,name="Xform1",length=DEFAULT_AXIS_LENGTH,width=DEFAULT_AXIS_WIDTH): """Adds a new transform widget to the world with the given line length and width""" self._extras[name] = ('RigidTransform',(length,width)) self._do_rpc({'type':'add_xform','name':name,'length':length,'width':width})
[docs] def addPolyline(self,name="Line1",pts=[]): """Adds a new polygonal line segment to the world connecting the given list of 3-tuples""" verts = sum(pts,[]) self._extras[name] = ('Line',verts) self._do_rpc({'type':'add_line','name':name,'verts':verts})
[docs] def addTriangle(self,name="Tri1",a=(0,0,0),b=(1,0,0),c=(0,1,0)): """Adds a new triangle with vertices a,b,c. a,b, and c are 3-lists or 3-tuples.""" verts = a+b+c self._extras[name] = ('Trilist',verts) self._do_rpc({'type':'add_trilist','name':name,'verts':verts})
[docs] def addQuad(self,name="Quad1",a=(0,0,0),b=(1,0,0),c=(1,1,0),d=(0,1,0)): """Adds a new quad (in CCW order) with vertices a,b,c,d. a,b,c and d are 3-lists or 3-tuples.""" verts = a+b+c+a+c+d self._extras[name] = ('Trilist',verts) self._do_rpc({'type':'add_trilist','name':name,'verts':verts})
[docs] def addBillboard(self,name="Billboard",image=[[]],format='auto',crange=[0,1],colormap='auto',filter='linear',size=(1,1)): """Adds a 2D billboard to the world. The image is a 2D array of values, which is texure-mapped to a quad. By default, the billboard is centered at (0,0,0) and faces up. To modify its location or orientation, call ``setTransform`` on it. Args: name (str): the name used to refer to this item image (list of lists or str): a 2D array of single-channel values, (r,g,b) tuples, or (r,g,b,a) tuples. Rows are listed top to bottom, rows from left to right. Or, can also be a URL. format (str, optional): The image format. Can be: * 'auto': autodetect the type from the image. If the image contains values, the format is 'value'. * 'value': the values are mapped through either 'opacity', 'rainbow', or gradient color mapping. * 'rgb': if the image contains values, they are interpreted as RGB values packed in 24 bit integers. Otherwise, the first 3 channels of the tuple are used. * 'rgba': if the image contains values, they are interpreted as RGB values packed in 32 bit integers. Otherwise, they are assumed to be (r,g,b,a) tuples crange (pair of numbers, optional): the range of the given values / channels. By default [0,1], but if you are using uint8 encoding this should be set to [0,255]. colormap (optional): how the color of the billboard should be set based on the image. Valid values are: * 'auto': if the image contains values, the gradient ((0,0,0),(1,1,1)) is used. Otherwise 'replace' is used. * (color1,color2): interpolates between the two given (r,g,b) or (r,g,b,a) tuples. * 'opacity': sets the alpha channel only. * 'modulate': the value / rgb / rgba texture modulates the billboard color as set by setColor filter (str, optional): how values between pixels are interpolated. Either 'nearest' or 'linear'. size (pair of numbers, optional): the (width,height) pair of the billboard, in world units. """ if not isinstance(image,str): import struct import base64 bytes = [] w,h = None,None h = len(image) for row in image: if w == None: w = len(row) else: assert w == len(row),"Image is not a 2D array" pixel = image[0][0] if format == 'auto': if hasattr(pixel,'__iter__'): if len(pixel) == 4: format = 'rgba' else: format = 'rgb' else: format = 'value' else: if not hasattr(pixel,'__iter__'): format = 'p'+format gradient = (type(colormap) != str) for row in image: for pixel in row: if format == 'value': u = min(1,max(0,(pixel - crange[0]) / (crange[1]-crange[0]))) if gradient: color = vectorops.interpolate(gradient[0],gradient[1],u) r = 0xff * min(1,max(0,color[0])) g = 0xff * min(1,max(0,color[1])) b = 0xff * min(1,max(0,color[2])) packed = (0xff << 24) | (int(b) << 16) | (int(g) << 8) | int(r) bytes.append(struct.pack('<I',packed)) else: val = 0xff * u bytes.append(struct.pack('B',val)) elif format == 'prgb' or format == 'prgba': bytes.append(struct.pack('<I', pixel)) elif format == 'rgb': r = 0xff * min(1,max(0,(pixel[0] - crange[0]) / (crange[1]-crange[0]))) g = 0xff * min(1,max(0,(pixel[1] - crange[0]) / (crange[1]-crange[0]))) b = 0xff * min(1,max(0,(pixel[2] - crange[0]) / (crange[1]-crange[0]))) packed = (0xff << 24) | (int(b) << 16) | (int(g) << 8) | int(r) bytes.append(struct.pack('<I', packed)) elif format == 'rgba': r = 0xff * min(1,max(0,(pixel[0] - crange[0]) / (crange[1]-crange[0]))) g = 0xff * min(1,max(0,(pixel[1] - crange[0]) / (crange[1]-crange[0]))) b = 0xff * min(1,max(0,(pixel[2] - crange[0]) / (crange[1]-crange[0]))) a = 0xff * min(1,max(0,(pixel[3] - crange[0]) / (crange[1]-crange[0]))) packed = (int(a) << 24) | (int(b) << 16) | (int(g) << 8) | int(r) bytes.append(struct.pack('<I', packed)) else: raise ValueError("Invalid format "+format) image = base64.b64encode(''.join(bytes)) self._do_rpc({'type':'add_billboard','name':name,'imagedata':image,'width':w,'height':h,'size':size,'filter':filter,'colormap':colormap}) else: self._do_rpc({'type':'add_billboard','name':name,'image':image,'size':size,'filter':filter,'colormap':colormap}) self._extras[name] = ('Billboard',image)
[docs] def beginRpc(self,strict=False): """Begins collecting a set of RPC calls to be sent at once, which is a bit faster than doing multiple addX or setX calls. Usage:: widget.beginRpc() widget.addX() ... widget.setX() widget.endRpc() #this sends all the messages at once """ if self._aggregating_rpc == 0: assert len(self._rpc_calls)==0 if self._aggregating_rpc != 0 and strict: raise RuntimeError("Each beginRpc() call must be ended with an endRpc() call") self._aggregating_rpc += 1 return
def _do_rpc(self,msg): """Internally used to send or queue an RPC call""" if self._aggregating_rpc: self._rpc_calls.append(msg) else: self.rpc = msg
[docs] def endRpc(self,strict=False): """Ends collecting a set of RPC calls to be sent at once, and sends the accumulated message""" if self._aggregating_rpc <= 0 or (self._aggregating_rpc!=1 and strict): raise ValueError("Each beginRpc() call must be ended with an endRpc() call") self._aggregating_rpc -= 1 if self._aggregating_rpc == 0 and len(self._rpc_calls) > 0: self.rpc = {'type':'multiple','calls':self._rpc_calls} self._rpc_calls = []
@observe('_camera') def _recv_camera(self,cam): #trigger an update? #print("Klampt widget received '_camera' message") marked = cam['new'].copy() marked['r'] = 1 self._camera = marked @observe('events') def _recv_events(self,events): elist = events['new'] if len(elist) > 0: for event in elist: self.on_event(event) self.events = [] @observe('drawn') def _recv_drawn(self,drawn): self.drawn = 0 self.displayed = True #print("Klampt widget received 'drawn' message")
[docs] def on_event(self,e): print("KlamptWidget got event",e)
[docs]def EditConfig(robot,klampt_widget=None,ghost=None,link_selector='slider',link_subset=None,callback=None): """Creates a Jupyter widget for interactive editing of the robot's configuration. Args: robot (RobotModel): the robot to edit klampt_widget (KlamptWidget, optional): the KlamptWidget visualization to update, or None if you don't want to visualize the editing. ghost (str, optional): if not None, this is the name of the ghost that should be updated. Widget updates are shown on the given ghost rather than the actual robot. To get the ghost configuration, you'll need to update the callback. link_selector (str): how to select links. Either: * 'slider': uses an IntSlider widget * 'dropdown': uses a Dropdown widget * 'all': shows sliders for all links link_subset (list, optional): if given, only a subset of links are shown. Otherwise, only non-fixed links are shown. callback (function, optional): a function callback(index,q) called when a DOF's value has changed. Returns: VBox: a widget to be displayed as you like """ qmin,qmax = robot.getJointLimits() qedit = robot.getConfig()[:] if link_subset == None: link_subset = [i for i in range(robot.numLinks()) if qmin[i] != qmax[i]] else: for link in link_subset: if link < 0 or link >= robot.numLinks(): raise ValueError("Invalid link specified in link_subset") link_subset = link_subset[:] def _dochange_link(link): if not math.isinf(qmin[link]): joint_slider.min = qmin[link] joint_slider.max = qmax[link] else: joint_slider.min = -2 joint_slider.max = 2 joint_slider.value = qedit[link] if klampt_widget and ghost == None: #show selected link in color #restore old colors klampt_widget.beginRpc() for i in link_subset: ilink = robot.link(i) KlamptWidget.setColor(klampt_widget,ilink,*ilink.appearance().getColor()) #change new color color = robot.link(link).appearance().getColor() r,g,b,a = color r = 1.0-(1.0-r)*0.5 g = 1.0-(1.0-g)*0.5 KlamptWidget.setColor(klampt_widget,link,r,g,b,a) klampt_widget.endRpc() def _dochange(link,value): if ghost: qold = robot.getConfig() qedit[link] = value robot.setConfig(qedit) if klampt_widget: if ghost: klampt_widget.setGhostConfig(qedit,ghost,robot.index) else: klampt_widget.update() if ghost: robot.setConfig(qold) if callback: callback(link,qedit) if link_selector == 'slider': link_slider=widgets.IntSlider(description='Link',min=0,max=len(link_subset)-1,value=0) joint_slider=widgets.FloatSlider(description='Value',min=0,max=1,value=0.5,step=0.001) _dochange_link(link_subset[0]) @interact(index=link_slider) def change_link(index): link = link_subset[index] _dochange_link(link) link_slider.observe(lambda change:change_link(change['new']),'value') def change_joint_value(value): link = link_subset[link_slider.value] _dochange(link,value) joint_slider.observe(lambda change:change_joint_value(change['new']),'value') return widgets.VBox([link_slider,joint_slider]) elif link_selector == 'dropdown': link_dropdown=widgets.Dropdown(description='Link',options=[robot.link(i).getName() for i in link_subset],value=robot.link(link_subset[0]).getName()) joint_slider=widgets.FloatSlider(description='Value',min=0,max=1,value=0.5,step=0.001) _dochange_link(link_subset[0]) def change_link(name): link = robot.link(name).index _dochange_link(link) link_dropdown.observe(lambda change:change_link(change['new']),'value') def change_joint_value(value): link = robot.link(link_dropdown.value).index _dochange(link,value) joint_slider.observe(lambda change:change_joint_value(change['new']),'value') return widgets.VBox([link_dropdown,joint_slider]) elif link_selector == 'all': sliders = [] for link in link_subset: sliders.append(widgets.FloatSlider(description=robot.link(link).getName(),min=qmin[link],max=qmax[link],value=qedit[link],step=0.001)) sliders[-1].observe(lambda value,link=link:_dochange(link,value['new']),'value') return widgets.VBox(sliders) else: raise ValueError("Invalid link_selector, must be slider, dropdown, or all")
[docs]def EditPoint(value=None,min=None,max=None,labels=None, klampt_widget=None,point_name='edited_point',point_radius=DEFAULT_POINT_RADIUS, callback=None): """Creates a Jupyter widget for interactive editing of an xyz point Args: value (list of 3 floats, optional): the initial value of the point. If given, this must be a list and will hold the edited values. min/max (list of 3 floats, optional): the minimum and maximum of the point labels (list of strs, optional): if given, the labels of each channel klampt_widget (KlamptWidget, optional): the KlamptWidget visualization to update, or None if you don't want to visualize the point. point_name (str, optional): the name of the point in the visualization world to edit. point_radius (float, optional): the radius of the visualized point. callback (function ,optional): a function callback(xyz) called when a DOF's value has changed. Returns: VBox: a widget that can be displayed as you like """ if value is None: value = [0,0,0] else: if not isinstance(value,list): raise ValueError("value must be a 3-element list") if len(value) != 3: raise ValueError("value must be a 3-element list") if labels is None: labels = 'xyz' if min is None: min = vectorops.add(value,[-5,-5,-5]) elif isinstance(min,(int,float)): min = [min,min,min] if max is None: max = vectorops.add(value,[5,5,5]) elif isinstance(max,(int,float)): max = [max,max,max] if len(min) != 3: raise ValueError("min must be a 3-element list") if len(max) != 3: raise ValueError("max must be a 3-element list") if klampt_widget: klampt_widget.addSphere(name=point_name,x=value[0],y=value[1],z=value[2],r=point_radius) def _dochange(index,element): value[index] = element if klampt_widget: klampt_widget.addSphere(name=point_name,x=value[0],y=value[1],z=value[2],r=point_radius) if callback: callback(value) elems = [] for i in range(3): elems.append(widgets.FloatSlider(description=labels[i],value=value[i],min=min[i],max=max[i],step=0.001)) elems[-1].observe(lambda v,i=i:_dochange(i,v['new']),'value') return widgets.VBox(elems)
[docs]def EditTransform(value=None,xmin=None,xmax=None,labels=None, klampt_widget=None,xform_name='edited_xform',axis_length=DEFAULT_AXIS_LENGTH,axis_width=DEFAULT_AXIS_WIDTH, callback=None): """Creates a Jupyter widget for interactive editing of a rigid transform point Args: value (klampt.se3 element), optional: the initial value of the transform (klampt.se3 element). If given as (R,t), the R and t members must be lists and will hold the edited values. xmin/xmax (list of 3 floats, optional): the minimum and maximum of the translation labels (list of strs, optional): if given, the labels of roll,pitch,yaw and x,y,z klampt_widget (KlamptWidget, optional): the KlamptWidget visualization to update, or None if you don't want to visualize the point. xform_name (str, optional): the name of the xform in the visualization world to edit. axis_length,axis_width (float, optional): the length and width of the visualized widget callback (function, optional): a function callback((R,t)) called when a DOF's value has changed. Returns: VBox: a widget that can be displayed as you like """ if value is None: value = se3.identity() else: if not isinstance(value,(tuple,list)): raise ValueError("value must be a 2-element sequence") if len(value) != 2: raise ValueError("value must be a 2-element sequence") if len(value[0]) != 9: raise ValueError("value[0] must be a 9-element list") if len(value[1]) != 3: raise ValueError("value[1] must be a 3-element list") if labels is None: labels = ['roll','pitch','yaw','x','y','z'] if xmin is None: xmin = vectorops.add(value[1],[-5,-5,-5]) elif isinstance(xmin,(int,float)): xmin = [xmin,xmin,xmin] if xmax is None: xmax = vectorops.add(value[1],[5,5,5]) elif isinstance(xmax,(int,float)): xmax = [xmax,xmax,xmax] if len(xmin) != 3: raise ValueError("xmin must be a 3-element list") if len(xmax) != 3: raise ValueError("xmax must be a 3-element list") if klampt_widget: klampt_widget.addXform(name=xform_name,length=axis_length,width=axis_width) klampt_widget.setTransform(name=xform_name,R=value[0],t=value[1]) rpy = list(so3.rpy(value[0])) def _do_rotation_change(index,element): rpy[index] = element value[0][:] = so3.from_rpy(rpy) if klampt_widget: klampt_widget.setTransform(name=xform_name,R=value[0],t=value[1]) if callback: callback(value) def _do_translation_change(index,element): value[1][index] = element if klampt_widget: klampt_widget.setTransform(name=xform_name,R=value[0],t=value[1]) if callback: callback(value) elems = [] for i in range(3): elems.append(widgets.FloatSlider(description=labels[i],value=rpy[i],min=0,max=math.pi*2,step=0.001)) elems[-1].observe(lambda v,i=i:_do_rotation_change(i,v['new']),'value') for i in range(3): elems.append(widgets.FloatSlider(description=labels[3+i],value=value[1][i],min=xmin[i],max=xmax[i],step=0.001)) elems[-1].observe(lambda v,i=i:_do_translation_change(i,v['new']),'value') return widgets.VBox(elems)
[docs]class Playback(widgets.VBox): """A play/pause/reset widget associated with a KlamptWidget. Attributes: klampt_widget (KlamptWidget, optional): the widget that should be updated after each advance call advance (function, optional): a function to be called for each new frame. pause (function, optional): a function to be called when pause is clicked. reset (function, optional): a function to be called when reset is clicked. maxframes (int, optional): the maximum number of frames. If None, this is unlimited. framerate (int, optional): number of frames per second desired. If None, frames are run as quickly as possible quiet (bool): if True, suppresses output during play playbutton, stepbutton, pausebutton, resetbutton (Button): the Button widgets """ def __init__(self,klampt_widget=None,advance=None,reset=None,pause=None,maxframes=None,framerate=None,quiet=False): self.klampt_widget = klampt_widget self.advance = advance self.reset = reset self.pause = pause self.maxframes = maxframes self.framerate = framerate self.quiet = quiet self.playbutton = widgets.Button( description='Play', disabled=False, button_style='', # 'success', 'info', 'warning', 'danger' or '' tooltip='Start the animation', icon='play') self.stepbutton = widgets.Button( description='Step', disabled=False, button_style='', # 'success', 'info', 'warning', 'danger' or '' tooltip='Step the animation', icon='step-forward') self.pausebutton = widgets.Button( description='Pause', disabled=True, button_style='', # 'success', 'info', 'warning', 'danger' or '' tooltip='Pause the animation', icon='pause') self.resetbutton = widgets.Button( description='Reset', disabled=False, button_style='', # 'success', 'info', 'warning', 'danger' or '' tooltip='Reset the animation', icon='undo') lock = threading.Lock() playdata = {'thread':None,'stop':0} self.playdata = playdata self.lock = lock self.frame = 0 #If we don't create this now, exceptions will never be printed self.out = widgets.Output() def play_thread_func(lock,playdata): if self.framerate is None: dt = 0 else: dt = 1.0/self.framerate playdata['stop'] = 0 playdata['last_frame_time'] = time.time() def do_advance(drawn=False): if playdata['stop']: return lock.acquire() try: self._advance() except Exception as e: with self.out: print("Exception occurred during Playback.advance, stopping animation") print(e) playdata['stop'] = 1 lock.release() return if self.klampt_widget and dt==0: self.klampt_widget.beginRpc() self.klampt_widget.add("__temp",[time.time(),0,0]) self.klampt_widget.remove("__temp") self.klampt_widget.endRpc() self.frame += 1 lock.release() if self.klampt_widget and dt==0: self.klampt_widget.observe(do_advance,'drawn') #kick it off with an update do_advance() t0 = time.time() while True: if playdata['stop']: break lock.acquire() if self.maxframes is not None and self.frame >= self.maxframes: #print "Stopping play by completion" self.playbutton.disabled = False self.pausebutton.disabled = True self.frame = 0 lock.release() break lock.release() if self.klampt_widget and dt==0: time.sleep(0.05) else: do_advance() t1 = time.time() time.sleep(max(dt-(t1-t0),0)) t0 = time.time() if self.klampt_widget and dt==0: self.klampt_widget.unobserve(do_advance,'drawn') playdata['thread'] = None return def on_play(b): #print "Play clicked" self.pausebutton.disabled = False self.playbutton.disabled = True assert playdata['thread'] == None playdata['thread'] = threading.Thread(target=play_thread_func,args=(lock,playdata)) playdata['thread'].start() def on_pause(b): #print "Pause clicked" self.stop() self._pause() def on_step(b): #print "Step clicked" self.stop() self.frame += 1 self._advance() def on_reset(b): #print "Reset clicked" self.stop() self.frame = 0 self.out.clear_output() self._reset() self.playbutton.on_click(on_play) self.stepbutton.on_click(on_step) self.pausebutton.on_click(on_pause) self.resetbutton.on_click(on_reset) widgets.VBox.__init__(self,[widgets.HBox([self.playbutton,self.stepbutton,self.pausebutton,self.resetbutton]), self.out])
[docs] def stop(self): """Stops any ongoing playback""" lock = self.lock playdata = self.playdata if playdata['thread'] is not None: #playing lock.acquire() playdata['stop'] = 1 lock.release() playdata['thread'].join() playdata['thread'] = None playdata['stop'] = 0 self.pausebutton.disabled = True self.playbutton.disabled = False
def _advance(self): if self.advance: if self.quiet: self.advance() else: with self.out: self.advance() if self.klampt_widget: self.klampt_widget.update() def _reset(self): if self.reset: with self.out: self.reset() if self.klampt_widget: self.klampt_widget.update() def _pause(self): if self.pause: with self.out: self.pause() if self.klampt_widget: self.klampt_widget.update()