"""
Defines a set of generic "system blocks" that are repeatedly-updating
processes. These can implement filters, estimators, read from sensor drivers,
or output commands to a simulated or real robot.
The :class:`Block` class defines a block as accepting some
inputs and producing some outputs every time that ``advance()`` is
called. The inputs and outputs are extremely general, and consist
of arguments referred to by an int or str index. Outputs can also be
added, subtracted, and compared to produce new outputs.
The core module contains several examples of controller
blocks that can be composed. Connect the inputs and outputs of blocks,
pass any subset to a :class:`SuperBlock`, and the inputs and outputs of
the SuperBlock will be automatically determined.
See :mod:`klampt.control.blocks.utils` for
utilities, and :mod:`klampt.control.blocks.state_machine` for state machines
that use Blocks.
.. versionadded:: 0.9
"""
from klampt.math import vectorops
from typing import Union,Any,Tuple,Sequence,Dict,List,Iterator
import weakref
import warnings
import operator
class Block(object):
    """A generic base class for some streaming computational block.

    This is typically used to define robot controllers and components
    of such controllers, such as state estimators and filters.

    Users will create blocks, connect their inputs and outputs, and
    then set up a SuperBlock.  Then, SuperBlock.advance() will be called
    repeatedly.

    At a minimum, a block should specify its # of inputs and outputs
    and implement the :meth:`advance` method.  A block can also specify
    named inputs and outputs, which will let the inputs to advance() be
    specified as keyword, and the return value can also be a dict.

    A stateful block should also implement the :meth:`__getstate__` and
    :meth:`__setstate__` methods to help serialization / deserialization,
    state machine resetting, etc.

    The channel collections are reachable through the virtual attributes
    ``block.input`` and ``block.output``.  Assigning to either one
    (``block.input = X``) connects the block's single channel to ``X``.

    Arguments:
        inputs (int or list of str): gives the # of inputs or a list of
            input names.
        outputs (int or list of str): gives the # of outputs or a list
            of output names.
    """
    def __init__(self,inputs=0,outputs=0) -> None:
        self._name = None
        # The channel collections refer back to this block only through a
        # weak proxy.
        self._inputs = _BlockInputs(weakref.proxy(self),inputs)
        self._outputs = _BlockOutputs(weakref.proxy(self),outputs)

    def advance(self,*args) -> Union[None,Any,Tuple,Dict[str,Any]]:
        """Computes the output of this controller given an argument list,
        and advances the time step.

        Returns:
            None (0 outputs), a value (1 output), a tuple, or a dict
        """
        return None

    def signal(self,name,*args):
        """Reacts to some asynchronous signal.  Most blocks won't use this.

        Common signals include:

        * 'reset': reset a state machine
        * 'enter': enter activity in a state machine
        * 'exit': exit activity in a state machine
        * 'change_state' index: trigger a state change in a state machine.
        """
        return

    def __str__(self):
        """Optional: return a descriptive string describing this block"""
        if self._name is None:
            return self.__class__.__name__
        # str.format is required here: the template uses {} placeholders,
        # so the % operator would raise a TypeError.
        return "{}({})".format(self._name,self.__class__.__name__)

    def _process(self):
        """Called internally.

        Pulls the upstream values into every input channel, calls
        :meth:`advance` (with keyword arguments when the inputs are named,
        positionally otherwise) and distributes the result over the output
        channels.

        Raises:
            ValueError: if the shape of the value returned by ``advance``
                does not match the number / names of the outputs.
        """
        for c in self._inputs._channels:
            c.advance()
        if len(self._inputs._namesToIndices) > 0:  #map to keyword args
            kwargs = {}
            for k,i in self._inputs._namesToIndices.items():
                kwargs[k] = self._inputs._channels[i]._currentValue
            res = self.advance(**kwargs)
        else:
            res = self.advance(*[c._currentValue for c in self._inputs._channels])

        numOutputs = len(self._outputs)
        if res is None:
            if numOutputs != 0:
                raise ValueError("Block {} produces {} outputs but advance returned None".format(str(self),numOutputs))
            return
        if numOutputs == 1:
            # a single output receives the result verbatim, whatever its type
            self._outputs[0].setValue(res)
            return
        if not hasattr(res,'__len__'):
            # avoids an opaque TypeError from len() below
            raise ValueError("Block {} produces {} outputs but advance returned a single value".format(str(self),numOutputs))
        if len(res) != numOutputs:
            raise ValueError("Block {} produces {} outputs but advance returned {}".format(str(self),numOutputs,len(res)))
        if isinstance(res,dict):
            #map to outputs using keywords
            for k,r in res.items():
                if k not in self._outputs._namesToIndices:
                    raise ValueError("Block {} produces output {} which is not a named output".format(str(self),str(k)))
                i = self._outputs._namesToIndices[k]
                self._outputs._channels[i].setValue(r)
        else:
            for r,o in zip(res,self._outputs._channels):
                o.setValue(r)

    def __call__(self,*args) -> 'Block':
        """If each of args is a one-output function, this connects all of their
        outputs to each of this function's inputs.

        Usage::

            #result is a SomeBlock taking args block1 output 0, block2 output 1
            result = SomeBlock()(block1,block2.output[1])

        Returns:
            Block: this block, so that calls can be nested.

        Raises:
            ValueError: if the number of args does not match the number of
                inputs, this block does not have exactly one output, or an
                argument is not a single-output block / output channel.
        """
        if len(args) != len(self._inputs):
            raise ValueError("Invalid number of args {} != {}".format(len(args),len(self._inputs)))
        if len(self._outputs) != 1:
            raise ValueError("Can only use call notation for functions with 1 channel output")
        #for shortcut evaluation, only weak references are stored in each
        #channel.  Keeping the argument blocks here prevents block deletion.
        self._storage = []
        for i,arg in enumerate(args):
            if isinstance(arg,Block):
                if len(arg._outputs) != 1:
                    raise ValueError("Invalid arg {}, doesn't have single output".format(str(arg)))
                self._inputs[i].connect(arg._outputs[0])
                self._storage.append(arg)
            elif isinstance(arg,(_BlockOutputs,_BlockOutputChannel)):
                self._inputs[i].connect(arg)
            else:
                raise ValueError("Invalid arg {}, must be a block or an output channel".format(str(arg)))
        return self

    def __getattr__(self,name):
        # Only invoked when normal attribute lookup fails; provides the
        # virtual ``input`` / ``output`` attributes.
        if name == 'input':
            return self._inputs
        elif name == 'output':
            return self._outputs
        raise AttributeError("{!r} object has no attribute {!r}".format(type(self).__name__,name))

    def __setattr__(self,name,value):
        # ``block.input = X`` / ``block.output = X`` are connection
        # shortcuts for single-channel blocks; everything else is a plain
        # attribute assignment.
        if name == 'input':
            if len(self._inputs._channels)!=1:
                raise ValueError("For input=X to work, need exactly 1 channel")
            self._inputs[0].connect(value)
        elif name == 'output':
            if len(self._outputs._channels)!=1:
                raise ValueError("For output=X to work, need exactly 1 channel")
            self._outputs[0].connect(value)
        else:
            super().__setattr__(name,value)

    def debug(self,type):
        """Optional: hook to give feedback to the visualizer"""
        if type=='print':
            print(str(self))
        else:
            pass
