"""
A set of AlgoComponents interrogating various databases.
"""
import copy
from bronx.fancies import loggers
from bronx.stdtypes.date import Time
import footprints
from vortex.algo.components import (
AlgoComponent,
AlgoComponentDecoMixin,
Expresso,
BlindRun,
)
from vortex.algo.components import algo_component_deco_mixin_autodoc
from vortex.syntax.stdattrs import a_date
from vortex.tools.systems import ExecutionError
from ..tools.bdap import (
BDAPrequest_actual_command,
BDAPGetError,
BDAPRequestConfigurationError,
)
from ..tools.bdmp import BDMPrequest_actual_command, BDMPGetError
from ..tools.bdcp import BDCPrequest_actual_command, BDCPGetError
from ..tools.bdm import BDMGetError, BDMRequestConfigurationError, BDMError
from ..tools.mars import MarsGetError, callMarsExtract, findMarsExtractCommand
from ..data.obs import ObsMapContent
#: No automatic export
__all__ = []
logger = loggers.getLogger(__name__)
[docs]
class GetBDAPResource(AlgoComponent):
    """Algo component to get BDAP resources considering a BDAP query file.

    Each ``bdap_query`` input (role ``Query``) is launched once per requested
    term, in a dedicated directory, so that the files extracted by different
    requests cannot overwrite each other.
    """

    _footprint = dict(
        info="Algo component to get BDAP files.",
        attr=dict(
            kind=dict(
                values=["get_bdap"],
            ),
            date=a_date,
            target_bdap=dict(
                default="OPER",
                optional=True,
                values=["OPER", "INTE"],
            ),
            terms=dict(
                info="A forecast term or a list of terms (rangex will be used to expand the string)",
                alias=("term",),
            ),
            command=dict(
                default="dap3",
                optional=True,
                values=["dap3", "dap3_dev"],
            ),
        ),
    )

    def execute_single(self, rh, opts):
        """
        Launch the BDAP request(s).

        The results of each request are stored in a dedicated directory
        (``<query file>_<date>_<term>``) to avoid files being overwritten.

        :raises BDAPRequestConfigurationError: if a request fails and a
            ``DIAG_BDAP`` file is present in the request directory.
        :raises BDAPGetError: if a request fails and no ``DIAG_BDAP`` file
            is present.
        :return: the accumulated return code of the requests.
        """
        # Determine the target BDAP
        int_bdap = self.target_bdap == "INTE"
        # Look for the input queries
        input_queries = self.context.sequence.effective_inputs(
            role="Query",
            kind="bdap_query",
        )
        rc_all = True
        for input_query in input_queries:
            # The query file does not depend on the term: fetch its path once
            query_file = input_query.rh.container.abspath
            for term in [Time(t) for t in footprints.util.rangex(self.terms)]:
                # Launch each input query in a dedicated directory
                # (to check that the files do not overwrite each other)
                local_directory = "_".join(
                    [query_file, self.date.ymdhms, term.fmtraw]
                )
                with self.system.cdcontext(local_directory, create=True):
                    # Determine the command to be launched
                    actual_command = BDAPrequest_actual_command(
                        command=self.command,
                        date=self.date,
                        term=term,
                        query=query_file,
                        int_extraenv=int_bdap,
                    )
                    logger.info("BDAP extract command: %s", actual_command)
                    logger.info("The %s directive file contains:", query_file)
                    self.system.cat(query_file, output=False)
                    # Launch the BDAP request
                    rc = self.system.spawn(
                        [actual_command],
                        shell=True,
                        output=False,
                        fatal=False,
                    )
                    if not rc:
                        # Not inside an exception handler: log an error
                        # (logger.exception would print a bogus traceback)
                        logger.error(
                            "Problem during the BDAP request of %s.", query_file
                        )
                        # A DIAG_BDAP file in the request directory is
                        # reported as a configuration error
                        if self.system.path.isfile("DIAG_BDAP"):
                            raise BDAPRequestConfigurationError
                        raise BDAPGetError
                rc_all = rc_all and rc
        if not rc_all:
            logger.error("Problem during the BDAP request.")
        return rc_all
