# Source code for vortex.algo.components

# pylint: disable=unused-argument

"""
Abstract class for any AlgoComponent (:class:`AlgoComponent`) or AlgoComponent's
Mixins (:class:`AlgoComponentDecoMixin`).

Some very generic concrete AlgoComponent classes are also provided:

    * :class:`Expresso`: launch a simple script;
    * :class:`BlindRun`: launch a simple executable (no MPI);
    * :class:`Parallel`: launch an MPI application.

Additional abstract classes provide multiprocessing support (through the
:mod:`taylorism` package):

    * :class:`TaylorRun`: launch a piece of Python code on several processes;
    * :class:`ParaExpresso`: launch a script multiple times (in parallel);
    * :class:`ParaBlindRun`: launch an executable multiple times (in parallel).

Such classes are based on the :mod:`taylorism` package (the developer should be
familiar with it) and use "Worker" classes provided in the
:mod:`vortex.tools.parallelism` package.

A few examples of AlgoComponent classes are shipped with the code
(see :ref:`examples_algo`). In addition to the documentation provided
in :ref:`stepbystep-index`, it might help.

When class inheritance is not applicable or ineffective, the AlgoComponent's
Mixins are a powerful tool to share some pieces of code. See the
:class:`AlgoComponentDecoMixin` class documentation for more details.
"""

import collections.abc
import contextlib
import copy
import functools
import importlib
import locale
import logging
import multiprocessing
import queue
import shlex
import sys
import tempfile
import traceback as py_traceback

from bronx.fancies import loggers
from bronx.stdtypes import date
from bronx.syntax.decorators import nicedeco
import footprints
from taylorism import Boss
import vortex
import vortex.config as config
from vortex.algo import mpitools
from vortex.syntax.stdattrs import DelayedEnvValue
from vortex.tools.parallelism import ParallelResultParser

#: No automatic export
__all__ = []

logger = loggers.getLogger(__name__)


class AlgoComponentError(Exception):
    """Base class of the exceptions raised by Algo Components."""


class AlgoComponentAssertionError(AlgoComponentError):
    """Raised when an assertion made by an Algo Component does not hold."""


class DelayedAlgoComponentError(AlgoComponentError):
    """Report exceptions that occurred during the execution but were delayed.

    Iterating over the exception yields the stored exceptions, in the order
    of the **excs** iterable given to the constructor.
    """

    def __init__(self, excs):
        super().__init__("One or several errors occurred during the run.")
        self._excs = excs

    def __iter__(self):
        for exc in self._excs:
            yield exc

    def __str__(self):
        header = "One or several errors occurred during the run. In order of appearance:\n"
        entries = []
        # Entries are numbered starting from 1
        for rank, exc in enumerate(self, start=1):
            entries.append(
                "{:3d}. {!s} (type: {!s})".format(rank, exc, type(exc))
            )
        return header + "\n".join(entries)


class ParallelInconsistencyAlgoComponentError(Exception):
    """Raised when the length of an item is inconsistent with the number of ResourceHandlers.

    :param str target: The name of the inconsistent item (inserted in the
        error message).
    """

    def __init__(self, target):
        msg = "The len of {:s} is inconsistent with the number of ResourceHandlers."
        super().__init__(msg.format(target))


@nicedeco
def _clsmtd_mixin_locked(f):
    """
    Utility decorator (for class methods): the decorated method raises a
    :class:`RuntimeError` when it is called on a class that derives from
    :class:`AlgoComponent` (i.e. it only works on a bare
    :class:`AlgoComponentDecoMixin` class).
    """

    def wrapped_clsmethod(cls, *kargs, **kwargs):
        # Guard clause: a mixin that is not yet "in use" goes straight through
        if not issubclass(cls, AlgoComponent):
            return f(cls, *kargs, **kwargs)
        raise RuntimeError(
            "This class method should not be called once the mixin is in use."
        )

    return wrapped_clsmethod


def algo_component_deco_mixin_autodoc(cls):
    """
    Decorator that adds an automatic documentation on any :class:`AlgoComponentDecoMixin`
    class.

    The generated text lists the extra footprints (when ``MIXIN_AUTO_FPTWEAK``
    is true) and the hook methods (when ``MIXIN_AUTO_DECO`` is true) declared
    by **cls**. It is appended to the class docstring (or becomes the class
    docstring if there is none).

    :param cls: The mixin class to document (modified in place).
    :return: **cls** itself.
    """
    extradoc = ""

    # Document extra footprints
    if cls.MIXIN_AUTO_FPTWEAK and cls._MIXIN_EXTRA_FOOTPRINTS:
        extradoc += "\nThe following footprints will be applied to the target classes:\n\n"
        for fp in cls._MIXIN_EXTRA_FOOTPRINTS:
            if isinstance(fp, footprints.Footprint):
                extradoc += footprints.doc.format_docstring(
                    fp, footprints.setup.docstrings, abstractfpobj=True
                )
                extradoc += "\n"

    # Document decorating classes
    if cls.MIXIN_AUTO_DECO:
        # NB: names are given WITHOUT the "_MIXIN_" prefix (it is added below)
        for what, desc in (
            ("PREPARE_PREHOOKS", "before the original ``prepare`` method"),
            ("PREPARE_HOOKS", "after the original ``prepare`` method"),
            (
                "FAIL_EXECUTE_HOOKS",
                "after the original ``fail_execute`` method",
            ),
            (
                "EXECUTE_FINALISE_HOOKS",
                "after the original ``execute_finalise`` method",
            ),
            ("POSTFIX_PREHOOKS", "before the original ``postfix`` method"),
            ("POSTFIX_HOOKS", "after the original ``postfix`` method"),
            ("SPAWN_HOOKS", "after the original ``spawn_hook`` method"),
            (
                "CLI_OPTS_EXTEND",
                "to alter the result of the ``spawn_command_options`` method",
            ),
            (
                "STDIN_OPTS_EXTEND",
                "to alter the result of the ``spawn_stdin_options`` method",
            ),
            (
                "EXECUTE_OVERWRITE",
                "instead of the original ``execute`` method",
            ),
            (
                "MPIBINS_HOOKS",
                "to alter the result of the ``_bootstrap_mpibins_hack`` method",
            ),
            (
                "MPIENVELOPE_HOOKS",
                "to alter the result of the ``_bootstrap_mpienvelope_hack`` method",
            ),
            (
                "MPIENVELOPE_POSTHOOKS",
                "to alter the result of the ``_bootstrap_mpienvelope_posthack`` method",
            ),
        ):
            what = "_MIXIN_{:s}".format(what)
            callbacks = getattr(cls, what, ())
            if callbacks and not isinstance(callbacks, (tuple, list)):
                # e.g. _MIXIN_EXECUTE_OVERWRITE holds a single callable
                callbacks = (callbacks,)
            if callbacks:
                extradoc += "\nThe following method(s) will be called {:s}:\n\n".format(
                    desc
                )
                extradoc += "\n".join(
                    "    * {!r}".format(cb) for cb in callbacks
                )
                extradoc += "\n"

    if extradoc:
        # Indent the generated text so that it sits inside the note directive
        extradoc = (
            "\n    .. note:: The following documentation is automatically generated. "
            + "From a developer point of view, using the present mixin class "
            + "will result in the following actions:\n"
            + " \n".join(
                ["        " + t if t else "" for t in extradoc.split("\n")]
            )
        )

        if isinstance(getattr(cls, "__doc__", None), str):
            cls.__doc__ += "\n" + extradoc
        else:
            cls.__doc__ = extradoc

    return cls


