"""
The :class:`Handler` class is a cornerstone of any Vortex script. :class:`Handler`
objects are in charge of manipulating data between the working directory and
the various caches or archives.
"""
import functools
import importlib
import re
import sys
import bronx.fancies.dump
from bronx.fancies import loggers
from bronx.patterns import observer
from bronx.stdtypes.history import History
import footprints
from vortex import sessions
from vortex.tools import net
from vortex.util import config
from vortex.layout import contexts, dataflow
from vortex.data import containers, resources, providers
#: No automatic export
__all__ = []

# Module-level logger, named after this module.
logger = loggers.getLogger(__name__)

# Default tag of the observer board used by observer_board() when no
# explicit name is given.
OBSERVER_TAG = "Resources-Handlers"
class HandlerError(RuntimeError):
    """Error raised by a resource :class:`Handler`.

    Within this module it signals an expected resource that never showed up
    while waiting, or a ``finaliseget`` call that was not preceded by an
    ``earlyget``.
    """
def observer_board(obsname=None):
    """Return the observer board identified by *obsname*.

    Thin proxy to the ``get`` function of :mod:`bronx.patterns.observer`.
    When *obsname* is ``None``, the module-level ``OBSERVER_TAG`` is used.
    """
    tag = OBSERVER_TAG if obsname is None else obsname
    return observer.get(tag=tag)
class IdCardAttrDumper(bronx.fancies.dump.TxtDumper):
    """Text dumper tailored for the ``idcard`` of footprint-based objects.

    The nesting depth is limited to ``max_depth`` levels and objects that
    carry a ``tag`` attribute are summarised on a single line.
    """

    indent_size = 2
    max_depth = 2

    def __init__(self):
        # Width of the very first indentation level (tunable by the caller)
        self._indent_first = 4

    def _get_indent_first(self):
        return self._indent_first

    def _set_indent_first(self, val):
        self._indent_first = val

    indent_first = property(_get_indent_first, _set_indent_first)

    def dump_fpattrs(self, fpobj, level=0):
        """Return a text dump of the footprint attributes of *fpobj*.

        The attributes are obtained from ``fpobj.footprint_as_shallow_dict()``
        and listed in sorted order. Beyond ``max_depth``, a ``{...}``
        placeholder is returned instead.
        """
        if level + 1 > self.max_depth:
            return "{}{{...}}{}".format(
                self._indent(level, self.break_before_dict_begin),
                self._indent(level, self.break_after_dict_end),
            )
        attributes = fpobj.footprint_as_shallow_dict()
        chunks = []
        for attr_name, attr_value in sorted(attributes.items()):
            chunks.append(
                "{}{} = {}{},".format(
                    self._indent(level + 1, self.break_before_dict_key),
                    str(attr_name),
                    self._indent(level + 2, self.break_before_dict_value),
                    self._recursive_dump(attr_value, level + 1),
                )
            )
        return " ".join(chunks)

    def dump_default(self, obj, level=0, nextline=True):
        """Fallback dump: objects with a ``tag`` get a concise one-liner."""
        if level + 1 > self.max_depth:
            # NOTE(review): type(obj).__class__ is the metaclass of obj's
            # type; type(obj).__name__ may have been intended - confirm.
            return " <%s...>" % type(obj).__class__
        clsname = type(obj).__name__
        if hasattr(obj, "tag"):
            return "{:s} obj: tag={:s}".format(clsname, obj.tag)
        # Deliberately starts the lookup *after* TxtDumper in the MRO
        parent_dump = super(bronx.fancies.dump.TxtDumper, self).dump_default(
            obj, level, nextline and self.break_default
        )
        return "{:s} obj: {!s}".format(type(obj).__name__, parent_dump)
[docs]
class Handler:
"""
The resource handler object gathers a provider, a resource and a container
for any specific resource.
Other parameters given at construct time are stored as options.
"""
def __init__(self, rd, **kw):
    """Build a new resource handler.

    :param dict rd: description of the handler. The ``resource``,
        ``provider``, ``container`` and ``empty`` entries are extracted
        (``glove`` is discarded); whatever remains is stored as options.
        Note that *rd* is modified in place.
    :param kw: extra options merged on top of what is left of *rd*. The
        ``observer`` entry selects the observer board (see
        :func:`observer_board`). The ``metadatacheck``, ``metadatadelta``,
        ``ghost``, ``delayhooks`` and ``hook_*`` entries are removed from
        the options and kept in dedicated attributes.
    """
    if "glove" in rd:
        del rd["glove"]
    self._resource = rd.pop("resource", None)
    self._provider = rd.pop("provider", None)
    self._container = rd.pop("container", None)
    self._empty = rd.pop("empty", False)
    self._contents = None
    self._uridata = None
    # Make sure the ``lasturl`` property is usable even before the first
    # call to ``location()``.
    self._lasturl = None
    self._options = rd.copy()
    self._observer = observer_board(obsname=kw.pop("observer", None))
    self._options.update(kw)
    self._mdcheck = self._options.pop("metadatacheck", False)
    self._mddelta = self._options.pop("metadatadelta", dict())
    self._ghost = self._options.pop("ghost", False)
    # Options named "hook_xxx" are stored as hooks under the "xxx" name
    hook_names = [x for x in self._options.keys() if x.startswith("hook_")]
    self._hooks = {x[5:]: self._options.pop(x) for x in hook_names}
    self._delayhooks = self._options.pop("delayhooks", False)
    self._history = History(tag="data-handler")
    self._history.append(self.__class__.__name__, "init", True)
    self._stage = ["load"]
    self._observer.notify_new(self, dict(stage="load"))
    self._localpr_cache = None  # To cache the promise dictionary
    self._latest_earlyget_id = None
    self._latest_earlyget_opts = None
    logger.debug("New resource handler %s", self.__dict__)
