# pyright: basic, reportArgumentType=false, reportIncompatibleMethodOverride=false, reportOptionalMemberAccess=false
'''Optional asyncio execution layer for pysm.
This module is intentionally not imported from ``pysm.__init__``. It requires
CPython's ``asyncio`` runtime and keeps the classic core import untouched.
'''
import asyncio
import inspect
from collections import deque
from .pysm import Event, StateMachine, StateMachineException, any_event, logger
[docs]
class AsyncQueuedStateMachine(StateMachine):
'''Async state machine with run-to-completion event scheduling.
The machine is intended to be used from one asyncio event loop. External
idle dispatches are serialized with an ``asyncio.Lock``. Events dispatched
from the currently running transition task are queued as internal events.
Events dispatched by other tasks while the machine is already processing
are queued as external events and return after enqueueing.
'''
def __init__(self, name, max_internal_steps=None):
super(AsyncQueuedStateMachine, self).__init__(name)
self.max_internal_steps = max_internal_steps
self._internal_queue = deque()
self._external_queue = deque()
self._is_processing = False
self._processing_task = None
self._execution_lock = None
[docs]
def initialize(self, fire_events_on_init=False):
if fire_events_on_init:
raise StateMachineException(
'Use async_initialize(fire_events_on_init=True) for '
'AsyncQueuedStateMachine enter handlers')
StateMachine.initialize(self, fire_events_on_init=False)
[docs]
async def async_initialize(self, fire_events_on_init=False):
'''Initialize and optionally await entry handlers on the initial path.'''
async with self._get_execution_lock():
StateMachine.initialize(self, fire_events_on_init=False)
if not fire_events_on_init:
return
self._is_processing = True
self._processing_task = asyncio.current_task()
try:
await self._enter_initial_states()
await self._process_queues()
except BaseException:
self._clear_queues()
raise
finally:
self._is_processing = False
self._processing_task = None
async def _process_queues(self):
internal_steps = 0
while self._internal_queue or self._external_queue:
if self._internal_queue:
internal_steps += 1
if (self.max_internal_steps is not None and
internal_steps > self.max_internal_steps):
raise StateMachineException(
'AsyncQueuedStateMachine "{0}" exceeded '
'max_internal_steps={1}'.format(
self.name, self.max_internal_steps))
next_event = self._internal_queue.popleft()
else:
internal_steps = 0
next_event = self._external_queue.popleft()
await self._dispatch_one(next_event)
[docs]
async def dispatch(self, event):
'''Enqueue and process ``event`` using async RTC semantics.'''
current_task = asyncio.current_task()
if self._is_processing:
if current_task is self._processing_task:
self._internal_queue.append(event)
else:
self._external_queue.append(event)
return
async with self._get_execution_lock():
self._external_queue.append(event)
self._is_processing = True
self._processing_task = current_task
try:
await self._process_queues()
except BaseException:
self._clear_queues()
raise
finally:
self._is_processing = False
self._processing_task = None
async def _dispatch_one(self, event):
event.state_machine = self
leaf_state_before = self._require_initialized()
await self._on(leaf_state_before, event)
transition = await self._get_transition(event)
if transition is None:
return
to_state = transition['to_state']
from_state = transition['from_state']
await self._call(transition['before'], leaf_state_before, event)
top_state = await self._exit_states(event, from_state, to_state)
await self._call(transition['action'], leaf_state_before, event)
await self._enter_states(event, top_state, to_state)
await self._call(transition['after'], self.leaf_state, event)
async def _on(self, state, event):
if event.name in state.handlers:
event.propagate = False
await self._call(state.handlers[event.name], state, event)
if (state.parent and event.propagate and
event.name not in ('exit', 'enter')):
await self._on(state.parent, event)
async def _enter_initial_states(self):
for state in self._initial_entry_path():
logger.debug('entering %s', state.name)
enter_event = Event('enter', propagate=False, source_event=None)
enter_event.state_machine = self
self.root_machine._leaf_state = state
await self._on(state, enter_event)
async def _get_transition(self, event):
machine = self.leaf_state.parent
while machine:
transition = await self._get_machine_transition(machine, event)
if transition:
return transition
machine = machine.parent
return None
async def _get_machine_transition(self, machine, event):
key = (machine.state, event.name, event.input)
transition = await self._get_transition_matching_condition(
machine, key, event)
if transition:
return transition
key = (machine.state, any_event, event.input)
return await self._get_transition_matching_condition(
machine, key, event)
async def _get_transition_matching_condition(self, machine, key, event):
from_state = self.leaf_state
for transition in machine._transitions._transitions[key]:
result = await self._call(
transition['condition'], from_state, event)
if result is True:
return transition
return None
async def _exit_states(self, event, from_state, to_state):
if to_state is None:
return None
state = self.leaf_state
self.leaf_state_stack.push(state)
while ((state.parent and
not (from_state.is_substate(state) and
to_state.is_substate(state))) or
(state == from_state == to_state)):
logger.debug('exiting %s', state.name)
exit_event = Event('exit', propagate=False, source_event=event)
exit_event.state_machine = self
self.root_machine._leaf_state = state
await self._on(state, exit_event)
state.parent.state_stack.push(state)
state.parent.state = state.parent.initial_state
state = state.parent
return state
async def _enter_states(self, event, top_state, to_state):
if to_state is None:
return
path = []
state = self._get_leaf_state(to_state)
while state.parent and state != top_state:
path.append(state)
state = state.parent
for state in reversed(path):
logger.debug('entering %s', state.name)
enter_event = Event('enter', propagate=False, source_event=event)
enter_event.state_machine = self
self.root_machine._leaf_state = state
await self._on(state, enter_event)
state.parent.state = state
[docs]
async def set_previous_leaf_state(self, event=None):
'''Async version of ``StateMachine.set_previous_leaf_state``.'''
if event is not None:
event.state_machine = self
from_state = self.leaf_state
try:
to_state = self.leaf_state_stack.peek()
except IndexError:
return
top_state = await self._exit_states(event, from_state, to_state)
await self._enter_states(event, top_state, to_state)
[docs]
async def revert_to_previous_leaf_state(self, event=None):
'''Async version of ``StateMachine.revert_to_previous_leaf_state``.'''
await self.set_previous_leaf_state(event)
try:
self.leaf_state_stack.pop()
self.leaf_state_stack.pop()
except IndexError:
return
async def _call(self, callback, state, event):
result = callback(state, event)
if inspect.isawaitable(result):
return await result
return result
def _clear_queues(self):
while self._internal_queue:
self._internal_queue.popleft()
while self._external_queue:
self._external_queue.popleft()
def _get_execution_lock(self):
if self._execution_lock is None:
self._execution_lock = asyncio.Lock()
return self._execution_lock