Source code for klampt.control.networkrobotinterface

"""Classes for building networked RIL servers and clients.

.. versionadded:: 0.9
"""

from .robotinterface import RobotInterfaceBase
from .robotinterfaceutils import _RobotInterfaceState,_RobotInterfaceStructure,_RobotInterfaceSettings,_RobotInterfaceCommand,_RobotInterfaceStatefulBase,_RobotInterfaceStatefulWrapper
from .robotinterfaceutils import _split_settings,_split_state,_gather_settings,_gather_commands,_update_from_settings
from ..model.subrobot import SubRobotModel
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy, Marshaller
import signal
import traceback
from functools import wraps
import sys
import copy
from typing import Dict,Tuple,List,Sequence,Any
import warnings

# #helps when numpy is used
# Marshaller.dispatch[np.float64] = Marshaller.dump_double
# Marshaller.dispatch[np.ndarray] = Marshaller.dump_array



[docs]class ClientRobotInterfaceBase(_RobotInterfaceStatefulBase): """Base class for a robot interface that communicates with a server. begin/endStep() will need to be called repeatedly, and on endStep() all the communication is performed. """ def __init__(self): _RobotInterfaceStatefulBase.__init__(self) self._partInterfaces = dict() #type: Dict[str,_RobotInterfaceStatefulBase] self._initialized = False
[docs] def connect(self) -> bool: raise NotImplementedError("Subclass must handle this")
[docs] def disconnect(self) -> bool: raise NotImplementedError("Subclass must handle this")
[docs] def getInitialData(self) -> Tuple[Dict,_RobotInterfaceStructure,_RobotInterfaceSettings,_RobotInterfaceState]: raise NotImplementedError("Subclass must handle this")
[docs] def updateSettingsAndCommands(self, settings : _RobotInterfaceSettings, commands : List[_RobotInterfaceCommand]) -> _RobotInterfaceState: raise NotImplementedError("Subclass must handle this")
[docs] def partInterface(self,part,joint_idx=None): if joint_idx is not None: raise NotImplementedError("TODO: part interfaces for individual joint indices") return self._partInterfaces[part]
[docs] def initialize(self): if self._initialized: raise RuntimeError("initialize() can only be called once") if not self.connect(): return False properties, structure, settings, state = self.getInitialData() self.properties = properties self._structure = structure self._settings = settings self._state = state self._structure.klamptModel = self.klamptModel() if self._structure.klamptModel is None: warnings.warn("KlamptModel could not be loaded: {}".format(self.properties.get('klamptModelFile',None))) input() for (k,inds) in self._structure.parts.items(): if k is None: continue partInterface = _RobotInterfaceStatefulBase() partInterface._structure.controlRate = self._structure.controlRate partInterface._structure.numJoints = len(inds) if self._structure.jointNames is not None: partInterface._structure.jointNames = [self._structure.jointNames[i] for i in inds] if self._structure.klamptModel is not None: partInterface._structure.klamptModel = SubRobotModel(self._structure.klamptModel,inds) self._partInterfaces[k] = partInterface settings = _split_settings(self._settings,self._structure.parts) for (k,setting) in settings.items(): self._partInterfaces[k]._settings = setting self._initialized = True return True
[docs] def close(self): if self._initialized: if not self.disconnect(): return False return True
[docs] def beginStep(self): if self._commands: raise RuntimeError("Interface didn't clear command queue?") #distribute prior command state changes states = _split_state(self._state,self._structure.parts) for (k,state) in states.items(): self._partInterfaces[k]._state = state #zero out settings delta self._delta_settings = _RobotInterfaceSettings() for (k,iface) in self._partInterfaces.items(): iface._delta_settings = _RobotInterfaceSettings()
[docs] def endStep(self): #read settings changes, command changes and pass to interface assert self._delta_settings is not None,"Didn't call beginStep?" _gather_settings(self._structure.parts,dict((k,iface._delta_settings) for k,iface in self._partInterfaces.items()),self._delta_settings) cmds = _gather_commands([self._commands]+[iface._commands for iface in self._partInterfaces.values()], [self.indices()]+[self._structure.parts[k] for k in self._partInterfaces]) try: next_state = self.updateSettingsAndCommands(self._delta_settings,cmds) if next_state is None: self._state.status = 'server error' else: self._state = next_state except Exception: #must have disconnected self._state.status = 'disconnected' self._commands = [] self._delta_settings = None #clear part command queues for (k,iface) in self._partInterfaces.items(): iface._commands = []
[docs] def reset(self): if self._state.status == 'disconnected': self._initialized = False if self.initialize(): self._state.status = 'ok' else: super().reset()
[docs]class ServerRobotInterfaceBase(_RobotInterfaceStatefulBase): """Base class for an asynchronous robot interface that serves clients. Subclass will need to start the controller thread, and then start a thread that receives RPC calls and call getInitialData_impl and updateSettingsAndCommands_impl in response. You'll need to pair your server with a client that implements the appropriate protocols, e.g., :class:`XMLRPCRobotInterfaceServer` / :class:`XMLRPCRobotInterfaceClient` Usage (simplified, doesn't do error handling):: #TODO: define MyServer as subclass of ServerRobotInterfaceBase server = MyServer(addr,interface) if not server.initialize(): print("Error initializing") exit(1) server.startController() #serve the client quit = False client = None while not quit: if client is None: client = server.accept() else: func, args = server.read_rpc() if func == 'getInitialData': res = server.getInitialData_impl() server.respond_rpc(res) elif func == 'updateSettingsAndCommands': res = server.updateSettingsAndCommands_impl(*args) server.respond_rpc(res) else: raise RuntimeError("Invalid RPC call?") time.sleep(0.01) #some poll rate server.stop() server.close() """ def __init__(self, interface : RobotInterfaceBase): _RobotInterfaceStatefulBase.__init__(self) if not isinstance(interface,_RobotInterfaceStatefulBase): self._base = _RobotInterfaceStatefulWrapper(interface) else: self._base = interface self._base_initialized = False from threading import RLock,Thread self._thread = None #type: Thread self._lock = RLock() self._properties= None self._structure = None self._settings = None self._state = None self._commands = None self._stop = False
[docs] def getInitialData_impl(self) -> Tuple[Dict,_RobotInterfaceStructure,_RobotInterfaceSettings,_RobotInterfaceState]: assert self._base_initialized with self._lock: self._settings = copy.deepcopy(self._base._settings) self._state = copy.deepcopy(self._base._state) return self.properties,self._structure,self._settings,self._state
[docs] def updateSettingsAndCommands_impl(self, delta_settings : _RobotInterfaceSettings, commands : List[_RobotInterfaceCommand]) -> _RobotInterfaceState: with self._lock: _update_from_settings(None,self._base._delta_settings,delta_settings) self._base._commands += commands self._state = copy.deepcopy(self._base._state) return self._state
[docs] def initialize(self): assert not self._base_initialized print("Server initializing base interface",self._base) if not self._base.initialize(): return False self._base_initialized = True self.properties = copy.deepcopy(self._base.properties) tempRobot = self._base._structure.klamptModel self._base._structure.klamptModel = None self._structure = copy.deepcopy(self._base._structure) self._base._structure.klamptModel = tempRobot return True
def _advance(self): from klampt.control.utils import TimedLooper dt = 1.0 / self._base.controlRate() looper = TimedLooper(dt) with self._lock: self._base.beginStep() while looper and not self._stop: with self._lock: self._base.endStep() self._base.beginStep() self._base.endStep()
[docs] def startController(self): from threading import Thread self._thread = Thread(target=self._advance) self._thread.daemon = True self._stop = False self._thread.start() def sigint_handler(signum,frame): self.close() sys.exit(0) signal.signal(signal.SIGINT, sigint_handler)
[docs] def stop(self): """Asks to stop motion -- perhaps call this when all clients disconnect""" with self._lock: self._base.softStop()
[docs] def close(self): """Subclasses might want to override this to close clients as well.""" self._stop = True if self._thread is not None: self._thread.join() if self._base_initialized: self._base.close()
[docs]class XMLRPCRobotInterfaceServer(ServerRobotInterfaceBase): """A XMLRPC-based server for a RIL robot controller. Usage Args: interface (RobotInterfaceBase): the interface to serve ip (str): the IP address (usually '128.0.0.1' or 'localhost') port (int): the port of the listening socket. 7881 is used by default. """ def __init__(self, interface: RobotInterfaceBase, ip:str, port=7881): ServerRobotInterfaceBase.__init__(self,interface) self.ip = ip self.port = port print("Opening RIL server on",ip,port) self.server = SimpleXMLRPCServer((ip,port), logRequests=False, allow_none=True) self.started = False self.server.register_introspection_functions() self._register(self.getInitialData,'getInitialData') self._register(self.updateSettingsAndCommands,'updateSettingsAndCommands')
[docs] def log(self,err): """Logs an error. Can be overloaded.""" print(err)
[docs] def serve(self): if not ServerRobotInterfaceBase.initialize(self): raise RuntimeError("RobotInterface of type {} could not be initialized".format(self.interface)) self.startController() self.server.serve_forever()
[docs] def getInitialData(self): props,struct,settings,state = self.getInitialData_impl() return (props,struct.to_json(),settings.to_json(),state.to_json())
[docs] def updateSettingsAndCommands(self, delta_settings_json, commands_json): delta_settings = _RobotInterfaceSettings() commands = [] delta_settings.from_json(delta_settings_json) for cmd_json in commands_json: cmd = _RobotInterfaceCommand() cmd.from_json(cmd_json) commands.append(cmd) state = super().updateSettingsAndCommands_impl(delta_settings, commands) return state.to_json()
def _register(self,func,name=None): """ Registers a function to the xmlrpc server under the given name. """ if name is None: name = func.__name__ @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: tb = traceback.format_exc() self.log(tb) raise e self.server.register_function(wrapper, name)
[docs]class XMLRPCRobotInterfaceClient(ClientRobotInterfaceBase): """An XMLRPC-based client that connects to a RobotInterfaceServer. Args: addr (str): the IP address of the server, including port. Localhost port 7881 is used by default. """ def __init__(self,addr='http://localhost:7881'): ClientRobotInterfaceBase.__init__(self) self.addr = addr self.context_mgr = None #type: ServerProxy self.s = None #type: ServerProxy
[docs] def connect(self) -> bool: print("Opening RIL client on",self.addr) try: self.context_mgr = ServerProxy(self.addr, allow_none=True) self.s = self.context_mgr.__enter__() except Exception as e: print("Error connecting: ",e) return False return True
[docs] def disconnect(self) -> bool: if self.context_mgr is None: return True self.context_mgr.__exit__() self.context_mgr = None self.s = None
[docs] def getInitialData(self) -> Tuple[Dict,_RobotInterfaceStructure,_RobotInterfaceSettings,_RobotInterfaceState]: props,struct_json,settings_json,state_json = self.s.getInitialData() struct = _RobotInterfaceStructure() struct.from_json(struct_json) settings = _RobotInterfaceSettings() settings.from_json(settings_json) state = _RobotInterfaceState() state.from_json(state_json) return (props,struct,settings,state)
[docs] def updateSettingsAndCommands(self, delta_settings : _RobotInterfaceSettings, commands : List[_RobotInterfaceCommand]) -> _RobotInterfaceState: delta_settings_json = delta_settings.to_json() commands_json = [cmd.to_json() for cmd in commands] state_json = self.s.updateSettingsAndCommands(delta_settings_json,commands_json) state = _RobotInterfaceState() state.from_json(state_json) return state