Source code for vortex.nwp.algo.forecasts

"""
AlgoComponents dedicated to NWP direct forecasts.
"""

import math
import re
from collections import defaultdict

from bronx.fancies import loggers
from bronx.stdtypes.date import Time, Month, Period
import footprints

from vortex.algo.components import AlgoComponentError, Parallel
from vortex.layout.dataflow import intent
from vortex.syntax.stdattrs import model
from vortex.util.structs import ShellEncoder
from .ifsroot import IFSParallel
from ..tools.drhook import DrHookDecoMixin
from ..syntax.stdattrs import outputid_deco

from typing import Any, Callable, Iterable
from vortex.data.handlers import Handler
from vortex.layout.dataflow import Section


#: No automatic export
__all__ = []

logger = loggers.getLogger(__name__)


[docs] class Forecast(IFSParallel): """Forecast for IFS-like Models.""" _footprint = [ outputid_deco, dict( info="Run a forecast with Arpege/IFS.", attr=dict( kind=dict( values=["forecast", "fc"], remap=dict(forecast="fc") ), hist_terms=dict( info="The list of terms when historical file production is requested.", type=footprints.FPList, optional=True, ), surfhist_terms=dict( info="The list of terms when surface file production is requested.", type=footprints.FPList, optional=True, ), pos_terms=dict( info="The list of terms when post-processed data is requested.", type=footprints.FPList, optional=True, ), s_norm_terms=dict( info="The list of terms when spectal norms should be computed.", type=footprints.FPList, optional=True, ), flyargs=dict( default=("ICMSH", "PF"), ), xpname=dict(default="FCST"), ddhpack=dict( info="After run, gather the DDH output file in directories.", type=bool, optional=True, default=False, doc_zorder=-5, ), ), ), ] @property def realkind(self): return "forecast" def _outputs_configurator(self, bin_rh): return footprints.proxy.ifsoutputs_configurator( model=self.model, cycle=bin_rh.resource.cycle, fcterm_unit=self.fcunit, ) def prepare(self, rh, opts): """Default pre-link for the initial condition file""" super().prepare(rh, opts) ininc = self.naming_convention("ic", rh) analysis = self.setlink( initrole=("InitialCondition", "Analysis"), initname=ininc() ) if analysis: analysis = analysis.pop() thismonth = analysis.rh.resource.date.month # Possibly fix the model clim if self.do_climfile_fixer(rh, convkind="modelclim"): self.climfile_fixer( rh, convkind="modelclim", month=thismonth, inputrole=("GlobalClim", "InitialClim"), inputkind="clim_model", ) # Possibly fix post-processing clim files self.all_localclim_fixer(rh, thismonth) self.grab(analysis, comment="analysis") # File linking for IAU increments # # In the case of a forecast with IAU, the IFS executable # expects to find input increment files (both analysis and # background counterpart) names suffixed according to the # order by which they are to be applied. In practice # input files are not renamed but links with correct names # are created pointing to them instead. Both analysed and # background states are required: to inject analysis # increments over multiple timesteps, the IAU algorithm # must be able to compute a difference between analysis # and background states. # # TODO: Clarify where both regexp keys are coming from guesses = self.context.sequence.effective_inputs( role=re.compile(r"IAU_(Background|Guess)", flags=re.IGNORECASE) ) analyses = self.context.sequence.effective_inputs( role=re.compile(r"IAU_(Analysis|Ic)", flags=re.IGNORECASE) ) def key(s: Section): # Increment files are sorted according to date, then # effective term. return ( s.rh.resource.date, s.rh.resource.date + s.rh.resource.term, ) self._create_ordered_links( bin_handler=rh, sections=analyses, sort_key=key, nameconv_kind="iau_analysis", ) self._create_ordered_links( bin_handler=rh, sections=guesses, sort_key=key, nameconv_kind="iau_background", ) # Promises should be nicely managed by a co-proccess if self.promises: prefixes_set = set() for pr_res in [pr.rh.resource for pr in self.promises]: if pr_res.realkind == "historic": prefixes_set.add("ICMSH") if pr_res.realkind == "gridpoint": prefixes_set.add( "{:s}PF".format( "GRIB" if pr_res.nativefmt == "grib" else "" ) ) self.io_poll_args = tuple(prefixes_set) self.flyput = len(self.io_poll_args) > 0 def _create_ordered_links( self, bin_handler: Handler, sections: Iterable[Section], sort_key: Callable[[Section], Any], nameconv_kind: str, ): """Create links to local files, with ordered names For an iterable of sections objects, this function creates symlinks to the corresponding local files (described by the assocatied "container" object". Link names are suffixed by a number string based on their order after sorting sections by the sort key. Example: ICIAUFCSTBK01, ICIAUFCSTBK02, ICIAUFCSTBK03... """ for i, sec in enumerate(sorted(sections, key=sort_key)): nameconv = self.naming_convention( nameconv_kind, bin_handler, actualfmt=sec.rh.container.actualfmt, ) target = nameconv(number=(i + 1)) link_name = sec.rh.container.localpath() if self.system.path.exists(target): logger.warning( "%s should be linked to %s but %s already exists.", link_name, target, target, ) continue logger.info("Linking %s to %s.", link_name, target) self.grab(sec, comment=nameconv_kind) self.system.softlink(link_name, target) def find_namelists(self, opts=None): """Find any namelists candidates in actual context inputs.""" return [ x.rh for x in self.context.sequence.effective_inputs( role="Namelist", kind="namelist" ) ] def prepare_namelist_delta(self, rh, namcontents, namlocal): nam_updated = super().prepare_namelist_delta(rh, namcontents, namlocal) if namlocal == "fort.4": o_conf = self._outputs_configurator(rh) o_conf.modelstate = self.hist_terms o_conf.surf_modelstate = self.surfhist_terms o_conf.post_processing = self.pos_terms o_conf.spectral_diag = self.s_norm_terms nam_updated_bis = o_conf(namcontents, namlocal) nam_updated = nam_updated or nam_updated_bis return nam_updated def postfix(self, rh, opts): """Find out if any special resources have been produced.""" sh = self.system # Look up for the gridpoint files gp_out = sh.ls("PF{}*".format(self.xpname)) gp_map = defaultdict(list) if gp_out: re_pf = re.compile( r"^PF{}(\w+)\+(\d+(?::\d+)?)$".format(self.xpname) ) for fname in gp_out: match_pf = re_pf.match(fname) if match_pf: gp_map[match_pf.group(1).lower()].append( Time(match_pf.group(2)) ) for k, v in gp_map.items(): v.sort() logger.info( "Gridpoint files found: domain=%s, terms=%s", k, ",".join([str(t) for t in v]), ) if len(gp_map) == 0: logger.info("No gridpoint file was found.") sh.json_dump(gp_map, "gridpoint_map.out", indent=4, cls=ShellEncoder) # Gather DDH in folders if self.ddhpack: ddhmap = dict(DL="dlimited", GL="global", ZO="zonal") for prefix, ddhkind in ddhmap.items(): flist = sh.glob("DHF{}{}+*".format(prefix, self.xpname)) if flist: dest = "ddhpack_{}".format(ddhkind) logger.info("Creating a DDH pack: %s", dest) sh.mkdir(dest) for lfa in flist: sh.mv(lfa, dest, fmt="lfa") super().postfix(rh, opts)
[docs] class LAMForecast(Forecast): """Forecast for IFS-like Limited Area Models.""" _footprint = dict( info="Run a forecast with an Arpege/IFS like Limited Area Model.", attr=dict( kind=dict( values=["lamfc", "lamforecast"], remap=dict(lamforecast="lamfc"), ), ), ) synctool = "atcp.alad" synctpl = "sync-fetch.tpl" def spawn_command_options(self): """Dictionary provided for command line factory.""" return dict( name=(self.xpname + "xxxx")[:4].upper(), timescheme=self.timescheme, timestep=self.timestep, fcterm=self.fcterm, fcunit=self.fcunit, model="aladin", ) def prepare(self, rh, opts): """Default pre-link for boundary conditions files.""" super().prepare(rh, opts) sh = self.system # Check boundaries conditions cplrh = [ x.rh for x in self.context.sequence.effective_inputs( role="BoundaryConditions", kind="boundary" ) ] cplrh.sort(key=lambda rh: rh.resource.date + rh.resource.term) # Ordered pre-linking of boundaring and building ot the synchronization tools firstsync = None sh.header("Check boundaries...") if any([x.is_expected() for x in cplrh]): logger.info("Some boundaries conditions are still expected") self.mksync = True else: logger.info("All boundaries conditions available") self.mksync = False for i, bound in enumerate(cplrh): thisbound = bound.container.localpath() lbcnc = self.naming_convention( "lbc", rh, actualfmt=bound.container.actualfmt ) sh.softlink(thisbound, lbcnc(number=i)) if self.mksync: bound.mkgetpr( pr_getter=self.synctool + ".{:03d}".format(i), ) if firstsync is None: firstsync = self.synctool + ".{:03d}".format(i) # Set up the first synchronization step if firstsync is not None: sh.symlink(firstsync, self.synctool) def postfix(self, rh, opts): """Post forecast information and cleaning.""" sh = self.system if self.mksync: synclog = self.synctool + ".log" if sh.path.exists(synclog): sh.subtitle(synclog) sh.cat(synclog, output=False) super().postfix(rh, opts)
[docs] class DFIForecast(LAMForecast): """OBSOLETE CODE: do not use.""" _footprint = dict( info="Run a forecast with an Arpege/IFS like Limited Area Model (with DFIs).", attr=dict( kind=dict( values=["fcdfi"], ), ), ) def prepare(self, rh, opts): """Pre-link boundary conditions as special DFI files.""" super().prepare(rh, opts) ininc = self.naming_convention("ic", rh) lbcnc = self.naming_convention("lbc", rh, actualfmt="fa") for pseudoterm in (999, 0, 1): self.system.softlink(ininc(), lbcnc(number=pseudoterm))
class FullPos(IFSParallel): """Fullpos for geometries transforms in IFS-like Models. OBSOLETE a/c cy46 (use the 903 configuration / fullpos server instead). """ _abstract = True _footprint = dict( attr=dict( xpname=dict(default="FPOS"), flyput=dict( default=False, values=[False], ), server_run=dict( values=[True, False], ), serversync_method=dict( default="simple_socket", ), serversync_medium=dict( default="cnt3_wait", ), ) ) @property def realkind(self): return "fullpos"
[docs] class FullPosGeo(FullPos): """Fullpos for geometries transforms in IFS-like Models. OBSOLETE a/c cy46 (use the 903 configuration / fullpos server instead). """ _footprint = dict( info="Run a fullpos to interpolate to a new geometry", attr=dict( kind=dict( values=["l2h", "h2l"], ), ), ) _RUNSTORE = "RUNOUT" def _compute_target_name(self, r): return "PF" + re.sub( "^(?:ICMSH)(.*?)(?:INIT)(.*)$", r"\1\2", r.container.localpath() ).format(self.xpname) def execute(self, rh, opts): """Loop on the various initial conditions provided.""" sh = self.system initrh = [ x.rh for x in self.context.sequence.effective_inputs( role=("Analysis", "Guess", "InitialCondition"), kind=( "analysis", "historic", "ic", re.compile("(stp|ana)min"), re.compile("pert"), ), ) ] # is there one (deterministic forecast) or many (ensemble forecast) fullpos to perform ? isMany = len(initrh) > 1 do_fix_input_clim = self.do_climfile_fixer(rh, convkind="modelclim") do_fix_output_clim = self.do_climfile_fixer( rh, convkind="targetclim", area="000" ) ininc = self.naming_convention("ic", rh) infile = ininc() for num, r in enumerate(initrh): str_subtitle = "Fullpos execution on {}".format( r.container.localpath() ) sh.subtitle(str_subtitle) # Set the actual init file if sh.path.exists(infile): if isMany: logger.critical( "Cannot process multiple Historic files if %s exists.", infile, ) else: sh.cp( r.container.localpath(), infile, fmt=r.container.actualfmt, intent=intent.IN, ) # Fix links for climatology files actualmonth = Month(r.resource.date + r.resource.term) startingclim = r.resource.geometry if do_fix_input_clim: self.climfile_fixer( rh, convkind="modelclim", month=actualmonth, geo=startingclim, inputrole=(re.compile("^Clim"), re.compile("Clim$")), inputkind="clim_model", ) if do_fix_output_clim: self.climfile_fixer( rh, convkind="targetclim", month=actualmonth, notgeo=startingclim, inputrole=(re.compile("^Clim"), re.compile("Clim$")), inputkind="clim_model", area="000", ) # Standard execution super().execute(rh, opts) # Find the output filename output_file = [x for x in sh.glob("PF{:s}*+*".format(self.xpname))] if len(output_file) != 1: raise AlgoComponentError("No or multiple output files found.") output_file = output_file[0] # prepares the next execution if isMany: # Set a local storage place sh.mkdir(self._RUNSTORE) # Freeze the current output sh.move( output_file, sh.path.join(self._RUNSTORE, "pfout_{:d}".format(num)), fmt=r.container.actualfmt, ) sh.remove(infile, fmt=r.container.actualfmt) # Cleaning/Log management if not self.server_run: # The only one listing sh.cat("NODE.001_01", output="NODE.all") # Some cleaning sh.rmall("ncf927", "dirlst") else: # Link the output files to new style names sh.cp( output_file, self._compute_target_name(r), fmt=r.container.actualfmt, intent="in", ) # Link the listing to NODE.all sh.cp("NODE.001_01", "NODE.all", intent="in") def postfix(self, rh, opts): """Post processing cleaning.""" sh = self.system initrh = [ x.rh for x in self.context.sequence.effective_inputs( role=("Analysis", "Guess", "InitialCondition"), kind=( "analysis", "historic", "ic", re.compile("(stp|ana)min"), re.compile("pert"), ), ) ] if len(initrh) > 1: for num, r in enumerate(initrh): sh.move( "{:s}/pfout_{:d}".format(self._RUNSTORE, num), self._compute_target_name(r), fmt=r.container.actualfmt, ) super().postfix(rh, opts)
[docs] class FullPosBDAP(FullPos): """Post-processing for IFS-like Models. OBSOLETE a/c cy46 (use the 903 configuration / fullpos server instead). """ _footprint = dict( info="Run a fullpos to post-process raw model outputs", attr=dict( kind=dict(values=["fullpos", "fp"], remap=dict(fp="fullpos")), fcterm=dict( values=[ 0, ], ), outputid=dict( info="The identifier for the encoding of post-processed fields.", optional=True, ), server_run=dict( values=[ False, ], ), ), ) def prepare(self, rh, opts): """Some additional checks.""" if self.system.path.exists("xxt00000000"): raise AlgoComponentError( "There should be no file named xxt00000000 in the working directory" ) super().prepare(rh, opts) def execute(self, rh, opts): """Loop on the various initial conditions provided.""" sh = self.system namrh = [ x.rh for x in self.context.sequence.effective_inputs(kind="namelistfp") ] namxx = [ x.rh for x in self.context.sequence.effective_inputs( role="FullPosSelection", kind="namselect", ) ] initsec = [ x for x in self.context.sequence.effective_inputs( role=("InitialCondition", "ModelState"), kind="historic", ) ] initsec.sort(key=lambda sec: sec.rh.resource.term) do_fix_input_clim = self.do_climfile_fixer(rh, convkind="modelclim") ininc = self.naming_convention("ic", rh) infile = ininc() for sec in initsec: r = sec.rh sh.subtitle("Loop on {:s}".format(r.resource.term.fmthm)) thisdate = r.resource.date + r.resource.term thismonth = thisdate.month logger.info("Fullpos <month:%s>" % thismonth) if do_fix_input_clim: self.climfile_fixer( rh, convkind="modelclim", month=thismonth, geo=r.resource.geometry, inputrole=(re.compile("^Clim"), re.compile("Clim$")), inputkind="clim_model", ) thesenames = self.all_localclim_fixer(rh, thismonth) # Set a local storage place runstore = "RUNOUT" + r.resource.term.fmtraw sh.mkdir(runstore) # Define an input namelist try: namfp = [ x for x in namrh if x.resource.term == r.resource.term ].pop() namfplocal = namfp.container.localpath() if self.outputid is not None: self._set_nam_macro( namfp.contents, namfplocal, "OUTPUTID", self.outputid ) namfp.contents.rewrite(namfp.container) sh.remove("fort.4") sh.symlink(namfplocal, "fort.4") except Exception: logger.critical( "Could not get a fullpos namelist for term %s", r.resource.term, ) raise # Define an selection namelist if namxx: namxt = [ x for x in namxx if x.resource.term == r.resource.term ] if namxt: sh.remove("xxt00000000") sh.symlink( namxt.pop().container.localpath(), "xxt00000000" ) else: logger.critical( "Could not get a selection namelist for term %s", r.resource.term, ) raise AlgoComponentError() else: logger.info("No selection namelist are provided.") # Finally set the actual init file sh.remove(infile) self.grab( sec, comment="Fullpos source (term={:s})".format( r.resource.term.fmthm ), ) sh.softlink(r.container.localpath(), infile) # Standard execution super().execute(rh, opts) # Freeze the current output for posfile in [ x for x in ( sh.glob("PF{:s}*+*".format(self.xpname)) + sh.glob("GRIBPF{:s}*+*".format(self.xpname)) ) ]: rootpos = re.sub("0+$", "", posfile) fmtpos = "grib" if posfile.startswith("GRIB") else "lfi" targetfile = sh.path.join( runstore, rootpos + r.resource.term.fmthm ) targetbase = sh.path.basename(targetfile) # Deal with potential promises expected = [ x for x in self.promises if x.rh.container.localpath() == targetbase ] if expected: logger.info( "Start dealing with promises for: %s.", ", ".join( [x.rh.container.localpath() for x in expected] ), ) if posfile != targetbase: sh.move(posfile, targetbase, fmt=fmtpos) posfile = targetbase for thispromise in expected: thispromise.put(incache=True) sh.move(posfile, targetfile, fmt=fmtpos) for logfile in sh.glob("NODE.*", "std*"): sh.move(logfile, sh.path.join(runstore, logfile)) # Some cleaning sh.rmall("PX{:s}*".format(self.xpname), fmt="lfi") sh.rmall("ncf927", "dirlst") for clim in thesenames: sh.rm(clim) def postfix(self, rh, opts): """Post processing cleaning.""" sh = self.system for fpfile in [ x for x in ( sh.glob("RUNOUT*/PF{:s}*".format(self.xpname)) + sh.glob("RUNOUT*/GRIBPF{:s}*+*".format(self.xpname)) ) if sh.path.isfile(x) ]: sh.move( fpfile, sh.path.basename(fpfile), fmt="grib" if "GRIBPF" in fpfile else "lfi", ) sh.cat("RUNOUT*/NODE.001_01", output="NODE.all") super().postfix(rh, opts)
[docs] class OfflineSurfex(Parallel, DrHookDecoMixin): """Run a forecast with the SURFEX's offline binary.""" _footprint = [ model, dict( info="Run a forecast with the SURFEX's offline binary.", attr=dict( kind=dict( values=[ "offline_forecast", ], ), model=dict( values=[ "surfex", ], ), model_tstep=dict( info="The timestep of the model", type=Period, ), diag_tstep=dict( info="The timestep for writing diagnostics outputs", type=Period, ), fcterm=dict( info="The forecast's term", type=Period, ), forcing_read_interval=dict( info="Read the forcing file every...", type=Period, default=Period("PT12H"), optional=True, ), ), ), ] def valid_executable(self, rh): """Check the executable's resource.""" bmodel = getattr(rh.resource, "model", None) rc = bmodel == "surfex" and rh.resource.realkind == "offline" if not rc: logger.error("Inapropriate binary provided") return rc and super().valid_executable(rh) @staticmethod def _fix_nam_macro(sec, macro, value): """Set a given namelist macro and issue a log message.""" sec.rh.contents.setmacro(macro, value) logger.info("Setup %s macro to %s.", macro, str(value)) def prepare(self, rh, opts): """Setup the appropriate namelist macros.""" self.system.subtitle("Offline SURFEX Settings.") # Find the run/final date ic = self.context.sequence.effective_inputs( role=("InitialConditions", "ModelState", "Analysis") ) if ic: if len(ic) > 1: logger.warning( "Multiple initial conditions, using only the first one..." ) rundate = ic[0].rh.resource.date if hasattr(ic[0].rh.resource, "term"): rundate += ic[0].rh.resource.term finaldate = rundate + self.fcterm finaldate = [ finaldate.year, finaldate.month, finaldate.day, finaldate.hour * 3600 + finaldate.minute * 60 + finaldate.second, ] logger.info("The final date is : %s", str(finaldate)) nbreads = int( math.ceil( (finaldate - rundate).length / self.forcing_read_interval.length ) ) else: logger.warning( "No initial conditions were found. Hope you know what you are doing..." ) finaldate = None # Ok, let's find the namelist namsecs = self.context.sequence.effective_inputs( role=("Namelist", "Namelistsurf") ) for namsec in namsecs: logger.info("Processing: %s", namsec.rh.container.localpath()) self._fix_nam_macro(namsec, "TSTEP", self.model_tstep.length) self._fix_nam_macro( namsec, "TSTEP_OUTPUTS", self.diag_tstep.length ) if finaldate: self._fix_nam_macro(namsec, "FINAL_STOP", finaldate) self._fix_nam_macro(namsec, "NB_READS", nbreads) if namsec.rh.contents.dumps_needs_update: namsec.rh.save() logger.info("Namelist dump: \n%s", namsec.rh.container.read())
class MUSCForecast(Forecast): """Forecast for MUSC single-column model.""" _footprint = dict( info="Run a forecast with a MUSC single-column model.", attr=dict( kind=dict( values=["musc"], ), ), ) def postfix(self, rh, opts): """Post forecast information and cleaning.""" sh = self.system # rename specific output files with hours term on 4 digits for compatibility for fmth formatting fmt = re.compile(r"Out\.(?P<termh>\d{3})\.\d{4}\.lfa$") for f in [ fmt.match(f) for f in sh.listdir() if f.startswith("Out.") and f.endswith(".lfa") ]: sh.rename( f.string, "Out.{:>04}.0000.lfa".format(int(f.group("termh"))) ) super().postfix(rh, opts)