Add mechanism for preprocessed data
Here data can be exported from e.g. RMS
as preprocessed, meaning that most metadata
are valid but FMU metadata will be missing. When
applied in an FMU run, the file with existing metadata
will be used, and FMU-specific metadata will be added by
a merge process.
jcrivenaes committed Sep 6, 2022
1 parent e248e09 commit d969279
Showing 8 changed files with 262 additions and 54 deletions.
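The intended two-stage workflow, sketched roughly below. The config object cfg, the surface object, and the use of the returned file path are illustrative assumptions, not taken verbatim from this commit:

    from pathlib import Path
    from fmu.dataio import ExportData

    # stage 1: interactive (e.g. RMS) session; export as "preprocessed".
    # The file lands under share/preprocessed/ with partial metadata
    # (no fmu block, since no FMU provider is running).
    exd = ExportData(config=cfg, fmu_context="preprocessed", name="mysurface")
    filepath = exd.export(surface)

    # stage 2: later, inside an FMU run; re-export the stored file by passing
    # its path instead of the object. reuse_metadata_rule="preprocessed" makes
    # dataio read the existing metadata and merge in the FMU-specific parts.
    exd2 = ExportData(config=cfg, reuse_metadata_rule="preprocessed")
    exd2.export(Path(filepath))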
1 change: 1 addition & 0 deletions src/fmu/dataio/_definitions.py
@@ -72,4 +72,5 @@ def __post_init__(self):
"realization": "To realization-N/iter_M/share",
"case": "To casename/share, but will also work on project disk",
"case_symlink_realization": "To case/share, with symlinks on realizations level",
"preprocessed": "To share/preprocessed; from interactive runs but re-used later",
}
13 changes: 8 additions & 5 deletions src/fmu/dataio/_filedata_provider.py
@@ -127,7 +127,7 @@ def _get_filestem(self):
if self.dataio.filename_timedata_reverse: # class variable
stem += "--" + base + "_" + monitor
else:
            stem += "--" + monitor + "_" + base

stem = stem.replace(".", "_").replace(" ", "_")

@@ -156,10 +156,13 @@ def _get_path(self):

outroot = outroot / "share"

-        if self.dataio.is_observation:
-            outroot = outroot / "observations"
+        if self.fmu_context == "preprocessed":
+            outroot = outroot / "preprocessed"
         else:
-            outroot = outroot / "results"
+            if self.dataio.is_observation:
+                outroot = outroot / "observations"
+            else:
+                outroot = outroot / "results"

dest = outroot / self.efolder # e.g. "maps"
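        # illustration (hypothetical values): with efolder "maps", dest resolves to
        #   <root>/share/preprocessed/maps    when fmu_context == "preprocessed"
        #   <root>/share/observations/maps    when is_observation (not preprocessed)
        #   <root>/share/results/maps         otherwise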

@@ -184,7 +187,7 @@ def _get_path(self):
if self.dataio.createfolder:
dest.mkdir(parents=True, exist_ok=True)

-        # check that destination actually exists if verify_folder is True
+        # check that destination actually exists if verifyfolder is True
if self.dataio.verifyfolder and not dest.exists():
raise IOError(f"Folder {str(dest)} is not present.")

2 changes: 2 additions & 0 deletions src/fmu/dataio/_fmu_provider.py
@@ -82,6 +82,8 @@ def detect_provider(self):
logger.info("Detecting FMU provider as None")
self.provider = None # e.g. an interactive RMS run
self.dataio._usecontext = None # e.g. an interactive RMS run
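        # even when no FMU provider is detected (e.g. an interactive RMS run),
        # a "preprocessed" fmu_context is kept, so exports still route to
        # share/preprocessed: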
if self.dataio.fmu_context == "preprocessed":
self.dataio._usecontext = self.dataio.fmu_context

def _detect_ert2provider(self) -> bool:
"""Detect if ERT2 is provider and set itername, casename, etc."""
40 changes: 36 additions & 4 deletions src/fmu/dataio/_metadata.py
@@ -8,14 +8,20 @@
import getpass
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from warnings import warn

from fmu.dataio._definitions import SCHEMA, SOURCE, VERSION
from fmu.dataio._filedata_provider import _FileDataProvider
from fmu.dataio._fmu_provider import _FmuProvider
from fmu.dataio._objectdata_provider import _ObjectDataProvider
-from fmu.dataio._utils import drop_nones, export_file_compute_checksum_md5
+from fmu.dataio._utils import (
+    drop_nones,
+    export_file_compute_checksum_md5,
+    glue_metadata_preprocessed,
+    read_metadata,
+)

logger = logging.getLogger(__name__)

@@ -142,10 +148,20 @@ class _MetaData:
# relevant when ERT* fmu_context; same as rootpath in the ExportData class!:
rootpath: str = field(default="", init=False)

# if re-using existing metadata
meta_existing: dict = field(default_factory=dict, init=False)

def __post_init__(self):
logger.setLevel(level=self.verbosity)
logger.info("Initialize _MetaData instance.")

        # one special case is that obj is a file path, and dataio.reuse_metadata_rule
        # is active. In this case we read the existing metadata here and reuse parts
        # according to the rule described in the string self.reuse_metadata_rule!
if isinstance(self.obj, (str, Path)) and self.dataio.reuse_metadata_rule:
logger.info("Partially reuse existing metadata from %s", self.obj)
self.meta_existing = read_metadata(self.obj)

def _populate_meta_objectdata(self):
"""Analyze the actual object together with input settings.
@@ -154,8 +170,7 @@ def _populate_meta_objectdata(self):
Hence this must be run early or first.
"""

-        self.objdata = _ObjectDataProvider(self.obj, self.dataio)
+        self.objdata = _ObjectDataProvider(self.obj, self.dataio, self.meta_existing)
self.objdata.derive_metadata()
self.meta_objectdata = self.objdata.metadata

@@ -252,6 +267,20 @@ def _populate_meta_access(self):
if self.dataio:
self.meta_access = generate_meta_access(self.dataio.config)

def _reuse_existing_metadata(self, meta):
"""Perform a merge procedure if the key `reuse_metadata_rule` is active."""
if self.dataio and self.dataio.reuse_metadata_rule:
oldmeta = self.meta_existing
newmeta = meta.copy()
if self.dataio.reuse_metadata_rule == "preprocessed":
return glue_metadata_preprocessed(oldmeta, newmeta)
else:
raise ValueError(
f"The reuse_metadata_rule {self.dataio.reuse_metadata_rule} is not "
"supported."
)
return meta

def generate_export_metadata(self, skip_null=True) -> dict: # TODO! -> skip_null?
"""Main function to generate the full metadata"""

@@ -264,7 +293,8 @@ def generate_export_metadata(self, skip_null=True) -> dict:
self._populate_meta_fmu()
self._populate_meta_file()

-        # glue together metadata, order is as legacy code
+        # glue together metadata, order is as legacy code (but will be screwed if reuse
+        # of existing metadata...)
meta = self.meta_dollars.copy()
meta["tracklog"] = self.meta_tracklog
meta["class"] = self.meta_class
@@ -281,4 +311,6 @@ def generate_export_metadata(self, skip_null=True) -> dict:
if skip_null:
meta = drop_nones(meta)

meta = self._reuse_existing_metadata(meta)

return meta
55 changes: 40 additions & 15 deletions src/fmu/dataio/_objectdata_provider.py
@@ -86,15 +86,16 @@
import logging
from dataclasses import dataclass, field
from datetime import datetime as dt
-from typing import Any
+from pathlib import Path
+from typing import Any, Optional
from warnings import warn

import numpy as np
import pandas as pd # type: ignore
import xtgeo # type: ignore

from ._definitions import _ValidFormats
-from ._utils import generate_description
+from ._utils import generate_description, parse_timedata

try:
import pyarrow as pa # type: ignore
@@ -118,27 +119,29 @@ class _ObjectDataProvider:
* Investigating (parsing) the object (e.g. a XTGeo RegularSurface) itself
* Combine the object info with user settings, globalconfig and class variables
* OR
* Investigate current metadata if that is provided
"""

-    # input fields, cannot be defaulted
+    # input fields
obj: Any
dataio: Any
meta_existing: Optional[dict] = None

# result properties; the most important is metadata which IS the 'data' part in
# the resulting metadata. But other variables needed later are also given
# as instance properties in addition (for simplicity in other classes/functions)
-    metadata: dict = field(default_factory=dict)
-
-    name: str = ""
-    classname: str = ""
-    efolder: str = ""
-    fmt: str = ""
-    extension: str = ""
-    layout: str = ""
-    bbox: dict = field(default_factory=dict)
-    specs: dict = field(default_factory=dict)
-    time0: str = ""
-    time1: str = ""
+    metadata: dict = field(default_factory=dict, init=False)
+    name: str = field(default="", init=False)
+    classname: str = field(default="", init=False)
+    efolder: str = field(default="", init=False)
+    fmt: str = field(default="", init=False)
+    extension: str = field(default="", init=False)
+    layout: str = field(default="", init=False)
+    bbox: dict = field(default_factory=dict, init=False)
+    specs: dict = field(default_factory=dict, init=False)
+    time0: str = field(default="", init=False)
+    time1: str = field(default="", init=False)

def __post_init__(self):

@@ -562,9 +565,31 @@ def _derive_timedata_newformat(self):
logger.info("Timedata: time0 is %s while time1 is %s", self.time0, self.time1)
return tresult

def _derive_from_existing(self):
"""Derive from existing metadata."""

        # do not change any items in the 'data' block, as it may ruin e.g. the
        # stratigraphical setting (i.e. changing data.name is not allowed)
self.metadata = self.meta_existing["data"]
self.name = self.meta_existing["data"]["name"]

# derive the additional attributes needed later e.g. in Filedata provider:
relpath = Path(self.meta_existing["file"]["relative_path"])
self.efolder = relpath.parent.name
self.classname = self.meta_existing["class"]
self.extension = relpath.suffix
self.fmt = self.meta_existing["data"]["format"]

self.time0, self.time1 = parse_timedata(self.meta_existing["data"])
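        # illustration (hypothetical metadata values): with
        #   file.relative_path == "share/preprocessed/maps/topvolantis--20200101.gri"
        # the lines above give efolder == "maps" and extension == ".gri", while
        # name, format and class are taken directly from the stored metadata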

def derive_metadata(self):
"""Main function here, will populate the metadata block for 'data'."""
logger.info("Derive all metadata for data object...")

if self.meta_existing:
self._derive_from_existing()
return

nameres = self._derive_name_stratigraphy()
objres = self._derive_objectdata()

51 changes: 49 additions & 2 deletions src/fmu/dataio/_utils.py
@@ -3,13 +3,16 @@
import json
import logging
import os
import shutil
import tempfile
import uuid
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union

import pandas as pd # type: ignore
import yaml

try:
import pyarrow as pa # type: ignore
@@ -20,7 +23,6 @@
from pyarrow import feather

 import xtgeo  # type: ignore
-import yaml

from . import _design_kw
from . import _oyaml as oyaml
@@ -112,7 +114,11 @@ def export_metadata_file(yfile, metadata, savefmt="yaml", verbosity="WARNING") -

def export_file(obj, filename, extension, flag=None):
"""Export a valid object to file"""
-    if extension == ".gri" and isinstance(obj, xtgeo.RegularSurface):
+
+    if isinstance(obj, Path):
+        # special case when processing data which already has metadata
+        shutil.copy(obj, filename)
+    elif extension == ".gri" and isinstance(obj, xtgeo.RegularSurface):
obj.to_file(filename, fformat="irap_binary")
elif extension == ".csv" and isinstance(obj, (xtgeo.Polygons, xtgeo.Points)):
out = obj.copy() # to not modify incoming instance!
@@ -388,6 +394,47 @@ def generate_description(desc: Optional[Union[str, list]] = None) -> Union[list,
else:
raise ValueError("Description of wrong type, must be list of strings or string")


def read_metadata(filename: Union[str, Path]) -> dict:
    """Read the metadata as a dictionary given a filename.

    If the filename is e.g. /some/path/mymap.gri, the associated metafile
    will be /some/path/.mymap.gri.yml (or json?)

    Args:
        filename: The full path filename to the data-object.

    Returns:
        A dictionary with metadata read from the associated metadata file.
    """
    fname = Path(filename)
    metafile = str(fname.parent) + "/." + fname.stem + fname.suffix + ".yml"
    metafilepath = Path(metafile)
    if not metafilepath.exists():
        raise IOError(f"Cannot find requested metafile: {metafile}")
    with open(metafilepath, "r") as stream:
        metacfg = yaml.safe_load(stream)

    return metacfg


def glue_metadata_preprocessed(oldmeta, newmeta):
"""Glue (combine) to metadata dicts according to rule 'preprocessed'."""

meta = oldmeta.copy()
meta["fmu"] = newmeta["fmu"]
meta["file"] = newmeta["file"]
meta["access"] = newmeta["access"]

newmeta["tracklog"][-1]["event"] = "merged"
meta["tracklog"].extend(newmeta["tracklog"])

    # the only field in 'data' that is allowed to be updated is 'name':
meta["data"]["name"] = newmeta["data"]["name"]

return meta
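
# worked example for the merge above (all values hypothetical):
#
#     old = {"data": {"name": "draft", "format": "irap_binary"},
#            "tracklog": [{"event": "created"}],
#            "fmu": None, "file": {}, "access": {}}
#     new = {"data": {"name": "final"},
#            "tracklog": [{"event": "created"}],
#            "fmu": {}, "file": {}, "access": {}}
#     merged = glue_metadata_preprocessed(old, new)
#
# merged keeps old["data"] except that "name" becomes "final"; "fmu", "file"
# and "access" are taken from new; the new tracklog entries are appended,
# with the last one relabelled to event == "merged".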


def parse_timedata(datablock: dict, isoformat=True):
"""The time section under datablock has variants to parse.