def __str__(self):
return str(self.__dict__)
def _get_resource(self):
    """Return the resource object attached to this handler.

    **Example**

    >>> rh = vortex.input(
    ...     vapp="arpege",
    ...     vconf="4dvarfr",
    ...     cutoff="production",
    ...     date="202506160000",
    ...     term=1,
    ...     geometry="global1798",
    ...     model="arpege",
    ...     block="forecast",
    ...     kind="modelstate",
    ...     experiment="oper",
    ...     local="myfile",
    ... )
    >>> print(rh.resource)
    <vortex.nwp.data.modelstates.Historic object at 0x7b430874a620 | model='arpege' date='2025-06-16T00:00:00Z' cutoff='production' geometry='<vortex.data.geometries.GaussGeometry | tag='global1798' id='ARPEGE TL1798c2.2 stretched-rotated geometry' tl=1798 c=2.2>' term='01:00' subset='None'>
    """
    return self._resource


def _set_resource(self, value):
    """Replace the resource; observers are notified and contents dropped.

    :raises ValueError: if *value* is not a ``resources.Resource``.
    """
    if not isinstance(value, resources.Resource):
        raise ValueError(
            "This value is not a plain Resource <{!s}>".format(value)
        )
    # The hashkey may change: remember the old one for the observers
    previous_hash = self.simplified_hashkey
    self._resource = value
    self._notifyhash(previous_hash)
    self.reset_contents()


resource = property(_get_resource, _set_resource)
def _get_provider(self):
    """Return the provider object attached to this handler."""
    return self._provider


def _set_provider(self, value):
    """Replace the provider; observers are notified and contents dropped.

    :raises ValueError: if *value* is not a ``providers.Provider``.
    """
    if not isinstance(value, providers.Provider):
        raise ValueError(
            "This value is not a plain Provider <{!s}>".format(value)
        )
    # The hashkey may change: remember the old one for the observers
    previous_hash = self.simplified_hashkey
    self._provider = value
    self._notifyhash(previous_hash)
    self.reset_contents()


provider = property(_get_provider, _set_provider)
def _get_container(self):
    """Return the container object attached to this handler."""
    return self._container


def _set_container(self, value):
    """Replace the container and notify the observers.

    :raises ValueError: if *value* is not a ``containers.Container``.
    """
    if not isinstance(value, containers.Container):
        raise ValueError(
            "This value is not a plain Container <{!s}>".format(value)
        )
    # The hashkey may change: remember the old one for the observers
    previous_hash = self.simplified_hashkey
    self._container = value
    self._notifyhash(previous_hash)


container = property(_get_container, _set_container)
@property
def history(self):
    """The history object where the handler's actions are recorded."""
    return self._history
@property
def observer(self):
    """The observer board this resource handler reports to."""
    return self._observer
def observers(self):
    """Return the objects observing this resource handler (and maybe others)."""
    board = self._observer
    return board.observers()
def observed(self):
    """Return the other objects tracked by this handler's observer board."""
    everything = self._observer.observed()
    return [item for item in everything if item is not self]
@property
def complete(self):
    """``True`` when the resource, provider and container are all set."""
    if not self.resource:
        return False
    if not self.provider:
        return False
    return bool(self.container)
@property
def stage(self):
    """The latest stage recorded for this handler (e.g. load, get, put)."""
    stages = self._stage
    return stages[-1]
@property
def simplified_hashkey(self):
    """A small tuple usable as a key to quickly identify the handler.

    For a complete handler it is ``(resource kind, container filename)``
    (``None`` replacing any missing attribute); otherwise
    ``("incomplete",)``.
    """
    if not self.complete:
        return ("incomplete",)
    return (
        getattr(self.resource, "kind", None),
        getattr(self.container, "filename", None),
    )
@property
def _cur_session(self):
    """Shortcut to the session returned by ``sessions.current()``."""
    return sessions.current()
@property
def _cur_context(self):
    """Shortcut to the context returned by ``contexts.current()``."""
    return contexts.current()
def external_stage_update(self, newstage):
    """Record a stage change decided outside of this handler.

    This method must not be used directly by users! It is meant for cases
    such as a file that has been fetched by another process. The observers
    are not notified; when the new stage is ``get`` the container is
    flagged as filled.
    """
    self._stage.append(newstage)
    if newstage == "get":
        self.container.updfill(True)
def _updstage(self, newstage, insitu=False):
"""Notify the new stage to any observing system."""
self._stage.append(newstage)
self._observer.notify_upd(self, dict(stage=newstage, insitu=insitu))
def _notifyhook(self, stage, hookname):
"""Notify that a hook function has been executed."""
self._observer.notify_upd(self, dict(stage=stage, hook=hookname))
def _notifyclear(self):
"""Notify that the hashkey has changed."""
self._observer.notify_upd(
self,
dict(
clear=True,
),
)
def _notifyhash(self, oldhash):
"""Notify that the hashkey has changed."""
self._observer.notify_upd(
self,
dict(
oldhash=oldhash,
),
)
def is_expected(self):
    """Tell whether the current stage name starts with ``expect``."""
    current = self.stage
    return current.startswith("expect")
