"""
AlgoComponents for the next generation of Fullpos runs (based on the 903
configuration).
"""
import collections
import functools
import math
import re
from itertools import filterfalse
import time
from bronx.compat.functools import cached_property
from bronx.datagrip.namelist import NamelistBlock
from bronx.stdtypes.date import Time, Date
from bronx.fancies import loggers
import footprints
from vortex.algo.components import AlgoComponentError
import vortex.layout.monitor as _lmonitor
from .ifsroot import IFSParallel
from ..syntax.stdattrs import outputid_deco
#: Nothing is exported automatically from this module
__all__ = list()

logger = loggers.getLogger(__name__)

#: Name of the pickle file, created in each polled directory, that stores
#: the polling state used by fullpos_server_flypoll.
fullpos_server_flypoll_pickle = ".fullpos_server_flypoll"
class FullPosServerFlyPollPersistantState:
    """Persistent storage object for Fullpos's polling method.

    Instances are pickled to (and unpickled from) a file located in each
    polled directory by ``fullpos_server_flypoll``. Consequently, the
    attributes and their default factories must remain picklable.
    """

    def __init__(self):
        # Last term already dealt with, per output prefix. The default
        # (Time(-9999)) is presumably earlier than any actual output term.
        # Keep the default factory picklable (e.g. do not swap the
        # functools.partial for a lambda).
        self.cursor = collections.defaultdict(functools.partial(Time, -9999))
        # Names of the output files already reported, per output prefix
        self.found = collections.defaultdict(list)
def fullpos_server_flypoll(
    sh, outputprefix, termfile, directories=(".",), **kwargs
):  # @UnusedVariable
    """Check sub-**directories** to determine whether new output files are available or not.

    In each directory, the first line of the **termfile** witness file gives
    the latest available term. Output files named
    ``<outputprefix>...+<term>[.<ext>]`` whose term is strictly greater than
    the previously recorded cursor and lower or equal to this latest term
    are reported as new. The polling state (cursor and already found files)
    is persisted in a pickle file in each directory.

    If the witness file of a given directory cannot be interpreted, that
    directory is skipped (it will be looked at again during the next call)
    but the other directories are still polled.

    :param sh: The System object used for file-system operations.
    :param outputprefix: The model-side prefix of the output files
        (matched literally).
    :param termfile: The name of the witness file (relative to each
        directory).
    :param directories: The directories to look into.
    :param kwargs: Ignored (the polling machinery may pass extra arguments
        such as ``nthreads``).
    :return: The list of the newly available output files (paths built from
        the directory and the file name, normalised).
    """
    new = list()
    # Output names look like <outputprefix><anything>+<term>[.<ext>] where
    # <term> is HH or HH:MM. The prefix is escaped so that it is matched
    # literally.
    pre = re.compile(
        r"^{:s}\w*\+(\d+(?::\d\d)?)(?:\.\w+)?$".format(
            re.escape(outputprefix)
        )
    )
    for directory in directories:
        with sh.cdcontext(directory, create=True):
            if sh.path.exists(fullpos_server_flypoll_pickle):
                fpoll_st = sh.pickle_load(fullpos_server_flypoll_pickle)
            else:
                fpoll_st = FullPosServerFlyPollPersistantState()
            try:
                if not sh.path.exists(termfile):
                    continue
                with open(termfile) as wfh:
                    rawcursor = wfh.readline().rstrip("\n")
                try:
                    cursor = Time(rawcursor)
                except (TypeError, ValueError):
                    logger.warning(
                        'Unable to convert "%s" to a Time object',
                        rawcursor,
                    )
                    # Skip this directory only: it will be polled again
                    # during the next call.
                    continue
                lnew = list()
                matches = [m for m in map(pre.match, sh.listdir()) if m]
                for candidate in matches:
                    # Names ending with ".d" are ignored
                    if candidate.group(0).endswith(".d"):
                        continue
                    ctime = Time(candidate.group(1))
                    if (
                        ctime > fpoll_st.cursor[outputprefix]
                        and ctime <= cursor
                    ):
                        lnew.append(candidate.group(0))
                fpoll_st.cursor[outputprefix] = cursor
                fpoll_st.found[outputprefix].extend(lnew)
                new.extend(
                    sh.path.normpath(sh.path.join(directory, anew))
                    for anew in lnew
                )
            finally:
                # Always persist the polling state
                sh.pickle_dump(fpoll_st, fullpos_server_flypoll_pickle)
    return new
class FullposServerDiscoveredInputs:
    """Holds all kinds of information about the input files."""

    def __init__(self):
        # Initial condition sections, keyed by model-side input prefix
        self.inidata = dict()
        # One dict per input step, mapping model-side input prefixes to
        # input sections
        self.tododata = list()
        # Output guesses associated with each element of tododata
        self.guessdata = list()
        # Number of input steps per (actual) term
        self.termscount = collections.defaultdict(int)
        # Whether any of the inputs is expected
        self.anyexpected = False
        # Minimal suffix length for the input files
        self.inputsminlen = 0
        # Prefix derived from the role of the first input section
        self.firstprefix = None

    def actual_suffixlen(self, minlen=None):
        """Find out the required suffix length.

        The input steps are numbered from ``0`` to ``len(tododata) - 1``;
        the suffix must be wide enough to hold the largest of these indices,
        and at least **minlen** characters long.

        :param minlen: The minimal suffix length (defaults to
            ``inputsminlen``).
        :return: The suffix length (``minlen`` if there is no input data).
        """
        if minlen is None:
            minlen = self.inputsminlen
        if not self.tododata:
            # Nothing to number: avoid computing the width of a
            # non-existent index
            return minlen
        return max(minlen, len(str(len(self.tododata) - 1)))
