# pyright: basic
'''Optional queued execution layers for pysm.
The classes in this module deliberately live outside ``pysm.__init__`` so the
classic core import remains tiny and suitable for MicroPython-oriented builds.
'''
from collections import deque
from .pysm import StateMachine, StateMachineException
[docs]
class QueuedStateMachine(StateMachine):
'''State machine with run-to-completion event scheduling.
External events enter an external FIFO queue. Events dispatched while the
machine is already processing enter a separate internal FIFO queue. The
internal queue is always drained before the next external event is handled.
'''
def __init__(self, name, max_internal_steps=None):
super(QueuedStateMachine, self).__init__(name)
self.max_internal_steps = max_internal_steps
self._internal_queue = deque()
self._external_queue = deque()
self._is_processing = False
[docs]
def initialize(self, fire_events_on_init=False):
StateMachine.initialize(self, fire_events_on_init=False)
if not fire_events_on_init:
return
self._is_processing = True
try:
self._enter_initial_states()
self._process_queues()
except BaseException:
self._clear_queues()
raise
finally:
self._is_processing = False
[docs]
def dispatch(self, event):
'''Enqueue ``event`` and process pending work to completion.'''
if self._is_processing:
self._internal_queue.append(event)
return
self._external_queue.append(event)
self._is_processing = True
try:
self._process_queues()
except BaseException:
self._clear_queues()
raise
finally:
self._is_processing = False
def _process_queues(self):
internal_steps = 0
while self._internal_queue or self._external_queue:
if self._internal_queue:
internal_steps += 1
if (self.max_internal_steps is not None and
internal_steps > self.max_internal_steps):
raise StateMachineException(
'QueuedStateMachine "{0}" exceeded '
'max_internal_steps={1}'.format(
self.name, self.max_internal_steps))
next_event = self._internal_queue.popleft()
else:
internal_steps = 0
next_event = self._external_queue.popleft()
StateMachine.dispatch(self, next_event)
def _clear_queues(self):
while self._internal_queue:
self._internal_queue.popleft()
while self._external_queue:
self._external_queue.popleft()
[docs]
class ThreadSafeQueuedStateMachine(QueuedStateMachine):
'''Queued state machine protected by a reentrant execution lock.
Long-running or blocking handlers hold the lock until the current
run-to-completion cycle finishes. Async execution is intentionally kept out
of this class.
'''
def __init__(self, name, max_internal_steps=None):
super(ThreadSafeQueuedStateMachine, self).__init__(
name, max_internal_steps=max_internal_steps)
import threading
self._execution_lock = threading.RLock()
[docs]
def dispatch(self, event):
with self._execution_lock:
return QueuedStateMachine.dispatch(self, event)
[docs]
def initialize(self, fire_events_on_init=False):
with self._execution_lock:
return QueuedStateMachine.initialize(
self, fire_events_on_init=fire_events_on_init)