@property
def contents(self):
    """
    Return a valid data layout object as long as the current handler
    is complete and the container filled (or the stage is ``put``).

    The contents object is created and fed from the container on first
    access, then cached. ``None`` is returned (with a warning) otherwise.
    """
    if self._empty:
        # An "empty" handler starts with an empty container (done once)
        self.container.write("")
        self._empty = False
    if not self.complete:
        logger.warning(
            "Contents requested for an uncomplete handler [%s]",
            self.container,
        )
        return None
    if not (self.container.filled or self.stage == "put"):
        logger.warning(
            "Contents requested on an empty container [%s]",
            self.container,
        )
        return None
    if self._contents is None:
        self._contents = self.resource.contents_handler(
            datafmt=self.container.actualfmt
        )
        with self.container.iod_context():
            self._contents.slurp(self.container)
    return self._contents
def reset_contents(self):
    """Forget the cached data contents object (if any)."""
    self._contents = None
@property
def ghost(self):
    """Value of the ``ghost`` option given at construct time."""
    return self._ghost
@property
def hooks(self):
    """Dictionary of the hooks (built from the ``hook_*`` options)."""
    return self._hooks
@property
def options(self):
    """Dictionary of the options stored at construct time."""
    return self._options
@property
def delayhooks(self):
    """Value of the ``delayhooks`` option given at construct time."""
    return self._delayhooks
def mkopts(self, *dicos, **kw):
    """Build the options dictionary handed over to the store methods.

    Starting from default ``intent`` and ``fmt`` entries, the handler's
    own options, then each of the *dicos* dictionaries (in order) and
    finally the *kw* keyword arguments are merged in; later entries win.
    """
    opts = {
        "intent": dataflow.intent.IN,
        "fmt": self.container.actualfmt,
    }
    for layer in (self.options, *dicos, kw):
        opts.update(layer)
    return opts
def location(self, fatal=True):
    """Return the URL computed by the provider for the resource.

    The result is also remembered (see the ``lasturl`` property).

    :param bool fatal: when False, an exception raised by the provider is
        turned into an ``OOPS: ...`` message that is returned.
    :returns: the URL, or ``None`` when provider or resource is missing.
    """
    self._lasturl = None
    if not (self.provider and self.resource):
        logger.warning(
            "Resource handler %s could not build location", self
        )
        return None
    try:
        self._lasturl = self.provider.uri(self.resource)
    except Exception as exc:
        if not fatal:
            return "OOPS: {!s} (but fatal is False)".format(exc)
        raise
    return self._lasturl
def idcard(self, indent=2):
    """
    Return a multi-line string summarising the valuable information held
    by this handler: completeness, options, location, hooks (if any) and
    a dump of the resource, provider and container attributes.
    """
    tab = " " * indent
    header = "\n".join(
        [
            "{0}Handler {1!r}",
            "{0}{0}Complete : {2}",
            "{0}{0}Options : {3}",
            "{0}{0}Location : {4}",
        ]
    )
    card = header.format(
        tab, self, self.complete, self.options, self.location()
    )
    if self.hooks:
        card += "\n{0}{0}Hooks : {1}".format(tab, ",".join(self.hooks))
    dumper = IdCardAttrDumper(tag="idcarddumper")
    dumper.reset()
    dumper.indent_first = 2 * len(tab)
    for component in ("resource", "provider", "container"):
        obj = getattr(self, component, None)
        title = component.capitalize()
        if obj:
            section = "{0}{0}{1:s} {2!r}".format(tab, title, obj)
            section += dumper.dump_fpattrs(obj)
        else:
            section = "{0}{0}{1:s} undefined".format(tab, title)
        card = card + "\n" + section
    return card
def quickview(self, nb=0, indent=0):
    """Print a short summary of the handler and of its defined components."""
    tab = " " * indent
    print("{}{:02d}. {:s}".format(tab, nb, repr(self)))
    print("{} Complete : {!s}".format(tab, self.complete))
    for component in ("container", "provider", "resource"):
        obj = getattr(self, component, None)
        if not obj:
            continue
        print("{} {:10s}: {!s}".format(tab, component.capitalize(), obj))
def wide_key_lookup(self, key, exports=False, fatal=True):
    """Look for the *key* attribute in the provider, then in the resource.

    The special ``safeblock`` key returns the provider's ``block`` with
    any ``member``/``fc`` number component removed.

    :param bool exports: if True, ``footprint_export()`` or
        ``export_dict()`` (whichever is found first) is called on the
        value before returning it.
    :param bool fatal: if False, ``None`` is returned when the attribute
        cannot be found.
    :raises AttributeError: if the attribute is missing and *fatal* is True.
    """
    try:
        if key == "safeblock":
            # In olive experiments, the block may contain an indication of
            # the member's number. Usually we do not want to get that...
            a_value = re.sub(r"(member|fc)_?\d+/", "", self.provider.block)
        else:
            a_value = getattr(self.provider, key)
    except AttributeError:
        try:
            a_value = getattr(self.resource, key)
        except AttributeError:
            if not fatal:
                a_value = None
            else:
                raise AttributeError(
                    "The {:s} attribute could not be found in {!r}".format(
                        key, self
                    )
                )
    if exports:
        for exporter in ("footprint_export", "export_dict"):
            if hasattr(a_value, exporter):
                a_value = getattr(a_value, exporter)()
                break
    return a_value
def as_dict(self):
    """Produce a raw json-compatible dictionary describing the handler.

    Options are exported with their ``export_dict()`` method when it is
    usable (kept as is otherwise); the defined resource, provider and
    container are exported with ``footprint_export()``.
    """

    def _exported(value):
        try:
            return value.export_dict()
        except (AttributeError, TypeError):
            return value

    rhd = {"options": {k: _exported(v) for k, v in self.options.items()}}
    for component in ("resource", "provider", "container"):
        obj = getattr(self, component, None)
        if obj is None:
            continue
        rhd[component] = obj.footprint_export()
    return rhd