class AlgoComponentDecoMixin:
    """
    This is the base class for any Mixin class targeting :class:`AlgoComponent`
    classes.

    Like any Mixin class, this Mixin class primary use is to define methods that
    will be available to the child class.

    However, this class will also interact with the :class:`AlgoComponentMeta`
    metaclass to alter the behaviour of the :class:`AlgoComponent` class it is
    used with. Several "alterations" will be made to the resulting
    :class:`AlgoComponent` class.

        * A bunch of footprints' attribute can be added to the resulting class.
          This is controlled by the :data:`MIXIN_AUTO_FPTWEAK` and
          :data:`_MIXIN_EXTRA_FOOTPRINTS` class variables.
          If :data:`MIXIN_AUTO_FPTWEAK` is ``True`` (which is the default), the
          :class:`~footprints.Footprint` objects listed in the
          :data:`_MIXIN_EXTRA_FOOTPRINTS` tuple will be prepended to the resulting
          :class:`AlgoComponent` class footprint definition.

        * The ``execute`` method of the resulting class can be overwritten by
          the method referenced in the :data:`_MIXIN_EXECUTE_OVERWRITE` class
          variable. This is allowed only if no ``execute`` method is defined
          manually and if no other :class:`AlgoComponentDecoMixin` tries to
          overwrite it as well. If these two conditions are not met, a
          :class:`RuntimeError` exception will be thrown by the
          :class:`AlgoComponentMeta` metaclass.

        * A bunch of the :class:`AlgoComponent`'s methods can be decorated. This
          is controlled by the :data:`MIXIN_AUTO_DECO` class variable (``True``
          by default) and a bunch of other class variables containing tuples.
          They are described below:

              * :data:`_MIXIN_PREPARE_PREHOOKS`: Tuple of methods that will be
                executed before the original prepare method. Such methods receive
                the same arguments list as the original decorated method.

              * :data:`_MIXIN_PREPARE_HOOKS`: Tuple of methods that will be
                executed after the original prepare method. Such methods receive
                the same arguments list as the original decorated method.

              * :data:`_MIXIN_EXECUTE_FINALISE_HOOKS`: Tuple of methods that will
                be executed after any execution (even if the execution failed).

              * :data:`_MIXIN_FAIL_EXECUTE_HOOKS`: Tuple of methods that will
                be executed if the execution fails (the original exception
                will be re-raised afterwards)

              * :data:`_MIXIN_POSTFIX_PREHOOKS`: Tuple of methods that will be
                executed before the original postfix method. Such methods receive
                the same arguments list as the original decorated method.

              * :data:`_MIXIN_POSTFIX_HOOKS`: Tuple of methods that will be
                executed after the original postfix method. Such methods receive
                the same arguments list as the original decorated method.

              * :data:`_MIXIN_SPAWN_HOOKS`: Tuple of methods that will be
                executed after the original spawn_hook method. Such methods receive
                the same arguments list as the original decorated method.

              * :data:`_MIXIN_CLI_OPTS_EXTEND`: Tuple of methods that will be
                executed after the original ``spawn_command_options`` method. Such
                method will receive one argument (``self`` set aside): the value
                returned by the original ``spawn_command_options`` method.

              * :data:`_MIXIN_STDIN_OPTS_EXTEND`: Tuple of methods that will be
                executed after the original ``spawn_stdin_options`` method. Such
                method will receive one argument (``self`` set aside): the value
                returned by the original ``spawn_stdin_options`` method.

    """

    MIXIN_AUTO_FPTWEAK = True
    MIXIN_AUTO_DECO = True

    _MIXIN_EXTRA_FOOTPRINTS = ()

    _MIXIN_PREPARE_PREHOOKS = ()
    _MIXIN_PREPARE_HOOKS = ()
    _MIXIN_EXECUTE_FINALISE_HOOKS = ()
    _MIXIN_FAIL_EXECUTE_HOOKS = ()
    _MIXIN_POSTFIX_PREHOOKS = ()
    _MIXIN_POSTFIX_HOOKS = ()
    _MIXIN_SPAWN_HOOKS = ()

    _MIXIN_CLI_OPTS_EXTEND = ()
    _MIXIN_STDIN_OPTS_EXTEND = ()

    _MIXIN_EXECUTE_OVERWRITE = None

    def __new__(cls, *args, **kwargs):
        """Refuse to create an object unless **cls** is also an :class:`AlgoComponent`."""
        if not issubclass(cls, AlgoComponent):
            # This class cannot be instantiated by itself !
            raise RuntimeError(
                "< {0.__name__:s} > is a mixin class: it cannot be instantiated.".format(
                    cls
                )
            )
        return super().__new__(cls)

    @classmethod
    @_clsmtd_mixin_locked
    def mixin_tweak_footprint(cls, fplocal):
        """Update the footprint definition list.

        Each element of :data:`_MIXIN_EXTRA_FOOTPRINTS` is inserted at the
        beginning of the **fplocal** list (which is modified in place).
        """
        for fp in cls._MIXIN_EXTRA_FOOTPRINTS:
            assert isinstance(fp, footprints.Footprint)
            fplocal.insert(0, fp)

    @classmethod
    @_clsmtd_mixin_locked
    def _get_algo_wrapped(
        cls, targetcls, targetmtd, hooks, prehooks=(), reentering=False
    ):
        """Wraps **targetcls**'s **targetmtd** method.

        :param targetcls: The class whose method is wrapped.
        :param str targetmtd: The name of the method to wrap.
        :param hooks: Callables executed (in order) after the original method.
        :param prehooks: Callables executed (in order) before the original method.
        :param bool reentering: If ``True``, the value returned by the original
            method is inserted as the first argument of the hooks; it is
            replaced by each hook's return value in turn and the final value
            is returned by the wrapper.
        :raises ValueError: if both **prehooks** and **reentering** are set.
        :return: The wrapping function. Unless **reentering** is ``True``, it
            returns ``None`` whatever the original method returns.
        """
        orig_mtd = getattr(targetcls, targetmtd)
        if prehooks and reentering:
            raise ValueError(
                "Conflicting values between prehooks and reentering."
            )

        def wrapped_method(self, *kargs, **kwargs):
            for phook in prehooks:
                phook(self, *kargs, **kwargs)
            rv = orig_mtd(self, *kargs, **kwargs)
            if reentering:
                # The current result becomes the hooks' first argument
                kargs = [
                    rv,
                ] + list(kargs)
            for phook in hooks:
                rv = phook(self, *kargs, **kwargs)
                if reentering:
                    kargs[0] = rv
            if reentering:
                return rv

        wrapped_method.__name__ = orig_mtd.__name__
        # NB: the dot between the module and class names is needed to get a
        # valid cross-reference.
        wrapped_method.__doc__ = (orig_mtd.__doc__ or "").rstrip(
            "\n"
        ) + "\n\nDecorated by :class:`{0.__module__:s}.{0.__name__:s}`.".format(
            cls
        )
        wrapped_method.__dict__.update(orig_mtd.__dict__)
        return wrapped_method

    @classmethod
    @_clsmtd_mixin_locked
    def mixin_algo_deco(cls, targetcls):
        """
        Applies all the necessary decorators to the **targetcls**
        :class:`AlgoComponent` class.

        :raises RuntimeError: if **targetcls** is not an :class:`AlgoComponent`
            class.
        :return: **targetcls** (modified in place).
        """
        if not issubclass(targetcls, AlgoComponent):
            raise RuntimeError(
                "This class can only be mixed in AlgoComponent classes."
            )
        # (method name, hooks, pre-hooks, reentering ?)
        for targetmtd, hooks, prehooks, reenter in [
            (
                "prepare",
                cls._MIXIN_PREPARE_HOOKS,
                cls._MIXIN_PREPARE_PREHOOKS,
                False,
            ),
            ("fail_execute", cls._MIXIN_FAIL_EXECUTE_HOOKS, (), False),
            ("execute_finalise", cls._MIXIN_EXECUTE_FINALISE_HOOKS, (), False),
            (
                "postfix",
                cls._MIXIN_POSTFIX_HOOKS,
                cls._MIXIN_POSTFIX_PREHOOKS,
                False,
            ),
            ("spawn_hook", cls._MIXIN_SPAWN_HOOKS, (), False),
            ("spawn_command_options", cls._MIXIN_CLI_OPTS_EXTEND, (), True),
            ("spawn_stdin_options", cls._MIXIN_STDIN_OPTS_EXTEND, (), True),
        ]:
            # Methods without any hook are left untouched
            if hooks or prehooks:
                setattr(
                    targetcls,
                    targetmtd,
                    cls._get_algo_wrapped(
                        targetcls, targetmtd, hooks, prehooks, reenter
                    ),
                )
        return targetcls

    @classmethod
    @_clsmtd_mixin_locked
    def mixin_execute_overwrite(cls):
        """Return the :data:`_MIXIN_EXECUTE_OVERWRITE` class variable (``None`` by default)."""
        return cls._MIXIN_EXECUTE_OVERWRITE

    @classmethod
    def mixin_execute_companion(cls):
        """Find on which class "super" should be called (if _MIXIN_EXECUTE_OVERWRITE is used).

        :raises RuntimeError: if the ``_algo_meta_execute_companion`` class
            attribute is missing or empty.
        """
        comp = getattr(cls, "_algo_meta_execute_companion", ())
        if not comp:
            raise RuntimeError("unable to find a suitable companion class")
        return comp


class AlgoComponentMpiDecoMixin(AlgoComponentDecoMixin):
    """
    This is the base class for Mixin class targeting :class:`Parallel`
    classes.

    It inherits all the behaviour of the :class:`AlgoComponentDecoMixin` base
    class. But in addition, it allows to decorate additional :class:`Parallel`'s
    methods using the following class variables:

      * :data:`_MIXIN_MPIBINS_HOOKS`: Tuple of methods that will be
        executed after the original ``_bootstrap_mpibins_hack`` method. Such
        methods will receive five arguments (``self`` set aside):

            * The list of :class:`mpitools.MpiBinaryDescription` objects returned
              by the original ``_bootstrap_mpibins_hack`` method;
            * The list of :class:`mpitools.MpiBinaryDescription` objects as
              provided by the first caller;
            * The list of binary ResourceHandlers as provided to the ``run``
              method;
            * A dictionary of options as provided to the ``run`` method;
            * A boolean indicating if an MPI envelope is provided by the user.

      * :data:`_MIXIN_MPIENVELOPE_HOOKS`: Tuple of methods that will be
        executed after the original ``_bootstrap_mpienvelope_hack`` method. Such
        methods will receive five arguments (``self`` set aside):

            * The list of dictionaries describing the envelope returned
              by the original ``_bootstrap_mpienvelope_hack`` method;
            * The list of dictionaries describing the envelope as
              provided by the first caller;
            * The list of binary ResourceHandlers as provided to the ``run``
              method;
            * A dictionary of options as provided to the ``run`` method;
            * The :class:`mpitools.MpiTool` that is used to generate the
              MPI command line

      * :data:`_MIXIN_MPIENVELOPE_POSTHOOKS`: Tuple of methods that will be
        executed after the original ``_bootstrap_mpienvelope_posthack`` method.
        Their first argument (``self`` set aside) is the value returned by the
        original method, followed by the original method's own arguments.

    """

    _MIXIN_MPIBINS_HOOKS = ()
    _MIXIN_MPIENVELOPE_HOOKS = ()
    _MIXIN_MPIENVELOPE_POSTHOOKS = ()

    @classmethod
    @_clsmtd_mixin_locked
    def mixin_algo_deco(cls, targetcls):
        """
        Applies all the necessary decorators to the **targetcls**
        :class:`AlgoComponent` class.

        :raises RuntimeError: if **targetcls** is not a :class:`Parallel` class.
        :return: **targetcls** (modified in place).
        """
        # Go through super() so that the generic hooks are read from the
        # actual mixin class (cls) and not from the bare AlgoComponentDecoMixin
        # class (whose hooks' tuples are all empty).
        targetcls = super().mixin_algo_deco(targetcls)
        if not issubclass(targetcls, Parallel):
            raise RuntimeError(
                "This class can only be mixed in Parallel classes."
            )
        # (method name, hooks, pre-hooks, reentering ?)
        for targetmtd, hooks, prehooks, reenter in [
            ("_bootstrap_mpibins_hack", cls._MIXIN_MPIBINS_HOOKS, (), True),
            (
                "_bootstrap_mpienvelope_hack",
                cls._MIXIN_MPIENVELOPE_HOOKS,
                (),
                True,
            ),
            (
                "_bootstrap_mpienvelope_posthack",
                cls._MIXIN_MPIENVELOPE_POSTHOOKS,
                (),
                True,
            ),
        ]:
            if hooks or prehooks:
                setattr(
                    targetcls,
                    targetmtd,
                    cls._get_algo_wrapped(
                        targetcls, targetmtd, hooks, prehooks, reenter
                    ),
                )
        return targetcls