class SuperBlock(Block):
    """A collection of Blocks with connected inputs and outputs.

    Initialized with one or more seed Blocks.  All nodes path-connected
    to these seeds will be included in the group.

    Attributes:
        blocks (list of Block): the member blocks, in execution order
            (every block comes after the blocks feeding its inputs, except
            where a feedback loop makes that impossible).
        edges (list of list): for each block in ``blocks``, the
            (output channel, downstream input channel) pairs leaving it.
        free_inputs (list): input channels with nothing connected; their
            values are supplied as arguments to :meth:`advance`.
        free_outputs (list): output channels with nothing downstream; their
            values are returned from :meth:`advance`.

    NOTE(review): ``Block._process`` passes keyword arguments whenever a
    block's inputs are named, while :meth:`advance` here accepts only
    positional arguments, so nesting a SuperBlock with free inputs inside
    another SuperBlock presumably does not work yet -- confirm before
    relying on it.
    """
    def __init__(self,seeds:Union['Block',Sequence['Block']]):
        self.blocks = []
        self.edges = []
        self.free_inputs = []
        self.free_outputs = []
        if isinstance(seeds,Block):
            seeds = [seeds]
        if len(seeds)==0:
            # still initialize the (empty) channel collections so that
            # the instance is a fully-formed Block
            Block.__init__(self,0,0)
            return

        self.blocks = self._execution_order(self._collect_connected(seeds))

        for b in self.blocks:
            outgoing = []
            for c in b._outputs._channels:
                if len(c._downstream)==0:
                    self.free_outputs.append(c)
                for target in c._downstream:
                    outgoing.append((c,target))
            self.edges.append(outgoing)
            for c in b._inputs._channels:
                # an input bound to a constant has a non-None upstream and
                # is therefore not free
                if c._upstream is None:
                    self.free_inputs.append(c)
        free_input_names = [str(c) for c in self.free_inputs]
        free_output_names = [str(c) for c in self.free_outputs]
        Block.__init__(self,free_input_names,free_output_names)

    def _referent(self,block):
        """Returns the actual Block behind ``block``, which may be either a
        Block or a weakref proxy to one (channels only store proxies)."""
        return block.__repr__.__self__

    def _collect_connected(self,seeds):
        """Undirected DFS from the seeds; returns every block reachable by
        following connections in either direction, without duplicates."""
        stack = []
        seen = set()   # ids of referents; all of them are kept alive below

        def push(block):
            block = self._referent(block)
            if id(block) not in seen:
                seen.add(id(block))
                stack.append(block)

        for s in seeds:
            push(s)
        found = []
        while len(stack) > 0:
            b = stack.pop()
            found.append(b)
            for c in b._outputs._channels:
                for target in c._downstream:
                    push(target._block)
            for c in b._inputs._channels:
                # the upstream may also be None or a plain constant, neither
                # of which leads to another block
                if isinstance(c._upstream,_BlockOutputChannel):
                    push(c._upstream._block)
        return found

    def _execution_order(self,blocks):
        """Topologically sorts ``blocks`` (Kahn's algorithm).  If feedback
        loops prevent a full ordering, each loop is broken at the first
        not-yet-ordered block in discovery order."""
        if all(len(b._inputs._channels) != 0 for b in blocks):
            warnings.warn("Diagram doesn't have any source blocks")
        index = dict((id(b),i) for i,b in enumerate(blocks))
        pending = [0]*len(blocks)               # unprocessed predecessors
        successors = [[] for _ in blocks]
        for j,b in enumerate(blocks):
            for c in b._inputs._channels:
                if isinstance(c._upstream,_BlockOutputChannel):
                    i = index[id(self._referent(c._upstream._block))]
                    successors[i].append(j)
                    pending[j] += 1

        order = []
        placed = [False]*len(blocks)
        ready = [j for j,n in enumerate(pending) if n == 0]
        head = 0      # ready[head:] is the FIFO queue
        cursor = 0    # scan position used to break cycles
        while len(order) < len(blocks):
            if head == len(ready):
                # everything left is part of / downstream of a cycle
                while placed[cursor]:
                    cursor += 1
                ready.append(cursor)
            j = ready[head]
            head += 1
            if placed[j]:
                continue
            placed[j] = True
            order.append(blocks[j])
            for k in successors[j]:
                pending[k] -= 1
                if pending[k] == 0 and not placed[k]:
                    ready.append(k)
        return order

    def advance(self,*args):
        """Triggers all of the nodes in the block diagram.

        Arguments:
            *args: one value per entry of ``free_inputs``, in order.

        Returns:
            list: the current value of each entry of ``free_outputs``.

        Raises:
            RuntimeError: if the number of arguments does not match the
                number of free inputs.
        """
        if len(args) != len(self.free_inputs):
            raise RuntimeError("Wrong number of arguments (%d) for free inputs (%d)"%(len(args),len(self.free_inputs)))
        # Each member block refreshes its input channels from their
        # upstream at the start of _process(), so a value written directly
        # to _currentValue would be overwritten.  Temporarily binding the
        # argument as a constant upstream makes the channel pick it up.
        for c,a in zip(self.free_inputs,args):
            c._upstream = a
        try:
            for b in self.blocks:
                b._process()
        finally:
            for c in self.free_inputs:
                c._upstream = None
                c._currentValue = None
        return [c._currentValue for c in self.free_outputs]

    def signal(self,name,*args):
        """Forwards the signal to every member block."""
        for b in self.blocks:
            b.signal(name,*args)

    def debug(self,type):
        """Forwards the debug request to every member block."""
        for b in self.blocks:
            b.debug(type)