class FullPosServer(IFSParallel):
    """Fullpos Server for geometry transforms & post-processing in IFS-like Models.

    Input/Output files are labelled as follows:

    * Let ``INPUTFILE_0`` denote an input file name (the user can choose
      whichever name she/he wants provided that the associated input's
      section has the "ModelState" role).
    * For FA files: the corresponding output file will be
      ``INPUTFILE_0.domain.out`` where ``domain`` is the domain name chosen
      by the user in the namelist. If a Surfex output file is also created
      (it depends on the namelist) it will be named
      ``INPUTFILE_0.domain.sfx.out``.
    * For GRIB files: the corresponding output file will be
      ``INPUTFILE_0.domain.grib.out``.
    * Some or all output files may be pre-positioned (data generated by the
      c903 will be appended to them). The corresponding section's role needs
      to be "OutputGuess". The local filename of the section needs to match
      the expected output filename. In our example it would be
      ``INPUTFILE_0.domain.out``.

    :note: To use this algocomponent, the c903's server needs to be activated
           in the namelist (NFPSERVER != 0).
    :note: With the current IFS/Arpege code, in order for the output's polling
           to work properly, an ``ECHFP`` witness file has to be incremented
           by the server, in each of the output directories.
    :note: Climatology files are not managed (only a few sanity checks are
           performed). The user needs to name the input climatology file
           consistently with the c903's namelist. For role="InitialClim"
           sections, the filename should be something like
           ``Const.Clim.m[month]``. For role="TargetClim" sections the
           filename should be something like ``const.clim.[domain].m[month]``
           where ``[domain]`` corresponds to the name of the output domain
           (as specified in the namelist file) and ``[month]`` corresponds to
           the month being dealt with (on 2 digits).

    Interesting features:

    * Input files can be expected (for on the fly processing)
    * Input files are dealt with in arbitrary order depending on their
      availability (useful for ensemble processing).
    * Output files can be promised
    """

    # Role regexes used to discover the input sections: the (optional)
    # captured group completes the model-side input prefix
    _INITIALCONDITION_ROLE = re.compile(r"InitialCondition((?:\w+)?)")
    _INPUTDATA_ROLE_STR = "ModelState"
    _INPUTDATA_ROLE = re.compile(r"ModelState((?:\w+)?)")
    _OUTPUTGUESS_ROLE = "OutputGuess"

    # Model-side (c903) file naming conventions
    _MODELSIDE_INPUTPREFIX0 = "ICM"
    _MODELSIDE_INPUTPREFIX1 = "SH"
    _MODELSIDE_OUTPUTPREFIX = "PF"
    _MODELSIDE_OUTPUTPREFIX_GRIB = "GRIBPF"
    _MODELSIDE_TERMFILE = "./ECHFP"
    # Minimal suffix lengths (outputs / inputs; per-format overrides for
    # inputs are given by _MODELSIDE_INE_SUFFIXLEN_MIN)
    _MODELSIDE_OUT_SUFFIXLEN_MIN = 4
    _MODELSIDE_IND_SUFFIXLEN_MIN = 4
    _MODELSIDE_INE_SUFFIXLEN_MIN = dict(grib=6)

    _SERVERSYNC_RAISEONEXIT = False
    _SERVERSYNC_RUNONSTARTUP = False
    _SERVERSYNC_STOPONEXIT = False

    _footprint = [
        outputid_deco,
        dict(
            attr=dict(
                kind=dict(
                    values=[
                        "fpserver",
                    ],
                ),
                outdirectories=dict(
                    info="The list of possible output directories.",
                    type=footprints.stdtypes.FPList,
                    default=footprints.stdtypes.FPList(
                        [
                            ".",
                        ]
                    ),
                    optional=True,
                ),
                append_domain=dict(
                    info=(
                        "If defined, the output file for domain append_domain "
                        + "will be made a copy of the input file (prior to the "
                        + "server run)"
                    ),
                    optional=True,
                ),
                basedate=dict(
                    info="The run date of the coupling generating process",
                    type=Date,
                    optional=True,
                ),
                xpname=dict(default="FPOS"),
                conf=dict(
                    default=903,
                ),
                timestep=dict(
                    default=1.0,
                ),
                timeout=dict(
                    type=int,
                    optional=True,
                    default=300,
                ),
                refreshtime=dict(
                    info="How frequently are the expected input files looked for ? (seconds)",
                    type=int,
                    optional=True,
                    default=20,
                ),
                server_run=dict(
                    # This is a rw attribute: it will be managed internally
                    values=[True, False]
                ),
                serversync_method=dict(
                    default="simple_socket",
                ),
                serversync_medium=dict(
                    default="nextfile_wait",
                ),
                maxpollingthreads=dict(
                    type=int,
                    optional=True,
                    default=8,
                ),
                flypoll=dict(
                    default="internal",
                ),
                defaultformat=dict(
                    info="Format for the legacy output files.",
                    default="fa",
                    optional=True,
                ),
            )
        ),
    ]
@property
def realkind(self):
    """The kind name of this algo component (``fullpos``)."""
    kind_name = "fullpos"
    return kind_name
def __init__(self, *args, **kw):
    """Initialise the component with an empty flyput output mapping."""
    super().__init__(*args, **kw)
    # Compiled regex -> (final name format, output format); filled later on
    self._flyput_mapping_d = {}
def flyput_outputmapping(self, item):
    """Map an output file to its final name.

    :param item: The path of a model-side output file.
    :return: A ``(final_path, format)`` tuple, or ``None`` if the basename
        of **item** matches none of the registered regexes.
    """
    sh = self.system
    item_basename = sh.path.basename(item)
    for out_re, target in self._flyput_mapping_d.items():
        matched = out_re.match(item_basename)
        if not matched:
            continue
        final_name = target[0].format(
            matched.group("fpdom"), matched.group("suffix")
        )
        return sh.path.join(sh.path.dirname(item), final_name), target[1]
    return None
