"""
Resources to handle observations files in various formats.
"""
import re
from collections import namedtuple
import footprints
from bronx.datagrip.varbcheaders import VarbcHeadersFile
from bronx.fancies import loggers
from bronx.syntax.decorators import nicedeco
from vortex.data.flow import GeoFlowResource, FlowResource
from vortex.data.contents import TextContent, AlmostListContent
from vortex.syntax import stdattrs, stddeco
from ..syntax.stdattrs import gvar, GenvKey
#: Automatic export of Observations class
__all__ = [
"Observations",
]
logger = loggers.getLogger(__name__)
@stddeco.namebuilding_insert("style", lambda s: "obs")
@stddeco.namebuilding_insert("stage", lambda s: s.stage)
@stddeco.namebuilding_insert("part", lambda s: s.part)
class Observations(GeoFlowResource):
"""
Abstract observation resource.
"""
_abstract = True
_footprint = dict(
info="Observations file",
attr=dict(
kind=dict(
values=["observations", "obs"],
remap=dict(obs="observations"),
),
part=dict(info="The name of this subset of observations."),
nativefmt=dict(
alias=("format",),
),
stage=dict(
info="The processing stage for this subset of observations."
),
),
)
@property
def realkind(self):
return "observations"
[docs]
class ObsProcessed(Observations):
"""Pre-Processed or Processed observations."""
_footprint = dict(
info="Pre-Processed observations.",
attr=dict(
nativefmt=dict(
values=["ascii", "netcdf", "hdf5"],
),
stage=dict(
values=[
"preprocessing",
],
),
),
)
[docs]
@stddeco.namebuilding_insert("layout", lambda s: s.layout)
class ObsODB(Observations):
"""Observations in ODB format associated to a given stage."""
_footprint = dict(
info="Packed observations (ODB, CCMA, etc.)",
attr=dict(
nativefmt=dict(
values=["odb", "odb/split", "odb/compressed"],
remap={"odb/split": "odb", "odb/compressed": "odb"},
),
layout=dict(
info="The layout of the ODB database.",
optional=True,
default="ecma",
values=[
"ccma",
"ecma",
"ecmascr",
"CCMA",
"ECMA",
"ECMASCR",
"rstbias",
"countryrstrhbias",
"sondetyperstrhbias",
"RSTBIAS",
"COUNTRYRSTRHBIAS",
"SONDETYPERSTRHBIAS",
],
remap=dict(
CCMA="ccma",
ECMA="ecma",
ECMASCR="ecmascr",
RSTBIAS="rstbias",
COUNTRYRSTRHBIAS="countryrstrhbias",
SONDETYPERSTRHBIAS="sondetyperstrhbias",
),
),
stage=dict(
values=[
"void",
"avg",
"average",
"screen",
"screening",
"split",
"build",
"traj",
"min",
"minim",
"complete",
"matchup",
"canari",
"cans",
],
remap=dict(
avg="average",
min="minim",
cans="canari",
split="build",
screen="screening",
),
),
),
)
def olive_basename(self):
"""OLIVE specific naming convention."""
stage_map = dict(
screening="screen", build="split", minim="min", canari="cans"
)
mystage = stage_map.get(self.stage, self.stage)
return "_".join((self.layout, mystage, self.part)) + ".tar"
@property
def _archive_mapping(self):
re_fullmix = re.compile(r"^(?:altitude|mix|full)$")
ecma_map = dict(
void="ecmascr.tar",
screening="odb_screen.tar",
matchup="odb_cpl.tar",
complete="odb_cpl.tar",
)
ecma_prefix = {
("matchup", "arpege"): "BASE/",
("complete", "arpege"): "BASE/",
("matchup", "arome"): "BASE/",
("complete", "arome"): "BASE/",
("screening", "arome"): "./",
}
if self.stage in ecma_map and self.layout == "ecma":
if re_fullmix.match(self.part):
return (ecma_map[self.stage], "extract=all&format=unknown")
elif self.part == "virtual":
return (
ecma_map[self.stage],
"extract={:s}ECMA&format=unknown".format(
ecma_prefix.get((self.stage, self.model), "")
),
)
else:
return (
ecma_map[self.stage],
"extract={:s}ECMA.{:s}&format=unknown".format(
ecma_prefix.get((self.stage, self.model), ""),
self.part,
),
)
elif self.stage == "screening" and self.layout == "ccma":
return ("odb_ccma_screen.tar", "")
elif re_fullmix.match(self.part) and self.stage == "traj":
return ("odb_traj.tar", "")
elif (
re_fullmix.match(self.part)
and self.stage == "minim"
and self.model == "aladin"
):
return ("odb_cpl.tar", "")
elif re_fullmix.match(self.part) and self.stage == "minim":
return ("odb_min.tar", "")
elif self.part in ("ground", "surf") and self.stage in (
"canari",
"surfan",
):
return ("odb_canari.tar", "")
else:
logger.error(
"No archive basename defined for such observations (format=%s, part=%s, stage=%s)",
self.nativefmt,
self.part,
self.stage,
)
return (None, None)
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
return self._archive_mapping[0]
def archive_urlquery(self):
"""OP ARCHIVE special query for odb case."""
return self._archive_mapping[1]
[docs]
class ObsRaw(Observations):
"""
TODO.
"""
_footprint = dict(
info="Raw observations set",
attr=dict(
nativefmt=dict(
values=["obsoul", "grib", "bufr", "ascii", "netcdf", "hdf5"],
remap=dict(
OBSOUL="obsoul",
GRIB="grib",
BUFR="bufr",
ASCII="ascii",
NETCDF="netcdf",
HDF5="hdf5",
),
),
stage=dict(values=["void", "extract", "raw", "std"]),
olivefmt=dict(
info="The mapping between Vortex and Olive formats names.",
type=footprints.FPDict,
optional=True,
default=footprints.FPDict(
ascii="ascii",
obsoul="obsoul",
grib="obsgrib",
bufr="obsbufr",
netcdf="netcdf",
hdf5="hdf5",
),
doc_visibility=footprints.doc.visibility.GURU,
),
),
)
def olive_basename(self):
"""OLIVE specific naming convention."""
return "_".join(
(
self.olivefmt.get(self.nativefmt, "obsfoo"),
self.stage,
self.part,
)
)
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
if (
re.match(r"^(?:bufr|obsoul|grib|netcdf|hdf5)$", self.nativefmt)
and self.part != "full"
and self.stage == "void"
):
return ".".join((self.nativefmt, self.part))
elif (
re.match(r"^obsoul$", self.nativefmt)
and self.part == "full"
and self.stage == "void"
):
return "obsoul"
else:
logger.error(
"No archive basename defined for such observations (format=%s, part=%s, stage=%s)",
self.nativefmt,
self.part,
self.stage,
)
[docs]
@stddeco.namebuilding_insert("radical", lambda s: s.kind)
@stddeco.namebuilding_insert(
"src",
lambda s: [
s.part,
],
)
class ObsFlags(FlowResource):
"""Class for observations flags."""
_footprint = dict(
info="Observations flags",
attr=dict(
kind=dict(
values=["obsflag"],
),
nativefmt=dict(
values=["ascii", "txt"],
default="txt",
remap=dict(ascii="txt"),
),
part=dict(),
),
)
@property
def realkind(self):
return "obsflags"
def olive_basename(self):
"""OLIVE specific naming convention."""
return "BDM_CQ"
@nicedeco
def needs_slurp(mtd):
"""Call _actual_slurp before anything happens."""
def new_stuff(self):
if self._do_delayed_slurp is not None:
with self._do_delayed_slurp.iod_context():
self._actual_slurp(self._do_delayed_slurp)
return mtd(self)
return new_stuff
[docs]
class VarBCContent(AlmostListContent):
# The VarBC file is too big: revert to the good old diff
_diffable = False
def __init__(self, **kw):
super().__init__(**kw)
self._parsed_data = None
self._do_delayed_slurp = None
@property
@needs_slurp
def data(self):
"""The internal data encapsulated."""
return self._data
@property
@needs_slurp
def size(self):
"""The internal data size."""
return self._size
@property
def parsed_data(self):
"""The data as a :class:`VarbcFile` object."""
if self._parsed_data is None:
# May fail if Numpy is not installed...
from bronx.datagrip.varbc import VarbcFile
self._parsed_data = VarbcFile(self.data)
return self._parsed_data
def _actual_slurp(self, container):
with container.preferred_decoding(byte=False):
self._size = container.totalsize
self._data.extend(container.readlines())
self._do_delayed_slurp = None
def slurp(self, container):
"""Get data from the ``container``."""
self._do_delayed_slurp = container
with container.preferred_decoding(byte=False):
container.rewind()
self._metadata = VarbcHeadersFile(
[container.readline() for _ in range(3)]
)
[docs]
@stddeco.namebuilding_append(
"src",
lambda s: [
s.stage,
],
)
class VarBC(FlowResource):
"""
VarBC file resource. Contains all the coefficients for the VarBC bias correction scheme.
"""
_footprint = dict(
info="Varbc file (coefficients for the bias correction of observations).",
attr=dict(
kind=dict(values=["varbc"]),
clscontents=dict(
default=VarBCContent,
),
nativefmt=dict(
values=["ascii", "txt"],
default="txt",
remap=dict(ascii="txt"),
),
stage=dict(
optional=True,
values=[
"void",
"merge",
"screen",
"screening",
"minim",
"traj",
],
remap=dict(screen="screening"),
default="void",
),
mixmodel=dict(
optional=True,
default=None,
values=stdattrs.models,
),
),
)
@property
def realkind(self):
return "varbc"
def olive_basename(self):
"""OLIVE specific naming convention."""
olivestage_map = {
"screening": "screen",
}
return (
self.realkind.upper()
+ "."
+ olivestage_map.get(self.stage, self.stage)
)
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
if self.stage in ("void", "traj"):
bname = "VARBC.cycle"
if self.mixmodel is not None:
bname += "_"
if self.mixmodel.startswith("alad"):
bname = bname + self.mixmodel[:4]
else:
bname = bname + self.mixmodel[:3]
else:
bname = "VARBC." + self.stage
return bname
[docs]
@stddeco.namebuilding_insert("src", lambda s: s.scope)
class BlackList(FlowResource):
"""
TODO.
"""
_footprint = [
gvar,
dict(
info="Blacklist file for observations",
attr=dict(
kind=dict(
values=["blacklist"],
),
gvar=dict(
default="blacklist_[scope]",
values=[
"BLACKLIST_LOC",
"BLACKLIST_DIAP",
"BLACKLIST_LOCAL",
"BLACKLIST_GLOBAL",
],
remap=dict(
BLACKLIST_LOCAL="BLACKLIST_LOC",
BLACKLIST_GLOBAL="BLACKLIST_DIAP",
blacklist_local="BLACKLIST_LOC",
blacklist_global="BLACKLIST_DIAP",
),
),
clscontents=dict(
default=TextContent,
),
nativefmt=dict(values=["txt"], default="txt"),
scope=dict(
values=[
"loc",
"local",
"site",
"global",
"diap",
"diapason",
],
remap=dict(
loc="local",
site="local",
diap="global",
diapason="global",
),
),
),
),
]
@property
def realkind(self):
return "blacklist"
def iga_pathinfo(self):
"""Standard path information for IGA inline cache."""
return dict(model=self.model)
def archive_map(self):
"""OP ARCHIVE specific naming convention."""
return {
"local": "LISTE_LOC",
"global": "LISTE_NOIRE_DIAP",
}
def archive_basename(self):
"""OP ARCHIVE local basename."""
mapd = self.archive_map()
return mapd.get(self.scope, "LISTE_NOIRE_X")
#: A namedtuple of the internal fields of an ObsRef file
ObsRefItem = namedtuple("ObsRefItem", ("data", "fmt", "instr", "date", "time"))
[docs]
class ObsRefContent(TextContent):
"""Content class for refdata resources."""
def append(self, item):
"""Append the specified ``item`` to internal data contents."""
self.data.append(ObsRefItem(*item))
def slurp(self, container):
with container.preferred_decoding(byte=False):
self._data.extend(
[
ObsRefItem(*x.split()[:5])
for x in container
if not x.startswith("#")
]
)
self._size = container.totalsize
@classmethod
def formatted_data(self, item):
"""Return a formatted string."""
return "{:8s} {:8s} {:16s} {:s} {!s}".format(
item.data, item.fmt, item.instr, str(item.date), item.time
)
[docs]
@stddeco.namebuilding_append(
"src",
lambda s: [
s.part,
],
)
class Refdata(FlowResource):
"""
TODO.
"""
_footprint = dict(
info="Refdata file",
attr=dict(
kind=dict(values=["refdata"]),
clscontents=dict(
default=ObsRefContent,
),
nativefmt=dict(
values=["ascii", "txt"], default="txt", remap=dict(ascii="txt")
),
part=dict(optional=True, default="all"),
),
)
@property
def realkind(self):
return "refdata"
def olive_basename(self):
"""OLIVE specific naming convention."""
return self.realkind + "." + self.part
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
return self.realkind
#: A namedtuple of the internal fields of an ObsMap file
ObsMapItem = namedtuple("ObsMapItem", ("odb", "data", "fmt", "instr"))
[docs]
class ObsMapContent(TextContent):
"""Content class for the *ObsMap* resources.
The :class:`ObsMap` resource provides its *discard* and *only* attributes.
This attribute is a :class:`footprints.stdtypes.FPSet` object thats holds
*odb:data* pairs that will be used to filter/discard some of the lines of
the local resource. The matching is done using regular expressions (however
when *:data* is omitted, ':' is automatically added at the end of the regular
expression).
The *only* attribute is evaluated first (if *only* is not provided or equals
*None*, all ObsMap lines are retained).
Here are some examples:
* ``discard=FPSet(('sev',))`` -> The *sev* ODB database will be discarded
(but the *seviri* database is kept).
* ``discard=FPSet(('radar', 'radar1'))`` -> Both the *radar* and *radar1*
ODB databases will be discarded.
* ``discard=FPSet(('radar1?', ))`` -> Same result as above.
* ``discard=FPSet(('conv:temp', ))`` -> Discard the *temp* data file that
would usualy be inserted in the *conv* database.
* ``discard=FPSet(('conv:temp', ))`` -> Discard the *temp* data file that
would usualy be inserted in the *conv* database.
* ``discard=FPSet(('conv:t[ea]', ))`` -> Discard the data file starting
with *te* or *ta* that would usualy be inserted in the *conv* database.
* ``only=FPSet(('conv',))`` -> Only *conv* ODB database will be used.
"""
def __init__(self, **kw):
kw.setdefault("discarded", set())
kw.setdefault("only", None)
super().__init__(**kw)
@property
def discarded(self):
"""Set of *odb:data* pairs that will be discarded."""
return self._discarded
@property
def only(self):
"""Set of *odb:data* pairs that will be kept (*None* means "keep everything")."""
return self._only
def append(self, item):
"""Append the specified ``item`` to internal data contents."""
self._data.append(ObsMapItem(*item))
def slurp(self, container):
"""Get data from the ``container``."""
if self.only is not None:
ofilters = [
re.compile(d if ":" in d else d + ":") for d in self.only
]
else:
ofilters = None
dfilters = [
re.compile(d if ":" in d else d + ":") for d in self.discarded
]
def item_filter(omline):
om = ":".join([omline.odb, omline.data])
return (
ofilters is None or any([f.match(om) for f in ofilters])
) and not any([f.match(om) for f in dfilters])
with container.preferred_decoding(byte=False):
container.rewind()
self.extend(
filter(
item_filter,
[
ObsMapItem(*x.split())
for x in [line.strip() for line in container]
if x and not x.startswith("#")
],
)
)
self._size = container.totalsize
@classmethod
def formatted_data(self, item):
"""Return a formatted string."""
return "{:12s} {:12s} {:12s} {:s}".format(
item.odb, item.data, item.fmt, item.instr
)
def odbset(self):
"""Return set of odb values."""
return {x.odb for x in self}
def dataset(self):
"""Return set of data values."""
return {x.data for x in self}
def fmtset(self):
"""Return set of format values."""
return {x.fmt for x in self}
def instrset(self):
"""Return set of instrument values."""
return {x.instr for x in self}
def datafmt(self, data):
"""Return format associated to specified ``data``."""
dfmt = [x.fmt for x in self if x.data == data]
try:
return dfmt[0]
except IndexError:
logger.warning('Data "%s" not found in ObsMap contents', data)
def getfmt(self, g, x):
"""
Return format ``part`` of data defined in ``g`` or ``x``.
* ``g`` stands for a guess dictionary.
* ``x`` stands for an extra dictionary.
These naming convention refer to the footprints resolve mechanism.
"""
part = g.get("part", x.get("part", None))
if part is None:
return None
else:
return self.datafmt(part)
[docs]
@stddeco.namebuilding_insert("style", lambda s: "obsmap")
@stddeco.namebuilding_insert("stage", lambda s: [s.scope, s.stage])
class ObsMap(FlowResource):
"""Observation mapping.
Simple ascii table for the description of the mapping of
observations set to ODB bases. The native format is :
odb / data / fmt / instr.
The *discard* attribute is passed directly to the :class:`ObsMapContent`
object in charge of accessing this resource: It is used to discard some
of the lines of the *ObsMap* file (for more details see the
:class:`ObsMapContent` class documentation)
"""
_footprint = [
gvar,
dict(
info="Bator mapping file",
attr=dict(
kind=dict(
values=["obsmap"],
),
clscontents=dict(
default=ObsMapContent,
),
nativefmt=dict(
values=["ascii", "txt"],
default="txt",
remap=dict(ascii="txt"),
),
stage=dict(optional=True, default="void"),
scope=dict(
optional=True,
default="full",
remap=dict(surf="surface"),
),
discard=dict(
info="Discard some lines of the mapping (see the class documentation).",
type=footprints.FPSet,
optional=True,
default=footprints.FPSet(),
),
only=dict(
info="Only retain some lines of the mapping (see the class documentation).",
type=footprints.FPSet,
optional=True,
),
),
),
]
@property
def realkind(self):
return "obsmap"
def contents_args(self):
"""Returns default arguments value to class content constructor."""
return dict(discarded=set(self.discard), only=self.only)
def olive_basename(self):
"""OLIVE specific naming convention."""
return "OBSMAP_" + self.stage
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
if self.scope.startswith("surf"):
return "BATOR_MAP_" + self.scope[:4].lower()
else:
return "BATOR_MAP"
def genv_basename(self):
"""Genv key naming convention."""
cutoff_map = {"production": "prod"}
if self.gvar is None:
if self.scope == "surface":
gkey = "bator_map_surf"
else:
gkey = "bator_map_" + cutoff_map.get(self.cutoff, self.cutoff)
return GenvKey(gkey)
else:
return self.gvar
[docs]
@stddeco.namebuilding_insert("src", lambda s: s.satbias)
class Bcor(FlowResource):
"""Bias correction parameters."""
_footprint = dict(
info="Bias correction parameters",
attr=dict(
kind=dict(
values=["bcor"],
),
nativefmt=dict(
values=["ascii", "txt"], default="txt", remap=dict(ascii="txt")
),
satbias=dict(
values=["mtop", "metop", "noaa", "ssmi"],
remap=dict(metop="mtop"),
),
),
)
@property
def realkind(self):
return "bcor"
def archive_basename(self):
"""OP ARCHIVE specific naming convention."""
return "bcor_" + self.satbias + ".dat"