From d6d8f73f6ed602514388645ffc7b67b83f658a5e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Therese=20Natter=C3=B8y?= <61694854+tnatt@users.noreply.github.com>
Date: Tue, 25 Jun 2024 14:55:44 +0200
Subject: [PATCH] ENH: Add ERT WF to copy preprocessed data

---
 pyproject.toml                              |   1 +
 src/fmu/dataio/scripts/copy_preprocessed.py | 166 ++++++++++++++
 tests/test_integration/conftest.py          |  10 +-
 .../test_hook_implementations.py            |   6 +-
 .../test_wf_copy_preprocessed_data.py       | 209 ++++++++++++++++++
 5 files changed, 389 insertions(+), 3 deletions(-)
 create mode 100644 src/fmu/dataio/scripts/copy_preprocessed.py
 create mode 100644 tests/test_integration/test_wf_copy_preprocessed_data.py

diff --git a/pyproject.toml b/pyproject.toml
index 53bb8e7b4..4fa24577e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -78,6 +78,7 @@ docs = [
 
 [project.entry-points.ert]
 dataio_case_metadata = "fmu.dataio.scripts.create_case_metadata"
+dataio_copy_preprocessed = "fmu.dataio.scripts.copy_preprocessed"
 
 [tool.setuptools_scm]
 write_to = "src/fmu/dataio/version.py"
diff --git a/src/fmu/dataio/scripts/copy_preprocessed.py b/src/fmu/dataio/scripts/copy_preprocessed.py
new file mode 100644
index 000000000..14888249e
--- /dev/null
+++ b/src/fmu/dataio/scripts/copy_preprocessed.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python
+
+"""Copy preprocessed data to an FMU case while updating the metadata.
+
+This script is intended to be run through an ERT HOOK PRESIM workflow.
+
+"""
+
+from __future__ import annotations
+
+import argparse
+import logging
+import warnings
+from pathlib import Path
+from typing import TYPE_CHECKING, Final
+
+from fmu.dataio import ExportPreprocessedData
+
+try:
+    from ert.shared.plugins.plugin_manager import hook_implementation
+except ModuleNotFoundError:
+    from ert_shared.plugins.plugin_manager import hook_implementation
+
+try:
+    from ert.config import ErtScript
+except ImportError:
+    from res.job_queue import ErtScript
+
+if TYPE_CHECKING:
+    from ert.shared.plugins.workflow_config import WorkflowConfigs
+
+logger: Final = logging.getLogger(__name__)
+
+# This documentation is compiled into ert's internal docs
+DESCRIPTION = """
+WF_COPY_PREPROCESSED_DATAIO will copy preprocessed data into an FMU case, under
+share/observations/ at the case root. If the data contains metadata, this is
+updated with information about the FMU run, making the data ready for upload to Sumo.
+
+Preprocessed data refers to data that has been exported with dataio outside of an FMU
+context, and is typically located in a share/preprocessed/ folder on the project disk.
+"""
+
+EXAMPLES = """
+Create an ERT workflow e.g. named ``ert/bin/workflows/xhook_copy_preprocessed_data``
+with the contents::
+  WF_COPY_PREPROCESSED_DATAIO <SCRATCH>/<USER>/<CASE_DIR> <CONFIG_PATH> '../../share/preprocessed/'
+
+Add the following lines to your ERT config to have the job automatically executed::
+  LOAD_WORKFLOW ../bin/workflows/xhook_copy_preprocessed_data
+  HOOK_WORKFLOW xhook_copy_preprocessed_data PRE_SIMULATION
+"""  # noqa
+
+
+def main() -> None:
+    """Entry point from the command line.
+
+    When the script is called from an ERT workflow, it is called through the 'run'
+    method on the WfCopyPreprocessedData class. This context is the intended usage.
+    The command line entry point is still included to clarify the difference and
+    for debugging purposes.
+    """
+    parser = get_parser()
+    commandline_args = parser.parse_args()
+    copy_preprocessed_data_main(commandline_args)
+
+
+class WfCopyPreprocessedData(ErtScript):
+    """A class with a run() function that can be registered as an ERT plugin.
+
+    This is used for the ERT workflow context. It is prefixed 'Wf' to avoid
+    potential naming collisions in fmu-dataio."""
+
+    def run(self, *args: str) -> None:
+        """Parse arguments and call copy_preprocessed_data_main()"""
+        parser = get_parser()
+        workflow_args = parser.parse_args(args)
+        copy_preprocessed_data_main(workflow_args)
+
+
+def copy_preprocessed_data_main(args: argparse.Namespace) -> None:
+    """Copy preprocessed data to the case on scratch, ready for upload to Sumo."""
+
+    check_arguments(args)
+    logger.setLevel(args.verbosity)
+
+    searchpath = Path(args.ert_config_path) / args.inpath
+    match_pattern = "[!.]*"  # ignore metafiles (starting with '.')
+    files = [f for f in searchpath.rglob(match_pattern) if f.is_file()]
+    logger.debug("files found %s", files)
+
+    if not files:
+        raise ValueError(f"No files found in {searchpath=}, check spelling.")
+
+    logger.info("Starting to copy preprocessed files to share/observations/")
+    for filepath in files:
+        ExportPreprocessedData(
+            casepath=args.ert_caseroot,
+            is_observation=True,
+        ).export(filepath)
+        logger.info("Copied preprocessed file %s", filepath)
+
+    logger.debug("copy_preprocessed_data_main() has finished.")
+
+
+def check_arguments(args: argparse.Namespace) -> None:
+    """Do basic sanity checks of the input arguments."""
+    logger.debug("Checking input arguments")
+    logger.debug("Arguments: %s", args)
+
+    if args.global_variables_path:
+        warnings.warn(
+            "The global variables path is no longer needed. Please remove the "
+            "'--global_variables_path' argument and path from the workflow file.",
+            FutureWarning,
+        )
+
+    if not Path(args.ert_caseroot).is_absolute():
+        logger.debug("Argument 'ert_caseroot' was not absolute: %s", args.ert_caseroot)
+        raise ValueError("'ert_caseroot' must be an absolute path")
+
+    if Path(args.inpath).is_absolute():
+        logger.debug("Argument 'inpath' is absolute: %s", args.inpath)
+        raise ValueError(
+            "'inpath' is an absolute path; it should be relative to ert_config_path",
+        )
+
+
+def get_parser() -> argparse.ArgumentParser:
+    """Construct the parser object."""
+    parser = argparse.ArgumentParser()
+    parser.add_argument("ert_caseroot", type=str, help="Absolute path to the case root")
+    parser.add_argument(
+        "ert_config_path", type=str, help="ERT config path (<CONFIG_PATH>)"
+    )
+    parser.add_argument(
+        "inpath",
+        type=str,
+        help="Folder with preprocessed data, relative to ert_config_path.",
+        default="../../share/preprocessed",
+    )
+    parser.add_argument(
+        "--global_variables_path",
+        type=str,
+        help="Deprecated and should not be used",
+    )
+    parser.add_argument(
+        "--verbosity", type=str, help="Set log level", default="WARNING"
+    )
+    return parser
+
+
+@hook_implementation
+def legacy_ertscript_workflow(config: WorkflowConfigs) -> None:
+    """Hook the WfCopyPreprocessedData class with documentation into ERT."""
+    workflow = config.add_workflow(
+        WfCopyPreprocessedData, "WF_COPY_PREPROCESSED_DATAIO"
+    )
+    workflow.parser = get_parser
+    workflow.description = DESCRIPTION
+    workflow.examples = EXAMPLES
+    workflow.category = "export"
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_integration/conftest.py b/tests/test_integration/conftest.py
index 07e6b3630..5ab02b3f7 100644
--- a/tests/test_integration/conftest.py
+++ b/tests/test_integration/conftest.py
@@ -56,7 +56,15 @@ def fmu_snakeoil_project(tmp_path, monkeypatch, base_ert_config, global_config2_
         "",  # ert username
         encoding="utf-8",
     )
-
+    pathlib.Path(
+        tmp_path / "ert/bin/workflows/xhook_copy_preprocessed_data"
+    ).write_text(
+        "WF_COPY_PREPROCESSED_DATAIO "
+        "<SCRATCH>/<USER>/<CASE_DIR> "  # ert case root
+        "<CONFIG_PATH> "  # ert config path
+        "../../share/preprocessed ",  # inpath
+        encoding="utf-8",
+    )
     pathlib.Path(tmp_path / "ert/model/snakeoil.ert").write_text(
         base_ert_config, encoding="utf-8"
     )
diff --git a/tests/test_integration/test_hook_implementations.py b/tests/test_integration/test_hook_implementations.py
index beaddc97a..64a3625da 100644
--- a/tests/test_integration/test_hook_implementations.py
+++ b/tests/test_integration/test_hook_implementations.py
@@ -2,7 +2,7 @@
 import fmu.dataio.hook_implementations.jobs
 from ert.shared.plugins.plugin_manager import ErtPluginManager
 
-from fmu.dataio.scripts import create_case_metadata
+from fmu.dataio.scripts import copy_preprocessed, create_case_metadata
 
 
 def test_hook_implementations():
@@ -10,6 +10,7 @@ def test_hook_implementations():
         plugins=[
             fmu.dataio.hook_implementations.jobs,
             create_case_metadata,
+            copy_preprocessed,
         ]
     )
 