@cached_property
def inputs(self):
    """Retrieve the lists in input sections/ResourceHandlers.

    The effective input sections are gathered in a
    :class:`FullposServerDiscoveredInputs` object:

    * ``inidata``: the ``InitialCondition*`` sections, keyed by model-side
      prefix (``ICM`` + the role's suffix, ``SH`` by default);
    * ``tododata``: one dict per input step, mapping model-side prefixes
      (``ICM`` + the ``ModelState*`` role's suffix, ``SH`` by default) to
      sections. Sections are sorted by actual term when every resource has
      a ``term`` attribute;
    * ``termscount``: the number of input steps per actual term (only when
      every resource has a ``term`` attribute);
    * ``guessdata``: for each input step, the ``OutputGuess`` sections whose
      name refers to the step's first-prefix input file;
    * ``anyexpected``: whether any input or output guess is expected.

    :raises AlgoComponentError: if two Initial Conditions share the same
        prefix or if the number of sections differs between prefixes.
    """
    discovered = FullposServerDiscoveredInputs()
    # Initial conditions
    inisec = self.context.sequence.effective_inputs(
        role=self._INITIALCONDITION_ROLE
    )
    if inisec:
        for s in inisec:
            # The role's suffix (if any) completes the model-side prefix
            iprefix = (
                self._INITIALCONDITION_ROLE.match(
                    s.alternate if s.role is None else s.role
                ).group(1)
                or self._MODELSIDE_INPUTPREFIX1
            )
            fprefix = self._MODELSIDE_INPUTPREFIX0 + iprefix
            if fprefix in discovered.inidata:
                raise AlgoComponentError(
                    "Only one Initial Condition is allowed."
                )
            else:
                discovered.inidata[fprefix] = s
    # Model states
    todosec0 = self.context.sequence.effective_inputs(
        role=self._INPUTDATA_ROLE
    )
    todosec1 = collections.defaultdict(list)
    discovered.anyexpected = any(
        [isec.rh.is_expected() for isec in todosec0]
    )
    hasterms = all(
        [hasattr(isec.rh.resource, "term") for isec in todosec0]
    )
    # Sort things up (if possible)
    if hasterms:
        logger.info("Sorting input data based on the actual term.")
        todosec0 = sorted(todosec0, key=lambda s: self._actual_term(s.rh))
    if todosec0:
        for iseq, s in enumerate(todosec0):
            rprefix = (
                self._INPUTDATA_ROLE.match(
                    s.alternate if s.role is None else s.role
                ).group(1)
                or self._MODELSIDE_INPUTPREFIX1
            )
            todosec1[rprefix].append(s)
            if iseq == 0:
                # Find the "default" prefix and suffix len based on the first section
                discovered.firstprefix = rprefix
                discovered.inputsminlen = (
                    self._MODELSIDE_INE_SUFFIXLEN_MIN.get(
                        s.rh.container.actualfmt,
                        self._MODELSIDE_IND_SUFFIXLEN_MIN,
                    )
                )
        iprefixes = sorted(todosec1.keys())
        if len(iprefixes) == 1:
            # A single prefix: each section is an input step on its own
            for s in todosec0:
                discovered.tododata.append(
                    {self._MODELSIDE_INPUTPREFIX0 + iprefixes[0]: s}
                )
        else:
            # Several prefixes: the n-th sections of every prefix make up
            # the n-th input step (hence the same number is required)
            if len({len(secs) for secs in todosec1.values()}) > 1:
                raise AlgoComponentError(
                    "Inconsistent number of input data."
                )
            for sections in zip(*[iter(todosec1[i]) for i in iprefixes]):
                discovered.tododata.append(
                    {
                        self._MODELSIDE_INPUTPREFIX0 + k: v
                        for k, v in zip(iprefixes, sections)
                    }
                )
    # Detect the number of terms based on the firstprefix
    if hasterms:
        for sections in discovered.tododata:
            act_term = self._actual_term(
                sections[
                    self._MODELSIDE_INPUTPREFIX0 + discovered.firstprefix
                ].rh
            )
            discovered.termscount[act_term] += 1
    # Look for guesses of output files
    guesses_sec0 = collections.defaultdict(list)
    guess_entry = collections.namedtuple(
        "guess_entry", ("sdir", "prefix", "domain", "suffix", "sec")
    )
    for sec in self.context.sequence.effective_inputs(
        role=self._OUTPUTGUESS_ROLE
    ):
        s_lpath = sec.rh.container.localpath()
        # Expected names: <input basename>.<domain>[.sfx][.grib].out
        s_match = self._o_algo_re.match(self.system.path.basename(s_lpath))
        if s_match:
            guesses_sec0[s_match.group("base")].append(
                guess_entry(
                    self.system.path.dirname(s_lpath),
                    self._o_auto_prefix(
                        "grib"
                        if s_match.group("grib")
                        else self.defaultformat
                    ),
                    s_match.group("fpdom"),
                    s_match.group("suffix"),
                    sec,
                )
            )
            discovered.anyexpected = (
                discovered.anyexpected or sec.rh.is_expected()
            )
        else:
            logger.warning(
                "Improper name for the following output guess < %s >. Ignoring it.",
                s_lpath,
            )
    # Pair them with input file (based on their name)
    for iinput in discovered.tododata:
        isec = iinput[
            self._MODELSIDE_INPUTPREFIX0 + discovered.firstprefix
        ]
        discovered.guessdata.append(
            guesses_sec0.pop(
                self.system.path.basename(isec.rh.container.localpath()),
                (),
            )
        )
    # Guesses that could not be paired with any input file
    if guesses_sec0:
        logger.warning(
            "Some input data were left unsed: < %s >", guesses_sec0
        )
        logger.info(
            "discovered guessdata are: < %s >", discovered.guessdata
        )
    return discovered
@cached_property
def object_namelists(self):
    """The list of object's namelists (resource handlers).

    Only the ``ObjectNamelist`` inputs whose resource realkind is
    ``namelist_fpobject`` are retained. Their content is customised in
    place, and saved if anything changed:

    * the ``FP_CMODEL`` and ``FP_LEXTERN`` macros are set from the resource
      attributes of the same name (when they are not ``None``);
    * when the resource defines ``fp_terms``, ``&NAMFPC NFPOSTS`` entries
      are generated: ``NFPOSTS(0)`` holds minus the number of selected input
      steps and the following entries hold minus the indices of the input
      steps whose actual term belongs to ``fp_terms``.

    :raises AlgoComponentError: if ``fp_terms`` is used while input data
        have no term, if ``&NAMFPC`` is missing or if ``NFPOSTS`` entries
        are already defined.
    """
    candidates = self.context.sequence.effective_inputs(role="ObjectNamelist")
    namrhs = [
        sec.rh
        for sec in candidates
        if sec.rh.resource.realkind == "namelist_fpobject"
    ]
    for namrh in namrhs:
        resource = namrh.resource
        must_save = False
        # Macros directly taken from the resource's attributes
        for macro_name, macro_value in (
            ("FP_CMODEL", resource.fp_cmodel),
            ("FP_LEXTERN", resource.fp_lextern),
        ):
            if macro_value is None:
                continue
            self._set_nam_macro(
                namrh.contents,
                namrh.container.localpath(),
                macro_name,
                macro_value,
            )
            must_save = True
        if resource.fp_terms is not None:
            if not self.inputs.termscount:
                raise AlgoComponentError(
                    "In this use case, all input data must have a term attribute"
                )
            wanted_terms = {Time(t) for t in resource.fp_terms}
            # Indices of the input steps whose term is wanted
            selected = list()
            offset = 0
            for term, count in sorted(self.inputs.termscount.items()):
                if term in wanted_terms:
                    selected.extend(range(offset, offset + count))
                offset += count
            try:
                nfpc = namrh.contents["NAMFPC"]
            except KeyError:
                raise AlgoComponentError(
                    "NAMFPC should be defined in {:s}".format(
                        namrh.container.localpath()
                    )
                )
            if any(key.startswith("NFPOSTS") for key in nfpc.keys()):
                raise AlgoComponentError(
                    "&NAMFPC NFPOSTS*(*) / entries should not be defined in {:s}".format(
                        namrh.container.localpath()
                    )
                )
            nfpc["NFPOSTS(0)"] = -len(selected)
            for rank, step in enumerate(selected, start=1):
                nfpc["NFPOSTS({:d})".format(rank)] = -step
            logger.info(
                "The NAMFPC namelist in %s was updated.",
                namrh.container.localpath(),
            )
            logger.debug(
                "The updated NAMFPC namelist in %s is:\n%s",
                namrh.container.localpath(),
                nfpc,
            )
            must_save = True
        if must_save:
            namrh.save()
    return namrhs
