klampt.control.blocks package

Defines a set of generic “system blocks” that are repeatedly-updating processes. These can implement filters, estimators, read from sensor drivers, or output commands to a simulated or real robot.

core

Defines a set of generic "system blocks" that are repeatedly-updating processes.

robotcontroller

Defines RobotControllerBlock, which can be used for SimpleSimulator or the Klampt Robot Interface Layer (see RobotInterfaceBase).

utils

Contains the following helper blocks:

filters

state_machine

Implements state machines as Block structures.

trajectory_tracking

klampt.control.blocks.robotcontroller module

Defines RobotControllerBlock, which can be used for SimpleSimulator or the Klampt Robot Interface Layer (see RobotInterfaceBase). Defines a standard convention, expecting to receive the following values as input:

  • t: current time, in seconds

  • dt: controller time step, in seconds

  • q: sensed robot configuration

  • dq: sensed robot velocity

  • [sensor1_name]: measurement vector for sensor 1

  • [sensor1_name]: measurement vector for sensor k

The output is expected to contain one or more of the following keys:

  • qcmd

  • dqcmd

  • tcmd

  • torquecmd

These are interpreted as follows:

  • If qcmd is set, then it’s a PID command. If dqcmd is also set, then it describes the desired velocity. If dqcmd is not set, the desired velocity is 0. If torquecmd is also set, then torquecmd is a feedforward torque.

  • If dqcmd and tcmd are set, they describe a fixed velocity command for duration tcmd.

  • If torquecmd is set, this describes a torque command.

No other combinations are currently supported.

For convenience, your RobotControllerBlock subclass may use the RobotControllerIO class for object-oriented access to the input / output data. Example usage is as follows:

api = RobotControllerIO(inputs)
print("Current time is",api.time())
#set a position command
api.setJointPositionCommand(5,0.5)
return api.makeCommand()

Binding robot controllers to a robot

The easiest way to use this with a simulated / real robot is the klampt.control.interop.RobotControllerBlockToInterface class.

Example that outputs to a simulation (and visualizes it):

from klampt.control.block.robotcontroller import RobotControllerBase
from klampt.control.simrobotinterface import *
from klampt.control.interop import RobotControllerBlockToInterface
from klampt import WorldModel, Simulator
from klampt import vis

world = WorldModel()
...
#create world, MyControllerObject class here
...
sim = Simulator(world)

vis.add("world",world)
vis.show()
controller = MyControllerObject()  #subclass of RobotControllerBase
interface = SimPositionControlInterface(sim.controller(0),sim)
binding = RobotControllerBlockToInterface(controller,interface)
while vis.shown():
    binding.advance()
    sim.updateWorld()
    vis.update()

Example that outputs to a real robot:

from klampt.control.block.controller import RobotControllerBlock
from klampt.control.interop import RobotControllerBlockToInterface

#create MyControllerObject, MyRobotInterface class here

controller = MyControllerObject()  #subclass of RobotControllerBase
interface = MyRobotInterface(...)
binding = RobotControllerBlockToInterface(controller,interface)
while not done:
    binding.advance()

For tighter control over a simulation, such as sub-stepping, you should use the klampt.sim.simulation module. Robot controllers in RobotControllerBlock form are accepted by the SimpleSimulator.setController() method.

class klampt.control.blocks.robotcontroller.RobotControllerBlock(robotModel=None)[source]

Bases: Block

A block implementation for a robot controller. This doesn’t do anything but set up the block. The subclass still has to implement advance, signal, __getstate__/__savestate__, etc.

Recommend using the RobotControllerIO class to format the results.

class klampt.control.blocks.robotcontroller.RobotControllerIO(kwargs)[source]

Bases: object

A helper class that makes it a bit easier to implement a RobotControllerBase by providing an object-oriented interface to the dictionary-based protocol.

Usage:

class MyController(ControllerBlock):
    def advance(self,**args):
        api = RobotControllerIO(inputs)
        print("Current time is",api.time())
        #set a position command
        api.setJointPositionCommand(5,0.5)
        return api.makeCommand()

All methods, except for those prefixed by make or set, are to get values from the input dictionary.

