# Source code for vortex.nwp.algo.coupling

"""
AlgoComponents dedicated to the coupling between NWP models.
"""

import re
import footprints

from bronx.compat.functools import cached_property
from bronx.fancies import loggers
from bronx.stdtypes import date

from .ifsroot import IFSParallel
from ..tools.drhook import DrHookDecoMixin
from vortex.algo.components import AlgoComponentError, BlindRun, Parallel
from vortex.algo.components import (
    AlgoComponentDecoMixin,
    algo_component_deco_mixin_autodoc,
)
from vortex.layout.dataflow import intent
from vortex.tools.grib import EcGribDecoMixin

from .forecasts import FullPos

#: No automatic export: ``from <this module> import *`` exposes nothing.
__all__ = []

# Module-level logger, named after this module.
logger = loggers.getLogger(__name__)


#: Footprint fragment declaring the optional ``basedate`` attribute.
coupling_basedate_fp = footprints.Footprint(
    attr={
        "basedate": {
            "info": "The run date of the coupling generating process",
            "type": date.Date,
            "optional": True,
        },
    }
)


@algo_component_deco_mixin_autodoc
class CouplingBaseDateNamMixin(AlgoComponentDecoMixin):
    """Provide a ``basedate`` attribute and substitute it in the namelists."""

    _MIXIN_EXTRA_FOOTPRINTS = (coupling_basedate_fp,)

    def _prepare_basedate_hook(self, rh, opts):
        """Set the YYYY/MM/DD macros in every input namelist.

        The macros are only set when ``basedate`` is defined. A namelist is
        saved whenever its contents report that a new dump is needed.
        """

        def _apply_macro(nam_rh, name, value):
            # Set one macro and trace where it has been written.
            nam_rh.contents.setmacro(name, value)
            logger.info(
                "Setup macro %s=%s in %s",
                name,
                str(value),
                nam_rh.container.actualpath(),
            )

        namelists = self.context.sequence.effective_inputs(kind=("namelist",))
        for section in namelists:
            nam_rh = section.rh
            if self.basedate is not None:
                _apply_macro(nam_rh, "YYYY", int(self.basedate.year))
                _apply_macro(nam_rh, "MM", int(self.basedate.month))
                _apply_macro(nam_rh, "DD", int(self.basedate.day))
            if nam_rh.contents.dumps_needs_update:
                nam_rh.save()

    _MIXIN_PREPARE_HOOKS = (_prepare_basedate_hook,)


