diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index aa28a26ef55..05e8187d8b8 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -6,19 +6,14 @@ import logging import os import queue -import random import shutil from collections import defaultdict -from collections.abc import Callable, Mapping +from collections.abc import Callable from dataclasses import dataclass +from itertools import count from pathlib import Path from types import TracebackType -from typing import ( - TYPE_CHECKING, - Any, - Literal, - Protocol, -) +from typing import TYPE_CHECKING, Any, Literal, Protocol import numpy as np import seba_sqlite.sqlite_storage @@ -38,7 +33,6 @@ from ert.storage import open_storage from everest.config import EverestConfig from everest.optimizer.everest2ropt import everest2ropt -from everest.simulator import SimulatorCache from everest.simulator.everest_to_ert import everest_to_ert_config from everest.strings import EVEREST @@ -51,57 +45,6 @@ from ert.storage import Ensemble, Experiment -# A number of settings for the table reporters: -RESULT_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "functions.weighted_objective": "Total-Objective", - "linear_constraints.violations": "IC-violation", - "nonlinear_constraints.violations": "OC-violation", - "functions.objectives": "Objective", - "functions.constraints": "Constraint", - "evaluations.variables": "Control", - "linear_constraints.values": "IC-diff", - "nonlinear_constraints.values": "OC-diff", - "functions.scaled_objectives": "Scaled-Objective", - "functions.scaled_constraints": "Scaled-Constraint", - "evaluations.scaled_variables": "Scaled-Control", - "nonlinear_constraints.scaled_values": "Scaled-OC-diff", - "nonlinear_constraints.scaled_violations": "Scaled-OC-violation", -} -GRADIENT_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "gradients.weighted_objective": "Total-Gradient", - "gradients.objectives": "Grad-objective", - "gradients.constraints": "Grad-constraint", -} -SIMULATION_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "realization": "Realization", - "evaluations.evaluation_ids": "Simulation", - "evaluations.variables": "Control", - "evaluations.objectives": "Objective", - "evaluations.constraints": "Constraint", - "evaluations.scaled_variables": "Scaled-Control", - "evaluations.scaled_objectives": "Scaled-Objective", - "evaluations.scaled_constraints": "Scaled-Constraint", -} -PERTURBATIONS_COLUMNS = { - "result_id": "ID", - "batch_id": "Batch", - "realization": "Realization", - "evaluations.perturbed_evaluation_ids": "Simulation", - "evaluations.perturbed_variables": "Control", - "evaluations.perturbed_objectives": "Objective", - "evaluations.perturbed_constraints": "Constraint", - "evaluations.scaled_perturbed_variables": "Scaled-Control", - "evaluations.scaled_perturbed_objectives": "Scaled-Objective", - "evaluations.scaled_perturbed_constraints": "Scaled-Constraint", -} -MIN_HEADER_LEN = 3 - logger = logging.getLogger(__name__) @@ -156,7 +99,14 @@ def __init__( optimization_callback: OptimizerCallback, display_all_jobs: bool = True, ): - everest_config = self._add_defaults(everest_config) + assert everest_config.environment is not None + logging.getLogger(EVEREST).info( + "Using random seed: %d", everest_config.environment.random_seed + ) + logging.getLogger(EVEREST).info( + "To deterministically reproduce this experiment, " + "add the above random seed to your configuration file." 
+ ) Path(everest_config.log_dir).mkdir(parents=True, exist_ok=True) Path(everest_config.optimization_output_dir).mkdir(parents=True, exist_ok=True) @@ -179,12 +129,12 @@ def __init__( None ) self._max_batch_num_reached = False - self._simulator_cache: SimulatorCache | None = None + self._evaluator_cache: _EvaluatorCache | None = None if ( everest_config.simulator is not None and everest_config.simulator.enable_cache ): - self._simulator_cache = SimulatorCache() + self._evaluator_cache = _EvaluatorCache() self._experiment: Experiment | None = None self.eval_server_cfg: EvaluatorServerConfig | None = None storage = open_storage(config.ens_path, mode="w") @@ -202,31 +152,6 @@ def __init__( self.num_retries_per_iter = 0 # OK? - @staticmethod - def _add_defaults(config: EverestConfig) -> EverestConfig: - """This function exists as a temporary mechanism to default configurations that - needs to be global in the sense that they should carry over both to ropt and ERT. - When the proper mechanism for this is implemented this code - should die. - - """ - defaulted_config = config.copy() - assert defaulted_config.environment is not None - - random_seed = defaulted_config.environment.random_seed - if random_seed is None: - random_seed = random.randint(1, 2**30) - - defaulted_config.environment.random_seed = random_seed - - logging.getLogger(EVEREST).info("Using random seed: %d", random_seed) - logging.getLogger(EVEREST).info( - "To deterministically reproduce this experiment, " - "add the above random seed to your configuration file." - ) - - return defaulted_config - @classmethod def create( cls, @@ -242,7 +167,7 @@ def default_simulation_callback( def default_optimization_callback() -> str | None: return None - ert_config = everest_to_ert_config(cls._add_defaults(ever_config)) + ert_config = everest_to_ert_config(ever_config) return cls( config=ert_config, everest_config=ever_config, @@ -251,6 +176,28 @@ def default_optimization_callback() -> str | None: or default_optimization_callback, ) + @classmethod + def name(cls) -> str: + return "Optimization run" + + @classmethod + def description(cls) -> str: + return "Run batches " + + @property + def exit_code( + self, + ) -> Literal["max_batch_num_reached"] | OptimizerExitCode | None: + return self._exit_code + + @property + def result(self) -> OptimalResult | None: + return self._result + + def __repr__(self) -> str: + config_json = json.dumps(self.everest_config, sort_keys=True, indent=2) + return f"EverestRunModel(config={config_json})" + def run_experiment( self, evaluator_server_config: EvaluatorServerConfig, restart: bool = False ) -> None: @@ -290,93 +237,63 @@ def run_experiment( else optimizer_exit_code ) - def check_if_runpath_exists(self) -> bool: - return ( - self.everest_config.simulation_dir is not None - and os.path.exists(self.everest_config.simulation_dir) - and any(os.listdir(self.everest_config.simulation_dir)) - ) - - def _handle_errors( - self, - batch: int, - simulation: Any, - realization: str, - fm_name: str, - error_path: str, - ) -> None: - fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}" - fm_logger = logging.getLogger("forward_models") - with open(error_path, encoding="utf-8") as errors: - error_str = errors.read() - - error_hash = hash(error_str) - err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format( - batch, realization, simulation, fm_name, "Error: {}\n {}" - ) - - if error_hash not in self._fm_errors: - error_id = len(self._fm_errors) - fm_logger.error(err_msg.format(error_id, 
error_str)) - self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}}) - elif fm_id not in self._fm_errors[error_hash]["ids"]: - self._fm_errors[error_hash]["ids"].append(fm_id) - error_id = self._fm_errors[error_hash]["error_id"] - fm_logger.error(err_msg.format(error_id, "")) - - def _delete_runpath(self, run_args: list[RunArg]) -> None: - logging.getLogger(EVEREST).debug("Simulation callback called") - if self._simulation_delete_run_path: - for i, real in self.get_current_snapshot().reals.items(): - path_to_delete = run_args[int(i)].runpath - if real["status"] == "Finished" and os.path.isdir(path_to_delete): - - def onerror( - _: Callable[..., Any], - path: str, - sys_info: tuple[ - type[BaseException], BaseException, TracebackType - ], - ) -> None: - logging.getLogger(EVEREST).debug( - f"Failed to remove {path}, {sys_info}" - ) - - shutil.rmtree(path_to_delete, onerror=onerror) # pylint: disable=deprecated-argument - - def _on_before_forward_model_evaluation( - self, _: OptimizerEvent, optimizer: BasicOptimizer - ) -> None: - logging.getLogger(EVEREST).debug("Optimization callback called") - - if ( - self.everest_config.optimization is not None - and self.everest_config.optimization.max_batch_num is not None - and (self.batch_id >= self.everest_config.optimization.max_batch_num) - ): - self._max_batch_num_reached = True - logging.getLogger(EVEREST).info("Maximum number of batches reached") - optimizer.abort_optimization() - if ( - self._opt_callback is not None - and self._opt_callback() == "stop_optimization" - ): - logging.getLogger(EVEREST).info("User abort requested.") - optimizer.abort_optimization() - def _create_optimizer(self) -> BasicOptimizer: - assert ( - self.everest_config.environment is not None - and self.everest_config.environment is not None - ) - - ropt_output_folder = Path(self.everest_config.optimization_output_dir) + RESULT_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "functions.weighted_objective": "Total-Objective", + "linear_constraints.violations": "IC-violation", + "nonlinear_constraints.violations": "OC-violation", + "functions.objectives": "Objective", + "functions.constraints": "Constraint", + "evaluations.variables": "Control", + "linear_constraints.values": "IC-diff", + "nonlinear_constraints.values": "OC-diff", + "functions.scaled_objectives": "Scaled-Objective", + "functions.scaled_constraints": "Scaled-Constraint", + "evaluations.scaled_variables": "Scaled-Control", + "nonlinear_constraints.scaled_values": "Scaled-OC-diff", + "nonlinear_constraints.scaled_violations": "Scaled-OC-violation", + } + GRADIENT_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "gradients.weighted_objective": "Total-Gradient", + "gradients.objectives": "Grad-objective", + "gradients.constraints": "Grad-constraint", + } + SIMULATION_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "realization": "Realization", + "evaluations.evaluation_ids": "Simulation", + "evaluations.variables": "Control", + "evaluations.objectives": "Objective", + "evaluations.constraints": "Constraint", + "evaluations.scaled_variables": "Scaled-Control", + "evaluations.scaled_objectives": "Scaled-Objective", + "evaluations.scaled_constraints": "Scaled-Constraint", + } + PERTURBATIONS_COLUMNS = { + "result_id": "ID", + "batch_id": "Batch", + "realization": "Realization", + "evaluations.perturbed_evaluation_ids": "Simulation", + "evaluations.perturbed_variables": "Control", + "evaluations.perturbed_objectives": "Objective", + 
"evaluations.perturbed_constraints": "Constraint", + "evaluations.scaled_perturbed_variables": "Scaled-Control", + "evaluations.scaled_perturbed_objectives": "Scaled-Objective", + "evaluations.scaled_perturbed_constraints": "Scaled-Constraint", + } + MIN_HEADER_LEN = 3 # Initialize the optimizer with output tables. `min_header_len` is set # to ensure that all tables have the same number of header lines, # simplifying code that reads them as fixed width tables. `maximize` is # set because ropt reports minimization results, while everest wants # maximization results, necessitating a conversion step. + ropt_output_folder = Path(self.everest_config.optimization_output_dir) optimizer = ( BasicOptimizer( enopt_config=self.ropt_config, evaluator=self._forward_model_evaluator @@ -420,101 +337,157 @@ def _create_optimizer(self) -> BasicOptimizer: return optimizer - @classmethod - def name(cls) -> str: - return "Optimization run" + def _on_before_forward_model_evaluation( + self, _: OptimizerEvent, optimizer: BasicOptimizer + ) -> None: + logging.getLogger(EVEREST).debug("Optimization callback called") - @classmethod - def description(cls) -> str: - return "Run batches " + if ( + self.everest_config.optimization is not None + and self.everest_config.optimization.max_batch_num is not None + and (self.batch_id >= self.everest_config.optimization.max_batch_num) + ): + self._max_batch_num_reached = True + logging.getLogger(EVEREST).info("Maximum number of batches reached") + optimizer.abort_optimization() + if ( + self._opt_callback is not None + and self._opt_callback() == "stop_optimization" + ): + logging.getLogger(EVEREST).info("User abort requested.") + optimizer.abort_optimization() - @property - def exit_code( - self, - ) -> Literal["max_batch_num_reached"] | OptimizerExitCode | None: - return self._exit_code + def _forward_model_evaluator( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> EvaluatorResult: + # Reset the current run status: + self.status = None - @property - def result(self) -> OptimalResult | None: - return self._result + # Get any cached_results results that may be useful: + cached_results = self._get_cached_results(control_values, evaluator_context) - def __repr__(self) -> str: - config_json = json.dumps(self.everest_config, sort_keys=True, indent=2) - return f"EverestRunModel(config={config_json})" + # Create the case to run: + case_data = self._init_case_data( + control_values, evaluator_context, cached_results + ) + + # Initialize a new experiment in storage: + assert self._experiment + ensemble = self._experiment.create_ensemble( + name=f"batch_{self.batch_id}", + ensemble_size=len(case_data), + ) + for sim_id, controls in enumerate(case_data.values()): + self._setup_sim(sim_id, controls, ensemble) + + # Evaluate the case: + run_args = self._get_run_args(ensemble, evaluator_context, case_data) + self._context_env.update( + { + "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), + "_ERT_ENSEMBLE_ID": str(ensemble.id), + "_ERT_SIMULATION_MODE": "batch_simulation", + } + ) + assert self.eval_server_cfg + self._evaluate_and_postprocess(run_args, ensemble, self.eval_server_cfg) + + # If necessary, delete the run path: + self._delete_runpath(run_args) + + # Gather the results and create the result for ropt: + results = self._gather_results(ensemble) + evaluator_result = self._get_evaluator_result( + control_values, evaluator_context, case_data, results, cached_results + ) + + # Increase the batch ID for the next evaluation: + self.batch_id += 
1 + + # Add the results from the evaluations to the cache: + self._add_results_to_cache( + control_values, + evaluator_context, + case_data, + evaluator_result.objectives, + evaluator_result.constraints, + ) + + return evaluator_result + + def _get_cached_results( + self, control_values: NDArray[np.float64], evaluator_context: EvaluatorContext + ) -> dict[int, Any]: + cached_results: dict[int, Any] = {} + if self._evaluator_cache is not None: + assert evaluator_context.config.realizations.names is not None + for sim_idx, realization in enumerate(evaluator_context.realizations): + cached_data = self._evaluator_cache.get( + evaluator_context.config.realizations.names[realization], + control_values[sim_idx, :], + ) + if cached_data is not None: + cached_results[sim_idx] = cached_data + return cached_results @staticmethod - def _add_control( - controls: Mapping[str, Any], - control_name: tuple[Any, ...], - control_value: float, - ) -> None: - group_name = control_name[0] - variable_name = control_name[1] - group = controls[group_name] - if len(control_name) > 2: - index_name = str(control_name[2]) - if variable_name in group: - group[variable_name][index_name] = control_value + def _init_case_data( + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + cached_results: dict[int, Any], + ) -> dict[int, dict[str, Any]]: + def add_control( + controls: dict[str, Any], + control_name: tuple[Any, ...], + control_value: float, + ) -> None: + group_name = control_name[0] + variable_name = control_name[1] + group = controls.get(group_name, {}) + if len(control_name) > 2: + index_name = str(control_name[2]) + if variable_name in group: + group[variable_name][index_name] = control_value + else: + group[variable_name] = {index_name: control_value} else: - group[variable_name] = {index_name: control_value} - else: - group[variable_name] = control_value + group[variable_name] = control_value + controls[group_name] = group + + case_data = {} + for control_idx in range(control_values.shape[0]): + add_to_case = ( + evaluator_context.active is None + or evaluator_context.active[evaluator_context.realizations[control_idx]] + ) + if add_to_case and control_idx not in cached_results: + controls: dict[str, Any] = {} + assert evaluator_context.config.variables.names is not None + for control_name, control_value in zip( + evaluator_context.config.variables.names, + control_values[control_idx, :], + strict=False, + ): + add_control(controls, control_name, control_value) + case_data[control_idx] = controls + return case_data @staticmethod - def _get_active_results( + def _get_calculated_results( results: list[dict[str, NDArray[np.float64]]], names: tuple[str], controls: NDArray[np.float64], - active: NDArray[np.bool_], + case_data: dict[int, Any], ) -> NDArray[np.float64]: + control_indices = list(case_data.keys()) values = np.zeros((controls.shape[0], len(names)), dtype=float64) for func_idx, name in enumerate(names): - values[active, func_idx] = np.fromiter( + values[control_indices, func_idx] = np.fromiter( (np.nan if not result else result[name][0] for result in results), dtype=np.float64, ) return values - def init_case_data( - self, - control_values: NDArray[np.float64], - metadata: EvaluatorContext, - realization_ids: list[int], - ) -> tuple[ - list[tuple[int, defaultdict[str, Any]]], NDArray[np.bool_], dict[int, int] - ]: - active = ( - np.ones(control_values.shape[0], dtype=np.bool_) - if metadata.active is None - else np.fromiter( - (metadata.active[realization] for realization in 
metadata.realizations), - dtype=np.bool_, - ) - ) - case_data = [] - cached = {} - - for sim_idx, real_id in enumerate(realization_ids): - if self._simulator_cache is not None: - cache_id = self._simulator_cache.find_key( - real_id, control_values[sim_idx, :] - ) - if cache_id is not None: - cached[sim_idx] = cache_id - active[sim_idx] = False - - if active[sim_idx]: - controls: defaultdict[str, Any] = defaultdict(dict) - assert metadata.config.variables.names is not None - for control_name, control_value in zip( - metadata.config.variables.names, - control_values[sim_idx, :], - strict=False, - ): - self._add_control(controls, control_name, control_value) - case_data.append((real_id, controls)) - return case_data, active, cached - def _setup_sim( self, sim_id: int, @@ -569,40 +542,27 @@ def _check_suffix( control_name, sim_id, ExtParamConfig.to_dataset(control) ) - def _forward_model_evaluator( - self, control_values: NDArray[np.float64], metadata: EvaluatorContext - ) -> EvaluatorResult: + def _get_run_args( + self, + ensemble: Ensemble, + evaluator_context: EvaluatorContext, + case_data: dict[int, Any], + ) -> list[RunArg]: def _slug(entity: str) -> str: entity = " ".join(str(entity).split()) return "".join([x if x.isalnum() else "_" for x in entity.strip()]) - self.status = None # Reset the current run status - assert metadata.config.realizations.names - realization_ids = [ - metadata.config.realizations.names[realization] - for realization in metadata.realizations - ] - case_data, active, cached = self.init_case_data( - control_values=control_values, - metadata=metadata, - realization_ids=realization_ids, - ) - assert self._experiment - ensemble = self._experiment.create_ensemble( - name=f"batch_{self.batch_id}", - ensemble_size=len(case_data), - ) - for sim_id, (geo_id, controls) in enumerate(case_data): - assert isinstance(geo_id, int) - self._setup_sim(sim_id, controls, ensemble) - substitutions = self.ert_config.substitutions # fill in the missing geo_id data substitutions[""] = _slug(ensemble.name) self.active_realizations = [True] * len(case_data) - for sim_id, (geo_id, _) in enumerate(case_data): + assert evaluator_context.config.realizations.names is not None + for sim_id, control_idx in enumerate(case_data.keys()): if self.active_realizations[sim_id]: - substitutions[f""] = str(geo_id) + realization = evaluator_context.realizations[control_idx] + substitutions[f""] = str( + evaluator_context.config.realizations.names[realization] + ) run_paths = Runpaths( jobname_format=self.ert_config.model_config.jobname_format_string, @@ -611,24 +571,35 @@ def _slug(entity: str) -> str: substitutions=substitutions, eclbase=self.ert_config.model_config.eclbase_format_string, ) - run_args = create_run_arguments( + return create_run_arguments( run_paths, self.active_realizations, ensemble=ensemble, ) - self._context_env.update( - { - "_ERT_EXPERIMENT_ID": str(ensemble.experiment_id), - "_ERT_ENSEMBLE_ID": str(ensemble.id), - "_ERT_SIMULATION_MODE": "batch_simulation", - } - ) - assert self.eval_server_cfg - self._evaluate_and_postprocess(run_args, ensemble, self.eval_server_cfg) + def _delete_runpath(self, run_args: list[RunArg]) -> None: + logging.getLogger(EVEREST).debug("Simulation callback called") + if self._simulation_delete_run_path: + for i, real in self.get_current_snapshot().reals.items(): + path_to_delete = run_args[int(i)].runpath + if real["status"] == "Finished" and os.path.isdir(path_to_delete): - self._delete_runpath(run_args) - # gather results + def onerror( + _: 
Callable[..., Any], + path: str, + sys_info: tuple[ + type[BaseException], BaseException, TracebackType + ], + ) -> None: + logging.getLogger(EVEREST).debug( + f"Failed to remove {path}, {sys_info}" + ) + + shutil.rmtree(path_to_delete, onerror=onerror) # pylint: disable=deprecated-argument + + def _gather_results( + self, ensemble: Ensemble + ) -> list[dict[str, npt.NDArray[np.float64]]]: results: list[dict[str, npt.NDArray[np.float64]]] = [] for sim_id, successful in enumerate(self.active_realizations): if not successful: @@ -640,62 +611,80 @@ def _slug(entity: str) -> str: data = ensemble.load_responses(key, (sim_id,)) d[key] = data["values"].to_numpy() results.append(d) - for fnc_name, alias in self.everest_config.function_aliases.items(): for result in results: result[fnc_name] = result[alias] + return results - objectives = self._get_active_results( + def _get_evaluator_result( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + case_data: dict[int, Any], + results: list[dict[str, npt.NDArray[np.float64]]], + cached_results: dict[int, Any], + ) -> EvaluatorResult: + # We minimize the negative of the objectives: + objectives = -self._get_calculated_results( results, - metadata.config.objectives.names, # type: ignore + evaluator_context.config.objectives.names, # type: ignore control_values, - active, + case_data, ) constraints = None - if metadata.config.nonlinear_constraints is not None: - constraints = self._get_active_results( + if evaluator_context.config.nonlinear_constraints is not None: + constraints = self._get_calculated_results( results, - metadata.config.nonlinear_constraints.names, # type: ignore + evaluator_context.config.nonlinear_constraints.names, # type: ignore control_values, - active, + case_data, ) - if self._simulator_cache is not None: - for sim_idx, cache_id in cached.items(): - objectives[sim_idx, ...] = self._simulator_cache.get_objectives( - cache_id - ) + if self._evaluator_cache is not None: + for control_idx, ( + cached_objectives, + cached_constraints, + ) in cached_results.items(): + objectives[control_idx, ...] = cached_objectives if constraints is not None: - constraints[sim_idx, ...] = self._simulator_cache.get_constraints( - cache_id - ) - - sim_ids = np.empty(control_values.shape[0], dtype=np.intc) - sim_ids.fill(-1) - sim_ids[active] = np.arange(len(results), dtype=np.intc) - - # Add the results from active simulations to the cache: - if self._simulator_cache is not None: - for sim_idx, real_id in enumerate(realization_ids): - if active[sim_idx]: - self._simulator_cache.add_simulation_results( - sim_idx, real_id, control_values, objectives, constraints - ) + assert cached_constraints is not None + constraints[control_idx, ...] = cached_constraints - # Note the negative sign for the objective results. Everest aims to do a - # maximization, while the standard practice of minimizing is followed by - # ropt. 
Therefore we will minimize the negative of the objectives: - evaluator_result = EvaluatorResult( - objectives=-objectives, + sim_ids = np.full(control_values.shape[0], -1, dtype=np.intc) + sim_ids[list(case_data.keys())] = np.arange(len(case_data), dtype=np.intc) + return EvaluatorResult( + objectives=objectives, constraints=constraints, batch_id=self.batch_id, evaluation_ids=sim_ids, ) - self.batch_id += 1 + def _add_results_to_cache( + self, + control_values: NDArray[np.float64], + evaluator_context: EvaluatorContext, + case_data: dict[int, Any], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ) -> None: + if self._evaluator_cache is not None: + assert evaluator_context.config.realizations.names is not None + for control_idx in case_data: + realization = evaluator_context.realizations[control_idx] + self._evaluator_cache.add( + evaluator_context.config.realizations.names[realization], + control_values[control_idx, ...], + objectives[control_idx, ...], + None if constraints is None else constraints[control_idx, ...], + ) - return evaluator_result + def check_if_runpath_exists(self) -> bool: + return ( + self.everest_config.simulation_dir is not None + and os.path.exists(self.everest_config.simulation_dir) + and any(os.listdir(self.everest_config.simulation_dir)) + ) def send_snapshot_event(self, event: Event, iteration: int) -> None: super().send_snapshot_event(event, iteration) @@ -731,7 +720,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: batch=self.batch_id, simulation=simulation, realization=realization, - fm_name=fm_step.get("name", "Unknwon"), # type: ignore + fm_name=fm_step.get("name", "Unknown"), # type: ignore error_path=fm_step.get("stderr", ""), # type: ignore ) jobs_progress.append(jobs) @@ -741,3 +730,62 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: "progress": jobs_progress, "batch_number": self.batch_id, } + + def _handle_errors( + self, + batch: int, + simulation: Any, + realization: str, + fm_name: str, + error_path: str, + ) -> None: + fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}" + fm_logger = logging.getLogger("forward_models") + with open(error_path, encoding="utf-8") as errors: + error_str = errors.read() + + error_hash = hash(error_str) + err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format( + batch, realization, simulation, fm_name, "Error: {}\n {}" + ) + + if error_hash not in self._fm_errors: + error_id = len(self._fm_errors) + fm_logger.error(err_msg.format(error_id, error_str)) + self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}}) + elif fm_id not in self._fm_errors[error_hash]["ids"]: + self._fm_errors[error_hash]["ids"].append(fm_id) + error_id = self._fm_errors[error_hash]["error_id"] + fm_logger.error(err_msg.format(error_id, "")) + + +class _EvaluatorCache: + EPS = float(np.finfo(np.float32).eps) + + def __init__(self) -> None: + self._objectives: dict[int, NDArray[np.float64]] = {} + self._constraints: dict[int, NDArray[np.float64] | None] = {} + self._keys: defaultdict[int, list[tuple[NDArray[np.float64], int]]] = ( + defaultdict(list) + ) + self._counter = count() + + def add( + self, + realization_id: int, + control_values: NDArray[np.float64], + objectives: NDArray[np.float64], + constraints: NDArray[np.float64] | None, + ) -> None: + key = next(self._counter) + self._keys[realization_id].append((control_values.copy(), key)) + self._objectives[key] = objectives.copy() + 
self._constraints[key] = None if constraints is None else constraints.copy() + + def get( + self, realization_id: int, controls: NDArray[np.float64] + ) -> tuple[Any, ...] | None: + for cached_result, key in self._keys.get(realization_id, []): + if np.allclose(controls, cached_result, rtol=0.0, atol=self.EPS): + return self._objectives[key], self._constraints[key] + return None diff --git a/src/everest/config/environment_config.py b/src/everest/config/environment_config.py index 080c7bb09b1..4d54d33e9a1 100644 --- a/src/everest/config/environment_config.py +++ b/src/everest/config/environment_config.py @@ -1,6 +1,7 @@ -from typing import Literal +import random +from typing import Literal, Self -from pydantic import BaseModel, Field, field_validator +from pydantic import BaseModel, Field, field_validator, model_validator from everest.config.validation_utils import check_path_valid @@ -43,3 +44,9 @@ class EnvironmentConfig(BaseModel, extra="forbid"): # type: ignore def validate_output_folder(cls, output_folder): # pylint:disable=E0213 check_path_valid(output_folder) return output_folder + + @model_validator(mode="after") + def validate_random_seed(self) -> Self: + if self.random_seed is None: + self.random_seed = random.randint(1, 2**30) + return self diff --git a/src/everest/simulator/__init__.py b/src/everest/simulator/__init__.py index 02e9a03f80e..b1611081e43 100644 --- a/src/everest/simulator/__init__.py +++ b/src/everest/simulator/__init__.py @@ -1,5 +1,3 @@ -from everest.simulator.simulator_cache import SimulatorCache - JOB_SUCCESS = "Finished" JOB_WAITING = "Waiting" JOB_RUNNING = "Running" @@ -109,5 +107,4 @@ "JOB_RUNNING", "JOB_SUCCESS", "JOB_WAITING", - "SimulatorCache", ] diff --git a/src/everest/simulator/simulator_cache.py b/src/everest/simulator/simulator_cache.py deleted file mode 100644 index db4e3a7ae3c..00000000000 --- a/src/everest/simulator/simulator_cache.py +++ /dev/null @@ -1,58 +0,0 @@ -from collections import defaultdict -from itertools import count - -import numpy as np -from numpy._typing import NDArray - - -# This cache can be used to prevent re-evaluation of forward models. Due to its -# simplicity it has some limitations: -# - There is no limit on the number of cached entries. -# - Searching in the cache is by brute-force, iterating over the entries. -# Both of these should not be an issue for the intended use with cases where the -# forward models are very expensive to compute: The number of cached entries is -# not expected to become prohibitively large. -class SimulatorCache: - def __init__(self) -> None: - # Stores the realization/controls key, together with an ID. 
- self._keys: defaultdict[int, list[tuple[NDArray[np.float64], int]]] = ( - defaultdict(list) - ) - # Store objectives and constraints by ID: - self._objectives: dict[int, NDArray[np.float64]] = {} - self._constraints: dict[int, NDArray[np.float64]] = {} - - # Generate unique ID's: - self._counter = count() - - def add_simulation_results( - self, - sim_idx: int, - real_id: int, - control_values: NDArray[np.float64], - objectives: NDArray[np.float64], - constraints: NDArray[np.float64] | None, - ): - cache_id = next(self._counter) - self._keys[real_id].append((control_values[sim_idx, :].copy(), cache_id)) - self._objectives[cache_id] = objectives[sim_idx, ...].copy() - if constraints is not None: - self._constraints[cache_id] = constraints[sim_idx, ...].copy() - - def find_key(self, real_id: int, control_vector: NDArray[np.float64]) -> int | None: - # Brute-force search, premature optimization is the root of all evil: - for cached_vector, cache_id in self._keys.get(real_id, []): - if np.allclose( - control_vector, - cached_vector, - rtol=0.0, - atol=float(np.finfo(np.float32).eps), - ): - return cache_id - return None - - def get_objectives(self, cache_id: int) -> NDArray[np.float64]: - return self._objectives[cache_id] - - def get_constraints(self, cache_id: int) -> NDArray[np.float64]: - return self._constraints[cache_id]
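Reviewer note on the caching change: the public `SimulatorCache` (queried via `find_key`, `get_objectives`, and `get_constraints` against an internal `cache_id`) is replaced by the private `_EvaluatorCache`, whose `get` returns the cached `(objectives, constraints)` pair directly, or `None` on a miss. The sketch below is a minimal, self-contained illustration of that lookup contract; the class and variable names are illustrative stand-ins rather than part of the patch, and only the matching rule (per-realization brute-force search with `np.allclose(..., rtol=0.0, atol=float32 eps)`) mirrors the code above.

```python
from collections import defaultdict
from itertools import count

import numpy as np


class EvaluatorCacheSketch:
    """Illustrative stand-in for the private _EvaluatorCache added in this PR."""

    # Lookup tolerance: absolute float32 epsilon, no relative tolerance.
    EPS = float(np.finfo(np.float32).eps)

    def __init__(self) -> None:
        self._objectives = {}
        self._constraints = {}
        # Per realization: a list of (control vector, integer key) pairs.
        self._keys = defaultdict(list)
        self._counter = count()

    def add(self, realization_id, control_values, objectives, constraints=None):
        # Store copies so later mutation of the inputs cannot corrupt the cache.
        key = next(self._counter)
        self._keys[realization_id].append((np.array(control_values, dtype=float), key))
        self._objectives[key] = np.array(objectives, dtype=float)
        self._constraints[key] = (
            None if constraints is None else np.array(constraints, dtype=float)
        )

    def get(self, realization_id, controls):
        # Brute-force search over the control vectors cached for this realization.
        for cached_controls, key in self._keys.get(realization_id, []):
            if np.allclose(controls, cached_controls, rtol=0.0, atol=self.EPS):
                return self._objectives[key], self._constraints[key]
        return None


if __name__ == "__main__":
    cache = EvaluatorCacheSketch()
    cache.add(realization_id=0, control_values=[0.1, 0.2], objectives=[1.5])

    # Re-evaluating the same realization with controls within float32 epsilon hits:
    objectives, constraints = cache.get(0, np.array([0.1, 0.2]) + 1e-9)
    assert objectives[0] == 1.5 and constraints is None

    # Different controls or a different realization miss:
    assert cache.get(0, [0.3, 0.2]) is None
    assert cache.get(1, [0.1, 0.2]) is None
```

As in the deleted `simulator_cache.py`, there is no eviction and lookup cost is linear in the number of cached entries per realization, which is acceptable when forward-model evaluations are expensive relative to the search.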