diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index 2ea3d16e28c..4522804dc8e 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -12,7 +12,6 @@ import shutil import threading import time -from dataclasses import dataclass from pathlib import Path from types import TracebackType from typing import ( @@ -28,7 +27,6 @@ TypedDict, ) -import seba_sqlite.sqlite_storage from ropt.enums import EventType, OptimizerExitCode from ropt.plan import BasicOptimizer, Event from seba_sqlite import SqliteStorage @@ -37,6 +35,7 @@ from ert.ensemble_evaluator import EvaluatorServerConfig from ert.storage import open_storage from everest.config import EverestConfig +from everest.everest_storage import EverestStorage, OptimalResult from everest.optimizer.everest2ropt import everest2ropt from everest.simulator import Simulator from everest.simulator.everest_to_ert import everest_to_ert_config @@ -291,24 +290,6 @@ class OptimizerCallback(Protocol): def __call__(self) -> str | None: ... -@dataclass -class OptimalResult: - batch: int - controls: List[Any] - total_objective: float - - @staticmethod - def from_seba_optimal_result( - o: Optional[seba_sqlite.sqlite_storage.OptimalResult] = None, - ) -> "OptimalResult" | None: - if o is None: - return None - - return OptimalResult( - batch=o.batch, controls=o.controls, total_objective=o.total_objective - ) - - class EverestRunModel(BaseRunModel): def __init__( self, @@ -420,6 +401,16 @@ def run_experiment( # Initialize the ropt optimizer: optimizer = self._create_optimizer(simulator) + self.ever_storage = EverestStorage( + output_dir=Path(self.everest_config.optimization_output_dir), + ) + self.ever_storage.observe_optimizer( + optimizer, + Path(self.everest_config.optimization_output_dir) + / "dakota" + / "OPT_DEFAULT.out", + ) + # The SqliteStorage object is used to store optimization results from # Seba in an sqlite database. It reacts directly to events emitted by # Seba and is not called by Everest directly. The stored results are @@ -437,6 +428,8 @@ def run_experiment( self._result = OptimalResult.from_seba_optimal_result( seba_storage.get_optimal_result() # type: ignore ) + optimal_result_from_everstorage = self.ever_storage.get_optimal_result() + assert self._result == optimal_result_from_everstorage if self._monitor_thread is not None: self._monitor_thread.stop() diff --git a/src/everest/everest_storage.py b/src/everest/everest_storage.py index 7b4e0585133..d82d403eea8 100644 --- a/src/everest/everest_storage.py +++ b/src/everest/everest_storage.py @@ -1,54 +1,401 @@ -import copy +from __future__ import annotations + +import datetime +import json import logging import os -import sqlite3 -import time -from collections import namedtuple -from itertools import count +from dataclasses import dataclass, field from pathlib import Path +from typing import ( + TYPE_CHECKING, + Any, + Dict, + List, + Optional, +) import numpy as np - -from ropt.enums import ConstraintType, EventType +import polars +from numpy.core.numeric import Infinity +from ropt.config.enopt import EnOptConfig +from ropt.enums import EventType +from ropt.plan import BasicOptimizer, Event from ropt.results import FunctionResults, GradientResults, convert_to_maximize -from .database import Database -from .snapshot import SebaSnapshot - -OptimalResult = namedtuple( - "OptimalResult", "batch, controls, total_objective, expected_objectives" -) +from seba_sqlite import sqlite_storage logger = logging.getLogger(__name__) +if TYPE_CHECKING: + pass -def _convert_names(control_names): - converted_names = [] - for name in control_names: - converted = f"{name[0]}_{name[1]}" - if len(name) > 2: - converted += f"-{name[2]}" - converted_names.append(converted) - return converted_names +@dataclass +class OptimalResult: + batch: int + controls: List[Any] + total_objective: float -class EverestStorage: - # This implementation builds as much as possible on the older database and - # snapshot code, since it is meant for backwards compatibility, and should - # not be extended with new functionality. + @staticmethod + def from_seba_optimal_result( + o: Optional[sqlite_storage.OptimalResult] = None, + ) -> "OptimalResult" | None: + if o is None: + return None + + return OptimalResult( + batch=o.batch, controls=o.controls, total_objective=o.total_objective + ) + + +def try_read_df(path: Path) -> polars.DataFrame | None: + return polars.read_parquet(path) if path.exists() else None + + +@dataclass +class BatchDataFrames: + batch_id: int + batch_controls: polars.DataFrame + batch_objectives: Optional[polars.DataFrame] + realization_objectives: Optional[polars.DataFrame] + batch_constraints: Optional[polars.DataFrame] + realization_constraints: Optional[polars.DataFrame] + batch_objective_gradient: Optional[polars.DataFrame] + perturbation_objectives: Optional[polars.DataFrame] + batch_constraint_gradient: Optional[polars.DataFrame] + perturbation_constraints: Optional[polars.DataFrame] + is_improvement: Optional[bool] = False + + @property + def existing_dataframes(self) -> Dict[str, polars.DataFrame]: + dataframes = {} + + if self.batch_objectives is not None: + dataframes["batch_objectives"] = self.batch_objectives + + if self.realization_objectives is not None: + dataframes["realization_objectives"] = self.realization_objectives + + if self.batch_constraints is not None: + dataframes["batch_constraints"] = self.batch_constraints + + if self.realization_constraints is not None: + dataframes["realization_constraints"] = self.realization_constraints + + if self.batch_objective_gradient is not None: + dataframes["batch_objective_gradient"] = self.batch_objective_gradient + + if self.perturbation_objectives is not None: + dataframes["perturbation_objectives"] = self.perturbation_objectives + + if self.batch_constraint_gradient is not None: + dataframes["batch_constraint_gradient"] = self.batch_constraint_gradient + + if self.perturbation_constraints is not None: + dataframes["perturbation_constraints"] = self.perturbation_constraints + + return dataframes + + +@dataclass +class EverestStorageDataFrames: + batches: List[BatchDataFrames] = field(default_factory=list) + time_ended: Optional[datetime.date] = None + initial_values: Optional[polars.DataFrame] = None + objective_functions: Optional[polars.DataFrame] = None + nonlinear_constraints: Optional[polars.DataFrame] = None + realization_weights: Optional[polars.DataFrame] = None + + def write_to_experiment( + self, experiment: _OptimizerOnlyExperiment, write_csv=False + ): + # Stored in ensembles instead + # self.batch_objectives.write_parquet(path / "objective_data.parquet") + # self.gradient_evaluation.write_parquet(path / "gradient_evaluation.parquet") + # self.gradient.write_parquet(path / "gradient.parquet") + + # The stuff under experiment should maybe be JSON? + # 2DO PICK ONE, for now doing both for proof of concept - def __init__(self, optimizer, output_dir): - # Internal variables. + with open( + experiment.optimizer_mount_point / "initial_values.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.initial_values.to_dicts())) + + self.initial_values.write_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + + with open( + experiment.optimizer_mount_point / "objective_functions.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.objective_functions.to_dicts())) + + self.objective_functions.write_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if self.nonlinear_constraints is not None: + with open( + experiment.optimizer_mount_point / "nonlinear_constraints.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.nonlinear_constraints.to_dicts())) + self.nonlinear_constraints.write_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if self.realization_weights is not None: + with open( + experiment.optimizer_mount_point / "realization_weights.json", + mode="w+", + encoding="utf-8", + ) as f: + f.write(json.dumps(self.realization_weights.to_dicts())) + + self.realization_weights.write_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for batch_data in self.batches: + ensemble = experiment.get_ensemble_by_name(f"batch_{batch_data.batch_id}") + with open( + ensemble.optimizer_mount_point / "batch.json", "w+", encoding="utf-8" + ) as f: + json.dump( + { + "id": batch_data.batch_id, + "is_improvement": batch_data.is_improvement, + }, + f, + ) + + for df_key, df in batch_data.existing_dataframes.items(): + df.write_parquet(ensemble.optimizer_mount_point / f"{df_key}.parquet") + + if write_csv: + self.initial_values.write_csv( + experiment.optimizer_mount_point / "initial_values.csv" + ) + + self.objective_functions.write_csv( + experiment.optimizer_mount_point / "objective_functions.csv" + ) + + if self.nonlinear_constraints is not None: + self.nonlinear_constraints.write_csv( + experiment.optimizer_mount_point / "nonlinear_constraints.csv" + ) + + if self.realization_weights is not None: + self.realization_weights.write_csv( + experiment.optimizer_mount_point / "realization_weights.csv" + ) + + for batch_data in self.batches: + ensemble = experiment.get_ensemble_by_name( + f"batch_{batch_data.batch_id}" + ) + for df_key, df in batch_data.existing_dataframes.items(): + df.write_csv(ensemble.optimizer_mount_point / f"{df_key}.csv") + df.write_json(ensemble.optimizer_mount_point / f"{df_key}.json") + + def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None: + self.initial_values = polars.read_parquet( + experiment.optimizer_mount_point / "initial_values.parquet" + ) + self.objective_functions = polars.read_parquet( + experiment.optimizer_mount_point / "objective_functions.parquet" + ) + + if ( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ).exists(): + self.nonlinear_constraints = polars.read_parquet( + experiment.optimizer_mount_point / "nonlinear_constraints.parquet" + ) + + if (experiment.optimizer_mount_point / "realization_weights.parquet").exists(): + self.realization_weights = polars.read_parquet( + experiment.optimizer_mount_point / "realization_weights.parquet" + ) + + for name, ens in experiment.ensembles.items(): + batch_id = int(name.split("_")[1]) + + batch_objectives = try_read_df( + ens.optimizer_mount_point / "batch_objectives.parquet" + ) + realization_objectives = try_read_df( + ens.optimizer_mount_point / "realization_objectives.parquet" + ) + batch_constraints = try_read_df( + ens.optimizer_mount_point / "batch_constraints.parquet" + ) + realization_constraints = try_read_df( + ens.optimizer_mount_point / "realization_constraints.parquet" + ) + batch_objective_gradient = try_read_df( + ens.optimizer_mount_point / "batch_objective_gradient.parquet" + ) + perturbation_objectives = try_read_df( + ens.optimizer_mount_point / "perturbation_objectives.parquet" + ) + batch_constraint_gradient = try_read_df( + ens.optimizer_mount_point / "batch_constraint_gradient.parquet" + ) + perturbation_constraints = try_read_df( + ens.optimizer_mount_point / "perturbation_constraints.parquet" + ) + + batch_controls = try_read_df( + ens.optimizer_mount_point / "batch_controls.parquet" + ) + + with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f: + info = json.load(f) + batch_id = info["id"] + is_improvement = info["is_improvement"] + + self.batches.append( + BatchDataFrames( + batch_id, + batch_controls, + batch_objectives, + realization_objectives, + batch_constraints, + realization_constraints, + batch_objective_gradient, + perturbation_objectives, + batch_constraint_gradient, + perturbation_constraints, + is_improvement, + ) + ) + + self.batches.sort(key=lambda b: b.batch_id) + + +class _OptimizerOnlyEnsemble: + def __init__(self, output_dir: Path): self._output_dir = output_dir - self._database = Database(output_dir) + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + +class _OptimizerOnlyExperiment: + def __init__(self, output_dir: Path): + self._output_dir = output_dir + self._ensembles = {} + + @property + def optimizer_mount_point(self) -> Path: + if not (self._output_dir / "optimizer").exists(): + Path.mkdir(self._output_dir / "optimizer", parents=True) + + return self._output_dir / "optimizer" + + @property + def ensembles(self) -> Dict[str, _OptimizerOnlyEnsemble]: + return { + str(d): _OptimizerOnlyEnsemble(self._output_dir / "ensembles" / d) + for d in os.listdir(self._output_dir / "ensembles") + if "batch_" in d + } + + def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble: + if name not in self._ensembles: + self._ensembles[name] = _OptimizerOnlyEnsemble( + self._output_dir / "ensembles" / name + ) + + return self._ensembles[name] + + +@dataclass +class _EvaluationResults: + batch_controls: polars.DataFrame + batch_objectives: polars.DataFrame + realization_objectives: polars.DataFrame + batch_constraints: Optional[polars.DataFrame] + realization_constraints: Optional[polars.DataFrame] + + +@dataclass +class _GradientResults: + batch_objective_gradient: polars.DataFrame + perturbation_objectives: polars.DataFrame + batch_constraint_gradient: Optional[polars.DataFrame] + perturbation_constraints: Optional[polars.DataFrame] + + +class EverestStorage: + def __init__( + self, + output_dir: Path, + ) -> None: + self._initialized = False self._control_ensemble_id = 0 self._gradient_ensemble_id = 0 - self._simulator_results = None - self._merit_file = Path(output_dir) / "dakota" / "OPT_DEFAULT.out" - # Connect event handlers. - self._set_event_handlers(optimizer) + self._output_dir = output_dir + self._merit_file: Optional[Path] = None + self.data = EverestStorageDataFrames() - self._initialized = False + def write_to_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + + # csv writing mostly for dev/debugging/quick inspection + self.data.write_to_experiment(exp, write_csv=True) + + def read_from_output_dir(self) -> None: + exp = _OptimizerOnlyExperiment(self._output_dir) + self.data.read_from_experiment(exp) + self._initialized = True + + def observe_optimizer( + self, + optimizer: BasicOptimizer, + merit_file: Path, + ) -> None: + # We only need this file if we are observing a running ROPT instance + # (using dakota backend) + self._merit_file = merit_file + + # Q: Do these observers have to be explicitly disconnected/destroyed? + optimizer.add_observer(EventType.START_OPTIMIZER_STEP, self._initialize) + optimizer.add_observer( + EventType.FINISHED_EVALUATION, self._handle_finished_batch_event + ) + optimizer.add_observer( + EventType.FINISHED_OPTIMIZER_STEP, + self._handle_finished_event, + ) + + @property + def experiment(self) -> _OptimizerOnlyExperiment: + # Should be replaced with ERT experiment + # in the long run + return self._experiment + + @staticmethod + def _convert_names(control_names): + converted_names = [] + for name in control_names: + converted = f"{name[0]}_{name[1]}" + if len(name) > 2: + converted += f"-{name[2]}" + converted_names.append(converted) + return converted_names @property def file(self): @@ -59,356 +406,648 @@ def _initialize(self, event): return self._initialized = True - self._database.add_experiment( - name="optimization_experiment", start_time_stamp=time.time() + config = event.config + self.data.initial_values = polars.DataFrame( + { + "control_name": polars.Series( + self._convert_names(config.variables.names), dtype=polars.String + ), + "initial_value": polars.Series( + config.variables.initial_values, dtype=polars.Float32 + ), + "lower_bounds": polars.Series( + config.variables.lower_bounds, dtype=polars.Float32 + ), + "upper_bounds": polars.Series( + config.variables.upper_bounds, dtype=polars.Float32 + ), + } ) - # Add configuration values. - config = event.config - for control_name, initial_value, lower_bound, upper_bound in zip( - _convert_names(config.variables.names), - config.variables.initial_values, - config.variables.lower_bounds, - config.variables.upper_bounds, - ): - self._database.add_control_definition( - control_name, initial_value, lower_bound, upper_bound - ) - - for name, weight, scale in zip( - config.objective_functions.names, - config.objective_functions.weights, - config.objective_functions.scales, - ): - self._database.add_function( - name=name, - function_type="OBJECTIVE", - weight=weight, - normalization=1.0 / scale, - ) + self.data.objective_functions = polars.DataFrame( + { + "objective_name": config.objective_functions.names, + "weight": polars.Series( + config.objective_functions.weights, dtype=polars.Float32 + ), + "normalization": polars.Series( + [1.0 / s for s in config.objective_functions.scales], + dtype=polars.Float32, + ), + } + ) if config.nonlinear_constraints is not None: - for name, scale, rhs_value, constraint_type in zip( - config.nonlinear_constraints.names, - config.nonlinear_constraints.scales, - config.nonlinear_constraints.rhs_values, - config.nonlinear_constraints.types, - ): - self._database.add_function( - name=name, - function_type="CONSTRAINT", - normalization=scale, - rhs_value=rhs_value, - constraint_type=ConstraintType(constraint_type).name.lower(), + self.data.nonlinear_constraints = polars.DataFrame( + { + "constraint_name": config.nonlinear_constraints.names, + "normalization": config.nonlinear_constraints.scales, + "constraint_rhs_value": config.nonlinear_constraints.rhs_values, + "constraint_type": config.nonlinear_constraints.types, + } + ) + + self.data.realization_weights = polars.DataFrame( + { + "realization": polars.Series( + config.realizations.names, dtype=polars.UInt16 + ), + "weight": polars.Series( + config.realizations.weights, dtype=polars.Float32 + ), + } + ) + + def _store_function_results( + self, config: EnOptConfig, results: FunctionResults + ) -> _EvaluationResults: + # We could select only objective values, + # but we select all to also get the constraint values (if they exist) + realization_objectives = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ).drop("plan_id") + batch_objectives = polars.from_pandas( + results.to_dataframe( + config, + "functions", + select=["objectives", "weighted_objective", "scaled_objectives"], + ).reset_index() + ).drop("plan_id") + + batch_controls = polars.from_pandas( + results.to_dataframe( + config, "evaluations", select=["variables", "scaled_variables"] + ).reset_index() + ).drop("plan_id") + + batch_controls = self._rename_columns(batch_controls) + control_names = batch_controls["control_name"].unique().to_list() + + has_scaled_controls = "scaled_control_value" in batch_controls + batch_controls = batch_controls.pivot( + on="control_name", + values=["control_value", "scaled_control_value"] + if has_scaled_controls + else ["control_value"], + separator=":", + ) + + if has_scaled_controls: + batch_controls = batch_controls.rename( + { + **{f"control_value:{name}": name for name in control_names}, + **{ + f"scaled_control_value:{name}": f"{name}.scaled" + for name in control_names + }, + } + ) + + try: + batch_constraints = polars.from_pandas( + results.to_dataframe(config, "nonlinear_constraints").reset_index() + ).drop("plan_id") + except AttributeError: + batch_constraints = None + + realization_constraints = None + + batch_objectives = self._rename_columns(batch_objectives) + realization_objectives = self._rename_columns(realization_objectives) + objective_names = realization_objectives["objective_name"].unique().to_list() + + has_scaled_objectives = ( + "scaled_objective_value" in realization_objectives.columns + ) + + batch_objectives = batch_objectives.pivot( + on="objective_name", + values=["objective_value", "scaled_objective_value"] + if has_scaled_objectives + else ["objective_value"], + separator=":", + ) + + if has_scaled_objectives: + batch_objectives = batch_objectives.rename( + { + **{f"objective_value:{name}": name for name in objective_names}, + **( + { + f"scaled_objective_value:{name}": f"{name}.scaled" + for name in objective_names + } + if has_scaled_objectives + else {} + ), + } + ) + + if batch_constraints is not None: + batch_constraints = batch_constraints.rename( + { + "nonlinear_constraint": "constraint_name", + "values": "constraint_value", + "violations": "constraint_violation", + "scaled_values": "scaled_constraint_value", + "scaled_violations": "scaled_constraint_violation", + } + ) + + constraint_names = batch_constraints["constraint_name"].unique().to_list() + + batch_constraints = batch_constraints.pivot( + on="constraint_name", + values=[ + "constraint_value", + "scaled_constraint_value", + "constraint_violation", + "scaled_constraint_violation", + ], + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + **{ + f"constraint_violation;{name}": f"{name}.violation" + for name in constraint_names + }, + **{ + f"scaled_constraint_violation;{name}": f"{name}.scaled_violation" + for name in constraint_names + }, + } + ) + + # remove from main table, and create separate constraints table + realization_constraints = realization_objectives[ + "result_id", + "batch_id", + "realization", + "constraint_name", + "constraint_value", + "scaled_constraint_value", + ].unique(["result_id", "batch_id", "realization", "constraint_name"]) + realization_constraints = realization_constraints.pivot( + values=["constraint_value", "scaled_constraint_value"], + on="constraint_name", + separator=";", + ).rename( + { + **{f"constraint_value;{name}": name for name in constraint_names}, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + + realization_objectives = realization_objectives.drop( + [c for c in realization_objectives.columns if "constraint" in c.lower()] + ).unique(subset=["result_id", "batch_id", "realization", "control_name"]) + batch_objectives = batch_objectives.drop( + [c for c in batch_objectives.columns if "constraint" in c.lower()] + ).unique(subset=["result_id", "batch_id"]) + + realization_objectives = ( + realization_objectives.drop(["control_name", "control_value"]) + .unique(subset=["result_id", "batch_id", "realization"]) + .pivot( + values="objective_value", + index=[ + "result_id", + "batch_id", + "realization", + ], + columns="objective_name", + ) + ) + + return _EvaluationResults( + batch_controls, + batch_objectives, + realization_objectives, + batch_constraints, + realization_constraints, + ) + + @staticmethod + def _rename_columns(df: polars.DataFrame): + _renames = { + "objective": "objective_name", + "weighted_objective": "total_objective_value", + "variable": "control_name", + "variables": "control_value", + "objectives": "objective_value", + "constraints": "constraint_value", + "nonlinear_constraint": "constraint_name", + "scaled_constraints": "scaled_constraint_value", + "scaled_objectives": "scaled_objective_value", + "perturbed_variables": "perturbed_control_value", + "perturbed_objectives": "perturbed_objective_value", + "perturbed_constraints": "perturbed_constraint_value", + "scaled_perturbed_objectives": "scaled_perturbed_objective_value", + "scaled_perturbed_constraints": "scaled_perturbed_constraint_value", + "scaled_variables": "scaled_control_value", + } + return df.rename({k: v for k, v in _renames.items() if k in df.columns}) + + def _store_gradient_results( + self, config: EnOptConfig, results: FunctionResults + ) -> _GradientResults: + perturbation_objectives = polars.from_pandas( + results.to_dataframe(config, "evaluations").reset_index() + ).drop("plan_id") + + try: + # ROPT_NOTE: Why is this sometimes None? How can we know if it is + # expected to be None? + batch_objective_gradient = polars.from_pandas( + results.to_dataframe(config, "gradients").reset_index() + ).drop("plan_id") + except AttributeError: + batch_objective_gradient = None + + if batch_objective_gradient is not None: + batch_objective_gradient = self._rename_columns(batch_objective_gradient) + + perturbation_objectives = self._rename_columns(perturbation_objectives) + + if "constraint_name" in perturbation_objectives: + constraint_names = ( + perturbation_objectives["constraint_name"].unique().to_list() + ) + perturbation_constraints = ( + perturbation_objectives[ + "result_id", + "batch_id", + "realization", + "perturbation", + "control_name", + "perturbed_control_value", + *[ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ], + ] + .pivot( + on="constraint_name", + values=[ + "perturbed_constraint_value", + "scaled_perturbed_constraint_value", + ], + separator=";", ) + .rename( + { + **{ + f"perturbed_constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_perturbed_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } + ) + .pivot(on="control_name", values="perturbed_control_value") + ) + + if batch_objective_gradient is not None: + # ROPT_NOTE: Will this ever happen? We get constraints + # but no "gradient" field in the results. + batch_constraint_gradient = batch_objective_gradient[ + "result_id", + "batch_id", + "control_name", + *[ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ], + ] + + batch_objective_gradient = batch_objective_gradient.drop( + [ + c + for c in batch_objective_gradient.columns + if "constraint" in c.lower() + ] + ).unique() - for name, weight in zip( - config.realizations.names, - config.realizations.weights, - ): - self._database.add_realization(str(name), weight) - - def _add_batch(self, config, controls, perturbed_controls): - self._gradient_ensemble_id += 1 - self._control_ensemble_id = self._gradient_ensemble_id - control_names = _convert_names(config.variables.names) - for control_name, value in zip(control_names, controls): - self._database.add_control_value( - set_id=self._control_ensemble_id, - control_name=control_name, - value=value, - ) - if perturbed_controls is not None: - perturbed_controls = perturbed_controls.reshape( - perturbed_controls.shape[0], -1 - ) - self._gradient_ensemble_id = self._control_ensemble_id - for g_idx in range(perturbed_controls.shape[1]): - self._gradient_ensemble_id += 1 - for c_idx, c_name in enumerate(control_names): - self._database.add_control_value( - set_id=self._gradient_ensemble_id, - control_name=c_name, - value=perturbed_controls[c_idx, g_idx], - ) - - def _add_simulations(self, config, result): - self._gradient_ensemble_id = self._control_ensemble_id - simulation_index = count() - if isinstance(result, FunctionResults): - for realization_name in config.realizations.names: - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._control_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=False, + constraint_names = ( + batch_constraint_gradient["constraint_name"].unique().to_list() ) - if isinstance(result, GradientResults): - for realization_name in config.realizations.names: - for _ in range(config.gradient.number_of_perturbations): - self._gradient_ensemble_id += 1 - self._database.add_simulation( - realization_name=str(realization_name), - set_id=self._gradient_ensemble_id, - sim_name=f"{result.batch_id}_{next(simulation_index)}", - is_gradient=True, - ) - - def _add_simulator_results( - self, config, batch, objective_results, constraint_results - ): - if constraint_results is None: - results = objective_results - else: - results = np.vstack((objective_results, constraint_results)) - statuses = np.logical_and.reduce(np.isfinite(results), axis=0) - names = config.objective_functions.names - if config.nonlinear_constraints is not None: - names += config.nonlinear_constraints.names - - for sim_idx, status in enumerate(statuses): - sim_name = f"{batch}_{sim_idx}" - for function_idx, name in enumerate(names): - if status: - self._database.add_simulation_result( - sim_name, results[function_idx, sim_idx], name, 0 - ) - self._database.set_simulation_ended(sim_name, status) - - def _add_constraint_values(self, config, batch, constraint_values): - statuses = np.logical_and.reduce(np.isfinite(constraint_values), axis=0) - for sim_id, status in enumerate(statuses): - if status: - for idx, constraint_name in enumerate( - config.nonlinear_constraints.names - ): - # Note the time_index=0, the database supports storing - # multipel time-points, but we do not support that, so we - # use times_index=0. - self._database.update_simulation_result( - simulation_name=f"{batch}_{sim_id}", - function_name=constraint_name, - times_index=0, - value=constraint_values[idx, sim_id], - ) - - def _add_gradients(self, config, objective_gradients): - for grad_index, gradient in enumerate(objective_gradients): - for control_index, control_name in enumerate( - _convert_names(config.variables.names) - ): - self._database.add_gradient_result( - gradient[control_index], - config.objective_functions.names[grad_index], - 1, - control_name, + batch_constraint_gradient = batch_constraint_gradient.pivot( + on="constraint_name", + values=["constraint_value", "scaled_constraint_value"], + separator=";", + ).rename( + { + **{ + f"constraint_value;{name}": name + for name in constraint_names + }, + **{ + f"scaled_constraint_value;{name}": f"{name}.scaled" + for name in constraint_names + }, + } ) + else: + batch_constraint_gradient = None + + perturbation_objectives = perturbation_objectives.drop( + [ + c + for c in perturbation_objectives.columns + if "constraint" in c.lower() + ] + ).unique() + else: + batch_constraint_gradient = None + perturbation_constraints = None - def _add_total_objective(self, total_objective): - self._database.add_calculation_result( - set_id=self._control_ensemble_id, - object_function_value=total_objective, + perturbation_objectives = perturbation_objectives.drop( + "perturbed_evaluation_ids", "control_value" ) - def _convert_constraints(self, config, constraint_results): - constraint_results = copy.deepcopy(constraint_results) - rhs_values = config.nonlinear_constraints.rhs_values - for idx, constraint_type in enumerate(config.nonlinear_constraints.types): - constraint_results[idx] -= rhs_values[idx] - if constraint_type == ConstraintType.LE: - constraint_results[idx] *= -1.0 - return constraint_results - - def _store_results(self, config, results): - if isinstance(results, FunctionResults): - objective_results = results.evaluations.objectives - objective_results = np.moveaxis(objective_results, -1, 0) - constraint_results = results.evaluations.constraints - if constraint_results is not None: - constraint_results = np.moveaxis(constraint_results, -1, 0) - else: - objective_results = None - constraint_results = None - - if isinstance(results, GradientResults): - perturbed_variables = results.evaluations.perturbed_variables - perturbed_variables = np.moveaxis(perturbed_variables, -1, 0) - perturbed_objectives = results.evaluations.perturbed_objectives - perturbed_objectives = np.moveaxis(perturbed_objectives, -1, 0) - perturbed_constraints = results.evaluations.perturbed_constraints - if perturbed_constraints is not None: - perturbed_constraints = np.moveaxis(perturbed_constraints, -1, 0) - else: - perturbed_variables = None - perturbed_objectives = None - perturbed_constraints = None + perturbation_objectives = perturbation_objectives.pivot( + on="objective_name", values="perturbed_objective_value" + ) - self._add_batch(config, results.evaluations.variables, perturbed_variables) - self._add_simulations(config, results) + perturbation_objectives = perturbation_objectives.pivot( + on="control_name", values="perturbed_control_value" + ) + + # All that remains in perturbation_objectives is + # control values per realization, which is redundant to store here. - # Convert back the simulation results to the legacy format: - if perturbed_objectives is not None: - perturbed_objectives = perturbed_objectives.reshape( - perturbed_objectives.shape[0], -1 + if batch_objective_gradient is not None: + objective_names = ( + batch_objective_gradient["objective_name"].unique().to_list() + ) + batch_objective_gradient = batch_objective_gradient.pivot( + on="objective_name", + values=["objective_value", "total_objective_value"], + separator=";", + ).rename( + { + **{f"objective_value;{name}": name for name in objective_names}, + **{ + f"total_objective_value;{name}": f"{name}.total" + for name in objective_names + }, + } ) - if objective_results is None: - objective_results = perturbed_objectives - else: - objective_results = np.hstack((objective_results, perturbed_objectives)) - if config.nonlinear_constraints is not None: - if perturbed_constraints is not None: - perturbed_constraints = perturbed_constraints.reshape( - perturbed_constraints.shape[0], -1 - ) - if constraint_results is None: - constraint_results = perturbed_constraints - else: - constraint_results = np.hstack( - (constraint_results, perturbed_constraints) - ) - # The legacy code converts all constraints to the form f(x) >=0: - constraint_results = self._convert_constraints(config, constraint_results) - - self._add_simulator_results( - config, results.batch_id, objective_results, constraint_results + return _GradientResults( + batch_objective_gradient, + perturbation_objectives, + batch_constraint_gradient, + perturbation_constraints, ) - if config.nonlinear_constraints: - self._add_constraint_values(config, results.batch_id, constraint_results) - if isinstance(results, FunctionResults) and results.functions is not None: - self._add_total_objective(results.functions.weighted_objective) - if isinstance(results, GradientResults) and results.gradients is not None: - self._add_gradients(config, results.gradients.objectives) - def _handle_finished_batch_event(self, event): - logger.debug("Storing batch results in the sqlite database") + def _handle_finished_batch_event(self, event: Event): + logger.debug("Storing batch results dataframes") converted_results = tuple( - convert_to_maximize(result) for result in event.results) - results = [] + convert_to_maximize(result) for result in event.results + ) + results: List[FunctionResults | GradientResults] = [] + + # Q: Maybe this whole clause can be removed? + # Not sure why it is there, putting the best function result first + # +-----------------------------------------------------------------+ + # | | best_value = -np.inf best_results = None for item in converted_results: if isinstance(item, GradientResults): results.append(item) if ( - isinstance(item, FunctionResults) - and item.functions is not None - and item.functions.weighted_objective > best_value + isinstance(item, FunctionResults) + and item.functions is not None + and item.functions.weighted_objective > best_value ): best_value = item.functions.weighted_objective best_results = item + if best_results is not None: - results = [best_results] + results + results = [best_results, *results] + # | | + # +-----------------------------------------------------------------+ last_batch = -1 + + _batches = {} for item in results: - if item.batch_id != last_batch: - self._database.add_batch() - self._store_results(event.config, item) - if item.batch_id != last_batch: - self._database.set_batch_ended - last_batch = item.batch_id + if item.batch_id not in _batches: + _batches[item.batch_id] = {} + + if isinstance(item, FunctionResults): + eval_results = self._store_function_results(event.config, item) + + _batches[item.batch_id]["batch_controls"] = eval_results.batch_controls + _batches[item.batch_id]["batch_objectives"] = ( + eval_results.batch_objectives + ) + _batches[item.batch_id]["realization_objectives"] = ( + eval_results.realization_objectives + ) + _batches[item.batch_id]["batch_constraints"] = ( + eval_results.batch_constraints + ) + _batches[item.batch_id]["realization_constraints"] = ( + eval_results.realization_constraints + ) - self._database.set_batch_ended(time.time(), True) + if isinstance(item, GradientResults): + gradient_results = self._store_gradient_results(event.config, item) - # Merit values are dakota specific, load them if the output file exists: - self._database.update_calculation_result(_get_merit_values(self._merit_file)) + _batches[item.batch_id]["batch_objective_gradient"] = ( + gradient_results.batch_objective_gradient + ) + _batches[item.batch_id]["perturbation_objectives"] = ( + gradient_results.perturbation_objectives + ) + _batches[item.batch_id]["batch_constraint_gradient"] = ( + gradient_results.batch_constraint_gradient + ) + _batches[item.batch_id]["perturbation_constraints"] = ( + gradient_results.perturbation_constraints + ) - backup_data(self._database.location) + if item.batch_id != last_batch: + pass + # Q: Could apply timestamps here but, necessary?.. + # self._database.set_batch_ended + last_batch = item.batch_id + + for batch_id, info in _batches.items(): + self.data.batches.append( + BatchDataFrames( + batch_id=batch_id, + batch_controls=info.get("batch_controls"), + batch_objectives=info.get("batch_objectives"), + realization_objectives=info.get("realization_objectives"), + batch_constraints=info.get("batch_constraints"), + realization_constraints=info.get("realization_constraints"), + batch_objective_gradient=info.get("batch_objective_gradient"), + perturbation_objectives=info.get("perturbation_objectives"), + batch_constraint_gradient=info.get("batch_constraint_gradient"), + perturbation_constraints=info.get("perturbation_constraints"), + ) + ) def _handle_finished_event(self, event): logger.debug("Storing final results in the sqlite database") - self._database.update_calculation_result(_get_merit_values(self._merit_file)) - self._database.set_experiment_ended(time.time()) - def _set_event_handlers(self, optimizer): - optimizer.add_observer( - EventType.START_OPTIMIZER_STEP, self._initialize - ) - optimizer.add_observer( - EventType.FINISHED_EVALUATION, self._handle_finished_batch_event - ) - optimizer.add_observer( - EventType.FINISHED_OPTIMIZER_STEP, - self._handle_finished_event, - ) + merit_values = self._get_merit_values() + if merit_values: + # NOTE: Batch 0 is always an "accepted batch", and "accepted batches" are + # batches with merit_flag , which means that it was an improvement + # Should/could + self.data.batches[0].is_improvement = True + improvement_batches = [ + b for b in self.data.batches if b.batch_objectives is not None + ][1:] + for i, b in enumerate(improvement_batches): + merit_value = next( + (m["value"] for m in merit_values if (m["iter"] - 1) == i), None + ) + if merit_value is None: + continue - def get_optimal_result(self): - snapshot = SebaSnapshot(self._output_dir) - optimum = next( - ( - data - for data in reversed(snapshot.get_optimization_data()) - if data.merit_flag - ), - None, - ) - if optimum is None: - return None - objectives = snapshot.get_snapshot(batches=[optimum.batch_id]) - return OptimalResult( - batch=optimum.batch_id, - controls=optimum.controls, - total_objective=optimum.objective_value, - expected_objectives={ - name:value[0] for name, value in objectives.expected_objectives.items() - }, + b.batch_objectives = b.batch_objectives.with_columns( + polars.lit(merit_value).alias("merit_value") + ) + b.is_improvement = True + else: + max_total_objective = -Infinity + for b in self.data.batches: + if b.batch_objectives is not None: + total_objective = b.batch_objectives["total_objective_value"].item() + if total_objective > max_total_objective: + b.is_improvement = True + max_total_objective = total_objective + + self.write_to_output_dir() + + def get_optimal_result(self) -> Optional[OptimalResult]: + # Only used in tests, not super important + has_merit = any( + "merit_value" in b.batch_objectives.columns + for b in self.data.batches + if b.batch_objectives is not None ) + def find_best_batch(filter_by, sort_by): + matching_batches = [b for b in self.data.batches if filter_by(b)] -def backup_data(database_location) -> None: - src = sqlite3.connect(database_location) - dst = sqlite3.connect(database_location + ".backup") - with dst: - src.backup(dst) - src.close() - dst.close() - - -def _get_merit_fn_lines(merit_path): - if os.path.isfile(merit_path): - with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: - lines = reader.readlines() - start_line_idx = -1 - for inx, line in enumerate(lines): - if "Merit" in line and "feval" in line: - start_line_idx = inx + 1 - if start_line_idx > -1 and line.startswith("="): - return lines[start_line_idx:inx] - if start_line_idx > -1: - return lines[start_line_idx:] - return [] - - -def _parse_merit_line(merit_values_string): - values = [] - for merit_elem in merit_values_string.split(): - try: - values.append(float(merit_elem)) - except ValueError: - for elem in merit_elem.split("0x")[1:]: - values.append(float.fromhex("0x" + elem)) - if len(values) == 8: - # Dakota starts counting at one, correct to be zero-based. - return int(values[5]) - 1, values[4] - return None - - -def _get_merit_values(merit_file): - # Read the file containing merit information. - # The file should contain the following table header - # Iter F(x) mu alpha Merit feval btracks Penalty - # :return: merit values indexed by the function evaluation number - # example: - # 0: merit_value_0 - # 1: merit_value_1 - # 2 merit_value_2 - # ... - # ] - merit_values = [] - if merit_file.exists(): - for line in _get_merit_fn_lines(merit_file): - value = _parse_merit_line(line) - if value is not None: - merit_values.append({"iter":value[0], "value":value[1]}) - return merit_values + if not matching_batches: + return None + + matching_batches.sort(key=sort_by) + _batch = matching_batches[0] + _controls_dict = _batch.batch_controls.drop( + [ + "result_id", + "batch_id", + *[ + c + for c in _batch.batch_controls.columns + if c.endswith(".scaled") # don't need scaled control values + ], + ] + ).to_dicts()[0] + + return _batch, _controls_dict + + if has_merit: + # Minimize merit + batch, controls_dict = find_best_batch( + filter_by=lambda b: ( + b.batch_objectives is not None + and "merit_value" in b.batch_objectives.columns + ), + sort_by=lambda b: b.batch_objectives.select( + polars.col("merit_value").min() + ).item(), + ) + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + else: + # Maximize objective + batch, controls_dict = find_best_batch( + filter_by=lambda b: b.batch_objectives is not None + and not b.batch_objectives.is_empty(), + sort_by=lambda b: -b.batch_objectives.select( + polars.col("total_objective_value").sample(n=1) + ).item(), + ) + + return OptimalResult( + batch=batch.batch_id, + controls=controls_dict, + total_objective=batch.batch_objectives.select( + polars.col("total_objective_value") + ).item(), + ) + + @staticmethod + def _get_merit_fn_lines(merit_path): + if os.path.isfile(merit_path): + with open(merit_path, "r", errors="replace", encoding="utf-8") as reader: + lines = reader.readlines() + start_line_idx = -1 + for inx, line in enumerate(lines): + if "Merit" in line and "feval" in line: + start_line_idx = inx + 1 + if start_line_idx > -1 and line.startswith("="): + return lines[start_line_idx:inx] + if start_line_idx > -1: + return lines[start_line_idx:] + return [] + + @staticmethod + def _parse_merit_line(merit_values_string): + values = [] + for merit_elem in merit_values_string.split(): + try: + values.append(float(merit_elem)) + except ValueError: + for elem in merit_elem.split("0x")[1:]: + values.append(float.fromhex("0x" + elem)) + if len(values) == 8: + # Dakota starts counting at one, correct to be zero-based. + return int(values[5]) - 1, values[4] + return None + + def _get_merit_values(self): + # Read the file containing merit information. + # The file should contain the following table header + # Iter F(x) mu alpha Merit feval btracks Penalty + # :return: merit values indexed by the function evaluation number + # example: + # 0: merit_value_0 + # 1: merit_value_1 + # 2 merit_value_2 + # ... + # ] + merit_values = [] + if self._merit_file.exists(): + for line in EverestStorage._get_merit_fn_lines(self._merit_file): + value = EverestStorage._parse_merit_line(line) + if value is not None: + merit_values.append({"iter": value[0], "value": value[1]}) + return merit_values