Source code for vortex.nwp.algo.eda

"""
AlgoComponents dedicated to computations related to the Ensemble Data Assimilation
system.
"""

import math
import re

from bronx.fancies import loggers
from bronx.stdtypes.date import Month, Time
import footprints

from vortex.algo.components import AlgoComponentError
from .ifsroot import IFSParallel

#: Automatic export off
__all__ = []

logger = loggers.getLogger(__name__)


class IFSEdaAbstractAlgo(IFSParallel):
    """Base class for any EDA related task wrapped into an IFS/Arpege binary."""

    _abstract = True
    _footprint = dict(
        info="Base class for any EDA related task",
        attr=dict(
            inputnaming=dict(
                info="Prescribe your own naming template for input files.",
                optional=True,
                doc_visibility=footprints.doc.visibility.ADVANCED,
            ),
        ),
    )

    def naming_convention(self, kind, rh, actualfmt=None, **kwargs):
        """Take into account the *inputnaming* attribute."""
        if kind == "edainput":
            return super().naming_convention(
                kind,
                rh,
                actualfmt=actualfmt,
                namingformat=self.inputnaming,
                **kwargs,
            )
        else:
            return super().naming_convention(
                kind, rh, actualfmt=actualfmt, **kwargs
            )


class IFSEdaEnsembleAbstractAlgo(IFSEdaAbstractAlgo):
    """Base class for any EDA related task wrapped into an IFS/Arpege binary.

    This extends the :class:`IFSEdaAbstractAlgo` with a *nbmember* attribute and
    the ability to detect the input files and re-number them (in order to be able
    to deal with missing members).
    """

    _INPUTS_ROLE = "ModelState"

    _abstract = True
    _footprint = dict(
        info="Base class for any EDA related task",
        attr=dict(
            nbmember=dict(
                info="The number of members to deal will (auto-detected if omitted)",
                type=int,
                optional=True,
            ),
            nbmin=dict(
                info="The minimum number of input members that is mandatory to go one",
                type=int,
                optional=True,
                default=2,
            ),
        ),
    )

    def __init__(self, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self._actual_nbe = self.nbmember

    @property
    def actual_nbe(self):
        """The effective number of members."""
        return self._actual_nbe

    @property
    def actual_totalnumber(self):
        """The total number of members (=! actual_nbe for lagged ensembles. see below)."""
        return self.actual_nbe

    @property
    def actual_nbmin(self):
        """The minimum number of effective members that are mandatory to go one."""
        return self.nbmin

    @staticmethod
    def _members_sorting_key(s):
        """Return the sorting key for the **s** section."""
        member = getattr(s.rh.provider, "member", None)
        member = -math.inf if member is None else member
        block = getattr(s.rh.provider, "block", "")
        filename = getattr(s.rh.container, "basename", "")
        return member, block, filename

    def _members_effective_inputs(self):
        """The list of effective sections representing input data."""
        return sorted(
            self.context.sequence.effective_inputs(role=self._INPUTS_ROLE),
            key=self._members_sorting_key,
        )

    def _members_all_inputs(self):
        """The list of sections representing input data."""
        return sorted(
            self.context.sequence.filtered_inputs(role=self._INPUTS_ROLE),
            key=self._members_sorting_key,
        )

    def _check_members_list_numbering(self, rh, mlist, mformats):
        """Check if, for the **mlist** members list, some renaming id needed."""
        if len(set(mlist)) != len(mlist):
            logger.warning(
                "Some members are duplicated. That's very strange..."
            )
        if self.nbmember is None:
            self.algoassert(
                len(mformats) <= 1, "Mixed formats are not allowed !"
            )
        elif self.nbmember and len(mformats) > 1:
            logger.info(
                "%s have mixed formats... please correct that.",
                self._INPUTS_ROLE,
            )
        if mlist and self.nbmember is not None:
            # Consistency check
            if len(mlist) != self.nbmember:
                logger.warning(
                    "Discrepancy between *nbmember* and effective input files..."
                    + " sticking with *nbmember*"
                )
            else:
                logger.info("The input files member numbers checks out !")
            return False  # Ok, apparently the user knows what she/he is doing
        elif self.nbmember and not mlist:
            return False  # Ok, apparently the user knows what she/he is doing
        elif mlist and self.nbmember is None:
            innc = self.naming_convention(
                kind="edainput",
                variant=self.kind,
                rh=rh,
                totalnumber=self.actual_totalnumber,
                actualfmt=mformats.pop(),
            )
            checkfiles = [
                m
                for m in range(1, len(mlist) + 1)
                if self.system.path.exists(innc(number=m))
            ]
            if len(checkfiles) == len(mlist):
                logger.info("The input files numbering checks out !")
                return (
                    False  # Ok, apparently the user knows what she/he is doing
                )
            elif len(checkfiles) == 0:
                return True
            else:
                raise AlgoComponentError(
                    "Members renumbering is needed but some "
                    + "files are blocking the way !"
                )
        elif len(mlist) == 0 and self.nbmember is None:
            raise AlgoComponentError("No input files where found !")

    def modelstate_needs_renumbering(self, rh):
        """Check if, for the **mlist** members list, some renaming id needed."""
        eff_sections = self._members_effective_inputs()
        eff_members = [sec.rh.provider.member for sec in eff_sections]
        eff_formats = {sec.rh.container.actualfmt for sec in eff_sections}
        if eff_members and self.nbmember is None:
            self._actual_nbe = len(eff_members)
        return self._check_members_list_numbering(rh, eff_members, eff_formats)

    def modelstate_renumbering(self, rh, mlist):
        """Actualy rename the effective inputs."""
        eff_format = mlist[0].rh.container.actualfmt
        innc = self.naming_convention(
            kind="edainput",
            variant=self.kind,
            rh=rh,
            totalnumber=self.actual_totalnumber,
            actualfmt=eff_format,
        )
        for i, s in enumerate(mlist, start=1):
            logger.info(
                "Soft-Linking %s to %s",
                s.rh.container.localpath(),
                innc(number=i),
            )
            self.system.softlink(s.rh.container.localpath(), innc(number=i))

    def prepare_namelist_delta(self, rh, namcontents, namlocal):
        """Update the namelists with EDA related macros."""
        nam_updated = super(IFSEdaAbstractAlgo, self).prepare_namelist_delta(
            rh, namcontents, namlocal
        )
        if self.actual_nbe is not None:
            self._set_nam_macro(namcontents, namlocal, "NBE", self.actual_nbe)
            nam_updated = True
        return nam_updated

    def prepare(self, rh, opts):
        """Check the input files and act on it."""
        self.system.subtitle("Solving the input files nightmare...")
        if self.modelstate_needs_renumbering(rh):
            eff_sections = self._members_effective_inputs()
            if len(eff_sections) < self.actual_nbmin:
                raise AlgoComponentError(
                    "Not enough input files to continue..."
                )
            logger.info(
                "Starting input files renumbering. %d effective members found",
                len(eff_sections),
            )
            self.modelstate_renumbering(rh, eff_sections)
        self.system.subtitle("Other IFS related settings")
        super().prepare(rh, opts)


class IFSEdaLaggedEnsembleAbstractAlgo(IFSEdaEnsembleAbstractAlgo):
    """Base class for any EDA related task wrapped into an IFS/Arpege binary.

    This extends the :class:`IFSEdaEnsembleAbstractAlgo` with a *nblag* attribute
    and the ability to detect the input files and re-number them (in order to be
    able to deal with missing members).
    """

    _PADDING_ROLE = "PaddingModelState"

    _abstract = True
    _footprint = dict(
        info="Base class for any EDA related task",
        attr=dict(
            nblag=dict(
                info="The number of lagged dates (auto-detected if omitted)",
                type=int,
                optional=True,
            ),
            padding=dict(
                info="Fill the gaps with some kind of climatological data",
                type=bool,
                optional=True,
                default=False,
            ),
        ),
    )

    def __init__(self, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self._actual_nresx = self.nblag

    @property
    def actual_nresx(self):
        """The effective number of lagged dates."""
        return self._actual_nresx

    @property
    def actual_totalnumber(self):
        """The total number of members."""
        return (
            self.actual_nbe * self.actual_nresx
            if self.padding
            else self.actual_nbe
        )

    @property
    def actual_nbmin(self):
        """The minimum number of effective members that are mandatory to go one."""
        return self.nbmin * self.actual_nresx if self.padding else self.nbmin

    def _members_sorting_key(self, s):
        """Return the sorting key for the **s** section."""
        stuple = list(super()._members_sorting_key(s))
        rdate = getattr(s.rh.resource, "date")
        stuple.insert(0, rdate)
        return tuple(stuple)

    def modelstate_needs_renumbering(self, rh):
        """Check if, for the **mlist** members list, some renaming id needed."""
        all_sections = self._members_all_inputs()
        eff_sections = self._members_effective_inputs()

        # Look for available dates (lagged ensemble)
        all_dates = {sec.rh.resource.date for sec in all_sections}
        if all_dates and self.nblag is not None:
            # Consistency check
            if len(all_dates) != self.nblag:
                logger.warning(
                    "Discrepancy between *nblag* and input files..."
                    + " sticking with *nblag*"
                )
        elif all_dates and self.nblag is None:
            self._actual_nresx = len(all_dates)

        # Fetch the effective members list
        if self.padding:
            # For each date, check the member's list
            d_blocks = dict()
            for a_date in all_dates:
                d_sections = [
                    sec
                    for sec in all_sections
                    if sec.rh.resource.date == a_date
                ]
                d_members = sorted(
                    [sec.rh.provider.member for sec in d_sections]
                )
                d_formats = {sec.rh.container.actualfmt for sec in d_sections}
                d_blocks[a_date] = (d_members, d_formats)
            ref_date, (eff_members, eff_formats) = d_blocks.popitem()
            for a_date, a_data in d_blocks.items():
                self.algoassert(
                    a_data[0] == eff_members,
                    "Inconsistent members list (date={!s} vs {!s}).".format(
                        a_date, ref_date
                    ),
                )
                self.algoassert(
                    a_data[1] == eff_formats,
                    "Inconsistent formats list (date={!s} vs {!s}).".format(
                        a_date, ref_date
                    ),
                )
            d_blocks[ref_date] = (eff_members, eff_formats)
            if eff_members and self.nbmember is None:
                # Here, NBE is the number of members for one date
                self._actual_nbe = len(eff_members)
            # Generate a complete list off input files
            eff_members = []
            for a_date, a_data in sorted(d_blocks.items()):
                for member in a_data[0]:
                    eff_members.append((a_date, member))
        else:
            eff_members = [
                (sec.rh.resource.date, sec.rh.provider.member)
                for sec in eff_sections
            ]
            eff_formats = {sec.rh.container.actualfmt for sec in eff_sections}
            if eff_members and self.nbmember is None:
                # Here, NBE is the number of members for all dates
                self._actual_nbe = len(eff_members)

        return self._check_members_list_numbering(rh, eff_members, eff_formats)

    def modelstate_renumbering(self, rh, mlist):
        """Actualy rename the effective inputs."""
        if self.padding:
            eff_format = mlist[0].rh.container.actualfmt
            innc = self.naming_convention(
                kind="edainput",
                variant=self.kind,
                rh=rh,
                totalnumber=self.actual_totalnumber,
                actualfmt=eff_format,
            )
            all_sections = self._members_all_inputs()
            paddingstuff = self.context.sequence.effective_inputs(
                role=self._PADDING_ROLE
            )
            for i, s in enumerate(all_sections, start=1):
                if s.stage == "get" and s.rh.container.exists():
                    logger.info(
                        "Soft-Linking %s to %s",
                        s.rh.container.localpath(),
                        innc(number=i),
                    )
                    self.system.softlink(
                        s.rh.container.localpath(), innc(number=i)
                    )
                else:
                    mypadding = None
                    for p in paddingstuff:
                        if (
                            getattr(
                                p.rh.resource,
                                "ipert",
                                getattr(p.rh.resource, "number", None),
                            )
                            == i
                        ):
                            mypadding = p
                            break
                        else:
                            if (
                                getattr(p.rh.resource, "date", None)
                                == s.rh.resource.date
                                and getattr(p.rh.provider, "member", None)
                                == s.rh.provider.member
                            ):
                                mypadding = p
                                break
                    if mypadding is not None:
                        logger.warning(
                            "Soft-Linking Padding data %s to %s",
                            mypadding.rh.container.localpath(),
                            innc(number=i),
                        )
                        self.system.softlink(
                            mypadding.rh.container.localpath(), innc(number=i)
                        )
                    else:
                        raise AlgoComponentError(
                            "No padding data where found for i= {:d}: {!s}".format(
                                i, s
                            )
                        )
        else:
            super().modelstate_renumbering(rh, mlist)

    def prepare_namelist_delta(self, rh, namcontents, namlocal):
        """Update the namelists with EDA related macros."""
        nam_updated = super().prepare_namelist_delta(rh, namcontents, namlocal)
        if self.actual_nresx is not None:
            self._set_nam_macro(
                namcontents, namlocal, "NRESX", self.actual_nresx
            )
            nam_updated = True
        return nam_updated


[docs] class IFSEdaFemars(IFSEdaAbstractAlgo): """Convert some FA file in ECMWF-GRIB files. PLEASE DO NOT USE !""" _footprint = dict( info="Convert some FA file in ECMWF-GRIB files.", attr=dict( kind=dict( values=["femars"], ), rawfiles=dict( type=bool, optional=True, default=False, ), ), ) def postfix(self, rh, opts): """Find out if any special resources have been produced.""" sh = self.system # Gather rawfiles in folders if self.rawfiles: flist = sh.glob("tmprawfile_D000_L*") dest = "rawfiles" logger.info("Creating a rawfiles pack: %s", dest) sh.mkdir(dest) for fic in flist: sh.mv(fic, dest, fmt="grib") super().postfix(rh, opts)
[docs] class IFSInflationLike(IFSEdaAbstractAlgo): """Apply the inflation scheme on a given modelstate.""" _RUNSTORE = "RUNOUT" _USELESS_MATCH = re.compile(r"^(?P<target>\w+)\+term\d+:\d+$") _footprint = dict( info="Operations around the background error covariance matrix", attr=dict( kind=dict( values=[ "infl", "pert", ], ), conf=dict( values=[ 701, ] ), ), ) def __init__(self, *kargs, **kwargs): super().__init__(*kargs, **kwargs) self._outputs_shelf = list() def _check_effective_terms(self, roles): eff_terms = None for role in roles: eterm = { sec.rh.resource.term for sec in self.context.sequence.effective_inputs(role=role) } if eterm: if eff_terms is None: eff_terms = eterm else: if eff_terms != eterm: raise AlgoComponentError( "Inconsistencies between inputs effective terms." ) return sorted(eff_terms) def _link_stuff_in( self, role, actualterm, targetnc, targetintent="in", wastebasket=None ): estuff = [ sec for sec in self.context.sequence.effective_inputs(role=role) if sec.rh.resource.term == actualterm ] if len(estuff) > 1: logger.warning( "Multiple %s for the same date ! Going on...", role ) elif len(estuff) == 1: # Detect the inputs format actfmt = estuff[0].rh.container.actualfmt nconv = self.naming_convention(actualfmt=actfmt, **targetnc) targetname = nconv(**targetnc) if self.system.path.exists(targetname): logger.info( "%s: %s already exists. Hopping for the best...", role, targetname, ) else: logger.info( "%s: copying (intent=%s) %s to %s", role, targetintent, estuff[0].rh.container.localpath(), targetname, ) self.system.cp( estuff[0].rh.container.localpath(), targetname, fmt=actfmt, intent=targetintent, ) if wastebasket is not None: wastebasket.append((targetname, actfmt)) return nconv, targetname, estuff[0] return None, None, None def execute(self, rh, opts): """Loop on the various terms provided.""" eff_terms = self._check_effective_terms( ["ModelState", "EnsembleMean", "Guess"] ) fix_curclim = self.do_climfile_fixer(rh, convkind="modelclim") fix_clclim = self.do_climfile_fixer(rh, convkind="closest_modelclim") if eff_terms: for actualterm in eff_terms: wastebasket = list() self.system.title("Loop on term {!s}".format(actualterm)) self.system.subtitle("Solving the input files nightmare...") # Ensemble Mean ? mean_number = 2 if self.model == "arome" else 0 targetnc = dict( kind="edainput", variant="infl", rh=rh, number=mean_number ) self._link_stuff_in( "EnsembleMean", actualterm, targetnc, wastebasket=wastebasket, ) # Model State ? targetnc = dict( kind="edainput", variant="infl", rh=rh, number=1 ) _, _, mstate = self._link_stuff_in( "ModelState", actualterm, targetnc, wastebasket=wastebasket ) # Control ? control_number = 0 if self.model == "arome" else 2 targetnc = dict( kind="edainput", variant="infl", rh=rh, number=control_number, ) self._link_stuff_in( "Control", actualterm, targetnc, wastebasket=wastebasket ) # Guess ? targetnc = dict( kind="edaoutput", variant="infl", rh=rh, number=1, term=Time(0), ) outnc, _, _ = self._link_stuff_in( "Guess", actualterm, targetnc, targetintent="inout" ) if outnc is None: outnc = self.naming_convention( kind="edaoutput", variant="infl", rh=rh ) # Fix clim ! if fix_curclim and mstate: month = Month((mstate.rh.resource.date + actualterm).ymdh) self.climfile_fixer( rh=rh, convkind="modelclim", month=month, inputrole=("GlobalClim", "InitialClim"), inputkind="clim_model", ) if fix_clclim and mstate: closestmonth = Month( (mstate.rh.resource.date + actualterm).ymdh + ":closest" ) self.climfile_fixer( rh=rh, convkind="closest_modelclim", month=closestmonth, inputrole=("GlobalClim", "InitialClim"), inputkind="clim_model", ) # Deal with useless stuff... SADLY ! useless = [ sec for sec in self.context.sequence.effective_inputs( role="Useless" ) if ( sec.rh.resource.term == actualterm and self._USELESS_MATCH.match( sec.rh.container.localpath() ) ) ] for a_useless in useless: targetname = self._USELESS_MATCH.match( a_useless.rh.container.localpath() ).group("target") if self.system.path.exists(targetname): logger.warning( "Some useless stuff is already here: %s. I don't care...", targetname, ) else: logger.info( "Dealing with useless stuff: %s -> %s", a_useless.rh.container.localpath(), targetname, ) self.system.cp( a_useless.rh.container.localpath(), targetname, fmt=a_useless.rh.container.actualfmt, intent="in", ) wastebasket.append( (targetname, a_useless.rh.container.actualfmt) ) # Standard execution super().execute(rh, opts) # The concatenated listing self.system.cat("NODE.001_01", output="NODE.all") # prepares the next execution if len(eff_terms) > 1: self.system.mkdir(self._RUNSTORE) # Freeze the current output shelf_label = self.system.path.join( self._RUNSTORE, outnc(number=1, term=actualterm) ) self.system.move( outnc(number=1, term=Time(0)), shelf_label, fmt="fa" ) self._outputs_shelf.append(shelf_label) # Some cleaning for afile in wastebasket: self.system.remove(afile[0], fmt=afile[1]) self.system.rmall("ncf927", "dirlst") else: # We should not be here but whatever... some task are poorly written ! super().execute(rh, opts) def postfix(self, rh, opts): """Post-processing cleaning.""" self.system.title("Finalising the execution...") for afile in self._outputs_shelf: logger.info("Output found: %s", self.system.path.basename(afile)) self.system.move(afile, self.system.path.basename(afile), fmt="fa") super().postfix(rh, opts)
[docs] class IFSInflationFactor(IFSEdaEnsembleAbstractAlgo): """Compute an inflation factor based on individual members.""" _footprint = dict( info="Compute an inflation factor based on individual members", attr=dict( kind=dict( values=[ "infl_factor", ], ), ), )
[docs] class IFSInflationFactorLegacy(IFSInflationFactor): """Compute an inflation factor based on individual members. KEPT FOR COMPATIBILITY. DO NOT USE ! """ _footprint = dict( info="Compute an inflation factor based on individual members", attr=dict( kind=dict( values=["infl", "pert"], ), conf=dict( outcast=[ 701, ] ), ), )
[docs] class IFSEnsembleMean(IFSEdaEnsembleAbstractAlgo): """Apply the inflation scheme on a given modelstate.""" _footprint = dict( info="Operations around the background error covariance matrix", attr=dict( kind=dict( values=[ "mean", ], ), ), )
[docs] class IFSCovB(IFSEdaLaggedEnsembleAbstractAlgo): """Operations around the background error covariance matrix.""" _footprint = dict( info="Operations around the background error covariance matrix", attr=dict( kind=dict( values=[ "covb", ], ), hybrid=dict( type=bool, optional=True, default=False, ), ), ) _HYBRID_CLIM_ROLE = "ClimatologicalModelState" @property def actual_totalnumber(self): """The total number of members (times 2 if hybrid...).""" parent_totalnumber = super().actual_totalnumber return parent_totalnumber * 2 if self.hybrid else parent_totalnumber def prepare(self, rh, opts): """Default pre-link for the initial condition file""" super().prepare(rh, opts) # Legacy... for num, sec in enumerate( sorted( self.context.sequence.effective_inputs(role="Rawfiles"), key=self._members_sorting_key, ), start=1, ): repname = sec.rh.container.localpath() radical = repname.split("_")[0] + "_D{:03d}_L{:s}" for filename in self.system.listdir(repname): level = re.search(r"_L(\d+)$", filename) if level is not None: self.system.softlink( self.system.path.join(repname, filename), radical.format(num, level.group(1)), ) # Legacy... for num, sec in enumerate( sorted( self.context.sequence.effective_inputs(role="LaggedEnsemble"), key=self._members_sorting_key, ), start=1, ): repname = sec.rh.container.localpath() radical = repname.split("_")[0] + "_{:03d}" self.system.softlink(repname, radical.format(num)) # Requesting Hybrid computations ? if self.hybrid: hybstuff = self.context.sequence.effective_inputs( role=self._HYBRID_CLIM_ROLE ) hybformat = hybstuff[0].rh.container.actualfmt totalnumber = ( self.actual_nbe * self.actual_nresx if self.padding else self.actual_nbe ) for i, tnum in enumerate( range(totalnumber + 1, 2 * totalnumber + 1) ): innc = self.naming_convention( kind="edainput", variant=self.kind, totalnumber=self.actual_totalnumber, rh=rh, actualfmt=hybformat, ) logger.info( "Soft-Linking %s to %s", hybstuff[i].rh.container.localpath(), innc(number=tnum), ) self.system.softlink( hybstuff[i].rh.container.localpath(), innc(number=tnum) )