@property
def lasturl(self):
    """The last actual URL value evaluated by :meth:`location`.

    ``None`` is returned when :meth:`location` has not been called yet
    (the underlying attribute is only created by that method), so this
    property can be used safely, e.g. in error messages.
    """
    return getattr(self, "_lasturl", None)
@property
def uridata(self):
    """The parsed URI data left over after the latest store creation."""
    return self._uridata
@property
def store(self):
    """A store object matching the handler's current location.

    The location URL is parsed; its ``scheme``, ``netloc`` and
    ``username`` parts (removed from the URI data) plus any handler option
    whose name starts with ``stor`` are used to request the store.
    ``None`` is returned when resource or provider is missing.
    """
    if not (self.resource and self.provider):
        return None
    self._uridata = net.uriparse(self.location())
    store_opts = {
        name: value
        for name, value in self.options.items()
        if name.startswith("stor")
    }
    scheme = self._uridata.pop("scheme")
    netloc = self._uridata.pop("netloc")
    username = self._uridata.pop("username")
    return footprints.proxy.store(
        scheme=scheme,
        netloc=netloc,
        username=username,
        **store_opts,
    )
def check(self, **extras):
    """Return stat-like information about the remote resource.

    When the ``metadatacheck`` option is on and the store check succeeds,
    the resource is actually fetched into a temporary container and its
    metadata are checked against the resource description.

    The result is recorded in the history. On success while still at the
    ``load`` stage, the stage becomes ``checked``; on failure it becomes
    ``void``. ``None`` is returned when no check could be attempted.
    """
    if not (self.resource and self.provider):
        logger.error(
            "Could not check a rh without defined resource and provider %s",
            self,
        )
        return None
    store = self.store
    if not store:
        logger.error("Could not find any store to check %s", self.lasturl)
        return None
    logger.debug(
        "Check resource %s at %s from %s", self, self.lasturl, store
    )
    rst = store.check(self.uridata, self.mkopts(extras))
    if rst and self._mdcheck:
        logger.info("metadatacheck is on: we are forcing a real get()...")
        # We are using a temporary fake container
        scratch = footprints.proxy.container(
            shouldfly=True, actualfmt=self.container.actualfmt
        )
        try:
            get_opts = self.mkopts(extras)
            get_opts["obs_notify"] = False
            rst = store.get(self.uridata, scratch.iotarget(), get_opts)
            if rst:
                if store.delayed:
                    logger.warning(
                        "The resource is expected... let's say that's fine."
                    )
                else:
                    # Create the contents manually and drop it when we are done.
                    checker = self.resource.contents_handler(
                        datafmt=scratch.actualfmt
                    )
                    checker.slurp(scratch)
                    rst = checker.metadata_check(
                        self.resource, delta=self._mddelta
                    )
        finally:
            # Delete the temporary container
            scratch.clear()
    self.history.append(store.fullname(), "check", rst)
    if rst and self.stage == "load":
        # Indicate that the resource was checked
        self._updstage("checked")
    if not rst:
        # Always signal failures
        self._updstage("void")
    return rst
[docs]
def locate(self, **extras):
    r"""
    Try to figure out what would be the physical location of the resource.

    :returns: A semicolon separated string listing the various locations
        where the resource can be found (``None`` if resource, provider
        or store is missing).

    >>> rh = vortex.input(
    ...     vapp="arpege",
    ...     vconf="4dvarfr",
    ...     cutoff="production",
    ...     date="202506160000",
    ...     term=1,
    ...     geometry="global1798",
    ...     model="arpege",
    ...     block="forecast",
    ...     kind="modelstate",
    ...     experiment="oper",
    ...     local="myfile",
    ... )
    >>> print("\n".join(rh.locate().split(";")))
    /home/user/.vortex.d/arpege/4dvarfr/OPER/20250616T0000P/forecast/historic.arpege.tl1798-c22+0001:00.fa
    user@archive:/data/archive/arpege/4dvarfr/OPER/2025/06/16/T0000P/forecast/historic.arpege.tl1798-c22+0001:00.fa
    """
    if not (self.resource and self.provider):
        logger.error("Could not locate an incomplete rh %s", self)
        return None
    store = self.store
    if not store:
        logger.error("Could not find any store to locate %s", self.lasturl)
        return None
    logger.debug(
        "Locate resource %s at %s from %s", self, self.lasturl, store
    )
    rst = store.locate(self.uridata, self.mkopts(extras))
    self.history.append(store.fullname(), "locate", rst)
    return rst
def prestage(self, **extras):
    """Ask the store to pre-stage the remote resource.

    The store's answer is recorded in the history and returned; ``None``
    is returned when resource, provider or store is missing.
    """
    if not (self.resource and self.provider):
        logger.error("Could not prestage an incomplete rh %s", self)
        return None
    store = self.store
    if not store:
        logger.error(
            "Could not find any store to prestage %s", self.lasturl
        )
        return None
    logger.debug(
        "Prestage resource %s at %s from %s", self, self.lasturl, store
    )
    rst = store.prestage(self.uridata, self.mkopts(extras))
    self.history.append(store.fullname(), "prestage", rst)
    return rst
