diff --git a/src/fmu/dataio/_definitions.py b/src/fmu/dataio/_definitions.py
index 1efeafbe5..f4d448ce3 100644
--- a/src/fmu/dataio/_definitions.py
+++ b/src/fmu/dataio/_definitions.py
@@ -72,4 +72,5 @@ def __post_init__(self):
     "realization": "To realization-N/iter_M/share",
     "case": "To casename/share, but will also work on project disk",
     "case_symlink_realization": "To case/share, with symlinks on realizations level",
+    "preprocessed": "To share/preprocessed; from interactive runs but re-used later",
 }
diff --git a/src/fmu/dataio/_filedata_provider.py b/src/fmu/dataio/_filedata_provider.py
index cc9426e00..bca1e72d2 100644
--- a/src/fmu/dataio/_filedata_provider.py
+++ b/src/fmu/dataio/_filedata_provider.py
@@ -127,7 +127,7 @@ def _get_filestem(self):
             if self.dataio.filename_timedata_reverse:  # class variable
                 stem += "--" + base + "_" + monitor
             else:
-                stem += "--" + monitor + "_" + base
+                stem += "--" + monitor + "_" + base

         stem = stem.replace(".", "_").replace(" ", "_")
@@ -156,10 +156,13 @@ def _get_path(self):
         outroot = outroot / "share"

-        if self.dataio.is_observation:
-            outroot = outroot / "observations"
+        if self.fmu_context == "preprocessed":
+            outroot = outroot / "preprocessed"
         else:
-            outroot = outroot / "results"
+            if self.dataio.is_observation:
+                outroot = outroot / "observations"
+            else:
+                outroot = outroot / "results"

         dest = outroot / self.efolder  # e.g. "maps"
@@ -184,7 +187,7 @@ def _get_path(self):
         if self.dataio.createfolder:
             dest.mkdir(parents=True, exist_ok=True)

-        # check that destination actually exists if verify_folder is True
+        # check that destination actually exists if verifyfolder is True
         if self.dataio.verifyfolder and not dest.exists():
             raise IOError(f"Folder {str(dest)} is not present.")
diff --git a/src/fmu/dataio/_fmu_provider.py b/src/fmu/dataio/_fmu_provider.py
index 907887bd2..2064b8b0a 100644
--- a/src/fmu/dataio/_fmu_provider.py
+++ b/src/fmu/dataio/_fmu_provider.py
@@ -82,6 +82,8 @@ def detect_provider(self):
             logger.info("Detecting FMU provider as None")
             self.provider = None  # e.g. an interactive RMS run
             self.dataio._usecontext = None  # e.g. an interactive RMS run
+            if self.dataio.fmu_context == "preprocessed":
+                self.dataio._usecontext = self.dataio.fmu_context

     def _detect_ert2provider(self) -> bool:
         """Detect if ERT2 is provider and set itername, casename, etc."""
diff --git a/src/fmu/dataio/_metadata.py b/src/fmu/dataio/_metadata.py
index 6cb959fb9..71a093201 100644
--- a/src/fmu/dataio/_metadata.py
+++ b/src/fmu/dataio/_metadata.py
@@ -8,6 +8,7 @@
 import getpass
 import logging
 from dataclasses import dataclass, field
+from pathlib import Path
 from typing import Any, Optional
 from warnings import warn

@@ -15,7 +16,12 @@
 from fmu.dataio._filedata_provider import _FileDataProvider
 from fmu.dataio._fmu_provider import _FmuProvider
 from fmu.dataio._objectdata_provider import _ObjectDataProvider
-from fmu.dataio._utils import drop_nones, export_file_compute_checksum_md5
+from fmu.dataio._utils import (
+    drop_nones,
+    export_file_compute_checksum_md5,
+    glue_metadata_preprocessed,
+    read_metadata,
+)

 logger = logging.getLogger(__name__)
@@ -142,10 +148,20 @@ class _MetaData:
     # relevant when ERT* fmu_context; same as rootpath in the ExportData class!:
     rootpath: str = field(default="", init=False)

+    # if re-using existing metadata
+    meta_existing: dict = field(default_factory=dict, init=False)
+
     def __post_init__(self):
         logger.setLevel(level=self.verbosity)
         logger.info("Initialize _MetaData instance.")

+        # one special case is that obj is a file path, and dataio.reuse_metadata_rule
+        # is active. In this case we read the existing metadata here and re-use parts
+        # of it according to the rule described in self.reuse_metadata_rule!
+        if isinstance(self.obj, (str, Path)) and self.dataio.reuse_metadata_rule:
+            logger.info("Partially reuse existing metadata from %s", self.obj)
+            self.meta_existing = read_metadata(self.obj)
+
     def _populate_meta_objectdata(self):
         """Analyze the actual object together with input settings.
@@ -154,8 +170,7 @@ def _populate_meta_objectdata(self):

         Hence this must be ran early or first.
         """
-
-        self.objdata = _ObjectDataProvider(self.obj, self.dataio)
+        self.objdata = _ObjectDataProvider(self.obj, self.dataio, self.meta_existing)
         self.objdata.derive_metadata()
         self.meta_objectdata = self.objdata.metadata
@@ -252,6 +267,20 @@ def _populate_meta_access(self):
         if self.dataio:
             self.meta_access = generate_meta_access(self.dataio.config)

+    def _reuse_existing_metadata(self, meta):
+        """Perform a merge procedure if the key `reuse_metadata_rule` is active."""
+        if self.dataio and self.dataio.reuse_metadata_rule:
+            oldmeta = self.meta_existing
+            newmeta = meta.copy()
+            if self.dataio.reuse_metadata_rule == "preprocessed":
+                return glue_metadata_preprocessed(oldmeta, newmeta)
+            else:
+                raise ValueError(
+                    f"The reuse_metadata_rule {self.dataio.reuse_metadata_rule} is not "
+                    "supported."
+                )
+        return meta
+
     def generate_export_metadata(self, skip_null=True) -> dict:  # TODO! -> skip_null?
         """Main function to generate the full metadata"""
@@ -264,7 +293,8 @@ def generate_export_metadata(self, skip_null=True) -> dict:  # TODO! -> skip_nul
         self._populate_meta_fmu()
         self._populate_meta_file()

-        # glue together metadata, order is as legacy code
+        # glue together metadata, order is as legacy code (but will be scrambled
+        # when re-using existing metadata...)
         meta = self.meta_dollars.copy()
         meta["tracklog"] = self.meta_tracklog
         meta["class"] = self.meta_class
@@ -281,4 +311,6 @@ def generate_export_metadata(self, skip_null=True) -> dict:  # TODO! -> skip_nul
         if skip_null:
             meta = drop_nones(meta)

+        meta = self._reuse_existing_metadata(meta)
+
         return meta
diff --git a/src/fmu/dataio/_objectdata_provider.py b/src/fmu/dataio/_objectdata_provider.py
index 1f31577ac..ce9f9ed11 100644
--- a/src/fmu/dataio/_objectdata_provider.py
+++ b/src/fmu/dataio/_objectdata_provider.py
@@ -86,7 +86,8 @@
 import logging
 from dataclasses import dataclass, field
 from datetime import datetime as dt
-from typing import Any
+from pathlib import Path
+from typing import Any, Optional
 from warnings import warn

 import numpy as np
@@ -94,7 +95,7 @@
 import xtgeo  # type: ignore

 from ._definitions import _ValidFormats
-from ._utils import generate_description
+from ._utils import generate_description, parse_timedata

 try:
     import pyarrow as pa  # type: ignore
@@ -118,27 +119,29 @@ class _ObjectDataProvider:

     * Investigating (parsing) the object (e.g. a XTGeo RegularSurface) itself
     * Combine the object info with user settings, globalconfig and class variables
+    * OR
+    * investigate current metadata if that is provided
     """

-    # input fields, cannot be defaulted
+    # input fields
     obj: Any
     dataio: Any
+    meta_existing: Optional[dict] = None

     # result properties; the most important is metadata which IS the 'data' part in
     # the resulting metadata. But other variables needed later are also given
     # as instance properties in addition (for simplicity in other classes/functions)
-    metadata: dict = field(default_factory=dict)
-
-    name: str = ""
-    classname: str = ""
-    efolder: str = ""
-    fmt: str = ""
-    extension: str = ""
-    layout: str = ""
-    bbox: dict = field(default_factory=dict)
-    specs: dict = field(default_factory=dict)
-    time0: str = ""
-    time1: str = ""
+    metadata: dict = field(default_factory=dict, init=False)
+    name: str = field(default="", init=False)
+    classname: str = field(default="", init=False)
+    efolder: str = field(default="", init=False)
+    fmt: str = field(default="", init=False)
+    extension: str = field(default="", init=False)
+    layout: str = field(default="", init=False)
+    bbox: dict = field(default_factory=dict, init=False)
+    specs: dict = field(default_factory=dict, init=False)
+    time0: str = field(default="", init=False)
+    time1: str = field(default="", init=False)

     def __post_init__(self):
@@ -562,9 +565,31 @@ def _derive_timedata_newformat(self):
         logger.info("Timedata: time0 is %s while time1 is %s", self.time0, self.time1)
         return tresult

+    def _derive_from_existing(self):
+        """Derive from existing metadata."""
+
+        # do not change any items in the 'data' block, as that may ruin e.g. the
+        # stratigraphical setting (i.e. changing data.name is not allowed)
+        self.metadata = self.meta_existing["data"]
+        self.name = self.meta_existing["data"]["name"]
+
+        # derive the additional attributes needed later, e.g. in the FileData provider:
+        relpath = Path(self.meta_existing["file"]["relative_path"])
+        self.efolder = relpath.parent.name
+        self.classname = self.meta_existing["class"]
+        self.extension = relpath.suffix
+        self.fmt = self.meta_existing["data"]["format"]
+
+        self.time0, self.time1 = parse_timedata(self.meta_existing["data"])
+
     def derive_metadata(self):
         """Main function here, will populate the metadata block for 'data'."""
         logger.info("Derive all metadata for data object...")
+
+        if self.meta_existing:
+            self._derive_from_existing()
+            return
+
         nameres = self._derive_name_stratigraphy()
         objres = self._derive_objectdata()
diff --git a/src/fmu/dataio/_utils.py b/src/fmu/dataio/_utils.py
index 4c72d73ad..78c6fafb4 100644
--- a/src/fmu/dataio/_utils.py
+++ b/src/fmu/dataio/_utils.py
@@ -3,13 +3,16 @@
 import json
 import logging
 import os
+import shutil
 import tempfile
 import uuid
 from copy import deepcopy
+from datetime import datetime
 from pathlib import Path
 from typing import Dict, List, Optional, Union

 import pandas as pd  # type: ignore
+import yaml

 try:
     import pyarrow as pa  # type: ignore
@@ -20,7 +23,6 @@
     from pyarrow import feather

 import xtgeo  # type: ignore
-import yaml

 from . import _design_kw
 from . import _oyaml as oyaml
@@ -112,7 +114,11 @@ def export_metadata_file(yfile, metadata, savefmt="yaml", verbosity="WARNING") -

 def export_file(obj, filename, extension, flag=None):
     """Export a valid object to file"""
+
+    if isinstance(obj, Path):
+        # special case when processing data which already has metadata
+        shutil.copy(obj, filename)
-    if extension == ".gri" and isinstance(obj, xtgeo.RegularSurface):
+    elif extension == ".gri" and isinstance(obj, xtgeo.RegularSurface):
         obj.to_file(filename, fformat="irap_binary")
     elif extension == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
         out = obj.copy()  # to not modify incoming instance!
@@ -388,6 +394,47 @@ def generate_description(desc: Optional[Union[str, list]] = None) -> Union[list,
     else:
         raise ValueError("Description of wrong type, must be list of strings or string")

+
+def read_metadata(filename: Union[str, Path]) -> dict:
+    """Read the metadata as a dictionary given a filename.
+
+    If the filename is e.g. /some/path/mymap.gri, the associated metafile
+    will be /some/path/.mymap.gri.yml (or json?)
+
+    Args:
+        filename: The full path filename to the data-object.
+
+    Returns:
+        A dictionary with metadata read from the associated metadata file.
+    """
+    fname = Path(filename)
+    metafile = str(fname.parent) + "/." + fname.stem + fname.suffix + ".yml"
+    metafilepath = Path(metafile)
+    if not metafilepath.exists():
+        raise IOError(f"Cannot find requested metafile: {metafile}")
+    with open(metafilepath, "r") as stream:
+        metacfg = yaml.safe_load(stream)
+
+    return metacfg
+
+
+def glue_metadata_preprocessed(oldmeta, newmeta):
+    """Glue (combine) two metadata dicts according to the rule 'preprocessed'."""
+
+    meta = oldmeta.copy()
+    meta["fmu"] = newmeta["fmu"]
+    meta["file"] = newmeta["file"]
+    meta["access"] = newmeta["access"]
+
+    newmeta["tracklog"][-1]["event"] = "merged"
+    meta["tracklog"].extend(newmeta["tracklog"])
+
+    # the only field in 'data' that is allowed to be updated is 'name':
+    meta["data"]["name"] = newmeta["data"]["name"]
+
+    return meta
+
+
 def parse_timedata(datablock: dict, isoformat=True):
     """The time section under datablock has variants to parse.
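A minimal sketch of how the new glue_metadata_preprocessed helper is meant to compose, assuming it behaves exactly as written in the _utils.py hunk above. The dicts below are stripped-down, hypothetical stand-ins; real fmu-dataio metadata carries many more keys.

    from fmu.dataio._utils import glue_metadata_preprocessed

    # "old" metadata stored next to the preprocessed file, "new" metadata generated
    # in the case/ERT run (both heavily abbreviated)
    oldmeta = {
        "data": {"name": "preprocessedmap", "format": "irap_binary"},
        "fmu": None,
        "file": {"relative_path": "share/preprocessed/maps/preprocessedmap.gri"},
        "access": {"classification": "internal"},
        "tracklog": [{"event": "created"}],
    }
    newmeta = {
        "data": {"name": "preprocessed_v2"},
        "fmu": {"case": {"name": "somecase"}},
        "file": {"relative_path": "share/observations/maps/preprocessed_v2.gri"},
        "access": {"classification": "internal"},
        "tracklog": [{"event": "created"}],
    }

    merged = glue_metadata_preprocessed(oldmeta, newmeta)

    # the old 'data' block is kept, except 'name' which is taken from the new run:
    assert merged["data"] == {"name": "preprocessed_v2", "format": "irap_binary"}
    # 'fmu', 'file' and 'access' all come from the new (case) run:
    assert merged["file"]["relative_path"].startswith("share/observations")
    # the new tracklog entries are appended, with the last event relabeled:
    assert [e["event"] for e in merged["tracklog"]] == ["created", "merged"]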
diff --git a/src/fmu/dataio/dataio.py b/src/fmu/dataio/dataio.py
index 9d9d9e559..6bd4994ae 100644
--- a/src/fmu/dataio/dataio.py
+++ b/src/fmu/dataio/dataio.py
@@ -13,7 +12,6 @@
 from warnings import warn

 import pandas as pd  # type: ignore
-import yaml

 from . import _metadata
 from ._definitions import ALLOWED_CONTENTS, ALLOWED_FMU_CONTEXTS, CONTENTS_REQUIRED
@@ -25,9 +24,9 @@
     filter_validate_metadata,
     generate_description,
     prettyprint_dict,
-    some_config_from_env,
-    uuid_from_string,
 )
+from ._utils import read_metadata as _utils_read_metadata
+from ._utils import some_config_from_env, uuid_from_string

 INSIDE_RMS = detect_inside_rms()
@@ -194,15 +193,7 @@ def read_metadata(filename: Union[str, Path]) -> dict:
     Returns:
         A dictionary with metadata read from the assiated metadata file.
     """
-    fname = Path(filename)
-    metafile = str(fname.parent) + "/." + fname.stem + fname.suffix + ".yml"
-    metafilepath = Path(metafile)
-    if not metafilepath.exists():
-        raise IOError(f"Cannot find requested metafile: {metafile}")
-    with open(metafilepath, "r") as stream:
-        metacfg = yaml.safe_load(stream)
-
-    return metacfg
+    return _utils_read_metadata(filename)


 # ======================================================================================
@@ -280,8 +271,8 @@ class ExportData:
         the file structure or by other means. Use with care!

     config: Required, either as key (here) or through an environment variable.
-        A dictionary with static settings. In the standard case this is read
-        from FMU global variables (via fmuconfig). The dictionary must contain some
+        A dictionary with static settings. In the standard case this is read from
+        FMU global variables (via fmuconfig). The dictionary must contain some
         predefined main level keys to work with fmu-dataio. If the key is missing or
         key value is None, then it will look for the environment variable
         FMU_GLOBAL_CONFIG to detect the file. If no success in finding the file, a
@@ -294,9 +285,12 @@
     fmu_context: In normal forward models, the fmu_context is ``realization`` which is
         default and will put data per realization. Other contexts may be ``case``
-        which will put data relative to the case root. If a non-FMU run is detected
-        (e.g. you run from project), fmu-dataio will detect that and set actual
-        context to None as fall-back.
+        which will put data relative to the case root. Another important context is
+        ``preprocessed`` which will output to a dedicated "preprocessed" folder
+        instead, and metadata will be partially re-used in a later ERT model run. If
+        a non-FMU run is detected (e.g. you run from project), fmu-dataio will
+        detect that and set the actual context to None as fall-back (unless
+        preprocessed is specified). If "preprocessed", see also ``reuse_metadata_rule``.

     description: A multiline description of the data either as a string or a list
         of strings.
@@ -305,12 +299,12 @@

     forcefolder: This setting shall only be used as exception, and will make it
         possible to output to a non-standard folder. A ``/`` in front will indicate
-        an absolute path*; otherwise it will be relative to casepath or rootpath,
-        as dependent on the both fmu_context and the is_observations
-        boolean value. A typical use-case is forcefolder="seismic" which will
-        replace the "cubes" standard folder for Cube output with "seismics".
-        Use with care and avoid if possible! (*) For absolute paths, the class
-        variable allow_forcefolder_absolute must set to True.
+        an absolute path*; otherwise it will be relative to casepath or rootpath,
+        depending on both the fmu_context and the is_observations boolean value. A
+        typical use-case is forcefolder="seismic" which will replace the "cubes"
+        standard folder for Cube output with "seismics". Use with care and avoid if
+        possible! (*) For absolute paths, the class variable
+        allow_forcefolder_absolute must be set to True.

     grid_model: Currently allowed but planned for deprecation
@@ -321,7 +317,9 @@
     is_prediction: True (default) if model prediction data

     is_observation: Default is False. If True, then disk storage will be on the
-        "share/observations" folder, otherwise on share/result
+        "share/observations" folder, otherwise on share/results. An exception arises
+        if fmu_context is "preprocessed"; then the folder will be set to
+        "share/preprocessed" irrespective of the value of is_observation.

     name: Optional but recommended. The name of the object. If not set it is tried
         to be inferred from the xtgeo/pandas/... object. The name is then checked
@@ -337,6 +335,11 @@
         detected automatically from the FMU run. Can be used to override in rare
         cases. If so, numbers must be >= 0

+    reuse_metadata_rule: This input is None or a string describing the rule for
+        reusing metadata. Default is None, but if the input object is a file path
+        with already valid metadata, then the rule is assumed to be "preprocessed",
+        which merges the metadata according to predefined rules.
+
     runpath: TODO! Optional and deprecated. The relative location of the current run
         root. Optional and will in most cases be auto-detected, assuming that FMU
         folder conventions are followed. For an ERT run e.g.
@@ -419,6 +422,7 @@ class ExportData:
     case_folder: ClassVar[str] = "share/metadata"
     createfolder: ClassVar[bool] = True
     cube_fformat: ClassVar[str] = "segy"
+    filename_timedata_reverse: ClassVar[bool] = False  # reverse time order in file name
     grid_fformat: ClassVar[str] = "roff"
     include_ert2jobs: ClassVar[bool] = False  # if True, include jobs.json from ERT2
     legacy_time_format: ClassVar[bool] = False
@@ -447,6 +451,7 @@ class ExportData:
     name: str = ""
     parent: str = ""
     realization: int = -999
+    reuse_metadata_rule: Optional[str] = None
     runpath: Union[str, Path, None] = None
     subfolder: str = ""
     tagname: str = ""
@@ -623,6 +628,19 @@ def _establish_pwd_rootpath(self):
         logger.info("pwd: %s", str(self._pwd))
         logger.info("rootpath: %s", str(self._rootpath))

+    def _check_obj_if_file(self, obj: Any) -> Any:
+        """When obj is file-like, check that it exists and assume preprocessed mode."""
+
+        if isinstance(obj, (str, Path)):
+            if isinstance(obj, str):
+                obj = Path(obj)
+            if not obj.exists():
+                raise ValidationError(f"The file {obj} does not exist.")
+            if not self.reuse_metadata_rule:
+                self.reuse_metadata_rule = "preprocessed"
+
+        return obj
+
     # ==================================================================================
     # Public methods:
     # ==================================================================================
@@ -636,6 +654,9 @@ def generate_metadata(self, obj: Any, compute_md5: bool = True, **kwargs) -> dic
         Examples of such known types are XTGeo objects (e.g. a RegularSurface), a
         Pandas Dataframe, a PyArrow table, etc.

+        If the key ``reuse_metadata_rule`` is applied with a legal value, the object
+        may also be a reference to a file with existing metadata, which is then re-used.
+
         Args:
             obj: XTGeo instance, a Pandas Dataframe instance or other supported object.
             compute_md5: If True, compute a MD5 checksum for the exported file.
@@ -655,6 +676,7 @@ def generate_metadata(self, obj: Any, compute_md5: bool = True, **kwargs) -> dic
         self._update_check_settings(kwargs)
         self._update_globalconfig_from_settings()
         _check_global_config(self.config)
+        obj = self._check_obj_if_file(obj)
         self._establish_pwd_rootpath()
         self._validate_content_key()
         self._update_fmt_flag()
@@ -688,7 +710,6 @@ def export(self, obj, **kwargs) -> str:
         Returns:
             String: full path to exported item.
         """
-
         self.generate_metadata(obj, compute_md5=False, **kwargs)
         metadata = self._metadata
@@ -701,6 +722,7 @@ def export(self, obj, **kwargs) -> str:
         else:
             useflag = self._usefmtflag

+        obj = self._check_obj_if_file(obj)
         logger.info("Export to file and compute MD5 sum, using flag: <%s>", useflag)
         outfile, md5 = export_file_compute_checksum_md5(
             obj, outfile, outfile.suffix, flag=useflag
diff --git a/tests/test_units/test_prerealization_surfaces.py b/tests/test_units/test_prerealization_surfaces.py
index ec310cae5..63343d760 100644
--- a/tests/test_units/test_prerealization_surfaces.py
+++ b/tests/test_units/test_prerealization_surfaces.py
@@ -1,7 +1,10 @@
 """Test the dataio running with pre-realization objects, e.g. surfaces.

-Thses outputs will ned an active 'stage' key in order to come into the right folder
-and classification
+These outputs may need an active 'fmu_context' key in order to come into the right
+folder and classification, but there are various ways to do this:
+
+1) Have files in a folder without any metadata; cf. fmu_context="case"
+2) Have files with pregenerated metadata in a folder; cf. fmu_context="preprocessed"

 These objects are normally made as hook workflows before ERT has ran any forward
 jobs and are typically used to compare results.
@@ -10,6 +13,7 @@
 import os

 import pytest
+from conftest import inside_rms

 import fmu.dataio.dataio as dataio
 from fmu.dataio import _utils as utils
@@ -18,7 +22,10 @@


 def test_regsurf_case_observation(fmurun_w_casemetadata, rmsglobalconfig, regsurf):
-    """Test generating pre-realization surfaces."""
+    """Test generating pre-realization surfaces that go directly to the case.
+
+    Notice the difference between this use-case and the 'preprocessed' example later!
+    """
     logger.info("Active folder is %s", fmurun_w_casemetadata)

     os.chdir(fmurun_w_casemetadata)
@@ -65,3 +72,72 @@ def test_regsurf_case_observation_w_symlinks(
     exp = edata.export(regsurf)

     assert "ertrun1/share/observations/maps/mymap.gri" in exp
+
+
+def test_regsurf_preprocessed_observation(
+    fmurun_w_casemetadata, rmssetup, rmsglobalconfig, regsurf
+):
+    """Test generating pre-realization surfaces that go to share/preprocessed.
+
+    Later, an FMU run will update these (merge metadata).
+    """
+
+    @inside_rms
+    def _export_data_from_rms(rmssetup, rmsglobalconfig, regsurf):
+        """Run an export of a preprocessed surface inside RMS."""
+        logger.info("Active folder is %s", rmssetup)
+
+        os.chdir(rmssetup)
+        edata = dataio.ExportData(
+            config=rmsglobalconfig,  # read from global config
+            fmu_context="preprocessed",
+            name="preprocessedmap",
+            is_observation=True,
+            timedata=[[20240802, "moni"], [20200909, "base"]],
+        )
+
+        metadata = edata.generate_metadata(regsurf)
+        logger.debug("\n%s", utils.prettyprint_dict(metadata))
+
+        assert (
+            metadata["file"]["relative_path"]
+            == "share/preprocessed/maps/preprocessedmap--20240802_20200909.gri"
+        )
+
+        return edata.export(regsurf)
+
+    def _run_case_fmu(fmurun_w_casemetadata, rmsglobalconfig, surfacepath):
+        """Run an FMU workflow, using the preprocessed data as case data.
+
+        When re-using metadata, the input object to dataio shall not be an XTGeo or
+        Pandas or ... instance, but just a file path (either as string or a pathlib.Path
+        object). This is because we want to avoid time and resources spent on double
+        reading of e.g. a seismic cube, and rather trigger a file copy action instead.
+
+        But it requires that valid metadata for that file is found. The rule for
+        merging is currently defaulted to "preprocessed".
+        """
+        os.chdir(fmurun_w_casemetadata)
+        logger.info("Active folder is %s", fmurun_w_casemetadata)
+
+        edata = dataio.ExportData(
+            config=rmsglobalconfig,  # read from global config
+            fmu_context="case",
+            name="preprocessed_v2",
+            is_observation=True,
+        )
+        metadata = edata.generate_metadata(
+            surfacepath,
+        )
+        logger.debug("\n%s", utils.prettyprint_dict(metadata))
+        assert (
+            metadata["file"]["relative_path"]
+            == "share/observations/maps/preprocessed_v2--20240802_20200909.gri"
+        )
+        assert "merged" in metadata["tracklog"][-1]["event"]
+
+    # run the two-stage process
+    mysurf = _export_data_from_rms(rmssetup, rmsglobalconfig, regsurf)
+    _run_case_fmu(fmurun_w_casemetadata, rmsglobalconfig, mysurf)
+
+    logger.info("Preprocessed surface is %s", mysurf)
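For reference, the intended two-stage end-user flow distilled from the test above, as a sketch: cfg stands in for a valid fmu-dataio global config dict and regsurf for an xtgeo.RegularSurface instance; both are placeholders, and the paths in comments are illustrative only.

    import fmu.dataio as dataio

    # cfg: a valid fmu-dataio global config dict (placeholder here)
    # regsurf: an xtgeo.RegularSurface instance (placeholder here)

    # Stage 1, interactive (e.g. RMS) session: store data plus metadata under
    # share/preprocessed instead of share/results or share/observations
    edata = dataio.ExportData(
        config=cfg,
        fmu_context="preprocessed",
        name="preprocessedmap",
        is_observation=True,
    )
    surfacepath = edata.export(regsurf)  # .../share/preprocessed/maps/...

    # Stage 2, inside an FMU case (e.g. an ERT hook workflow): hand over the file
    # *path*, not the object. The file is copied rather than re-read, and
    # reuse_metadata_rule defaults to "preprocessed", so the stored 'data' block is
    # kept while 'fmu', 'file' and 'access' are regenerated for the case.
    edata2 = dataio.ExportData(
        config=cfg,
        fmu_context="case",
        name="preprocessed_v2",
        is_observation=True,
    )
    edata2.export(surfacepath)  # lands under <case>/share/observations/maps/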