Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Add ERT WF to copy preprocessed data #716

Merged
merged 1 commit into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ docs = [

[project.entry-points.ert]
dataio_case_metadata = "fmu.dataio.scripts.create_case_metadata"
dataio_copy_preprocessed = "fmu.dataio.scripts.copy_preprocessed"

[tool.setuptools_scm]
write_to = "src/fmu/dataio/version.py"
Expand Down
166 changes: 166 additions & 0 deletions src/fmu/dataio/scripts/copy_preprocessed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#!/usr/bin/env python

"""Copy preprocessed data to an FMU case while updating the metadata.

This script is intended to be run through an ERT HOOK PRESIM workflow.

"""

from __future__ import annotations

import argparse
import logging
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Final

from fmu.dataio import ExportPreprocessedData

try:
from ert.shared.plugins.plugin_manager import hook_implementation
except ModuleNotFoundError:
from ert_shared.plugins.plugin_manager import hook_implementation

try:
from ert.config import ErtScript
except ImportError:
from res.job_queue import ErtScript

if TYPE_CHECKING:
from ert.shared.plugins.workflow_config import WorkflowConfigs

logger: Final = logging.getLogger(__name__)

# This documentation is compiled into ert's internal docs
DESCRIPTION = """
WF_COPY_PREPROCESSED_DATAIO will copy preprocessed data to a FMU run at
<caseroot>/share/observations/. If the data contains metadata this will be
updated with information about the FMU run and ready for upload to Sumo.

Preprocessed data refers to data that has been exported with dataio outside of a FMU
context, and is typically located in a share/preprocessed/ folder on the project disk.
"""

EXAMPLES = """
Create an ERT workflow e.g. named ``ert/bin/workflows/xhook_copy_preprocessed_data``
with the contents::
WF_COPY_PREPROCESSED_DATAIO <SCRATCH>/<USER>/<CASE_DIR> <CONFIG_PATH> '../../share/preprocessed/'

Add following lines to your ERT config to have the job automatically executed::
LOAD_WORKFLOW ../bin/workflows/xhook_copy_preprocessed_data
HOOK_WORKFLOW xhook_copy_preprocessed_data PRE_SIMULATION
""" # noqa


def main() -> None:
"""Entry point from command line

When script is called from an ERT workflow, it will be called through the 'run'
method on the WfCopyPreprocessedData class. This context is the intended usage.
The command line entry point is still included, to clarify the difference and
for debugging purposes.
"""
parser = get_parser()
commandline_args = parser.parse_args()
copy_preprocessed_data_main(commandline_args)


class WfCopyPreprocessedData(ErtScript):
"""A class with a run() function that can be registered as an ERT plugin.

This is used for the ERT workflow context. It is prefixed 'Wf' to avoid a
potential naming collisions in fmu-dataio."""

def run(self, *args: str) -> None:
"""Parse arguments and call copy_preprocessed_data_main()"""
parser = get_parser()
workflow_args = parser.parse_args(args)
copy_preprocessed_data_main(workflow_args)


def copy_preprocessed_data_main(args: argparse.Namespace) -> None:
"""Copy the preprocessed data to scratch and upload it to sumo."""

check_arguments(args)
logger.setLevel(args.verbosity)

searchpath = Path(args.ert_config_path) / args.inpath
match_pattern = "[!.]*" # ignore metafiles (starts with '.')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility this ignores other files we want? (I assume not: but just a sanity check)

files = [f for f in searchpath.rglob(match_pattern) if f.is_file()]
logger.debug("files found %s", files)

if not files:
raise ValueError(f"No files found in {searchpath=}, check spelling.")
tnatt marked this conversation as resolved.
Show resolved Hide resolved

logger.info("Starting to copy preprocessed files to <caseroot>/share/observations/")
for filepath in files:
ExportPreprocessedData(
casepath=args.ert_caseroot,
is_observation=True,
).export(filepath)
logger.info("Copied preprocessed file %s", filepath)

logger.debug("copy_preprocessed_data_main.py has finished.")


def check_arguments(args: argparse.Namespace) -> None:
"""Do basic sanity checks of input"""
logger.debug("Checking input arguments")
logger.debug("Arguments: %s", args)

if args.global_variables_path:
warnings.warn(
"The global variables path is no longer needed. Please remove the "
"'--global_variables_path' argument and path from the workflow file.",
FutureWarning,
)

if not Path(args.ert_caseroot).is_absolute():
logger.debug("Argument 'ert_caseroot' was not absolute: %s", args.ert_caseroot)
raise ValueError("'ert_caseroot' must be an absolute path")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argparse.ArgumentError is another option here, but I see this is most likely taken from create_case_metadata


if Path(args.inpath).is_absolute():
logger.debug("Argument 'inpath' is absolute: %s", args.inpath)
raise ValueError(
"'inpath' is an absolute path, it should be relative to the ert_configpath",
)


def get_parser() -> argparse.ArgumentParser:
"""Construct parser object."""
parser = argparse.ArgumentParser()
parser.add_argument("ert_caseroot", type=str, help="Absolute path to the case root")
parser.add_argument(
"ert_config_path", type=str, help="ERT config path (<CONFIG_PATH>)"
)
parser.add_argument(
"inpath",
type=str,
help="Folder with preprocessed data relative to ert_configpath.",
default="../../share/preprocessed",
)
parser.add_argument(
"--global_variables_path",
type=str,
help="Deprecated and should be not be used",
)
parser.add_argument(
"--verbosity", type=str, help="Set log level", default="WARNING"
)
Comment on lines +147 to +149
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be deprecated like other verbosity options in dataio?

return parser


@hook_implementation
def legacy_ertscript_workflow(config: WorkflowConfigs) -> None:
"""Hook the WfCopyPreprocessedData class with documentation into ERT."""
workflow = config.add_workflow(
WfCopyPreprocessedData, "WF_COPY_PREPROCESSED_DATAIO"
)
workflow.parser = get_parser
workflow.description = DESCRIPTION
workflow.examples = EXAMPLES
workflow.category = "export"


if __name__ == "__main__":
main()
10 changes: 9 additions & 1 deletion tests/test_integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,15 @@ def fmu_snakeoil_project(tmp_path, monkeypatch, base_ert_config, global_config2_
"<USER>", # ert username
encoding="utf-8",
)

pathlib.Path(
tmp_path / "ert/bin/workflows/xhook_copy_preprocessed_data"
).write_text(
"WF_COPY_PREPROCESSED_DATAIO "
"<SCRATCH>/<USER>/<CASE_DIR> " # ert case root
"<CONFIG_PATH> " # ert config path
"../../share/preprocessed ", # inpath
encoding="utf-8",
)
pathlib.Path(tmp_path / "ert/model/snakeoil.ert").write_text(
base_ert_config, encoding="utf-8"
)
Expand Down
6 changes: 4 additions & 2 deletions tests/test_integration/test_hook_implementations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@

import fmu.dataio.hook_implementations.jobs
from ert.shared.plugins.plugin_manager import ErtPluginManager
from fmu.dataio.scripts import create_case_metadata
from fmu.dataio.scripts import copy_preprocessed, create_case_metadata


def test_hook_implementations():
plugin_manager = ErtPluginManager(
plugins=[
fmu.dataio.hook_implementations.jobs,
create_case_metadata,
copy_preprocessed,
]
)

expected_forward_models = set()
installable_fms = plugin_manager.get_installable_jobs()
assert set(installable_fms) == expected_forward_models

expected_workflow_jobs = {"WF_CREATE_CASE_METADATA"}
expected_workflow_jobs = {"WF_CREATE_CASE_METADATA", "WF_COPY_PREPROCESSED_DATAIO"}
installable_workflow_jobs = plugin_manager.get_installable_workflow_jobs()
for wf_name, wf_location in installable_workflow_jobs.items():
assert wf_name in expected_workflow_jobs
Expand All @@ -31,6 +32,7 @@ def test_hook_implementations_docs():
plugins=[
fmu.dataio.hook_implementations.jobs,
create_case_metadata,
copy_preprocessed,
]
)

Expand Down
Loading