# Source code for vortex.nwp.algo.stdpost

"""
Various Post-Processing AlgoComponents.
"""

import collections
import json
import re
import time

from bronx.datagrip.namelist import NamelistBlock
from bronx.fancies import loggers
from footprints.stdtypes import FPTuple
import footprints
from taylorism import Boss

from vortex.layout.monitor import (
    BasicInputMonitor,
    AutoMetaGang,
    MetaGang,
    EntrySt,
    GangSt,
)
from vortex.algo.components import (
    AlgoComponentDecoMixin,
    AlgoComponentError,
    algo_component_deco_mixin_autodoc,
)
from vortex.algo.components import (
    TaylorRun,
    BlindRun,
    ParaBlindRun,
    Parallel,
    Expresso,
)
from vortex.syntax.stdattrs import DelayedEnvValue, FmtInt
from vortex.tools.grib import EcGribDecoMixin
from vortex.tools.parallelism import (
    TaylorVortexWorker,
    VortexWorkerBlindRun,
    ParallelResultParser,
)
from vortex.tools.systems import ExecutionError

from ..tools.grib import GRIBFilter
from ..tools.drhook import DrHookDecoMixin

#: No automatic export (``from ... import *`` exports nothing from this module)
__all__ = []

#: Module-level logger, named after this module
logger = loggers.getLogger(__name__)