class AlgoComponentMeta(footprints.FootprintBaseMeta):
    """Meta class for building :class:`AlgoComponent` classes.

    On top of the footprints' metaclass work, the direct base classes that
    are "bare" :class:`AlgoComponentDecoMixin` classes are processed: the
    footprint is tweaked, the ``execute`` method is possibly overwritten and
    the decorators are applied. See the :class:`AlgoComponentDecoMixin`
    documentation for more details.
    """

    def __new__(cls, n, b, d):
        # A mixin must only be dealt with once: bases that already are
        # AlgoComponent classes are consequently ignored.
        candidates = []
        for base in b:
            if issubclass(base, AlgoComponentDecoMixin) and not issubclass(
                base, AlgoComponent
            ):
                candidates.append(base)

        # Tweak footprints
        fp_mixins = [mixin for mixin in candidates if mixin.MIXIN_AUTO_FPTWEAK]
        if fp_mixins:
            fplocal = d.get("_footprint", list())
            if not isinstance(fplocal, list):
                fplocal = [fplocal]
            for mixin in fp_mixins:
                mixin.mixin_tweak_footprint(fplocal)
            d["_footprint"] = fplocal

        # Overwrite the execute method...
        exc_mixins = [
            mixin
            for mixin in candidates
            if mixin.mixin_execute_overwrite() is not None
        ]
        if len(exc_mixins) > 1:
            names = ",".join([mixin.__name__ for mixin in exc_mixins])
            raise RuntimeError(
                "Cannot overwrite < execute > multiple times: {:s}".format(
                    names
                )
            )
        if exc_mixins:
            if "execute" in d:
                raise RuntimeError(
                    "< execute > is already defined in the target class: cannot proceed"
                )
            d["execute"] = exc_mixins[0].mixin_execute_overwrite()

        # Create the class as usual
        fpcls = super().__new__(cls, n, b, d)
        if exc_mixins:
            fpcls._algo_meta_execute_companion = fpcls

        # Apply decorators (last listed mixin first)
        deco_mixins = [mixin for mixin in candidates if mixin.MIXIN_AUTO_DECO]
        for mixin in deco_mixins[::-1]:
            mixin.mixin_algo_deco(fpcls)
        return fpcls


