Sumo Map Prototype with WLF #1044

Open · wants to merge 19 commits into base: master
setup.py · 8 changes: 6 additions & 2 deletions
@@ -46,6 +46,7 @@
             "InplaceVolumes = webviz_subsurface.plugins:InplaceVolumes",
             "InplaceVolumesOneByOne = webviz_subsurface.plugins:InplaceVolumesOneByOne",
             "LinePlotterFMU = webviz_subsurface.plugins:LinePlotterFMU",
+            "MapViewerSumo = webviz_subsurface.plugins:MapViewerSumo",
             "MapViewerFMU = webviz_subsurface.plugins:MapViewerFMU",
             "MorrisPlot = webviz_subsurface.plugins:MorrisPlot",
             "ParameterAnalysis = webviz_subsurface.plugins:ParameterAnalysis",
@@ -98,13 +99,16 @@
         "pandas>=1.1.5",
         "pillow>=6.1",
         "pyarrow>=5.0.0",
+        "redis",
         "pyscal>=0.7.5",
         "scipy>=1.2",
         "statsmodels>=0.12.1",  # indirect dependency through https://plotly.com/python/linear-fits/
-        "webviz-config>=0.3.8",
-        "webviz-core-components>=0.5.6",
+        "webviz-config",
+        "webviz-core-components",
         "webviz-subsurface-components>=0.4.12",
         "xtgeo>=2.14",
+        "fmu-sumo@git+https://github.com/equinor/fmu-sumo@explorer",
+        "sumo-wrapper-python@git+https://github.com/equinor/sumo-wrapper-python.git@master",
     ],
     extras_require={"tests": TESTS_REQUIRE},
     setup_requires=["setuptools_scm~=3.2"],
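For context on the first hunk: webviz-config discovers plugins through Python entry points, so registering MapViewerSumo here is what makes the new plugin loadable by name. A minimal sketch of that lookup (the group name "webviz_config_plugins" and the lookup code are illustrative assumptions, not part of this PR):

import importlib.metadata

# Group name assumed from webviz-config's plugin discovery convention;
# entry_points(group=...) needs Python 3.10+ (or the importlib_metadata backport).
for entry_point in importlib.metadata.entry_points(group="webviz_config_plugins"):
    if entry_point.name == "MapViewerSumo":
        # Resolves to webviz_subsurface.plugins:MapViewerSumo per the hunk above
        plugin_class = entry_point.load()

The second hunk adds redis and pulls fmu-sumo and sumo-wrapper-python straight from Git branches (PEP 508 direct references), so the prototype tracks those repositories' explorer and master branches rather than released packages.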
@@ -12,3 +12,5 @@
     SurfaceMeta,
     SurfaceServer,
 )
+from .ensemble_provider_dealer import EnsembleProviderDealer
+from .ensemble_provider_dealer_sumo import EnsembleProviderDealerSumo
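Assuming the hunk above is in the _providers package __init__.py (the file path is not shown in this diff view), the two new dealer classes become importable from the package root:

from webviz_subsurface._providers import (
    EnsembleProviderDealer,
    EnsembleProviderDealerSumo,
)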
@@ -0,0 +1,301 @@
import logging
from io import BytesIO
from pathlib import Path
from typing import List, Optional, Set

import flask_caching
import xtgeo

from webviz_subsurface._utils.perf_timer import PerfTimer

from . import webviz_sumo
from ._stat_surf_cache import StatSurfCache
from .ensemble_surface_provider import (
    EnsembleSurfaceProvider,
    ObservedSurfaceAddress,
    SimulatedSurfaceAddress,
    StatisticalSurfaceAddress,
    SurfaceAddress,
    SurfaceStatistic,
)

LOGGER = logging.getLogger(__name__)


class ProviderImplSumo(EnsembleSurfaceProvider):
    def __init__(
        self,
        cache_dir: Path,
        cache: flask_caching.SimpleCache,
        sumo_id_of_case: str,
        iteration_id: str,
        use_access_token: bool,
        access_token: Optional[str],
    ) -> None:
        self._provider_id = f"sumo_{sumo_id_of_case}__{iteration_id}"
        self._case_sumo_id = sumo_id_of_case
        self._iteration_id = iteration_id
        self._use_access_token = use_access_token
        self._access_token = access_token

        self._cache_prefix = f"ProviderImplSumo_{self._provider_id}__"
        self._cache = cache

        # my_cache_dir = cache_dir / Path(
        #     "ProviderImplSumo_StatSurfCache_" + self._provider_id
        # )
        # self._stat_surf_cache = StatSurfCache(my_cache_dir)

        self._cached_sumo_case: Optional[webviz_sumo.Case] = None

    def provider_id(self) -> str:
        return self._provider_id

    def attributes(self) -> List[str]:
        timer = PerfTimer()

        cache_key = f"{self._cache_prefix}__attributes"
        cached_arr = self._cache.get(cache_key)
        if cached_arr is not None:
            LOGGER.debug(
                f"ProviderImplSumo.attributes() from cache in: {timer.elapsed_s():.2f}s"
            )
            return cached_arr

        case = self._get_my_sumo_case_obj()
        attrib_names = case.get_object_property_values(
            "tag_name", "surface", iteration_ids=[self._iteration_id]
        )
        attrib_names = sorted(attrib_names)

        self._cache.set(cache_key, attrib_names)

        LOGGER.debug(
            f"ProviderImplSumo.attributes() completed using Sumo in: {timer.elapsed_s():.2f}s"
        )

        return attrib_names

    def surface_names_for_attribute(self, surface_attribute: str) -> List[str]:
        timer = PerfTimer()

        cache_key = (
            f"{self._cache_prefix}__surface_names_for_attribute_{surface_attribute}"
        )
        cached_arr = self._cache.get(cache_key)
        if cached_arr is not None:
            LOGGER.debug(
                f"ProviderImplSumo.surface_names_for_attribute({surface_attribute}) from cache in: {timer.elapsed_s():.2f}s"
            )
            return cached_arr

        case = self._get_my_sumo_case_obj()
        surf_names = case.get_object_property_values(
            "object_name",
            "surface",
            iteration_ids=[self._iteration_id],
            tag_names=[surface_attribute],
        )
        surf_names = sorted(surf_names)

        self._cache.set(cache_key, surf_names)

        LOGGER.debug(
            f"ProviderImplSumo.surface_names_for_attribute({surface_attribute}) completed using Sumo in: {timer.elapsed_s():.2f}s"
        )

        return surf_names

    def surface_dates_for_attribute(
        self, surface_attribute: str
    ) -> Optional[List[str]]:
        timer = PerfTimer()

        cache_key = (
            f"{self._cache_prefix}__surface_dates_for_attribute_{surface_attribute}"
        )
        cached_arr = self._cache.get(cache_key)
        if cached_arr is not None:
            LOGGER.debug(
                f"ProviderImplSumo.surface_dates_for_attribute({surface_attribute}) from cache in: {timer.elapsed_s():.2f}s"
            )
            if len(cached_arr) == 1 and not bool(cached_arr[0]):
                return None
            return cached_arr

        case = self._get_my_sumo_case_obj()
        time_intervals = case.get_object_property_values(
            "time_interval",
            "surface",
            iteration_ids=[self._iteration_id],
            tag_names=[surface_attribute],
        )

        datestr_arr: List[str] = []
        for interval_str in time_intervals:
            datestr_arr.append(interval_str if interval_str != "NULL" else "")

        datestr_arr = sorted(datestr_arr)

        self._cache.set(cache_key, datestr_arr)

        LOGGER.debug(
            f"ProviderImplSumo.surface_dates_for_attribute({surface_attribute}) completed using Sumo in: {timer.elapsed_s():.2f}s"
        )

        if len(datestr_arr) == 1 and not bool(datestr_arr[0]):
            return None

        return datestr_arr

    def realizations(self) -> List[int]:
        timer = PerfTimer()

        cache_key = f"{self._cache_prefix}__realizations"
        cached_arr = self._cache.get(cache_key)
        if cached_arr is not None:
            LOGGER.debug(
                f"ProviderImplSumo.realizations() from cache in: {timer.elapsed_s():.2f}s"
            )
            return cached_arr

        case = self._get_my_sumo_case_obj()
        realization_ids = case.get_object_property_values("realization_id", "surface")
        realization_ids = sorted(realization_ids)

        self._cache.set(cache_key, realization_ids)

        LOGGER.debug(
            f"ProviderImplSumo.realizations() completed using Sumo in: {timer.elapsed_s():.2f}s"
        )

        return realization_ids

    def get_surface(
        self,
        address: SurfaceAddress,
    ) -> Optional[xtgeo.RegularSurface]:
        if isinstance(address, StatisticalSurfaceAddress):
            return self._get_statistical_surface(address)
        if isinstance(address, SimulatedSurfaceAddress):
            return self._get_simulated_surface(address)
        if isinstance(address, ObservedSurfaceAddress):
            return None

        raise TypeError("Unknown type of surface address")

    def _get_my_sumo_case_obj(self) -> webviz_sumo.Case:

        if self._cached_sumo_case is not None:
            return self._cached_sumo_case

        timer = PerfTimer()

        if self._use_access_token:
            sumo = webviz_sumo.create_explorer(self._access_token)
        else:
            sumo = webviz_sumo.create_interactive_explorer()
        et_create_s = timer.lap_s()

        self._cached_sumo_case = sumo.get_case_by_id(self._case_sumo_id)
        et_get_s = timer.lap_s()

        LOGGER.debug(
            f"_get_my_sumo_case_obj() took: {timer.elapsed_s():.2f}s "
            f"(create={et_create_s:.2f}s, get={et_get_s:.2f}s)"
        )

        return self._cached_sumo_case

    def _get_simulated_surface(
        self, address: SimulatedSurfaceAddress
    ) -> Optional[xtgeo.RegularSurface]:
        """Returns an xtgeo surface instance of a single realization surface"""

        timer = PerfTimer()

        case = self._get_my_sumo_case_obj()

        time_intervals_list = [address.datestr] if address.datestr is not None else []

        surface_collection = case.get_objects(
            "surface",
            iteration_ids=[self._iteration_id],
            realization_ids=[address.realization],
            tag_names=[address.attribute],
            object_names=[address.name],
            time_intervals=time_intervals_list,
        )

        num_surfaces = len(surface_collection)
        if num_surfaces == 0:
            LOGGER.warning(f"No simulated surface found in Sumo for {address}")
            return None
        if num_surfaces > 1:
            LOGGER.warning(
                f"Multiple simulated surfaces found in Sumo for: {address}. "
                "Returning first surface."
            )

        surf = surface_collection[0]
        blob_bytes: bytes = surf.blob
        byte_stream = BytesIO(blob_bytes)
        xtgeo_surf = xtgeo.surface_from_file(byte_stream)

        LOGGER.debug(
            f"ProviderImplSumo loaded simulated surface from Sumo in: {timer.elapsed_s():.2f}s"
        )

        return xtgeo_surf

    def _get_statistical_surface(
        self, address: StatisticalSurfaceAddress
    ) -> Optional[xtgeo.RegularSurface]:

        timer = PerfTimer()

        time_intervals_list = [address.datestr] if address.datestr is not None else []

        case = self._get_my_sumo_case_obj()
        et_get_case_s = timer.lap_s()

        surface_collection = case.get_objects(
            "surface",
            iteration_ids=[self._iteration_id],
            realization_ids=address.realizations,
            tag_names=[address.attribute],
            object_names=[address.name],
            time_intervals=time_intervals_list,
        )
        et_get_coll_s = timer.lap_s()

        surf_count = len(surface_collection)
        if surf_count == 0:
            LOGGER.warning(f"No simulated surfaces found in Sumo for {address}")
            return None

        surfstat_to_sumostatstr_map = {
            SurfaceStatistic.MEAN: "MEAN",
            SurfaceStatistic.STDDEV: "STD",
            SurfaceStatistic.MINIMUM: "MIN",
            SurfaceStatistic.MAXIMUM: "MAX",
            SurfaceStatistic.P10: "P10",
            SurfaceStatistic.P90: "P90",
        }
        sumo_aggr_str = surfstat_to_sumostatstr_map[address.statistic]

        agg_surf_bytes: bytes = surface_collection.aggregate(sumo_aggr_str)
        et_calc_s = timer.lap_s()

        byte_stream = BytesIO(agg_surf_bytes)
        xtgeo_surf = xtgeo.surface_from_file(byte_stream)

        LOGGER.debug(
            f"ProviderImplSumo calculated statistical surface using Sumo in: "
            f"{timer.elapsed_s():.2f}s ("
            f"get_case={et_get_case_s:.2f}s, get_coll={et_get_coll_s:.2f}s, calc={et_calc_s:.2f}s), "
            f"[#surfaces={surf_count}, stat={address.statistic}, "
            f"attr={address.attribute}, name={address.name}, date={address.datestr}]"
        )

        return xtgeo_surf
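A minimal usage sketch for ProviderImplSumo (not part of the PR). The constructor arguments follow the __init__ signature above; the StatisticalSurfaceAddress field names are inferred from how this file reads them; all concrete values are hypothetical, and the SimpleCache import path may vary across flask-caching versions:

from pathlib import Path

from flask_caching.backends import SimpleCache  # import path varies by flask-caching version

provider = ProviderImplSumo(
    cache_dir=Path("/tmp/webviz_cache"),  # hypothetical; only used by the commented-out StatSurfCache
    cache=SimpleCache(),
    sumo_id_of_case="11111111-2222-3333-4444-555555555555",  # hypothetical Sumo case id
    iteration_id="0",
    use_access_token=False,  # use the interactive login flow
    access_token=None,
)

print(provider.attributes())  # first call hits Sumo, later calls hit the cache

address = StatisticalSurfaceAddress(  # field names inferred from usage in this file
    name="some_surface_name",  # hypothetical
    attribute="some_attribute",  # hypothetical
    datestr=None,  # a surface without a time interval
    realizations=[0, 1, 2],
    statistic=SurfaceStatistic.MEAN,  # mapped to Sumo's "MEAN" aggregation above
)
surface = provider.get_surface(address)  # xtgeo.RegularSurface, or None if nothing matched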
@@ -90,16 +90,16 @@ def surface_to_png_bytes_optimized(surface: xtgeo.RegularSurface) -> bytes:
     surf_values_ma: np.ma.MaskedArray = surface.values

     surf_values_ma = np.flip(surf_values_ma.transpose(), axis=0)  # type: ignore
-    LOGGER.debug(f"flip/transpose: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"flip/transpose: {timer.lap_s():.2f}s")

     # This will be a flat bool array with true for all valid entries
     valid_arr = np.invert(np.ma.getmaskarray(surf_values_ma).flatten())
-    LOGGER.debug(f"get valid_arr: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"get valid_arr: {timer.lap_s():.2f}s")

     shape = surf_values_ma.shape
     min_val = surf_values_ma.min()
     max_val = surf_values_ma.max()
-    LOGGER.debug(f"minmax: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"minmax: {timer.lap_s():.2f}s")

     if min_val == 0.0 and max_val == 0.0:
         scale_factor = 1.0
@@ -112,10 +112,10 @@ def surface_to_png_bytes_optimized(surface: xtgeo.RegularSurface) -> bytes:
     # Get a NON-masked array with all undefined entries filled with 0
     scaled_values = scaled_values_ma.filled(0)

-    LOGGER.debug(f"scale and fill: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"scale and fill: {timer.lap_s():.2f}s")

     val_arr = scaled_values.astype(np.uint32).ravel()
-    LOGGER.debug(f"cast and flatten: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"cast and flatten: {timer.lap_s():.2f}s")

     val = val_arr.view(dtype=np.uint8)
     rgba_arr = np.empty(4 * len(val_arr), dtype=np.uint8)
@@ -124,22 +124,22 @@ def surface_to_png_bytes_optimized(surface: xtgeo.RegularSurface) -> bytes:
     rgba_arr[2::4] = val[0::4]
     rgba_arr[3::4] = np.multiply(valid_arr, 255).astype(np.uint8)

-    LOGGER.debug(f"rgba combine: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"rgba combine: {timer.lap_s():.2f}s")

     # Back to 2d shape + 1 dimension for the rgba values.
     rgba_arr_reshaped = rgba_arr.reshape((shape[0], shape[1], 4))

     image = Image.fromarray(rgba_arr_reshaped, "RGBA")
-    LOGGER.debug(f"create: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"create: {timer.lap_s():.2f}s")

     byte_io = io.BytesIO()
     image.save(byte_io, format="png", compress_level=1)
-    LOGGER.debug(f"save png to bytes: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"save png to bytes: {timer.lap_s():.2f}s")

     byte_io.seek(0)
     ret_bytes = byte_io.read()
-    LOGGER.debug(f"read bytes: {timer.lap_s():.2f}s")
+    # LOGGER.debug(f"read bytes: {timer.lap_s():.2f}s")

-    LOGGER.debug(f"Total time: {timer.elapsed_s():.2f}s")
+    # LOGGER.debug(f"Total time: {timer.elapsed_s():.2f}s")

     return ret_bytes
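The rgba_arr assignments in the last hunk pack each scaled uint32 value byte-wise into the RGB channels, with the alpha channel carrying the validity mask. A standalone sketch (not in the PR) of that packing for a single value, assuming a little-endian host as the uint8 view implicitly does:

import numpy as np

val_arr = np.array([0x00ABCDEF], dtype=np.uint32)  # one scaled surface value
val = val_arr.view(dtype=np.uint8)  # little-endian bytes: [0xEF, 0xCD, 0xAB, 0x00]

rgba_arr = np.empty(4 * len(val_arr), dtype=np.uint8)
rgba_arr[0::4] = val[2::4]  # R = bits 16-23 -> 0xAB
rgba_arr[1::4] = val[1::4]  # G = bits 8-15  -> 0xCD
rgba_arr[2::4] = val[0::4]  # B = bits 0-7   -> 0xEF
rgba_arr[3::4] = 255        # alpha: 255 marks a valid (unmasked) cell

print(rgba_arr)  # [171 205 239 255]

A client can then reconstruct the scaled value as 65536*R + 256*G + B before dividing by the scale factor.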