class _FA2GribWorker(VortexWorkerBlindRun):
    """The taylorism worker that actually does the gribing (in parallel).

    This is called indirectly by taylorism when :class:`Fa2Grib` is used.

    For each task, a dedicated ``<file_out>.process.d`` working directory is
    created: the namelist is generated there, the input file is linked as
    ``fortinput``, the binary is run and its ``GRIDOUTPUT`` result is moved
    back to ``<cwd>/<file_out>``. The working directory is always left and
    removed, even if an unexpected error occurs.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(values=["fa2grib"]),
            # Progrid parameters
            fortnam=dict(),
            fortinput=dict(),
            compact=dict(),
            timeshift=dict(type=int),
            timeunit=dict(type=int),
            numod=dict(type=int),
            sciz=dict(type=int),
            scizoffset=dict(type=int, optional=True),
            # Input/Output data
            file_in=dict(),
            file_out=dict(),
            member=dict(
                type=FmtInt,
                optional=True,
            ),
        )
    )

    def _build_namelist(self, outputname):
        """Return the ``NAML`` namelist block for a conversion producing *outputname*."""
        nb = NamelistBlock(name="NAML")
        nb.NBDOM = 1
        nb.CHOPER = self.compact
        nb.INUMOD = self.numod
        if self.scizoffset is not None:
            # The offset is shifted by the member number (0 if there is none)
            nb.ISCIZ = self.scizoffset + (
                self.member if self.member is not None else 0
            )
        elif self.sciz:
            nb.ISCIZ = self.sciz
        if self.timeshift:
            nb.IHCTPI = self.timeshift
        if self.timeunit:
            nb.ITUNIT = self.timeunit
        nb["CLFSORT(1)"] = outputname
        nb["CDNOMF(1)"] = self.fortinput
        return nb

    def vortex_task(self, **kwargs):
        """Convert ``file_in`` into ``file_out`` and put the matching promises.

        :return: a dictionary whose ``rc`` entry is ``True`` on success, the
            :class:`ExecutionError` raised by the binary if it failed, or
            ``False`` if no output file was produced.
        """
        logger.info("Starting the Fa2Grib processing for tag=%s", self.name)

        thisoutput = "GRIDOUTPUT"
        rdict = dict(rc=True)

        # First, check that the hooks were applied
        for thisinput in [
            x
            for x in self.context.sequence.inputs()
            if x.rh.container.localpath() == self.file_in
        ]:
            if thisinput.rh.delayhooks:
                thisinput.rh.apply_get_hooks()

        # Jump into a private working directory
        cwd = self.system.pwd()
        tmpwd = self.system.path.join(cwd, self.file_out + ".process.d")
        self.system.mkdir(tmpwd)
        self.system.cd(tmpwd)
        try:
            # Build the local namelist
            with open(self.fortnam, "w") as namfd:
                namfd.write(self._build_namelist(thisoutput).dumps())

            # Finally set the actual init file
            self.system.softlink(
                self.system.path.join(cwd, self.file_in), self.fortinput
            )

            # Standard execution
            list_name = self.system.path.join(cwd, self.file_out + ".listing")
            try:
                self.local_spawn(list_name)
            except ExecutionError as e:
                rdict["rc"] = e

            # Freeze the current output
            if self.system.path.exists(thisoutput):
                self.system.move(
                    thisoutput, self.system.path.join(cwd, self.file_out)
                )
            else:
                logger.warning("Missing some grib output: %s", self.file_out)
                rdict["rc"] = False
        finally:
            # Final cleaning: always go back to the original directory (even
            # if something unexpected was raised above), otherwise the worker
            # would be stranded in a directory that no longer gets removed
            self.system.cd(cwd)
            self.system.remove(tmpwd)

        if self.system.path.exists(self.file_out):
            # Deal with promised resources
            expected = [
                x
                for x in self.context.sequence.outputs()
                if x.rh.provider.expected
                and x.rh.container.localpath() == self.file_out
            ]
            for thispromise in expected:
                thispromise.put(incache=True)

        logger.info("Fa2Grib processing is done for tag=%s", self.name)

        return rdict


class _GribFilterWorker(TaylorVortexWorker):
    """Taylorism worker that applies GRIB filters to a single input file.

    It is driven indirectly by taylorism, e.g. through
    :func:`parallel_grib_filter` (``kind="gribfilter"`` instructions) and
    presumably by :class:`StandaloneGRIBFilter` (same ``gribfilter`` kind).
    """

    _footprint = dict(
        attr=dict(
            kind=dict(values=["gribfilter"]),
            # Filter settings
            filters=dict(type=FPTuple),
            concatenate=dict(type=bool),
            # Put files if they are expected
            put_promises=dict(type=bool, optional=True, default=True),
            # Input/Output data
            file_in=dict(),
            file_outfmt=dict(),
            file_outintent=dict(optional=True, default="in"),
        )
    )

    def vortex_task(self, **kwargs):
        """Filter ``file_in`` according to ``filters``.

        The generated files are named after ``file_outfmt``. When
        ``put_promises`` is set, any promised output whose local path matches
        one of the generated files is put into the cache.

        :return: a dictionary whose ``rc`` entry is ``False`` if no file was
            generated, ``True`` otherwise.
        """
        logger.info("Starting the GribFiltering for tag=%s", self.file_in)

        result = dict(rc=True)

        # Set up the filtering tool
        grib_filter = GRIBFilter(concatenate=self.concatenate)
        if self.filters:
            grib_filter.add_filters(*list(self.filters))

        produced = grib_filter(
            self.file_in, self.file_outfmt, self.file_outintent
        )

        if not produced:
            logger.warning("No file has been generated.")
            result["rc"] = False
        elif self.put_promises:
            # Deal with promised resources
            promised = [
                section
                for section in self.context.sequence.outputs()
                if section.rh.provider.expected
            ]
            for produced_file in produced:
                matching = [
                    section
                    for section in promised
                    if section.rh.container.localpath() == produced_file
                ]
                for section in matching:
                    section.put(incache=True)

        logger.info("GribFiltering is done for tag=%s", self.name)

        return result


def parallel_grib_filter(
    context,
    inputs,
    outputs,
    intents=(),
    cat=False,
    filters=FPTuple(),
    nthreads=8,
):
    """A simple method that calls the GRIBFilter class in parallel.

    One ``gribfilter`` taylorism task is created for each input file.

    :param vortex.layout.contexts.Context context: the current context
    :param list[str] inputs: the list of input file names
    :param list[str] outputs: the list of output file names
    :param list[str] intents: the list of intent (in|inout) for output files
        (``in`` is used for every output if omitted, or if its length does not
        match the length of *outputs*)
    :param bool cat: whether or not to concatenate the input files (False by default)
    :param tuple filters: a list of filters to apply (as a list of JSON dumps);
        any iterable (or ``None``) is accepted
    :param int nthreads: the maximum number of tasks used concurrently (8 by default)
    :raises AlgoComponentError: if neither *cat* nor *filters* is specified,
        if *inputs* and *outputs* have different lengths, or if any of the
        filtering tasks fails.
    """
    # Normalise the filters (callers may provide plain lists or None): the
    # gribfilter worker's footprint declares an FPTuple
    filters = FPTuple(filters or ())
    if not cat and not filters:
        raise AlgoComponentError(
            "cat must be true or filters must be provided"
        )
    if len(inputs) != len(outputs):
        raise AlgoComponentError(
            "inputs and outputs must have the same length"
        )
    if not intents or len(intents) != len(outputs):
        if intents:
            # Do not silently discard what the caller explicitly provided
            logger.warning(
                "The number of intents (%d) does not match the number of "
                "outputs (%d): 'in' is used for every output.",
                len(intents),
                len(outputs),
            )
        intents = FPTuple(["in"] * len(outputs))
    boss = Boss(
        scheduler=footprints.proxy.scheduler(
            limit="threads", max_threads=nthreads
        )
    )
    common_i = dict(
        kind="gribfilter", filters=filters, concatenate=cat, put_promises=False
    )
    for ifile, ofile, intent in zip(inputs, outputs, intents):
        logger.info(
            "%s -> %s (intent: %s) added to the GRIBfilter task's list",
            ifile,
            ofile,
            intent,
        )
        boss.set_instructions(
            common_i,
            dict(
                name=[ifile],
                file_in=[ifile],
                file_outfmt=[ofile],
                file_outintent=[intent],
            ),
        )
    boss.make_them_work()
    boss.wait_till_finished()
    logger.info("All files are processed.")
    report = boss.get_report()
    prp = ParallelResultParser(context)
    # Parse every report (not only up to the first failure) so that the
    # outcome of all the tasks is displayed before giving up
    failures = [
        r for r in report["workers_report"] if isinstance(prp(r), Exception)
    ]
    if failures:
        logger.error("%d GRIBfilter task(s) failed.", len(failures))
        raise AlgoComponentError("An error occurred in GRIBfilter.")


class Fa2Grib(ParaBlindRun):
    """Standard FA conversion, e.g. with PROGRID as a binary resource.

    Gridpoint inputs are monitored as they become available and each of them
    is handed over, as a ``fa2grib`` task, to the parallel workers.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(values=["fa2grib"]),
            timeout=dict(type=int, optional=True, default=300),
            refreshtime=dict(type=int, optional=True, default=20),
            fatal=dict(type=bool, optional=True, default=True),
            fortnam=dict(optional=True, default="fort.4"),
            fortinput=dict(optional=True, default="fort.11"),
            compact=dict(
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_COMPACT", "L"),
            ),
            timeshift=dict(
                type=int,
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_SHIFT", 0),
            ),
            timeunit=dict(
                type=int,
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_TUNIT", 1),
            ),
            numod=dict(
                type=int,
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_NUMOD", 221),
            ),
            sciz=dict(
                type=int,
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_SCIZ", 0),
            ),
            scizoffset=dict(type=int, optional=True),
        )
    )

    def prepare(self, rh, opts):
        """Remove any stale ``fortinput``, set DR_HOOK_NOT_MPI and list the directory."""
        super().prepare(rh, opts)
        sh = self.system
        sh.remove(self.fortinput)
        self.env.DR_HOOK_NOT_MPI = 1
        sh.subtitle("{:s} : directory listing (pre-run)".format(self.realkind))
        sh.dir(output=False, fatal=False)

    @staticmethod
    def _grib_output_name(handler):
        """Build the GRIB output file name for the input resource *handler*."""
        area = handler.resource.geometry.area
        member = handler.provider.member
        term = handler.resource.term.fmthm
        if member is None:
            return "GRIB{:s}+{:s}".format(area, term)
        return "GRIB{:s}_{!s}+{:s}".format(area, member, term)

    def execute(self, rh, opts):
        """Dispatch each gridpoint input, as soon as it is available, to the workers."""
        self._default_pre_execute(rh, opts)

        # Settings shared by every conversion task
        instructions = self._default_common_instructions(rh, opts)
        instructions.update(
            dict(
                fortnam=self.fortnam,
                fortinput=self.fortinput,
                compact=self.compact,
                numod=self.numod,
                sciz=self.sciz,
                scizoffset=self.scizoffset,
                timeshift=self.timeshift,
                timeunit=self.timeunit,
            )
        )

        timed_out = False
        monitor = BasicInputMonitor(
            self.context,
            caching_freq=self.refreshtime,
            role="Gridpoint",
            kind="gridpoint",
        )
        with monitor:
            while not monitor.all_done or len(monitor.available) > 0:
                while monitor.available:
                    section = monitor.pop_available().section
                    file_in = section.rh.container.localpath()
                    file_out = self._grib_output_name(section.rh)
                    logger.info("Adding input file %s to the job list", file_in)
                    self._add_instructions(
                        instructions,
                        dict(
                            name=[file_in],
                            file_in=[file_in],
                            file_out=[file_out],
                            member=[section.rh.provider.member],
                        ),
                    )

                if monitor.all_done or len(monitor.available) > 0:
                    continue
                # Nothing to do for now: check for a timeout, then wait a bit
                timed_out = monitor.is_timedout(self.timeout)
                if timed_out:
                    break
                time.sleep(1)
                monitor.health_check(interval=30)

        self._default_post_execute(rh, opts)

        failed_files = [
            entry.section.rh.container.localpath()
            for entry in monitor.failed.values()
        ]
        for failed_file in failed_files:
            logger.error(
                "We were unable to fetch the following file: %s", failed_file
            )
            if self.fatal:
                self.delayed_exception_add(
                    IOError("Unable to fetch {:s}".format(failed_file)),
                    traceback=False,
                )

        if timed_out:
            raise OSError("The waiting loop timed out")
