Skip to content

Commit

Permalink
Add rms collections
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sol committed Nov 23, 2023
1 parent e2ebac9 commit fae85ea
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/fmu/dataio/rmscollectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

28 changes: 28 additions & 0 deletions src/fmu/dataio/rmscollectors/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from xtgeo import RoxUtils
import roxar
import roxar.jobs


def _get_project(project, readonly):
project = RoxUtils(project, readonly=readonly).project
return project


def get_job_arguments(owner, job_type, job_name):
"""Get job arguments
Args:
owner (list): list including parents of job
job_type (str): what job type
job_name (str): name of job
Returns:
dict: job settings
"""
job = roxar.jobs.Job.get_job(
owner=owner,
type=job_type,
name=job_name,
)
arguments = job.get_arguments()
return arguments
309 changes: 309 additions & 0 deletions src/fmu/dataio/rmscollectors/volumetrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
# Get the job
import logging
from dataclasses import dataclass
import pandas as pd
from xtgeo import gridproperty_from_roxar, surface_from_roxar
from fmu.dataio import ExportData
from fmu.config.utilities import yaml_load
from fmu.dataio.rmscollectors import utils
import roxar
import roxar.jobs

logging.basicConfig(level="DEBUG")
logger = logging.getLogger("Inplace")

RENAME_VOLUMES = {
"Proj. real.": "REAL",
"Zone": "ZONE",
"Segment": "REGION",
"Boundary": "LICENSE",
"Facies": "FACIES",
"BulkOil": "BULK_OIL",
"NetOil": "NET_OIL",
"PoreOil": "PORV_OIL",
"HCPVOil": "HCPV_OIL",
"STOIIP": "STOIIP_OIL",
"AssociatedGas": "ASSOCIATEDGAS_OIL",
"BulkGas": "BULK_GAS",
"PoreGas": "PORV_GAS",
"HCPVGas": "HCPV_GAS",
"GIIP": "GIIP_GAS",
"AssociatedLiquid": "ASSOCIATEDOIL_GAS",
"Bulk": "BULK_TOTAL",
"Net": "NET_TOTAL",
"Pore": "PORV_TOTAL",
}


def _define_prefixes(out_input):
"""Define prefixes that could be used
Args:
out_input (dict): dict with info to construct dict
Returns:
list: possible prefixes
"""
prefix = out_input["Prefix"]
if prefix != "":
prefix = prefix + "_"
prefixes = []
if out_input["UseGas"]:
prefixes.append(prefix + "Gas_")
if out_input["UseOil"]:
prefixes.append(prefix + "Oil_")
logger.debug("\nReturning %s", prefixes)
return prefixes


def _define_output(out_input, selectors):
"""Find maps and properties that can be exported
Args:
out_input (dict): the output params from job
Returns:
dict: info to use for exporting
"""
logger.debug("\nExtracting output from %s", out_input)
out_location = out_input["MapOutput"].lower()
calculations = out_input["Calculations"]
prefixes = _define_prefixes(out_input)
properties = []
maps = []
for calculation in calculations:
for prfx in prefixes:
if calculation["CreateProperty"]:
properties.append(prfx + calculation["Type"].lower())
if calculation["CreateZoneMap"]:
maps.append(prfx + calculation["Type"].upper())
collated = {
"maps": maps,
"properties": properties,
"map_location": out_location,
"map_subfolders": selectors["Zone"]["filters"],
}
logger.debug("\nReturning %s", collated)
return collated


def _define_variables(variables_input):
"""Get info about volumetric setup
Args:
variables_input (dict): dictionary with input setup
Returns:
dict: information to be used as metadata?
"""
logger.debug("\nExtracting variables from %s", variables_input)
var_definitions = {}
variable_parameters = []
for var_group in variables_input.values():
for variable in var_group:
name = variable["Name"]
source = variable["InputSource"]
table_values = variable["TableValues"]
if variable["DataInput"]:
propname = variable["DataInput"][0][-1]
table_values = {"property": propname}
if propname not in variable_parameters:
variable_parameters.append(propname)
else:
if source == "REGION_MODEL":
table_values = "hidden"
var_definitions[name] = {
"applies": variable["InputType"],
"values": table_values,
}
logger.debug("\nReturning %s", var_definitions)
logger.debug("Additional properties are %s", variable_parameters)
return var_definitions, variable_parameters