@@ -17,7 +18,7 @@ def test_hook_implementations():
     installable_fms = plugin_manager.get_installable_jobs()
     assert set(installable_fms) == expected_forward_models
 
-    expected_workflow_jobs = {"WF_CREATE_CASE_METADATA"}
+    expected_workflow_jobs = {"WF_CREATE_CASE_METADATA", "WF_COPY_PREPROCESSED_DATAIO"}
     installable_workflow_jobs = plugin_manager.get_installable_workflow_jobs()
     for wf_name, wf_location in installable_workflow_jobs.items():
         assert wf_name in expected_workflow_jobs
@@ -31,6 +32,7 @@ def test_hook_implementations_docs():
         plugins=[
             fmu.dataio.hook_implementations.jobs,
             create_case_metadata,
+            copy_preprocessed,
         ]
     )
 
diff --git a/tests/test_integration/test_wf_copy_preprocessed_data.py b/tests/test_integration/test_wf_copy_preprocessed_data.py
new file mode 100644
index 000000000..4d791abec
--- /dev/null
+++ b/tests/test_integration/test_wf_copy_preprocessed_data.py
@@ -0,0 +1,209 @@
+import ert.__main__
+import fmu.dataio as dataio
+import pytest
+import yaml
+
+
+def _export_preprocessed_data(config, regsurf):
+    """Export preprocessed surfaces"""
+    dataio.ExportData(
+        config=config,
+        preprocessed=True,
+        name="TopVolantis",
+        content="depth",
+        subfolder="mysubfolder",
+    ).export(regsurf)
+
+    dataio.ExportData(
+        config=config,
+        preprocessed=True,
+        name="TopVolon",
+        content="depth",
+    ).export(regsurf)
+
+
+def _add_create_case_workflow(filepath):
+    with open(filepath, "a", encoding="utf-8") as f:
+        f.writelines(
+            [
+                "LOAD_WORKFLOW ../bin/workflows/xhook_create_case_metadata\n"
+                "HOOK_WORKFLOW xhook_create_case_metadata PRE_SIMULATION\n"
+            ]
+        )
+
+
+def _add_copy_preprocessed_workflow(filepath):
+    with open(filepath, "a", encoding="utf-8") as f:
+        f.writelines(
+            [
+                "LOAD_WORKFLOW ../bin/workflows/xhook_copy_preprocessed_data\n"
+                "HOOK_WORKFLOW xhook_copy_preprocessed_data PRE_SIMULATION\n"
+            ]
+        )
+
+
+def test_copy_preprocessed_runs_successfully(
+    fmu_snakeoil_project, monkeypatch, mocker, globalconfig2, regsurf
+):
+    """Test that exporting preprocessed data works and that the metadata is updated"""
+    monkeypatch.chdir(fmu_snakeoil_project)
+    _export_preprocessed_data(globalconfig2, regsurf)
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_create_case_workflow("snakeoil.ert")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+    ert.__main__.main()
+
+    fmu_case = fmu_snakeoil_project / "scratch/user/snakeoil"
+
+    fmu_case_yml = fmu_case / "share/metadata/fmu_case.yml"
+    assert fmu_case_yml.exists()
+
+    observations_folder = fmu_case / "share/observations"
+
+    assert (observations_folder / "maps/topvolon.gri").exists()
+    assert (observations_folder / "maps/.topvolon.gri.yml").exists()
+    assert (observations_folder / "maps/mysubfolder/topvolantis.gri").exists()
+    assert (observations_folder / "maps/mysubfolder/.topvolantis.gri.yml").exists()
+
+    # check one of the metafiles to see that the fmu block has been added
+    metafile = observations_folder / "maps/.topvolon.gri.yml"
+    with open(metafile, encoding="utf-8") as f:
+        meta = yaml.safe_load(f)
+
+    assert meta["fmu"]["case"]["name"] == "snakeoil"
+    assert meta["fmu"]["case"]["user"]["id"] == "user"
+    assert meta["fmu"]["context"]["stage"] == "case"
+    assert len(meta["tracklog"]) == 2
+
+
+def test_copy_preprocessed_no_casemeta(
+    fmu_snakeoil_project, monkeypatch, mocker, globalconfig2, regsurf, capsys
+):
+    """Test that an error is written to stderr if no case metadata can be found."""
+
+    monkeypatch.chdir(fmu_snakeoil_project)
+    _export_preprocessed_data(globalconfig2, regsurf)
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+
+    ert.__main__.main()
+
+    _stdout, stderr = capsys.readouterr()
+    assert "ValueError: Could not detect valid case metadata" in stderr
+
+
+def test_copy_preprocessed_no_preprocessed_files(
+    fmu_snakeoil_project, monkeypatch, mocker, capsys
+):
+    """
+    Test that an error is written to stderr if no files can be found.
+    This is simulated by not running the initial export of preprocessed data.
+    """
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_create_case_workflow("snakeoil.ert")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+
+    ert.__main__.main()
+
+    _stdout, stderr = capsys.readouterr()
+    assert "No files found in searchpath" in stderr
+
+
+def test_inpath_absolute_path_raises(fmu_snakeoil_project, monkeypatch, mocker, capsys):
+    """Test that an error is written to stderr if the inpath argument is absolute"""
+
+    # create a workflow file with an absolute inpath
+    workflow_file = (
+        fmu_snakeoil_project / "ert/bin/workflows/xhook_copy_preprocessed_data"
+    )
+    with open(workflow_file, encoding="utf-8", mode="w") as f:
+        f.write(
+            "WF_COPY_PREPROCESSED_DATAIO <SCRATCH>/<USER>/<CASE_DIR> <CONFIG_PATH> "
+            "/../../share/preprocessed"  # absolute path
+        )
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+
+    ert.__main__.main()
+
+    _stdout, stderr = capsys.readouterr()
+    assert "ValueError: 'inpath' is an absolute path" in stderr
+
+
+def test_copy_preprocessed_no_preprocessed_meta(
+    fmu_snakeoil_project, monkeypatch, mocker, regsurf
+):
+    """Test that a pure copy happens if the files don't have metadata"""
+
+    monkeypatch.chdir(fmu_snakeoil_project)
+    # an invalid config will trigger no metadata to be created
+    _export_preprocessed_data({"wrong": "config"}, regsurf)
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_create_case_workflow("snakeoil.ert")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+
+    with pytest.warns(UserWarning, match=r"will be copied.+but without metadata"):
+        ert.__main__.main()
+
+    observations_folder = (
+        fmu_snakeoil_project / "scratch/user/snakeoil/share/observations"
+    )
+
+    assert (observations_folder / "maps/topvolon.gri").exists()
+    assert not (observations_folder / "maps/.topvolon.gri.yml").exists()
+    assert (observations_folder / "maps/mysubfolder/topvolantis.gri").exists()
+    assert not (observations_folder / "maps/mysubfolder/.topvolantis.gri.yml").exists()
+
+
+def test_deprecation_warning_global_variables(
+    fmu_snakeoil_project, monkeypatch, mocker
+):
+    """Test that a deprecation warning is issued if a global variables path is given"""
+
+    # add the deprecated argument to the workflow file
+    workflow_file = (
+        fmu_snakeoil_project / "ert/bin/workflows/xhook_copy_preprocessed_data"
+    )
+    with open(workflow_file, encoding="utf-8", mode="a") as f:
+        f.write(" '--global_variables_path' dummypath")
+
+    monkeypatch.chdir(fmu_snakeoil_project / "ert/model")
+    _add_copy_preprocessed_workflow("snakeoil.ert")
+
+    mocker.patch(
+        "sys.argv",
+        ["ert", "test_run", "snakeoil.ert", "--disable-monitoring"],
+    )
+
+    with pytest.warns(FutureWarning, match="no longer needed"):
+        ert.__main__.main()