class StandaloneGRIBFilter(TaylorRun):
    """Filter the gridpoint GRIB inputs, in parallel, as they become available.

    The filtering requests are read from the ``GRIBFilteringRequest`` /
    ``filtering_request`` inputs (and re-serialised as JSON strings). Each
    monitored ``Gridpoint`` input is handed over to a worker task whose
    output file names are derived from the input one by inserting
    ``_{filtername:s}`` before the file extension (if any).
    """

    _footprint = dict(
        attr=dict(
            kind=dict(
                values=["gribfilter"],
            ),
            timeout=dict(
                type=int,
                optional=True,
                default=300,
            ),
            refreshtime=dict(
                type=int,
                optional=True,
                default=20,
            ),
            concatenate=dict(
                type=bool,
                default=False,
                optional=True,
            ),
            fatal=dict(
                type=bool,
                optional=True,
                default=True,
            ),
        )
    )

    # Splits a file name into (stem, extension); the extension group is
    # non-capturing inside (it used to read "(:?" by mistake, which let an
    # optional ':' sneak into the extension)
    _OUTFMT_RE = re.compile(r"^(.*?)((?:\.[^.]*)?)$")

    def prepare(self, rh, opts):
        """Set some variables according to target definition."""
        super().prepare(rh, opts)
        self.system.subtitle(
            "{:s} : directory listing (pre-run)".format(self.realkind)
        )
        self.system.dir(output=False, fatal=False)

    def execute(self, rh, opts):
        """Dispatch each gridpoint input, as soon as it is available, to the workers."""
        # We re-serialise data because footprints don't like dictionaries
        filters = [
            json.dumps(x.rh.contents.data)
            for x in self.context.sequence.effective_inputs(
                role="GRIBFilteringRequest", kind="filtering_request"
            )
        ]
        filters = FPTuple(filters)

        self._default_pre_execute(rh, opts)
        common_i = self._default_common_instructions(rh, opts)
        # Update the common instructions
        common_i.update(dict(concatenate=self.concatenate, filters=filters))

        tmout = False
        # Monitor for the input files
        bm = BasicInputMonitor(
            self.context,
            caching_freq=self.refreshtime,
            role="Gridpoint",
            kind="gridpoint",
        )
        with bm:
            while not bm.all_done or len(bm.available) > 0:
                while bm.available:
                    s = bm.pop_available().section
                    file_in = s.rh.container.localpath()
                    # Insert the filter name before the extension (if any)
                    file_outfmt = self._OUTFMT_RE.sub(
                        r"\1_{filtername:s}\2", file_in
                    )
                    logger.info(
                        "Adding input file %s to the job list", file_in
                    )
                    self._add_instructions(
                        common_i,
                        dict(
                            name=[file_in],
                            file_in=[file_in],
                            file_outfmt=[file_outfmt],
                        ),
                    )

                if not (bm.all_done or len(bm.available) > 0):
                    # Timeout ?
                    tmout = bm.is_timedout(self.timeout)
                    if tmout:
                        break
                    # Wait a little bit :-)
                    time.sleep(1)
                    bm.health_check(interval=30)

        self._default_post_execute(rh, opts)

        for failed_file in [
            e.section.rh.container.localpath() for e in bm.failed.values()
        ]:
            logger.error(
                "We were unable to fetch the following file: %s", failed_file
            )
            if self.fatal:
                self.delayed_exception_add(
                    IOError("Unable to fetch {:s}".format(failed_file)),
                    traceback=False,
                )

        if tmout:
            raise OSError("The waiting loop timed out")