class Source(Block):
    """Base class for blocks with no inputs and exactly one output."""
    def __init__(self):
        super().__init__(0,1)
class Sink(Block):
    """Base class for blocks with exactly one input and no outputs."""
    def __init__(self):
        super().__init__(1,0)
class ValueSource(Source):
    """A simple Source whose value is controlled by an external process
    via :meth:`write`.

    Arguments:
        value: the value emitted until :meth:`write` replaces it.
    """
    def __init__(self,value):
        super().__init__()
        self.value = value

    def advance(self):
        """Emits the most recently stored value."""
        return self.value

    def write(self,value):
        """Stores the value to be emitted by subsequent advance() calls."""
        self.value = value
class ValueSink(Sink):
    """A simple Sink whose value is extracted by an external process
    via :meth:`read`.
    """
    def __init__(self):
        Sink.__init__(self)
        # Defined up front so that read() before the first advance()
        # returns None rather than raising AttributeError.
        self.value = None

    def advance(self,value):
        """Stores the incoming value; produces no output."""
        self.value = value

    def read(self):
        """Returns the last value received, or None if advance() has not
        been called yet."""
        return self.value
class Counter(Source):
    """Maintains a value that increases linearly each time advance() is
    called.

    Arguments:
        initVal: the value emitted by the first advance() call.
        increment: the amount added after each advance() call.
    """
    def __init__(self,initVal=0,increment=1):
        super().__init__()
        self.increment = increment
        self.value = initVal

    def advance(self):
        """Emits the current count, then steps it by ``increment``."""
        emitted = self.value
        self.value += self.increment
        return emitted

    def __getstate__(self):
        # the state is the pair (current value, step size)
        return self.value,self.increment

    def __setstate__(self,state):
        value,increment = state
        self.value = value
        self.increment = increment
