Skip to content

Commit

Permalink
ENH: Add ERT WF to copy preprocessed data
Browse files Browse the repository at this point in the history
  • Loading branch information
tnatt committed Jun 28, 2024
1 parent d072a18 commit c32f1ec
Show file tree
Hide file tree
Showing 5 changed files with 390 additions and 3 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ docs = [

[project.entry-points.ert]
dataio_case_metadata = "fmu.dataio.scripts.create_case_metadata"
dataio_copy_preprocessed = "fmu.dataio.scripts.copy_preprocessed"

[tool.setuptools_scm]
write_to = "src/fmu/dataio/version.py"
Expand Down
167 changes: 167 additions & 0 deletions src/fmu/dataio/scripts/copy_preprocessed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#!/usr/bin/env python

"""Copy preprocessed data to an FMU case while updating the metadata.
This script is intended to be run through an ERT HOOK PRESIM workflow.
"""

from __future__ import annotations

import argparse
import logging
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Final

from fmu.dataio import ExportPreprocessedData

try:
from ert.shared.plugins.plugin_manager import hook_implementation
except ModuleNotFoundError:
from ert_shared.plugins.plugin_manager import hook_implementation

try:
from ert.config import ErtScript
except ImportError:
from res.job_queue import ErtScript

if TYPE_CHECKING:
from ert.shared.plugins.workflow_config import WorkflowConfigs

# Module-level logger. It starts out at CRITICAL, so nothing below that level
# is emitted until copy_preprocessed_data_main() applies the level given by
# the --verbosity argument.
logger: Final = logging.getLogger(__name__)
logger.setLevel(logging.CRITICAL)

# This documentation is compiled into ert's internal docs.
# It is attached to the workflow job as its description in
# legacy_ertscript_workflow().
DESCRIPTION = """
WF_COPY_PREPROCESSED_DATAIO will copy preprocessed data to a FMU run at
<caseroot>/share/observations/. If the data contains metadata this will be
updated with information about the FMU run and ready for upload to sumo.
Preprocessed data refers to data that has been exported with dataio outside of a FMU
context, and is typically located in a share/preprocessed/ folder on the project disk.
"""

# Usage examples attached to the workflow job in legacy_ertscript_workflow().
# The long command line below is why line-length linting is disabled (noqa).
EXAMPLES = """
Create an ERT workflow e.g. named ``ert/bin/workflows/xhook_copy_preprocessed_data``
with the contents::
WF_COPY_PREPROCESSED_DATAIO <SCRATCH>/<USER>/<CASE_DIR> <CONFIG_PATH> '../../share/preprocessed/'
Add following lines to your ERT config to have the job automatically executed::
LOAD_WORKFLOW ../bin/workflows/xhook_copy_preprocessed_data
HOOK_WORKFLOW xhook_copy_preprocessed_data PRE_SIMULATION
""" # noqa


def main() -> None:
    """Run the copy job from the command line.

    Inside an ERT workflow the job is instead started through the ``run``
    method of ``WfCopyPreprocessedData``, which is the intended usage. This
    command line entry point is kept to make that difference explicit and to
    ease debugging.
    """
    cli_parser = get_parser()
    # No explicit argument list is passed, so argparse reads sys.argv[1:].
    copy_preprocessed_data_main(cli_parser.parse_args())


class WfCopyPreprocessedData(ErtScript):
    """ERT plugin wrapper exposing the copy job through a ``run()`` method.

    Used when the job is executed in an ERT workflow context. The 'Wf' prefix
    avoids potential naming collisions within fmu-dataio.
    """

    def run(self, *args: str) -> None:
        """Parse the workflow arguments and pass them on to
        copy_preprocessed_data_main()."""
        parsed_args = get_parser().parse_args(args)
        copy_preprocessed_data_main(parsed_args)


def copy_preprocessed_data_main(args: argparse.Namespace) -> None:
    """Copy preprocessed files into the FMU case as observations.

    Every non-hidden file found recursively below
    ``<ert_config_path>/<inpath>`` is handed to
    ``ExportPreprocessedData(...).export()`` with ``is_observation=True``.
    This function does not upload anything itself.

    Args:
        args: Parsed arguments as produced by ``get_parser()``. The
            attributes ``verbosity``, ``ert_caseroot``, ``ert_config_path``
            and ``inpath`` are read here.

    Raises:
        ValueError: If ``verbosity`` is not a known log level name, if
            ``check_arguments()`` rejects the input, or if no files are found
            below the search path.
    """
    # Apply the requested log level first so that the debug messages emitted
    # by check_arguments() are not dropped by the module's CRITICAL default.
    logger.setLevel(args.verbosity)
    check_arguments(args)

    searchpath = Path(args.ert_config_path) / args.inpath
    match_pattern = "[!.]*"  # ignore metafiles (starts with '.')
    # Sorted so the processing order (and the log output) is deterministic
    # rather than dependent on filesystem listing order.
    files = sorted(f for f in searchpath.rglob(match_pattern) if f.is_file())
    logger.debug("files found %s", files)

    if not files:
        raise ValueError(f"No files found in {searchpath=}, check spelling.")

    logger.info("Starting to copy preprocessed files to <caseroot>/share/observations/")
    for filepath in files:
        # A fresh exporter per file, as in the original implementation.
        ExportPreprocessedData(
            casepath=args.ert_caseroot,
            is_observation=True,
        ).export(filepath)
        logger.info("Copied preprocessed file %s", filepath)

    logger.debug("copy_preprocessed_data_main.py has finished.")


def check_arguments(args: argparse.Namespace) -> None:
    """Run basic sanity checks on the parsed input arguments.

    Emits a ``FutureWarning`` when the deprecated ``global_variables_path``
    is given. Raises ``ValueError`` when ``ert_caseroot`` is not an absolute
    path, or when ``inpath`` is an absolute path.
    """
    logger.debug("Checking input arguments")
    logger.debug("Arguments: %s", args)

    if args.global_variables_path:
        deprecation_message = (
            "The global variables path is no longer needed. Please remove the "
            "'--global_variables_path' argument and path from the workflow file."
        )
        warnings.warn(deprecation_message, FutureWarning)

    # The case root has to be absolute ...
    caseroot = args.ert_caseroot
    caseroot_is_absolute = Path(caseroot).is_absolute()
    if not caseroot_is_absolute:
        logger.debug("Argument 'ert_caseroot' was not absolute: %s", caseroot)
        raise ValueError("'ert_caseroot' must be an absolute path")

    # ... whereas the input folder has to be relative to the ERT config path.
    inpath = args.inpath
    if Path(inpath).is_absolute():
        logger.debug("Argument 'inpath' is absolute: %s", inpath)
        raise ValueError(
            "'inpath' is an absolute path, it should be relative to the ert_configpath",
        )


def get_parser() -> argparse.ArgumentParser:
    """Construct the argument parser shared by the CLI and the ERT workflow.

    Positional arguments are ``ert_caseroot``, ``ert_config_path`` and the
    optional ``inpath`` (defaults to ``../../share/preprocessed``). The
    options ``--global_variables_path`` (deprecated) and ``--verbosity``
    (default ``WARNING``) are also accepted.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("ert_caseroot", type=str, help="Absolute path to the case root")
    parser.add_argument(
        "ert_config_path", type=str, help="ERT config path (<CONFIG_PATH>)"
    )
    parser.add_argument(
        "inpath",
        type=str,
        # nargs="?" makes this positional optional; without it argparse always
        # requires a value and the default below could never take effect.
        nargs="?",
        help="Folder with preprocessed data relative to ert_configpath",
        default="../../share/preprocessed",
    )
    parser.add_argument(
        "--global_variables_path",
        type=str,
        help="Deprecated and should not be used",
    )
    parser.add_argument(
        "--verbosity", type=str, help="Set log level", default="WARNING"
    )
    return parser


@hook_implementation
def legacy_ertscript_workflow(config: WorkflowConfigs) -> None:
    """Register WfCopyPreprocessedData and its documentation with ERT.

    The job is added under the name ``WF_COPY_PREPROCESSED_DATAIO`` and is
    then given its parser factory, description, examples and category.
    """
    job_name = "WF_COPY_PREPROCESSED_DATAIO"
    registered_job = config.add_workflow(WfCopyPreprocessedData, job_name)

    # The parser is handed over as the factory function itself, not as a
    # parser instance.
    registered_job.parser = get_parser
    registered_job.description = DESCRIPTION
    registered_job.examples = EXAMPLES
    registered_job.category = "export"


# Allow running this module directly as a script; main() parses the command
# line arguments and starts the copy job.
if __name__ == "__main__":
    main()
10 changes: 9 additions & 1 deletion tests/test_integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ def fmu_snakeoil_project(tmp_path, monkeypatch, base_ert_config, global_config2_
"<USER>", # ert username
encoding="utf-8",
)

pathlib.Path(
tmp_path / "ert/bin/workflows/xhook_copy_preprocessed_data"
).write_text(
"WF_COPY_PREPROCESSED_DATAIO "
"<SCRATCH>/<USER>/<CASE_DIR> " # ert case root
"<CONFIG_PATH> " # ert config path
"../../share/preprocessed ", # inpath
encoding="utf-8",
)
pathlib.Path(tmp_path / "ert/model/snakeoil.ert").write_text(
base_ert_config, encoding="utf-8"
)
Expand Down
6 changes: 4 additions & 2 deletions tests/test_integration/test_hook_implementations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@

import fmu.dataio.hook_implementations.jobs
from ert.shared.plugins.plugin_manager import ErtPluginManager
from fmu.dataio.scripts import create_case_metadata
from fmu.dataio.scripts import copy_preprocessed, create_case_metadata


def test_hook_implementations():
plugin_manager = ErtPluginManager(
plugins=[
fmu.dataio.hook_implementations.jobs,
create_case_metadata,
copy_preprocessed,
]
)

expected_forward_models = set()
installable_fms = plugin_manager.get_installable_jobs()
assert set(installable_fms) == expected_forward_models

expected_workflow_jobs = {"WF_CREATE_CASE_METADATA"}
expected_workflow_jobs = {"WF_CREATE_CASE_METADATA", "WF_COPY_PREPROCESSED_DATAIO"}
installable_workflow_jobs = plugin_manager.get_installable_workflow_jobs()
for wf_name, wf_location in installable_workflow_jobs.items():
assert wf_name in expected_workflow_jobs
Expand All @@ -31,6 +32,7 @@ def test_hook_implementations_docs():
plugins=[
fmu.dataio.hook_implementations.jobs,
create_case_metadata,
copy_preprocessed,
]
)

Expand Down
Loading

0 comments on commit c32f1ec

Please sign in to comment.