class AddField(BlindRun):
    """Miscellaneous manipulation on input FA resources.

    For each gridpoint input (processed in increasing term order), the input
    file is linked as ``fortinput``, a copy of it is made as ``fortoutput``,
    and the binary is run once.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(
                values=["addcst", "addconst", "addfield"],
                remap=dict(addconst="addcst"),
            ),
            fortnam=dict(optional=True, default="fort.4"),
            fortinput=dict(optional=True, default="fort.11"),
            fortoutput=dict(optional=True, default="fort.12"),
        )
    )

    def prepare(self, rh, opts):
        """Remove any stale ``fortinput`` and set DR_HOOK_NOT_MPI."""
        super().prepare(rh, opts)
        self.system.remove(self.fortinput)
        self.env.DR_HOOK_NOT_MPI = 1

    def execute(self, rh, opts):
        """Run the binary once per gridpoint input, sorted by term."""
        sh = self.system

        # Link the first namelist found (if any) as fortnam
        namelist_handlers = [
            section.rh
            for section in self.context.sequence.effective_inputs(
                role="Namelist", kind="namelist"
            )
        ]
        if not namelist_handlers:
            logger.warning("Do not find any namelist for %s", self.kind)
        else:
            sh.softlink(
                namelist_handlers[0].container.localpath(), self.fortnam
            )

        # Look for some sources files
        sources = sorted(
            (
                section.rh
                for section in self.context.sequence.effective_inputs(
                    role=("Gridpoint", "Sources"), kind="gridpoint"
                )
            ),
            key=lambda handler: handler.resource.term,
        )

        for source in sources:
            sh.title(
                "Loop on domain {:s} and term {:s}".format(
                    source.resource.geometry.area, source.resource.term.fmthm
                )
            )
            # Start from a clean state
            sh.remove(self.fortinput)
            sh.remove(self.fortoutput)
            # The same data is provided twice: linked as fortinput and
            # copied as fortoutput
            sh.link(source.container.localpath(), self.fortinput)
            sh.cp(source.container.localpath(), self.fortoutput)
            # Standard execution
            opts["loop"] = source.resource.term
            super().execute(rh, opts)
            # Some cleaning
            sh.rmall("DAPDIR", self.fortinput, self.fortoutput)

    def postfix(self, rh, opts):
        """Remove the namelist link once everything is done."""
        super().postfix(rh, opts)
        self.system.remove(self.fortnam)
class DegradedDiagPEError(AlgoComponentError):
    """Exception raised when some of the members are missing in the calculations.

    :param dict ginfo: the gang information (its ``geometry`` and ``term``
        entries are used in the message)
    :param dict missings: for each input group (nickname), the collection of
        missing members
    """

    def __init__(self, ginfo, missings):
        # Forward the arguments so that ``self.args`` is populated: otherwise
        # the exception cannot be unpickled (unpickling calls cls(*self.args))
        super().__init__(ginfo, missings)
        self._ginfo = ginfo
        self._missings = missings

    def __str__(self):
        lines = [
            "Missing input data for geometry={0.area:s}, term={1!s}:".format(
                self._ginfo["geometry"], self._ginfo["term"]
            )
        ]
        lines.extend(
            "{:s}: member #{!s}".format(k, member)
            for k, missing in self._missings.items()
            for member in missing
        )
        # Every line (including the last one) is newline-terminated
        return "\n".join(lines) + "\n"
class DiagPE(BlindRun, DrHookDecoMixin, EcGribDecoMixin):
    """Execution of diagnostics on grib input (ensemble forecasts specific).

    Inputs are grouped by geometry and term; a given term is processed once
    the data for this term and the next one (if any) are available. Output
    files are optionally filtered and promised outputs are put.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(
                values=["diagpe"],
            ),
            method=dict(
                info="The method used to compute the diagnosis",
                values=["neighbour"],
            ),
            numod=dict(
                type=int,
                info="The GRIB model number",
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_NUMOD", 118),
            ),
            timeout=dict(
                type=int,
                optional=True,
                default=900,
            ),
            refreshtime=dict(
                type=int,
                optional=True,
                default=20,
            ),
            missinglimit=dict(
                type=int,
                optional=True,
                default=0,
            ),
            waitlimit=dict(
                type=int,
                optional=True,
                default=900,
            ),
            fatal=dict(
                type=bool,
                optional=True,
                default=True,
            ),
            gribfilter_tasks=dict(
                type=int,
                optional=True,
                default=8,
            ),
        ),
    )

    # Prefix of the binary's output files, depending on the method
    _method2output_map = dict(neighbour="GRIB_PE_VOISIN")

    def spawn_hook(self):
        """Usually a good habit to dump the fort.4 namelist."""
        super().spawn_hook()
        if self.system.path.exists("fort.4"):
            self.system.subtitle(
                "{:s} : dump namelist <fort.4>".format(self.realkind)
            )
            self.system.cat("fort.4", output=False)

    def _actual_execute(
        self, gmembers, ifilters, filters, basedate, finalterm, rh, opts, gang
    ):
        """Process a single geometry/term gang.

        :param set gmembers: all the members found in the inputs
        :param list ifilters: JSON filters to apply to the "superset" inputs
            (concatenation only if empty)
        :param filters: the GRIBFilter object applied to the output files
        :param basedate: the date shared by all the inputs
        :param finalterm: the last term processed for this geometry
        :param gang: the gang to process (its ``info`` holds ``geometry`` and ``term``)
        """
        mygeometry = gang.info["geometry"]
        myterm = gang.info["term"]
        self.system.title(
            "Start processing for geometry={:s}, term={!s}.".format(
                mygeometry.area, myterm
            )
        )

        # Find out what is the common set of members
        # gmembers is mutable: we need a copy of it (hence the explicit set())
        members = set(gmembers)
        missing_members = dict()
        for subgang in gang.memberslist:
            smembers = {
                s.section.rh.provider.member
                for s in subgang.memberslist
                if s.state == EntrySt.available
            }
            ufomembers = {
                s.section.rh.provider.member
                for s in subgang.memberslist
                if s.state == EntrySt.ufo
            }
            missing_members[subgang.nickname] = (
                gmembers - smembers - ufomembers
            )
            members &= smembers

        # Record an error
        if members != gmembers:
            newexc = DegradedDiagPEError(gang.info, missing_members)
            logger.error("Some of the data are missing for this geometry/term")
            if self.fatal:
                self.delayed_exception_add(newexc, traceback=False)
            else:
                logger.info(
                    "Fatal is false consequently no exception is recorded. It would look like this:"
                )
                print(newexc)
        members = sorted(members)

        # This is hopeless :-(
        if gang.state == GangSt.failed:
            return

        # If needed, concatenate or filter the "superset" files
        # (the target name is the source name without its leading "prefix_")
        supersets = list()
        for subgang in gang.memberslist:
            supersets.extend(
                [
                    (
                        s.section.rh.container.localpath(),
                        re.sub(
                            r"^[a-zA-Z]+_(.*)$",
                            r"\1",
                            s.section.rh.container.localpath(),
                        ),
                    )
                    for s in subgang.memberslist
                    if s.section.role == "GridpointSuperset"
                ]
            )
        supersets_todo = [
            (s, t) for s, t in supersets if not self.system.path.exists(t)
        ]
        if supersets_todo:
            if len(ifilters):
                parallel_grib_filter(
                    self.context,
                    [s for s, t in supersets_todo],
                    [t for s, t in supersets_todo],
                    filters=ifilters,
                    nthreads=self.gribfilter_tasks,
                )
            else:
                parallel_grib_filter(
                    self.context,
                    [s for s, t in supersets_todo],
                    [t for s, t in supersets_todo],
                    cat=True,
                    nthreads=self.gribfilter_tasks,
                )

        # Tweak the namelist
        namsec = self.setlink(
            initrole="Namelist", initkind="namelist", initname="fort.4"
        )
        for nam in [x.rh for x in namsec if "NAM_PARAM" in x.rh.contents]:
            logger.info(
                "Substitute the date (%s) to AAAAMMJJHH namelist entry",
                basedate.ymdh,
            )
            nam.contents["NAM_PARAM"]["AAAAMMJJHH"] = basedate.ymdh
            logger.info(
                "Substitute the number of members (%d) to NBRUN namelist entry",
                len(members),
            )
            nam.contents["NAM_PARAM"]["NBRUN"] = len(members)
            logger.info(
                "Substitute the the number of terms to NECH(0) namelist entry"
            )
            nam.contents["NAM_PARAM"]["NECH(0)"] = 1
            logger.info(
                "Substitute the ressource term to NECH(1) namelist entry"
            )
            # NB: term should be expressed in minutes
            nam.contents["NAM_PARAM"]["NECH(1)"] = int(myterm)
            nam.contents["NAM_PARAM"]["ECHFINALE"] = finalterm.hour
            # Now, update the model number for the GRIB files
            logger.info(
                "Substitute the model number (%d) to namelist entry",
                self.numod,
            )
            nam.contents["NAM_PARAM"]["NMODELE"] = self.numod
            # Add the NAM_NMEMBRES block (rebuilt from scratch)
            if "NAM_NMEMBRES" in nam.contents:
                # Cleaning is needed...
                del nam.contents["NAM_NMEMBRES"]
            newblock = nam.contents.newblock("NAM_NMEMBRES")
            for i, member in enumerate(members):
                newblock["NMEMBRES({:d})".format(i + 1)] = int(member)
            # We are done with the namelist
            nam.save()

        # Standard execution
        opts["loop"] = myterm
        super().execute(rh, opts)

        actualname = r"{:s}_{:s}\+{:s}".format(
            self._method2output_map[self.method], mygeometry.area, myterm.fmthm
        )
        # Find out the output file and filter it
        filtered_out = list()
        if len(filters):
            for candidate in [
                f
                for f in self.system.glob(
                    self._method2output_map[self.method] + "*"
                )
                if re.match(actualname, f)
            ]:
                logger.info("Starting GRIB filtering on %s.", candidate)
                filtered_out.extend(
                    filters(candidate, candidate + "_{filtername:s}")
                )

        # The diagnostic output may be promised
        expected = [
            x
            for x in self.promises
            if (
                re.match(actualname, x.rh.container.localpath())
                or x.rh.container.localpath() in filtered_out
            )
        ]
        for thispromise in expected:
            thispromise.put(incache=True)

    def execute(self, rh, opts):
        """Loop on the various grib files provided."""
        # Intialise a GRIBFilter for output files (at least try to)
        gfilter = GRIBFilter(concatenate=False)
        # The raw filtering requests are handed over directly (no footprint
        # is involved here, hence no JSON re-serialisation)
        ofilters = [
            x.rh.contents.data
            for x in self.context.sequence.effective_inputs(
                role="GRIBFilteringRequest", kind="filtering_request"
            )
        ]
        gfilter.add_filters(ofilters)

        # Do we need to filter input files ?
        # We re-serialise data because footprints don't like dictionaries
        ifilters = [
            json.dumps(x.rh.contents.data)
            for x in self.context.sequence.effective_inputs(
                role="GRIBInputFilteringRequest"
            )
        ]

        # Monitor for the input files
        bm = BasicInputMonitor(
            self.context,
            caching_freq=self.refreshtime,
            role=(re.compile(r"^Gridpoint"), "Sources"),
            kind="gridpoint",
        )

        # Check that the date is consistent among inputs
        basedates = set()
        members = set()
        for rhI in [s.section.rh for s in bm.memberslist]:
            basedates.add(rhI.resource.date)
            members.add(rhI.provider.member)
        if not basedates:
            # Without this check, basedates.pop() would fail with an obscure KeyError
            raise AlgoComponentError("No gridpoint input resource was found")
        if len(basedates) > 1:
            raise AlgoComponentError(
                "The date must be consistent among the input resources"
            )
        basedate = basedates.pop()

        # Setup BasicGangs
        basicmeta = AutoMetaGang()
        basicmeta.autofill(
            bm,
            ("term", "safeblock", "geometry"),
            allowmissing=self.missinglimit,
            waitlimit=self.waitlimit,
        )

        # Find out what are the terms, domains and blocks
        geometries = set()
        terms = collections.defaultdict(set)
        blocks = collections.defaultdict(set)
        reverse = dict()
        for m in basicmeta.memberslist:
            (geo, term, block) = (
                m.info["geometry"],
                m.info["term"],
                m.info["safeblock"],
            )
            geometries.add(geo)
            terms[geo].add(term)
            blocks[geo].add(block)
            reverse[(geo, term, block)] = m
        for geometry in geometries:
            terms[geometry] = sorted(terms[geometry])

        # Setup the MetaGang that fits our needs: each elementary gang holds
        # the data for one term and for the following one (if any)
        complexmeta = MetaGang()
        complexgangs = collections.defaultdict(collections.deque)
        for geometry in geometries:
            nterms = len(terms[geometry])
            for i_term, term in enumerate(terms[geometry]):
                elementary_meta = MetaGang()
                elementary_meta.info = dict(geometry=geometry, term=term)
                cterms = [
                    terms[geometry][i]
                    for i in range(i_term, min(i_term + 2, nterms))
                ]
                for inside_term in cterms:
                    for inside_block in blocks[geometry]:
                        try:
                            elementary_meta.add_member(
                                reverse[(geometry, inside_term, inside_block)]
                            )
                        except KeyError as err:
                            raise KeyError(
                                "Something is wrong in the inputs: check again !"
                            ) from err
                complexmeta.add_member(elementary_meta)
                complexgangs[geometry].append(elementary_meta)

        # Now, starts monitoring everything
        with bm:
            current_gang = dict()
            for geometry in geometries:
                try:
                    current_gang[geometry] = complexgangs[geometry].popleft()
                except IndexError:
                    current_gang[geometry] = None

            while any(g is not None for g in current_gang.values()):
                for geometry, a_gang in [
                    (g, current_gang[g])
                    for g in geometries
                    if (
                        current_gang[g] is not None
                        and current_gang[g].state is not GangSt.ufo
                    )
                ]:
                    self._actual_execute(
                        members,
                        ifilters,
                        gfilter,
                        basedate,
                        terms[geometry][-1],
                        rh,
                        opts,
                        a_gang,
                    )

                    # Next one
                    try:
                        current_gang[geometry] = complexgangs[
                            geometry
                        ].popleft()
                    except IndexError:
                        current_gang[geometry] = None

                if not (
                    bm.all_done
                    or any(
                        gang is not None and gang.state is not GangSt.ufo
                        for gang in current_gang.values()
                    )
                ):
                    # Timeout ?
                    bm.is_timedout(self.timeout, IOError)
                    # Wait a little bit :-)
                    time.sleep(1)
                    bm.health_check(interval=30)