def _define_selectors(in_dict):
"""Find filters that can be applied
Args:
in_dict (dict): the input section from job parameters
Returns:
dict: the selectors found
"""
logger.debug("\nExtracting selectors from %s", in_dict)
possible_selectors = ["Zone", "Region", "Facies"]
selectors = {}
for key in possible_selectors[1:]:
try:
selectors[key] = {
"filters": in_dict[f"Selected{key}Names"],
"parameter": in_dict[f"{key}Property"][-1],
}
except IndexError:
logger.warning("No selectors for %s", key)

selectors.update(
{
"Zone": {"filters": in_dict["SelectedZoneNames"]},
"parameter": "subgrids",
}
)
logger.debug("\nReturning %s", selectors)
return selectors


def get_volumetrics(report_params, project):
"""Get volumetrics table
Args:
report_params (dict): report section from volumetrics job
Returns:
pd.DataFrame: the volumes
"""
logger.debug("\nGetting volumes reading %s", report_params)
try:
volumes = pd.DataFrame.from_dict(
project.volumetric_tables[report_params[0]["ReportTableName"]]
.get_data_table()
.to_dict()
)
logger.debug("Volumes before renaming %s", volumes.head(2))
volumes.rename(columns=RENAME_VOLUMES, inplace=True)
logger.debug("Volumes after renaming %s", volumes.head(2))
volumes.drop("REAL", axis=1, inplace=True)
except KeyError:
logger.warning("No volume table attached")
volumes = None
return volumes


def _export_collection(
project,
collection,
parent,
job_name,
config_path="../../fmuconfig/output/global_variables.yml",
):
config = yaml_load(config_path)
exd = ExportData(config=config, parent=parent)
count = 0
for map_name in collection["maps"]:
for folder_name in collection["map_subfolders"]:
folder_name = f"Volumetrics_{job_name}/{folder_name}"
logger.debug(
"Fetching surface with name: %s, folder: %s", map_name, folder_name
)
try:
surf = surface_from_roxar(
project, map_name, folder_name, stype=collection["map_location"]
)
logger.debug(
"Exporting %s",
exd.export(
surf, name=map_name, tagname=job_name, content="property"
),
)

count += 1
except KeyError:
logger.warning("No surface called %s", map_name)
for property_name in collection["properties"]:
logger.debug("Will be exporting %s for %s", property_name, parent)
try:
prop = gridproperty_from_roxar(project, parent, property_name)
logger.debug(
"Exporting %s",
exd.export(
prop, name=property_name, tagname=job_name, content="property"
),
)
count += 1
except ValueError:
logger.warning("No parameter called %s", property_name)
try:
if collection["table"] is not None:
logger.debug(
"Exporting %s",
exd.export(
collection["table"],
parent=parent,
name="volumes",
tagname=job_name,
content="volumetrics",
),
)
count += 1
else:
logger.warning(
"No volumes exported, have you forgot to select table option?"
)
except KeyError:
logger.warning("No volume table attached")
logger.info("Exported %i objects", count)


@dataclass
class RmsInplaceVolumes:
"""Class for exporting data related to volumetrics"""

project: str
grid_name: str
job_name: str

def __post_init__(self):
"""Initialize what is not initialized upfront"""
self.params = utils.get_job_arguments(
["Grid models", self.grid_name, "Grid"], "Volumetrics", self.job_name
)
self.project = utils._get_project(self.project, True)
self.input = self.params["Input"][0]
self.output = self.params["Output"][0]
self.variables = self.params["Variables"][0]

self.report = get_volumetrics(self.params["Report"], self.project)
self.selectors = _define_selectors(self.input)
self.report_output = _define_output(self.output, self.selectors)
self.input_variables, additional_props = _define_variables(self.variables)
self.report_output["properties"].extend(additional_props)
logger.debug(self.report_output["properties"])
self.report_output["table"] = self.report

def export(self):
_export_collection(
self.project, self.report_output, self.grid_name, self.job_name
)


@dataclass
class RmsGrid:
"""Class for exporting data Grid"""

project: str
grid_name: str
job_name: str
params: dict = None

def __post_init__(self):
"""Initialize what is not initialized upfront"""
self.params = utils.get_job_arguments(
["Grid models", self.grid_name, "Grid"], "Create Grid", self.job_name
)
self.project = utils._get_project(self.project, True)


@dataclass
class RmsGrid:
"""Class for exporting data Grid"""

project: str
grid_name: str
job_name: str
params: dict = None

def __post_init__(self):
"""Initialize what is not initialized upfront"""
self.params = utils.get_job_arguments(
["Grid models", self.grid_name, "Grid"], "Create Grid", self.job_name
)
self.project = utils._get_project(self.project, True)

0 comments on commit fae85ea

Please sign in to comment.