""" Implements Action and ActionInThread nodes.
Actions are the leaf nodes in any Sequencer script. They execute python methods or functions.
Coroutines are associated to :class:`Action` nodes while normal callable objects must be
associated to :class:`ActionInThread` nodes.
"""
import inspect
import logging
import functools
import asyncio
from datetime import datetime
import attr
from zope.interface import implementer
from seq.lib.contextvars_executor import ContextVarExecutor
from .state import T_STATE
from .rtflags import RTFLAG
from .interface import INode, _BaseNode
from .. import ob
from ..counter import Counter
from .utils import uniqueId
logger = logging.getLogger(__name__)
[docs]@implementer(INode)
@attr.s
class Action(_BaseNode):
""" Action ctor.
Action nodes executes `coroutines` in a Sequencer script.
Args:
f(coroutine): The coroutine the node will execute
Keyword Args:
id (str): Unique id. If not provided an unique identifier is assigned.
name (str): Node name. If not provided a name is assigned.
Raises:
TypeException: is `f` is not a coroutine.
"""
f = attr.ib(default=None, repr=False)
def __attrs_post_init__(self):
"""
setup object.
Sets name, id and allows node to run.
"""
self._check_function_type()
if self.name is None:
try:
self.name = self.f.__qualname__
except AttributeError:
self.name = id(self.f)
if self.id is None:
self.id = "%s_%s" % (self.name, uniqueId())
self.running_checkpoint.set() # running is allowed
self.serial_number = Counter.new_value()
try:
desc = self.f.__doc__ or "No Description"
except AttributeError:
pass
self.description = desc.strip().split('\n')[0]
#logger.info("node: %s, sn: %d", self.name, self.serial_number)
def _check_function_type(self):
if iscoroutine_or_partial(self.f) is False:
raise TypeError("target function shall be coroutine")
@property
def context(self):
""" Get context from the running Sequence
"""
from .sequence import Sequence
return Sequence.get_context()
def publish_state(self):
""" Requests controller object to publish node's state.
"""
ctrl = ob.OB.controller.get()
#assert(ctrl)
ctrl and ctrl.notify_state_change(self)
# async with self._cond:
# self._cond.notify(1)
@property
def state(self):
return super().state
@state.setter
def state(self, value):
"""Sets the node state"""
super(Action, self.__class__).state.fset(self, value)
#super().state.fset(self, value)
self.publish_state()
async def _execute_preamble(self):
from ..ob import RTFLAG
###logger.info("PREAMBLE ...: %s -- %d", self.name, self.runtime_flags)
if self.runtime_flags & RTFLAG.PAUSE:
self.state = T_STATE.PAUSED
self.pause()
# check permission to execute
await self.running_checkpoint.wait()
# now we are running ...
if self.running_checkpoint.is_set():
self.state = T_STATE.RUNNING
self.t_start = datetime.now()
# Skip this node?
if self.runtime_flags & RTFLAG.SKIP:
self.state = T_STATE.FINISHED
self.skip = True
self.t_end = datetime.now()
self.result = None
async def _execute_node_task(self, task):
try:
r = await task
self.result = task.result()
except Exception:
self.in_error = True
self.exception = task.exception()
raise # bubble up the exception
finally:
self.t_end = datetime.now()
self.state = T_STATE.FINISHED
return r
async def __call__(self, resume=False):
"""
Executes node action.
If the action is a coroutine a task is created and passed to
the asyncio loop for execution.
"""
await self._execute_preamble()
if self.state is T_STATE.FINISHED: # node skipped in preamble
return None
if iscoroutine_or_partial(self.f):
task = asyncio.create_task(self.f())
else:
# this should never happen
raise TypeError("This don't belong here", self.f)
r = await self._execute_node_task(task)
return r
[docs]@attr.s
class ActionInThread(Action):
""" ActionInThread ctor.
ActionInThread nodes executes python `callables` (functions or methods) in a Sequencer script.
Args:
f(callable): The `callable` the node will execute
Keyword Args:
id (str): Unique id. If not provided an unique identifier is assigned.
name (str): Node name. If not provided a name is assigned.
Raises:
TypeException: is `f` is not a python method or function.
"""
_executor = ContextVarExecutor()
def _check_function_type(self):
assert callable(self.f)
if iscoroutine_or_partial(self.f) is True:
raise TypeError("target function shall be a normal callable")
async def __call__(self, resume=False):
"""
Executes node action.
The action is a normal function, a special task is created
in order to execute the action in a different thread.
In any case the context parameter is passed to the action and
its result saved.
"""
await self._execute_preamble()
##logger.info("PREAMBLE: %s", self.name)
if self.state is T_STATE.FINISHED: # node skipped in preamble
return None
if iscoroutine_or_partial(self.f):
# This should never happen
raise TypeError("coroutines don't belong here")
else:
##logger.debug("run in thread: %s", self.f)
loop = asyncio.get_running_loop()
task = loop.run_in_executor(ActionInThread._executor, self.f)
r = await self._execute_node_task(task)
return r
#@attr.s
class StartNode(Action):
"""This is Starting Node.
marks node as unskipabble
"""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.can_skip=False
self.hide = True
class EndNode(Action):
"""This is the finish Node.
marks node as unskippable
"""
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self.can_skip = False
self.hide = True
def make_node(s, **kw):
"""Makes an :class:`Action` nodes out of a coroutines.
Args:
s(coroutine): target coroutine to be passed to :class:`Action` ctor.
Keyword Arguments:
**kw: Passed straight to :class:`Action` ctor.
Returns:
:class:`Action` node associated to `s` coroutine.
"""
logger.debug("MAKE NODE %s", s)
##logger.debug("coro: %s",s)
if isinstance(s, _BaseNode):
return s
elif iscoroutine_or_partial(s):
##logger.debug("create an Action node for: %s", s)
return Action(s, **kw)
raise TypeError("Argument %s must be a node or coroutine type"%(s))
def iscoroutine_or_partial(obj):
"""Support function.
Identifies coroutines wrapped with partial
"""
while isinstance(obj, functools.partial):
obj = obj.func
return inspect.iscoroutinefunction(obj)