@algo_component_deco_mixin_autodoc
class _DiagPIDecoMixin(AlgoComponentDecoMixin):
    """Class variables and methods useful for DiagPI.

    It provides:

    * extra footprint attributes (``kind``, ``numod``, ``gribcat`` and
      ``gribfilter_tasks``);
    * prepare, postfix and spawn hooks;
    * an ``execute`` replacement that loops over the gridpoint input files
      (sorted by term), runs the executable once per file, filters the
      resulting GRIB files and delivers the promised outputs.
    """

    _MIXIN_EXTRA_FOOTPRINTS = [
        footprints.Footprint(
            attr=dict(
                kind=dict(
                    values=["diagpi", "diaglabo"],
                ),
                numod=dict(
                    info="The GRIB model number",
                    type=int,
                    optional=True,
                    default=DelayedEnvValue("VORTEX_GRIB_NUMOD", 62),
                ),
                gribcat=dict(type=bool, optional=True, default=False),
                gribfilter_tasks=dict(
                    type=int,
                    optional=True,
                    default=8,
                ),
            ),
        )
    ]

    def _prepare_pihook(self, rh, opts):
        """Concatenate the already available inputs and reset delayed filtering.

        When ``gribcat`` is active, the gridpoint inputs (roles ``Gridpoint``,
        ``Sources``, ``Preview``, ``Previous``) and the
        ``GridpointOutputPrepare`` inputs that are no longer *expected* are
        handed to :meth:`_automatic_cat`.
        """
        # Check for input files to concatenate
        if self.gribcat:
            srcsec = self.context.sequence.effective_inputs(
                role=("Gridpoint", "Sources", "Preview", "Previous"),
                kind="gridpoint",
            )
            cat_list_in = [sec for sec in srcsec if not sec.rh.is_expected()]
            outsec = self.context.sequence.effective_inputs(
                role="GridpointOutputPrepare"
            )
            cat_list_out = [sec for sec in outsec if not sec.rh.is_expected()]
            self._automatic_cat(cat_list_in, cat_list_out)

        # GRIB files whose filtering is postponed to the postfix step
        # (filled by _execute_picommons when no promise is made)
        self._delayed_filtering = []

    def _postfix_pihook(self, rh, opts):
        """Filter the GRIB outputs whose filtering was postponed."""
        if self._delayed_filtering:
            self._batch_filter(self._delayed_filtering)

    def _spawn_pihook(self):
        """Usually a good habit to dump the fort.4 namelist."""
        if self.system.path.exists("fort.4"):
            self.system.subtitle(
                "{:s} : dump namelist <fort.4>".format(self.realkind)
            )
            self.system.cat("fort.4", output=False)

    _MIXIN_PREPARE_HOOKS = (_prepare_pihook,)
    _MIXIN_POSTFIX_HOOKS = (_postfix_pihook,)
    _MIXIN_SPAWN_HOOKS = (_spawn_pihook,)

    def _automatic_cat(self, list_in, list_out):
        """Concatenate the *list_in* and *list_out* input files.

        Each file is first moved aside (``.tmpcat`` suffix) and then given to
        :func:`parallel_grib_filter` (with ``cat=True``), whose output is the
        file's original location. Files from *list_in* get the ``in`` intent,
        files from *list_out* get the ``inout`` intent. Nothing is done
        unless ``gribcat`` is active.
        """
        if self.gribcat:
            inputs = []
            outputs = []
            intents = []
            for seclist, intent in zip((list_in, list_out), ("in", "inout")):
                for isec in seclist:
                    tmpin = isec.rh.container.localpath() + ".tmpcat"
                    self.system.move(
                        isec.rh.container.localpath(), tmpin, fmt="grib"
                    )
                    inputs.append(tmpin)
                    outputs.append(isec.rh.container.localpath())
                    intents.append(intent)
            parallel_grib_filter(
                self.context,
                inputs,
                outputs,
                intents,
                cat=True,
                nthreads=self.gribfilter_tasks,
            )
            # The temporary (moved-aside) files are no longer needed
            for ifile in inputs:
                self.system.rm(ifile, fmt="grib")

    def _batch_filter(self, candidates):
        """If no promises are made, the GRIB are filtered at once at the end.

        :param candidates: list of GRIB file names to filter; each output is
            named ``<candidate>_{filtername:s}``.
        """
        # We re-serialise data because footprints don't like dictionaries
        filters = [
            json.dumps(x.rh.contents.data)
            for x in self.context.sequence.effective_inputs(
                role="GRIBFilteringRequest", kind="filtering_request"
            )
        ]
        parallel_grib_filter(
            self.context,
            candidates,
            [f + "_{filtername:s}" for f in candidates],
            filters=FPTuple(filters),
            nthreads=self.gribfilter_tasks,
        )

    def _tweak_pi_namelist(self, r):
        """Update the ``fort.4`` namelist(s) for the source resource handler *r*.

        Only namelists holding a ``NAM_PARAM`` block are modified (date,
        number of terms, term, optional member number and GRIB model number)
        and saved.
        """
        namsec = self.setlink(
            initrole="Namelist", initkind="namelist", initname="fort.4"
        )
        for nam in [x.rh for x in namsec if "NAM_PARAM" in x.rh.contents]:
            logger.info(
                "Substitute the date (%s) to AAAAMMJJHH namelist entry",
                r.resource.date.ymdh,
            )
            nam.contents["NAM_PARAM"]["AAAAMMJJHH"] = r.resource.date.ymdh
            logger.info(
                "Substitute the the number of terms to NECH(0) namelist entry"
            )
            nam.contents["NAM_PARAM"]["NECH(0)"] = 1
            logger.info(
                "Substitute the ressource term to NECH(1) namelist entry"
            )
            # NB: term should be expressed in minutes
            nam.contents["NAM_PARAM"]["NECH(1)"] = int(r.resource.term)
            # Add the member number in a dedicated namelist block
            if r.provider.member is not None:
                mblock = nam.contents.newblock("NAM_PARAMPE")
                mblock["NMEMBER"] = int(r.provider.member)
            # Now, update the model number for the GRIB files
            if "NAM_DIAG" in nam.contents:
                nmod = self.numod
                logger.info(
                    "Substitute the model number (%d) to namelist entry",
                    nmod,
                )
                # Only the diagnostics that are switched on (non-zero) get
                # the model number
                for namk in ("CONV", "BR", "HIV", "ECHOT", "ICA", "PSN"):
                    if (
                        namk in nam.contents["NAM_DIAG"]
                        and nam.contents["NAM_DIAG"][namk] != 0
                    ):
                        nam.contents["NAM_DIAG"][namk] = nmod
            # We are done with the namelist
            nam.save()

    def _execute_picommons(self, rh, opts):
        """Loop on the various grib files provided.

        For each gridpoint source (sorted by term): tweak the namelist, fetch
        the source (and the matching ``GridpointOutputPrepare`` input, if
        any, plus earlier ``Preview``/``Previous`` files), concatenate what
        was expected, run the executable, filter the produced GRIB files and
        put the matching promises.

        :raises AlgoComponentError: if a ``GridpointOutputPrepare`` input does
            not have the same term as the corresponding source.
        """
        # Initialise a GRIBFilter (at least try to)
        gfilter = GRIBFilter(concatenate=False)
        gfilter.add_filters(self.context)

        srcsec = self.context.sequence.effective_inputs(
            role=("Gridpoint", "Sources"), kind="gridpoint"
        )
        srcsec.sort(key=lambda s: s.rh.resource.term)
        outsec = self.context.sequence.effective_inputs(
            role="GridpointOutputPrepare"
        )
        if outsec:
            outsec.sort(key=lambda s: s.rh.resource.term)

        for sec in srcsec:
            r = sec.rh
            self.system.title(
                "Loop on domain {:s} and term {:s}".format(
                    r.resource.geometry.area, r.resource.term.fmthm
                )
            )

            # Tweak the namelist
            self._tweak_pi_namelist(r)

            cat_list_in = []
            cat_list_out = []

            # Expect the input grib file to be here
            if sec.rh.is_expected():
                cat_list_in.append(sec)
            self.grab(sec, comment="diagpi source")
            if outsec:
                out = outsec.pop(0)
                # Explicit check (an assert would vanish under python -O)
                if out.rh.resource.term != sec.rh.resource.term:
                    raise AlgoComponentError(
                        "Inconsistent terms between the GridpointOutputPrepare "
                        "input ({!s}) and the gridpoint source ({!s})".format(
                            out.rh.resource.term, sec.rh.resource.term
                        )
                    )
                if out.rh.is_expected():
                    cat_list_out.append(out)
                self.grab(out, comment="diagpi output")

            # Also link in previous grib files in order to compute some winter diagnostics
            srcpsec = [
                x
                for x in self.context.sequence.effective_inputs(
                    role=("Preview", "Previous"), kind="gridpoint"
                )
                if x.rh.resource.term < r.resource.term
            ]
            for pr in srcpsec:
                if pr.rh.is_expected():
                    cat_list_in.append(pr)
                self.grab(
                    pr, comment="diagpi additional source for winter diag"
                )

            self._automatic_cat(cat_list_in, cat_list_out)

            # Standard execution
            opts["loop"] = r.resource.term
            super(self.mixin_execute_companion(), self).execute(rh, opts)

            # Domain and term are escaped so that they are matched literally
            actualname = r"GRIB[-_A-Z]+{:s}\+{:s}(?:_member\d+)?$".format(
                re.escape(r.resource.geometry.area),
                re.escape(r.resource.term.fmthm),
            )
            # Find out the output file and filter it
            filtered_out = list()
            if len(gfilter):
                for candidate in [
                    f
                    for f in self.system.glob("GRIB*")
                    if re.match(actualname, f)
                ]:
                    if len(self.promises):
                        logger.info(
                            "Starting GRIB filtering on %s.", candidate
                        )
                        filtered_out.extend(
                            gfilter(candidate, candidate + "_{filtername:s}")
                        )
                    else:
                        # No promises: filter everything at once in the
                        # postfix hook
                        self._delayed_filtering.append(candidate)

            # The diagnostic output may be promised
            expected = [
                x
                for x in self.promises
                if (
                    re.match(actualname, x.rh.container.localpath())
                    or x.rh.container.localpath() in filtered_out
                )
            ]
            for thispromise in expected:
                thispromise.put(incache=True)

    _MIXIN_EXECUTE_OVERWRITE = _execute_picommons