@cached_property
def xxtmapping(self):
    """A handy dictionary about selection namelists.

    Maps each directory containing ``FullPosSelection`` (``namselect``)
    inputs to a dict associating resource terms to resource handlers.

    :raises AlgoComponentError: if selection namelists exist while input
        data have no term.
    """
    mapping = collections.defaultdict(dict)
    selections = self.context.sequence.effective_inputs(
        role="FullPosSelection", kind="namselect"
    )
    for sec in selections:
        rhandler = sec.rh
        location = self.system.path.dirname(rhandler.container.localpath())
        mapping[location][rhandler.resource.term] = rhandler
    if mapping and not self.inputs.termscount:
        raise AlgoComponentError(
            "In this use case, all input data must have a term attribute"
        )
    return mapping
@cached_property
def _i_fmt(self):
    """The input files format (as expected by the c903).

    A template with two slots: the model-side input prefix and the
    zero-padded input index (e.g. ``{:s}FPOS+{:04d}``).
    """
    width = self.inputs.actual_suffixlen()
    return "{{:s}}{:s}+{{:0{:d}d}}".format(self.xpname, width)
@cached_property
def _o_raw_fmt(self):
    """The output files format (as imposed by the c903).

    A template with four slots: output prefix, domain name, zero-padded
    index and trailing suffix (e.g. ``{:s}FPOS{:s}+{:04d}{:s}``).
    """
    width = self.inputs.actual_suffixlen(self._MODELSIDE_OUT_SUFFIXLEN_MIN)
    return "{{:s}}{:s}{{:s}}+{{:0{:d}d}}{{:s}}".format(self.xpname, width)
@cached_property
def _o_re_fmt(self):
    """The output files regex (as imposed by the c903).

    A regex template with two slots (output prefix, input index); it
    captures the domain name (``fpdom``) and the optional ``.sfx`` suffix
    (``suffix``).
    """
    width = self.inputs.actual_suffixlen(self._MODELSIDE_OUT_SUFFIXLEN_MIN)
    template = (
        r"^{{:s}}{:s}(?P<fpdom>\w+)\+{{:0{:d}d}}(?P<suffix>(?:\.sfx)?)$"
    )
    return template.format(self.xpname, width)
@cached_property
def _o_init_re_fmt(self):
    """The ``INIT`` output files regex (as imposed by the c903).

    A regex template with two slots (output prefix, ``INIT`` tag); it
    captures the domain name (``fpdom``) and the optional ``.sfx`` suffix
    (``suffix``).
    """
    template = r"^{{:s}}{:s}(?P<fpdom>\w+){{:s}}(?P<suffix>(?:\.sfx)?)$"
    return template.format(self.xpname)
@cached_property
def _o_algo_re(self):
    """The compiled regex for any output (as imposed by our AlgoComponent).

    Named groups: ``base`` (input file name), ``fpdom`` (domain name),
    ``suffix`` (``.sfx`` or empty) and ``grib`` (``.grib`` or empty).
    """
    pattern = r"(?P<base>.+)\.(?P<fpdom>\w+)(?P<suffix>(?:\.sfx)?)(?P<grib>(?:\.grib)?)\.out$"
    return re.compile(pattern)
@cached_property
def _o_suffix(self):
    """The FA/default output suffix template (as imposed by our AlgoComponent).

    Its two slots receive the domain name and the ``.sfx`` suffix (or an
    empty string).
    """
    return "." + "{:s}{:s}" + ".out"
@cached_property
def _o_grb_suffix(self):
    """The GRIB output suffix template (as imposed by our AlgoComponent).

    Its two slots receive the domain name and the ``.sfx`` suffix (or an
    empty string).
    """
    return "." + "{:s}{:s}" + ".grib.out"
def _o_auto_prefix(self, fmt):
    """Return the output files prefix (as imposed by the c903) for format **fmt**.

    ``grib`` gets the GRIB prefix; any other format gets the default one.
    """
    if fmt == "grib":
        return self._MODELSIDE_OUTPUTPREFIX_GRIB
    return self._MODELSIDE_OUTPUTPREFIX
def _actual_term(self, rhandler):
    """Compute the actual Resource Handler term.

    When ``basedate`` is set, the resource's term is shifted by the
    difference between the resource's date and ``basedate``.
    """
    term = rhandler.resource.term
    if self.basedate is None:
        return term
    term += rhandler.resource.date - self.basedate
    return term
def _add_output_mapping(self, outputs_mapping, i, out_re, out_fname):
    """Add mappings for output files (FA/default and GRIB flavours).

    :param outputs_mapping: dict updated in place: compiled regex ->
        ``(final name template, output format)``.
    :param i: The value for the second slot of **out_re** (input index or
        ``"INIT"``).
    :param out_re: A regex template whose first slot is the output prefix.
    :param out_fname: The base of the final output file names.
    """
    flavours = (
        (self._MODELSIDE_OUTPUTPREFIX, self._o_suffix, self.defaultformat),
        (self._MODELSIDE_OUTPUTPREFIX_GRIB, self._o_grb_suffix, "grib"),
    )
    log_args = list()
    for prefix, suffix, fmt in flavours:
        regex_text = out_re.format(prefix, i)
        target = (out_fname + suffix, fmt)
        outputs_mapping[re.compile(regex_text)] = target
        log_args.extend((regex_text, target[0]))
    logger.info("Output %s mapped as %s. Output %s mapped as %s.", *log_args)
