"""
General interest and NWP specific MPI launchers.
"""
import collections
import re
import math
from bronx.fancies import loggers
from bronx.syntax.iterators import interleave
import footprints
from vortex.algo import mpitools
from vortex.syntax.stdattrs import DelayedEnvValue
from vortex.tools.arm import ArmForgeTool
from ..tools.partitioning import setup_partitioning_in_namelist
#: No automatic export
__all__ = []
logger = loggers.getLogger(__name__)
[docs]
class MpiAuto(mpitools.MpiTool):
"""MpiTools that uses mpiauto as a proxy to several MPI implementations"""
_footprint = dict(
attr=dict(
mpiname=dict(
values=[
"mpiauto",
],
),
mpiopts=dict(default=None),
optprefix=dict(default="--"),
optmap=dict(
default=footprints.FPDict(
nn="nn",
nnp="nnp",
openmp="openmp",
np="np",
prefixcommand="prefix-command",
allowodddist="mpi-allow-odd-dist",
)
),
timeoutrestart=dict(
info="The number of attempts made by mpiauto",
optional=True,
default=DelayedEnvValue("MPI_INIT_TIMEOUT_RESTART", 2),
doc_visibility=footprints.doc.visibility.ADVANCED,
doc_zorder=-90,
),
sublauncher=dict(
info="How to actualy launch the MPI program",
values=["srun", "libspecific"],
optional=True,
doc_visibility=footprints.doc.visibility.ADVANCED,
doc_zorder=-90,
),
mpiwrapstd=dict(
values=[
False,
],
),
bindingmethod=dict(
info="How to bind the MPI processes",
values=["vortex", "arch", "launcherspecific"],
optional=True,
doc_visibility=footprints.doc.visibility.ADVANCED,
doc_zorder=-90,
),
mplbased=dict(
info="Is the executable based on MPL?",
type=bool,
optional=True,
default=False,
),
)
)
_envelope_wrapper_tpl = "envelope_wrapper_mpiauto.tpl"
_envelope_rank_var = "MPIAUTORANK"
_needs_mpilib_specific_mpienv = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bindingmethod = "arch" if self.mplbased else "vortex"
def _reshaped_mpiopts(self):
"""Raw list of mpi tool command line options."""
options = super()._reshaped_mpiopts()
options["init-timeout-restart"] = [(self.timeoutrestart,)]
if self.sublauncher == "srun":
options["use-slurm-mpi"] = [()]
elif self.sublauncher == "libspecific":
options["no-use-slurm-mpi"] = [()]
if self.bindingmethod:
for k in [
"{:s}use-{:s}-bind".format(p, t)
for p in ("", "no-")
for t in ("arch", "slurm", "intelmpi", "openmpi")
]:
options.pop(k, None)
if self.bindingmethod == "arch":
options["use-arch-bind"] = [()]
elif (
self.bindingmethod == "launcherspecific"
and self.sublauncher == "srun"
):
options["no-use-arch-bind"] = [()]
options["use-slurm-bind"] = [()]
elif self.bindingmethod == "launcherspecific":
options["no-use-arch-bind"] = [()]
for k in [
"use-{:s}-bind".format(t)
for t in ("slurm", "intelmpi", "openmpi")
]:
options[k] = [()]
elif self.bindingmethod == "vortex":
options["no-use-arch-bind"] = [()]
return options
def _envelope_fix_envelope_bit(self, e_bit, e_desc):
"""Set the envelope fake binary options."""
e_bit.options = {
k: v for k, v in e_desc.items() if k not in ("openmp",)
}
e_bit.options["prefixcommand"] = self._envelope_wrapper_name
if self.binaries:
e_bit.master = self.binaries[0].master
def _set_binaries_hack(self, binaries):
"""Set the list of :class:`MpiBinaryDescription` objects associated with this instance."""
if len(binaries) > 1 and self.bindingmethod not in (
None,
"arch",
"vortex",
):
logger.info(
"The '{:s}' binding method is not working properly with multiple binaries.".format(
self.bindingmethod
)
)
logger.warning("Resetting the binding method to 'vortex'.")
self.bindingmethod = "vortex"
def _set_binaries_envelope_hack(self, binaries):
"""Tweak the envelope after binaries were setup."""
super()._set_binaries_envelope_hack(binaries)
for e_bit in self.envelope:
e_bit.master = binaries[0].master
def _set_envelope(self, value):
"""Set the envelope description."""
super()._set_envelope(value)
if len(self._envelope) > 1 and self.bindingmethod not in (
None,
"arch",
"vortex",
):
logger.info(
"The '{:s}' binding method is not working properly with complex envelopes.".format(
self.bindingmethod
)
)
logger.warning("Resetting the binding method to 'vortex'.")
self.bindingmethod = "vortex"
envelope = property(mpitools.MpiTool._get_envelope, _set_envelope)
def _hook_binary_mpiopts(self, binary, options):
tuned = options.copy()
# Regular MPI tasks count (the usual...)
if "nnp" in options and "nn" in options:
if options["nn"] * options["nnp"] == options["np"]:
# Remove harmful options
del tuned["np"]
tuned.pop("allowodddist", None)
# that's the strange MPI distribution...
else:
tuned["allowodddist"] = (
None # With this, let mpiauto determine its own partitioning
)
else:
msg = "The provided mpiopts are insufficient to build the command line: {!s}".format(
options
)
raise mpitools.MpiException(msg)
return tuned
def _envelope_mkwrapper_todostack(self):
ranksidx = 0
todostack, ranks_bsize = super()._envelope_mkwrapper_todostack()
for bin_obj in self.binaries:
if bin_obj.options:
for mpirank in range(ranksidx, ranksidx + bin_obj.nprocs):
prefix_c = bin_obj.options.get("prefixcommand", None)
if prefix_c:
todostack[mpirank] = (
prefix_c,
[
todostack[mpirank][0],
]
+ todostack[mpirank][1],
todostack[mpirank][2],
)
ranksidx += bin_obj.nprocs
return todostack, ranks_bsize
def _envelope_mkcmdline_extra(self, cmdl):
"""If possible, add an openmp option when the arch binding method is used."""
if self.bindingmethod != "vortex":
openmps = {b.options.get("openmp", None) for b in self.binaries}
if len(openmps) > 1:
if self.bindingmethod is not None:
logger.warning(
"Non-uniform OpenMP threads number... Not specifying anything."
)
else:
openmp = openmps.pop() or 1
cmdl.append(self.optprefix + self.optmap["openmp"])
cmdl.append(str(openmp))
def setup_environment(self, opts):
"""Last minute fixups."""
super().setup_environment(opts)
if self.bindingmethod in ("arch", "vortex"):
# Make sure srun does nothing !
self._logged_env_set("SLURM_CPU_BIND", "none")
def setup(self, opts=None):
"""Ensure that the prefixcommand has the execution rights."""
for bin_obj in self.binaries:
prefix_c = bin_obj.options.get("prefixcommand", None)
if prefix_c is not None:
if self.system.path.exists(prefix_c):
self.system.xperm(prefix_c, force=True)
else:
raise OSError("The prefixcommand do not exists.")
super().setup(opts)
[docs]
class MpiAutoDDT(MpiAuto):
"""
MpiTools that uses mpiauto as a proxy to several MPI implementations
with DDT support.
"""
_footprint = dict(
attr=dict(
mpiname=dict(
values=[
"mpiauto-ddt",
],
),
)
)
_conf_suffix = "-ddt"
def _reshaped_mpiopts(self):
options = super()._reshaped_mpiopts()
if "prefix-mpirun" in options:
raise mpitools.MpiException(
"It is not allowed to start DDT with another "
+ 'prefix_mpirun command defined: "{:s}"'.format(options)
)
armtool = ArmForgeTool(self.ticket)
options["prefix-mpirun"] = [
(
" ".join(
armtool.ddt_prefix_cmd(
sources=self.sources,
workdir=self.system.path.dirname(
self.binaries[0].master
),
)
),
)
]
return options
# Some IFS/Arpege specific things :
def arpifs_obsort_nprocab_binarydeco(cls):
"""Handle usual IFS/Arpege environment tweaking for OBSORT (nproca & nprocb).
Note: This is a class decorator for class somehow based on MpiBinaryDescription
"""
orig_setup_env = getattr(cls, "setup_environment")
def setup_environment(self, opts):
orig_setup_env(self, opts)
self.env.NPROCA = int(self.env.NPROCA or self.nprocs)
self.env.NPROCB = int(
self.env.NPROCB or self.nprocs // self.env.NPROCA
)
logger.info(
"MPI Setup NPROCA=%d and NPROCB=%d",
self.env.NPROCA,
self.env.NPROCB,
)
if hasattr(orig_setup_env, "__doc__"):
setup_environment.__doc__ = orig_setup_env.__doc__
setattr(cls, "setup_environment", setup_environment)
return cls
class _NWPIoServerMixin:
_NWP_IOSERV_PATTERNS = ("io_serv.*.d",)
def _nwp_ioserv_setup_namelist(
self, namcontents, namlocal, total_iotasks, computed_iodist_value=None
):
"""Applying IO Server profile on local namelist ``namlocal`` with contents namcontents."""
if "NAMIO_SERV" in namcontents:
namio = namcontents["NAMIO_SERV"]
else:
namio = namcontents.newblock("NAMIO_SERV")
namio.nproc_io = total_iotasks
if computed_iodist_value is not None:
namio.idistio = computed_iodist_value
if "VORTEX_IOSERVER_METHOD" in self.env:
namio.nio_serv_method = self.env.VORTEX_IOSERVER_METHOD
if "VORTEX_IOSERVER_BUFMAX" in self.env:
namio.nio_serv_buf_maxsize = self.env.VORTEX_IOSERVER_BUFMAX
if "VORTEX_IOSERVER_MLSERVER" in self.env:
namio.nmsg_level_server = self.env.VORTEX_IOSERVER_MLSERVER
if "VORTEX_IOSERVER_MLCLIENT" in self.env:
namio.nmsg_level_client = self.env.VORTEX_IOSERVER_MLCLIENT
if "VORTEX_IOSERVER_PROCESS" in self.env:
namio.nprocess_level = self.env.VORTEX_IOSERVER_PROCESS
if "VORTEX_IOSERVER_PIOMODEL" in self.env:
namio.pioprocr_MDL = self.env.VORTEX_IOSERVER_PIOMODEL
self.system.highlight(
"Parallel io server namelist for {:s}".format(namlocal)
)
print(namio.dumps())
return True
def _nwp_ioserv_iodirs(self):
"""Return an ordered list of directories matching the ``pattern`` attribute."""
found = []
for pattern in self._NWP_IOSERV_PATTERNS:
found.extend(self.system.glob(pattern))
return sorted(found)
def _nwp_ioserv_clean(self):
"""Post-execution cleaning for io server."""
# Old fashion way to make clear that some polling is needed.
self.system.touch("io_poll.todo")
# Get a look inside io server output directories according to its own pattern
ioserv_filelist = set()
ioserv_prefixes = set()
iofile_re = re.compile(
r"((ICMSH|PF|GRIBPF).*\+\d+(?::\d+)?(?:\.sfx)?)(?:\..+)?$"
)
self.system.highlight("Dealing with IO directories")
iodirs = self._nwp_ioserv_iodirs()
if iodirs:
logger.info("List of IO directories: %s", ",".join(iodirs))
f_summary = collections.defaultdict(lambda: [" "] * len(iodirs))
for i, iodir in enumerate(iodirs):
for iofile in self.system.listdir(iodir):
zf = iofile_re.match(iofile)
if zf:
f_summary[zf.group(1)][i] = "+"
ioserv_filelist.add((zf.group(1), zf.group(2)))
ioserv_prefixes.add(zf.group(2))
else:
f_summary[iofile][i] = "?"
max_names_len = max([len(iofile) for iofile in f_summary.keys()])
fmt_names = "{:" + str(max_names_len) + "s}"
logger.info(
"Data location accross the various IOserver directories:\n%s",
"\n".join(
[
(fmt_names + " |{:s}|").format(iofile, "".join(where))
for iofile, where in sorted(f_summary.items())
]
),
)
else:
logger.info("No IO directories were found")
if "GRIBPF" in ioserv_prefixes:
# If GRIB are requested, do not bother with old FA PF files
ioserv_prefixes.discard("PF")
ioserv_filelist = {(f, p) for f, p in ioserv_filelist if p != "PF"}
# Touch the output files
for tgfile, _ in ioserv_filelist:
self.system.touch(tgfile)
# Touch the io_poll.todo.PREFIX
for prefix in ioserv_prefixes:
self.system.touch("io_poll.todo.{:s}".format(prefix))
class _AbstractMpiNWP(mpitools.MpiBinaryBasic, _NWPIoServerMixin):
"""The kind of binaries used in IFS/Arpege."""
_abstract = True
def __init__(self, *kargs, **kwargs):
super().__init__(*kargs, **kwargs)
self._incore_iotasks = None
self._effective_incore_iotasks = None
self._incore_iotasks_fixer = None
self._incore_iodist = None
@property
def incore_iotasks(self):
"""The number of tasks dedicated to the IO server."""
return self._incore_iotasks
@incore_iotasks.setter
def incore_iotasks(self, value):
"""The number of tasks dedicated to the IO server."""
if isinstance(value, str) and value.endswith("%"):
value = math.ceil(self.nprocs * float(value[:-1]) / 100)
self._incore_iotasks = int(value)
self._effective_incore_iotasks = None
@property
def incore_iotasks_fixer(self):
"""Tweak the number of iotasks in order to respect a given constraints."""
return self._incore_iotasks_fixer
@incore_iotasks_fixer.setter
def incore_iotasks_fixer(self, value):
"""Tweak the number of iotasks in order to respect a given constraints."""
if not isinstance(value, str):
raise ValueError("A string is expected")
if value.startswith("nproc_multiple_of_"):
self._incore_iotasks_fixer = (
"nproc_multiple_of",
[int(i) for i in value[18:].split(",")],
)
else:
raise ValueError('The "{:s}" value is incorrect'.format(value))
@property
def effective_incore_iotasks(self):
"""Apply fixers to incore_iotasks and return this value.
e.g. "nproc_multiple_of_15,16,17" ensure that the number of processes
dedicated to computations (i.e. total number of process - IO processes)
is a multiple of 15, 16 or 17.
"""
if self.incore_iotasks is not None:
if self._effective_incore_iotasks is None:
if self.incore_iotasks_fixer is not None:
if self.incore_iotasks_fixer[0] == "nproc_multiple_of":
# Allow for 5% less, or add some tasks
for candidate in interleave(
range(self.incore_iotasks, self.nprocs + 1),
range(
self.incore_iotasks - 1,
int(math.ceil(0.95 * self.incore_iotasks)) - 1,
-1,
),
):
if any(
[
(self.nprocs - candidate) % multiple == 0
for multiple in self.incore_iotasks_fixer[
1
]
]
):
self._effective_incore_iotasks = candidate
break
else:
raise RuntimeError("Unsupported fixer")
if self._effective_incore_iotasks != self.incore_iotasks:
logger.info(
"The number of IO tasks was updated form %d to %d "
+ 'because of the "%s" fixer',
self.incore_iotasks,
self._effective_incore_iotasks,
self.incore_iotasks_fixer[0],
)
else:
self._effective_incore_iotasks = self.incore_iotasks
return self._effective_incore_iotasks
else:
return None
@property
def incore_iodist(self):
"""How to distribute IO server tasks within model tasks."""
return self._incore_iodist
@incore_iodist.setter
def incore_iodist(self, value):
"""How to distribute IO server tasks within model tasks."""
allowed = (
"begining",
"end",
"scattered",
)
if not (isinstance(value, str) and value in allowed):
raise ValueError(
"'{!s}' is not an allowed value ('{:s}')".format(
value, ", ".join(allowed)
)
)
self._incore_iodist = value
def _set_nam_macro(self, namcontents, namlocal, macro, value):
"""Set a namelist macro and log it!"""
namcontents.setmacro(macro, value)
logger.info("Setup macro %s=%s in %s", macro, str(value), namlocal)
def setup_namelist_delta(self, namcontents, namlocal):
"""Applying MPI profile on local namelist ``namlocal`` with contents namcontents."""
namw = False
# List of macros actually used in the namelist
nam_macros = set()
for nam_block in namcontents.values():
nam_macros.update(nam_block.macros())
# The actual number of tasks involved in computations
effective_nprocs = self.nprocs
if self.effective_incore_iotasks is not None:
effective_nprocs -= self.effective_incore_iotasks
# Set up the effective_nprocs related macros
nprocs_macros = ("NPROC", "NBPROC", "NTASKS")
if any([n in nam_macros for n in nprocs_macros]):
for n in nprocs_macros:
self._set_nam_macro(namcontents, namlocal, n, effective_nprocs)
namw = True
if any([n in nam_macros for n in ("NCPROC", "NDPROC")]):
self._set_nam_macro(
namcontents,
namlocal,
"NCPROC",
int(self.env.VORTEX_NPRGPNS or effective_nprocs),
)
self._set_nam_macro(
namcontents,
namlocal,
"NDPROC",
int(self.env.VORTEX_NPRGPEW or 1),
)
namw = True
if "NAMPAR1" in namcontents:
np1 = namcontents["NAMPAR1"]
for nstr in [x for x in ("NSTRIN", "NSTROUT") if x in np1]:
if (
isinstance(np1[nstr], (int, float))
and np1[nstr] > effective_nprocs
):
logger.info(
"Setup %s=%s in NAMPAR1 %s",
nstr,
effective_nprocs,
namlocal,
)
np1[nstr] = effective_nprocs
namw = True
# Deal with partitioning macros
namw_p = setup_partitioning_in_namelist(
namcontents,
effective_nprocs,
self.options.get("openmp", 1),
namlocal,
)
namw = namw or namw_p
# Incore IO tasks
if self.effective_incore_iotasks is not None:
c_iodist = None
if self.incore_iodist is not None:
if self.incore_iodist == "begining":
c_iodist = -1
elif self.incore_iodist == "end":
c_iodist = 0
elif self.incore_iodist == "scattered":
# Ensure that there is at least one task on the first node
c_iodist = min(
self.nprocs // self.effective_incore_iotasks,
self.options.get("nnp", self.nprocs),
)
else:
raise RuntimeError(
"incore_iodist '{!s}' is not supported: check your code".format(
self.incore_iodist
)
)
namw_io = self._nwp_ioserv_setup_namelist(
namcontents,
namlocal,
self.effective_incore_iotasks,
computed_iodist_value=c_iodist,
)
namw = namw or namw_io
return namw
def clean(self, opts=None):
"""Finalise the IO server run."""
super().clean(opts=opts)
if self.incore_iotasks:
self._nwp_ioserv_clean()
class MpiNWP(_AbstractMpiNWP):
"""The kind of binaries used in IFS/Arpege."""
_footprint = dict(
attr=dict(
kind=dict(
values=[
"basicnwp",
]
),
),
)
@arpifs_obsort_nprocab_binarydeco
class MpiNWPObsort(_AbstractMpiNWP):
"""The kind of binaries used in IFS/Arpege when the ODB OBSSORT code needs to be run."""
_footprint = dict(
attr=dict(
kind=dict(
values=[
"basicnwpobsort",
]
),
),
)
[docs]
@arpifs_obsort_nprocab_binarydeco
class MpiObsort(mpitools.MpiBinaryBasic):
"""The kind of binaries used when the ODB OBSSORT code needs to be run."""
_footprint = dict(
attr=dict(
kind=dict(
values=[
"basicobsort",
]
),
),
)
[docs]
class MpiNWPIO(mpitools.MpiBinaryIOServer, _NWPIoServerMixin):
"""Standard IFS/Arpege NWP IO server."""
_footprint = dict(
attr=dict(
kind=dict(
values=[
"nwpioserv",
]
),
iolocation=dict(
values=[-1, 0], default=0, optional=True, type=int
),
)
)
def setup_namelist_delta(self, namcontents, namlocal):
"""Setup the IO Server."""
self._nwp_ioserv_setup_namelist(
namcontents,
namlocal,
self.nprocs,
computed_iodist_value=(-1 if self.iolocation == 0 else None),
)
def clean(self, opts=None):
"""Finalise the IO server run."""
super().clean(opts=opts)
self._nwp_ioserv_clean()