class Coupling(FullPos):
    """Coupling for IFS-like LAM Models.

    OBSOLETE a/c cy46 (use the 903 configuration / fullpos server instead).
    """

    _footprint = [
        coupling_basedate_fp,
        dict(
            info="Create coupling files for a Limited Area Model.",
            attr=dict(
                kind=dict(
                    values=["coupling"],
                ),
            ),
        ),
    ]

    @property
    def realkind(self):
        """The kind of this algo component."""
        return "coupling"

    def prepare(self, rh, opts):
        """Default pre-link for namelist file and domain change.

        The namelist is linked as ``fort.4`` and, when a ``NAMFPC`` block is
        present, its first ``CFPDOM`` entry is forced to ``AREA``.
        """
        super().prepare(rh, opts)
        namsec = self.setlink(
            initrole="Namelist", initkind="namelist", initname="fort.4"
        )
        for nam in [x.rh for x in namsec if "NAMFPC" in x.rh.contents]:
            logger.info('Substitute "AREA" to CFPDOM namelist entry')
            nam.contents["NAMFPC"]["CFPDOM(1)"] = "AREA"
            nam.save()

    def execute(self, rh, opts):
        """Loop on the various initial conditions provided.

        For each coupling source (sorted by term): install the source (and
        the optional surface source / guess), fix the monthly climatology
        files, run the binary, rename the output to ``CPLOUT...+<term>`` and
        honour the matching promises.

        :raises AlgoComponentError: if no output file can be found after the
            execution of the binary.
        """
        sh = self.system

        # Atmospheric coupling sources
        cplsec = self.context.sequence.effective_inputs(
            role=("InitialCondition", "CouplingSource"),
            kind=("historic", "analysis"),
        )
        cplsec.sort(key=lambda s: s.rh.resource.term)
        ininc = self.naming_convention("ic", rh)
        infile = ininc()
        isMany = len(cplsec) > 1
        outprefix = "PF{:s}AREA".format(self.xpname)

        # Optional guesses (consumed one per coupling source)
        cplguess = self.context.sequence.effective_inputs(role="Guess")
        cplguess.sort(key=lambda s: s.rh.resource.term)
        guessing = bool(cplguess)

        # Optional surface sources (consumed one per coupling source)
        cplsurf = self.context.sequence.effective_inputs(
            role=("SurfaceInitialCondition", "SurfaceCouplingSource")
        )
        cplsurf.sort(key=lambda s: s.rh.resource.term)
        surfacing = bool(cplsurf)
        inisurfnc = self.naming_convention("ic", rh, model="surfex")
        infilesurf = inisurfnc()
        if surfacing:
            # Link in the Surfex's PGD
            sclimnc = self.naming_convention(
                kind="targetclim", rh=rh, model="surfex"
            )
            self.setlink(
                initrole=("ClimPGD",),
                initkind=("pgdfa", "pgdlfi"),
                initname=sclimnc(area="AREA"),
            )

        for sec in cplsec:
            r = sec.rh
            sh.subtitle("Loop on {!s}".format(r.resource))

            # First attempt to set actual date as the one of the source model
            actualdate = r.resource.date + r.resource.term

            # Expect the coupling source to be there...
            self.grab(sec, comment="coupling source")

            # Set the actual init file
            if sh.path.exists(infile):
                if isMany:
                    logger.critical(
                        "Cannot process multiple Historic files if %s exists.",
                        infile,
                    )
            else:
                sh.cp(
                    r.container.localpath(),
                    infile,
                    fmt=r.container.actualfmt,
                    intent=intent.IN,
                )

            # If the surface file is needed, set the actual initsurf file
            if cplsurf:
                # Expecting the coupling surface source to be there...
                cplsurf_in = cplsurf.pop(0)
                self.grab(cplsurf_in, comment="coupling surface source")
                if sh.path.exists(infilesurf):
                    if isMany:
                        logger.critical(
                            "Cannot process multiple surface historic files if %s exists.",
                            infilesurf,
                        )
                else:
                    sh.cp(
                        cplsurf_in.rh.container.localpath(),
                        infilesurf,
                        fmt=cplsurf_in.rh.container.actualfmt,
                        intent=intent.IN,
                    )
            elif surfacing:
                logger.error("No more surface source to loop on for coupling")

            # The output could be an input as well
            if cplguess:
                cplout = cplguess.pop(0)
                cplpath = cplout.rh.container.localpath()
                if sh.path.exists(cplpath):
                    actualdateguess = (
                        cplout.rh.resource.date + cplout.rh.resource.term
                    )
                    # Report a mismatch only when the two dates really differ
                    if actualdate != actualdateguess:
                        logger.error(
                            "The guess date, %s, is different from the source date %s, !",
                            actualdateguess.reallynice(),
                            actualdate.reallynice(),
                        )
                    # Expect the coupling guess to be there...
                    self.grab(cplout, comment="coupling guess")
                    logger.info("Coupling with existing guess <%s>", cplpath)
                    inoutfile = outprefix + "+0000"
                    if cplpath != inoutfile:
                        sh.remove(inoutfile, fmt=cplout.rh.container.actualfmt)
                        sh.move(
                            cplpath,
                            inoutfile,
                            fmt=cplout.rh.container.actualfmt,
                            intent=intent.INOUT,
                        )
                else:
                    logger.warning(
                        "Missing guess input for coupling <%s>", cplpath
                    )
            elif guessing:
                logger.error("No more guess to loop on for coupling")

            # Find out actual monthly climatological resource
            actualmonth = date.Month(actualdate)
            self.climfile_fixer(
                rh,
                convkind="modelclim",
                month=actualmonth,
                inputrole=("GlobalClim", "InitialClim"),
                inputkind="clim_model",
            )
            self.climfile_fixer(
                rh,
                convkind="targetclim",
                month=actualmonth,
                inputrole=("LocalClim", "TargetClim"),
                inputkind="clim_model",
                area="AREA",
            )

            # Standard execution
            super().execute(rh, opts)

            # Set a local appropriate file
            posfile = [
                x
                for x in sh.glob(outprefix + "+*")
                if re.match(outprefix + r"\+\d+(?:\:\d+)?(?:\.sfx)?$", x)
            ]
            if not posfile:
                # Fail with an explicit message rather than a bare IndexError
                raise AlgoComponentError(
                    "No {:s}+* output file could be found after the execution.".format(
                        outprefix
                    )
                )
            if len(posfile) > 1:
                logger.critical(
                    "Many "
                    + outprefix
                    + " files, do not know how to adress that"
                )
            posfile = posfile[0]
            if self.basedate is None:
                actualterm = r.resource.term
            else:
                # The term is expressed relative to the requested base date
                actualterm = (actualdate - self.basedate).time()
            actualname = (
                re.sub(
                    r"^.+?((?:_\d+)?)(?:\+[:\d]+)?$",
                    r"CPLOUT\1+",
                    r.container.localpath(),
                )
                + actualterm.fmthm
            )
            if isMany:
                sh.move(
                    sh.path.realpath(posfile),
                    actualname,
                    fmt=r.container.actualfmt,
                )
                if sh.path.exists(posfile):
                    sh.rm(posfile)
            else:
                # This is here because of legacy with .sfx files
                sh.cp(
                    sh.path.realpath(posfile),
                    actualname,
                    fmt=r.container.actualfmt,
                    intent=intent.IN,
                )

            # promises management
            expected = [
                x
                for x in self.promises
                if x.rh.container.localpath() == actualname
            ]
            for thispromise in expected:
                thispromise.put(incache=True)

            # The only one listing
            if not self.server_run:
                sh.cat("NODE.001_01", output="NODE.all")

            # prepares the next execution
            if isMany:
                # Some cleaning
                sh.rmall("PXFPOS*", fmt=r.container.actualfmt)
                sh.remove(infile, fmt=r.container.actualfmt)
                if cplsurf:
                    sh.remove(infilesurf, fmt=r.container.actualfmt)
                if not self.server_run:
                    sh.rmall("ncf927", "dirlst", "NODE.[0123456789]*", "std*")