class AlgoComponent(footprints.FootprintBase, metaclass=AlgoComponentMeta):
    """Component in charge of any kind of processing."""

    _SERVERSYNC_RAISEONEXIT = True
    _SERVERSYNC_RUNONSTARTUP = True
    _SERVERSYNC_STOPONEXIT = True

    _abstract = True
    _collector = ("component",)
    _footprint = dict(
        info="Abstract algo component",
        attr=dict(
            engine=dict(
                info="The way the executable should be run.",
                values=[
                    "algo",
                ],
            ),
            flyput=dict(
                info="Activate a background job in charge off on the fly processing.",
                type=bool,
                optional=True,
                default=False,
                access="rwx",
                doc_visibility=footprints.doc.visibility.GURU,
                doc_zorder=-99,
            ),
            flypoll=dict(
                info="The system method called by the flyput background job.",
                optional=True,
                default="io_poll",
                access="rwx",
                doc_visibility=footprints.doc.visibility.GURU,
                doc_zorder=-99,
            ),
            flyargs=dict(
                info="Arguments for the *flypoll* method.",
                type=footprints.FPTuple,
                optional=True,
                default=footprints.FPTuple(),
                doc_visibility=footprints.doc.visibility.GURU,
                doc_zorder=-99,
            ),
            flymapping=dict(
                info="Allow renaming of output files during on the fly processing.",
                optional=True,
                default=False,
                access="rwx",
                doc_visibility=footprints.doc.visibility.GURU,
                doc_zorder=-99,
            ),
            timeout=dict(
                info="Default timeout (in sec.) used  when waiting for an expected resource.",
                type=int,
                optional=True,
                default=180,
                doc_zorder=-50,
            ),
            server_run=dict(
                info="Run the executable as a server.",
                type=bool,
                optional=True,
                values=[False],
                default=False,
                access="rwx",
                doc_visibility=footprints.doc.visibility.ADVANCED,
            ),
            serversync_method=dict(
                info="The method that is used to synchronise with the server.",
                optional=True,
                doc_visibility=footprints.doc.visibility.GURU,
            ),
            serversync_medium=dict(
                info="The medium that is used to synchronise with the server.",
                optional=True,
                doc_visibility=footprints.doc.visibility.GURU,
            ),
            extendpypath=dict(
                info="The list of things to be prepended in the python's path.",
                type=footprints.FPList,
                default=footprints.FPList([]),
                optional=True,
            ),
        ),
    )

    def __init__(self, *args, **kw):
        """Initialise the internal attributes, then call the parent's init."""
        logger.debug("Algo component init %s", self.__class__)
        # Filesystem changes and delayed exceptions gathered along the way
        self._fslog = []
        self._delayed_excs = []
        # Caches for the eponymous properties
        self._promises = None
        self._expected = None
        # Server-related attributes start unset
        self._server_synctool = None
        self._server_process = None
        super().__init__(*args, **kw)

    @property
    def realkind(self):
        """Default kind is ``algo``."""
        return "algo"

    @property
    def fslog(self):
        """Changes on the filesystem during the execution."""
        return self._fslog

    def fstag(self):
        """Return a tag made of the ``realkind`` and ``engine`` attributes."""
        parts = [self.realkind, self.engine]
        return "-".join(parts)

    def fsstamp(self, opts):
        """Ask the current context to stamp the file system (**opts** is not used)."""
        ctx = self.context
        ctx.fstrack_stamp(tag=self.fstag())

    def fscheck(self, opts):
        """Record what the context's ``fstrack_check`` returns for our tag (**opts** is not used)."""
        fslog = self._fslog
        changes = self.context.fstrack_check(tag=self.fstag())
        fslog.append(changes)

    @property
    def promises(self):
        """Build and return list of actual promises of the current component."""
        if self._promises is None:
            self._promises = [
                x
                for x in self.context.sequence.outputs()
                if x.rh.provider.expected
            ]
        return self._promises

    @property
    def expected_resources(self):
        """Return the list of really expected inputs."""
        if self._expected is None:
            self._expected = [
                x
                for x in self.context.sequence.effective_inputs()
                if x.rh.is_expected()
            ]
        return self._expected

    def delayed_exception_add(self, exc, traceback=True):
        """Store **exc** in the list of delayed exceptions.

        If **traceback** is true, the exception currently being handled (as
        returned by :func:`sys.exc_info`) is printed beforehand.
        """
        logger.error("An exception is delayed")
        if traceback:
            exc_type, exc_value, exc_tb = sys.exc_info()
            report = (
                "Exception type: {!s}".format(exc_type),
                "Exception info: {!s}".format(exc_value),
                "Traceback:",
                "\n".join(py_traceback.format_tb(exc_tb)),
            )
            for line in report:
                print(line)
        self._delayed_excs.append(exc)

    def algoassert(self, assertion, msg=""):
        """Raise an :class:`AlgoComponentAssertionError` (with **msg**) if **assertion** is false."""
        if assertion:
            return
        raise AlgoComponentAssertionError(msg)

    def grab(self, sec, comment="resource", sleep=10, timeout=None):
        """Wait for the resource of the **sec** section and get it if expected.

        :param sec: The section to wait for.
        :param comment: A short description inserted in the header message.
        :param sleep: Passed to the resource handler's ``wait`` method.
        :param timeout: Passed to the resource handler's ``wait`` method
            (defaults to the ``timeout`` attribute).
        :raises ValueError: if the wait fails and the section is ``fatal``.
        """
        local = sec.rh.container.localpath()
        self.system.header("Wait for " + comment + " ... [" + local + "]")
        if timeout is None:
            timeout = self.timeout
        if sec.rh.wait(timeout=timeout, sleep=sleep):
            if sec.rh.is_expected():
                sec.get(incache=True)
            return
        # The wait failed: only fatal sections trigger an exception
        if not sec.fatal:
            logger.error("Missing expected resource <%s>", local)
            return
        logger.critical("Missing expected resource <%s>", local)
        raise ValueError("Could not get " + local)

    def export(self, packenv):
        """Export the variables of the **packenv** configuration section.

        Variables that are already in the environment are left untouched.
        """
        for varname, value in config.from_config(section=packenv).items():
            if varname in self.env:
                continue
            logger.info(
                "Setting %s env %s = %s", packenv.upper(), varname, value
            )
            self.env[varname] = value

    def prepare(self, rh, opts):
        """Export the "env" configuration section, if it is defined.

        **rh** and **opts** are not used here.
        """
        if not config.is_defined(section="env"):
            return
        self.export("env")

    def absexcutable(self, xfile):
        """Return the absolute pathname of the ``xfile`` executable."""
        return self.system.path.abspath(xfile)

    def flyput_method(self):
        """Return the polling method to use.

        The ``io_poll_method`` attribute is returned if it exists. Otherwise
        it is the system's attribute named after ``flypoll`` (or ``None``).
        """
        fallback = getattr(self.system, self.flypoll, None)
        return getattr(self, "io_poll_method", fallback)

    def flyput_args(self):
        """Return the ``io_poll_args`` attribute or, by default, ``flyargs`` as a tuple."""
        default_args = tuple(self.flyargs)
        return getattr(self, "io_poll_args", default_args)

    def flyput_kwargs(self):
        """Return the ``io_poll_kwargs`` attribute or, by default, an empty dictionary."""
        return getattr(self, "io_poll_kwargs", {})

    def flyput_check(self):
        """Return the polling arguments that match at least one promise.

        An argument matches when the basename of a promise's container starts
        with it. When ``flymapping`` is set, no check is made and the result
        of :meth:`flyput_args` is returned as is.
        """
        if self.flymapping:
            # No checks when mapping is activated
            return self.flyput_args()
        actual_args = []
        for arg in self.flyput_args():
            logger.info("Check arg <%s>", arg)
            matching = [
                x.rh.container.basename
                for x in self.promises
                if x.rh.container.basename.startswith(arg)
            ]
            if matching:
                logger.info("Match some promise %s", str(matching))
                actual_args.append(arg)
            else:
                logger.info(
                    "Do not match any promise %s",
                    str([x.rh.container.basename for x in self.promises]),
                )
        return actual_args

    def flyput_sleep(self):
        """Return the sleeping time between two polling attempts.

        It is the ``io_poll_sleep`` attribute if it exists, otherwise the
        ``IO_POLL_SLEEP`` environment variable (20 by default).
        """
        default_sleep = self.env.get("IO_POLL_SLEEP", 20)
        return getattr(self, "io_poll_sleep", default_sleep)

    def flyput_outputmapping(self, item):
        """Return **item** unchanged, along with the ``unknown`` format."""
        return (item, "unknown")

    def _flyput_job_internal_search(
        self, io_poll_method, io_poll_args, io_poll_kwargs
    ):
        """Call **io_poll_method** on each argument and gather the non-empty results.

        The polling method may return either an iterable or an object with a
        ``result`` attribute.
        """
        found = []
        for arg in io_poll_args:
            logger.info("Polling check arg %s", arg)
            outcome = io_poll_method(arg, **io_poll_kwargs)
            try:
                found.extend(outcome.result)
            except AttributeError:
                # No "result" attribute: use the returned object as is
                found.extend(outcome)
            found = list(filter(None, found))
            logger.info("Polling retrieved data %s", str(found))
        return found

    def _flyput_job_internal_put(self, data):
        """Put each of the polled **data** that matches a promise.

        When ``flymapping`` is set, the name is first mapped using
        :meth:`flyput_outputmapping` (and the file copied if the name changes).

        :raises AlgoComponentError: if the mapping returns an empty name.
        """
        for polled in data:
            target = polled
            if self.flymapping:
                target, target_fmt = self.flyput_outputmapping(polled)
                if not target:
                    raise AlgoComponentError(
                        "The mapping method failed for {:s}.".format(polled)
                    )
                if polled != target:
                    logger.info(
                        "Linking <%s> to <%s> (fmt=%s) before put",
                        polled,
                        target,
                        target_fmt,
                    )
                    self.system.cp(
                        polled, target, intent="in", fmt=target_fmt
                    )
            # Promises are identified by the absolute path of their container
            matching = [
                promise
                for promise in self.promises
                if promise.rh.container.abspath
                == self.system.path.abspath(target)
            ]
            if not matching:
                logger.warning("Polled data not promised <%s>", target)
                continue
            logger.info("Polled data is promised <%s>", target)
            matching.pop().put(incache=True)

    def flyput_job(
        self,
        io_poll_method,
        io_poll_args,
        io_poll_kwargs,
        event_complete,
        event_free,
        queue_context,
    ):
        """Poll new data resources.

        This is the target of the process started by :meth:`flyput_begin`.
        It polls until **event_complete** is set or a polling attempt fails.

        :param io_poll_method: The polling callable.
        :param io_poll_args: The arguments that will be polled one by one.
        :param io_poll_kwargs: Extra named arguments for **io_poll_method**.
        :param event_complete: Event that stops the polling when it is set.
        :param event_free: Event that is cleared during a polling attempt
            and set once it is over.
        :param queue_context: Queue used to send back the context recorder.
        """
        logger.info("Polling with method %s", str(io_poll_method))
        logger.info("Polling with args %s", str(io_poll_args))

        time_sleep = self.flyput_sleep()
        redo = True

        # Start recording the changes in the current context
        ctxrec = self.context.get_recorder()

        while redo and not event_complete.is_set():
            # Flag the polling attempt as "in progress"
            event_free.clear()
            try:
                data = self._flyput_job_internal_search(
                    io_poll_method, io_poll_args, io_poll_kwargs
                )
                self._flyput_job_internal_put(data)
            except Exception as trouble:
                # Any failure is logged (not re-raised) and ends the polling
                logger.error(
                    "Polling trouble: %s. %s",
                    str(trouble),
                    py_traceback.format_exc(),
                )
                redo = False
            finally:
                event_free.set()
            # Sleep only if nothing was found (redo is tested first: data
            # may be undefined after a failure)
            if redo and not data and not event_complete.is_set():
                logger.info("Get asleep for %d seconds...", time_sleep)
                self.system.sleep(time_sleep)

        # Stop recording and send back the results
        ctxrec.unregister()
        logger.info("Sending the Context recorder to the master process.")
        queue_context.put(ctxrec)
        queue_context.close()

        if redo:
            logger.info("Polling exit on complete event")
        else:
            logger.warning("Polling exit on abort")

    def flyput_begin(self):
        """Start the co-process that polls and puts the promised data.

        Returns a 4-tuple ``(process, stop_event, free_event, context_queue)``.
        The four items are ``None`` whenever no co-process is started (flyput
        disabled, no promise, no polling method or no polling arguments).
        """
        nothing = (None, None, None, None)

        if not self.flyput:
            return nothing

        self.system.subtitle("On the fly - Begin")

        if not self.promises:
            logger.info("No promise, no co-process")
            return nothing

        # A polling method is mandatory
        poll_method = self.flyput_method()
        if not poll_method:
            logger.error(
                "No method or shell function defined for polling data"
            )
            return nothing

        # So are the arguments that should match the local promises names
        poll_args = self.flyput_check()
        if not poll_args:
            logger.error("Could not check default arguments for polling data")
            return nothing

        # Optional named arguments
        poll_kwargs = self.flyput_kwargs()

        # Synchronisation objects shared with the co-process
        stop_event = multiprocessing.Event()
        free_event = multiprocessing.Event()
        ctx_queue = multiprocessing.Queue()

        job_args = (
            poll_method,
            poll_args,
            poll_kwargs,
            stop_event,
            free_event,
            ctx_queue,
        )
        worker = multiprocessing.Process(
            name=self.footprint_clsname(),
            target=self.flyput_job,
            args=job_args,
        )
        worker.start()

        return (worker, stop_event, free_event, ctx_queue)

    def manual_flypolling(self):
        """Run one polling pass and return whatever the search reports.

        :raises AlgoComponentError: if no polling method or no polling
            arguments can be found.
        """
        poll_method = self.flyput_method()
        if not poll_method:
            raise AlgoComponentError("Unable to find an io_poll_method")

        poll_args = self.flyput_check()
        if not poll_args:
            raise AlgoComponentError("Unable to find an io_poll_args")

        poll_kwargs = self.flyput_kwargs()

        # Poll using each of the prefixes
        newly_found = self._flyput_job_internal_search(
            poll_method, poll_args, poll_kwargs
        )
        return newly_found

    def manual_flypolling_job(self):
        """Run one polling pass and hand the result over to the put step."""
        self._flyput_job_internal_put(self.manual_flypolling())

    def flyput_end(self, p_io, e_complete, e_free, queue_ctx):
        """Wait for the co-process in charge of promises.

        :param p_io: the polling process
        :param e_complete: the event that tells the polling process to stop
        :param e_free: the event set by the polling process between two
            polling iterations
        :param queue_ctx: the queue used by the polling process to send back
            its context recorder
        :return: ``True`` if the polling process is no longer alive.
        """
        e_complete.set()
        logger.info("Waiting for polling process... <%s>", p_io.pid)
        t0 = date.now()
        e_free.wait(60)
        # Get the Queue and update the context
        time_sleep = self.flyput_sleep()
        try:
            # allow 5 sec to put data into queue (it should be more than enough)
            ctxrec = queue_ctx.get(block=True, timeout=time_sleep + 5)
        except queue.Empty:
            logger.warning("Impossible to get the Context recorder")
            ctxrec = None
        finally:
            queue_ctx.close()
        if ctxrec is not None:
            ctxrec.replay_in(self.context)
        p_io.join(30)
        t1 = date.now()
        waiting = t1 - t0
        logger.info(
            "Waiting for polling process took %f seconds",
            waiting.total_seconds(),
        )
        if p_io.is_alive():
            logger.warning("Force termination of polling process")
            p_io.terminate()
            # terminate() only sends the signal: leave the process some time
            # to exit (and be reaped), otherwise the status reported below
            # would almost always be "still alive".
            p_io.join(5)
        logger.info("Polling still alive ? %s", str(p_io.is_alive()))
        return not p_io.is_alive()

    def server_begin(self, rh, opts):
        """Create the error Event and launch :meth:`server_job` in a subprocess."""
        self._server_event = multiprocessing.Event()
        server = multiprocessing.Process(
            name=self.footprint_clsname(),
            target=self.server_job,
            args=(rh, opts),
        )
        self._server_process = server
        server.start()

    def server_job(self, rh, opts):
        """Run the server through :meth:`execute_single`, trapping any Exception.

        Should an Exception be raised, its type, value and traceback are
        printed and the server's Event is set in order to warn the main
        process.
        """
        self.system.signal_intercept_on()
        try:
            self.execute_single(rh, opts)
        except Exception as failure:
            print("Exception type: {!s}".format(type(failure)))
            print("Exception info: {!s}".format(failure))
            print("Traceback:")
            tb_lines = py_traceback.format_tb(failure.__traceback__)
            print("\n".join(tb_lines))
            # Let the main process know that something went wrong
            self._server_event.set()

    def server_alive(self):
        """Tell whether a server process exists and is still alive."""
        server = self._server_process
        if server is None:
            return False
        return server.is_alive()

    def server_end(self):
        """End the server.

        A first attempt is made to terminate it nicely. If it doesn't work,
        a SIGTERM is sent.

        :return: ``True`` if the server ended properly. ``False`` is only
            returned when there is no synchronisation tool or no server
            process to deal with.
        :raises AlgoComponentError: if a server process was dealt with and
            it ended badly (the error Event was set by :meth:`server_job`, or
            a forced termination was needed while ``_SERVERSYNC_STOPONEXIT``
            is false).
        """
        rc = False
        # This test should always succeed...
        if (
            self._server_synctool is not None
            and self._server_process is not None
        ):
            # Is the process still running ?
            if self._server_process.is_alive():
                # Try to stop it nicely
                if (
                    self._SERVERSYNC_STOPONEXIT
                    and self._server_synctool.trigger_stop()
                ):
                    # The stop request was accepted: wait up to 30 seconds
                    t0 = date.now()
                    self._server_process.join(30)
                    waiting = date.now() - t0
                    logger.info(
                        "Waiting for the server to stop took %f seconds",
                        waiting.total_seconds(),
                    )
                # The Event is set by server_job whenever the server crashed
                rc = not self._server_event.is_set()
                # Be less nice if needed...
                if (
                    not self._SERVERSYNC_STOPONEXIT
                ) or self._server_process.is_alive():
                    logger.warning("Force termination of the server process")
                    self._server_process.terminate()
                    self.system.sleep(
                        1
                    )  # Allow some time for the process to terminate
                    # Without STOPONEXIT, a server that is still running at
                    # this point is always reported as a failure
                    if not self._SERVERSYNC_STOPONEXIT:
                        rc = False
            else:
                # The process is already gone: just check the error Event
                rc = not self._server_event.is_set()
            logger.info(
                "Server still alive ? %s", str(self._server_process.is_alive())
            )
            # We are done with the server
            self._server_synctool = None
            self._server_process = None
            del self._server_event
            # Check the rc
            if not rc:
                raise AlgoComponentError("The server process ended badly.")
        return rc

    def spawn_pre_dirlisting(self):
        """Display a directory listing (called right before the execution)."""
        sh = self.system
        title = "{:s} : directory listing (pre-execution)".format(
            self.realkind
        )
        sh.subtitle(title)
        sh.dir(output=False, fatal=False)

    def spawn_hook(self):
        """Hook called just before the execution (does nothing by default)."""
        return None

    def spawn(self, args, opts, stdin=None):
        """
        Spawn in the current system the command as defined in raw ``args``.

        :param args: the command line to be launched (list of arguments)
        :param opts: a dictionary of options (only the ``fatal`` key, that
            defaults to ``True``, is used here)
        :param stdin: an optional File-Like object used as standard input

        The on-the-fly co-process (see :meth:`flyput_begin`), if any, is
        ended even if one of the pre-execution steps or the execution itself
        fails.

        The followings environment variables could drive part of the execution:

          * VORTEX_DEBUG_ENV : dump current environment before spawn
        """
        sh = self.system

        if self.env.true("vortex_debug_env"):
            sh.subtitle(
                "{:s} : dump environment (os bound: {!s})".format(
                    self.realkind, self.env.osbound()
                )
            )
            self.env.osdump()

        # On-the-fly coprocessing initialisation
        p_io, e_complete, e_free, q_ctx = self.flyput_begin()

        # From here on the co-process may be running: whatever happens
        # (including a failing hook), it must be waited for.
        try:
            sh.remove("core")
            sh.softlink("/dev/null", "core")
            self.spawn_hook()
            self.target.spawn_hook(sh)
            self.spawn_pre_dirlisting()
            sh.subtitle("{:s} : start execution".format(self.realkind))
            sh.spawn(
                args, output=False, stdin=stdin, fatal=opts.get("fatal", True)
            )
        finally:
            # On-the-fly coprocessing cleaning
            if p_io:
                self.flyput_end(p_io, e_complete, e_free, q_ctx)

    def spawn_command_options(self):
        """Return the options passed to the resource's command line builder."""
        return {}

    def spawn_command_line(self, rh):
        """Build the resource's command line and split it the way a shell would."""
        cmdline = rh.resource.command_line(**self.spawn_command_options())
        return shlex.split(cmdline)

    def spawn_stdin_options(self):
        """Return the options passed to the resource's stdin generator."""
        return {}

    def spawn_stdin(self, rh):
        """Return the resource's stdin as a binary File-Like object.

        ``None`` is returned when the resource provides no stdin text.
        Otherwise the text (encoded with the locale's encoding, or "ascii",
        if it is a ``str``) is written in a temporary file created in the
        current working directory; the file is rewound before being returned.
        """
        text = rh.resource.stdin_text(**self.spawn_stdin_options())
        if text is None:
            return None
        encoding = locale.getlocale()[1] or "ascii"
        handle = tempfile.TemporaryFile(dir=self.system.pwd(), mode="w+b")
        payload = text.encode(encoding) if isinstance(text, str) else text
        handle.write(payload)
        handle.seek(0)
        return handle

    def execute_single(self, rh, opts):
        """Abstract method (does nothing by default).

        It is called by :meth:`execute` unless server_run is True, in which
        case it is used to start the server.
        """
        return None

    def execute(self, rh, opts):
        """Abstract method.

        Without server, :meth:`execute_single` is simply called. With a
        server (``server_run``), the first call creates the synchronisation
        tool, starts the server and waits for its first request; subsequent
        calls only trigger a new run.
        """
        if not self.server_run:
            self.execute_single(rh, opts)
            return

        if self._server_synctool is not None:
            # Acknowledge that we are ready and wait for the next request
            self._server_synctool.trigger_run()
            return

        # First time here: the server needs to be started
        if self.serversync_method is None:
            raise ValueError("The serversync_method must be provided.")
        synctool = footprints.proxy.serversynctool(
            method=self.serversync_method,
            medium=self.serversync_medium,
            raiseonexit=self._SERVERSYNC_RAISEONEXIT,
        )
        self._server_synctool = synctool
        self._server_synctool.set_servercheck_callback(self.server_alive)
        self.server_begin(rh, opts)
        # Wait for the first request
        self._server_synctool.trigger_wait()
        if self._SERVERSYNC_RUNONSTARTUP:
            self._server_synctool.trigger_run()

    def fail_execute(self, e, rh, kw):
        """Hook called whenever :meth:`execute` raises (no-op by default)."""
        return None

    def execute_finalise(self, opts):
        """Abstract method.

        Called unconditionally when :meth:`execute` exits (including when an
        Exception was raised). With ``server_run``, the server is ended.
        """
        if not self.server_run:
            return
        self.server_end()

    def postfix_post_dirlisting(self):
        """Display a directory listing (called once the run is over)."""
        sh = self.system
        title = "{:s} : directory listing (post-run)".format(self.realkind)
        sh.subtitle(title)
        sh.dir(output=False, fatal=False)

    def postfix(self, rh, opts):
        """Post-execution step: by default, only a directory listing."""
        self.postfix_post_dirlisting()
        return None

    def dumplog(self, opts):
        """Pickle the component's internal log in a local ``log.<fstag>`` file."""
        target = "log." + self.fstag()
        self.system.pickle_dump(self.fslog, target)

    def delayed_exceptions(self, opts):
        """Raise a single error that gathers the delayed exceptions, if any.

        The list of delayed exceptions is reset before raising.
        """
        if not self._delayed_excs:
            return
        pending = self._delayed_excs
        self._delayed_excs = []
        raise DelayedAlgoComponentError(pending)

    def valid_executable(self, rh):
        """
        Tell whether the resource handler provided is an acceptable
        executable (always ``True`` in this default implementation).
        """
        return True

    def abortfabrik(self, step, msg):
        """Build a replacement for ``step`` that only logs that it is skipped."""

        def fastexit(self, *args, **kw):
            logger.warning(
                "Run <%s> skipped because abort occurred [%s]", step, msg
            )

        skipper = fastexit
        return skipper

    def abort(self, msg="Not documented"):
        """Replace the prepare, execute and postfix steps by skipping stubs."""
        skipped_steps = ("prepare", "execute", "postfix")
        for stepname in skipped_steps:
            stub = self.abortfabrik(stepname, msg)
            setattr(self, stepname, stub)

    def run(self, rh=None, **kw):
        """Sequence for execution : prepare / execute / postfix.

        :param rh: the resource handler(s) of the executable to be run
        :param kw: any other option; ``target`` (if any) is extracted and the
            remaining ones are passed to every step of the sequence
        :return: ``False`` if ``rh`` is not a valid executable, otherwise the
            component's status.

        Any exception raised by :meth:`execute` triggers :meth:`fail_execute`
        and is re-raised; :meth:`execute_finalise` is called in any case.
        """
        self._status = True

        # Get instance shorcuts to context and system objects
        self.ticket = vortex.sessions.current()
        self.context = self.ticket.context
        self.system = self.context.system
        self.target = kw.pop("target", None)
        if self.target is None:
            self.target = self.system.default_target

        # Before trying to do anything, check the executable
        if not self.valid_executable(rh):
            # NB: rh may be None (the default) or a list of resource
            # handlers: do not assume that it has a "resource" attribute.
            logger.warning(
                "Resource %s is not a valid executable",
                getattr(rh, "resource", rh),
            )
            return False

        # A cloned environment will be bound to the OS
        self.env = self.context.env.clone()
        with self.env:
            # The actual "run" recipe
            self.prepare(rh, kw)  # 1
            self.fsstamp(kw)  # 2
            try:
                self.execute(rh, kw)  # 3
            except Exception as e:
                self.fail_execute(e, rh, kw)  # 3.1
                raise
            finally:
                self.execute_finalise(kw)  # 3.2
            self.fscheck(kw)  # 4
            self.postfix(rh, kw)  # 5
            self.dumplog(kw)  # 6
            self.delayed_exceptions(kw)  # 7

        # Free local references
        self.env = None
        self.system = None

        return self._status

    def quickview(self, nb=0, indent=0):
        """Print a short description of the object.

        The numbered ``repr`` of the object comes first, followed by its
        kind, engine and interpreter attributes (when they exist and are
        not empty) and by an empty line.
        """
        prefix = "  " * indent
        print("{}{:02d}. {:s}".format(prefix, nb, repr(self)))
        for attrname in ("kind", "engine", "interpreter"):
            value = getattr(self, attrname, None)
            if not value:
                continue
            print("{}  {:s}: {!s}".format(prefix, attrname, value))
        print()

    def setlink(
        self,
        initrole=None,
        initkind=None,
        initname=None,
        inittest=lambda x: True,
    ):
        """Create a symbolic link to the input that plays a given role.

        The effective inputs matching ``initrole`` and ``initkind`` are
        filtered using ``inittest`` (called on their resource handler). If
        ``initname`` is provided and does not exist yet, it is created as a
        symbolic link to the first selected input.

        :return: the list of selected input sections.
        """
        matching = self.context.sequence.effective_inputs(
            role=initrole, kind=initkind
        )
        initsec = [section for section in matching if inittest(section.rh)]

        if not initsec:
            logger.warning(
                "Could not find logical role %s with kind %s - assuming already renamed",
                initrole,
                initkind,
            )

        if len(initsec) > 1:
            logger.warning(
                "More than one role %s with kind %s", initrole, initkind
            )

        if initname is None:
            return initsec

        localpaths = [section.rh.container.localpath() for section in initsec]
        for localpath in localpaths:
            if self.system.path.exists(initname):
                continue
            self.system.symlink(localpath, initname)
            break

        return initsec


