diff --git a/CHANGELOG.md b/CHANGELOG.md
index 78399d850..873d029bd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,11 +5,14 @@ This project adheres to [Semantic Versioning](https://semver.org/).
 ## Unreleased
 
 ### Added
+- [#349](https://github.com/equinor/flownet/pull/349) The analytics workflow is now also available in prediction mode.
 - [#351](https://github.com/equinor/flownet/pull/351) Added simple plotting tool that allows for plotting of FlowNet ensembles and observations.
 
 ### Fixes
+- [#349](https://github.com/equinor/flownet/pull/349) Fixes a bug where the parquet parameters file would not be saved for iterations larger than 9.
 
 ### Changes
+- [#349](https://github.com/equinor/flownet/pull/349) Restructured the code: moved all forward models called from ERT to a separate folder, ert/forward_models. Scripts moved: delete_simulation_output, save_iteration_parameters, iteration_analytics, render_realization and flow_job.
 - [#347](https://github.com/equinor/flownet/pull/347) Additional flow nodes is now allowed to be either a list (equal length of number of layers) or a single integer (which will be split over the layers according to volume of concave hull).
 
 ## [0.5.1] - 2021-03-03
diff --git a/setup.py b/setup.py
index 4115b0317..023463ebb 100644
--- a/setup.py
+++ b/setup.py
@@ -56,16 +56,18 @@
     use_scm_version=True,
     package_dir={"": "src"},
     packages=find_packages("src"),
-    package_data={"flownet": ["templates/*", "static/*", "ert/FLOW_SIMULATION"]},
+    package_data={
+        "flownet": ["templates/*", "static/*", "ert/forward_models/FLOW_SIMULATION"]
+    },
    entry_points={
-        "ert": ["flow = flownet.ert._flow_job"],
+        "ert": ["flow = flownet.ert.forward_models._flow_job"],
         "console_scripts": [
             "flownet=flownet._command_line:main",
-            "flownet_render_realization=flownet.realization:render_realization",
-            "flownet_delete_simulation_output=flownet.ahm:delete_simulation_output",
-            "flownet_run_flow=flownet.ert._flow_job:run_flow",
-            "flownet_save_iteration_parameters=flownet.ahm:save_iteration_parameters",
-            "flownet_save_iteration_analytics=flownet.ahm:save_iteration_analytics",
+            "flownet_render_realization=flownet.ert.forward_models:render_realization",
+            "flownet_delete_simulation_output=flownet.ert.forward_models:delete_simulation_output",
+            "flownet_run_flow=flownet.ert.forward_models:run_flow",
+            "flownet_save_iteration_parameters=flownet.ert.forward_models:save_iteration_parameters",
+            "flownet_save_iteration_analytics=flownet.ert.forward_models:save_iteration_analytics",
             "flownet_plot_results=flownet.utils.plot_results:main",
         ],
     },
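Note: a `console_scripts` entry point of the form `name=module:function` generates an executable that imports `module` and calls `function()`, so the script names stay stable while their targets move. A minimal sketch of what the updated `flownet_run_flow` wrapper is equivalent to (illustrative only; the real wrapper is generated by setuptools and is not part of this diff):

```python
# Hypothetical equivalent of the generated "flownet_run_flow" console script:
from flownet.ert.forward_models import run_flow

if __name__ == "__main__":
    run_flow()
```
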
diff --git a/src/flownet/ahm/__init__.py b/src/flownet/ahm/__init__.py
index 9160bf02d..95526e3b5 100644
--- a/src/flownet/ahm/__init__.py
+++ b/src/flownet/ahm/__init__.py
@@ -1,7 +1,2 @@
-from ._assisted_history_matching import (
-    AssistedHistoryMatching,
-    delete_simulation_output,
-    save_iteration_parameters,
-)
-from ._ahm_iteration_analytics import save_iteration_analytics
+from ._assisted_history_matching import AssistedHistoryMatching
 from ._run_ahm import run_flownet_history_matching
diff --git a/src/flownet/ahm/_assisted_history_matching.py b/src/flownet/ahm/_assisted_history_matching.py
index 340d142ef..6653d9fbc 100644
--- a/src/flownet/ahm/_assisted_history_matching.py
+++ b/src/flownet/ahm/_assisted_history_matching.py
@@ -1,17 +1,8 @@
 import argparse
-import concurrent.futures
-import glob
-import json
-import os
-import pathlib
-import re
-import shutil
-from typing import List, Dict, Tuple
-
+from typing import List
 from configsuite import ConfigSuite
 import jinja2
 import numpy as np
-import pandas as pd
 
 from ..ert import create_ert_setup, run_ert_subprocess
 from ..realization import Schedule
@@ -160,97 +151,3 @@ def report(self):
                 else "  None  ",
             )
         print("")
-
-
-def delete_simulation_output():
-    """
-    This function is called by a forward model in ERT, deleting unnecessary
-    simulation output files.
-
-    Returns:
-        Nothing
-
-    """
-    parser = argparse.ArgumentParser(prog="Delete simulation output.")
-
-    parser.add_argument(
-        "ecl_base", type=str, help="Base name of the simulation DATA file"
-    )
-
-    args = parser.parse_args()
-
-    for suffix in ["EGRID", "INIT", "UNRST", "LOG", "PRT"]:
-        if os.path.exists(f"{args.ecl_base}.{suffix}"):
-            os.remove(f"{args.ecl_base}.{suffix}")
-
-
-def _load_parameters(runpath: str) -> Tuple[int, Dict]:
-    """
-    Internal helper function to load parameter.json files in
-    parallel.
-
-    Args:
-        runpath: Path to where the realization is run.
-
-    Returns:
-        Dictionary with the realizations' parameters.
-
-    """
-    realization = int(re.findall(r"[0-9]+", runpath)[-2])
-    parameters = json.loads((pathlib.Path(runpath) / "parameters.json").read_text())
-
-    return realization, parameters["FLOWNET_PARAMETERS"]
-
-
-def save_iteration_parameters():
-    """
-    This function is called as a pre-simulation workflow in ERT, saving all
-    parameters of an iteration to a file.
-
-    The resulting dataframe is saved as a gzipped parquet file using a PyArrow table
-    and has the following format (example for 5 realizations and 2 parameters):
-
-    | index = realization | parameter 1 | parameter 2 |
-    |=====================|=============|=============|
-    | 1                   | x.x         | x.x         |
-    | 3                   | x.x         | x.x         |
-    | 5                   | x.x         | x.x         |
-    | 4                   | x.x         | x.x         |
-    | 2                   | x.x         | x.x         |
-
-    Mind that the dataframe is not ordered.
-
-    Returns:
-        Nothing
-
-    """
-    parser = argparse.ArgumentParser(prog="Save iteration parameters to a file.")
-    parser.add_argument("runpath", type=str, help="Path to the ERT runpath.")
-    args = parser.parse_args()
-    args.runpath = args.runpath.replace("%d", "*")
-
-    print("Saving ERT parameters to file...", end=" ")
-
-    iteration = int(re.findall(r"[0-9]+", sorted(glob.glob(args.runpath))[-1])[-1])
-    runpath_list = glob.glob(args.runpath[::-1].replace("*", str(iteration), 1)[::-1])
-    realizations_dict = {}
-
-    with concurrent.futures.ProcessPoolExecutor() as executor:
-        for result in executor.map(_load_parameters, runpath_list):
-            realizations_dict[result[0]] = result[1]
-
-    pd.DataFrame(
-        [parameters for _, parameters in realizations_dict.items()],
-        index=realizations_dict.keys(),
-    ).to_parquet(
-        f"parameters_iteration-{iteration}.parquet.gzip",
-        index=True,
-        engine="pyarrow",
-        compression="gzip",
-    )
-
-    shutil.copyfile(
-        f"parameters_iteration-{iteration}.parquet.gzip",
-        "parameters_iteration-latest.parquet.gzip",
-    )
-    print("[Done]")
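With the forward models gone from `flownet.ahm`, only the history-matching API remains importable there. A hedged sketch of the before/after import surface:

```python
# Still valid after this PR:
from flownet.ahm import AssistedHistoryMatching, run_flownet_history_matching

# No longer valid (moved by this PR); use the new location instead:
# from flownet.ahm import save_iteration_parameters
from flownet.ert.forward_models import save_iteration_parameters
```
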
diff --git a/src/flownet/ahm/_run_ahm.py b/src/flownet/ahm/_run_ahm.py
index 62aef4ad1..caeecf559 100644
--- a/src/flownet/ahm/_run_ahm.py
+++ b/src/flownet/ahm/_run_ahm.py
@@ -399,10 +399,11 @@ def run_flownet_history_matching(
     field_data = FlowData(
         config.flownet.data_source.simulation.input_case,
         layers=config.flownet.data_source.simulation.layers,
-        perforation_handling_strategy=config.flownet.perforation_handling_strategy,
     )
     df_production_data: pd.DataFrame = field_data.production
-    df_well_connections: pd.DataFrame = field_data.well_connections
+    df_well_connections: pd.DataFrame = field_data.get_well_connections(
+        config.flownet.perforation_handling_strategy
+    )
 
     # Load log data if required
     df_well_logs: Optional[pd.DataFrame] = (
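The call-site change above reflects the new `FlowData` API: the perforation handling strategy is no longer frozen at construction time but passed per call, so one `FlowData` instance can serve several strategies. A short sketch (the input case path is hypothetical):

```python
from flownet.data import FlowData

field_data = FlowData("SOME_FIELD.DATA")  # hypothetical input case
df_bottom = field_data.get_well_connections("bottom_point")
df_workovers = field_data.get_well_connections("multiple_based_on_workovers")
```
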
diff --git a/src/flownet/config_parser/_config_parser.py b/src/flownet/config_parser/_config_parser.py
index a4b438359..794f251a2 100644
--- a/src/flownet/config_parser/_config_parser.py
+++ b/src/flownet/config_parser/_config_parser.py
@@ -1,7 +1,7 @@
 import warnings
 import os
 import pathlib
-from typing import Dict, Optional, List, Union
+from typing import Dict, Optional, List
 
 import yaml
 import configsuite
@@ -9,6 +9,12 @@
 import pandas as pd
 
 from ._merge_configs import merge_configs
+from ._config_transformations import (
+    _integer_to_list,
+    _str_none_to_none,
+    _to_lower,
+    _to_upper,
+)
 from ..data.from_flow import FlowData
 
 
@@ -31,53 +37,6 @@ def create_schema(config_folder: Optional[pathlib.Path] = None) -> Dict:
 
     """
 
-    @configsuite.transformation_msg("Convert integer to list")
-    def _integer_to_list(input_data: Union[List, int]) -> List:
-        """
-        Converts integer to list with single item.
-
-        Args:
-            input_data (Union[List, int]):
-
-        Returns:
-            The input_data. If it wasn't a list yet is will be turned into a list.
-        """
-        if isinstance(input_data, int):
-            input_data = [input_data]
-        return input_data
-
-    @configsuite.transformation_msg("Convert 'None' to None")
-    def _str_none_to_none(
-        input_data: Union[str, int, float, None]
-    ) -> Union[str, int, float, None]:
-        """
-        Converts "None" to None
-        Args:
-            input_data (Union[str, int, float, None]):
-
-        Returns:
-            The input_data. If the input is "None" or "none" it is converted to None (str to None)
-        """
-        if isinstance(input_data, str):
-            if input_data.lower() == "none":
-                return None
-
-        return input_data
-
-    @configsuite.transformation_msg("Convert string to lower case")
-    def _to_lower(input_data: Union[List[str], str]) -> Union[List[str], str]:
-        if isinstance(input_data, str):
-            return input_data.lower()
-
-        return [x.lower() for x in input_data]
-
-    @configsuite.transformation_msg("Convert string to upper case")
-    def _to_upper(input_data: Union[List[str], str]) -> Union[List[str], str]:
-        if isinstance(input_data, str):
-            return input_data.upper()
-
-        return [x.upper() for x in input_data]
-
     @configsuite.transformation_msg("Convert input string to absolute path")
     def _to_abs_path(path: Optional[str]) -> str:
         """
@@ -602,7 +561,8 @@ def _to_abs_path(path: Optional[str]) -> str:
                         },
                         MK.Transformation: _to_upper,
                         MK.Description: "List of accuracy metrics to be computed "
-                        "in FlowNet analysis workflow",
+                        "in FlowNet analysis workflow. "
+                        "Supported metrics: MSE, RMSE, NRMSE, MAE, NMAE, R2",
                     },
                     "quantity": {
                         MK.Type: types.List,
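For context on how `MK.Transformation` is applied: configsuite runs the transformation on the raw value before validation, which is why users can write metrics in lower case. A hedged, self-contained sketch with a toy schema (not the flownet schema itself):

```python
import configsuite
from configsuite import types, MetaKeys as MK

from flownet.config_parser._config_transformations import _to_upper

# Toy schema for illustration: a single list field normalized by _to_upper.
schema = {
    MK.Type: types.NamedDict,
    MK.Content: {
        "metric": {
            MK.Type: types.List,
            MK.Content: {MK.Item: {MK.Type: types.String}},
            MK.Transformation: _to_upper,
        }
    },
}

suite = configsuite.ConfigSuite({"metric": ["mse", "rmse"]}, schema)
assert suite.valid
assert list(suite.snapshot.metric) == ["MSE", "RMSE"]
```
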
" + "Supported metrics: MSE, RMSE, NRMSE, MAE, NMAE, R2", + }, + "quantity": { + MK.Type: types.List, + MK.Content: { + MK.Item: { + MK.Type: types.String, + MK.AllowNone: True, + } + }, + MK.Transformation: _to_upper, + MK.Description: "List of summary vectors for which accuracy " + "is to be computed", + }, + "start": { + MK.Type: types.Date, + MK.AllowNone: True, + MK.Description: "Start date in YYYY-MM-DD format.", + }, + "end": { + MK.Type: types.Date, + MK.AllowNone: True, + MK.Description: "End date in YYYY-MM-DD format.", + }, + "outfile": { + MK.Type: types.String, + MK.AllowNone: True, + MK.Description: "The filename of the output of the workflow. " + "In case multiple analysis workflows are run this name should be unique.", + }, + }, + }, + }, + }, }, }, }, @@ -101,27 +180,10 @@ def parse_pred_config( yaml.safe_load(update_config.read_text()), ) - @configsuite.transformation_msg("Tries to convert input to absolute path") - def _to_abs_path(path: str) -> str: - """ - Helper function for the configsuite. Take in a path as a string and - attempts to convert it to an absolute path. - - Args: - path: A relative or absolute path - - Returns: - Absolute path - - """ - if path is None: - return "" - if pathlib.Path(path).is_absolute(): - return str(pathlib.Path(path).resolve()) - return str((base_config.parent / pathlib.Path(path)).resolve()) - suite = ConfigSuite( - input_config, create_schema(_to_abs_path=_to_abs_path), deduce_required=True + input_config, + create_schema(config_folder=base_config.parent), + deduce_required=True, ) if not suite.valid: @@ -139,4 +201,10 @@ def _to_abs_path(path: str) -> str: "Queue name and server needs to be provided if system is not 'LOCAL'." ) + if config.ert.analysis and not config.ert.ref_sim: + raise ValueError( + "Path to the folder of a reference simulation (ref_sim), " + "required for the analytics workflow is missing." + ) + return config diff --git a/src/flownet/config_parser/_config_transformations.py b/src/flownet/config_parser/_config_transformations.py new file mode 100644 index 000000000..3c2880b8d --- /dev/null +++ b/src/flownet/config_parser/_config_transformations.py @@ -0,0 +1,54 @@ +from typing import List, Union + +import configsuite + + +@configsuite.transformation_msg("Convert integer to list") +def _integer_to_list(input_data: Union[List, int]) -> List: + """ + Converts integer to list with single item. + + Args: + input_data (Union[List, int]): + + Returns: + The input_data. If it wasn't a list yet is will be turned into a list. + """ + if isinstance(input_data, int): + input_data = [input_data] + return input_data + + +@configsuite.transformation_msg("Convert 'None' to None") +def _str_none_to_none( + input_data: Union[str, int, float, None] +) -> Union[str, int, float, None]: + """ + Converts "None" to None + Args: + input_data (Union[str, int, float, None]): + + Returns: + The input_data. 
diff --git a/src/flownet/config_parser/_config_transformations.py b/src/flownet/config_parser/_config_transformations.py
new file mode 100644
index 000000000..3c2880b8d
--- /dev/null
+++ b/src/flownet/config_parser/_config_transformations.py
@@ -0,0 +1,54 @@
+from typing import List, Union
+
+import configsuite
+
+
+@configsuite.transformation_msg("Convert integer to list")
+def _integer_to_list(input_data: Union[List, int]) -> List:
+    """
+    Converts an integer to a list with a single item.
+
+    Args:
+        input_data (Union[List, int]):
+
+    Returns:
+        The input_data. If it wasn't a list yet it will be turned into a list.
+    """
+    if isinstance(input_data, int):
+        input_data = [input_data]
+    return input_data
+
+
+@configsuite.transformation_msg("Convert 'None' to None")
+def _str_none_to_none(
+    input_data: Union[str, int, float, None]
+) -> Union[str, int, float, None]:
+    """
+    Converts "None" to None
+
+    Args:
+        input_data (Union[str, int, float, None]):
+
+    Returns:
+        The input_data. If the input is "None" or "none" it is converted to None (str to None).
+    """
+    if isinstance(input_data, str):
+        if input_data.lower() == "none":
+            return None
+
+    return input_data
+
+
+@configsuite.transformation_msg("Convert string to lower case")
+def _to_lower(input_data: Union[List[str], str]) -> Union[List[str], str]:
+    if isinstance(input_data, str):
+        return input_data.lower()
+
+    return [x.lower() for x in input_data]
+
+
+@configsuite.transformation_msg("Convert string to upper case")
+def _to_upper(input_data: Union[List[str], str]) -> Union[List[str], str]:
+    if isinstance(input_data, str):
+        return input_data.upper()
+
+    return [x.upper() for x in input_data]
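The transformations remain plain callables after decoration (assuming `configsuite.transformation_msg` wraps without changing the call signature, which the schemas above rely on), so they can be sanity-checked directly:

```python
from flownet.config_parser._config_transformations import (
    _integer_to_list,
    _str_none_to_none,
    _to_upper,
)

assert _integer_to_list(3) == [3]            # single int -> one-item list
assert _str_none_to_none("None") is None     # "None"/"none" -> None
assert _to_upper(["mse", "rmse"]) == ["MSE", "RMSE"]
```
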
diff --git a/src/flownet/data/from_flow.py b/src/flownet/data/from_flow.py
index 4401c649a..ad38e29d3 100644
--- a/src/flownet/data/from_flow.py
+++ b/src/flownet/data/from_flow.py
@@ -22,8 +22,6 @@ class FlowData(FromSource):
     Args:
         input_case: Full path to eclipse case to load data from
         layers: List with definition of isolated layers, if present.
-        perforation_handling_strategy: How to deal with perforations per well.
-            ('bottom_point', 'top_point', 'multiple')
 
     """
 
@@ -31,7 +29,6 @@ def __init__(
         self,
         input_case: Union[Path, str],
         layers: Tuple = (),
-        perforation_handling_strategy: str = "bottom_point",
     ):
         super().__init__()
 
@@ -44,15 +41,18 @@ def __init__(
         self._wells = compdat.df(EclFiles(str(self._input_case)))
         self._layers = layers
 
-        self._perforation_handling_strategy: str = perforation_handling_strategy
-
     # pylint: disable=too-many-branches
-    def _well_connections(self) -> pd.DataFrame:
+    def _well_connections(self, perforation_handling_strategy: str) -> pd.DataFrame:
         """
         Function to extract well connection coordinates from a Flow simulation
         including their opening and closure time. The output of this function
         will be filtered based on the configured perforation strategy.
 
+        Args:
+            perforation_handling_strategy: Strategy to be used when creating perforations.
+                Valid options are bottom_point, top_point, multiple, time_avg_open_location and
+                multiple_based_on_workovers.
+
         Returns:
             columns: WELL_NAME, X, Y, Z, DATE, OPEN, LAYER_ID
 
@@ -100,11 +100,11 @@ def _well_connections(self) -> pd.DataFrame:
 
         try:
             perforation_strategy_method = getattr(
-                perforation_strategy, self._perforation_handling_strategy
+                perforation_strategy, perforation_handling_strategy
             )
         except AttributeError as attribute_error:
             raise NotImplementedError(
-                f"The perforation handling strategy {self._perforation_handling_strategy} is unknown."
+                f"The perforation handling strategy {perforation_handling_strategy} is unknown."
             ) from attribute_error
 
         return perforation_strategy_method(df).sort_values(["DATE"])
@@ -362,6 +362,26 @@ def get_unique_regions(self, name: str) -> np.ndarray:
         """array with unique 'name' regions"""
         return np.unique(self._init[name][0])
 
+    def get_well_connections(self, perforation_handling_strategy: str) -> pd.DataFrame:
+        """
+        Function to get dataframe with all well connection coordinates,
+        filtered based on the perforation_handling_strategy.
+
+        Args:
+            perforation_handling_strategy: Strategy to be used when creating perforations.
+                Valid options are bottom_point, top_point, multiple,
+                time_avg_open_location and multiple_based_on_workovers.
+
+        Returns:
+            Dataframe with all well connection coordinates,
+            filtered based on the perforation_handling_strategy.
+            Columns: WELL_NAME, X, Y, Z, DATE, OPEN, LAYER_ID
+
+        """
+
+        return self._well_connections(
+            perforation_handling_strategy=perforation_handling_strategy
+        )
+
     @property
     def faults(self) -> pd.DataFrame:
         """dataframe with all fault data"""
@@ -372,11 +392,6 @@ def production(self) -> pd.DataFrame:
         """dataframe with all production data"""
         return self._production_data()
 
-    @property
-    def well_connections(self) -> pd.DataFrame:
-        """dataframe with all well connection coordinates"""
-        return self._well_connections()
-
     @property
     def well_logs(self) -> pd.DataFrame:
         """dataframe with all well log"""
diff --git a/src/flownet/data/from_source.py b/src/flownet/data/from_source.py
index 08f7a8ea6..08e420b73 100644
--- a/src/flownet/data/from_source.py
+++ b/src/flownet/data/from_source.py
@@ -16,11 +16,10 @@ def production(self) -> pd.DataFrame:
             "The production property is required to be implemented in a FromSource class."
         )
 
-    @property
     @abstractmethod
-    def well_connections(self) -> pd.DataFrame:
+    def get_well_connections(self, perforation_handling_strategy: str) -> pd.DataFrame:
         raise NotImplementedError(
-            "The well_connections property is required to be implemented in a FromSource class."
+            "The get_well_connections method is required to be implemented in a FromSource class."
         )
 
     @property
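Downstream `FromSource` implementations must follow the same signature change. A partial, hypothetical subclass sketch (a real subclass would also have to implement the remaining abstract members of `FromSource`):

```python
import pandas as pd

from flownet.data.from_source import FromSource


class CsvData(FromSource):  # hypothetical data source for illustration
    @property
    def production(self) -> pd.DataFrame:
        return pd.DataFrame()

    def get_well_connections(self, perforation_handling_strategy: str) -> pd.DataFrame:
        # Method, not property: the strategy now arrives as an argument.
        return pd.DataFrame(
            columns=["WELL_NAME", "X", "Y", "Z", "DATE", "OPEN", "LAYER_ID"]
        )
```
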
diff --git a/src/flownet/ert/_create_ert_setup.py b/src/flownet/ert/_create_ert_setup.py
index 6c4a971ea..78058cf13 100644
--- a/src/flownet/ert/_create_ert_setup.py
+++ b/src/flownet/ert/_create_ert_setup.py
@@ -179,7 +179,11 @@ def create_ert_setup(  # pylint: disable=too-many-arguments
     output_folder = pathlib.Path(args.output_folder)
     os.makedirs(output_folder, exist_ok=True)
 
-    if not prediction_setup:
+    if prediction_setup and config.ert.ref_sim:
+        path_ref_sim = pathlib.Path(config.ert.ref_sim).resolve()
+        mode = "pred"
+    elif not prediction_setup:
+        mode = "ahm"
         # Derive absolute path to reference simulation case
         if config.flownet.data_source.simulation.input_case:
             path_ref_sim = pathlib.Path(
@@ -270,8 +274,8 @@ def create_ert_setup(  # pylint: disable=too-many-arguments
                 fh.write(
                     analytics_workflow_template.render(
                         {
+                            "mode": mode,
                             "reference_simulation": path_ref_sim,
-                            "perforation_strategy": config.flownet.perforation_handling_strategy,
                             "run_path": config.ert.runpath,
                             "ecl_base": config.ert.eclbase,
                             "analysis_start": analysis_item.start,
diff --git a/src/flownet/ert/FLOW_SIMULATION b/src/flownet/ert/forward_models/FLOW_SIMULATION
similarity index 100%
rename from src/flownet/ert/FLOW_SIMULATION
rename to src/flownet/ert/forward_models/FLOW_SIMULATION
diff --git a/src/flownet/ert/forward_models/__init__.py b/src/flownet/ert/forward_models/__init__.py
new file mode 100644
index 000000000..97a9f805e
--- /dev/null
+++ b/src/flownet/ert/forward_models/__init__.py
@@ -0,0 +1,5 @@
+from ._save_iteration_parameters import save_iteration_parameters
+from ._delete_simulation_output import delete_simulation_output
+from ._flow_job import run_flow
+from ._iteration_analytics import save_iteration_analytics
+from ._render_realization import render_realization
diff --git a/src/flownet/ert/forward_models/_delete_simulation_output.py b/src/flownet/ert/forward_models/_delete_simulation_output.py
new file mode 100644
index 000000000..0775f94d0
--- /dev/null
+++ b/src/flownet/ert/forward_models/_delete_simulation_output.py
@@ -0,0 +1,24 @@
+import os
+import argparse
+
+
+def delete_simulation_output():
+    """
+    This function is called by a forward model in ERT, deleting unnecessary
+    simulation output files.
+
+    Returns:
+        Nothing
+
+    """
+    parser = argparse.ArgumentParser(prog="Delete simulation output.")
+
+    parser.add_argument(
+        "ecl_base", type=str, help="Base name of the simulation DATA file"
+    )
+
+    args = parser.parse_args()
+
+    for suffix in ["EGRID", "INIT", "UNRST", "LOG", "PRT"]:
+        if os.path.exists(f"{args.ecl_base}.{suffix}"):
+            os.remove(f"{args.ecl_base}.{suffix}")
diff --git a/src/flownet/ert/_flow_job.py b/src/flownet/ert/forward_models/_flow_job.py
similarity index 100%
rename from src/flownet/ert/_flow_job.py
rename to src/flownet/ert/forward_models/_flow_job.py
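The relocated forward model is still a tiny argparse CLI, so its console script keeps working unchanged. A hedged usage sketch (paths are hypothetical):

```python
# Equivalent of running the forward model from a shell:
#   flownet_delete_simulation_output output/runpath/SOME_CASE
# which removes SOME_CASE.EGRID, .INIT, .UNRST, .LOG and .PRT if present.
import subprocess

subprocess.run(
    ["flownet_delete_simulation_output", "output/runpath/SOME_CASE"],
    check=True,
)
```
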
diff --git a/src/flownet/ahm/_ahm_iteration_analytics.py b/src/flownet/ert/forward_models/_iteration_analytics.py
similarity index 94%
rename from src/flownet/ahm/_ahm_iteration_analytics.py
rename to src/flownet/ert/forward_models/_iteration_analytics.py
index c65711816..1c00fd7cb 100644
--- a/src/flownet/ahm/_ahm_iteration_analytics.py
+++ b/src/flownet/ert/forward_models/_iteration_analytics.py
@@ -13,6 +13,7 @@
 from ecl.summary import EclSum
 
 from flownet.data import FlowData
+from flownet.ert.forward_models.utils import get_last_iteration
 
 
 def filter_dataframe(
@@ -271,28 +272,38 @@ def compute_metric_ensemble(
 
 
 def make_dataframe_simulation_data(
-    path: str, eclbase_file: str, keys: List[str], end_date: datetime
-) -> Tuple[pd.DataFrame, int, int]:
+    mode: str, path: str, eclbase_file: str, keys: List[str], end_date: datetime
+) -> Tuple[pd.DataFrame, str, int]:
     """
     Internal helper function to generate dataframe containing data from
     ensemble of simulations from selected simulation keys
 
     Args:
+        mode: String with mode in which flownet is run: prediction (pred) or assisted history matching (ahm)
         path: path to folder containing ensemble of simulations
         eclbase_file: name of simulation case file
         keys: list of prefix of quantities of interest to be loaded
         end_date: end date of time period for accuracy analysis
 
+    Raises:
+        ValueError: If mode is invalid (not pred or ahm).
+
     Returns:
         df_sim: Pandas dataframe contained data from ensemble of simulations
         iteration: current AHM iteration number
         nb_real: number of realizations
 
     """
-    iteration = sorted(
-        [int(rel_iter.replace("/", "").split("-")[-1]) for rel_iter in glob.glob(path)]
-    )[-1]
-    runpath_list = glob.glob(path[::-1].replace("*", str(iteration)[::-1], 1)[::-1])
+    if mode == "pred":
+        runpath_list = glob.glob(path)
+        iteration = "latest"
+    elif mode == "ahm":
+        (i, runpath_list) = get_last_iteration(path)
+        iteration = str(i)
+    else:
+        raise ValueError(
+            f"{mode} is not a valid mode to run flownet with. Choose ahm or pred."
+        )
 
     partial_load_simulations = functools.partial(
         _load_simulations, ecl_base=eclbase_file
@@ -321,27 +332,12 @@ def make_dataframe_simulation_data(
 
     return df_sim, iteration, n_realization
 
 
-def save_iteration_analytics():
-    """
-    This function is called as a post-simulation workflow in ERT, saving all
-    accuracy metrics of all iterations to a file and plotting evolution of accuracy
-    metrics over iterations. The resulting accuracy metric values are stored in
-    a CSV file in the FlowNet output folder, along with the figures
-
-    Args:
-        None
-
-    Returns:
-        Nothing
-
-    """
+def parse_arguments():
     parser = argparse.ArgumentParser(prog=("Save iteration analytics to a file."))
+    parser.add_argument("mode", type=str, help="Mode: ahm or pred")
     parser.add_argument(
         "reference_simulation", type=str, help="Path to the reference simulation case"
     )
-    parser.add_argument(
-        "perforation_strategy", type=str, help="Perforation handling strategy"
-    )
     parser.add_argument("runpath", type=str, help="Path to the ERT runpath.")
     parser.add_argument(
         "eclbase", type=str, help="Path to the simulation from runpath."
     )
@@ -370,24 +366,44 @@ def save_iteration_analytics():
     args = parser.parse_args()
     args.runpath = args.runpath.replace("%d", "*")
 
+    return args
+
+
+def save_iteration_analytics():
+    """
+    This function is called as a post-simulation workflow in ERT, saving all
+    accuracy metrics of all iterations to a file and plotting the evolution of
+    accuracy metrics over iterations. The resulting accuracy metric values are
+    stored in a CSV file in the FlowNet output folder, along with the figures.
+
+    Args:
+        None
+
+    Returns:
+        Nothing
+
+    """
+    args = parse_arguments()
+
     print("Saving iteration analytics...", end=" ", flush=True)
 
     # Fix list inputs
     metrics = list(args.metrics.replace("[", "").replace("]", "").split(","))
 
+    # Vector keys to analyze
+    vector_keys = list(args.quantity.replace("[", "").replace("]", "").split(","))
+
     # Load ensemble of FlowNet
     (df_sim, iteration, nb_real) = make_dataframe_simulation_data(
+        args.mode,
         args.runpath,
         args.eclbase,
-        list(args.quantity.replace("[", "").replace("]", "").split(",")),
+        vector_keys,
         args.end,
     )
 
     # Load reference simulation (OPM-Flow/Eclipse)
-    field_data = FlowData(
-        args.reference_simulation,
-        perforation_handling_strategy=args.perforation_strategy,
-    )
+    field_data = FlowData(args.reference_simulation)
     df_obs: pd.DataFrame = field_data.production
     df_obs["DATE"] = df_obs["date"]
 
@@ -401,9 +417,6 @@ def save_iteration_analytics():
     # Initiate dataframe with metrics
     df_metrics = load_csv_file(args.outfile, ["quantity", "iteration"] + metrics)
 
-    # Vector keys to analyze
-    vector_keys = list(args.quantity.replace("[", "").replace("]", "").split(","))
-
     # Prepare data from reference simulation
     df_obs_filtered = filter_dataframe(
         df_obs,
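In "ahm" mode the analytics narrow the runpath glob to the last iteration that actually ran; in "pred" mode every match is used and the iteration is simply labeled "latest". A hedged sketch with a made-up runpath pattern:

```python
import glob

from flownet.ert.forward_models.utils import get_last_iteration

runpath_glob = "output/runpath/realization-*/iteration-*"  # hypothetical pattern

# "ahm": pick out only the runpaths of the highest iteration number found.
iteration, runpaths = get_last_iteration(runpath_glob)

# "pred": a single prediction run, so all matches count and the label is fixed.
runpaths_pred = glob.glob(runpath_glob)
iteration_pred = "latest"
```
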
diff --git a/src/flownet/realization/_render_realization.py b/src/flownet/ert/forward_models/_render_realization.py
similarity index 97%
rename from src/flownet/realization/_render_realization.py
rename to src/flownet/ert/forward_models/_render_realization.py
index 3c6ccd257..04cfd1621 100644
--- a/src/flownet/realization/_render_realization.py
+++ b/src/flownet/ert/forward_models/_render_realization.py
@@ -7,8 +7,8 @@
 
 import pandas as pd
 
-from ..parameters import Parameter
-from ._simulation_realization import SimulationRealization
+from flownet.parameters import Parameter
+from flownet.realization._simulation_realization import SimulationRealization
 
 
 def _ert_samples2simulation_input(
diff --git a/src/flownet/ert/forward_models/_save_iteration_parameters.py b/src/flownet/ert/forward_models/_save_iteration_parameters.py
new file mode 100644
index 000000000..91adce78b
--- /dev/null
+++ b/src/flownet/ert/forward_models/_save_iteration_parameters.py
@@ -0,0 +1,81 @@
+import argparse
+import concurrent.futures
+import json
+import pathlib
+import re
+import shutil
+from typing import Dict, Tuple
+
+import pandas as pd
+
+from flownet.ert.forward_models.utils import get_last_iteration
+
+
+def _load_parameters(runpath: str) -> Tuple[int, Dict]:
+    """
+    Internal helper function to load parameter.json files in
+    parallel.
+
+    Args:
+        runpath: Path to where the realization is run.
+
+    Returns:
+        Dictionary with the realizations' parameters.
+
+    """
+    realization = int(re.findall(r"[0-9]+", runpath)[-2])
+    parameters = json.loads((pathlib.Path(runpath) / "parameters.json").read_text())
+
+    return realization, parameters["FLOWNET_PARAMETERS"]
+
+
+def save_iteration_parameters():
+    """
+    This function is called as a pre-simulation workflow in ERT, saving all
+    parameters of an iteration to a file.
+
+    The resulting dataframe is saved as a gzipped parquet file using a PyArrow table
+    and has the following format (example for 5 realizations and 2 parameters):
+
+    | index = realization | parameter 1 | parameter 2 |
+    |=====================|=============|=============|
+    | 1                   | x.x         | x.x         |
+    | 3                   | x.x         | x.x         |
+    | 5                   | x.x         | x.x         |
+    | 4                   | x.x         | x.x         |
+    | 2                   | x.x         | x.x         |
+
+    Mind that the dataframe is not ordered.
+
+    Returns:
+        Nothing
+
+    """
+    parser = argparse.ArgumentParser(prog="Save iteration parameters to a file.")
+    parser.add_argument("runpath", type=str, help="Path to the ERT runpath.")
+    args = parser.parse_args()
+    args.runpath = args.runpath.replace("%d", "*")
+
+    print("Saving ERT parameters to file...", end=" ")
+
+    (iteration, runpath_list) = get_last_iteration(args.runpath)
+    realizations_dict = {}
+
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        for result in executor.map(_load_parameters, runpath_list):
+            realizations_dict[result[0]] = result[1]
+
+    pd.DataFrame(
+        [parameters for _, parameters in realizations_dict.items()],
+        index=realizations_dict.keys(),
+    ).to_parquet(
+        f"parameters_iteration-{iteration}.parquet.gzip",
+        index=True,
+        engine="pyarrow",
+        compression="gzip",
+    )
+
+    shutil.copyfile(
+        f"parameters_iteration-{iteration}.parquet.gzip",
+        "parameters_iteration-latest.parquet.gzip",
+    )
+    print("[Done]")
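This is where the iterations-larger-than-9 bug from the changelog lived: the old lookup reversed the whole runpath string but spliced the iteration number in un-reversed, so iteration 10 was globbed as "01". `get_last_iteration` (next file) also reverses the substituted digits, keeping multi-digit iterations intact. A standalone demonstration:

```python
path = "output/runpath/realization-*/iteration-*"  # hypothetical runpath glob
iteration = 10

# Old, buggy substitution: replaces the last "*" but garbles multi-digit numbers.
old = path[::-1].replace("*", str(iteration), 1)[::-1]
# New substitution, as done in get_last_iteration():
new = path[::-1].replace("*", str(iteration)[::-1], 1)[::-1]

print(old)  # output/runpath/realization-*/iteration-01  (wrong)
print(new)  # output/runpath/realization-*/iteration-10  (correct)
```
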
diff --git a/src/flownet/ert/forward_models/utils.py b/src/flownet/ert/forward_models/utils.py
new file mode 100644
index 000000000..9c41a99fa
--- /dev/null
+++ b/src/flownet/ert/forward_models/utils.py
@@ -0,0 +1,22 @@
+import glob
+from typing import List, Tuple
+
+
+def get_last_iteration(path: str) -> Tuple[int, List]:
+    """
+    Function to collect the last iteration number for which the simulation has run,
+    and the associated runpaths of that last iteration for all realizations.
+
+    Args:
+        path: ERT runpath
+
+    Returns:
+        Tuple with the last iteration number and a list of the runpaths of that iteration for all realizations.
+
+    """
+    iteration = sorted(
+        [int(rel_iter.replace("/", "").split("-")[-1]) for rel_iter in glob.glob(path)]
+    )[-1]
+    runpath_list = glob.glob(path[::-1].replace("*", str(iteration)[::-1], 1)[::-1])
+
+    return iteration, runpath_list
diff --git a/src/flownet/realization/__init__.py b/src/flownet/realization/__init__.py
index 278edf0d3..0fbc0d0df 100644
--- a/src/flownet/realization/__init__.py
+++ b/src/flownet/realization/__init__.py
@@ -1,2 +1 @@
 from ._schedule import Schedule
-from ._render_realization import render_realization
diff --git a/src/flownet/templates/SAVE_ITERATION_ANALYTICS_WORKFLOW.jinja2 b/src/flownet/templates/SAVE_ITERATION_ANALYTICS_WORKFLOW.jinja2
index 3b75805fa..0bccd7dba 100644
--- a/src/flownet/templates/SAVE_ITERATION_ANALYTICS_WORKFLOW.jinja2
+++ b/src/flownet/templates/SAVE_ITERATION_ANALYTICS_WORKFLOW.jinja2
@@ -1 +1 @@
-SAVE_ITERATION_ANALYTICS_WORKFLOW_JOB {{ reference_simulation }} {{ perforation_strategy }} {{ run_path }} {{ ecl_base }} {{ analysis_start }} {{ analysis_end }} {{ analysis_quantity }} {{ analysis_metric }} {{ analysis_outfile }}
+SAVE_ITERATION_ANALYTICS_WORKFLOW_JOB {{ mode }} {{ reference_simulation }} {{ run_path }} {{ ecl_base }} {{ analysis_start }} {{ analysis_end }} {{ analysis_quantity }} {{ analysis_metric }} {{ analysis_outfile }}
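The workflow job passes these arguments positionally, so the placeholder order in the template presumably has to stay in lockstep with the positionals registered in `parse_arguments()` (mode first, then the reference simulation, and so on). A reference mapping under that assumption:

```python
# Template placeholder (left) vs. assumed argparse positional (right), in order:
arg_order = [
    ("mode", "mode"),
    ("reference_simulation", "reference_simulation"),
    ("run_path", "runpath"),
    ("ecl_base", "eclbase"),
    ("analysis_start", "start"),
    ("analysis_end", "end"),
    ("analysis_quantity", "quantity"),
    ("analysis_metric", "metrics"),
    ("analysis_outfile", "outfile"),
]
```
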
diff --git a/tests/test_calculation_accuracy_metric.py b/tests/test_calculation_accuracy_metric.py
deleted file mode 100644
index 60f332059..000000000
--- a/tests/test_calculation_accuracy_metric.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import math
-
-import numpy as np
-
-from flownet.ahm._ahm_iteration_analytics import accuracy_metric
-
-
-def test_calculation_accuracy_metric() -> None:
-    assert math.isclose(
-        accuracy_metric(
-            np.array([[0.5, 0.5, 0.5], [0.5, 0.5, 0.5]]),
-            np.array([[0.4, 0.4, 0.4], [0.6, 0.6, 0.6]]),
-            "RMSE",
-        ),
-        0.1,
-    )
diff --git a/tests/test_data_flow.py b/tests/test_data_flow.py
index 6724204f5..0e7867283 100644
--- a/tests/test_data_flow.py
+++ b/tests/test_data_flow.py
@@ -50,11 +50,7 @@ def _locate_test_case() -> Path:
 # pylint: disable=protected-access
 def test_grid_cell_bounding_boxes() -> None:
     layers = ()
-    flowdata = FlowData(
-        _locate_test_case(),
-        layers,
-        "multiple_based_on_workovers",
-    )
+    flowdata = FlowData(_locate_test_case(), layers)
 
     # Test one layer for the whole field and no layers equal
     flowdata._layers = ((1, flowdata.grid.nz),)
diff --git a/tests/test_normalize_data.py b/tests/test_iteration_analytics.py
similarity index 55%
rename from tests/test_normalize_data.py
rename to tests/test_iteration_analytics.py
index 23db2dcf4..669726af2 100644
--- a/tests/test_normalize_data.py
+++ b/tests/test_iteration_analytics.py
@@ -1,6 +1,35 @@
+import math
 import numpy as np
+import pandas as pd
 
-from flownet.ahm._ahm_iteration_analytics import normalize_data
+from flownet.ert.forward_models._iteration_analytics import (
+    prepare_opm_reference_data,
+    prepare_flownet_data,
+    normalize_data,
+    accuracy_metric,
+)
+
+
+def test_prepare_opm_reference_data() -> None:
+    data = {"key_1": [1, 2], "key_2": [3, 4]}
+    assert np.allclose(
+        prepare_opm_reference_data(
+            pd.DataFrame(data, columns=["key_1", "key_2"]), "key_", 2
+        ),
+        np.array([[1, 1], [3, 3], [2, 2], [4, 4]]),
+    )
+
+
+def test_prepare_flownet_data() -> None:
+    data = {
+        "realization_id": [1, 1, 2, 2],
+        "key_1": [11, 12, 21, 22],
+        "key_2": [13, 14, 23, 24],
+    }
+    assert np.allclose(
+        prepare_flownet_data(pd.DataFrame(data, columns=["key_1", "key_2"]), "key_", 2),
+        np.array([[11, 21], [13, 23], [12, 22], [14, 24]]),
+    )
 
 
 def test_normalize_data() -> None:
@@ -30,3 +59,14 @@ def test_normalize_data() -> None:
         and np.allclose(tmp_3[0], res_5)
         and all(np.allclose(x, y) for x, y in zip(tmp_3[1], res_6))
     )
+
+
+def test_calculation_accuracy_metric() -> None:
+    assert math.isclose(
+        accuracy_metric(
+            np.array([[0.5, 0.5, 0.5], [0.5, 0.5, 0.5]]),
+            np.array([[0.4, 0.4, 0.4], [0.6, 0.6, 0.6]]),
+            "RMSE",
+        ),
+        0.1,
+    )
diff --git a/tests/test_prepare_flownet_data.py b/tests/test_prepare_flownet_data.py
deleted file mode 100644
index 464c75598..000000000
--- a/tests/test_prepare_flownet_data.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import numpy as np
-import pandas as pd
-
-from flownet.ahm._ahm_iteration_analytics import prepare_flownet_data
-
-
-def test_prepare_flownet_data() -> None:
-    data = {
-        "realization_id": [1, 1, 2, 2],
-        "key_1": [11, 12, 21, 22],
-        "key_2": [13, 14, 23, 24],
-    }
-    assert np.allclose(
-        prepare_flownet_data(pd.DataFrame(data, columns=["key_1", "key_2"]), "key_", 2),
-        np.array([[11, 21], [13, 23], [12, 22], [14, 24]]),
-    )
diff --git a/tests/test_prepare_opm_ref_data.py b/tests/test_prepare_opm_ref_data.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/tests/test_prepare_opm_reference_data.py b/tests/test_prepare_opm_reference_data.py
deleted file mode 100644
index 33e79366c..000000000
--- a/tests/test_prepare_opm_reference_data.py
+++ /dev/null
@@ -1,14 +0,0 @@
-import numpy as np
-import pandas as pd
-
-from flownet.ahm._ahm_iteration_analytics import prepare_opm_reference_data
-
-
-def test_prepare_opm_reference_data() -> None:
-    data = {"key_1": [1, 2], "key_2": [3, 4]}
-    assert np.allclose(
-        prepare_opm_reference_data(
-            pd.DataFrame(data, columns=["key_1", "key_2"]), "key_", 2
-        ),
-        np.array([[1, 1], [3, 3], [2, 2], [4, 4]]),
-    )
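A worked check of the relocated RMSE test: every element of the two arrays differs by exactly 0.1, so the mean squared error is 0.01 and its square root is 0.1, which is what the test asserts:

```python
import numpy as np

pred = np.array([[0.5, 0.5, 0.5], [0.5, 0.5, 0.5]])
ref = np.array([[0.4, 0.4, 0.4], [0.6, 0.6, 0.6]])

rmse = np.sqrt(np.mean((pred - ref) ** 2))
print(rmse)  # 0.1
```
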