class DiagPI(BlindRun, _DiagPIDecoMixin, EcGribDecoMixin):
    """Execution of diagnostics on grib input (deterministic forecasts specific).

    Sequential (:class:`BlindRun` based) flavour: the diagnostics-specific
    hooks and the execute loop come from :class:`_DiagPIDecoMixin`.
    """
class DiagPIMPI(Parallel, _DiagPIDecoMixin, EcGribDecoMixin):
    """Execution of diagnostics on grib input (deterministic forecasts specific).

    Variant of :class:`DiagPI` built on the :class:`Parallel` execution
    component; the diagnostics-specific hooks and the execute loop come from
    :class:`_DiagPIDecoMixin`.
    """
class Fa2GaussGrib(BlindRun, DrHookDecoMixin):
    """Standard FA conversion, e.g. with GOBPTOUT as a binary resource.

    Each ``Historic``/``ModelState`` input is processed in turn (sorted by
    term): a ``NAML`` namelist block is written to ``fort.4``, the input is
    linked as *fortinput* and the binary is run. When present, the produced
    ``GRID_*`` file is moved to ``"GGRID"`` followed by the input's local
    path stripped of its first 6 characters.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(values=["fa2gaussgrib"]),
            fortinput=dict(optional=True, default="PFFPOS_FIELDS"),
            numod=dict(
                type=int,
                optional=True,
                default=DelayedEnvValue("VORTEX_GRIB_NUMOD", 212),
            ),
            verbose=dict(type=bool, optional=True, default=False),
        )
    )

    def execute(self, rh, opts):
        """Run the conversion once for each initial condition provided."""
        # The binary's output name is derived from part of the fortinput name
        grid_output = "GRID_{:s}1".format(self.fortinput[7:14])

        initial_states = self.context.sequence.effective_inputs(
            role=("Historic", "ModelState")
        )
        initial_states.sort(key=lambda s: s.rh.resource.term)

        for section in initial_states:
            src = section.rh
            src_path = src.container.localpath()
            self.system.title("Loop on files: {:s}".format(src_path))

            # Start from a clean state: no leftover output nor namelist
            for stale in (grid_output, "fort.4"):
                self.system.remove(stale)

            # Write the local namelist, then display it
            naml = NamelistBlock(name="NAML")
            naml.NBDOM = 1
            naml.INUMOD = self.numod
            naml["LLBAVE"] = self.verbose
            naml["CDNOMF(1)"] = self.fortinput
            with open("fort.4", "w") as fh_nam:
                fh_nam.write(naml.dumps())
            self.system.header(
                "{:s} : local namelist {:s} dump".format(
                    self.realkind, "fort.4"
                )
            )
            self.system.cat("fort.4", output=False)

            # The input FP file must be there before being linked as fortinput
            self.grab(section, comment="fullpos source")
            self.system.softlink(src_path, self.fortinput)

            super().execute(rh, opts)

            # Keep the output under a name derived from the input file
            if not self.system.path.exists(grid_output):
                logger.warning("Missing some grib output for %s", grid_output)
            else:
                self.system.move(
                    grid_output,
                    "GGRID" + src_path[6:],
                    fmt="grib",
                )

            # Some cleaning
            self.system.rmall(self.fortinput)
class Reverser(BlindRun, DrHookDecoMixin):
    """Compute the initial state for Ctpini."""

    _footprint = dict(
        info="Compute initial state for Ctpini.",
        attr=dict(
            kind=dict(values=["reverser"]),
            param_iter=dict(type=int),
            condlim=dict(type=int),
            ano_type=dict(type=int),
        ),
    )

    def prepare(self, rh, opts):
        """Check the inputs and substitute the footprint values in the parameter file.

        The first ``Param`` input gets its ``param_iter``, ``condlim`` and
        ``ano_type`` items replaced by the corresponding footprint values
        (as strings) and is saved.

        :raises ValueError: if no ``Directives`` (``ctpini_directives_file``)
            input or no ``Param`` input is found.
        """
        # Get info about the directives files directory
        directives = self.context.sequence.effective_inputs(
            role="Directives", kind="ctpini_directives_file"
        )
        if not directives:
            logger.error("No directive file found. Stop")
            raise ValueError("No directive file found.")
        if len(directives) > 1:
            logger.warning(
                "Multiple directive files found. This is strange..."
            )

        # Substitute values in the simili namelist
        params = self.context.sequence.effective_inputs(role="Param")
        if not params:
            logger.error("No parameter file found. Stop")
            raise ValueError("No parameter file found.")
        if len(params) > 1:
            logger.warning(
                "Multiple files for parameter, the first %s is taken",
                params[0].rh.container.filename,
            )
        param_rh = params[0].rh
        param_rh.contents.setitems(
            {
                "param_iter": str(self.param_iter),
                "condlim": str(self.condlim),
                "ano_type": str(self.ano_type),
            }
        )
        param_rh.save()
        logger.info("Here is the parameter file (after substitution):")
        param_rh.container.cat()

        # Call the parent's prepare
        super().prepare(rh, opts)
class DegradedEnsembleDiagError(AlgoComponentError):
    """Error signalling that some of the members (input files) are missing."""
class FailedEnsembleDiagError(DegradedEnsembleDiagError):
    """Error signalling that too many members (input files) are missing.

    It derives from :class:`DegradedEnsembleDiagError`, so catching the
    latter also catches this one.
    """
class PyEnsembleDiag(Expresso):
    """Execution of diagnostics on grib input (ensemble forecasts specific).

    The ``Gridpoint`` inputs are monitored and grouped by (term, safeblock,
    geometry). Each time a group becomes (partially) collectable, the script
    is run on the available files of that group.
    """

    _footprint = dict(
        attr=dict(
            kind=dict(
                values=["py_diag_ens"],
            ),
            timeout=dict(
                type=int,
                optional=True,
                default=1200,
            ),
            refreshtime=dict(
                type=int,
                optional=True,
                default=20,
            ),
            missinglimit=dict(
                type=int,
                optional=True,
                default=0,
            ),
            waitlimit=dict(
                type=int,
                optional=True,
                default=900,
            ),
        ),
    )

    def __init__(self, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        # Command-line options of the next script run (updated by
        # _actual_execute and returned by spawn_command_options)
        self._cl_args = dict()

    def spawn_command_options(self):
        """Prepare options for the resource's command line."""
        return self._cl_args

    def _actual_execute(self, rh, opts, input_rhs, **infos):
        """Actually run the script for a specific bunch of input files (*input_rhs*).

        A ``flowconf.json`` file describing the inputs is written, the script
        is run and, if promised, the ``ensdiag_*.grib`` output is put.

        :param input_rhs: non-empty list of input resource handlers.
        :param infos: the gang's info (``term``, ``geometry`` and
            ``safeblock`` are used).
        """
        output_fname = (
            "ensdiag_{safeblock:s}_{geometry.tag:s}_{term.fmthm}.grib".format(
                **infos
            )
        )
        self._cl_args = dict(flowconf="flowconf.json", output=output_fname)
        # Create the JSON file that will be ingested by the script
        self.system.json_dump(
            dict(
                date=input_rhs[0].resource.date.ymdhm,
                term=infos["term"].fmthm,
                geometry=infos["geometry"].tag,
                area=infos["geometry"].area,
                block=infos["safeblock"],
                grib_files=[r.container.localpath() for r in input_rhs],
            ),
            self._cl_args["flowconf"],
        )
        # Actually run the post-processing script
        super().execute(rh, opts)
        # The diagnostic output may be promised
        for thispromise in [
            x
            for x in self.promises
            if output_fname == x.rh.container.localpath()
        ]:
            thispromise.put(incache=True)

    @staticmethod
    def _gang_txt_id(gang):
        """A string that identifies the input data currently being processed."""
        return (
            "term={term.fmthm:s}, "
            + "geometry={geometry.tag:s} "
            + "and block={safeblock:s}"
        ).format(**gang.info)

    def _handle_gang_rescue(self, gang):
        """If some of the entries are missing, create a delayed exception.

        Only partially collectable or failed gangs are concerned: the
        non-available entries are listed and a
        :class:`FailedEnsembleDiagError` (failed gang) or
        :class:`DegradedEnsembleDiagError` (otherwise) is recorded as a
        delayed exception.
        """
        if gang.state in (GangSt.pcollectable, GangSt.failed):
            txt_id = self._gang_txt_id(gang)
            self.system.subtitle("WARNING: Missing data for " + txt_id)
            for st in (EntrySt.ufo, EntrySt.failed, EntrySt.expected):
                if gang.members[st]:
                    print(
                        "Here is the list of Resource Handler with status < {:s} >:".format(
                            st
                        )
                    )
                    for i, e in enumerate(gang.members[st]):
                        e.section.rh.quickview(nb=i + 1, indent=1)
            self.delayed_exception_add(
                FailedEnsembleDiagError(
                    "Too many inputs are missing for " + txt_id
                )
                if gang.state == GangSt.failed
                else DegradedEnsembleDiagError(
                    "Some of the inputs are missing for " + txt_id
                ),
                traceback=False,
            )

    def execute(self, rh, opts):
        """Loop on the various grib files provided.

        :raises AlgoComponentError: if the input files do not share the same
            date.
        """
        # Monitor for the input files
        bm = BasicInputMonitor(
            self.context, caching_freq=self.refreshtime, role="Gridpoint"
        )

        # Check that the date is consistent among inputs
        basedates = {s.section.rh.resource.date for s in bm.memberslist}
        if len(basedates) > 1:
            raise AlgoComponentError(
                "The date must be consistent among the input resources"
            )

        # Setup BasicGangs
        basicmeta = AutoMetaGang()
        basicmeta.autofill(
            bm,
            ("term", "safeblock", "geometry"),
            allowmissing=self.missinglimit,
            waitlimit=self.waitlimit,
        )

        # Now, starts monitoring everything
        with bm:
            while basicmeta.has_ufo() or basicmeta.has_pcollectable():
                for thegang in basicmeta.consume_pcolectable():
                    txt_id = self._gang_txt_id(thegang)
                    self.system.title("Dealing with " + txt_id)

                    available = thegang.members[EntrySt.available]
                    self._handle_gang_rescue(thegang)
                    if available:
                        self._actual_execute(
                            rh,
                            opts,
                            [e.section.rh for e in available],
                            **thegang.info,
                        )
                    else:
                        # _actual_execute needs at least one input (for the
                        # date); the missing data are already reported by
                        # _handle_gang_rescue.
                        logger.error(
                            "No input data are available for %s: skipping",
                            txt_id,
                        )

                    self.system.highlight("Done with " + txt_id)

                if (
                    not bm.all_done
                    and basicmeta.has_ufo()
                    and not basicmeta.has_pcollectable()
                ):
                    # Timeout ?
                    tmout = bm.is_timedout(self.timeout)
                    if tmout:
                        break
                    # Wait a little bit :-)
                    time.sleep(1)
                    bm.health_check(interval=30)

        # Warn for failed gangs
        if basicmeta.members[GangSt.failed]:
            self.system.title(
                "One or several (term, geometry, block) group(s) could not be processed"
            )
            for thegang in basicmeta.members[GangSt.failed]:
                self._handle_gang_rescue(thegang)