def _link_input(self, iprefix, irh, i, inputs_mapping, outputs_mapping):
    """Copy an input file to its model-side name and update the mappings.

    For inputs of the first prefix, the output mappings are registered and,
    if ``append_domain`` is set, the input is also copied (read/write) as
    the ``append_domain`` output file in each output directory.

    :param iprefix: The model-side input prefix.
    :param irh: The input's resource handler.
    :param i: The input index.
    :param inputs_mapping: dict updated in place (local path -> model name).
    :param outputs_mapping: dict updated in place (see _add_output_mapping).
    """
    sh = self.system
    sourcepath = irh.container.localpath()
    modelname = self._i_fmt.format(iprefix, i)
    inputs_mapping[sourcepath] = modelname
    sh.cp(sourcepath, modelname, intent="in", fmt=irh.container.actualfmt)
    logger.info("%s copied as %s.", sourcepath, modelname)
    if iprefix != self._MODELSIDE_INPUTPREFIX0 + self.inputs.firstprefix:
        return
    self._add_output_mapping(
        outputs_mapping, i, self._o_re_fmt, sh.path.basename(sourcepath)
    )
    if not self.append_domain:
        return
    outputpath = self._o_raw_fmt.format(
        self._o_auto_prefix(irh.container.actualfmt),
        self.append_domain,
        i,
        "",
    )
    if self.outdirectories:
        destinations = [sh.path.join(d, outputpath) for d in self.outdirectories]
    else:
        destinations = [outputpath]
    for destination in destinations:
        sh.cp(
            sourcepath,
            destination,
            intent="inout",
            fmt=irh.container.actualfmt,
        )
        logger.info(
            "output file prepared: %s copied (rw) to %s.",
            sourcepath,
            destination,
        )
def _move_output_guess(self, iguess, i):
    """Move an output file guess to its model-side location for input index **i**."""
    container = iguess.sec.rh.container
    sourcepath = container.localpath()
    rawname = self._o_raw_fmt.format(
        iguess.prefix, iguess.domain, i, iguess.suffix
    )
    destpath = self.system.path.join(iguess.sdir, rawname)
    self.system.mv(sourcepath, destpath, fmt=container.actualfmt)
    logger.info("output guess %s was moved to %s.", sourcepath, destpath)
def _link_xxt(self, todorh, i):
    """If necessary, link in the appropriate xxtNNNNNNMM file.

    In each directory holding selection namelists, the namelist whose term
    matches the actual term of **todorh** is symlinked (relative link) as
    ``xxtDDDDHH00``, where the index **i** is split into days (``i // 24``)
    and hours (``i % 24``).
    """
    sh = self.system
    days, hours = divmod(i, 24)
    xxtname = "xxt{:06d}00".format(days * 100 + hours)
    for sdir, tdict in self.xxtmapping.items():
        xxtrh = tdict.get(self._actual_term(todorh), None)
        if xxtrh is None:
            continue
        xxtsource = sh.path.relpath(xxtrh.container.abspath, sdir)
        xxttarget = sh.path.join(sdir, xxtname)
        sh.symlink(xxtsource, xxttarget)
        logger.info("XXT %s linked in as %s.", xxtsource, xxttarget)
def _init_poll_and_move(self, outputs_mapping):
    """Deal with the PF*INIT file.

    Files matching ``<PF prefix><xpname>*INIT`` in the current directory
    are mapped to their final names using **outputs_mapping** and copied,
    unless the final file already exists.

    :return: The list of the newly created final files.
    :raises AlgoComponentError: if a candidate matches no regex.
    """
    # NOTE(review): only the default output prefix is globbed here, although
    # INIT mappings are also registered for the GRIB prefix -- confirm that
    # GRIB INIT outputs are never produced.
    sh = self.system
    pattern = "{:s}{:s}*INIT".format(self._MODELSIDE_OUTPUTPREFIX, self.xpname)
    created = list()
    for thisdata in sh.glob(pattern):
        for out_re, target in outputs_mapping.items():
            m_re = out_re.match(thisdata)
            if m_re:
                finalpath = sh.path.join(
                    sh.path.dirname(thisdata),
                    target[0].format(m_re.group("fpdom"), m_re.group("suffix")),
                )
                finalfmt = target[1]
                break
        else:
            raise AlgoComponentError(
                "The mapping failed for {:s}.".format(thisdata)
            )
        # Already dealt with ?
        if sh.path.exists(finalpath):
            continue
        logger.info(
            "Linking <%s> to <%s> (fmt=%s).",
            thisdata,
            finalpath,
            finalfmt,
        )
        created.append(finalpath)
        sh.cp(thisdata, finalpath, intent="in", fmt=finalfmt)
    return created
def _poll_and_move(self, outputs_mapping):
    """Call **io_poll** and copy the available output files to their final names.

    :param outputs_mapping: dict mapping compiled regexes (matched against
        the output file's basename) to ``(final name template, format)``
        tuples.
    :return: The list of the final output file paths.
    :raises AlgoComponentError: if an output file matches no regex.
    """
    sh = self.system
    outputnames = list()
    # NB: distinct names for the polled files and the mapping values (the
    # original code reused "data" for both).
    for thisdata in self.manual_flypolling():
        basename = sh.path.basename(thisdata)
        mappeddata = None
        for out_re, target in outputs_mapping.items():
            m_re = out_re.match(basename)
            if m_re:
                mappeddata = (
                    sh.path.join(
                        sh.path.dirname(thisdata),
                        target[0].format(
                            m_re.group("fpdom"), m_re.group("suffix")
                        ),
                    ),
                    target[1],
                )
                break
        if mappeddata is None:
            raise AlgoComponentError(
                "The mapping failed for {:s}.".format(thisdata)
            )
        logger.info(
            "Linking <%s> to <%s> (fmt=%s).",
            thisdata,
            mappeddata[0],
            mappeddata[1],
        )
        outputnames.append(mappeddata[0])
        sh.cp(thisdata, mappeddata[0], intent="in", fmt=mappeddata[1])
    return outputnames