class CouplingLAM(Coupling):
    """LAM to LAM coupling (useless beyond cy40).

    OBSOLETE a/c cy40.
    """

    _footprint = {
        "info": "Create coupling files for a Limited Area Model (useless beyond cy40).",
        "attr": {
            "kind": {
                "values": ["lamcoupling"],
            },
        },
    }

    def spawn_command_options(self):
        """Return the command line factory options, with the model forced to aladin."""
        cmd_options = super().spawn_command_options()
        cmd_options["model"] = "aladin"
        return cmd_options


@algo_component_deco_mixin_autodoc
class PrepMixin(AlgoComponentDecoMixin):
    """Mixin driving the coupling/interpolation of Surfex files."""

    _MIXIN_EXTRA_FOOTPRINTS = (
        footprints.Footprint(
            info="Coupling/Interpolation of Surfex files.",
            attr={
                "kind": {
                    "values": ["prep"],
                },
                "underlyingformat": {
                    "info": "The format of input data (as expected by the PREP executable).",
                    "values": ["fa", "lfi", "netcdf"],
                    "optional": True,
                    "default": "fa",
                },
                "underlyingoutputformat": {
                    "info": (
                        "The format of output data (as expected by the PREP executable)."
                        "If omited, *underlyingformat* is used."
                    ),
                    "values": ["fa", "lfi", "netcdf", "txt"],
                    "optional": True,
                },
                "outputformat": {
                    "info": (
                        "The format of output data (as expected by the user)."
                        "If omited, same as input data."
                    ),
                    "values": ["fa", "lfi", "netcdf", "txt"],
                    "optional": True,
                },
            },
        ),
    )

    @cached_property
    def _actual_u_output_format(self):
        """The output format of the executable (defaults to *underlyingformat*)."""
        if self.underlyingoutputformat is None:
            return self.underlyingformat
        return self.underlyingoutputformat

    def _actual_output_format(self, in_format):
        """The format delivered to the user (defaults to **in_format**)."""
        if self.outputformat is None:
            return in_format
        return self.outputformat

    @staticmethod
    def _sfx_fmt_remap(fmt):
        """Map a format name to the file extension used for PGD/PREP files."""
        return {"netcdf": "nc"}.get(fmt, fmt)

    @cached_property
    def _has_sfx_lfi(self):
        """Check that both the sfx and lfi addons are loaded (raise otherwise)."""
        addon_checked = all(
            addon in self.system.loaded_addons() for addon in ("sfx", "lfi")
        )
        if not addon_checked:
            raise RuntimeError("The sfx addon is needed... please load it.")
        return addon_checked

    def _do_input_format_change(self, section, output_name, output_fmt):
        """Install the **section**'s file as **output_name** in **output_fmt**.

        When formats match, the file is copied (unless **output_name** already
        exists). The only supported conversion is fa -> lfi.
        """
        container = section.rh.container
        localpath = container.localpath()
        infmt = container.actualfmt
        self.system.subtitle("Processing inputs/climatologies")

        if infmt == output_fmt:
            # Same format on both sides: no conversion
            if not self.system.path.exists(output_name):
                logger.info("Linking %s to %s", localpath, output_name)
                self.system.cp(
                    localpath, output_name, intent=intent.IN, fmt=infmt
                )
            return

        if not (infmt == "fa" and output_fmt == "lfi" and self._has_sfx_lfi):
            raise RuntimeError(
                "Format conversion from {!r} to {!r} is not possible".format(
                    infmt, output_fmt
                )
            )
        if self.system.path.exists(output_name):
            raise OSError("The file {!r} already exists.".format(output_name))
        logger.info(
            "Calling sfxtools' fa2lfi from %s to %s.",
            localpath,
            output_name,
        )
        self.system.sfx_fa2lfi(localpath, output_name)

    def _process_outputs(self, binrh, section, output_clim, output_name):
        """Rename (and convert if needed) the output file and the listing.

        :return: The name of the final output file.
        """
        container = section.rh.container
        radical = self.system.path.splitext(container.localpath())[0]
        outfmt = self._actual_output_format(container.actualfmt)
        finaloutput = "{:s}_interpolated.{:s}".format(radical, outfmt)
        finallisting = "{:s}_listing".format(radical)
        self.system.subtitle("Processing outputs")

        u_outfmt = self._actual_u_output_format
        if outfmt == u_outfmt:
            # No format change needed
            logger.info("Moving %s to %s", output_name, finaloutput)
            self.system.mv(output_name, finaloutput, fmt=outfmt)
        elif outfmt == "fa" and u_outfmt == "lfi" and self._has_sfx_lfi:
            # There is a need for a format change
            logger.info(
                "Calling lfitools' faempty from %s to %s.",
                output_clim,
                finaloutput,
            )
            self.system.fa_empty(output_clim, finaloutput)
            logger.info(
                "Calling sfxtools' lfi2fa from %s to %s.",
                output_name,
                finaloutput,
            )
            self.system.sfx_lfi2fa(output_name, finaloutput)
            finallfi = "{:s}_interpolated.{:s}".format(radical, u_outfmt)
            self.system.mv(output_name, finallfi)
        else:
            raise RuntimeError(
                "Format conversion from {!r} to {!r} is not possible".format(
                    u_outfmt, outfmt
                )
            )

        # Also rename the listing :-)
        if binrh.resource.cycle < "cy48t1":
            try:
                self.system.mv("LISTING_PREP.txt", finallisting)
            except OSError:
                self.system.mv("LISTING_PREP0.txt", finallisting)
        else:
            self.system.mv("LISTING_PREP0.txt", finallisting)
        return finaloutput

    def _prepare_prep_hook(self, rh, opts):
        """Install the initial and target climatology files (PGD1/PGD2)."""
        # Convert the initial clim if needed...
        iniclim = self.context.sequence.effective_inputs(role=("InitialClim",))
        if len(iniclim) != 1:
            raise AlgoComponentError("One Initial clim have to be provided")
        in_fmt = self.underlyingformat
        self._do_input_format_change(
            iniclim[0],
            "PGD1." + self._sfx_fmt_remap(in_fmt),
            in_fmt,
        )
        # Convert the target clim if needed...
        targetclim = self.context.sequence.effective_inputs(
            role=("TargetClim",)
        )
        if len(targetclim) != 1:
            raise AlgoComponentError("One Target clim have to be provided")
        out_fmt = self._actual_u_output_format
        self._do_input_format_change(
            targetclim[0],
            "PGD2." + self._sfx_fmt_remap(out_fmt),
            out_fmt,
        )

    _MIXIN_PREPARE_HOOKS = (_prepare_prep_hook,)

    def _spawn_hook_prep_hook(self):
        """Dump the namelists."""
        namelists = self.context.sequence.effective_inputs(kind=("namelist",))
        for section in namelists:
            nam_container = section.rh.container
            self.system.subtitle(
                "Here is the content of the {:s} namelist".format(
                    nam_container.actualpath()
                )
            )
            nam_container.cat()

    _MIXIN_SPAWN_HOOKS = (_spawn_hook_prep_hook,)

    def _execute_prep_common(self, rh, opts):
        """Loop on the various initial conditions provided."""
        sh = self.system

        sources = self.context.sequence.effective_inputs(
            role=("InitialCondition", "CouplingSource"),
            kind=("historic", "analysis"),
        )
        sources.sort(key=lambda s: s.rh.resource.term)
        infile = "PREP1.{:s}".format(
            self._sfx_fmt_remap(self.underlyingformat)
        )
        outfile = "PREP2.{:s}".format(
            self._sfx_fmt_remap(self._actual_u_output_format)
        )
        targetclim_sections = self.context.sequence.effective_inputs(
            role=("TargetClim",)
        )
        targetclim = targetclim_sections[0].rh.container.localpath()

        for sec in sources:
            sh.header("Loop on {:s}".format(sec.rh.container.localpath()))

            # Expect the coupling source to be there...
            self.grab(sec, comment="coupling source")

            # Set the actual init file
            if sh.path.exists(infile):
                logger.critical(
                    "Cannot process input files if %s exists.", infile
                )
            self._do_input_format_change(sec, infile, self.underlyingformat)

            # Standard execution
            super(self.mixin_execute_companion(), self).execute(rh, opts)
            sh.subtitle("Listing after PREP")
            sh.dir(output=False, fatal=False)

            # Deal with outputs
            actualname = self._process_outputs(rh, sec, targetclim, outfile)

            # promises management
            for thispromise in [
                x
                for x in self.promises
                if x.rh.container.localpath() == actualname
            ]:
                thispromise.put(incache=True)

            # Some cleaning
            sh.rmall("*.des")
            sh.rmall("PREP1.*")

    _MIXIN_EXECUTE_OVERWRITE = _execute_prep_common