def _generic_apply_hooks(self, action, **extras):
    """Run the hooks for *action* unless the tracker says they already ran.

    Hooks are processed in sorted name order. The tracker is taken from
    the ``mytracker`` extra or, failing that, from the current context's
    local tracker entry for the container. Each executed hook is notified
    to the observers.
    """
    if not self.hooks:
        return
    mytracker = extras.get("mytracker", None)
    if mytracker is None:
        iotarget = self.container.iotarget()
        mytracker = self._cur_context.localtracker[iotarget]
    for hook_name in sorted(self.hooks):
        if mytracker.redundant_hook(action, hook_name):
            logger.info("Hook already executed <hook_name:%s>", hook_name)
            continue
        logger.info("Executing Hook <hook_name:%s>", hook_name)
        hook_func, hook_args = self.hooks[hook_name]
        hook_func(self._cur_session, self, *hook_args)
        self._notifyhook(action, hook_name)
def apply_get_hooks(self, **extras):
    """Run the hooks of the ``get`` action (those already done are skipped)."""
    self._generic_apply_hooks(action="get", **extras)
def apply_put_hooks(self, **extras):
    """Run the hooks of the ``put`` action (those already done are skipped)."""
    self._generic_apply_hooks(action="put", **extras)
def _postproc_get(self, store, rst, extras):
    """Common bookkeeping once a store has answered a get request.

    The container fill status is updated, the metadata are checked if
    requested (the container is cleared on failure), the history is
    updated and the stage becomes ``expected``, ``get`` or ``void``. For a
    real ``get``, hooks are applied unless they are delayed.

    :returns: the (possibly updated) return code.
    """
    self.container.updfill(rst)
    # Check metadata if sensible
    if self._mdcheck and rst and not store.delayed:
        rst = self.contents.metadata_check(
            self.resource, delta=self._mddelta
        )
        if not rst:
            logger.info(
                "We are now cleaning up the container and data contents."
            )
            self.reset_contents()
            self.clear()
    # For the record...
    self.history.append(store.fullname(), "get", rst)
    if not rst:
        # Always signal failures
        self._updstage("void")
        return rst
    if store.delayed:
        # This is an expected resource
        self._updstage("expected")
        logger.info("Resource <%s> is expected", self.container.iotarget())
        return rst
    # This is a "real" resource
    self._updstage("get")
    if self.hooks:
        if self.delayhooks:
            logger.info("(get-)Hooks were delayed")
        else:
            self.apply_get_hooks(**extras)
    return rst
def _actual_get(self, **extras):
    """Internal method in charge of getting the resource.

    If requested, it will check the metadata of the resource and apply the
    hook functions (this is delegated to the post-processing step, which
    runs even if the store raises an exception).
    """
    rst = False
    store = self.store
    if not store:
        logger.error("Could not find any store to get %s", self.lasturl)
    else:
        logger.debug(
            "Get resource %s at %s from %s", self, self.lasturl, store
        )
        st_options = self.mkopts(dict(rhandler=self.as_dict()), extras)
        # Actual get: on failure rst is still False when post-processing
        try:
            rst = store.get(
                self.uridata,
                self.container.iotarget(),
                st_options,
            )
        finally:
            rst = self._postproc_get(store, rst, extras)
    # Reset the promise dictionary cache
    self._localpr_cache = None
    return rst
def _actual_earlyget(self, **extras):
    """Internal method in charge of requesting an earlyget on the resource.

    :return: ``None`` if earlyget is unavailable (depending on the store's kind
        and resource it can be perfectly fine). ``True`` if the resource was
        actually fetched (no need to call :meth:`finaliseget`). Some kind of
        non-null identifier that will be used to call :meth:`finaliseget`.

    Errors while creating the store or during the store's ``earlyget``
    call are logged and result in ``None``.
    """
    try:
        store = self.store
    except Exception as exc:
        logger.error(
            "The Resource handler was unable to create a store object (%s).",
            str(exc),
        )
        store = None
    if not store:
        logger.error("Could not find any store to get %s", self.lasturl)
        return None
    logger.debug(
        "Early-Get resource %s at %s from %s", self, self.lasturl, store
    )
    st_options = self.mkopts(dict(rhandler=self.as_dict()), extras)
    # Actual earlyget
    try:
        return store.earlyget(
            self.uridata,
            self.container.iotarget(),
            st_options,
        )
    except Exception as exc:
        logger.error(
            "The store's earlyget method did not return (%s): it should never append!",
            str(exc),
        )
        return None
