Source code for vortex.nwp.algo.assim

"""
AlgoComponents dedicated to computations related to Data Assimilation systems.
"""

from bronx.fancies import loggers
from bronx.stdtypes.date import Date

from vortex.algo.components import BlindRun
from vortex.syntax.stdattrs import a_date
from .ifsroot import IFSParallel
from ..tools import odb, drhook

#: No automatic export
__all__ = []

logger = loggers.getLogger(__name__)


[docs] class Anamix(IFSParallel): """Merge the surface and atmospheric analyses into a single file""" _footprint = dict( info="Merge surface and atmospheric analyses", attr=dict( kind=dict( values=["anamix"], ), conf=dict( default=701, ), xpname=dict( default="CANS", ), timestep=dict( default=1, ), ), )
[docs] class SstAnalysis(IFSParallel): """SST (Sea Surface Temperature) Analysis""" _footprint = dict( attr=dict( kind=dict( values=["sstana", "sst_ana", "sst_analysis", "c931"], remap=dict(autoremap="first"), ), conf=dict( default=931, ), xpname=dict( default="ANAL", ), timestep=dict( default="1.", ), ) )
[docs] class SeaIceAnalysis(IFSParallel): """Sea Ice Analysis""" _footprint = dict( attr=dict( kind=dict( values=["seaiceana", "seaice_ana", "seaice_analysis", "c932"], remap=dict(autoremap="first"), ), conf=dict( default=932, ), xpname=dict( default="ANAL", ), timestep=dict( default="1.", ), date=dict( type=Date, ), ) ) def find_namelists(self, opts=None): namrh_list = super().find_namelists(opts) if not namrh_list: logger.critical("No namelist was found.") raise ValueError("No namelist was found for seaice analysis") return namrh_list def prepare_namelist_delta(self, rh, namcontents, namlocal): super().prepare_namelist_delta(rh, namcontents, namlocal) self._set_nam_macro(namcontents, namlocal, "IDAT", int(self.date.ymd)) return True
[docs] class Canari(IFSParallel, odb.OdbComponentDecoMixin): """Surface analysis.""" _footprint = dict( info="Surface assimilation based on optimal interpolation", attr=dict( kind=dict( values=["canari"], ), binarysingle=dict( default="basicnwpobsort", ), conf=dict( default=701, ), xpname=dict( default="CANS", ), ), ) def prepare(self, rh, opts): """Get a look at raw observations input files.""" super().prepare(rh, opts) # Looking for input observations obsodb = [ x for x in self.lookupodb() if x.rh.resource.part.startswith("surf") ] if not obsodb: raise ValueError("No surface obsdata for canari") self.odb_date_and_layout_from_sections(obsodb) # Find the unique input ODb database ssurf = obsodb.pop() if obsodb: logger.error("More than one surface obsdata provided") logger.error( "Using : %s / %s", ssurf.rh.resource.layout, ssurf.rh.resource.part, ) for sobs in obsodb: logger.error( "Skip : %s / %s", sobs.rh.resource.layout, sobs.rh.resource.part, ) # Fix paths + generate a global IOASSING file cma_path = self.system.path.abspath(ssurf.rh.container.localpath()) self.odb.fix_db_path(self.virtualdb, cma_path) self.odb.ioassign_gather(cma_path) # Some extra settings self.odb.create_poolmask(self.virtualdb, cma_path) self.odb.shuffle_setup(self.slots, mergedirect=True, ccmadirect=False) self.env.update( ODB_POOLMASKING=1, ODB_PACKING=-1, BASETIME=self.date.ymdh, ) # Fix the input DB intent self.odb_rw_or_overwrite_method(ssurf)
[docs] class Screening(IFSParallel, odb.OdbComponentDecoMixin): """Observation screening.""" _footprint = dict( info="Observations screening.", attr=dict( kind=dict( values=["screening", "screen", "thinning"], remap=dict(autoremap="first"), ), binarysingle=dict( default="basicnwpobsort", ), ioassign=dict( optional=False, ), conf=dict( default=2, ), xpname=dict( default="SCRE", ), ), ) def prepare(self, rh, opts): """Get a look at raw observations input files.""" super().prepare(rh, opts) # Looking for input observations allodb = self.lookupodb() self.odb_date_and_layout_from_sections(allodb) # Perform the pre-merging stuff (this will create the ECMA virtual DB) virtualdb_path = self.odb_merge_if_needed(allodb) # Prepare the CCMA DB ccma_path = self.odb_create_db(layout="CCMA") # Fix paths + generate a global IOASSING file self.odb.fix_db_path(self.virtualdb, virtualdb_path) self.odb.fix_db_path("CCMA", ccma_path) self.odb.ioassign_gather(virtualdb_path, ccma_path) # Some extra settings self.odb.create_poolmask(self.virtualdb, virtualdb_path) self.odb.shuffle_setup(self.slots, mergedirect=True, ccmadirect=True) # Look for extras ODB raw self.odb_handle_raw_dbs() # Fix the input databases intent self.odb_rw_or_overwrite_method(*allodb) # Look for channels namelists and set appropriate links self.setchannels()
class IFSODBCCMA(IFSParallel, odb.OdbComponentDecoMixin): """Specialised IFSODB for CCMA processing""" _abstract = True _footprint = dict( attr=dict( virtualdb=dict( default="ccma", ), binarysingle=dict( default="basicnwpobsort", ), ) ) def prepare(self, rh, opts): """Get a look at raw observations input files.""" super().prepare(rh, opts) sh = self.system # Looking for input observations allodb = self.lookupodb() allccma = [x for x in allodb if x.rh.resource.layout.lower() == "ccma"] if allccma: if len(allccma) > 1: logger.error( "Multiple CCMA databases detected: only the first one is taken into account" ) else: raise ValueError("Missing CCMA input data for " + self.kind) # Set env and IOASSIGN ccma = allccma.pop() ccma_path = sh.path.abspath(ccma.rh.container.localpath()) self.odb_date_and_layout_from_sections( [ ccma, ] ) self.odb.fix_db_path(ccma.rh.resource.layout, ccma_path) self.odb.ioassign_gather(ccma_path) # Fix the input database intent self.odb_rw_or_overwrite_method(ccma) # Look for channels namelists and set appropriate links self.setchannels()
[docs] class Minim(IFSODBCCMA): """Observation minimisation.""" _footprint = dict( info="Minimisation in the assimilation process.", attr=dict( kind=dict( values=["minim", "min", "minimisation"], remap=dict(autoremap="first"), ), conf=dict( default=131, ), xpname=dict( default="MINI", ), ), ) def prepare(self, rh, opts): """Find out if preconditioning eigenvectors are here.""" super().prepare(rh, opts) # Check if a preconditioning EV map is here evmaprh = self.context.sequence.effective_inputs( role=("PreconEVMap", "PreconditionningEVMap"), kind="precevmap" ) if evmaprh: if len(evmaprh) > 1: logger.warning( "Several preconditioning EV maps provided. Using the first one." ) nprec_ev = evmaprh[0].rh.contents.data["evlen"] # If there are preconditioning EV: update the namelist if nprec_ev > 0: for namrh in [ x.rh for x in self.context.sequence.effective_inputs( role="Namelist", kind="namelist", ) ]: namc = namrh.contents try: namc["NAMVAR"].NPCVECS = nprec_ev namc.rewrite(namrh.container) except Exception: logger.critical( "Could not fix NAMVAR in %s", namrh.container.actualpath(), ) raise logger.info( "%d preconditioning EV will by used (NPCVECS=%d).", nprec_ev, nprec_ev, ) else: logger.warning( "A preconditioning EV map was found, " + "but no preconditioning EV are available." ) else: logger.info("No preconditioning EV were found.") def postfix(self, rh, opts): """Find out if any special resources have been produced.""" sh = self.system # Look up for PREConditionning Eigen Vectors prec = sh.ls("MEMINI*") if prec: prec_info = dict(evlen=len(prec)) prec_info["evnum"] = [int(x[6:]) for x in prec] sh.json_dump(prec_info, "precev_map.out", indent=4) super().postfix(rh, opts)
[docs] class Trajectory(IFSODBCCMA): """Observation trajectory.""" _footprint = dict( info="Trajectory in the assimilation process.", attr=dict( kind=dict( values=["traj", "trajectory"], remap=dict(autoremap="first"), ), conf=dict( default=2, ), xpname=dict( default="TRAJ", ), ), )
[docs] class PseudoTrajectory(BlindRun, drhook.DrHookDecoMixin): """Copy a few fields from the Guess file into the Analysis file""" _footprint = dict( attr=dict( kind=dict( values=["pseudotraj", "traj", "trajectory"], remap=dict(autoremap="first"), ), ) )
[docs] class SstGrb2Ascii(BlindRun): """Transform sst grib files from the BDAP into ascii files""" _footprint = dict( info="Binary to change the format of sst BDAP files.", attr=dict( kind=dict( values=["lect_bdap"], ), date=a_date, nlat=dict( default=0, ), nlon=dict( default=0, ), ), ) def prepare(self, rh, opts): """Add namelist delta, prepare the environment and build the arguments needed.""" super().prepare(rh, opts) for namrh in [ x.rh for x in self.context.sequence.effective_inputs( role="Namelist", kind="namelist", ) ]: namc = namrh.contents try: namc.newblock("NAMFILE") namc["NAMFILE"].NBFICH = 1 namc["NAMFILE"]["CCNFICH(1)"] = "GRIB_SST" namc.rewrite(namrh.container) except Exception: logger.critical( "Could not fix NAMFILE in %s", namrh.container.actualpath() ) raise def spawn_command_options(self): """Build the dictionnary to provide arguments to the binary.""" return dict( year=self.date.year, month=self.date.month, day=self.date.day, hour=self.date.hour, lon=self.nlon, lat=self.nlat, )
[docs] class IceNetCDF2Ascii(BlindRun): """Transform ice NetCDF files from the BDPE into ascii files""" _footprint = dict( info="Binary to change the format of ice BDPE files.", attr=dict( kind=dict( values=["ice_nc2ascii"], ), output_file=dict(optional=True, default="ice_concent"), param=dict( optional=True, default="ice_conc", ), ), ) def prepare(self, rh, opts): super().prepare(rh, opts) # Look for the input files list_netcdf = self.context.sequence.effective_inputs( role="NetCDFfiles", kind="observations" ) hn_file = "" hs_file = "" for sect in list_netcdf: part = sect.rh.resource.part filename = sect.rh.container.filename if part == "ice_hn": if hn_file == "": hn_file = filename logger.info( "The input file for the North hemisphere is: %s.", hn_file, ) else: logger.warning( "There was already one file for the North hemisphere. " "The following one, %s, is not used.", filename, ) elif part == "ice_hs": if hs_file == "": hs_file = filename logger.info( "The input file for the South hemisphere is: %s.", hs_file, ) else: logger.warning( "There was already one file for the South hemisphere. " "The following one, %s, is not used.", filename, ) else: logger.warning("The following file is not used: %s.", filename) self.input_file_hn = hn_file self.input_file_hs = hs_file def spawn_command_options(self): """Build the dictionnary to provide arguments to the binary.""" return dict( file_in_hn=self.input_file_hn, file_in_hs=self.input_file_hs, param=self.param, file_out=self.output_file, )