The makeXCommand methods return a propertly formatted output dictionary. Alternatively, you can make several setXCommand calls and then call makeCommand() to retrieve the output dictionary.

time()[source]

Returns the robot clock time

timeStep()[source]

Returns the robot time step

commandedConfiguration()[source]

Returns the commanded joint configuration or None if it is not sensed.

commandedVelocity()[source]

Returns the commanded joint velocities or None if it is not sensed.

sensedConfiguration()[source]

Returns the sensed joint configuration or None if it is not sensed.

sensedVelocity()[source]

Returns the sensed joint velocity or None if it is not sensed.

sensedTorque()[source]

Returns the sensed torque or None if it is not sensed.

sensorNames()[source]

Returns the list of sensor names (including clock and time step)

sensorValue(sensor)[source]

Returns the value of the named sensor.

makePositionCommand(q)[source]
makePIDCommand(q, dq)[source]
makeFeedforwardPIDCommand(q, dq, torque)[source]
makeVelocityCommand(dq, t)[source]
makeTorqueCommand(torque)[source]
setPositionCommand(value)[source]
setPIDCommand(q, dq)[source]
setFeedforwardPIDCommand(q, dq, torque)[source]
setVelocityCommand(v, dt)[source]
setTorqueCommand(torque)[source]
setJointPositionCommand(index, value)[source]

Sets a single indexed joint to a position command

setJointVelocityCommand(index, value)[source]

Sets a single indexed joint to a velocity command

makeCommand()[source]

Returns the command from previous setXCommand() calls.

klampt.control.blocks.utils module

Contains the following helper blocks:

  • BlockSignal: an exception that allows a block to signal something (e.g., an error condition) to its parent.

  • SignalBlock: a block that raises a signal if an error is raised.

  • LambdaBlock: a stateless block that simply runs a fixed function on each time step.

  • LinearBlock: a stateless block that simply performs a linear or affine function.

  • Concatenate: concatenates sevearl items together.

  • Clamp: restricts an item to a range.

  • LimitExceeded: returns 1 if the limits are exceeded

  • Distance: calculates the distance.

Exceptions:

BlockSignal(signal, text)

An exception raised by a block if it wishes to raise a signal to a parent.

Classes:

SignalBlock(type, text)

A block that raises a signal if its input is nonzero

LambdaBlock(f[, inputs, outputs])

A fixed-function controller that simply evaluates a function.

LinearBlock(A[, b])

Implements a linear function output = A*input + b

Concatenate(inputs)

Concatenates vectors from multiple items into a single vector.

Clamp()

Restricts a value to some range

LimitExceeded()

Returns 1 if a value exceeds some range

Distance([metric])

Returns the L-p distance between two values

WorldCollision(world_or_collider)

Returns True if a collision occurrs in the world (or a collider)

If()

Mux(k)

Function (index, case0, case1, ..., casek) returning case[index]

exception klampt.control.blocks.utils.BlockSignal(signal, text)[source]

Bases: RuntimeError

An exception raised by a block if it wishes to raise a signal to a parent.

signal

the identifier of the signal

Type:

str

class klampt.control.blocks.utils.SignalBlock(type, text)[source]

Bases: Block

A block that raises a signal if its input is nonzero

Methods:

advance(signal)

Computes the output of this controller given an argument list, and advances the time step.

advance(signal)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.LambdaBlock(f, inputs='auto', outputs='auto')[source]

Bases: Block

A fixed-function controller that simply evaluates a function. The function arguments and return values are mapped from/to the input/output dictionaries.

Methods:

advance(*args)

Computes the output of this controller given an argument list, and advances the time step.

advance(*args)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.LinearBlock(A, b=None)[source]

Bases: Block

Implements a linear function output = A*input + b

The user must fill out the self.gains member using the addGain() method.

To use this, Numpy must be available on your system.

Methods:

advance(x)

Computes the output of this controller given an argument list, and advances the time step.

advance(x)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.Concatenate(inputs)[source]

Bases: Block

Concatenates vectors from multiple items into a single vector. Useful for when you have one controller for each arm, one for a lower body, etc.

Parameters:

n (int) – the number of items

Methods:

advance(*args)