class PythonFunction(AlgoComponent):
    """Execute a function defined in Python module.

    The function is passed the current :class:`sequence
    <vortex.layout.dataflow.Sequence>`, as well as a keyword arguments
    described by attribute ``func_kwargs``.

    Example:

    .. code-block:: python

        >>> exe = toolbox.executable(
        ...     role = 'Script',
        ...     format = 'ascii',
        ...     hostname = 'localhost',
        ...     kind = 'script',
        ...     language = 'python',
        ...     local = 'module.py',
        ...     remote = '/path/to/module.py',
        ...     tube = 'file',
        ... )
        >>> tbalgo = toolbox.algo(
        ...     engine="function",
        ...     func_name="my_plugin_entry_point_function",
        ...     func_kwargs={"ntasks": 35, "subnproc": 4},
        ... )
        >>> tbalgo.run(exe[0])

    .. code-block:: python

        # /path/to/module.py
        # ...
        def my_plugin_entry_point_function(
            sequence, ntasks, subnproc,
        ):
            for input in sequence.effective_inputs(role="gridpoint"):
                # ...
    """

    _footprint = dict(
        info="Execute a Python function in a given module",
        attr=dict(
            engine=dict(values=["function"]),
            func_name=dict(info="The function's name"),
            func_kwargs=dict(
                info=(
                    "A dictionary containing the function's keyword arguments"
                ),
                type=footprints.FPDict,
                default=footprints.FPDict({}),
                optional=True,
            ),
        ),
    )

    def _pypath_restore(self):
        """Remove from ``sys.path`` the entries added by :meth:`prepare`."""
        for p in self.extendpypath:
            if p in sys.path:
                sys.path.remove(p)

    def prepare(self, rh, opts):
        """Load the module held by ``rh`` and look for the function to call.

        ``sys.path`` is extended with ``extendpypath`` before the module is
        executed. If anything fails afterwards, the extension is undone
        before the original exception is re-raised.

        :raises AttributeError: if no loader can be found for the module's
            file or if the module has no ``func_name`` attribute.
        """
        # Function-scope import: "import importlib" alone does not guarantee
        # that the "util" submodule is loaded.
        import importlib.util

        location = rh.container.localpath()
        spec = importlib.util.spec_from_file_location(
            name="module", location=location
        )
        if spec is None or spec.loader is None:
            raise AttributeError(
                "Unable to find a module loader for '{!s}'".format(location)
            )
        mod = importlib.util.module_from_spec(spec)
        sys.path.extend(self.extendpypath)
        try:
            spec.loader.exec_module(mod)
            self.func = getattr(mod, self.func_name)
        except Exception:
            # The exception is re-raised untouched (message and traceback
            # are preserved) once sys.path has been restored.
            self._pypath_restore()
            raise

    def execute(self, rh, opts):
        """Call the function with the current sequence and ``func_kwargs``."""
        self.func(
            self.context.sequence,
            **self.func_kwargs,
        )

    def execute_finalise(self, opts):
        """Undo the ``sys.path`` extension made by :meth:`prepare`."""
        self._pypath_restore()
