Source code for klampt.control.blocks.state_machine

"""
Implements state machines as :class:`Block` structures.

- :class:`StateMachineBase`: a state machine that switches between
  multiple sub-controllers, with one sub-controller running at once and the
  active controller triggered by some signal.
- :class:`TransitionStateMachine`: a state machine with explicit
  transition conditions.
- :class:`TimedSequenceBlock`: a sequence of controllers, switched by
  time.

"""

from .core import Block
from .utils import BlockSignal

[docs]class StateMachineBase(Block): """A base class for a finite state machine controller. One sub-controller may be active at once. To implement transitions, :meth:`next_state` needs to be filled out by the subclass. If a sub-block has inputs 'enter' or 'exit', they are raised when the sub-controller enters or exits. Signals: - 'reset' (bool): when 1, the state machine goes back to the start state and resets all blocks. Outputs: - 'state' (int): the currently active state. Attributes: blocks (list of Block): the list of sub-controllers current (int): the index of the currently active sub-controller. -1 indicates no current sub-controller. start (int): the index of the initial sub-controller. """ def __init__(self,blocks,start=0,reset_on_enter=True): Block.__init__(self,[],['state']) self.blocks = blocks self.current = start self.start = start self.reset_on_enter = reset_on_enter
[docs] def next_state(self,state,*args,**kwargs): """Subclasses should override this to implement the transitions""" return state
def __str__(self): strs = [str(c) for c in self.blocks] strs[self.current] = '* '+strs[self.current] return self.__class__.__name__+'\n'+'\n '.join(strs)
[docs] def signal(self,type,*args): if type=='reset' or (type == 'enter' and self.reset_on_enter): if self.current >= 0: self.blocks[self.current].signal('exit') self.current = self.start if self.current >= 0: self.blocks[self.current].signal('enter') else: if self.current >= 0: self.blocks[self.current].signal(type,*args)
[docs] def advance(self,*args,**kwargs): if self.current<0: return None n = None try: res = self.blocks[self.current]._process() except BlockSignal as s: if s.type == 'change_state': n = int(s.text) else: self.signal(s.type) if n is None: n = self.next_state(self.current,*args,**kwargs) if n != self.current: self.blocks[self.current].signal('exit') if n >= 0: self.blocks[n].signal('enter') self.current = n return self.current
[docs] def func(self,name): if self.current>=0: self.blocks[self.current].func(name)
def __getstate__(self): blockState = [] for c in self.blocks: try: s = c.__getstate__() except NotImplementedError: s = None blockState.append(s) return {'current':self.current,'blocks':blockState} def __setstate__(self,state): if 'current' not in state or 'blocks' not in state or len(state['blocks']) != len(self.blocks): raise ValueError("Invalid state dict") self.current = state.current for (c,s) in zip(self.blocks,state['blocks']): if s is not None: c.__setstate__(s)
[docs]class TransitionStateMachine(StateMachineBase): """A state machine controller with a transition matrix that determines when to move to the next state. Attributes: blocks: same as StateMachineBase current: same as StateMachineBase transitions (list of dicts): a list of transition conditions [{x1:cond1,...,xk:condk},...,{}] so that if j is in transitions[i], then transitions[i][j](*inputs) is a condition that tells the state machine whether to change states from i to j. """ def __init__(self,blocks,transitions=None,inputs=0,start=0,reset_on_enter=True): StateMachineBase.__init__(self,blocks,start,reset_on_enter) Block.__init__(self,inputs,['state']) if transitions is None: self.transitions = [dict() for c in self.blocks] else: if len(transitions) != len(blocks): raise ValueError("Transitions is in invalid format") self.transitions = transitions
[docs] def set_transition(self,fromState,toState,func): assert fromState >= 0 and fromState < len(self.transitions) assert toState >= 0 and toState < len(self.transitions) self.transitions[fromState][toState] = func
[docs] def next_state(self,state,*args,**kwargs): if state < 0: return state trans = self.transitions[state] for (k,v) in trans.items(): if v(*args,**kwargs): print("TransitionStateMachine: Transition to controller",k,":",str(self.blocks[k])) return k return state
[docs]class TimedSequenceBlock(TransitionStateMachine): """A state-machine controller that goes through each sub-controller in sequence. Input: t (float): the current time """ def __init__(self,blocks,times): assert len(times)==len(blocks) trans = [{} for c in blocks] for i in range(len(blocks)-1): trans[i][i+1] = lambda t:t >= times[i] trans[-1][-1] = lambda t:t >= times[-1] TransitionStateMachine.__init__(self,blocks,trans,['t'])