def _deal_with_promises(self, outputs_mapping, pollingcb):
    """Poll for outputs and put (in cache) those that are promised.

    Nothing is done when there are no promises. Otherwise, **pollingcb** is
    called with **outputs_mapping**; every returned file whose absolute path
    matches a promise's container is put (the last matching promise is used).
    """
    if not self.promises:
        return
    for afile in pollingcb(outputs_mapping):
        afile_abspath = self.system.path.abspath(afile)
        matching = [
            promise
            for promise in self.promises
            if promise.rh.container.abspath == afile_abspath
        ]
        if not matching:
            continue
        logger.info("The output data is promised <%s>", afile)
        matching[-1].put(incache=True)
def prepare(self, rh, opts):
    """Various sanity checks + namelist tweaking.

    * The ``SUFFIXLEN`` macro is set in the object namelists (which are
      saved if needed);
    * all the input data must share a single geometry, which must also be
      the geometry of the input climatology and Initial Condition files;
    * the terms of the selection namelists are checked against the terms
      of the input data;
    * the Initial Condition files are copied as ``<prefix><xpname>INIT``
      when named differently.

    :raises AlgoComponentError: if no input data are provided or if any of
        the sanity checks fails.
    """
    super().prepare(rh, opts)
    if self.object_namelists:
        self.system.subtitle("Object Namelists customisation")
        for o_nam in self.object_namelists:
            # a/c cy44: &NAMFPIOS NFPDIGITS=__SUFFIXLEN__, /
            self._set_nam_macro(
                o_nam.contents,
                o_nam.container.localpath(),
                "SUFFIXLEN",
                self.inputs.actual_suffixlen(
                    self._MODELSIDE_OUT_SUFFIXLEN_MIN
                ),
            )
            if o_nam.contents.dumps_needs_update:
                logger.info(
                    "Rewritting the %s namelists file.",
                    o_nam.container.actualpath(),
                )
                o_nam.save()
    self.system.subtitle("Dealing with various input files")
    # Sanity check over climfiles and geometries
    input_geo = {
        sec.rh.resource.geometry
        for sdict in self.inputs.tododata
        for sec in sdict.values()
    }
    if len(input_geo) == 0:
        raise AlgoComponentError("No input data are provided, ...")
    elif len(input_geo) > 1:
        raise AlgoComponentError(
            "Multiple geometries are not allowed for input data."
        )
    else:
        input_geo = input_geo.pop()
    input_climgeo = {
        x.rh.resource.geometry
        for x in self.context.sequence.effective_inputs(
            role=("InputClim", "InitialClim")
        )
    }
    if len(input_climgeo) == 0:
        logger.info("No input clim provided. Going on without it...")
    elif len(input_climgeo) > 1:
        raise AlgoComponentError(
            "Multiple geometries are not allowed for input climatology."
        )
    else:
        if input_climgeo.pop() != input_geo:
            raise AlgoComponentError(
                "The input data and input climatology geometries does not match."
            )
    # Initial Condition geometry sanity check
    if self.inputs.inidata and any(
        [
            sec.rh.resource.geometry != input_geo
            for sec in self.inputs.inidata.values()
        ]
    ):
        raise AlgoComponentError(
            "The Initial Condition geometry differs from other input data."
        )
    # Sanity check on target climatology files (only reported, not checked)
    target_climgeos = {
        x.rh.resource.geometry
        for x in self.context.sequence.effective_inputs(role="TargetClim")
    }
    if len(target_climgeos) == 0:
        logger.info("No target clim are provided. Going on without it...")
    # Sanity check on selection namelists
    if self.xxtmapping:
        for tdict in self.xxtmapping.values():
            # NOTE(review): "<" is a strict-subset test: this only fails when
            # the selection namelists cover more terms than the input data,
            # not when an input term lacks a namelist -- confirm the intent.
            if {
                self._actual_term(sec.rh)
                for sdict in self.inputs.tododata
                for sec in sdict.values()
            } < set(tdict.keys()):
                raise AlgoComponentError(
                    "The list of terms between input data and selection namelists differs"
                )
    else:
        logger.info("No selection namelists detected. That's fine")
    # Link in the initial condition file (if necessary)
    for iprefix, isec in self.inputs.inidata.items():
        i_init = "{:s}{:s}INIT".format(iprefix, self.xpname)
        if isec.rh.container.basename != i_init:
            self.system.cp(
                isec.rh.container.localpath(),
                i_init,
                intent="in",
                fmt=isec.rh.container.actualfmt,
            )
            logger.info(
                "Initial condition file %s copied as %s.",
                isec.rh.container.localpath(),
                i_init,
            )
def find_namelists(self, opts=None):
    """Find any namelists candidates in actual context inputs.

    :param opts: Unused (kept for interface compatibility).
    :return: The resource handlers of the ``Namelist`` role / ``namelist``
        kind input sections.
    """
    sections = self.context.sequence.effective_inputs(
        role="Namelist", kind="namelist"
    )
    return [sec.rh for sec in sections]
