Source code for seq.lib.nodes.template

import attr
#import json
import logging
import contextvars as cv

#from .action import make_node
#from .state import T_STATE
from .sequence import Sequence
from .utils import uniqueId

from ..counter import Counter
from ..  import ob

#logger = logging.getLogger(__name__)

# Lookup table from a parameter type name to the Python callable used to
# coerce a raw value into that type.
value_converter = dict(string=str, number=float, integer=int)

@attr.s
class Template(Sequence):
    """Sequence node built from a template description.

    A template is described by a dictionary (see :meth:`from_dict`) that
    carries the template name and an optional list of parameter
    definitions.  The parameters are published through the ``P`` context
    variable while the node executes.
    """

    # Name of the template; from_dict() also uses it to load ``mod``.
    tpl_name = attr.ib(default=None)
    # Parameter definitions; each element must provide a 'name' key.
    params = attr.ib(default=attr.Factory(list), repr=False)
    # The dictionary the template was created from (set by from_dict()).
    content = attr.ib(default=attr.Factory(dict), repr=False)
    # Object returned by ob.ModuleLoader.from_mod(); set by from_dict().
    mod = attr.ib(default=None, repr=False, init=False)
    # Maps a parameter name to its definition, or to the value assigned
    # through set_params().
    _parameters = attr.ib(default=attr.Factory(dict), repr=False, init=False)
    # NOTE(review): not used anywhere in the visible code.
    _validator = attr.ib(default=attr.Factory(dict), repr=False, init=False)

    # Context variable through which execute() exposes the parameters.
    P = cv.ContextVar("P", default={})

    def __attrs_post_init__(self):
        """Finish initialisation of the node.

        Falls back to the name ``"Template"`` when none was given, runs
        the parent class initialisation and indexes the parameter
        definitions from ``params`` by their ``'name'`` entry.
        """
        if self.name is None:
            self.name = "Template"

        super().__attrs_post_init__()
        for el in self.params:
            self._parameters[el['name']] = el

    # def make_task(self, node, input_list, resume):
    #     from ..seqtask import TplTask
    #     return TplTask(self.id, node, input_list, resume=resume)

    def set_params(self, d):
        """Assign values to the parameters declared by the template.

        Args:
            d: Mapping of parameter name to value.  Names that the
                template does not declare are ignored.
        """
        for k, v in d.items():
            if k in self._parameters:
                # Store the individual value; the previous code stored the
                # whole input mapping ``d`` under every matching key.
                self._parameters[k] = v

    async def execute(self, resume=False):
        """Execute the node.

        Publishes the template parameters in the ``P`` context variable
        and then delegates to the parent class ``execute``.

        Args:
            resume: Forwarded unchanged to the parent ``execute``.

        Returns:
            Whatever the parent ``execute`` returns.
        """
        Template.P.set(self._parameters)
        return await super().execute(resume)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Template` from its dictionary description.

        Args:
            d: Dictionary with a mandatory ``'templateName'`` entry and an
                optional ``'parameters'`` list of parameter definitions.

        Returns:
            The new :class:`Template`, with ``mod`` loaded from the
            template name.

        Raises:
            KeyError: If ``d`` has no ``'templateName'`` entry.
        """
        t = Template(
            tpl_name=d['templateName'],
            # Fall back to an empty list, matching the declared default of
            # ``params``.
            params=d.get('parameters', []),
            content=d,
        )
        t.mod = ob.ModuleLoader.from_mod(t.tpl_name)
        return t

    @staticmethod
    def create(d, *args, **kw):
        """Creates a :class:`Template` node

        Args:
            d: Dictionary description of the template, see
                :meth:`from_dict`.
            *args: Variable length list of nodes or coroutines that
                composes the sequence.

        Keyword Args:
            id: Node id
            name: node name
        """
        a = Template.from_dict(d)
        # The positional and keyword arguments are not consumed here; they
        # are forwarded to the sequence instantiated from the loaded module.
        a.seq = [ob.OB._instance_sequence(a.mod, *args, **kw)]
        return a