[docs]
class GetBDMPResource(AlgoComponent):
    """Algo component to get BDMP resources considering a BDMP query file.

    Each ``bdmp_query`` input (role ``Query``) is launched in its own
    ``<query file>_extract`` directory.
    """

    _footprint = dict(
        info="Algo component to get BDMP files.",
        attr=dict(
            kind=dict(
                values=["get_bdmp"],
            ),
            target_bdmp=dict(
                default="OPER",
                optional=True,
                values=["OPER", "INTE", "ARCH"],
            ),
            command=dict(
                default="bdmp_lecture",
                optional=True,
                values=["bdmp_lecture", "bdmp_lecture_pg", "bdmp_lecture_ora"],
            ),
        ),
    )

    def execute_single(self, rh, opts):
        """
        Launch the BDMP request(s).

        The results of each request are stored in a dedicated directory
        (``<query file>_extract``) to avoid files being overwritten.

        :raises BDMPGetError: if a request fails.
        :return: the accumulated return code of the requests.
        """
        # Look for the input queries
        input_queries = self.context.sequence.effective_inputs(
            role="Query",
            kind="bdmp_query",
        )
        rc_all = True
        for input_query in input_queries:
            # Get information on the query file
            query_file = input_query.rh.container.abspath
            logger.info("The %s directive file contains:", query_file)
            self.system.cat(query_file, output=False)
            # Construct the name of the temporary directory
            local_directory = "_".join([query_file, "extract"])
            # Determine the command to be launched
            actual_command = BDMPrequest_actual_command(
                command=self.command,
                query=query_file,
                target_bdmp=self.target_bdmp,
            )
            logger.info("BDMP extract command: %s", actual_command)
            # Launch the BDMP request
            with self.system.cdcontext(local_directory, create=True):
                rc = self.system.spawn(
                    [actual_command],
                    shell=True,
                    output=False,
                    fatal=False,
                )
            if not rc:
                # Not inside an exception handler: log an error
                # (logger.exception would print a bogus traceback)
                logger.error(
                    "Problem during the BDMP request of %s.", query_file
                )
                raise BDMPGetError
            rc_all = rc_all and rc
        if not rc_all:
            logger.error("Problem during the BDMP request.")
        return rc_all
[docs]
class GetBDCPResource(AlgoComponent):
    """Algo component to get BDCP resources considering a BDCP query file.

    Each ``bdcp_query`` input (role ``Query``) is launched in its own
    ``<query file>_extract`` directory.
    """

    _footprint = dict(
        info="Algo component to get BDCP files.",
        attr=dict(
            kind=dict(
                values=["get_bdcp"],
            ),
            target_bdcp=dict(
                default="OPER",
                optional=True,
                values=["OPER"],
            ),
            command=dict(
                default="extraction_directives",
                optional=True,
                values=["extraction_directives"],
            ),
        ),
    )

    def execute_single(self, rh, opts):
        """
        Launch the BDCP request(s).

        The names of the output and log files are fixed by the AlgoComponent;
        each request is run in its own directory so that these files are not
        overwritten by the other requests.

        :raises BDCPGetError: if a request fails.
        :return: the accumulated return code of the requests.
        """
        # Fixed names of the output and log files (same for every request)
        output_file = "extract.out"
        output_log = "extract.out.diag"
        # Look for the input queries
        input_queries = self.context.sequence.effective_inputs(
            role="Query",
            kind="bdcp_query",
        )
        rc_all = True
        for input_query in input_queries:
            # Get information on the query file
            query_file = input_query.rh.container.abspath
            logger.info("The %s directive file contains:", query_file)
            self.system.cat(query_file, output=False)
            # Construct the name of the dedicated directory
            local_directory = "_".join([query_file, "extract"])
            # Determine the command to be launched
            actual_command = BDCPrequest_actual_command(
                command=self.command,
                query_file=query_file,
                output_file=output_file,
            )
            logger.info("BDCP extract command: %s", actual_command)
            # Launch the BDCP request
            with self.system.cdcontext(local_directory, create=True):
                rc = self.system.spawn(
                    [actual_command],
                    shell=True,
                    output=False,
                    fatal=False,
                )
                # Cat the log file
                logger.info("Content of the log file:")
                self.system.cat(output_log, output=False)
            if not rc:
                # Not inside an exception handler: log an error
                # (logger.exception would print a bogus traceback)
                logger.error(
                    "Problem during the BDCP request of %s.", query_file
                )
                raise BDCPGetError
            rc_all = rc_all and rc
        if not rc_all:
            logger.error("Problem during the BDCP request.")
        return rc_all