def _get_proxy(self, callback, alternate=False, **extras):
    """
    Process the **insitu** and **alternate** option and launch the **callback**
    callable if sensible.

    :param callback: callable (called with ``**extras``) that actually
        fetches the resource and returns its status.
    :param alternate: truthy when this handler is an alternate for another.
    :returns: the callback's result, ``True`` when the local data is
        considered to be already there, ``False`` otherwise.
    """
    rst = False
    if self.complete:
        if self.options.get("insitu", False):
            # This a second pass (or third, forth, ...)
            cur_tracker = self._cur_context.localtracker
            cur_seq = self._cur_context.sequence
            iotarget = self.container.iotarget()
            # The localpath is here and listed in the tracker
            if self.container.exists() and cur_tracker.is_tracked_input(
                iotarget
            ):
                # Am I consistent with the ResourceHandler recorded in the tracker ?
                if cur_tracker[iotarget].match_rh("get", self):
                    rst = True
                    # There is the tricky usecase where we are dealing with an alternate
                    # that was already dealt with (yes, sometimes the nominal case and
                    # the alternate is the same !)
                    if not (
                        alternate
                        and iotarget
                        in [
                            s.rh.container.iotarget()
                            for s in cur_seq.effective_inputs()
                        ]
                    ):
                        self.container.updfill(True)
                        self._updstage("get", insitu=True)
                    logger.info(
                        "The <%s> resource is already here and matches the RH description :-)",
                        self.container.iotarget(),
                    )
                else:
                    # This may happen if fatal=False and the local file was fetched
                    # by an alternate
                    if alternate:
                        if not self.container.is_virtual():
                            lpath = self.container.localpath()
                            for isec in self._cur_context.sequence.rinputs():
                                # A tuple is needed here: ("get" or "expected")
                                # would collapse to "get" and silently ignore
                                # the sections in the "expected" stage.
                                if (
                                    isec.stage in ("get", "expected")
                                    and not isec.rh.container.is_virtual()
                                    and isec.rh.container.localpath()
                                    == lpath
                                ):
                                    rst = True
                                    break
                            if rst:
                                logger.info(
                                    "Alternate is on and the local file exists."
                                )
                            else:
                                logger.info(
                                    "Alternate is on but the local file is not yet matched."
                                )
                                self._updstage("void", insitu=True)
                        else:
                            logger.info(
                                "Alternate is on. The local file exists. The container is virtual."
                            )
                            rst = True
                    else:
                        logger.info(
                            "The resource is already here but doesn't match the RH description :-("
                        )
                        # Called again in verbose mode to explain the mismatch
                        cur_tracker[iotarget].match_rh(
                            "get", self, verbose=True
                        )
                        self._updstage("void", insitu=True)
            # Bloody hell, the localpath doesn't exist
            else:
                # This might be an expected resource...
                rst = callback(**extras)
                if rst:
                    logger.info("The resource was successfully fetched :-)")
                else:
                    logger.info("Could not get the resource :-(")
        else:
            if alternate and self.container.exists():
                logger.info("Alternate <%s> exists", alternate)
                rst = True
            else:
                if self.container.exists():
                    logger.warning(
                        "The resource is already here: that should not happen at this stage !"
                    )
                rst = callback(**extras)
    else:
        logger.error("Could not get an incomplete rh %s", self)
    return rst
[docs]
def get(self, alternate=False, **extras):
    """Retrieve the resource through the provider and feed the container.

    The behaviour of this method depends on the **insitu** and
    **alternate** options:

    * When **insitu** is True, the :class:`~vortex.layout.dataflow.LocalTracker`
      object associated with the active context is checked to determine
      whether the resource has already been fetched or not. If not, another
      try is made (but without using any non-cache store).
    * When **insitu** is False, an attempt to get the resource is systematically
      made except if **alternate** is defined and the local container already
      exists.

    **Example**

    .. code:: python

        rhandlers = vortex.input(
            kind='gridpoint',
            term=1,
            geometry='eurw1s40',
            nativefmt='grib',
            model='arome',
            cutoff='production',
            date=['2024060121', '2024060122'],
            origin='historic',
            vapp='arome',
            vconf='pefrance',
            member=[1,2,5],
            experiment='myexp',
            block='forecast',
            local='gribfile_[member].grib',
            format='grib',
        )
        for rh in rhandlers:
            rh.get()
    """
    fetcher = self._actual_get
    return self._get_proxy(fetcher, alternate=alternate, **extras)
def earlyget(self, alternate=False, **extras):
    """The earlyget feature is somehow a declaration of intent.

    It records in the current context that, at some point in the future, we will
    retrieve the present resource. It can be useful for some kind of stores
    (and useless to others). For example, when using a store that targets a mass
    archive system, this information can be used to ask for several files at
    once, which accelerates the overall process and optimises the tape's drivers
    usage. On the other hand, for a cache based store, it does not make much sense
    since the data is readily available on disk.

    Return values can be:

    * ``None`` if earlyget is unavailable (depending on the store's kind
      and resource it can be perfectly fine).
    * Some kind of non-null identifier that will be used later on to actually
      retrieve the resource. It is returned to the user as a diagnostic but is
      also stored internally within the :class:`Handler` object.
    * ``True`` if the resource has actually been retrieved through the provider
      and fed into the current container.

    In any case, the :meth:`finaliseget` method should be called later on
    to actually retrieve the resource and feed the container. When ``True``
    is returned by the :meth:`earlyget` method, the :meth:`finaliseget` call
    can be made although it is useless.

    Like with the :meth:`get` method, the behaviour of this method depends
    on the **insitu** and **alternate** options:

    * When **insitu** is True, the :class:`~vortex.layout.dataflow.LocalTracker`
      object associated with the active context is checked to determine
      whether the resource has already been fetched or not. If not, another
      try is made (but without using any non-cache store).
    * When **insitu** is False, an attempt to get the resource is systematically
      made except if **alternate** is defined and the local container already
      exists.
    """
    # Keep a private copy of the options for the later finaliseget call
    recorded = extras.copy()
    recorded["alternate"] = alternate
    self._latest_earlyget_opts = recorded
    early_id = self._get_proxy(
        self._actual_earlyget, alternate=alternate, **extras
    )
    self._latest_earlyget_id = early_id
    return self._latest_earlyget_id