def prepare_namelist_delta(self, rh, namcontents, namlocal):
    """Fill in the FullPos specific macros/blocks of the main namelist.

    On top of the parent class processing, this sets the
    ``SERVERSYNC_SCRIPT``, ``IOPOLL_WHITNESSFILE``, ``SUFFIXLEN``,
    ``INPUT_SUFFIXLEN`` and ``INPUTDATALEN`` macros, generates a
    ``&NAMFPOBJ`` block listing the object namelists (unless a non-empty
    one already exists) and sets the ``FP_CMODEL`` macro when an
    ``outputid`` is defined and the macro is used in the namelist.

    :return: Always ``True``.
    """
    super().prepare_namelist_delta(rh, namcontents, namlocal)

    def set_macro(name, value):
        self._set_nam_macro(namcontents, namlocal, name, value)

    # With cy43: &NAMCT0 CSCRIPT_PPSERVER=__SERVERSYNC_SCRIPT__, /
    # (a blank script when nothing is expected: do not harass the filesystem)
    if self.inputs.anyexpected:
        sync_script = self.system.path.join(".", self.serversync_medium)
    else:
        sync_script = " "
    set_macro("SERVERSYNC_SCRIPT", sync_script)
    # With cy43: &NAMCT0 CFPNCF=__IOPOLL_WHITNESSFILE__, /
    set_macro("IOPOLL_WHITNESSFILE", self._MODELSIDE_TERMFILE)
    # With cy43: No matching namelist key
    # a/c cy44: &NAMFPIOS NFPDIGITS=__SUFFIXLEN__, /
    set_macro(
        "SUFFIXLEN",
        self.inputs.actual_suffixlen(self._MODELSIDE_OUT_SUFFIXLEN_MIN),
    )
    # No matching namelist yet
    set_macro("INPUT_SUFFIXLEN", self.inputs.actual_suffixlen())
    # With cy43: &NAMCT0 NFRPOS=__INPUTDATALEN__, /
    set_macro("INPUTDATALEN", -len(self.inputs.tododata))
    # Auto generate the list of namelists for the various objects
    if self.object_namelists:
        if "NAMFPOBJ" in namcontents and len(namcontents["NAMFPOBJ"]) > 0:
            logger.warning(
                'The NAMFPOBJ namelist in "%s" is not empty. Leaving it as it is',
                namlocal,
            )
        else:
            nb_o = NamelistBlock("NAMFPOBJ")
            nb_o["NFPOBJ"] = len(self.object_namelists)
            for rank, nam in enumerate(self.object_namelists, start=1):
                if nam.resource.fp_conf:
                    nb_o["NFPCONF({:d})".format(rank)] = nam.resource.fp_conf
                nb_o["CNAMELIST({:d})".format(rank)] = nam.container.localpath()
            namcontents["NAMFPOBJ"] = nb_o
            logger.info(
                'The following namelist block has been added to "%s":\n%s',
                namlocal,
                nb_o.dumps(),
            )
    # Just in case FP_CMODEL is defined in the main namelist
    if self.outputid is not None and any(
        "FP_CMODEL" in block.macros() for block in namcontents.values()
    ):
        set_macro("FP_CMODEL", self.outputid)
    return True
def spawn_pre_dirlisting(self):
    """Print a listing of each output sub-directory just before run."""
    super().spawn_pre_dirlisting()
    sh = self.system
    for sdir in self.outdirectories:
        title = "{:s} : {:s} sub-directory listing (pre-execution)".format(
            self.realkind, sdir
        )
        sh.subtitle(title)
        sh.dir(sdir, output=False, fatal=False)
def spawn_hook(self):
    """On top of the parent's hook, dump the content of each object namelist."""
    super().spawn_hook()
    sh = self.system
    for o_nam in self.object_namelists:
        localpath = o_nam.container.localpath()
        sh.subtitle(
            "{:s} : dump namelist <{:s}>".format(self.realkind, localpath)
        )
        sh.cat(localpath, output=False)
def _copy_inputs_as_init(self, istuff):
    """Copy each input of **istuff** as a model-side ``*INIT`` file.

    For each model-side prefix in **istuff**, the input file is copied as
    ``<prefix><xpname>INIT`` unless such a file already exists. This is
    used when no Initial Condition is provided.

    :param istuff: A dict mapping model-side input prefixes to sections.
    """
    sh = self.system
    for iprefix, isec in istuff.items():
        i_init = "{:s}{:s}INIT".format(iprefix, self.xpname)
        if not sh.path.exists(i_init):
            sh.cp(
                isec.rh.container.localpath(),
                i_init,
                intent="in",
                fmt=isec.rh.container.actualfmt,
            )
            logger.info(
                "%s copied as %s. For initialisation purposes only.",
                isec.rh.container.localpath(),
                i_init,
            )