@algo_component_deco_mixin_autodoc
class _GetBDMDecoMixin(AlgoComponentDecoMixin):
    """Class variables and methods useful for BDM extractions.

    The mixin adds the ``date``, ``pwd_file``, ``fatal`` and
    ``defaut_queryname`` footprint attributes and registers prepare,
    command-line options, execute and postfix hooks on the host
    AlgoComponent.

    The host class must implement:

    * ``_get_input_queries()``: return the input objects (each exposing
      ``rh``) whose query files have to be processed;
    * ``_local_directory(query_filename)``: return the name of the directory
      dedicated to the processing of one query file.
    """
    _MIXIN_EXTRA_FOOTPRINTS = [
        footprints.Footprint(
            attr=dict(
                date=a_date,
                # Exported as the PWD_FILE environment variable
                # (presumably a password file, judging by its name — confirm)
                pwd_file=dict(
                    default="/usr/local/sopra/neons_pwd",
                    values=["/usr/local/sopra/neons_pwd"],
                    optional=True,
                ),
                # When True, a failed BDM request raises BDMGetError;
                # otherwise the failure is only logged
                fatal=dict(
                    type=bool,
                    default=False,
                    values=[True, False],
                    optional=True,
                ),
                # Name of the symlink to the query file created in each
                # extraction directory; also given as the "query" command-line
                # option. The "defaut" spelling is part of the footprint
                # interface: do not rename it.
                defaut_queryname=dict(
                    default="vortexdefault_query_name",
                    doc_visibility=footprints.doc.visibility.GURU,
                    optional=True,
                ),
            )
        )
    ]
    def _verbose_env_export(self, varname, value):
        """Set the environment variable ``varname`` to ``value`` and log it."""
        self.env.setvar(varname, value)
        logger.info(
            "Setting environment variable %s = %s", varname, str(value)
        )
    def _prepare_commons(self, rh, opts):
        """
        Prepare the launch of the script.

        Exports ``PWD_FILE`` (from ``pwd_file``) and ``DMT_DATE_PIVOT``
        (the ``date`` attribute formatted with ``ymdhms``).
        """
        # Some exports to be done
        self._verbose_env_export("PWD_FILE", self.pwd_file)
        self._verbose_env_export("DMT_DATE_PIVOT", self.date.ymdhms)
    # Registered with the mixin machinery as a prepare-step hook
    _MIXIN_PREPARE_HOOKS = (_prepare_commons,)
    def _spawn_command_options_extend(self, prev):
        """Add the ``query`` option (the query symlink name) to ``prev``.

        :param prev: dictionary of command-line options built so far; it is
            updated in place and returned.
        """
        prev["query"] = self.defaut_queryname
        return prev
    # Registered with the mixin machinery to extend the command-line options
    _MIXIN_CLI_OPTS_EXTEND = (_spawn_command_options_extend,)
    def _execute_commons(self, rh, opts):
        """Launch the BDM request(s).

        For each input query, a dedicated directory (named by
        ``_local_directory``) is created and entered, the query file is
        symlinked there as ``defaut_queryname``, and the regular ``execute``
        of the host class hierarchy is run. The results of each request thus
        stay in their own directory and cannot be overwritten by other
        requests.

        A request failing with :class:`ExecutionError` is logged; when
        ``fatal`` is True it is turned into a :class:`BDMGetError`.
        """
        # Look for the input queries
        input_queries = self._get_input_queries()
        # Initialize some variables
        rc_all = True
        # Loop over the query files
        for input_query in input_queries:
            # Find out the temporary directory name
            query_filename = input_query.rh.container.filename
            query_abspath = input_query.rh.container.abspath
            loc_dir = self._local_directory(query_filename)
            # Launch an execution for each input queries in a dedicated directory
            # (to check that the files do not overwrite one another)
            with self.system.cdcontext(loc_dir, create=True):
                # Make the links needed
                self.system.symlink(query_abspath, self.defaut_queryname)
                # Cat the query content
                logger.info("The %s directive file contains:", query_filename)
                self.system.cat(self.defaut_queryname, output=False)
                # Launch the BDM request and catch
                try:
                    # Delegate to the execute method found after the companion
                    # class in the MRO, i.e. presumably the host's regular
                    # execute that this mixin overwrites — confirm against
                    # AlgoComponentDecoMixin.mixin_execute_companion
                    super(self.mixin_execute_companion(), self).execute(
                        rh, opts
                    )
                except ExecutionError:
                    rc_all = False
                    logger.error(
                        "Problem during the BDM request of %s.", query_filename
                    )
                    if self.fatal:
                        # NOTE(review): raising here skips the symlink removal
                        # and directory listing below
                        raise BDMGetError(
                            "Problem during the BDM request of {}.".format(
                                query_filename
                            )
                        )
                # Delete the links
                self.system.rm(self.defaut_queryname)
                self.system.dir(output=False, fatal=False)
        if not rc_all:
            logger.error(
                "At least one of the BDM request failed. Please check the logs above."
            )
    # Replaces the host's execute method
    _MIXIN_EXECUTE_OVERWRITE = _execute_commons
    def _postfix_commons(self, rh, opts):
        """Concatenate the batormap from the different tasks and check if there is no duplicated entries.

        * Every ``*batormap*`` file found in the per-query directories, plus a
          pre-existing ``OBSMAP_<date>`` file, is merged (with
          ``unique=True``), sorted and written to ``OBSMAP_<date>``.
        * Every ``OULOUTPUT`` listing found in the per-query directories, plus
          a pre-existing ``OULOUTPUT`` (first renamed ``OULOUTPUT.tmp``), is
          concatenated into ``OULOUTPUT``.
        """
        # BATORMAP concatenation
        # Determine the name of the batormap produced by the execution in the different directories
        input_queries = self._get_input_queries()
        local_dir = [
            self._local_directory(input_query.rh.container.filename)
            for input_query in input_queries
        ]
        temp_files = []
        for directory in local_dir:
            glob_files = self.system.glob("/".join([directory, "*batormap*"]))
            for element in glob_files:
                temp_files.append(element)
        # Initialize the resulting batormap file
        obsmap_filename = "_".join(["OBSMAP", self.date.ymdhms])
        content = []
        # Check if a batormap is already present in the directory (from previous extract)
        if self.system.path.isfile(obsmap_filename):
            temp_files.append(obsmap_filename)
        # Loop over the directories to concatenate the batormap
        for a_file in temp_files:
            file_container = footprints.proxy.container(local=a_file)
            content_tmp = ObsMapContent()
            content_tmp.slurp(file_container)
            content.append(content_tmp)
        # Merge all the contents; unique=True is expected to drop the
        # duplicated entries
        out_content = ObsMapContent()
        out_content.merge(unique=True, *content)
        out_content.sort()
        out_container = footprints.proxy.container(local=obsmap_filename)
        out_content.rewrite(out_container)
        out_container.close()
        logger.info("Content of the global batormap:")
        self.system.cat(out_container.filename, output=False)
        # Listing concatenation
        # Initialize the resulting file
        listing_filename = "OULOUTPUT"
        # Determine the name of the listing files produced by the execution
        listing_files = []
        for directory in local_dir:
            glob_files = self.system.glob(
                "/".join([directory, listing_filename])
            )
            for element in glob_files:
                listing_files.append(element)
        # Check if a listing is already present and has to be merged with the other
        if self.system.path.isfile(listing_filename):
            temp_listing = ".".join([listing_filename, "tmp"])
            self.system.mv(listing_filename, temp_listing)
            listing_files.append(temp_listing)
        # Concatenate the listings
        # NOTE(review): if no listing was found, cat is called without any
        # input file — confirm this is harmless with system.cat
        self.system.cat(*listing_files, output=listing_filename)
    # Registered with the mixin machinery as a postfix-step hook
    _MIXIN_POSTFIX_HOOKS = (_postfix_commons,)