def finaliseget(self):
    """
    When the :meth:`earlyget` method had previously been called, the
    :meth:`finaliseget` can be called to finalise the ``get`` sequence.

    When :meth:`finaliseget` returns, if the return code is non-zero, the resource
    has been retrieved and fed into the container.

    Whatever happens, the information recorded by :meth:`earlyget` is
    dropped before returning.

    :raises HandlerError: if :meth:`earlyget` is not called prior to this
        method.
    """
    if (
        self._latest_earlyget_id is None
        and self._latest_earlyget_opts is None
    ):
        raise HandlerError(
            "earlyget was not called yet. Calling finaliseget is not Allowed !"
        )

    def _usual_get():
        # Fall back on a plain get with the options recorded by earlyget
        e_opts = self._latest_earlyget_opts.copy()
        e_opts["insitu"] = False
        return self._get_proxy(self._actual_get, **e_opts)

    try:
        if self._latest_earlyget_id is True:
            # Nothing to be done...
            return True
        if self._latest_earlyget_id is None:
            # Delayed get not available... do the usual get !
            return _usual_get()
        alternate = self._latest_earlyget_opts.get("alternate", False)
        if alternate and self.container.exists():
            # The container may have been filled be another finaliseget
            logger.info("Alternate <%s> exists", alternate)
            return True
        rst = False
        store = self.store
        if store:
            logger.debug(
                "Finalise-Get resource %s at %s from %s",
                self,
                self.lasturl,
                store,
            )
            st_options = self.mkopts(
                dict(rhandler=self.as_dict()),
                self._latest_earlyget_opts,
            )
            # Actual get
            rst = store.finaliseget(
                self._latest_earlyget_id,
                self.uridata,
                self.container.iotarget(),
                st_options,
            )
            if rst is None:
                # Delayed get failed... attempt the usual get
                logger.warning(
                    "Delayed get result was unclear ! Reverting to the usual get."
                )
                return _usual_get()
            rst = self._postproc_get(store, rst, self._latest_earlyget_opts)
        else:
            logger.error("Could not find any store to get %s", self.lasturl)
        # Reset the promise dictionary cache
        self._localpr_cache = None
        return rst
    finally:
        self._latest_earlyget_id = None
        self._latest_earlyget_opts = None
def insitu_quickget(self, alternate=False, **extras):
    """Attempt a straightforward insitu get.

    It is designed to minimise the amount of outputs when everything goes
    smoothly: nothing is fetched, the local tracker of the current context
    is merely consulted.

    :returns: ``True`` if the local data is there and either matches this
        handler or, with *alternate* on, was already fetched earlier in
        the sequence; ``False`` otherwise.
    """
    if not self.complete:
        return False
    if not self.options.get("insitu", False):
        logger.error(
            "This method should not be called with insitu=False (rh %s)",
            self,
        )
        return False
    # This a second pass (or third, forth, ...)
    cur_tracker = self._cur_context.localtracker
    cur_seq = self._cur_context.sequence
    iotarget = self.container.iotarget()

    def _already_fetched():
        # Was this target already obtained by an earlier input section ?
        return iotarget in [
            s.rh.container.iotarget() for s in cur_seq.effective_inputs()
        ]

    # The localpath must be here and listed in the tracker
    if not (
        self.container.exists() and cur_tracker.is_tracked_input(iotarget)
    ):
        return False
    if cur_tracker[iotarget].match_rh("get", self):
        # There is the tricky usecase where we are dealing with an alternate
        # that was already dealt with (yes, sometimes the nominal case and
        # the alternate is the same !)
        if not (alternate and _already_fetched()):
            self.container.updfill(True)
            self._updstage("get", insitu=True)
        return True
    # Alternate is on and the local file exists: check if
    # the file has already been fetch previously in the sequence
    if alternate and _already_fetched():
        return True
    return False
[docs]
def put(self, **extras):
    """Store the data of the current container through the provider.

    Hook functions may be applied before the put in the designated store. We
    will ensure that a given hook function (identified by its name) is not
    applied more than once to the local container.

    Conversely, the low-level stores are made aware of the previous successful
    put. That way, a local container is not put twice to the same destination.

    When there is nothing to put and the ``ghost`` option is on, the stage
    becomes ``ghost`` and ``True`` is returned.

    .. code:: python

        rhandlers = vortex.output(
            kind='gridpoint',
            term=1,
            geometry='eurw1s40',
            nativefmt='grib',
            model='arome',
            cutoff='production',
            date=['2024060121', '2024060122'],
            origin='historic',
            vapp='arome',
            vconf='pefrance',
            member=[1,2,5],
            experiment='myexp',
            block='forecast',
            local='gribfile_[member].grib',
            format='grib',
        )
        for rh in rhandlers:
            rh.put()
    """
    if not self.complete:
        logger.error("Could not put an incomplete rh [%s]", self)
        return False
    store = self.store
    if not store:
        logger.error("Could not find any store to put [%s]", self.lasturl)
        return False
    iotarget = self.container.iotarget()
    logger.debug(
        "Put resource %s as io %s at store %s", self, iotarget, store
    )
    has_source = iotarget is not None and (
        self.container.exists() or self.provider.expected
    )
    if not has_source:
        if self.ghost:
            self.history.append(store.fullname(), "put", False)
            self._updstage("ghost")
            return True
        logger.error("Could not find any source to put [%s]", iotarget)
        return False
    mytracker = self._cur_context.localtracker[iotarget]
    # Execute the hooks only if the local file exists
    if self.container.exists():
        self.container.updfill(True)
        if self.hooks:
            if self.delayhooks:
                logger.info("(put-)Hooks were delayed")
            else:
                self.apply_put_hooks(mytracker=mytracker, **extras)
    # Add a filter function to remove duplicated PUTs to the same uri
    extras_ext = dict(extras)
    extras_ext["urifilter"] = functools.partial(
        mytracker.redundant_uri, "put"
    )
    # Actual put
    logger.debug(
        "Put resource %s at %s from %s", self, self.lasturl, store
    )
    rst = store.put(
        iotarget,
        self.uridata,
        self.mkopts(dict(rhandler=self.as_dict()), extras_ext),
    )
    # For the record...
    self.history.append(store.fullname(), "put", rst)
    self._updstage("put")
    return rst