def execute(self, rh, opts):
    """Server or normal (direct) execution depending on the input sequence.

    If any input is expected, the c903 runs as a server
    (``server_run=True``). Inputs are fed to it as soon as they become
    available, in chronological order when selection namelists or
    object namelists' ``fp_terms`` are used, and promised outputs are put
    on the fly. Otherwise, all the inputs are linked in advance and the
    binary is run once (``server_run=False``), after which all outputs
    are mapped to their final names.

    :raises OSError: if the input waiting loop times out.
    """
    sh = self.system
    # Input and Output mapping
    inputs_mapping = dict()
    outputs_mapping = dict()
    # Initial condition file ?
    if self.inputs.inidata:
        for iprefix, isec in self.inputs.inidata.items():
            # The initial condition resource may be expected
            self.grab(isec)
            # Fix potential links and output mappings
            sourcepath = isec.rh.container.basename
            if (
                iprefix
                == self._MODELSIDE_INPUTPREFIX0 + self.inputs.firstprefix
            ):
                self._add_output_mapping(
                    outputs_mapping,
                    "INIT",
                    self._o_init_re_fmt,
                    sourcepath,
                )
            i_init = "{:s}{:s}INIT".format(iprefix, self.xpname)
            if isec.rh.container.basename != i_init:
                # Copy from the actual local path: the basename alone is
                # wrong if the file is not in the current directory.
                self.system.cp(
                    isec.rh.container.localpath(),
                    i_init,
                    intent="in",
                    fmt=isec.rh.container.actualfmt,
                )
                logger.info(
                    "Initial condition file %s copied as %s.",
                    isec.rh.container.localpath(),
                    i_init,
                )
    else:
        if self.inputs.tododata:
            # Just in case the INIT file is transformed
            fakesource = (
                self._MODELSIDE_INPUTPREFIX0
                + self.inputs.firstprefix
                + self.xpname
                + "INIT"
            )
            self._add_output_mapping(
                outputs_mapping, "INIT", self._o_init_re_fmt, fakesource
            )
    # Initialise the flying stuff
    self.flyput = False  # Do not use flyput every time...
    flyprefixes = set()
    for s in self.promises:
        lpath = s.rh.container.localpath()
        if lpath.endswith(".grib.out"):
            flyprefixes.add(self._MODELSIDE_OUTPUTPREFIX_GRIB)
        elif lpath.endswith(".out"):
            flyprefixes.add(self._MODELSIDE_OUTPUTPREFIX)
    self.io_poll_args = tuple(flyprefixes)
    self.io_poll_kwargs = dict(directories=tuple(set(self.outdirectories)))
    for directory in set(self.outdirectories):
        sh.mkdir(directory)  # Create possible output directories
    if self.flypoll == "internal":
        self.io_poll_method = functools.partial(fullpos_server_flypoll, sh)
        self.io_poll_kwargs["termfile"] = sh.path.basename(
            self._MODELSIDE_TERMFILE
        )
    self.flymapping = True
    self._flyput_mapping_d = outputs_mapping
    # Deal with XXT files
    if self.xxtmapping:
        for i, istuff in enumerate(self.inputs.tododata):
            self._link_xxt(
                istuff[
                    self._MODELSIDE_INPUTPREFIX0 + self.inputs.firstprefix
                ].rh,
                i,
            )
    if self.inputs.anyexpected:
        # Some server sync here...
        self.server_run = True
        self.system.subtitle("Starting computation with server_run=T")
        # Process the data in chronological order ?
        ordered_processing = self.xxtmapping or any(
            o_rh.resource.fp_terms is not None
            for o_rh in self.object_namelists
        )
        if ordered_processing:
            logger.info("Input data will be processed chronologicaly.")
        # IO poll settings
        self.io_poll_kwargs["nthreads"] = self.maxpollingthreads
        # Is there already an Initial Condition file ?
        # If so, start the binary...
        if self.inputs.inidata:
            super().execute(rh, opts)
            # Did the server stopped ?
            if not self.server_alive():
                logger.error("Server initialisation failed.")
                return
            self._deal_with_promises(
                outputs_mapping, self._init_poll_and_move
            )
        # Setup the InputMonitor
        all_entries = set()
        metagang = _lmonitor.MetaGang()
        cur_term = None
        cur_term_gangs = set()
        prev_term_gangs = set()
        for istuff, iguesses in zip(
            self.inputs.tododata, self.inputs.guessdata
        ):
            iinputs = {
                _lmonitor.InputMonitorEntry(s) for s in istuff.values()
            }
            iinputs |= {
                _lmonitor.InputMonitorEntry(g.sec) for g in iguesses
            }
            iterm = self._actual_term(
                istuff[
                    self._MODELSIDE_INPUTPREFIX0 + self.inputs.firstprefix
                ].rh
            )
            all_entries.update(iinputs)
            bgang = _lmonitor.BasicGang()
            bgang.add_member(*iinputs)
            igang = _lmonitor.MetaGang()
            igang.info = (istuff, iguesses, iterm)
            igang.add_member(bgang)
            # If needed, wait for the previous terms to complete
            if ordered_processing:
                if cur_term is not None and cur_term != iterm:
                    # Detect term's change
                    prev_term_gangs = cur_term_gangs
                    cur_term_gangs = set()
                if prev_term_gangs:
                    # Wait for the gangs of the previous terms
                    igang.add_member(*prev_term_gangs)
                # Save things up for the next time
                cur_term_gangs.add(igang)
                cur_term = iterm
            metagang.add_member(igang)
        bm = _lmonitor.ManualInputMonitor(
            self.context,
            all_entries,
            caching_freq=self.refreshtime,
        )
        # Start the InputMonitor
        tmout = False
        current_i = 0
        server_stopped = False
        with bm:
            while not bm.all_done or len(bm.available) > 0:
                # Fetch available inputs and sort them
                ibatch = list()
                while metagang.has_collectable():
                    thegang = metagang.pop_collectable()
                    ibatch.append(thegang.info)
                ibatch.sort(key=lambda item: item[2])  # Sort according to the term
                # Deal with the various available inputs
                for istuff, iguesses, iterm in ibatch:
                    sh.highlight(
                        "The Fullpos Server is triggered (step={:d})...".format(
                            current_i
                        )
                    )
                    # Link for the init file (if needed)
                    if current_i == 0 and not self.inputs.inidata:
                        self._copy_inputs_as_init(istuff)
                        super().execute(rh, opts)
                        # Did the server stopped ?
                        if not self.server_alive():
                            logger.error("Server initialisation failed.")
                            return
                        self._deal_with_promises(
                            outputs_mapping, self._init_poll_and_move
                        )
                    # Link input files
                    for iprefix, isec in istuff.items():
                        self._link_input(
                            iprefix,
                            isec.rh,
                            current_i,
                            inputs_mapping,
                            outputs_mapping,
                        )
                    for iguess in iguesses:
                        self._move_output_guess(iguess, current_i)
                    # Let's go...
                    super().execute(rh, opts)
                    self._deal_with_promises(
                        outputs_mapping, self._poll_and_move
                    )
                    current_i += 1
                    # Did the server stopped ?
                    if not self.server_alive():
                        server_stopped = True
                        if not bm.all_done:
                            logger.error(
                                "The server stopped but everything wasn't processed..."
                            )
                        break
                if server_stopped:
                    break
                if not (bm.all_done or metagang.has_collectable()):
                    # Timeout ?
                    tmout = bm.is_timedout(self.timeout)
                    if tmout:
                        break
                    # Wait a little bit :-)
                    time.sleep(1)
                    bm.health_check(interval=30)
        for failed_file in [
            e.section.rh.container.localpath() for e in bm.failed.values()
        ]:
            logger.error(
                "We were unable to fetch the following file: %s",
                failed_file,
            )
            if self.fatal:
                self.delayed_exception_add(
                    IOError("Unable to fetch {:s}".format(failed_file)),
                    traceback=False,
                )
        if tmout:
            raise OSError("The waiting loop timed out")
    else:
        # Direct Run !
        self.server_run = False
        self.system.subtitle("Starting computation with server_run=F")
        # Link for the inifile (if needed)
        if not self.inputs.inidata:
            self._copy_inputs_as_init(self.inputs.tododata[0])
        # Create all links well in advance
        for i, (iinputs, iguesses) in enumerate(
            zip(self.inputs.tododata, self.inputs.guessdata)
        ):
            for iprefix, isec in iinputs.items():
                self._link_input(
                    iprefix, isec.rh, i, inputs_mapping, outputs_mapping
                )
            for iguess in iguesses:
                self._move_output_guess(iguess, i)
        # On the fly ?
        if self.promises:
            self.flyput = True
        # Let's roll !
        super().execute(rh, opts)
        # Map all outputs to destination (using io_poll)
        self.io_poll_args = tuple(
            [
                self._MODELSIDE_OUTPUTPREFIX,
                self._MODELSIDE_OUTPUTPREFIX_GRIB,
            ]
        )
        self._init_poll_and_move(outputs_mapping)
        self._poll_and_move(outputs_mapping)