klampt.control.io package

roscontroller

Defines control interfaces between a ROS application and Klamp't robot.

rosinterface

Defines interface between a Klamp't application and ROS robot.

serialcontroller

An adaptor between RobotControllerBlock and the Klamp't C++ serial controller interface (SerialController).

klampt.control.io.roscontroller module

Defines control interfaces between a ROS application and Klamp’t robot.

  • RosRobotBlock is a Klampt RobotControlBlock that accepts inputs from a ROS application. This can be used to drive a simulation.

This also implements the make() protocol for use in klampt_control. The returned controller accepts ROS JointTrajectory messages from the ROS topic ‘/[robot_name]/joint_trajectory’ and writes out JointState messages to the ROS topic ‘/[robot_name]/joint_state’. It also acts as a ROS clock server.

class klampt.control.io.roscontroller.RosRobotBlock(joint_trajectory_sub_topic, joint_state_pub_topic, link_list)[source]

Bases: RobotControllerBlock

A controller that reads JointTrajectory messages from a given ROS topic, maintains the trajectory for use in a Klamp’t simulation, and writes JointState messages to another ROS topic.

Uses PID control with optional feedforward torques in the effort term.

Acts very much like a ROS JointTrajectoryActionController (http://wiki.ros.org/robot_mechanism_controllers/JointTrajectoryActionController) Currently does not support:

  • Setting PID gain constants,

  • Setting PID integral term bounds,

  • Parsing of FollowJointTrajectoryActions or reporting completion

    of the action.

  • Partial commands for subsets of joints are supported, but you

    cannot interleave separate messages to subsets of joints (i.e., you must let one joint group finish before sending messages to another).

Note: trajectory messages start execution exactly at the Klamp’t time given in the time stamp in the header. Hence, any nodes connected to this must have the /use_sim_time flag set to 1.

Sets the controller to subscribe to JointTrajectory messages from joint_trajectory_sub_topic, and publishes JointState messages from joint_state_pub_topic. The link_list argument is the ordered list of link names in the Klamp’t robot model.

jointTrajectoryCallback(msg)[source]
set_index(name, index)[source]
advance(**inputs)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

map_output(vector, names, output_map, output_name)[source]

Maps a partial vector to output_map[output_name]. If output_name exists in output_map, then only the named values are overwritten. Otherwise, the missing values are set to zero.

class klampt.control.io.roscontroller.RosTimeBlock[source]

Bases: Block

A controller that simply publishes the simulation time to ROS. Doesn’t output anything.

advance(t)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

klampt.control.io.roscontroller.make(klampt_robot_model)[source]

Creates a RosRobotInterface for the given model. klampt_robot_model is a RobotModel instance.

Subscribes to ‘/[robot_name]/joint_trajectory’. Publishes to ‘/[robot_name]/joint_state’ and ‘/clock’

klampt.control.io.rosinterface module

Defines interface between a Klamp’t application and ROS robot.

  • RosRobotInterface is a Klampt Robot Interface Layer (:class:~klampt.control.robotinterface.RobotInterfaceBase`) implementation that outputs commands to a ROS controller.

This also implements the make() protocol for use in klampt_control. The returned controller accepts ROS JointTrajectory messages from the ROS topic ‘/[robot_name]/joint_trajectory’ and writes out JointState messages to the ROS topic ‘/[robot_name]/joint_state’. It also acts as a ROS clock server.

class klampt.control.io.rosinterface.RosRobotInterface(robot, joint_state_sub_topic, joint_trajectory_pub_topic)[source]

Bases: RobotInterfaceBase

Implements a Klampt Robot Interface Layer for a ROS controlled

robot that outputs JointState messages and accepts JointTrajectory commands.

initialize() will create and start a ROS node.

initialize(False) will not start ROS. Instead, you’ll have to do this manually before calling initialize.

close() is optional to call before quitting. It must be called if you’d like to re-initialize.

initialize(init_ros=True)[source]

Tries to connect to the robot. Returns true if ready to send commands. This should probably be the first method called.

close()[source]

Cleanly shuts down any resources acquired using initialize().

jointStateCallback(jointState)[source]
klamptModel()[source]

If applicable, returns the Klamp’t RobotModel associated with this controller. Default tries to load from properties[‘klamptModelFile’].

Note that the configuration DOFs in the RIL robot correspond to the robot’s DRIVERs, not its links.

Note: the result of the default implementation is cached, so this can be called multiple times without a performance hit.

clock()[source]

Returns the current time on the robot’s clock, in seconds

status()[source]

Returns a status string for the robot / given joint. ‘ok’ means everything is OK.

commandedPosition()[source]

Retrieves the currently commanded joint position.

sensedPosition()[source]

Retrieves the currently sensed joint position.

sensedVelocity()[source]

Retrieves the currently sensed joint velocity.

sensedTorque()[source]

Retrieves the currently sensed joint torque.

setPiecewiseLinear(ts, qs, relative=True)[source]

Tells the robot to start a piecewise linear trajectory command. The first milestone will be interpolated from the current commanded configuration.

Parameters:
  • ts (list of floats) – times of the trajectory’s milestones

  • qs (list of floats, or list of list of floats) – list of the trajectory’s milestones

  • relative (bool) – if true, the times in ts are assumed to start at current time 0. Otherwise, they must all be greater than the current time retrieved by clock().

klampt.control.io.rosinterface.make(klampt_robot_model)[source]

Creates a RosRobotInterface for the given model. klampt_robot_model is a RobotModel instance.

Publishes to ‘/[robot_name]/joint_trajectory’. Subscribes to ‘/[robot_name]/joint_state’

klampt.control.io.serialcontroller module

An adaptor between RobotControllerBlock and the Klamp’t C++ serial controller interface (SerialController).

klampt.control.io.serialcontroller.packStrlen(s)[source]
klampt.control.io.serialcontroller.unpackStrlen(s)[source]
klampt.control.io.serialcontroller.writeSocket(socket, msg)[source]
klampt.control.io.serialcontroller.readSocket(socket, length)[source]
class klampt.control.io.serialcontroller.JsonClient(addr)[source]

Bases: dispatcher

An asyncore client that transmits JSON messages in the Klamp’t simple serial interface. Sends/receives variable-length messages such that the first 4 bytes are the length of the message (in binary) and the remainder is the payload.

Subclasses should override onMessage, which accepts with arbitrary Python objects that can be serialized by the json module. Subclasses should use sendMessage to send a message.

To run, call asyncore.loop().

handle_connect()[source]

Called on socket connect. May be overridden.

handle_close()[source]

Called on socket close. May be overridden.

handle_read()[source]

Called on read. Do not override; override onMessage instead.

writable()[source]

Called to determine whether there’s any data left to be sent. Do not override.

handle_write()[source]

Called to send data when available. Do not override.

onMessage(msg)[source]

Override this to handle an incoming message

sendMessage(msg)[source]

Call this to send an outgoing message

read(length)[source]
recv(buffer_size)[source]

Fix for windows sockets throwing EAGAIN crashing asyncore

class klampt.control.io.serialcontroller.ControllerClient(addr, controller)[source]

Bases: JsonClient

An asyncore client that relays Klampt ControllerBlock I/O to some receiver via a JSON-based serial interface. For example, this can be connected to a SerialController or to the SimTest app.

The interface simply translates messages back and forth using the raw ControllerBlock input / output dictionaries.

This uses the asyncore module. To run, pass it an address and a ControllerBlock interface. Then, call asyncore.loop(). The calling convention looks like this:

import asyncore
from klampt.control.io.serialcontroller import ControllerClient
from klampt.control.blocks.robotcontroller import RobotControllerBlock

class MyController(RobotControllerBlock):
    ...define your controller here...

#open up a client on localhost:3456
client = ControllerClient(('localhost',3456),MyController())
asyncore.loop()
Parameters:

addr – a (host,port) pair

handle_expt()[source]
handle_error()[source]
handle_connect()[source]

Called on socket connect. May be overridden.

onMessage(msg)[source]

Override this to handle an incoming message

class klampt.control.io.serialcontroller.JsonSerialController(addr=('localhost', 3456))[source]

Bases: RobotControllerBlock

A controller that maintains a server to write/read messages every output_and_advance cycle.

It simply translates messages back and forth to a client via a JSON-based serial interface.

accept()[source]

Get a new connection, if there isn’t one

advance(**inputs)[source]

Computes the output of this controller given an argument list, and advances the time step.

Returns:

None (0 outputs), a value (1 output), a tuple, or a dict

klampt.control.io.serialcontroller.make(robot)[source]