def delete(self, **extras):
    """Delete the remote resource from the store.

    The store's answer is recorded in the history and returned; ``None``
    is returned when resource, provider or store is missing.
    """
    if not (self.resource and self.provider):
        logger.error(
            "Could not delete a rh without defined resource and provider %s",
            self,
        )
        return None
    store = self.store
    if not store:
        logger.error("Could not find any store to delete %s", self.lasturl)
        return None
    logger.debug(
        "Delete resource %s at %s from %s", self, self.lasturl, store
    )
    rst = store.delete(
        self.uridata,
        self.mkopts(dict(rhandler=self.as_dict()), extras),
    )
    self.history.append(store.fullname(), "delete", rst)
    return rst
def clear(self):
    """Clear the local container contents.

    The outcome is recorded in the history and the observers are notified.
    A handler in the ``expected`` or ``get`` stage goes back to
    ``checked``. ``False`` is returned when there is no container.
    """
    if not self.container:
        return False
    logger.debug("Remove resource container %s", self.container)
    rst = self.container.clear()
    self.history.append(self.container.actualpath(), "clear", rst)
    self._notifyclear()
    if self.stage in ("expected", "get"):
        self._updstage("checked")
    return rst
def mkgetpr(
    self,
    pr_getter=None,
    tplfile=None,
    py_exec=sys.executable,
    py_opts="",
):
    """Build a getter script for the expected resource.

    :param pr_getter: path of the script to create (default: the
        container's local path suffixed with ``.getpr``).
    :param tplfile: name of the template, looked for in the
        ``vortex.data.sync_templates`` package (default: ``sync-fetch.tpl``
        for an expected resource, ``sync-skip.tpl`` otherwise).
    :param py_exec: substituted for ``python`` in the template.
    :param py_opts: substituted for ``pyopts`` in the template.
    :returns: the path of the script that was written (mode 0555).
    """
    # ``import importlib`` alone does not guarantee that the ``resources``
    # submodule is loaded: import it explicitly.
    import importlib.resources

    if tplfile is None:
        tplfile = (
            "sync-" + ("fetch" if self.is_expected() else "skip") + ".tpl"
        )
    with importlib.resources.path(
        "vortex.data.sync_templates",
        tplfile,
    ) as tplpath:
        tpl = config.load_template(tplpath)
    if pr_getter is None:
        pr_getter = self.container.localpath() + ".getpr"
    t = self._cur_session
    with open(pr_getter, "w", encoding="utf-8") as fd:
        fd.write(
            tpl.substitute(
                python=py_exec,
                pyopts=py_opts,
                promise=self.container.localpath(),
            )
        )
    t.sh.chmod(pr_getter, 0o555)
    return pr_getter
@property
def _localpr_json(self):
if self.is_expected():
if self._localpr_cache is None:
self._localpr_cache = self._cur_session.sh.json_load(
self.container.localpath()
)
return self._localpr_cache
else:
return None
def is_grabable(self, check_exists=False):
    """Tell whether an expected resource is available or not.

    A resource that is not expected is always reported as grabable. For an
    expected one, the promise's ``itself`` path must no longer exist; with
    *check_exists*, the first location listed in the promise must also
    exist.

    Note: If it returns True, the user still needs to :meth:`get` the resource.
    """
    if not self.is_expected():
        return True
    promise = self._localpr_json
    path_exists = self._cur_session.sh.path.exists
    available = not path_exists(promise.get("itself"))
    if available and check_exists:
        remote = promise.get("locate").split(";")[0]
        available = self._cur_session.sh.path.exists(remote)
    return available
def wait(self, sleep=10, timeout=300, fatal=False):
    """Wait for an expected resource or return immediately.

    The promise's ``itself`` path is polled every *sleep* seconds until it
    disappears; once the number of tries allowed by *timeout* is exceeded,
    the wait is abandoned.

    :returns: ``True`` if the resource is not expected, or if the promise
        vanished and the first location it lists exists; ``False``
        otherwise.
    :raises HandlerError: if the wait is abandoned and *fatal* is True.
    """
    local = self.container.localpath()
    if not self.is_expected():
        logger.info("Resource <%s> not expected", local)
        return True
    attempts = 0
    sh = self._cur_session.sh
    promise = self._localpr_json
    itself = promise.get("itself")
    nbtries = int(timeout / sleep)
    logger.info(
        "Waiting %d x %d s. for expected resource <%s>",
        nbtries,
        sleep,
        local,
    )
    while sh.path.exists(itself):
        sh.sleep(sleep)
        attempts += 1
        if attempts > nbtries:
            logger.error("Could not wait anymore <%d>", attempts)
            if fatal:
                logger.critical(
                    "Missing expected resource is fatal <%s>", local
                )
                raise HandlerError("Expected resource missing")
            return False
    # The promise is gone: has it been honoured ?
    remote = promise.get("locate").split(";")[0]
    if sh.path.exists(remote):
        logger.info("Keeping promise for remote resource <%s>", remote)
        return True
    logger.warning("Empty promise for remote resource <%s>", remote)
    return False
def save(self):
    """Rewrite the container's data from the current contents.

    A non-virtual container is closed afterwards. ``False`` is returned
    (with a warning) when there are no contents.
    """
    if not self.contents:
        logger.warning("Try to save undefined contents %s", self)
        return False
    rst = self.contents.rewrite(self.container)
    if not self.container.is_virtual():
        self.container.close()
    return rst
def strlast(self):
    """Return the last history entry as a space separated string."""
    return " ".join(map(str, self.history.last))