class Add(Block):
    """Adds two inputs (scalar or vector)."""
    def __init__(self):
        super().__init__(2,1)

    def advance(self,a,b):
        """Returns ``vectorops.add(a,b)`` if either operand is iterable,
        and ``a+b`` otherwise."""
        either_iterable = hasattr(a,'__iter__') or hasattr(b,'__iter__')
        return vectorops.add(a,b) if either_iterable else a+b
class Sub(Block):
    """Subtracts the second input from the first (scalar or vector)."""
    def __init__(self):
        super().__init__(2,1)

    def advance(self,a,b):
        """Returns ``vectorops.sub(a,b)`` if either operand is iterable,
        and ``a-b`` otherwise."""
        either_iterable = hasattr(a,'__iter__') or hasattr(b,'__iter__')
        return vectorops.sub(a,b) if either_iterable else a-b
class BinaryOp(Block):
    """Generic lambda for a binary operation.

    Arguments:
        func (callable): called as ``func(a,b)`` on the two inputs; its
            return value becomes the single output.
    """
    def __init__(self,func):
        self.func = func
        super().__init__(2,1)

    def advance(self,a,b):
        """Applies the wrapped function to the two inputs."""
        operation = self.func
        return operation(a,b)
class Min(Block):
    """Takes the minimum of two inputs (scalar or vector)."""
    def __init__(self):
        super().__init__(2,1)

    def advance(self,a,b):
        """Returns ``vectorops.minimum(a,b)`` if either operand is
        iterable, and ``min(a,b)`` otherwise."""
        either_iterable = hasattr(a,'__iter__') or hasattr(b,'__iter__')
        return vectorops.minimum(a,b) if either_iterable else min(a,b)
