Source code for pysm.pysm

'''Python State Machine

The goal of this library is to give you a close to the State Pattern
simplicity with much more flexibility. And, if needed, the full state machine
functionality, including `FSM
<https://en.wikipedia.org/wiki/Finite-state_machine>`_, `HSM
<https://en.wikipedia.org/wiki/UML_state_machine
#Hierarchically_nested_states>`_, `PDA
<https://en.wikipedia.org/wiki/Pushdown_automaton>`_ and other tasty things.

Goals:
    - Provide a State Pattern-like behavior with more flexibility
    - Be explicit and don't add any code to objects
    - Handle directly any kind of event (not only strings) - parsing strings is
      cool again!
    - Keep it simple, even for someone who's not very familiar with the FSM
      terminology

----

.. |StateMachine| replace:: :class:`~.StateMachine`
.. |State| replace:: :class:`~.State`
.. |Hashable| replace:: :class:`~collections.Hashable`
.. |Iterable| replace:: :class:`~collections.Iterable`
.. |Callable| replace:: :class:`~collections.Callable`

'''
import logging
import sys
from collections import defaultdict, deque


# Required to make it Micropython compatible
if str(type(defaultdict)).find('module') > 0:
    # pylint: disable=no-member
    defaultdict = defaultdict.defaultdict


# Required to make it Micropython compatible
def patch_deque(deque_module):
    class deque_maxlen(object):
        def __init__(self, iterable=None, maxlen=0):
            # pylint: disable=no-member
            if iterable is None:
                iterable = []
            if maxlen in [None, 0]:
                maxlen = float('Inf')
            self.q = deque_module.deque(iterable)
            self.maxlen = maxlen

        def pop(self):
            return self.q.pop()

        def append(self, item):
            if self.maxlen > 0 and len(self.q) >= self.maxlen:
                self.q.popleft()
            self.q.append(item)

        def __getattr__(self, name):
            return getattr(self.q, name)

        def __bool__(self):
            # pylint: disable=len-as-condition
            if len(self.q) > 0:
                return True
            return False

        def __iter__(self):
            return iter(self.q)

        def __getitem__(self, key):
            return self.q[key]

    return deque_maxlen


# Required to make it Micropython compatible
try:
    test_deque = deque(maxlen=1)
except TypeError:
    # TypeError: unexpected keyword argument 'maxlen'
    if hasattr(deque, 'deque'):
        deque = patch_deque(deque)
    else:
        class MockDequeModule(object):
            deque = deque
        deque = patch_deque(MockDequeModule)
else:
    del test_deque


logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)


