From 31ad7edfe90e9794fa4e672e4a9f0a83dd3f82f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20Fredrik=20Ki=C3=A6r?= <31612826+anders-kiaer@users.noreply.github.com> Date: Thu, 5 Dec 2024 10:48:28 +0000 Subject: [PATCH] Simplified FMU observation setup --- pyproject.toml | 8 +- .../__init__.py | 0 .../genertobs_unstable/_datatypes.py | 325 ++++++++++++ .../genertobs_unstable/_utilities.py | 496 ++++++++++++++++++ src/subscript/genertobs_unstable/_writers.py | 331 ++++++++++++ src/subscript/genertobs_unstable/main.py | 60 +++ .../genertobs_unstable/parse_config.py | 89 ++++ 7 files changed, 1307 insertions(+), 2 deletions(-) rename src/subscript/{field_statistics => genertobs_unstable}/__init__.py (100%) create mode 100644 src/subscript/genertobs_unstable/_datatypes.py create mode 100644 src/subscript/genertobs_unstable/_utilities.py create mode 100644 src/subscript/genertobs_unstable/_writers.py create mode 100644 src/subscript/genertobs_unstable/main.py create mode 100644 src/subscript/genertobs_unstable/parse_config.py diff --git a/pyproject.toml b/pyproject.toml index d3cfa1f3b..595783d1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,8 @@ dependencies = [ "res2df", "ert", "ert>=10.2.0b13", + "fmu-config", + "odfpy", "fmu-tools", "grid3d_maps", "matplotlib", @@ -55,6 +57,8 @@ dependencies = [ "shapely", "xlrd", "xtgeo", + "fmu-config", + "odfpy" ] [project.optional-dependencies] @@ -96,8 +100,10 @@ csv_merge = "subscript.csv_merge.csv_merge:main" csv_stack = "subscript.csv_stack.csv_stack:main" eclcompress = "subscript.eclcompress.eclcompress:main" ecldiff2roff = "subscript.ecldiff2roff.ecldiff2roff:main" +field_statistics = "subscript.field_statistics.field_statistics:main" fmu_copy_revision = "subscript.fmu_copy_revision.fmu_copy_revision:main" fmuobs = "subscript.fmuobs.fmuobs:main" +genertobs_unstable = "subscript.genertobs_unstable.main:main" grav_subs_maps = "subscript.grav_subs_maps.grav_subs_maps:main" grav_subs_points = "subscript.grav_subs_points.grav_subs_points:main" interp_relperm = "subscript.interp_relperm.interp_relperm:main" @@ -117,8 +123,6 @@ sw_model_utilities = "subscript.sw_model_utilities.sw_model_utilities:main" sunsch = "subscript.sunsch.sunsch:main" vfp2csv = "subscript.vfp2csv.vfp2csv:main" welltest_dpds = "subscript.welltest_dpds.welltest_dpds:main" -field_statistics = "subscript.field_statistics.field_statistics:main" - [project.entry-points.ert] subscript_jobs = "subscript.hook_implementations.jobs" diff --git a/src/subscript/field_statistics/__init__.py b/src/subscript/genertobs_unstable/__init__.py similarity index 100% rename from src/subscript/field_statistics/__init__.py rename to src/subscript/genertobs_unstable/__init__.py diff --git a/src/subscript/genertobs_unstable/_datatypes.py b/src/subscript/genertobs_unstable/_datatypes.py new file mode 100644 index 000000000..1dd473749 --- /dev/null +++ b/src/subscript/genertobs_unstable/_datatypes.py @@ -0,0 +1,325 @@ +"""Pydantic models for genertobs""" + +import logging +import warnings +from enum import Enum +from pathlib import Path +from typing import Dict, List, Union + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + RootModel, + computed_field, + field_validator, + model_validator, +) + + +def is_number(tocheck): + """Check that variable can be converted to number + + Args: + tocheck (something): what shall be checked + + Returns: + bool: check passed or not + """ + try: + float(tocheck) + return True + except TypeError: + return False + + +def is_percent_range(string): + 
"""Check if string ending with % is between 0 and 100 + + Args: + string (str): the string to check + + Returns: + bool: True if number is in the range + """ + logger = logging.getLogger(__file__ + ".is_percent_range") + logger.debug("Input is %s", string) + number = float(string.replace("%", "")) + if 0 < number < 100: + return True + + if number > 50: + warnings.warn( + "It seems weird to have an error of more than 50%" + f" ({number}, is this correct?)" + ) + return False + + +def is_string_convertible_2_percent(error): + """Check string + + Args: + error (str): string to check + + Raises: + ValueError: if string does not end with a percent sign + TypeError: if string cannot be converted to a number + ValueError: if the number ends with %, but is not between 0 and 100 + """ + logger = logging.getLogger(__file__ + ".is_string_convertible_2_percent") + logger.debug("Checking this string %s", error) + try: + if not is_number(error[:-1]): + raise TypeError(f"This: {error} is not convertible to a number") + except TypeError: + pass + + if not error.endswith("%"): + raise ValueError( + f"When default_error ({error}) given as string it must end with a % sign" + ) + + if not is_percent_range(error): + raise ValueError(f"The number {error} is not in the valid range 0-100") + + +def check_error_limits(error, err_min, err_max): + """Check error limits + + Args: + error (Union[str,int,float]): the error to check against + err_min (Union[int,float,None]): the lower limit + err_max (Union[int,float,None]): the higher limit + + + Raises: + ValueError: if err_min is not None when error is not in percent + ValueError: if err_max is not None when error is not in percent + ValueError: if err_min >= max + """ + logger = logging.getLogger(__file__ + ".check_error_limits") + logger.debug("Checking with error: %s, and limits %s-%s", error, err_min, err_max) + if isinstance(error, (float, int)): + if err_min is not None: + raise ValueError( + "default_error specified as an absolute number," + f" doesn't make sense to set a lower limit ({err_min})" + ) + if err_max is not None: + raise ValueError( + "default_error specified as an absolute number," + f" doesn't make sense to set a higher limit ({err_max})" + ) + else: + if err_min is not None and err_max is not None and err_min >= err_max: + raise ValueError( + f"When using limits, the minimum must be lower than the maximum\n" + f"{err_min}-{err_max}" + ) + + +class ObservationType(Enum): + """The valid datatypes in a config file + + Args: + Enum (Enum): Enumerator + """ + + SUMMARY = "summary" + RFT = "rft" + + +class RftType(Enum): + """Valid Rft types + + Args: + Enum (Enum): Enumerator + """ + + PRESSURE = "pressure" + SWAT = "saturation_water" + SOIL = "saturation_oil" + SGAS = "saturation_gas" + + +class ElementMetaData(BaseModel): + """Pattern for Metadata element for observations + + Args: + BaseModel (BaseModel): pydantic BaseModel + """ + + subtype: RftType = Field( + default=RftType.PRESSURE, + description=f"Type of rft observation, can be any of {RftType.__members__}", + ) + + @property + @computed_field + def columns(self) -> Dict[RftType, Dict[str, str]]: + """Define columns to expect + + Returns: + Dict[RftType, Dict[str, str]]: the expected column with units + """ + if self.subtype == RftType.PRESSURE: + units = {"unit": "bar"} + else: + units = {"unit": "fraction"} + return {self.subtype: units} + + +class PluginArguments(BaseModel): + """Plugin arguments for config element""" + + model_config = ConfigDict(extra="forbid") + zonemap: str = Field( 
+ default="", + description="path to file with mapping between zone names and grid layers", + ) + trajectories: str = Field( + default="", description="path to folder with trajectory files" + ) + + +class ConfigElement(BaseModel): + """Element in a config file""" + + name: str = Field(min_length=5, description="Name of observation") + type: ObservationType = Field(description="Type of observation") + observation: str = Field( + description="path to file containing observations, this can be any csv" + " like file,\n i.e textfile or spreadsheet", + ) + active: bool = Field( + default=True, + description="If the observation element shall be used in\n " + "generation of ert observations", + ) + default_error: Union[str, float, int] = Field( + default=None, + description="Optional argument. Error to be used\n. Used only when" + "no error column present or where error column is empty." + " Can be supplied as any number, and even with a percentage sign", + ) + + min_error: Union[int, float] = Field( + default=None, + description="Optional argument. Minimum error, only allowed " + "when default_error is in percent", + ) + max_error: Union[int, float] = Field( + default=None, + description="Optional argument. Maximum error, only allowed " + "when default_error is in percent", + ) + + alias_file: str = Field(default=None, description="Name of file with aliases") + + @field_validator("observation") + @classmethod + def validate_path_exists(cls, observation_path: str): + """Check that observation file exists + + Args: + observation_path (str): the path to check + + Raises: + OSError: if observation_path does not exist + """ + if not Path(observation_path).exists(): + raise OSError(f"Input observation file {observation_path}, does not exist") + return observation_path + + @field_validator("default_error") + @classmethod + def validate_default_error(cls, error: Union[str, int, float]): + """Check that if error is string, and if so then check that it is in % + + Args: + observation_path (str): the path to check + + Raises: + OSError: if observation_path does not exist + """ + try: + is_string_convertible_2_percent(error) + except AttributeError: + if error < 0: # type: ignore + raise ValueError(f"default error cannot be negative {error}") # pylint: disable=raise-missing-from + return error + + @model_validator(mode="after") + def check_when_default_is_number(self): + """Check + + Returns: + Self: self + """ + check_error_limits(self.default_error, self.min_error, self.max_error) + return self + + +class SummaryConfigElement(ConfigElement): + """ConfigElement for Summary data + + Args: + ConfigElement (ConfigElement): base observation config element + """ + + model_config = ConfigDict(extra="forbid") + + +class RftConfigElement(ConfigElement): + """Config element for Rft + contains some additional fields + + Args: + ConfigElement (pydantic model): base observation config element + """ + + plugin_arguments: PluginArguments # = Field(default=None) + metadata: ElementMetaData = Field( + default=ElementMetaData(), + description="Metadata describing the type", + ) + + # @field_validator("type") + # @classmethod + # def validate_of_rft_type(cls, observation_type: ObservationType): + # """validate that type is rft + + # Args: + # observation_type (ObservationType): the type of observation + + # Raises: + # TypeError: if type is not RFT + + # Returns: + # ObservationType: type of observations + # """ + # if observation_type != ObservationType.RFT: + # raise TypeError(f"This is not rft type, but 
{observation_type}") + # return observation_type + + +class ObservationsConfig(RootModel): + """Root model for config file + + Args: + RootModel (Rootmodel): pydantic root model + """ + + root: List[Union[SummaryConfigElement, RftConfigElement]] = Field( + description="What type of observation", + ) + + def __iter__(self): + return iter(self.root) + + def __getitem__(self, item): + return self.root[item] + + def __len__(self): + return len(self.root) diff --git a/src/subscript/genertobs_unstable/_utilities.py b/src/subscript/genertobs_unstable/_utilities.py new file mode 100644 index 000000000..4053ab9d6 --- /dev/null +++ b/src/subscript/genertobs_unstable/_utilities.py @@ -0,0 +1,496 @@ +import logging +import re +from pathlib import Path +from typing import List, Optional, Union +from warnings import warn + +import pandas as pd + +from subscript.genertobs_unstable._datatypes import ( + ConfigElement, + ObservationType, + RftConfigElement, +) + + +def _fix_column_names(dataframe: pd.DataFrame) -> pd.DataFrame: + """Make column names lower case, strip leading and trailing whitespace + + Args: + dataframe (pd.DataFrame): the dataframe to modify + + Returns: + pd.DataFrame: the modified dataframe + """ + dataframe.columns = [col.lower().strip() for col in dataframe.columns] + return dataframe + + +def remove_undefined_values(frame: pd.DataFrame) -> pd.DataFrame: + """Remove rows that have undefined values in Dataframe + + Args: + frame (pd.DataFrame): the dataframe to sanitize + + """ + logger = logging.getLogger(__name__ + ".remove_undefined_values") + undefined_vals = ["-999.999", "-999.25", -999.25, -999.9, ""] + if "value" in frame.columns: + logger.debug("Have a value column, will remove undefined, if there are any") + not_undefs = (~frame.value.isin(undefined_vals)) & (~frame.value.isnull()) + logger.debug("%s row(s) will be removed", frame.shape[0] - not_undefs.sum()) + return frame.loc[not_undefs] + + logger.debug("No value column, cannot remove undefs") + return frame + + +def remove_whitespace(dataframe: pd.DataFrame): + """Remove whitespace in str columns for pandas dataframe + + Args: + dataframe (pd.DataFrame): the dataframe to modify + """ + logger = logging.getLogger(__name__ + ".remove_whitespace") + for col_name in dataframe.columns: + try: + dataframe[col_name] = dataframe[col_name].map(str.strip) + except TypeError: + logger.debug("%s is not str column", col_name) + + +def read_tabular_file(tabular_file_path: Union[str, Path]) -> pd.DataFrame: + """Read csv or excel file into pandas dataframe + + Args: + tabular_file_path (str): path to file + + Returns: + pd.DataFrame: the dataframe read from file + """ + logger = logging.getLogger(__name__ + ".read_tabular_file") + logger.info("Reading file %s", tabular_file_path) + dataframe = pd.DataFrame() + try: + read_info = "csv, with sep ," + dataframe = pd.read_csv(tabular_file_path, sep=",", dtype=str, comment="#") + except UnicodeDecodeError: + dataframe = pd.read_excel(tabular_file_path, dtype=str) + read_info = "excel" + except FileNotFoundError as fnerr: + raise FileNotFoundError(f"|{tabular_file_path}| could not be found") from fnerr + nr_cols = dataframe.shape[1] + logger.debug("Nr of columns are %s", nr_cols) + if nr_cols == 1: + logger.debug("Wrong number of columns, trying with other separators") + for separator in [";", r"\s+"]: + logger.debug("Trying with |%s| as separator", separator) + try: + dataframe = pd.read_csv( + tabular_file_path, sep=separator, dtype=str, comment="#" + ) + except pd.errors.ParserError as 
pepe: + raise IOError( + f"Failing to read {tabular_file_path} with separator {separator}" + ) from pepe + read_info = f"csv with sep {separator}" + if dataframe.shape[1] > 1: + break + + logger.debug("Way of reading %s", read_info) + logger.debug("Shape of frame %s", dataframe.shape) + + if dataframe.shape[1] == 1: + raise IOError( + "File is not parsed correctly, check if there is something wrong!" + ) + + dataframe = _fix_column_names(dataframe) + remove_whitespace(dataframe) + dataframe = remove_undefined_values(dataframe) + dataframe.rename({"key": "vector"}, inplace=True, axis="columns") + logger.debug("Returning dataframe %s", dataframe) + return dataframe + + +def inactivate_rows(dataframe: pd.DataFrame): + """Inactivate rows in dataframe + + Args: + dataframe (pd.DataFrame): the dataframe to decimate + """ + logger = logging.getLogger(__name__ + ".inactivate_rows") + try: + inactivated = ~dataframe.active + logger.debug("Filter is %s", inactivated) + nr_rows = inactivated.sum() + logger.info( + "%s rows inactivated (%s percent)", + nr_rows, + 100 * nr_rows / dataframe.shape[0], + ) + dataframe = dataframe.loc[~inactivated] + logger.debug("shape after deactivation %s", dataframe.shape) + except AttributeError: + logger.info("No inactivation done") + return dataframe + + +def check_and_fix_str(string_to_sanitize: str) -> str: + """Replace some unwanted strings in str + + Args: + string_to_sanitize (str): the input string + + Returns: + str: the sanitized string + """ + logger = logging.getLogger(__name__ + ".check_and_fix_str") + logger.debug("Initial string before sanitization |%s|", string_to_sanitize) + unwanted_characters = re.compile(r"(\s+|/)") + country_code = re.compile(r"^[a-zA-Z]+\s+") + unwanted_chars = unwanted_characters.findall(string_to_sanitize) + logger.debug("%s unwanted characters found %s", len(unwanted_chars), unwanted_chars) + if len(unwanted_chars) > 0: + warn( + f"String: {string_to_sanitize} contains {unwanted_chars} will be replaced\n" + "But might be an indication that something is not right!!" 
+ ) + sanitized = unwanted_characters.sub( + "_", country_code.sub("", string_to_sanitize.strip()) + ) + + logger.debug("After sanitization %s", sanitized) + return sanitized + + logger.debug("String was good to go") + return string_to_sanitize + + +def convert_rft_to_list(frame: pd.DataFrame) -> list: + """Convert dataframe to list of dictionaries + + Args: + frame (pd.DataFrame): the input dataframe + + Returns: + list: the extracted results + """ + output = [] + logger = logging.getLogger(__name__ + ".convert_rft_to_list") + logger.debug("frame to convert %s", frame) + keepers = [ + "value", + "error", + "x", + "y", + "tvd", + "md", + "zone", + ] + additionals = [ + "well_name", + "date", + ] + relevant_columns = keepers + additionals + logger.debug( + "Hoping for these columns %s, available are %s", + relevant_columns, + frame.columns.to_list(), + ) + narrowed_down = frame.loc[:, frame.columns.isin(relevant_columns)] + well_names = narrowed_down.well_name.unique().tolist() + logger.debug("%s wells to write (%s)", len(well_names), well_names) + for well_name in well_names: + well_observations = narrowed_down.loc[narrowed_down.well_name == well_name] + dates = well_observations.date.unique().tolist() + logger.debug("Well %s has %s dates", well_name, len(dates)) + restart = 1 + for date in dates: + well_date_observations = well_observations.loc[ + well_observations.date == date + ] + output.append( + { + "well_name": well_name, + "date": date, + "restart": restart, + "label": f"{well_name}_{date}".replace("-", "_"), + "data": well_date_observations[keepers], + } + ) + restart += 1 + + return output + + +def convert_summary_to_list(frame: pd.DataFrame) -> list: + """Convert dataframe with summary obs to list of dictionaries + + Args: + frame (pd.DataFrame): the input dataframe + + Returns: + list: the extracted results + """ + output = [] + logger = logging.getLogger(__name__ + ".convert_summary_to_list") + logger.debug("frame to convert %s", frame) + keepers = ["date", "value", "error"] + additional = ["vector"] + relevant_columns = keepers + additional + narrowed_down = frame.loc[:, frame.columns.isin(relevant_columns)] + vectors = frame.vector.unique().tolist() + logger.debug("%s vectors to write (%s)", len(vectors), vectors) + for vector in vectors: + vector_observations = narrowed_down.loc[narrowed_down.vector == vector].copy() + vector_observations["label"] = ( + vector_observations["vector"].str.replace(":", "_").replace("-", "_") + + "_" + + [str(num) for num in range(vector_observations.shape[0])] + ) + output.append( + { + "vector": vector, + "data": vector_observations[keepers + ["label"]], + } + ) + return output + + +def convert_obs_df_to_list(frame: pd.DataFrame, content: ObservationType) -> list: + """Converts dataframe with observation to dictionary format + + Args: + frame (pd.DataFrame): the input dataframe + + Returns: + dict: the dictionary derived from dataframe + """ + logger = logging.getLogger(__name__ + ".convert_obs_df_to_dict") + logger.debug("Frame to extract from \n%s", frame) + obs_list = [] + if content == ObservationType.SUMMARY: + obs_list = convert_summary_to_list(frame) + elif content == ObservationType.RFT: + obs_list = convert_rft_to_list(frame) + logger.debug("\nFrame as list of dictionaries \n%s\n", obs_list) + return obs_list + + +def add_or_modify_error( + frame: pd.DataFrame, + error: Union[str, float, int], + err_min: Optional[Union[float, int]] = None, + err_max: Optional[Union[float, int]] = None, +): + """Complete error column in dataframe + + 
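+    If error is given as a percent string (e.g. "10%"), rows with a missing error
+    get that percentage of their value, optionally clamped between err_min and
+    err_max; a plain number is applied as a constant absolute error instead.
+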
Args: + frame (pd.DataFrame): the dataframe to be modified + error (str): the error to add when it is undefined or not included + """ + logger = logging.getLogger(__name__ + ".add_or_modify_error") + logger.debug("Frame before error addition/modification \n%s\n", frame) + logger.debug("Frame has columns %s", frame.columns) + logger.debug("Error to apply %s", error) + + ensure_numeric(frame, "value") + ensure_numeric(frame, "error") + error = str(error) # convert to ensure that code is simpler further down + try: + error_holes = frame.error.isna() + except AttributeError: + logger.info("No error column provided, error will be added for all entries") + error_holes = pd.Series([True] * frame.shape[0]) + frame["error"] = None + if error_holes.sum() == 0: + logger.info("Error allready set, nothing will be changed") + frame.error = frame.error.astype(float) + + if error.endswith("%"): + logger.debug("Error is percent, will be multiplied with value") + + frac_error = float(error[:-1]) / 100 + logger.debug("Factor to multiply with %s", frac_error) + frame.loc[error_holes, "error"] = frame.loc[error_holes, "value"] * frac_error + if err_min is not None: + frame.loc[frame["error"] < err_min, "error"] = err_min + + if err_max is not None: + frame.loc[frame["error"] > err_max, "error"] = err_max + + else: + if err_max is not None or err_min is not None: + mess = f"""Truncation of error when error is absolute + has no effect min: {err_min}, max: {err_max} + """ + warn(mess) + + logger.debug("Error is absolute, will be added as constant") + abs_error = float(error) + logger.debug("Error to add %s", abs_error) + logger.debug("Error holes are %s", error_holes) + try: + frame.loc[error_holes, "error"] = abs_error + except TypeError: + # TODO: This exception shows that the code is not ideal, but works for now + logger.error("Fixing via a backdoor solution.. Code should be improved") + frame["error"] = abs_error + + dubious_errors = frame.error > frame.value + if dubious_errors.sum() > 0: + warn( + "Some errors are larger than the values" + f"\n{frame.loc[dubious_errors]}\n Is this intentional?" 
+ ) + logger.debug("After addition/modification errors are \n%s\n", frame.error) + + +def ensure_numeric(frame: pd.DataFrame, key: str): + """Convert certain column to numeric if it isn't + + Args: + frame (pd.DataFrame): the dataframe + key (str): the column name for the column + """ + logger = logging.getLogger(__name__ + ".ensure_numeric") + try: + if frame[key].dtype.kind not in "iuf": + if frame[key].astype(str).str.contains(".").sum() > 0: + converter = float + else: + converter = int # type: ignore + frame[key] = frame[key].astype(converter) + except KeyError: + logger.debug("No %s column", key) + + +def extract_general(in_frame: pd.DataFrame, lable_name: str) -> pd.DataFrame: + """Modify dataframe from general observations + + Args: + in_frame (pd.DataFrame): the original dataframe + lable_name (str): anme of label + + Returns: + pd.DataFrame: modified dataframe + """ + logger = logging.getLogger(__name__ + ".extract_general") + general_observations = in_frame + general_observations["lable"] = lable_name + logger.debug("returning %s", general_observations) + return general_observations + + +def extract_from_row( + row: Union[RftConfigElement, ConfigElement], + parent_folder: Path, +) -> List[pd.DataFrame]: + """Extract results from row in config file + + Args: + row (pd.Series): the row to extract from + parent_folder (str, Path): the folder to use when reading file + + Returns: + pd.DataFrame: the extracted results + """ + # TODO: vector name for timeseries should not be wrapped into list? + # or maybe contained, but add key name or summat as idenfier + # Are there exceptions where it should not be list? + logger = logging.getLogger(__name__ + ".extract_from_row") + logging.debug("Extracting from this element %s", row) + input_file = parent_folder / row.observation + logger.debug("File reference in row %s", input_file) + content = row.type + obs_frame = read_obs_frame(input_file, content, row.alias_file) + + if not row.active: + obs_frame["active"] = "no" + + else: + if "active" not in obs_frame.columns: + obs_frame["active"] = "yes" + + obs_frame["active"] = obs_frame["active"] != "no" + + logger.info("Results after reading observations as dataframe:\n%s\n", obs_frame) + + add_or_modify_error(obs_frame, row.default_error, row.min_error, row.max_error) + logger.debug("\nThese are the observation results:\n %s", obs_frame) + + converted = convert_obs_df_to_list(obs_frame, content) + logger.debug("Converted results %s", converted) + + return converted + + +def replace_names(name_series: pd.Series, replacer: pd.DataFrame) -> pd.Series: + """Replace name in a pandas dataseries with values from dataframe + + Args: + name_series (pd.Series): the series to replace in + replacer (pd.DataFrame): the dataframe to replace with + + Raises: + ValueError: if replacer cannot be converted to dictionary to replace with + + Returns: + pd.Series: the dataseries with replaced values + """ + logger = logging.getLogger(__name__ + ".replace_names") + if replacer.shape[1] != 2: + raise ValueError( + "This dataframe cannot be used to replace names, has the wrong shape" + ) + + replace_dict = dict( + zip(replacer[replacer.columns[0]], replacer[replacer.columns[1]]) + ) + logger.info("Will replace names with dictionary %s", replace_dict) + replaced_names = name_series.replace(replace_dict) + if replaced_names.equals(name_series): + warn("No replacement is done, column is unchanged") + logger.info("New column: %s", replaced_names) + return replaced_names + + +def read_obs_frame( + input_file: Path, content: 
ObservationType, alias_file +) -> pd.DataFrame: + """Read obs table, generate summary to be converted to ert esotheric format + + Args: + input_file (Path): the file where the data is + label (str): lable to be added for general obs + content (Str): what content to be read + + Returns: + tuple: the actual observation data, the summary of observations for csv output + """ + logger = logging.getLogger(__name__ + ".read_obs_frame") + logger.debug("Trying to read from %s", input_file) + if content not in [ObservationType.SUMMARY, ObservationType.RFT]: + label = input_file.stem + obs_frame = extract_general(read_tabular_file(input_file), label) + else: + obs_frame = read_tabular_file(input_file) + + try: + obs_frame["date"] = pd.to_datetime(obs_frame["date"]).dt.strftime("%Y-%m-%d") + except KeyError: + logger.warning("No date column for this dataframe") + + try: + obs_frame["well_name"] = obs_frame["well_name"].map(check_and_fix_str) + if alias_file is not None: + logger.debug("Reading alias file |%s|", alias_file) + aliases = read_tabular_file(alias_file) + logger.debug("Will replace names with aliases %s", aliases) + obs_frame["well_name"] = replace_names(obs_frame["well_name"], aliases) + + except KeyError: + logger.debug("No well_name column for this dataframe") + logger.debug("Returning %s", obs_frame) + return obs_frame diff --git a/src/subscript/genertobs_unstable/_writers.py b/src/subscript/genertobs_unstable/_writers.py new file mode 100644 index 000000000..4f3ab29d5 --- /dev/null +++ b/src/subscript/genertobs_unstable/_writers.py @@ -0,0 +1,331 @@ +import logging +import os +import pwd +import re +import time +from datetime import datetime +from pathlib import Path +from shutil import rmtree +from typing import Optional, Tuple + +import pandas as pd + +from subscript.genertobs_unstable._datatypes import ObservationType +from subscript.genertobs_unstable._utilities import check_and_fix_str, inactivate_rows + +GENDATA_RFT_EXPLAINER = """------------------------- +-- GENDATA_RFT -- Create files with simulated rft pressure +------------------------- +-- ERT doc: https://fmu-docs.equinor.com/docs/ert/reference/forward_models.html#GENDATA_RFT + +""" + +GENDATA_EXPLAINER = """------------------------- +-- GEN_DATA -- Create GEN_DATA of rft for usage in AHM +------------------------- +-- ERT doc: https://fmu-docs.equinor.com/docs/ert/reference/configuration/keywords.html#gen-data + +-- ert id Result file name input format report step +""" + + +def add_time_stamp(string="", record_type="f", comment_mark="--"): + """Add commented line with user and timestamp + + Args: + string (str): the string to stamp + record_type (str, optional): specifies if it is file or folder. Defaults to "f". 
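+        comment_mark (str, optional): string used to prefix the stamp lines. Defaults to "--".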
+ + Returns: + _type_: _description_ + """ + ctime = datetime.now().strftime("%Y-%m-%d:%H:%M:%S") + user = pwd.getpwuid(os.getuid())[0] + type_str = "file" if record_type == "f" else "folder" + + time_stamped = ( + f"{comment_mark}This {type_str} is autogenerated by {user} " + f"running genertobs_unstable at {ctime}\n" + ) + + time_stamped += f"{comment_mark} DO NOT EDIT THIS {type_str.upper()} MANUALLY!\n" + + time_stamped += string + return time_stamped + + +def write_csv_with_comment(file_path, frame): + """Write to csv file with timestamped header + + Args: + file_path (str): path to file + frame (pd.DataFrame): the dataframe to write + """ + + with open(file_path, "w", encoding="utf-8") as stream: + # stream.write(add_time_stamp(comment_mark="#")) + frame.to_csv(stream, index=False, header=False, sep=" ") + + +def write_timeseries_ertobs(obs_dict: dict): + """Make ertobs string to from dictionary + + Args: + obs_dict (dict): the dictionary to extract from + + Returns: + str: string to write into ertobs file + """ + logger = logging.getLogger(__name__ + ".write_timeseries_ertobs") + logger.debug("%s observations to write", obs_dict) + obs_frames = [] + for element in obs_dict: + logger.debug("Element to extract from %s", element) + key = element["vector"] + logger.debug(key) + obs_frame = inactivate_rows(element["data"]) + obs_frame["class"] = "SUMMARY_OBSERVATION" + obs_frame["key"] = f"KEY={key}" + ";};" + order = ["class", "label", "value", "error", "date", "key"] + obs_frame = obs_frame[order] + obs_frame["value"] = "{VALUE=" + obs_frame["value"].astype(str) + ";" + obs_frame["error"] = "ERROR=" + obs_frame["error"].astype(str) + ";" + obs_frame["date"] = "DATE=" + obs_frame["date"].astype(str) + ";" + obs_frames.append(obs_frame) + obs_frames_str = pd.concat(obs_frames).to_string(header=False, index=False) + obs_str = re.sub(r" +", " ", obs_frames_str) + "\n" # type: ignore + logger.debug("Returning %s", obs_str) + return obs_str + + +def select_from_dict(keys: list, full_dict: dict): + """Select some keys from a bigger dictionary + + Args: + keys (list): the keys to select + full_dict (dict): the dictionary to extract from + + Returns: + dict: the subselection of dict + """ + return {key: full_dict[key] for key in keys} + + +def create_rft_ertobs_str(element: pd.Series, prefix: str, obs_file: Path) -> str: + """Create the rft ertobs string for specific well + + Args: + element (RftConfigElement): element with data + prefix (str): prefix to be included + obs_file (str): name file with corresponding well observations + + Returns: + str: the string + """ + return ( + f"GENERAL_OBSERVATION {element['well_name']}_{prefix}_OBS " + + "{" + + f"DATA={element['well_name']}_{prefix}_SIM;" + + f" RESTART = {element['restart']}; " + + f"OBS_FILE = {obs_file}" + + ";};\n" + ) + + +def create_rft_gendata_str(element: pd.Series, prefix, outfolder_name: str) -> str: + """Create the string to write as gendata call + + Args: + element (pd.Series): data row + prefix (str): prefix to be included + outfolder_name (str): path to folder where results are stored + + Returns: + str: the string + """ + separator_string = "_" if prefix == "PRESSURE" else f"_{prefix}_" + return ( + f"GEN_DATA {element['well_name']}_{prefix}_SIM " + + f"RESULT_FILE:{outfolder_name}/RFT{separator_string}{element['well_name']}_%d" + + f" REPORT_STEPS:{element['restart']}\n" + ) + + +def write_genrft_str( + parent: Path, well_date_path: Path, layer_zone_table: Path, outfolder_name: str +) -> str: + """write the string to define 
the GENDATA_RFT call + + Args: + parent (str): path where rfts are stored + well_date_path (str): path to file with well, date, and restart number + layer_zone_table (str): path to zones vs layer file + outfolder_name (str): path to where results will be restored + + Returns: + str: the string + """ + logger = logging.getLogger(__name__ + ".write_genrft_str") + string_warning = ( + "\n\n!!Remember that the zone layer file: %s will need to have path relative\n" + + " to runpath for realization, so please double check that this is the case,\n" + + " otherwise you will just stop ert later!!\n\n" + ) + time.sleep(2) + logger.warning( + string_warning, + layer_zone_table, + ) + str_parent = str(parent) + string = ( + GENDATA_RFT_EXPLAINER + + f"DEFINE {parent}\n" + + f"FORWARD_MODEL MAKE_DIRECTORY(={outfolder_name})\n" + + "FORWARD_MODEL GENDATA_RFT(=," + + "=/" + + f"{str(well_date_path).replace(str_parent, '')}," + + f"={str(layer_zone_table)}," + + f" ={outfolder_name})\n\n" + ) + logger.debug("Returning %s", string) + return string + + +def write_rft_ertobs( + rft_dict: dict, well_date_file: Path, parent_folder: Path +) -> Tuple[str, str]: + """Write all rft files for rft dictionary, pluss info str + + Args: + rft_dict (dict): the rft information + parent_folder (str, optional): path to parent folder. Defaults to "". + + Returns: + str: ertobs strings for rfts + """ + logger = logging.getLogger(__name__ + ".write_rft_ertobs") + rft_folder = Path(parent_folder) / "rft" + rft_folder.mkdir(exist_ok=True) + logger.debug("%s observations to write", rft_dict) + well_date_list = [] + rft_ertobs_str = "" + gen_data = "" + prefix = rft_dict["config"].metadata.subtype.name + outfolder_name = "gendata_rft" + logger.debug("prefix is %s", prefix) + for element in rft_dict["data"]: + logger.debug(element["well_name"]) + obs_file = write_well_rft_files(rft_folder, prefix, element) + if obs_file is not None: + well_date_list.append( + [element["well_name"], element["date"], element["restart"]] + ) + rft_ertobs_str += create_rft_ertobs_str(element, prefix, obs_file) + gen_data += create_rft_gendata_str(element, prefix, outfolder_name) + logger.debug( + "\n---------------Before \n%s--------------------\n\n", gen_data + ) + + well_date_frame = pd.DataFrame( + well_date_list, columns=["well_name", "date", "restart"] + ) + + write_csv_with_comment(well_date_file, well_date_frame) + logger.debug("Written %s", str(well_date_file)) + logger.debug("\n---------------After \n%s--------------------\n\n", gen_data) + + return rft_ertobs_str, gen_data + + +def write_well_rft_files( + parent_folder: Path, prefix: str, element: dict +) -> Optional[Path]: + """Write rft files for rft element for one well + + Args: + parent_folder (str): parent to write all files to + prefix (str): prefix defining if it is pressure or saturation + element (dict): the info about the element + + Returns: + str: ertobs string for well + """ + logger = logging.getLogger(__name__ + ".write_well_rft_files") + well_frame = inactivate_rows(element["data"]) + if well_frame.empty: + return None + fixed_file_name = check_and_fix_str(element["well_name"]) + obs_file = (parent_folder / f"{prefix.lower()}_{fixed_file_name}.obs").resolve() + position_file = parent_folder / f"{fixed_file_name}.txt" + logger.debug("Writing %s and %s", obs_file, position_file) + obs_frame = well_frame[["value", "error"]] + logger.debug("observations\n%s", obs_frame) + write_csv_with_comment(obs_file, obs_frame) + position_frame = well_frame[["x", "y", "md", "tvd", "zone"]] + 
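+    # The .obs file written above holds space separated "value error" rows and is
+    # referenced as OBS_FILE in the GENERAL_OBSERVATION string; the position file
+    # written below holds "x y md tvd zone" rows, presumably read by GENDATA_RFT.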
logger.debug("positions for\n%s", position_frame) + write_csv_with_comment(position_file, position_frame) + + return obs_file + + +def write_dict_to_ertobs(obs_list: list, parent: Path) -> str: + """Write all observation data for ert + + Args: + obs_list (list): the list of all observations + parent (str, optional): location to write to. Defaults to "". + + Returns: + str: parent folder for all written info + """ + logger = logging.getLogger(__name__ + ".write_dict_to_ertobs") + logger.debug("%s observations to write", len(obs_list)) + logger.debug(obs_list) + + if parent.exists(): + logger.warning("%s exists, deleting and overwriting contents", str(parent)) + rmtree(parent) + parent.mkdir() + well_date_file_name = parent / "rft/well_date_restart.txt" + gendata_rft_folder_name = "gendata_rft" + gendata_rft_str = "" + obs_str = add_time_stamp() + gen_data = "" + readme_file = parent / "readme.txt" + readme_file.write_text(add_time_stamp(record_type="d")) + for obs in obs_list: + logger.debug(obs) + obs_str += f"--\n--{obs['config'].name}\n" + if obs["config"].type == ObservationType.SUMMARY: + obs_str += write_timeseries_ertobs(obs["data"]) + + elif obs["config"].type == ObservationType.RFT: + if gendata_rft_str == "": + gendata_rft_str = write_genrft_str( + parent / "rft", + well_date_file_name, + obs["config"].plugin_arguments.zonemap, + gendata_rft_folder_name, + ) + rft_str_element, gen_data_element = write_rft_ertobs( + obs, well_date_file_name, parent + ) + obs_str += rft_str_element + gen_data += gen_data_element + logger.debug("No gen_data is %s characters (%s)", len(gen_data), gen_data) + else: + logger.warning( + "Currently not supporting other formats than timeseries and rft" + ) + ertobs_file = parent / "ert_observations.obs" + ertobs_file.write_text(obs_str) + if gen_data: + gen_data = gendata_rft_str + GENDATA_EXPLAINER + gen_data + gen_data_file = parent / "gen_data_rft_wells.ert" + gen_data_file.write_text(add_time_stamp(gen_data)) + logger.debug("Written %s", str(gen_data_file)) + + # Set to read only for others, user needs all rights for tests + # group is able to delete + parent.chmod(0o774) + return obs_str diff --git a/src/subscript/genertobs_unstable/main.py b/src/subscript/genertobs_unstable/main.py new file mode 100644 index 000000000..c81bed7f0 --- /dev/null +++ b/src/subscript/genertobs_unstable/main.py @@ -0,0 +1,60 @@ +"""Main function of genertobs""" + +import argparse +import logging +from pathlib import Path + +from subscript.genertobs_unstable._writers import write_dict_to_ertobs +from subscript.genertobs_unstable.parse_config import ( + generate_data_from_config, + read_yaml_config, +) + + +def parse_args(): + """Parse args for genertobs""" + info_string = "Generates all neccessary files for using observations in ert" + parser = argparse.ArgumentParser(description=info_string) + parser.add_argument( + "config_file", + help="path to config file, name of this file will be name of generated folder", + type=str, + ) + parser.add_argument("--d", help="debug mode", action="store_true") + return parser.parse_args() + + +def run(config_path: Path): + """Generate data from config file + + Args: + config_path (str): path to genertobs file + output_folder (str): path to where all results will be stored + """ + logger = logging.getLogger(__name__ + ".run") + logger.info("Here is config path %s", config_path) + config = read_yaml_config(config_path) + logger.debug("Read config: %s", config) + config_path = Path(config_path) + export_folder = (config_path.parent / 
f"{config_path.stem}_do_not_edit").resolve() + + data = generate_data_from_config(config, config_path.parent) + logger.debug("Data generated %s", data) + write_dict_to_ertobs(data, export_folder) + logger.info("Exported all ert obs results to folder %s", str(export_folder)) + + return export_folder + + +def main(): + """Run the whole shebang""" + logger = logging.getLogger(__name__ + ".main") + args = parse_args() + level = logging.DEBUG if args.d else logging.WARNING + logging.basicConfig(level=level) + logger.debug("Have read args %s", args) + run(args.config_file) + + +if __name__ == "__main__": + main() diff --git a/src/subscript/genertobs_unstable/parse_config.py b/src/subscript/genertobs_unstable/parse_config.py new file mode 100644 index 000000000..34216109a --- /dev/null +++ b/src/subscript/genertobs_unstable/parse_config.py @@ -0,0 +1,89 @@ +"""Code related to fmobs config stuff""" + +import logging +from pathlib import Path + +import yaml + +from subscript.genertobs_unstable._datatypes import ObservationsConfig +from subscript.genertobs_unstable._utilities import extract_from_row + +# def read_tabular_config( +# config_file_name: Union[str, Path], parent_folder: Union[str, PosixPath] = None +# ) -> List[pd.DataFrame]: +# """Parse config file in csv/excel like format + +# Args: +# config_file_name (str): path to config file + +# Returns: +# pd.DataFrame: the config file as dataframe +# """ +# logger = logging.getLogger(__name__ + ".read_config_file") +# config = read_tabular_file(config_file_name) +# logger.debug("Shape of config : %s", config.shape) +# if parent_folder is None: +# parent_folder = Path(config_file_name).parent +# else: +# parent_folder = Path(parent_folder) + +# obs_data = [] + +# for rnr, row in config.iterrows(): +# if row["active"] != "yes": +# logger.info("row %s is deactivated", rnr) +# continue + +# row_obs = extract_from_row(row, parent_folder) +# obs_data.append(row_obs) + +# obs_data = pd.concat(obs_data) +# return obs_data + + +def read_yaml_config(config_file_name: Path) -> ObservationsConfig: + """Read configuration from file + + Args: + config_file_name (str): path to yaml file + + Raises: + RuntimeError: If something goes wrong + + Returns: + dict: the configuration as dictionary + """ + logger = logging.getLogger(__name__ + ".read_yaml_config") + + config = {} + try: + with open(config_file_name, "r", encoding="utf-8") as stream: + config = yaml.safe_load(stream) + except OSError as ose: + raise RuntimeError(f"Could not read {config_file_name}") from ose + logger.debug("Returning %s", config) + return ObservationsConfig.model_validate(config) + + +def generate_data_from_config(config: ObservationsConfig, parent: Path) -> list: + """Generate tuple with dict and dataframe from config dict + + Args: + config (ObservationConfig): the configuration pydantic model + parent (PosixPath): path of parent folder of file containing dict + + Returns: + dict: dictionary with observations + """ + logger = logging.getLogger(__name__ + ".generate_data_from_config") + logger.debug("Here is config to parse %s", config) + data = [] + for config_element in config: + logger.info("Parsing element %s", config_element.name) + data_element = { + "config": config_element, + "data": extract_from_row(config_element, parent), + } + data.append(data_element) + + return data