Computes the output of this controller given an argument list, and advances the time step.

advance(*args)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.Clamp[source]

Bases: Block

Restricts a value to some range

Methods:

advance(x, minimum, maximum)

Computes the output of this controller given an argument list, and advances the time step.

advance(x, minimum, maximum)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.LimitExceeded[source]

Bases: Block

Returns 1 if a value exceeds some range

Methods:

advance(x, minimum, maximum)

Computes the output of this controller given an argument list, and advances the time step.

advance(x, minimum, maximum)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.Distance(metric=inf)[source]

Bases: Block

Returns the L-p distance between two values

Methods:

advance(x1, x2)

Computes the output of this controller given an argument list, and advances the time step.

advance(x1, x2)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.WorldCollision(world_or_collider)[source]

Bases: Block

Returns True if a collision occurrs in the world (or a collider)

Methods:

advance(q)

Computes the output of this controller given an argument list, and advances the time step.

advance(q)[source]

Computes the output of this controller given an argument list, and advances the time step.

Return type:

bool

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.If[source]

Bases: Block

Methods:

advance(cond, truebranch, falsebranch)

Computes the output of this controller given an argument list, and advances the time step.

advance(cond, truebranch, falsebranch)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

class klampt.control.blocks.utils.Mux(k)[source]

Bases: Block

Function (index, case0, case1, …, casek) returning case[index]

Methods:

advance(*args)

Computes the output of this controller given an argument list, and advances the time step.

advance(*args)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

klampt.control.blocks.filters module

Classes:

FIRFilter(b)

An estimator that filters some other signal using a Finite Impulse Response filter.

class klampt.control.blocks.filters.FIRFilter(b)[source]

Bases: Block

An estimator that filters some other signal using a Finite Impulse Response filter. b is the vector of coefficients:

y[t] = x[t]*b[0] + … + x[t-k+1]*b[k-1].

For example, a k-moving average filter would set the b vector to [1/k,…,1/k]

Methods:

advance(x)

Computes the output of this controller given an argument list, and advances the time step.

signal(type, **inputs)

Reacts to some asynchronous signal.

advance(x)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

signal(type, **inputs)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

klampt.control.blocks.state_machine module

Implements state machines as Block structures.

  • StateMachineBase: a state machine that switches between multiple sub-controllers, with one sub-controller running at once and the active controller triggered by some signal.

  • TransitionStateMachine: a state machine with explicit transition conditions.

  • TimedSequenceBlock: a sequence of controllers, switched by time.

Classes:

StateMachineBase(blocks[, start, reset_on_enter])

A base class for a finite state machine controller.

TransitionStateMachine(blocks[, ...])

A state machine controller with a transition matrix that determines when to move to the next state.

TimedSequenceBlock(blocks, times)

A state-machine controller that goes through each sub-controller in sequence.

class klampt.control.blocks.state_machine.StateMachineBase(blocks, start=0, reset_on_enter=True)[source]

Bases: Block

A base class for a finite state machine controller. One sub-controller may be active at once.

To implement transitions, next_state() needs to be filled out by the subclass.

If a sub-block has inputs ‘enter’ or ‘exit’, they are raised when the sub-controller enters or exits.

Signals:
  • ‘reset’ (bool): when 1, the state machine goes back to the start state and resets all blocks.

Outputs:
  • ‘state’ (int): the currently active state.

blocks

the list of sub-controllers

Type:

list of Block

current

the index of the currently active sub-controller. -1 indicates no current sub-controller.

Type:

int

start

the index of the initial sub-controller.

Type:

int

Methods:

next_state(state, *args, **kwargs)

Subclasses should override this to implement the transitions

signal(type, *args)

Reacts to some asynchronous signal.

advance(*args, **kwargs)

Computes the output of this controller given an argument list, and advances the time step.

func(name)

next_state(state, *args, **kwargs)[source]

Subclasses should override this to implement the transitions

signal(type, *args)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

advance(*args, **kwargs)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

func(name)[source]
class klampt.control.blocks.state_machine.TransitionStateMachine(blocks, transitions=None, inputs=0, start=0, reset_on_enter=True)[source]

Bases: StateMachineBase