[docs]class AnyEvent(object): ''' hash(object()) doesn't work in MicroPython therefore the need for this class. ''' pass
any_event = AnyEvent() def is_iterable(obj): try: iter(obj) except TypeError: return False return True
[docs]class StateMachineException(Exception): '''All |StateMachine| exceptions are of this type. ''' pass
[docs]class Event(object): r'''Triggers actions and transition in |StateMachine|. Events are also used to control the flow of data propagated to states within the states hierarchy. Event objects have the following attributes set after an event has been dispatched: **Attributes:** .. attribute:: state_machine A |StateMachine| instance that is handling the event (the one whose :func:`pysm.pysm.StateMachine.dispatch` method is called) .. attribute:: propagate An event is propagated from a current leaf state up in the states hierarchy until it encounters a handler that can handle the event. To propagate it further, it has to be set to `True` in a handler. :param name: Name of an event. It may be anything as long as it's hashable. :type name: |Hashable| :param input: Optional input. Anything hashable. :type input: |Hashable| :param \*\*cargo: Keyword arguments for an event, used to transport data to handlers. It's added to an event as a `cargo` property of type `dict`. For `enter` and `exit` events, the original event that triggered a transition is passed in cargo as `source_event` entry. .. note`: `enter` and `exit` events are never propagated, even if the `propagate` flag is set to `True` in a handler. **Example Usage:** .. code-block:: python state_machine.dispatch(Event('start')) state_machine.dispatch(Event('start', key='value')) state_machine.dispatch(Event('parse', input='#', entity=my_object)) state_machine.dispatch(Event('%')) state_machine.dispatch(Event(frozenset([1, 2]))) ''' def __init__(self, name, input=None, **cargo): self.name = name self.input = input self.propagate = True self.cargo = cargo # This must be always the root machine self.state_machine = None def __repr__(self): return '<Event {0}, input={1}, cargo={2} ({3})>'.format( self.name, self.input, self.cargo, hex(id(self)))
[docs]class State(object): '''Represents a state in a state machine. `enter` and `exit` handlers are called whenever a state is entered or exited respectively. These action names are reserved only for this purpose. It is encouraged to extend this class to encapsulate a state behavior, similarly to the State Pattern. Once it's extended, the preferred way of adding an event handlers is through the :func:`register_handlers` hook. Usually, there's no need to create the :func:`__init__` in a subclass. :param name: Human readable state name :type name: str **Example Usage:** .. code-block:: python # Extending State to encapsulate state-related behavior. Similar to the # State Pattern. class Running(State): def on_enter(self, state, event): print('Running state entered') def on_jump(self, state, event): print('Jumping') def on_dollar(self, state, event): print('Dollar found!') def register_handlers(self): self.handlers = { 'enter': self.on_enter, 'jump': self.on_jump, '$': self.on_dollar } .. code-block:: python # Different way of attaching handlers. A handler may be any function as # long as it takes `state` and `event` args. def another_handler(state, event): print('Another handler') running = State('running') running.handlers = { 'another_event': another_handler } ''' def __init__(self, name): self.parent = None self.name = name # self.id = 1 self.handlers = {} self.initial = False self.register_handlers() def __repr__(self): return '<State {0} ({1})>'.format(self.name, hex(id(self)))
[docs] def register_handlers(self): '''Hook method to register event handlers. It is used to easily extend |State| class. The hook is called from within the base :func:`.State.__init__`. Usually, the :func:`__init__` doesn't have to be created in a subclass. Event handlers are kept in a `dict`, with events' names as keys, therefore registered events may be of any hashable type. Handlers take two arguments: - **state**: The current state that is handling an event. The same handler function may be attached to many states, therefore it is helpful to get the handling state's instance. - **event**: An event that triggered the handler call. If it is an `enter` or `exit` event, then the source event (the one that triggered the transition) is passed in `event`'s cargo property as `cargo.source_event`. **Example Usage:** .. code-block:: python class On(State): def handle_my_event(self, state, event): print('Handling an event') def register_handlers(self): self.handlers = { 'my_event': self.handle_my_event, '&': self.handle_my_event, frozenset([1, 2]): self.handle_my_event } ''' pass
[docs] def is_substate(self, state): '''Check whether the `state` is a substate of `self`. Also `self` is considered a substate of `self`. :param state: State to verify :type state: |State| :returns: `True` if `state` is a substate of `self`, `False` otherwise :rtype: bool ''' if state is self: return True parent = self.parent while parent: if parent is state: return True parent = parent.parent return False
def _on(self, event): if event.name in self.handlers: event.propagate = False self.handlers[event.name](self, event) # Never propagate exit/enter events, even if propagate is set to True if (self.parent and event.propagate and event.name not in ('exit', 'enter')): self.parent._on(event) def _nop(self, state, event): del state # Unused (silence pylint) del event # Unused (silence pylint) return True
class TransitionsContainer(object): def __init__(self, machine): self._machine = machine self._transitions = defaultdict(list) def add(self, key, transition): self._transitions[key].append(transition) def get(self, event): key = (self._machine.state, event.name, event.input) return self._get_transition_matching_condition(key, event) def _get_transition_matching_condition(self, key, event): from_state = self._machine.leaf_state for transition in self._transitions[key]: if transition['condition'](from_state, event) is True: return transition key = (self._machine.state, any_event, event.input) for transition in self._transitions[key]: if transition['condition'](from_state, event) is True: return transition return None class Stack(object): def __init__(self, maxlen=None): self.deque = deque(maxlen=maxlen) def pop(self): return self.deque.pop() def push(self, value): self.deque.append(value) def peek(self): return self.deque[-1] def __repr__(self): return str(list(self.deque))
[docs]class StateMachine(State): '''State machine controls actions and transitions. To provide the State Pattern-like behavior, the formal state machine rules may be slightly broken, and instead of creating an `internal transition <https://en.wikipedia.org/wiki/UML_state_machine #Internal_transitions>`_ for every action that doesn't require a state change, event handlers may be added to states. These are handled first when an event occurs. After that the actual transition is called, calling `enter`/`exit` actions and other transition actions. Nevertheless, internal transitions are also supported. So the order of calls on an event is as follows: 1. State's event handler 2. `condition` callback 3. `before` callback 4. `exit` handlers 5. `action` callback 6. `enter` handlers 7. `after` callback If there's no handler in states or transition for an event, it is silently ignored. If using nested state machines, all events should be sent to the root state machine. **Attributes:** .. attribute:: state Current, local state (instance of |State|) in a state machine. .. attribute:: stack Stack that can be used if the `Pushdown Automaton (PDA) <https://en.wikipedia.org/wiki/Pushdown_automaton>`_ functionality is needed. .. attribute:: state_stack Stack of previous local states in a state machine. With every transition, a previous state (instance of |State|) is pushed to the `state_stack`. Only :attr:`.StateMachine.STACK_SIZE` (32 by default) are stored and old values are removed from the stack. .. attribute:: leaf_state_stack Stack of previous leaf states in a state machine. With every transition, a previous leaf state (instance of |State|) is pushed to the `leaf_state_stack`. Only :attr:`.StateMachine.STACK_SIZE` (32 by default) are stored and old values are removed from the stack. **leaf_state** See the :attr:`~.StateMachine.leaf_state` property. **root_machine** See the :attr:`~.StateMachine.root_machine` property. :param name: Human readable state machine name :type name: str .. note :: |StateMachine| extends |State| and therefore it is possible to always use a |StateMachine| instance instead of the |State|. This wouldn't be a good practice though, as the |State| class is designed to be as small as possible memory-wise and thus it's more memory efficient. It is valid to replace a |State| with a |StateMachine| later on if there's a need to extend a state with internal states. .. note:: For the sake of speed thread safety isn't guaranteed. **Example Usage:** .. code-block:: python state_machine = StateMachine('root_machine') state_on = State('On') state_off = State('Off') state_machine.add_state('Off', initial=True) state_machine.add_state('On') state_machine.add_transition(state_on, state_off, events=['off']) state_machine.add_transition(state_off, state_on, events=['on']) state_machine.initialize() state_machine.dispatch(Event('on')) ''' STACK_SIZE = 32 def __init__(self, name): super(StateMachine, self).__init__(name) self.states = set() self.state = None self._transitions = TransitionsContainer(self) self.state_stack = Stack(maxlen=StateMachine.STACK_SIZE) self.leaf_state_stack = Stack(maxlen=StateMachine.STACK_SIZE) self.stack = Stack()
[docs] def add_state(self, state, initial=False): '''Add a state to a state machine. If states are added, one (and only one) of them has to be declared as `initial`. :param state: State to be added. It may be an another |StateMachine| :type state: |State| :param initial: Declare a state as initial :type initial: bool ''' Validator(self).validate_add_state(state, initial) state.initial = initial state.parent = self self.states.add(state)
[docs] def add_states(self, *states): '''Add `states` to the |StateMachine|. To set the initial state use :func:`set_initial_state`. :param states: A list of states to be added :type states: |State| ''' for state in states: self.add_state(state)
[docs] def set_initial_state(self, state): '''Set an initial state in a state machine. :param state: Set this state as initial in a state machine :type state: |State| ''' Validator(self).validate_set_initial(state) state.initial = True
@property def initial_state(self): '''Get the initial state in a state machine. :returns: Initial state in a state machine :rtype: |State| ''' for state in self.states: if state.initial: return state return None @property def root_machine(self): '''Get the root state machine in a states hierarchy. :returns: Root state in the states hierarchy :rtype: |StateMachine| ''' machine = self while machine.parent: machine = machine.parent return machine
[docs] def add_transition( self, from_state, to_state, events, input=None, action=None, condition=None, before=None, after=None): '''Add a transition to a state machine. All callbacks take two arguments - `state` and `event`. See parameters description for details. It is possible to create conditional if/elif/else-like logic for transitions. To do so, add many same transition rules with different condition callbacks. First met condition will trigger a transition, if no condition is met, no transition is performed. :param from_state: Source state :type from_state: |State| :param to_state: Target state. If `None`, then it's an `internal transition <https://en.wikipedia.org/wiki/UML_state_machine #Internal_transitions>`_ :type to_state: |State|, `None` :param events: List of events that trigger the transition :type events: |Iterable| of |Hashable| :param input: List of inputs that trigger the transition. A transition event may be associated with a specific input. i.e.: An event may be ``parse`` and an input associated with it may be ``$``. May be `None` (default), then every matched event name triggers a transition. :type input: `None`, |Iterable| of |Hashable| :param action: Action callback that is called during the transition after all states have been left but before the new one is entered. `action` callback takes two arguments: - state: Leaf state before transition - event: Event that triggered the transition :type action: |Callable| :param condition: Condition callback - if returns `True` transition may be initiated. `condition` callback takes two arguments: - state: Leaf state before transition - event: Event that triggered the transition :type condition: |Callable| :param before: Action callback that is called right before the transition. `before` callback takes two arguments: - state: Leaf state before transition - event: Event that triggered the transition :type before: |Callable| :param after: Action callback that is called just after the transition `after` callback takes two arguments: - state: Leaf state after transition - event: Event that triggered the transition :type after: |Callable| ''' # Rather than adding some if statements later on, let's just declare a # neutral items that will do nothing if called. It simplifies the logic # a lot. if input is None: input = tuple([None]) if action is None: action = self._nop if before is None: before = self._nop if after is None: after = self._nop if condition is None: condition = self._nop Validator(self).validate_add_transition( from_state, to_state, events, input) for input_value in input: for event in events: key = (from_state, event, input_value) transition = { 'from_state': from_state, 'to_state': to_state, 'action': action, 'condition': condition, 'before': before, 'after': after, } self._transitions.add(key, transition)
def _get_transition(self, event): machine = self.leaf_state.parent while machine: transition = machine._transitions.get(event) if transition: return transition machine = machine.parent return None @property def leaf_state(self): '''Get the current leaf state. The :attr:`~.StateMachine.state` property gives the current, local state in a state machine. The `leaf_state` goes to the bottom in a hierarchy of states. In most cases, this is the property that should be used to get the current state in a state machine, even in a flat FSM, to keep the consistency in the code and to avoid confusion. :returns: Leaf state in a hierarchical state machine :rtype: |State| ''' return self._get_leaf_state(self) def _get_leaf_state(self, state): while hasattr(state, 'state') and state.state is not None: state = state.state return state
[docs] def initialize(self): '''Initialize states in the state machine. After a state machine has been created and all states are added to it, :func:`initialize` has to be called. If using nested state machines (HSM), :func:`initialize` has to be called on a root state machine in the hierarchy. ''' machines = deque() machines.append(self) while machines: machine = machines.popleft() Validator(self).validate_initial_state(machine) machine.state = machine.initial_state for child_state in machine.states: if isinstance(child_state, StateMachine): machines.append(child_state)
[docs] def dispatch(self, event): '''Dispatch an event to a state machine. If using nested state machines (HSM), it has to be called on a root state machine in the hierarchy. :param event: Event to be dispatched :type event: :class:`.Event` ''' event.state_machine = self leaf_state_before = self.leaf_state leaf_state_before._on(event) transition = self._get_transition(event) if transition is None: return to_state = transition['to_state'] from_state = transition['from_state'] transition['before'](leaf_state_before, event) top_state = self._exit_states(event, from_state, to_state) transition['action'](leaf_state_before, event) self._enter_states(event, top_state, to_state) transition['after'](self.leaf_state, event)
def _exit_states(self, event, from_state, to_state): if to_state is None: return None state = self.leaf_state self.leaf_state_stack.push(state) while (state.parent and not (from_state.is_substate(state) and to_state.is_substate(state)) or (state == from_state == to_state)): logger.debug('exiting %s', state.name) exit_event = Event('exit', propagate=False, source_event=event) exit_event.state_machine = self state._on(exit_event) state.parent.state_stack.push(state) state.parent.state = state.parent.initial_state state = state.parent return state def _enter_states(self, event, top_state, to_state): if to_state is None: return path = [] state = self._get_leaf_state(to_state) while state.parent and state != top_state: path.append(state) state = state.parent for state in reversed(path): logger.debug('entering %s', state.name) enter_event = Event('enter', propagate=False, source_event=event) enter_event.state_machine = self state._on(enter_event) state.parent.state = state
[docs] def set_previous_leaf_state(self, event=None): '''Transition to a previous leaf state. This makes a dynamic transition to a historical state. The current `leaf_state` is saved on the stack of historical leaf states when calling this method. :param event: (Optional) event that is passed to states involved in the transition :type event: :class:`.Event` ''' if event is not None: event.state_machine = self from_state = self.leaf_state try: to_state = self.leaf_state_stack.peek() except IndexError: return top_state = self._exit_states(event, from_state, to_state) self._enter_states(event, top_state, to_state)
[docs] def revert_to_previous_leaf_state(self, event=None): '''Similar to :func:`set_previous_leaf_state` but the current leaf_state is not saved on the stack of states. It allows to perform transitions further in the history of states. ''' self.set_previous_leaf_state(event) try: self.leaf_state_stack.pop() self.leaf_state_stack.pop() except IndexError: return
class Validator(object): def __init__(self, state_machine): self.state_machine = state_machine self.template = 'Machine "{0}" error: {1}'.format( self.state_machine.name, '{0}') def _raise(self, msg): raise StateMachineException(self.template.format(msg)) def validate_add_state(self, state, initial): if not isinstance(state, State): msg = 'Unable to add state of type {0}'.format(type(state)) self._raise(msg) self._validate_state_already_added(state) if initial is True: self.validate_set_initial(state) def _validate_state_already_added(self, state): root_machine = self.state_machine.root_machine machines = deque() machines.append(root_machine) while machines: machine = machines.popleft() if state in machine.states and machine is not self.state_machine: msg = ('Machine "{0}" error: State "{1}" is already added ' 'to machine "{2}"'.format( self.state_machine.name, state.name, machine.name)) self._raise(msg) for child_state in machine.states: if isinstance(child_state, StateMachine): machines.append(child_state) def validate_set_initial(self, state): for added_state in self.state_machine.states: if added_state.initial is True and added_state is not state: msg = ('Unable to set initial state to "{0}". ' 'Initial state is already set to "{1}"' .format(state.name, added_state.name)) self._raise(msg) def validate_add_transition(self, from_state, to_state, events, input): self._validate_from_state(from_state) self._validate_to_state(to_state) self._validate_events(events) self._validate_input(input) def _validate_from_state(self, from_state): if from_state not in self.state_machine.states: msg = 'Unable to add transition from unknown state "{0}"'.format( from_state.name) self._raise(msg) def _validate_to_state(self, to_state): root_machine = self.state_machine.root_machine # pylint: disable=no-else-return if to_state is None: return elif to_state is root_machine: return elif not to_state.is_substate(root_machine): msg = 'Unable to add transition to unknown state "{0}"'.format( to_state.name) self._raise(msg) def _validate_events(self, events): if not is_iterable(events): msg = ('Unable to add transition, events is not iterable: {0}' .format(events)) self._raise(msg) def _validate_input(self, input): if not is_iterable(input): msg = ('Unable to add transition, input is not iterable: {0}' .format(input)) self._raise(msg) def validate_initial_state(self, machine): if machine.states and not machine.initial_state: msg = 'Machine "{0}" has no initial state'.format(machine.name) self._raise(msg)