diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index aa28a26ef55..5e44ea25a6d 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -11,6 +11,7 @@ from collections import defaultdict from collections.abc import Callable, Mapping from dataclasses import dataclass +from itertools import count from pathlib import Path from types import TracebackType from typing import ( @@ -38,7 +39,6 @@ from ert.storage import open_storage from everest.config import EverestConfig from everest.optimizer.everest2ropt import everest2ropt -from everest.simulator import SimulatorCache from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST @@ -179,12 +179,12 @@ def __init__( None ) self._max_batch_num_reached = False - self._simulator_cache: SimulatorCache | None = None + self._cache: _EvaluatorCache | None = None if ( everest_config.simulator is not None and everest_config.simulator.enable_cache ): - self._simulator_cache = SimulatorCache() + self._cache = _EvaluatorCache() self._experiment: Experiment | None = None self.eval_server_cfg: EvaluatorServerConfig | None = None storage = open_storage(config.ens_path, mode="w") @@ -450,7 +450,7 @@ def _add_control( ) -> None: group_name = control_name[0] variable_name = control_name[1] - group = controls[group_name] + group = controls.get(group_name, {}) if len(control_name) > 2: index_name = str(control_name[2]) if variable_name in group: @@ -459,61 +459,61 @@ def _add_control( group[variable_name] = {index_name: control_value} else: group[variable_name] = control_value + controls[group_name] = group @staticmethod - def _get_active_results( + def _get_calculated_results( results: list[dict[str, NDArray[np.float64]]], names: tuple[str], controls: NDArray[np.float64], - active: NDArray[np.bool_], + case_data: dict[str, Any], ) -> NDArray[np.float64]: + control_indices = list(case_data.keys()) values = np.zeros((controls.shape[0], len(names)), dtype=float64) for func_idx, name in enumerate(names): - values[active, func_idx] = np.fromiter( + values[control_indices, func_idx] = np.fromiter( (np.nan if not result else result[name][0] for result in results), dtype=np.float64, ) return values - def init_case_data( - self, - control_values: NDArray[np.float64], - metadata: EvaluatorContext, - realization_ids: list[int], - ) -> tuple[ - list[tuple[int, defaultdict[str, Any]]], NDArray[np.bool_], dict[int, int] - ]: - active = ( - np.ones(control_values.shape[0], dtype=np.bool_) - if metadata.active is None - else np.fromiter( - (metadata.active[realization] for realization in metadata.realizations), - dtype=np.bool_, - ) - ) - case_data = [] + def _get_cached_results( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> dict[int, int]: cached = {} - - for sim_idx, real_id in enumerate(realization_ids): - if self._simulator_cache is not None: - cache_id = self._simulator_cache.find_key( - real_id, control_values[sim_idx, :] + if self._cache is not None: + for sim_idx, realization in enumerate(evaluator_context.realizations): + cache_id = self._cache.find_key( + evaluator_context.config.realizations.names[realization], + control_values[sim_idx, :], ) if cache_id is not None: cached[sim_idx] = cache_id - active[sim_idx] = False + return cached - if active[sim_idx]: - controls: defaultdict[str, Any] = defaultdict(dict) - assert metadata.config.variables.names is not None + def _init_case_data( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + cached: dict[int, int], + ) -> dict[int, dict[str, Any]]: + case_data = {} + for control_idx in range(control_values.shape[0]): + add_to_case = ( + evaluator_context.active is None + or evaluator_context.active[evaluator_context.realizations[control_idx]] + ) + if add_to_case and control_idx not in cached: + controls: dict[str, Any] = {} + assert evaluator_context.config.variables.names is not None for control_name, control_value in zip( - metadata.config.variables.names, - control_values[sim_idx, :], + evaluator_context.config.variables.names, + control_values[control_idx, :], strict=False, ): self._add_control(controls, control_name, control_value) - case_data.append((real_id, controls)) - return case_data, active, cached + case_data[control_idx] = controls + return case_data def _setup_sim( self, @@ -569,40 +569,26 @@ def _check_suffix( control_name, sim_id, ExtParamConfig.to_dataset(control) ) - def _forward_model_evaluator( - self, control_values: NDArray[np.float64], metadata: EvaluatorContext - ) -> EvaluatorResult: + def _get_run_args( + self, + ensemble: Ensemble, + evaluator_context: EvaluatorContext, + case_data: dict[str, Any], + ) -> list[RunArg]: def _slug(entity: str) -> str: entity = " ".join(str(entity).split()) return "".join([x if x.isalnum() else "_" for x in entity.strip()]) - self.status = None # Reset the current run status - assert metadata.config.realizations.names - realization_ids = [ - metadata.config.realizations.names[realization] - for realization in metadata.realizations - ] - case_data, active, cached = self.init_case_data( - control_values=control_values, - metadata=metadata, - realization_ids=realization_ids, - ) - assert self._experiment - ensemble = self._experiment.create_ensemble( - name=f"batch_{self.batch_id}", - ensemble_size=len(case_data), - ) - for sim_id, (geo_id, controls) in enumerate(case_data): - assert isinstance(geo_id, int) - self._setup_sim(sim_id, controls, ensemble) - substitutions = self.ert_config.substitutions # fill in the missing geo_id data substitutions[""] = _slug(ensemble.name) self.active_realizations = [True] * len(case_data) - for sim_id, (geo_id, _) in enumerate(case_data): + for sim_id, control_idx in enumerate(case_data.keys()): if self.active_realizations[sim_id]: - substitutions[f""] = str(geo_id) + realization = evaluator_context.realizations[control_idx] + substitutions[f""] = str( + evaluator_context.config.realizations.names[realization] + ) run_paths = Runpaths( jobname_format=self.ert_config.model_config.jobname_format_string, @@ -611,24 +597,73 @@ def _slug(entity: str) -> str: substitutions=substitutions, eclbase=self.ert_config.model_config.eclbase_format_string, ) - run_args = create_run_arguments( + return create_run_arguments( run_paths, self.active_realizations, ensemble=ensemble, ) - self._context_env.update( - { - "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), - "_ERT_ENSEMBLE_ID": str(ensemble.id), - "_ERT_SIMULATION_MODE": "batch_simulation", - } + def _get_evaluator_result( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + case_data: dict[str, Any], + results: list[dict[str, npt.NDArray[np.float64]]], + cached: dict[int, int], + ) -> EvaluatorResult: + # We minimize the negative of the objectives: + objectives = -self._get_calculated_results( + results, + evaluator_context.config.objectives.names, # type: ignore + control_values, + case_data, ) - assert self.eval_server_cfg - self._evaluate_and_postprocess(run_args, ensemble, self.eval_server_cfg) - self._delete_runpath(run_args) - # gather results + constraints = None + if evaluator_context.config.nonlinear_constraints is not None: + constraints = self._get_calculated_results( + results, + evaluator_context.config.nonlinear_constraints.names, # type: ignore + control_values, + case_data, + ) + + if cached: + for control_idx, cache_id in cached.items(): + objectives[control_idx, ...] = self._cache.objectives[cache_id] + if constraints is not None: + constraints[control_idx, ...] = self._cache.constraints[cache_id] + + sim_ids = np.full(control_values.shape[0], -1, dtype=np.intc) + sim_ids[list(case_data.keys())] = np.arange(len(case_data), dtype=np.intc) + return EvaluatorResult( + objectives=objectives, + constraints=constraints, + batch_id=self.batch_id, + evaluation_ids=sim_ids, + ) + + def _add_results_to_cache( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + case_data: dict[str, Any], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ) -> None: + if self._cache is not None: + for control_idx in case_data: + realization = evaluator_context.realizations[control_idx] + self._cache.add_results( + evaluator_context.config.realizations.names[realization], + control_values[control_idx, ...], + objectives[control_idx, ...], + constraints[control_idx, ...], + ) + + def _gather_results( + self, ensemble: Ensemble + ) -> list[dict[str, npt.NDArray[np.float64]]]: results: list[dict[str, npt.NDArray[np.float64]]] = [] for sim_id, successful in enumerate(self.active_realizations): if not successful: @@ -640,61 +675,65 @@ def _slug(entity: str) -> str: data = ensemble.load_responses(key, (sim_id,)) d[key] = data["values"].to_numpy() results.append(d) - for fnc_name, alias in self.everest_config.function_aliases.items(): for result in results: result[fnc_name] = result[alias] + return results - objectives = self._get_active_results( - results, - metadata.config.objectives.names, # type: ignore - control_values, - active, - ) + def _forward_model_evaluator( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> EvaluatorResult: + # Reset the current run status: + self.status = None - constraints = None - if metadata.config.nonlinear_constraints is not None: - constraints = self._get_active_results( - results, - metadata.config.nonlinear_constraints.names, # type: ignore - control_values, - active, - ) + # Get any cached results that may be useful: + cached = self._get_cached_results(control_values, evaluator_context) - if self._simulator_cache is not None: - for sim_idx, cache_id in cached.items(): - objectives[sim_idx, ...] = self._simulator_cache.get_objectives( - cache_id - ) - if constraints is not None: - constraints[sim_idx, ...] = self._simulator_cache.get_constraints( - cache_id - ) + # Create the case to run: + case_data = self._init_case_data(control_values, evaluator_context, cached) - sim_ids = np.empty(control_values.shape[0], dtype=np.intc) - sim_ids.fill(-1) - sim_ids[active] = np.arange(len(results), dtype=np.intc) + # Initialize a new experiment in storage: + assert self._experiment + ensemble = self._experiment.create_ensemble( + name=f"batch_{self.batch_id}", + ensemble_size=len(case_data), + ) + for sim_id, controls in enumerate(case_data.values()): + self._setup_sim(sim_id, controls, ensemble) - # Add the results from active simulations to the cache: - if self._simulator_cache is not None: - for sim_idx, real_id in enumerate(realization_ids): - if active[sim_idx]: - self._simulator_cache.add_simulation_results( - sim_idx, real_id, control_values, objectives, constraints - ) + # Evaluate the case: + run_args = self._get_run_args(ensemble, evaluator_context, case_data) + self._context_env.update( + { + "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), + "_ERT_ENSEMBLE_ID": str(ensemble.id), + "_ERT_SIMULATION_MODE": "batch_simulation", + } + ) + assert self.eval_server_cfg + self._evaluate_and_postprocess(run_args, ensemble, self.eval_server_cfg) - # Note the negative sign for the objective results. Everest aims to do a - # maximization, while the standard practice of minimizing is followed by - # ropt. Therefore we will minimize the negative of the objectives: - evaluator_result = EvaluatorResult( - objectives=-objectives, - constraints=constraints, - batch_id=self.batch_id, - evaluation_ids=sim_ids, + # If necessary, delete the run path: + self._delete_runpath(run_args) + + # Gather the results and create the result for ropt: + results = self._gather_results(ensemble) + evaluator_result = self._get_evaluator_result( + control_values, evaluator_context, case_data, results, cached ) + # Increase the batch ID for the next evaluation: self.batch_id += 1 + # Add the results from the evaluations to the cache: + self._add_results_to_cache( + control_values, + evaluator_context, + case_data, + evaluator_result.objectives, + evaluator_result.constraints, + ) + return evaluator_result def send_snapshot_event(self, event: Event, iteration: int) -> None: @@ -741,3 +780,34 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: "progress": jobs_progress, "batch_number": self.batch_id, } + + +class _EvaluatorCache: + EPS = float(np.finfo(np.float32).eps) + + def __init__(self) -> None: + self._keys: defaultdict[int, list[tuple[NDArray[np.float64], int]]] = ( + defaultdict(list) + ) + self.objectives: dict[int, NDArray[np.float64]] = {} + self.constraints: dict[int, NDArray[np.float64]] = {} + self._counter = count() + + def add_results( + self, + real_id: int, + control_values: NDArray[np.float64], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ): + cache_id = next(self._counter) + self._keys[real_id].append((control_values.copy(), cache_id)) + self.objectives[cache_id] = objectives.copy() + if constraints is not None: + self.constraints[cache_id] = constraints.copy() + + def find_key(self, real_id: int, controls: NDArray[np.float64]) -> int | None: + for cached, cache_id in self._keys.get(real_id, []): + if np.allclose(controls, cached, rtol=0.0, atol=self.EPS): + return cache_id + return None diff --git a/src/everest/simulator/__init__.py b/src/everest/simulator/__init__.py index 02e9a03f80e..b1611081e43 100644 --- a/src/everest/simulator/__init__.py +++ b/src/everest/simulator/__init__.py @@ -1,5 +1,3 @@ -from everest.simulator.simulator_cache import SimulatorCache - JOB_SUCCESS = "Finished" JOB_WAITING = "Waiting" JOB_RUNNING = "Running" @@ -109,5 +107,4 @@ "JOB_RUNNING", "JOB_SUCCESS", "JOB_WAITING", - "SimulatorCache", ] diff --git a/src/everest/simulator/simulator_cache.py b/src/everest/simulator/simulator_cache.py deleted file mode 100644 index db4e3a7ae3c..00000000000 --- a/src/everest/simulator/simulator_cache.py +++ /dev/null @@ -1,58 +0,0 @@ -from collections import defaultdict -from itertools import count - -import numpy as np -from numpy._typing import NDArray - - -# This cache can be used to prevent re-evaluation of forward models. Due to its -# simplicity it has some limitations: -# - There is no limit on the number of cached entries. -# - Searching in the cache is by brute-force, iterating over the entries. -# Both of these should not be an issue for the intended use with cases where the -# forward models are very expensive to compute: The number of cached entries is -# not expected to become prohibitively large. -class SimulatorCache: - def __init__(self) -> None: - # Stores the realization/controls key, together with an ID. - self._keys: defaultdict[int, list[tuple[NDArray[np.float64], int]]] = ( - defaultdict(list) - ) - # Store objectives and constraints by ID: - self._objectives: dict[int, NDArray[np.float64]] = {} - self._constraints: dict[int, NDArray[np.float64]] = {} - - # Generate unique ID's: - self._counter = count() - - def add_simulation_results( - self, - sim_idx: int, - real_id: int, - control_values: NDArray[np.float64], - objectives: NDArray[np.float64], - constraints: NDArray[np.float64] | None, - ): - cache_id = next(self._counter) - self._keys[real_id].append((control_values[sim_idx, :].copy(), cache_id)) - self._objectives[cache_id] = objectives[sim_idx, ...].copy() - if constraints is not None: - self._constraints[cache_id] = constraints[sim_idx, ...].copy() - - def find_key(self, real_id: int, control_vector: NDArray[np.float64]) -> int | None: - # Brute-force search, premature optimization is the root of all evil: - for cached_vector, cache_id in self._keys.get(real_id, []): - if np.allclose( - control_vector, - cached_vector, - rtol=0.0, - atol=float(np.finfo(np.float32).eps), - ): - return cache_id - return None - - def get_objectives(self, cache_id: int) -> NDArray[np.float64]: - return self._objectives[cache_id] - - def get_constraints(self, cache_id: int) -> NDArray[np.float64]: - return self._constraints[cache_id]