class ExecutableAlgoComponent(AlgoComponent):
    """Component in charge of running executable resources."""

    _abstract = True

    def valid_executable(self, rh):
        """
        Tell whether the resource handler provided can be considered as an
        executable (here: anything but ``None``).
        """
        return rh is not None


class xExecutableAlgoComponent(ExecutableAlgoComponent):
    """Component in charge of running executable resources."""

    _abstract = True

    def valid_executable(self, rh):
        """
        Tell whether the resource handler provided can be considered as an
        executable. If so, the execution permission is forced on the
        local file(s).
        """
        rc = super().valid_executable(rh)
        if not rc:
            return rc
        # Ensure that the input file is executable
        handlers = rh if isinstance(rh, (list, tuple)) else [rh]
        for handler in handlers:
            self.system.xperm(handler.container.localpath(), force=True)
        return rc


class TaylorRun(AlgoComponent):
    """
    Run any taylorism Worker in the current environment.

    This abstract class includes helpers to use the taylorism package in order
    to introduce an external parallelisation. It is designed to work well with a
    taylorism Worker class that inherits from
    :class:`vortex.tools.parallelism.TaylorVortexWorker`.
    """

    _abstract = True
    _footprint = dict(
        info="Abstract algo component based on the taylorism package.",
        attr=dict(
            kind=dict(),
            verbose=dict(
                info="Run in verbose mode",
                type=bool,
                default=False,
                optional=True,
                doc_zorder=-50,
            ),
            ntasks=dict(
                info="The maximum number of parallel tasks",
                type=int,
                default=DelayedEnvValue("VORTEX_SUBMIT_TASKS", 1),
                optional=True,
            ),
        ),
    )

    def __init__(self, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        # The task scheduler is created later on (see _default_pre_execute)
        self._boss = None

    def _default_common_instructions(self, rh, opts):
        """Build the dictionary of instructions shared by all the workers."""
        return {"kind": self.kind, "taskdebug": self.verbose}

    def _default_pre_execute(self, rh, opts):
        """Various initialisations.

        In particular it creates the task scheduler (Boss).
        """
        scheduler = footprints.proxy.scheduler(
            limit="threads", max_threads=self.ntasks
        )
        # Start the task scheduler
        self._boss = Boss(verbose=self.verbose, scheduler=scheduler)
        self._boss.make_them_work()

    def _add_instructions(self, common_i, individual_i):
        """Hand over a new set of instructions to the Boss."""
        self._boss.set_instructions(common_i, individual_i)

    def _default_post_execute(self, rh, opts):
        """Wait for the tasks to complete and process each worker's report."""
        logger.info(
            "All the input files were dealt with: now waiting for the parallel processing to finish"
        )
        self._boss.wait_till_finished()
        logger.info(
            "The parallel processing has finished. here are the results:"
        )
        report = self._boss.get_report()
        parser = ParallelResultParser(self.context)
        for worker_report in report["workers_report"]:
            rc = parser(worker_report)
            if isinstance(rc, Exception):
                # The exception is raised later on (delayed)
                self.delayed_exception_add(rc, traceback=False)
                rc = False
            self._default_rc_action(rh, opts, worker_report, rc)

    def _default_rc_action(self, rh, opts, report, rc):
        """How should we process the return code ?"""
        if rc:
            return
        logger.warning(
            "Apparently something went sideways with this task (rc=%s).",
            str(rc),
        )

    def execute(self, rh, opts):
        """
        This should be adapted to your needs...

        A usual sequence is::

            self._default_pre_execute(rh, opts)
            common_i = self._default_common_instructions(rh, opts)
            # Update the common instructions
            common_i.update(dict(someattribute='Toto', ))

            # Your own code here

            # Give some instructions to the boss
            self._add_instructions(common_i, dict(someattribute=['Toto', ],))

            # Your own code here

            self._default_post_execute(rh, opts)

        """
        raise NotImplementedError
class Expresso(ExecutableAlgoComponent):
    """Run a script resource in the good environment."""

    _footprint = dict(
        info="AlgoComponent that simply runs a script",
        attr=dict(
            interpreter=dict(
                info="The interpreter needed to run the script.",
                values=[
                    "current",
                    "awk",
                    "ksh",
                    "bash",
                    "perl",
                    "python",
                    "singularity",
                ],
            ),
            interpreter_path=dict(
                info="The interpreter command.",
                optional=True,
            ),
            interpreter_args=dict(
                info="Some options to pass to the expresso",
                optional=True,
                default="",
            ),
            engine=dict(values=["exec", "launch"]),
        ),
    )

    @property
    def _actual_interpreter(self):
        """Return the interpreter command.

        :raises ValueError: if *interpreter_path* is provided along with
            *interpreter=current*.
        :raises AlgoComponentError: if *interpreter_path* is not executable.
        """
        if self.interpreter == "current":
            if self.interpreter_path is not None:
                raise ValueError(
                    "*interpreter=current* and *interpreter_path* attributes are incompatible"
                )
            return sys.executable
        if self.interpreter_path is None:
            return self.interpreter
        if self.system.xperm(self.interpreter_path):
            return self.interpreter_path
        raise AlgoComponentError(
            "The '{:s}' interpreter is not executable".format(
                self.interpreter_path
            )
        )

    def _interpreter_args_fix(self, rh, opts):
        """Return the interpreter's arguments (including the script's path)."""
        cmd_opts = shlex.split(self.interpreter_args)
        absexec = self.absexcutable(rh.container.localpath())
        if self.interpreter == "awk":
            return [*cmd_opts, "-f", absexec]
        if self.interpreter == "singularity":
            return ["run", *cmd_opts, absexec]
        return [*cmd_opts, absexec]

    def execute_single(self, rh, opts):
        """
        Run the specified resource handler through the current interpreter,
        using the resource command_line method as args.
        """
        # Generic config
        args = [
            self._actual_interpreter,
        ]
        args.extend(self._interpreter_args_fix(rh, opts))
        args.extend(self.spawn_command_line(rh))
        logger.info("Run script %s", args)
        rh_stdin = self.spawn_stdin(rh)
        if rh_stdin is not None:
            plocale = locale.getlocale()[1] or "ascii"
            logger.info(
                "Script stdin:\n%s", rh_stdin.read().decode(plocale, "replace")
            )
            rh_stdin.seek(0)
        # Python path stuff: empty items are dropped, otherwise the resulting
        # value could start with ":" (i.e. contain an empty path entry).
        pypath_items = list(self.extendpypath)
        if "pythonpath" in self.env:
            pypath_items.append(self.env.pythonpath)
        newpypath = ":".join(item for item in pypath_items if item)
        # launching the program...
        with self.env.delta_context(pythonpath=newpypath):
            self.spawn(args, opts, stdin=rh_stdin)
class ParaExpresso(TaylorRun): """ Run any script in the current environment. This abstract class includes helpers to use the taylorism package in order to introduce an external parallelisation. It is designed to work well with a taylorism Worker class that inherits from :class:`vortex.tools.parallelism.VortexWorkerBlindRun`. """ _abstract = True _footprint = dict( info="AlgoComponent that simply runs a script using the taylorism package.", attr=dict( interpreter=dict( info="The interpreter needed to run the script.", values=["current", "awk", "ksh", "bash", "perl", "python"], ), engine=dict(values=["exec", "launch"]), interpreter_path=dict( info="The full path to the interpreter.", optional=True, ), extendpypath=dict( info="The list of things to be prepended in the python's path.", type=footprints.FPList, default=footprints.FPList([]), optional=True, ), ), ) def valid_executable(self, rh): """ Return a boolean value according to the effective executable nature of the resource handler provided. 
""" return rh is not None def _interpreter_args_fix(self, rh, opts): absexec = self.absexcutable(rh.container.localpath()) if self.interpreter == "awk": return ["-f", absexec] else: return [ absexec, ] def _default_common_instructions(self, rh, opts): """Create a common instruction dictionary that will be used by the workers.""" ddict = super()._default_common_instructions(rh, opts) actual_interpreter = ( sys.executable if self.interpreter == "current" else self.interpreter ) ddict["progname"] = actual_interpreter ddict["progargs"] = footprints.FPList( self._interpreter_args_fix(rh, opts) + self.spawn_command_line(rh) ) ddict["progenvdelta"] = footprints.FPDict() # Deal with the python path newpypath = ":".join(self.extendpypath) if "pythonpath" in self.env: self.env.pythonpath += ":{:s}".format(newpypath) if newpypath: ddict["progenvdelta"]["pythonpath"] = newpypath return ddict class BlindRun(xExecutableAlgoComponent): """ Run any executable resource in the current environment. Mandatory argument is: * engine ( values = blind ) """ _footprint = dict( info="AlgoComponent that simply runs a serial binary", attr=dict(engine=dict(values=["blind"])), ) def execute_single(self, rh, opts): """ Run the specified resource handler as an absolute executable, using the resource command_line method as args. """ args = [self.absexcutable(rh.container.localpath())] args.extend(self.spawn_command_line(rh)) logger.info("BlindRun executable resource %s", args) rh_stdin = self.spawn_stdin(rh) if rh_stdin is not None: plocale = locale.getlocale()[1] or "ascii" logger.info( "BlindRun executable stdin (fileno:%d):\n%s", rh_stdin.fileno(), rh_stdin.read().decode(plocale, "replace"), ) rh_stdin.seek(0) self.spawn(args, opts, stdin=rh_stdin) class ParaBlindRun(TaylorRun): """ Run any executable resource (without MPI) in the current environment. This abstract class includes helpers to use the taylorism package in order to introduce an external parallelisation. 
It is designed to work well with a taylorism Worker class that inherits from :class:`vortex.tools.parallelism.VortexWorkerBlindRun`. """ _abstract = True _footprint = dict( info="Abstract AlgoComponent that runs a serial binary using the taylorism package.", attr=dict( engine=dict(values=["blind"]), taskset=dict( info="Topology/Method to set up the CPU affinity of the child task.", default=None, optional=True, values=[ "{:s}{:s}".format(t, m) for t in ("raw", "socketpacked", "numapacked") for m in ("", "_taskset", "_gomp", "_omp", "_ompverbose") ], ), taskset_bsize=dict( info="The number of threads used by one task", type=int, default=1, optional=True, ), ), ) def valid_executable(self, rh): """ Return a boolean value according to the effective executable nature of the resource handler provided. """ rc = rh is not None if rc: # Ensure that the input file is executable xrh = ( rh if isinstance(rh, (list, tuple)) else [ rh, ] ) for arh in xrh: self.system.xperm(arh.container.localpath(), force=True) return rc def _default_common_instructions(self, rh, opts): """Create a common instruction dictionary that will be used by the workers.""" ddict = super()._default_common_instructions(rh, opts) ddict["progname"] = self.absexcutable(rh.container.localpath()) ddict["progargs"] = footprints.FPList(self.spawn_command_line(rh)) ddict["progtaskset"] = self.taskset ddict["progtaskset_bsize"] = self.taskset_bsize return ddict class Parallel(xExecutableAlgoComponent): """ Run a binary launched with MPI support. 
""" _footprint = dict( info="AlgoComponent that simply runs an MPI binary", attr=dict( engine=dict(values=["parallel"]), mpitool=dict( info="The object used to launch the parallel program", optional=True, type=mpitools.MpiTool, doc_visibility=footprints.doc.visibility.GURU, ), mpiname=dict( info=( "The mpiname of a class in the mpitool collector " + "(used only if *mpitool* is not provided)" ), optional=True, alias=["mpi"], doc_visibility=footprints.doc.visibility.GURU, ), mpiverbose=dict( info="Boost logging verbosity in mpitools", optional=True, default=False, doc_visibility=footprints.doc.visibility.GURU, ), binaries=dict( info="List of MpiBinaryDescription objects", optional=True, type=footprints.FPList, doc_visibility=footprints.doc.visibility.GURU, ), binarysingle=dict( info="If *binaries* is missing, the default binary role for single binaries", optional=True, default="basicsingle", doc_visibility=footprints.doc.visibility.GURU, ), binarymulti=dict( info="If *binaries* is missing, the default binary role for multiple binaries", type=footprints.FPList, optional=True, default=footprints.FPList( [ "basic", ] ), doc_visibility=footprints.doc.visibility.GURU, ), ), ) def _mpitool_attributes(self, opts): """Return the dictionary of attributes needed to create the mpitool object.""" # Read the appropriate configuration in the target file if not config.is_defined(section="mpitool"): conf_dict = {} else: conf_dict = config.from_config(section="mpitool") if self.mpiname: conf_dict["mpiname"] = self.mpiname # Make "mpirun" the default mpi command name if "mpiname" not in conf_dict.keys(): conf_dict["mpiname"] = "mpirun" possible_attrs = functools.reduce( lambda s, t: s | t, [ set(cls.footprint_retrieve().attr.keys()) for cls in footprints.proxy.mpitools ], ) nonkeys = set(conf_dict.keys()) - possible_attrs if nonkeys: msg = ( "The following keywords are unknown configuration" 'keys for section "mpitool":\n' ) raise ValueError(msg + "\n".join(nonkeys)) return conf_dict 
def spawn_command_line(self, rh): """Split the shell command line of the resource to be run.""" return [super(Parallel, self).spawn_command_line(r) for r in rh] def _bootstrap_mpibins_hack(self, bins, rh, opts, use_envelope): return copy.deepcopy(bins) def _bootstrap_mpienvelope_hack(self, envelope, rh, opts, mpi): return copy.deepcopy(envelope) def _bootstrap_mpienvelope_posthack(self, envelope, rh, opts, mpi): return None def _bootstrap_mpitool(self, rh, opts): """Initialise the mpitool object and finds out the command line.""" # Rh is a list binaries... if not isinstance(rh, collections.abc.Iterable): rh = [ rh, ] # Find the MPI launcher mpi = self.mpitool if not mpi: mpi = footprints.proxy.mpitool( sysname=self.system.sysname, **self._mpitool_attributes(opts) ) if not mpi: logger.critical( "Component %s could not find any mpitool", self.footprint_clsname(), ) raise AttributeError("No valid mpitool attr could be found.") # Setup various useful things (env, system, ...) mpi.import_basics(self) mpi_opts = opts.get("mpiopts", dict()) envelope = [] use_envelope = "envelope" in mpi_opts if use_envelope: envelope = mpi_opts.pop("envelope") if envelope == "auto": blockspec = dict( nn=self.env.get("VORTEX_SUBMIT_NODES", 1), ) if "VORTEX_SUBMIT_TASKS" in self.env: blockspec["nnp"] = self.env.get("VORTEX_SUBMIT_TASKS") else: raise ValueError( "when envelope='auto', VORTEX_SUBMIT_TASKS must be set up." 
) envelope = [ blockspec, ] elif isinstance(envelope, dict): envelope = [ envelope, ] elif isinstance(envelope, (list, tuple)): pass else: raise AttributeError("Invalid envelope specification") if envelope: envelope_ntasks = sum([d["nn"] * d["nnp"] for d in envelope]) if not envelope: use_envelope = False if not use_envelope: # Some MPI presets mpi_desc = dict() for mpi_k in ("tasks", "openmp"): mpi_kenv = "VORTEX_SUBMIT_" + mpi_k.upper() if mpi_kenv in self.env: mpi_desc[mpi_k] = self.env.get(mpi_kenv) # Binaries may be grouped together on the same nodes bin_groups = mpi_opts.pop("groups", []) # Find out the command line bargs = self.spawn_command_line(rh) # Potential Source files sources = [] # The usual case: no indications, 1 binary + a potential ioserver if len(rh) == 1 and not self.binaries: # In such a case, defining group does not makes sense self.algoassert( not bin_groups, "With only one binary, groups should not be defined", ) # The main program allowbind = mpi_opts.pop("allowbind", True) distribution = mpi_opts.pop( "distribution", self.env.get("VORTEX_MPIBIN_DEF_DISTRIBUTION", None), ) if use_envelope: master = footprints.proxy.mpibinary( kind=self.binarysingle, ranks=envelope_ntasks, openmp=self.env.get("VORTEX_SUBMIT_OPENMP", None), allowbind=allowbind, distribution=distribution, ) else: master = footprints.proxy.mpibinary( kind=self.binarysingle, nodes=self.env.get("VORTEX_SUBMIT_NODES", 1), allowbind=allowbind, distribution=distribution, **mpi_desc, ) master.options = mpi_opts master.master = self.absexcutable(rh[0].container.localpath()) master.arguments = bargs[0] bins = [ master, ] # Source files ? if hasattr(rh[0].resource, "guess_binary_sources"): sources.extend( rh[0].resource.guess_binary_sources(rh[0].provider) ) # Multiple binaries are to be launched: no IO server support here. 
elif len(rh) > 1 and not self.binaries: # Binary roles if len(self.binarymulti) == 1: bnames = self.binarymulti * len(rh) else: if len(self.binarymulti) != len(rh): raise ParallelInconsistencyAlgoComponentError( "self.binarymulti" ) bnames = self.binarymulti # Check mpiopts shape for k, v in mpi_opts.items(): if not isinstance(v, collections.abc.Iterable): raise ValueError( "In such a case, mpiopts must be Iterable" ) if len(v) != len(rh): raise ParallelInconsistencyAlgoComponentError( "mpiopts[{:s}]".format(k) ) # Check bin_group shape if bin_groups: if len(bin_groups) != len(rh): raise ParallelInconsistencyAlgoComponentError("bin_group") # Create MpiBinaryDescription objects bins = list() allowbinds = mpi_opts.pop( "allowbind", [ True, ] * len(rh), ) distributions = mpi_opts.pop( "distribution", [ self.env.get("VORTEX_MPIBIN_DEF_DISTRIBUTION", None), ] * len(rh), ) for i, r in enumerate(rh): if use_envelope: bins.append( footprints.proxy.mpibinary( kind=bnames[i], allowbind=allowbinds[i], distribution=distributions[i], ) ) else: bins.append( footprints.proxy.mpibinary( kind=bnames[i], nodes=self.env.get("VORTEX_SUBMIT_NODES", 1), allowbind=allowbinds[i], distribution=distributions[i], **mpi_desc, ) ) # Reshape mpiopts bins[i].options = {k: v[i] for k, v in mpi_opts.items()} if bin_groups: bins[i].group = bin_groups[i] bins[i].master = self.absexcutable(r.container.localpath()) bins[i].arguments = bargs[i] # Source files ? 
if hasattr(r.resource, "guess_binary_sources"): sources.extend(r.resource.guess_binary_sources(r.provider)) # Nothing to do: binary descriptions are provided by the user else: if len(self.binaries) != len(rh): raise ParallelInconsistencyAlgoComponentError("self.binaries") bins = self.binaries for i, r in enumerate(rh): bins[i].master = self.absexcutable(r.container.localpath()) bins[i].arguments = bargs[i] # The global envelope envelope = self._bootstrap_mpienvelope_hack(envelope, rh, opts, mpi) if envelope: mpi.envelope = envelope # The binaries description mpi.binaries = self._bootstrap_mpibins_hack( bins, rh, opts, use_envelope ) upd_envelope = self._bootstrap_mpienvelope_posthack( envelope, rh, opts, mpi ) if upd_envelope: mpi.envelope = upd_envelope # The source files mpi.sources = sources if envelope: # Check the consistency between nranks and the total number of processes envelope_ntasks = sum([d.nprocs for d in mpi.envelope]) mpibins_total = sum([m.nprocs for m in mpi.binaries]) if not envelope_ntasks == mpibins_total: raise AlgoComponentError( ( "The number of requested ranks ({:d}) must be equal " "to the number of processes available in the envelope ({:d})" ).format(mpibins_total, envelope_ntasks) ) args = mpi.mkcmdline() for b in mpi.binaries: logger.info( "Run %s in parallel mode. Args: %s.", b.master, " ".join(b.arguments), ) logger.info("Full MPI command line: %s", " ".join(args)) # Setup various useful things (env, system, ...) 
mpi.import_basics(self) return mpi, args @contextlib.contextmanager def _tweak_mpitools_logging(self): if self.mpiverbose: m_loggers = dict() for m_logger_name in [ l for l in loggers.lognames if "mpitools" in l ]: m_logger = loggers.getLogger(m_logger_name) m_loggers[m_logger] = m_logger.level m_logger.setLevel(logging.DEBUG) try: yield finally: for m_logger, prev_level in m_loggers.items(): m_logger.setLevel(prev_level) else: yield def execute_single(self, rh, opts): """Run the specified resource handler through the `mpitool` launcher An argument named `mpiopts` could be provided as a dictionary: it may contain indications on the number of nodes, tasks, ... """ self.system.subtitle("{:s} : parallel engine".format(self.realkind)) with self._tweak_mpitools_logging(): # Return a mpitool object and the mpicommand line mpi, args = self._bootstrap_mpitool(rh, opts) # Specific parallel settings mpi.setup(opts) # This is actual running command self.spawn(args, opts) # Specific parallel cleaning mpi.clean(opts) @algo_component_deco_mixin_autodoc class ParallelIoServerMixin(AlgoComponentMpiDecoMixin): """Adds an IOServer capabilities (footprints attributes + MPI bianries alteration).""" _MIXIN_EXTRA_FOOTPRINTS = [ footprints.Footprint( info="Abstract IoServer footprints' attributes.", attr=dict( ioserver=dict( info="The object used to launch the IOserver part of the binary.", type=mpitools.MpiBinaryIOServer, optional=True, default=None, doc_visibility=footprints.doc.visibility.GURU, ), ioname=dict( info=( "The binary_kind of a class in the mpibinary collector " + "(used only if *ioserver* is not provided)" ), optional=True, default="ioserv", doc_visibility=footprints.doc.visibility.GURU, ), iolocation=dict( info="Location of the IO server within the binary list", type=int, default=-1, optional=True, ), ), ), ] def _bootstrap_mpibins_ioserver_hack( self, bins, bins0, rh, opts, use_envelope ): """If requested, adds an extra binary that will act as an IOServer.""" master = 
bins[-1] # A potential IO server io = self.ioserver if not io and int(self.env.get("VORTEX_IOSERVER_NODES", -1)) >= 0: io = footprints.proxy.mpibinary( kind=self.ioname, nodes=self.env.VORTEX_IOSERVER_NODES, tasks=( self.env.VORTEX_IOSERVER_TASKS or master.options.get("nnp", master.tasks) ), openmp=( self.env.VORTEX_IOSERVER_OPENMP or master.options.get("openmp", master.openmp) ), iolocation=self.iolocation, ) io.options = { x[3:]: opts[x] for x in opts.keys() if x.startswith("io_") } io.master = master.master io.arguments = master.arguments if ( not io and int(self.env.get("VORTEX_IOSERVER_COMPANION_TASKS", -1)) >= 0 ): io = footprints.proxy.mpibinary( kind=self.ioname, nodes=master.options.get("nn", master.nodes), tasks=self.env.VORTEX_IOSERVER_COMPANION_TASKS, openmp=( self.env.VORTEX_IOSERVER_OPENMP or master.options.get("openmp", master.openmp) ), ) io.options = { x[3:]: opts[x] for x in opts.keys() if x.startswith("io_") } io.master = master.master io.arguments = master.arguments if master.group is not None: # The master binary is already in a group ! Use it. io.group = master.group else: io.group = "auto_masterwithio" master.group = "auto_masterwithio" if ( not io and self.env.get("VORTEX_IOSERVER_INCORE_TASKS", None) is not None ): if hasattr(master, "incore_iotasks"): master.incore_iotasks = self.env.VORTEX_IOSERVER_INCORE_TASKS if ( not io and self.env.get("VORTEX_IOSERVER_INCORE_FIXER", None) is not None ): if hasattr(master, "incore_iotasks_fixer"): master.incore_iotasks_fixer = ( self.env.VORTEX_IOSERVER_INCORE_FIXER ) if ( not io and self.env.get("VORTEX_IOSERVER_INCORE_DIST", None) is not None ): if hasattr(master, "incore_iodist"): master.incore_iodist = self.env.VORTEX_IOSERVER_INCORE_DIST if io: rh.append(rh[0]) if master.group is None: if "nn" in master.options: master.options["nn"] = ( master.options["nn"] - io.options["nn"] ) else: logger.warning( 'The "nn" option is not available in the master binary ' + "mpi options. 
Consequently it can be fixed..." ) if self.iolocation >= 0: bins.insert(self.iolocation, io) else: bins.append(io) return bins _MIXIN_MPIBINS_HOOKS = (_bootstrap_mpibins_ioserver_hack,) @algo_component_deco_mixin_autodoc class ParallelOpenPalmMixin(AlgoComponentMpiDecoMixin): """Class mixin to be used with OpenPALM programs. It will automatically add the OpenPALM driver binary to the list of binaries. The location of the OpenPALM driver should be automatically detected provided that a section with ``role=OpenPALM Driver`` lies in the input's sequence. Alternatively, the path to the OpenPALM driver can be provided using the **openpalm_driver** footprint's argument. """ _MIXIN_EXTRA_FOOTPRINTS = [ footprints.Footprint( info="Abstract OpenPALM footprints' attributes.", attr=dict( openpalm_driver=dict( info=( "The path to the OpenPALM driver binary. " + "When omitted, the input sequence is looked up " + "for section with ``role=OpenPALM Driver``." ), optional=True, doc_visibility=footprints.doc.visibility.ADVANCED, ), openpalm_overcommit=dict( info=( "Run the OpenPALM driver on the first node in addition " + "to existing tasks. Otherwise dedicated tasks are used." ), type=bool, default=True, optional=True, doc_visibility=footprints.doc.visibility.ADVANCED, ), openpalm_binddriver=dict( info="Try to bind the OpenPALM driver binary.", type=bool, optional=True, default=False, doc_visibility=footprints.doc.visibility.ADVANCED, ), openpalm_binkind=dict( info="The binary kind for the OpenPALM driver.", optional=True, default="basic", doc_visibility=footprints.doc.visibility.GURU, ), ), ), ] @property def _actual_openpalm_driver(self): """Returns the OpenPALM's driver location.""" path = self.openpalm_driver if path is None: drivers = self.context.sequence.effective_inputs( role="OpenPALMDriver" ) if not drivers: raise AlgoComponentError("No OpenPALM driver was provided.") elif len(drivers) > 1: raise AlgoComponentError( "Several OpenPALM driver were provided." 
) path = drivers[0].rh.container.localpath() else: if not self.system.path.exists(path): raise AlgoComponentError( "No OpenPALM driver was provider ({:s} does not exists).".format( path ) ) return path def _bootstrap_mpibins_openpalm_hack( self, bins, bins0, rh, opts, use_envelope ): """Adds the OpenPALM driver to the binary list.""" single_bin = len(bins) == 1 master = bins[0] driver = footprints.proxy.mpibinary( kind=self.openpalm_binkind, nodes=1, tasks=self.env.VORTEX_OPENPALM_DRV_TASKS or 1, openmp=self.env.VORTEX_OPENPALM_DRV_OPENMP or 1, allowbind=opts.pop( "palmdrv_bind", self.env.get( "VORTEX_OPENPALM_DRV_BIND", self.openpalm_binddriver ), ), ) driver.options = { x[8:]: opts[x] for x in opts.keys() if x.startswith("palmdrv_") } driver.master = self._actual_openpalm_driver self.system.xperm(driver.master, force=True) bins.insert(0, driver) if not self.openpalm_overcommit and single_bin: # Tweak the number of tasks of the master program in order to accommodate # the driver # NB: If multiple binaries are provided, the user must do this by # himself (i.e. leave enough room for the driver's task). if "nn" in master.options: master.options["nn"] = master.options["nn"] - 1 else: # Ok, tweak nprocs instead (an envelope might be defined) try: nprocs = master.nprocs except mpitools.MpiException: logger.error( 'Neither the "nn" option nor the nprocs is ' + "available for the master binary. Consequently " + "it can be fixed..." ) else: master.options["np"] = nprocs - driver.nprocs return bins _MIXIN_MPIBINS_HOOKS = (_bootstrap_mpibins_openpalm_hack,) def _bootstrap_mpienvelope_openpalm_posthack( self, env, env0, rh, opts, mpi ): """ Tweak the MPI envelope in order to execute the OpenPALM driver on the appropriate node. 
""" master = mpi.binaries[ 1 ] # The first "real" program that will be launched driver = mpi.binaries[0] # The OpenPALM driver if self.openpalm_overcommit: # Execute the driver on the first compute node if env or env0: env = env or copy.deepcopy(env0) # An envelope is already defined... update it if not ("nn" in env[0] and "nnp" in env[0]): raise AlgoComponentError( "'nn' and 'nnp' must be defined in the envelope" ) if env[0]["nn"] > 1: env[0]["nn"] -= 1 newenv = copy.copy(env[0]) newenv["nn"] = 1 newenv["nnp"] += driver.nprocs env.insert(0, newenv) else: env[0]["nnp"] += driver.nprocs else: # Setup a new envelope if not ("nn" in master.options and "nnp" in master.options): raise AlgoComponentError( "'nn' and 'nnp' must be defined for the master executable" ) env = [ dict( nn=1, nnp=master.options["nnp"] + driver.nprocs, openmp=master.options.get("openmp", 1), ) ] if master.options["nn"] > 1: env.append( dict( nn=master.options["nn"] - 1, nnp=master.options["nnp"], openmp=master.options.get("openmp", 1), ) ) if len(mpi.binaries) > 2: env.extend([b.options for b in mpi.binaries[2:]]) return env _MIXIN_MPIENVELOPE_POSTHOOKS = (_bootstrap_mpienvelope_openpalm_posthack,)