class Max(Block):
    """Takes the maximum of two inputs (scalar or vector)."""
    def __init__(self):
        super().__init__(2,1)

    def advance(self,a,b):
        """Returns ``vectorops.maximum(a,b)`` if either operand is
        iterable, and ``max(a,b)`` otherwise."""
        either_iterable = hasattr(a,'__iter__') or hasattr(b,'__iter__')
        return vectorops.maximum(a,b) if either_iterable else max(a,b)
class _BlockInputChannel(object):
def __init__(self,block,index,name=None):
self._block = block
self._index = index
self._global_name = None
self._name = name
self._type = None
self._upstream = None #type: _BlockOutputChannel
self._currentValue = None
def __str__(self):
if self._global_name is not None: return self._global_name
elif self._name is None: return str(self._block)+'.'+"inputs[{}]".format(self._index)
else: return str(self._block)+'.'+self._name
def setName(self,name):
self._global_name = name
def setType(self,type):
self._type = type
def connect(self,channel):
if isinstance(channel,Block):
self.connect(channel._outputs)
elif isinstance(channel,_BlockOutputs):
if len(channel._channels) != 1:
raise ValueError("Cannot connect input to outputs with more than one channel")
self.connect(channel[0])
elif isinstance(channel,_BlockOutputChannel):
if channel._type is not None and self._type is not None:
if self._type != channel._type:
raise ValueError("Can't connect input of type {} to output of type {}".format(self._type,channel._type))
channel._downstream.append(weakref.proxy(self))
self._upstream = channel
elif isinstance(channel,(_BlockInputChannel,_BlockInputs)):
raise ValueError("Can only connect outputs to inputs")
else:
self._upstream = channel
def read(self):
return self._currentValue
def reset(self):
self._currentValue = None
def advance(self):
if self._upstream is not None:
if isinstance(self._upstream,_BlockOutputChannel):
self._currentValue = self._upstream._currentValue
else:
self._currentValue = self._upstream
else:
self._currentValue = None
class _BlockOutputChannel(object):
def __init__(self,block,index,name=None):
self._block = block
self._index = index
self._global_name = None
self._name = name
self._type = None
self._currentValue = None
self._downstream = [] #type: List[_BlockInputChannel]
def __str__(self):
if self._global_name is not None: return self._global_name
elif self._name is None: return str(self._block)+".outputs[{}]".format(self._index)
else: return str(self._block)+'.'+self._name
def setName(self,name):
self._global_name = name
def setType(self,type):
self._type = type
def connect(self,channel):
if isinstance(channel,_BlockInputs):
self.connect(channel[0])
elif isinstance(channel,_BlockInputChannel):
if channel._type is not None and self._type is not None:
if self._type != channel._type:
raise ValueError("Can't connect output of type {} to input of type {}".format(self._type,channel._type))
self._downstream.append(weakref.proxy(channel))
channel._upstream = self
elif isinstance(channel,(_BlockOutputChannel,_BlockOutputs)):
raise ValueError("Can only connect inputs to outputs")
else:
raise ValueError("Can't connect outputs to constants")
def reset(self):
self._currentValue = None
def setValue(self,val):
self._currentValue = val
def read(self):
return self._currentValue
def __add__(self,rhs):
res = Add()
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __sub__(self,rhs):
res = Sub()
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __gt__(self,rhs):
res = BinaryOp(operator.gt)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __ge__(self,rhs):
res = BinaryOp(operator.ge)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __lt__(self,rhs):
res = BinaryOp(operator.lt)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __le__(self,rhs):
res = BinaryOp(operator.le)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __eq__(self,rhs):
res = BinaryOp(operator.eq)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __ne__(self,rhs):
res = BinaryOp(operator.ne)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
class _BlockInputs(object):
def __init__(self,block,inputs) -> None:
self._block = block
if isinstance(inputs,int):
self._channels = [_BlockInputChannel(block,i) for i in range(inputs)]
self._namesToIndices = {}
else:
self._channels = [_BlockInputChannel(block,i,n) for i,n in enumerate(inputs)]
self._namesToIndices = {}
for i,n in enumerate(inputs):
self._namesToIndices[n] = i
def addChannel(self,name=None) -> None:
if name is None:
if len(self._namesToIndices) == len(self._channels) and len(self._channels) > 0:
raise ValueError("Can't add unnamed channel when prior channels are named")
self._channels.append(_BlockInputChannel(self._block),len(self._channels))
else:
if len(self._namesToIndices) != len(self._channels):
raise ValueError("Can't add named channel when prior channels are unnamed")
if name in self._namesToIndices:
raise ValueError("Already have a channel named {}".format(name))
i = len(self._channels)
self._channels.append(_BlockInputChannel(self._block,name),i)
self._namesToIndices[name] = i
def __len__(self) -> int:
return len(self._channels)
def __contains__(self, key : str) -> bool:
return key in self._namesToIndices
def items(self) -> Iterator:
if len(self._namesToIndices):
for k,i in self._namesToIndices.items():
yield (k,self._channels[i])
else:
return enumerate(self._channels)
def setName(self, name : str) -> None:
if len(self._channels)!=1:
raise ValueError("For input.setName() to work, need exactly 1 channel")
self._channels[0].setName(name)
def setType(self,type) -> None:
if len(self._channels)!=1:
raise ValueError("For input.setType() to work, need exactly 1 channel")
self._channels[0].setType(type)
def __getitem__(self, key : Union[int,str]):
if isinstance(key,int):
return self._channels[key]
else:
return self._channels[self._namesToIndices[key]]
def __setitem__(self, key : Union[int,str], value) -> None:
if isinstance(key,int):
self._channels[key].connect(value)
else:
self._channels[self._namesToIndices[key]].connect(value)
def read(self):
if len(self._channels)==0:
return None
elif len(self._channels) > 1:
if len(self._namesToIndices):
return dict((k,self._channels[i].read()) for (k,i) in self._namesToIndices.items())
else:
return [c.read() for c in self._channels]
return self._channels[0].read()
class _BlockOutputs(object):
def __init__(self,block,outputs) -> None:
self._block = block
if isinstance(outputs,int):
self._channels = [_BlockOutputChannel(block,i) for i in range(outputs)]
self._namesToIndices = {}
else:
self._channels = [_BlockOutputChannel(block,i,n) for i,n in enumerate(outputs)]
self._namesToIndices = {}
for i,n in enumerate(outputs):
self._namesToIndices[n] = i
def addChannel(self,name=None) -> None:
if name is None:
if len(self._namesToIndices) == len(self._channels) and len(self._channels) > 0:
raise ValueError("Can't add unnamed channel when prior channels are named")
i = len(self._channels)
self._channels.append(_BlockOutputChannel(self._block),i)
return i
else:
if len(self._namesToIndices) != len(self._channels):
raise ValueError("Can't add named channel when prior channels are unnamed")
if name in self._namesToIndices:
raise ValueError("Already have a channel named {}".format(name))
i = len(self._channels)
self._channels.append(_BlockOutputChannel(self._block,name),i)
self._namesToIndices[name] = i
return name
def __len__(self) -> int:
return len(self._channels)
def __contains__(self, key : str) -> bool:
return key in self._namesToIndices
def items(self) -> Iterator:
if len(self._namesToIndices):
for k,i in self._namesToIndices.items():
yield (k,self._channels[i])
else:
return enumerate(self._channels)
def setName(self, name : str) -> None:
if len(self._channels)!=1:
raise ValueError("For output.setName() to work, need exactly 1 channel")
self._channels[0].setName(name)
def setType(self,type) -> None:
if len(self._channels)!=1:
raise ValueError("For output.setType() to work, need exactly 1 channel")
self._channels[0].setType(type)
def __getitem__(self, key : Union[int,str]):
if isinstance(key,int):
return self._channels[key]
else:
return self._channels[self._namesToIndices[key]]
def __setitem__(self, key : Union[int,str], value) -> None:
if isinstance(key,int):
self._channels[key].connect(value)
else:
self._channels[self._namesToIndices[key]].connect(value)
def read(self):
if len(self._channels)==0:
return None
elif len(self._channels) > 1:
if len(self._namesToIndices):
return dict((k,self._channels[i].read()) for (k,i) in self._namesToIndices.items())
else:
return [c.read() for c in self._channels]
return self._channels[0].read()
def reset(self) -> None:
if len(self._channels)!=1:
raise ValueError("For output.reset() to work, need exactly 1 channel")
self._channels[0].reset()
def setValue(self,val) -> None:
if len(self._channels)!=1:
raise ValueError("For output.reset() to work, need exactly 1 channel")
self._channels[0].setValue(val)
def __add__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output + to work, need exactly 1 channel")
res = Add()
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __sub__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output - to work, need exactly 1 channel")
res = Sub()
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __gt__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output > to work, need exactly 1 channel")
res = BinaryOp(operator.gt)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __ge__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output >= to work, need exactly 1 channel")
res = BinaryOp(operator.ge)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __lt__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output < to work, need exactly 1 channel")
res = BinaryOp(operator.lt)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __le__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output <= to work, need exactly 1 channel")
res = BinaryOp(operator.le)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __eq__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output == to work, need exactly 1 channel")
res = BinaryOp(operator.eq)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def __ne__(self,rhs):
if len(self._channels)!=1:
raise ValueError("For output != to work, need exactly 1 channel")
res = BinaryOp(operator.ne)
res.input[0].connect(self)
res.input[1].connect(rhs)
return res
def self_test():
    """Small demo: feeds a sine wave through a FIR filter, takes the Min
    of the filtered and raw signals, runs the resulting diagram for 10
    steps and prints the result of each step."""
    from . import filters
    import math

    source = ValueSource(0.0)
    source.output.setName("some sensor")

    smoother = filters.FIRFilter([0.25]*4)  #moving average
    #for single-output blocks, output is the same as output[0]
    smoother.input = source.output[0]
    smoother.output.setName("filtered sensor")

    lower = Min()(smoother.output,source.output)

    diagram = SuperBlock([source])
    print("Blocks in super-block:",[str(b) for b in diagram.blocks])
    print("Free inputs: ",[str(c) for c in diagram.free_inputs])
    print("Free outputs: ",[str(c) for c in diagram.free_outputs])

    for step in range(10):
        t = step*0.001
        reading = math.sin(t*10)
        source.write(reading)
        diagram.advance()
        print("f({}) = {}".format(reading,lower.output.read()))
# Run the demo when the module is executed directly.
# NOTE(review): self_test() uses a relative import (``from . import filters``),
# so this presumably has to be launched as a module within its package
# (``python -m ...``) rather than as a plain script -- confirm.
if __name__ == '__main__':
    self_test()