[docs]
class GetBDMBufr(Expresso, _GetBDMDecoMixin):
    """Algo component to get BDM resources considering a BDM query file.

    The BUFR extraction script is run (through the ``awk`` interpreter) once
    per ``bdm_query`` input (role ``Query``), each in its own
    ``BUFR_<query file>_<date>`` directory.
    """

    _footprint = dict(
        info="Algo component to get BDM BUFR.",
        attr=dict(
            kind=dict(
                values=["get_bdm_bufr"],
            ),
            db_file_bdm=dict(
                default="/usr/local/sopra/neons_db_bdm",
                values=[
                    "/usr/local/sopra/neons_db_bdm",
                    "/usr/local/sopra/neons_db_bdm.archi",
                    "/usr/local/sopra/neons_db_bdm.intgr",
                ],
                optional=True,
            ),
            extra_env_opt=dict(
                values=["RECHERCHE", "OPERATIONNEL", "OPER"],
                default="OPER",
                optional=True,
            ),
            shlib_path=dict(
                default="/usr/local/lib",
                optional=True,
            ),
            interpreter=dict(
                default="awk",
                values=["awk"],
                optional=True,
            ),
        ),
    )

    def _local_directory(self, query_filename):
        """Return the name of the directory dedicated to *query_filename*."""
        return "_".join(["BUFR", query_filename, self.date.ymdhms])

    def _get_input_queries(self):
        """Returns the list of queries to process."""
        return self.context.sequence.effective_inputs(
            role="Query",
            kind="bdm_query",
        )

    def prepare(self, rh, opts):
        """
        Prepare the launch of the script.

        Exports ``EXTR_ENV``, ``DB_FILE_BDM`` and ``SHLIB_PATH`` and checks
        that at least one query file is available.

        :raises BDMRequestConfigurationError: if no query file is found.
        """
        # Do the standard pre-treatment
        super().prepare(rh, opts)
        # Some exports to be done
        self._verbose_env_export("EXTR_ENV", self.extra_env_opt)
        self._verbose_env_export("DB_FILE_BDM", self.db_file_bdm)
        # NOTE(review): "$SHLIB_PATH" is passed as a literal string; unless
        # env.setvar expands it, the variable will contain that text — confirm
        self._verbose_env_export(
            "SHLIB_PATH", ":".join(["$SHLIB_PATH", self.shlib_path])
        )
        # Check if query files are present
        input_queries = self._get_input_queries()
        if not input_queries:
            # Not inside an exception handler: log an error
            # (logger.exception would print a bogus traceback)
            logger.error(
                "No query file found for the BDM extraction. Stop."
            )
            raise BDMRequestConfigurationError(
                "No query file found for the BDM extraction"
            )