A state machine controller with a transition matrix that determines when to move to the next state.

blocks

same as StateMachineBase

current

same as StateMachineBase

transitions

a list of transition conditions [{x1:cond1,…,xk:condk},…,{}] so that if j is in transitions[i], then transitions[i][j](*inputs) is a condition that tells the state machine whether to change states from i to j.

Type:

list of dicts

Methods:

set_transition(fromState, toState, func)

next_state(state, *args, **kwargs)

Subclasses should override this to implement the transitions

set_transition(fromState, toState, func)[source]
next_state(state, *args, **kwargs)[source]

Subclasses should override this to implement the transitions

class klampt.control.blocks.state_machine.TimedSequenceBlock(blocks, times)[source]

Bases: TransitionStateMachine

A state-machine controller that goes through each sub-controller in sequence.

Input:

t (float): the current time

klampt.control.blocks.trajectory_tracking module

Classes:

TrajectoryPositionController(traj[, type])

A (robot) controller that takes in a trajectory and outputs the position along the trajectory.

TrajectoryWithFeedforwardTorqueController(...)

A controller that takes in a joint trajectory and a feedforward torque trajectory.

Functions:

make(robot[, file, ff_torque_file])

class klampt.control.blocks.trajectory_tracking.TrajectoryPositionController(traj, type=('qcmd', 'dqcmd'))[source]

Bases: RobotControllerBlock

A (robot) controller that takes in a trajectory and outputs the position along the trajectory. If type is a 2-tuple, this will also output the derivative of the trajectory

Methods:

advance(**inputs)

Computes the output of this controller given an argument list, and advances the time step.

signal(type, *inputs)

Reacts to some asynchronous signal.

advance(**inputs)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

signal(type, *inputs)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

class klampt.control.blocks.trajectory_tracking.TrajectoryWithFeedforwardTorqueController(traj, torquetraj)[source]

Bases: RobotControllerBlock

A controller that takes in a joint trajectory and a feedforward torque trajectory.

Methods:

advance(**inputs)

Computes the output of this controller given an argument list, and advances the time step.

signal(type, **inputs)

Reacts to some asynchronous signal.

advance(**inputs)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

signal(type, **inputs)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

klampt.control.blocks.trajectory_tracking.make(robot, file='mypath.path', ff_torque_file=None)[source]

Module contents

class klampt.control.blocks.Block(inputs=0, outputs=0)[source]

Bases: object

A generic base class for some streaming computational block. This is typically used to define robot controllers and components of such controllers, such as state estimators and filters.

Users will create blocks, connect their inputs and outputs, and then set up a SuperBlock. Then, SuperBlock.advance() will be called repeatedly.

At a minimum, a block should specify its # of inputs and outputs and implement the advance() method. A block can also specify named inputs and outputs, which will let the inputs to advance() be specified as keyword, and the return value can also be a dict.

A stateful block should also implement the __getstate__() and __getstate__() methods to help serialization / deserialization, state machine resetting, etc.

Parameters:
  • inputs (int or list of str) – gives the # of inputs or a list of input names.

  • outputs (int or list of str) – gives the # of outputs or a list of output names.

advance(*args)[source]

Computes the output of this controller given an argument list, and advances the time step.

Return type:

Union[None, Any, Tuple, Dict[str, Any]]

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

signal(name, *args)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

debug(type)[source]

Optional: hook to give feedback to the visualizer

class klampt.control.blocks.SuperBlock(seeds)[source]

Bases: Block

A collection of Blocks with connected inputs and outputs.

Initialized with one or more seed Blocks. All nodes path-connected to these seeds will be included in the group.

advance(*args)[source]

Triggers all of the nodes in the block diagram.

signal(name, *args)[source]

Reacts to some asynchronous signal. Most blocks won’t use this. Common signals include:

  • ‘reset’: reset a state machine

  • ‘enter’: enter activity in a state machine

  • ‘exit’: exit activity in a state machine

  • ‘change_state’ index: trigger a state change in a state machine.

debug(type)[source]

Optional: hook to give feedback to the visualizer

class klampt.control.blocks.Source[source]

Bases: Block

class klampt.control.blocks.Sink[source]

Bases: Block