class Prep(
    BlindRun,
    PrepMixin,
    CouplingBaseDateNamMixin,
    DrHookDecoMixin,
    EcGribDecoMixin,
):
    """Non-MPI version of the Surfex files coupling/interpolation.

    The class body is empty: it only combines its base class and mixins.
    """


class ParallelPrep(
    Parallel,
    PrepMixin,
    CouplingBaseDateNamMixin,
    DrHookDecoMixin,
    EcGribDecoMixin,
):
    """MPI version of the Surfex files coupling/interpolation.

    The class body is empty: it only combines its base class and mixins.
    """


class C901(IFSParallel):
    """Run of C901 configuration."""

    _footprint = dict(
        info="Run C901 configuration",
        attr=dict(
            kind=dict(
                values=[
                    "c901",
                ]
            ),
            clim=dict(type=bool),
            xpname=dict(default="a001"),
        ),
    )

    # Name templates of the files handled in the working directory
    SPECTRAL_FILE_SH = "ICMSH{prefix}INIT{suffix}"
    GRIDPOINT_FILE_UA = "ICMUA{prefix}INIT{suffix}"
    GRIDPOINT_FILE_GG = "ICMGG{prefix}INIT{suffix}"
    OUTPUT_FILE_NAME = "CN90x{}INIT"
    OUTPUT_LISTING_NAME = "NODE.001_01"
    # (role, name template) pairs for the inputs that change at each validity
    LIST_INPUT_FILES = [
        ("SpectralFileSH", SPECTRAL_FILE_SH),
        ("GridpointFileUA", GRIDPOINT_FILE_UA),
        ("GridpointFileGG", GRIDPOINT_FILE_GG),
    ]
    # (role, name template) pairs for the inputs reused at every validity
    LIST_CST_INPUT_FILES = [
        ("ConstantSpectralFileSH", SPECTRAL_FILE_SH),
        ("ConstantGridpointFileUA", GRIDPOINT_FILE_UA),
        ("ConstantGridpointFileGG", GRIDPOINT_FILE_GG),
    ]

    @property
    def realkind(self):
        """The kind of this algo component."""
        return "c901"

    def sort_files_per_prefix(self, list_types, unique=False):
        """Sort the input files according to their role and file name prefix.

        :param list_types: list of (role, file name template) pairs.
        :param unique: if *True*, at most one file is allowed per prefix.
        :return: ``{role: {prefix: [sections sorted by validity date]}}``;
            roles without any input file are omitted.
        :raises ValueError: if a file name does not match its template or if
            **unique** is violated.
        """
        result = dict()
        for file_role, file_template in list_types:
            per_prefix = dict()
            input_files = self.context.sequence.effective_inputs(
                role=file_role
            )
            template = file_template.format(
                prefix=r"(?P<prefix>\S{4})", suffix=r"(?P<suffix>\S*)"
            )
            for file_s in input_files:
                file_name = file_s.rh.container.filename
                find_elements = re.search(template, file_name)
                if find_elements is None:
                    logger.error(
                        "The name of the file %s do not follow the template %s.",
                        file_name,
                        template,
                    )
                    raise ValueError(
                        "The name of the file do not follow the template."
                    )
                file_prefix = find_elements.group("prefix")
                if unique and file_prefix in per_prefix:
                    logger.error(
                        "Only one file should be present for each type and each suffix."
                    )
                    raise ValueError(
                        "Only one file should be present for each suffix."
                    )
                per_prefix.setdefault(file_prefix, list()).append(file_s)
            # Only keep the roles for which at least one file was found
            if per_prefix:
                for sections in per_prefix.values():
                    sections.sort(
                        key=lambda s: s.rh.resource.date + s.rh.resource.term
                    )
                result[file_role] = per_prefix
        return result

    def execute(self, rh, opts):
        """Loop on the various files provided.

        The binary is run once per validity date, in chronological order. At
        each step the constant and validity-dependent inputs are installed,
        the climatology is fixed when ``clim`` is set, and the output is
        renamed with the term of the processed input.
        """
        sh = self.system

        # Create the template for files to be removed at each validity date and for the outputname
        deleted_spectral_file_SH = self.SPECTRAL_FILE_SH.format(
            prefix="*", suffix=""
        )
        deleted_gridpoint_file_UA = self.GRIDPOINT_FILE_UA.format(
            prefix="*", suffix=""
        )
        deleted_gridpoint_file_GG = self.GRIDPOINT_FILE_GG.format(
            prefix="*", suffix=""
        )
        output_name = self.OUTPUT_FILE_NAME.format(self.xpname.upper())

        # Sort input files
        sorted_cst_input_files = self.sort_files_per_prefix(
            self.LIST_CST_INPUT_FILES, unique=True
        )
        sorted_input_files = self.sort_files_per_prefix(self.LIST_INPUT_FILES)

        # Determine the validity present for each non constant input files,
        # check that they are the same for all.
        input_validity = [
            [s.rh.resource.date + s.rh.resource.term for s in sections]
            for per_prefix in sorted_input_files.values()
            for sections in per_prefix.values()
        ]
        # Without any input there is no validity to loop on: fail explicitly
        self.algoassert(
            bool(input_validity),
            "At least one non-constant input file must be provided.",
        )
        self.algoassert(
            all(
                validity == input_validity[0]
                for validity in input_validity[1:]
            ),
            "The files of each type must have the same validity dates.",
        )

        # Modify namelist
        input_namelist = self.context.sequence.effective_inputs(
            role="Namelist", kind="namelist"
        )
        for namelist in input_namelist:
            namcontents = namelist.rh.contents
            self._set_nam_macro(
                namcontents,
                namelist.rh.container.actualpath(),
                "LLCLIM",
                self.clim,
            )
            if namcontents.dumps_needs_update:
                namcontents.rewrite(namelist.rh.container)

        for current_validity in input_validity[0]:
            # Deal with constant input files (gridpoint and spectral)
            for file_role, file_template in self.LIST_CST_INPUT_FILES:
                if file_role in sorted_cst_input_files:
                    for file_prefix in sorted_cst_input_files[file_role]:
                        file_name = file_template.format(
                            prefix=file_prefix, suffix=""
                        )
                        current_file_input = sorted_cst_input_files[file_role][
                            file_prefix
                        ][0]
                        self.algoassert(
                            not sh.path.exists(file_name),
                            "The file {} already exists. It should not.".format(
                                file_name
                            ),
                        )
                        sh.cp(
                            current_file_input.rh.container.iotarget(),
                            file_name,
                            intent="in",
                        )

            # Deal with other input files (gridpoint and spectral)
            for file_role, file_template in self.LIST_INPUT_FILES:
                if file_role in sorted_input_files:
                    for file_prefix in sorted_input_files[file_role]:
                        file_name = file_template.format(
                            prefix=file_prefix, suffix=""
                        )
                        # The lists are sorted by ascending validity: take the
                        # first element so that the file being processed
                        # matches current_validity (used for the climatology).
                        current_file_input = sorted_input_files[file_role][
                            file_prefix
                        ].pop(0)
                        self.algoassert(
                            not sh.path.exists(file_name),
                            "The file {} already exists. It should not.".format(
                                file_name
                            ),
                        )
                        sh.cp(
                            current_file_input.rh.container.iotarget(),
                            file_name,
                            intent="in",
                        )

            if self.clim:
                # Find the right climatology file
                current_month = date.Month(current_validity)
                self.climfile_fixer(
                    rh,
                    convkind="modelclim",
                    month=current_month,
                    inputrole=("GlobalClim", "InitialClim"),
                    inputkind="clim_model",
                )

            # Standard execution
            super().execute(rh, opts)

            # Move the output file
            current_term = current_file_input.rh.resource.term
            sh.move(
                output_name, output_name + "+{}".format(current_term.fmthm)
            )

            # Cat all the listings into a single one
            sh.cat(self.OUTPUT_LISTING_NAME, output="NODE.all")

            # Remove unneeded files
            sh.rmall(
                deleted_spectral_file_SH,
                deleted_gridpoint_file_GG,
                deleted_gridpoint_file_UA,
                "std*",
                self.OUTPUT_LISTING_NAME,
            )


class DomeoForcingAtmo(BlindRun, CouplingBaseDateNamMixin):
    """Correct the Domeo forcing file.

    The footprint declares the ``basedate`` attribute as mandatory.
    """

    _footprint = {
        "info": "Domeo Forcing Atmo",
        "attr": {
            "kind": {
                "values": ["domeo_forcing"],
            },
            "basedate": {
                "optional": False,
            },
        },
    }