[docs]
class GetBDMOulan(BlindRun, _GetBDMDecoMixin):
    """Algo component to get BDM files using Oulan.

    Every ``namutil`` input with the ``NamelistOulan`` role is handed to the
    Oulan extraction binary; the per-namelist working directory is named
    ``Oulan_<namelist file>_<date>``.
    """

    _footprint = dict(
        info="Algo component to get BDM files using Oulan.",
        attr=dict(
            kind=dict(
                values=["get_bdm_oulan"],
            ),
            db_file=dict(
                default="/usr/local/sopra/neons_db",
                values=["/usr/local/sopra/neons_db"],
                optional=True,
            ),
            defaut_queryname=dict(
                default="NAMELIST",
            ),
        ),
    )

    def _local_directory(self, query_filename):
        """Name of the working directory dedicated to *query_filename*."""
        parts = ("Oulan", query_filename, self.date.ymdhms)
        return "_".join(parts)

    def _get_input_queries(self):
        """Return the Oulan namelists to be processed."""
        namelists = self.context.sequence.effective_inputs(
            kind="namutil",
            role="NamelistOulan",
        )
        return namelists

    def prepare(self, rh, opts):
        """Prepare the execution of the Oulan extraction binary.

        :raises BDMError: if no Oulan namelist is available.
        """
        # Standard pre-treatment first, then the Oulan specific export
        super().prepare(rh, opts)
        self._verbose_env_export("DB_FILE", self.db_file)
        # Refuse to go further without any namelist to process
        if not self._get_input_queries():
            logger.error("No Oulan namelist found. Stop.")
            raise BDMError("No Oulan namelist found.")
