klampt.control.networkrobotinterface module

Classes for building networked RIL servers and clients.

New in version 0.9.

Classes:

ClientRobotInterfaceBase()

Base class for a robot interface that communicates with a server.

ServerRobotInterfaceBase(interface)

Base class for an asynchronous robot interface that serves clients.

XMLRPCRobotInterfaceServer(interface, ip[, port])

A XMLRPC-based server for a RIL robot controller.

XMLRPCRobotInterfaceClient([addr])

An XMLRPC-based client that connects to a RobotInterfaceServer.

class klampt.control.networkrobotinterface.ClientRobotInterfaceBase[source]

Bases: klampt.control.robotinterfaceutils._RobotInterfaceStatefulBase

Base class for a robot interface that communicates with a server. begin/endStep() will need to be called repeatedly, and on endStep() all the communication is performed.

Methods:

connect()

rtype

bool

disconnect()

rtype

bool

getInitialData()

rtype

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands(settings, commands)

rtype

_RobotInterfaceState

partInterface(part[, joint_idx])

Returns a RobotInterfaceBase that allows control of the given part/joint.

initialize()

Tries to connect to the robot.

close()

Cleanly shuts down any resources acquired using initialize().

beginStep()

This is called before the commands sent at each time step

endStep()

This is called after the commands sent at each time step

reset()

If the robot has a non-normal status code, attempt to reset it to normal operation.

connect()[source]
Return type

bool

disconnect()[source]
Return type

bool

getInitialData()[source]
Return type

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands(settings, commands)[source]
Return type

_RobotInterfaceState

partInterface(part, joint_idx=None)[source]

Returns a RobotInterfaceBase that allows control of the given part/joint. If no such controller exists, raises a NotImplementedError.

The part/joint controller should operate on exactly the DOFs specified by self.indices(part,joint_idx).

initialize()[source]

Tries to connect to the robot. Returns true if ready to send commands. This should probably be the first method called.

close()[source]

Cleanly shuts down any resources acquired using initialize().

beginStep()[source]

This is called before the commands sent at each time step

endStep()[source]

This is called after the commands sent at each time step

reset()[source]

If the robot has a non-normal status code, attempt to reset it to normal operation. The caller should poll until status()==’ok’

class klampt.control.networkrobotinterface.ServerRobotInterfaceBase(interface)[source]

Bases: klampt.control.robotinterfaceutils._RobotInterfaceStatefulBase

Base class for an asynchronous robot interface that serves clients. Subclass will need to start the controller thread, and then start a thread that receives RPC calls and call getInitialData_impl and updateSettingsAndCommands_impl in response.

You’ll need to pair your server with a client that implements the appropriate protocols, e.g., XMLRPCRobotInterfaceServer / XMLRPCRobotInterfaceClient

Usage (simplified, doesn’t do error handling):

#TODO: define MyServer as subclass of ServerRobotInterfaceBase

server = MyServer(addr,interface)
if not server.initialize():
    print("Error initializing")
    exit(1)
server.startController()

#serve the client
quit = False
client = None
while not quit:
    if client is None:
        client = server.accept()
    else:
        func, args = server.read_rpc()
        if func == 'getInitialData':
            res = server.getInitialData_impl()
            server.respond_rpc(res)
        elif func == 'updateSettingsAndCommands':
            res = server.updateSettingsAndCommands_impl(*args)
            server.respond_rpc(res)
        else:
            raise RuntimeError("Invalid RPC call?")
    time.sleep(0.01) #some poll rate
server.stop()
server.close()

Methods:

getInitialData_impl()

rtype

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands_impl(…)

rtype

_RobotInterfaceState

initialize()

Tries to connect to the robot.

startController()

stop()

Asks to stop motion – perhaps call this when all clients disconnect

close()

Subclasses might want to override this to close clients as well.

getInitialData_impl()[source]
Return type

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands_impl(delta_settings, commands)[source]
Return type

_RobotInterfaceState

initialize()[source]

Tries to connect to the robot. Returns true if ready to send commands. This should probably be the first method called.

startController()[source]
stop()[source]

Asks to stop motion – perhaps call this when all clients disconnect

close()[source]

Subclasses might want to override this to close clients as well.

class klampt.control.networkrobotinterface.XMLRPCRobotInterfaceServer(interface, ip, port=7881)[source]

Bases: klampt.control.networkrobotinterface.ServerRobotInterfaceBase

A XMLRPC-based server for a RIL robot controller.

Usage

Args:

interface (RobotInterfaceBase): the interface to serve ip (str): the IP address (usually ‘128.0.0.1’ or ‘localhost’) port (int): the port of the listening socket. 7881 is used by default.

Methods:

log(err)

Logs an error.

serve()

getInitialData()

updateSettingsAndCommands(…)

log(err)[source]

Logs an error. Can be overloaded.

serve()[source]
getInitialData()[source]
updateSettingsAndCommands(delta_settings_json, commands_json)[source]
class klampt.control.networkrobotinterface.XMLRPCRobotInterfaceClient(addr='http://localhost:7881')[source]

Bases: klampt.control.networkrobotinterface.ClientRobotInterfaceBase

An XMLRPC-based client that connects to a RobotInterfaceServer.

Parameters

addr (str) – the IP address of the server, including port. Localhost port 7881 is used by default.

Methods:

connect()

rtype

bool

disconnect()

rtype

bool

getInitialData()

rtype

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands(delta_settings, …)

rtype

_RobotInterfaceState

connect()[source]
Return type

bool

disconnect()[source]
Return type

bool

getInitialData()[source]
Return type

Tuple[Dict, _RobotInterfaceStructure, _RobotInterfaceSettings, _RobotInterfaceState]

updateSettingsAndCommands(delta_settings, commands)[source]
Return type

_RobotInterfaceState