[docs]
class GetMarsResource(AlgoComponent):
"""AlgoComponent to get Mars resources using a Mars query file"""
_footprint = dict(
info="AlgoComponent to get a Mars resource",
attr=dict(
kind=dict(
values=[
"get_mars",
]
),
date=a_date,
substitutions=dict(
info="A dictionary of values to be substituted",
type=footprints.stdtypes.FPDict,
default=footprints.stdtypes.FPDict(),
optional=True,
),
command=dict(optional=True),
fatal=dict(type=bool, default=True, optional=True),
),
)
def execute(self, rh, opts):
"""
Launch the Mars request(s).
The results of each requests are stored in a directory to avoid
files to be overwritten.
"""
# Look for input queries
input_queries = self.context.sequence.effective_inputs(
role="Query",
kind="mars_query",
)
if len(input_queries) < 1:
logger.exception(
"No query file found for the Mars extraction. Stop."
)
raise MarsGetError("No query file found for the Mars extraction")
rc_all = True
# Find the command to be launched
actual_command = findMarsExtractCommand(
sh=self.system, command=self.command
)
# Prepare the substitutions' dictionnary
dictkeyvalue = copy.deepcopy(self.substitutions)
if self.date is not None:
dictkeyvalue["YYYYMMDDHH"] = self.date.ymdh
dictkeyvalue["YYYYMMDD"] = self.date.ymd
dictkeyvalue["HH"] = self.date.hh
# For each input query, extract the files
for input_query in input_queries:
# Prepare the query file used
query_content = input_query.rh.contents
query_content.setitems(dictkeyvalue)
input_query.rh.save()
# Launch each input queries in a dedicated file
# (to check that the files do not overwrite each other)
query_file_path = input_query.rh.container.abspath
local_directory = "_".join([query_file_path, self.date.ymdhms])
logger.info(
"Here is the content of the query file %s (after substitution):",
query_file_path,
)
self.system.cat(query_file_path, output=False)
with self.system.cdcontext(local_directory, create=True):
# Launch the command
rc = callMarsExtract(
sh=self.system,
query_file=query_file_path,
fatal=self.fatal,
command=actual_command,
)
if not rc:
if self.fatal:
logger.error(
"Problem during the Mars request of %s",
query_file_path,
)
raise MarsGetError
else:
logger.warning(
"Problem during the Mars request of %s",
query_file_path,
)
rc_all = rc_all and rc