Skip to content

Commit

Permalink
(WIP) make work with export
Browse files Browse the repository at this point in the history
  • Loading branch information
yngve-sk committed Dec 19, 2024
1 parent 4d32b9e commit db4c6dd
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 58 deletions.
141 changes: 95 additions & 46 deletions src/everest/everest_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def try_read_df(path: Path) -> polars.DataFrame | None:
@dataclass
class BatchDataFrames:
batch_id: int
batch_controls: polars.DataFrame
realization_controls: polars.DataFrame
batch_objectives: polars.DataFrame | None
realization_objectives: polars.DataFrame | None
batch_constraints: polars.DataFrame | None
Expand All @@ -73,6 +73,9 @@ def existing_dataframes(self) -> dict[str, polars.DataFrame]:
if self.realization_objectives is not None:
dataframes["realization_objectives"] = self.realization_objectives

if self.realization_controls is not None:
dataframes["realization_controls"] = self.realization_controls

if self.batch_constraints is not None:
dataframes["batch_constraints"] = self.batch_constraints

Expand Down Expand Up @@ -250,8 +253,8 @@ def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None:
ens.optimizer_mount_point / "perturbation_constraints.parquet"
)

batch_controls = try_read_df(
ens.optimizer_mount_point / "batch_controls.parquet"
realization_controls = try_read_df(
ens.optimizer_mount_point / "realization_controls.parquet"
)

with open(ens.optimizer_mount_point / "batch.json", encoding="utf-8") as f:
Expand All @@ -262,7 +265,7 @@ def read_from_experiment(self, experiment: _OptimizerOnlyExperiment) -> None:
self.batches.append(
BatchDataFrames(
batch_id,
batch_controls,
realization_controls,
batch_objectives,
realization_objectives,
batch_constraints,
Expand Down Expand Up @@ -321,7 +324,7 @@ def get_ensemble_by_name(self, name: str) -> _OptimizerOnlyEnsemble:

@dataclass
class _EvaluationResults:
batch_controls: polars.DataFrame
realization_controls: polars.DataFrame
batch_objectives: polars.DataFrame
realization_objectives: polars.DataFrame
batch_constraints: polars.DataFrame | None
Expand Down Expand Up @@ -411,13 +414,13 @@ def _initialize(self, event):
self._convert_names(config.variables.names), dtype=polars.String
),
"initial_value": polars.Series(
config.variables.initial_values, dtype=polars.Float32
config.variables.initial_values, dtype=polars.Float64
),
"lower_bounds": polars.Series(
config.variables.lower_bounds, dtype=polars.Float32
config.variables.lower_bounds, dtype=polars.Float64
),
"upper_bounds": polars.Series(
config.variables.upper_bounds, dtype=polars.Float32
config.variables.upper_bounds, dtype=polars.Float64
),
}
)
Expand All @@ -426,11 +429,11 @@ def _initialize(self, event):
{
"objective_name": config.objectives.names,
"weight": polars.Series(
config.objectives.weights, dtype=polars.Float32
config.objectives.weights, dtype=polars.Float64
),
"normalization": polars.Series(
[1.0 / s for s in config.objectives.scales],
dtype=polars.Float32,
dtype=polars.Float64,
),
}
)
Expand All @@ -439,7 +442,9 @@ def _initialize(self, event):
self.data.nonlinear_constraints = polars.DataFrame(
{
"constraint_name": config.nonlinear_constraints.names,
"normalization": config.nonlinear_constraints.scales,
"normalization": [
1.0 / s for s in config.nonlinear_constraints.scales
], # Q: Is this correct?
"constraint_rhs_value": config.nonlinear_constraints.rhs_values,
"constraint_type": config.nonlinear_constraints.types,
}
Expand All @@ -451,7 +456,7 @@ def _initialize(self, event):
config.realizations.names, dtype=polars.UInt16
),
"weight": polars.Series(
config.realizations.weights, dtype=polars.Float32
config.realizations.weights, dtype=polars.Float64
),
}
)
Expand All @@ -462,7 +467,7 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult
realization_objectives = polars.from_pandas(
results.to_dataframe(
"evaluations",
select=["variables", "objectives", "constraints", "evaluation_ids"],
select=["objectives", "constraints", "evaluation_ids"],
).reset_index(),
).drop("plan_id")
batch_objectives = polars.from_pandas(
Expand All @@ -472,27 +477,21 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult
).reset_index()
).drop("plan_id")

batch_controls = polars.from_pandas(
results.to_dataframe("evaluations", select=["variables"]).reset_index()
realization_controls = polars.from_pandas(
results.to_dataframe(
"evaluations", select=["variables", "evaluation_ids"]
).reset_index()
).drop("plan_id")

batch_controls = self._rename_columns(batch_controls)
control_names = batch_controls["control_name"].unique().to_list()
realization_controls = self._rename_columns(realization_controls)
realization_controls = self._enforce_dtypes(realization_controls)

has_scaled_controls = "scaled_control_value" in batch_controls
batch_controls = batch_controls.pivot(
realization_controls = realization_controls.pivot(
on="control_name",
values=["control_value"], # , "scaled_control_value"]
separator=":",
)

if has_scaled_controls:
batch_controls = batch_controls.rename(
{
**{f"control_value:{name}": name for name in control_names},
}
)

try:
batch_constraints = polars.from_pandas(
results.to_dataframe("nonlinear_constraints").reset_index()
Expand All @@ -503,7 +502,10 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult
realization_constraints = None

batch_objectives = self._rename_columns(batch_objectives)
batch_objectives = self._enforce_dtypes(batch_objectives)

realization_objectives = self._rename_columns(realization_objectives)
realization_objectives = self._enforce_dtypes(realization_objectives)

batch_objectives = batch_objectives.pivot(
on="objective_name",
Expand Down Expand Up @@ -544,35 +546,33 @@ def _store_function_results(self, results: FunctionResults) -> _EvaluationResult
"result_id",
"batch_id",
"realization",
"simulation_id",
"constraint_name",
"constraint_value",
].unique(["result_id", "batch_id", "realization", "constraint_name"])
]
realization_constraints = realization_constraints.pivot(
values=["constraint_value"], on="constraint_name"
)
realization_objectives = realization_objectives.drop(
[c for c in realization_objectives.columns if "constraint" in c.lower()]
).unique(subset=["result_id", "batch_id", "realization", "control_name"])
)
batch_objectives = batch_objectives.drop(
[c for c in batch_objectives.columns if "constraint" in c.lower()]
).unique(subset=["result_id", "batch_id"])

realization_objectives = (
realization_objectives.drop(["control_name", "control_value"])
.unique(subset=["result_id", "batch_id", "realization", "objective_name"])
.pivot(
values="objective_value",
index=[
"result_id",
"batch_id",
"realization",
],
columns="objective_name",
)

realization_objectives = realization_objectives.pivot(
values="objective_value",
index=[
"result_id",
"batch_id",
"realization",
"simulation_id",
],
columns="objective_name",
)

return _EvaluationResults(
batch_controls,
realization_controls,
batch_objectives,
realization_objectives,
batch_constraints,
Expand Down Expand Up @@ -601,9 +601,52 @@ def _rename_columns(df: polars.DataFrame):
"scaled_perturbed_objectives": "scaled_perturbed_objective_value",
"scaled_perturbed_constraints": "scaled_perturbed_constraint_value",
"scaled_variables": "scaled_control_value",
"evaluation_ids": "simulation_id",
}
return df.rename({k: v for k, v in _renames.items() if k in df.columns})

@staticmethod
def _enforce_dtypes(df: polars.DataFrame):
    """Cast every column of *df* to the dtype registered for its name.

    Column names are looked up in a fixed name -> dtype table: identifier
    columns become ``UInt16``, name columns become ``String`` and value
    columns become ``Float64``.

    Raises:
        KeyError: if *df* contains a column that has no entry in the table.

    Returns:
        The frame produced by ``df.cast`` for the columns present in *df*.
    """
    id_columns = (
        "batch_id",
        "result_id",
        "perturbation",
        "realization",
        "simulation_id",
    )
    name_columns = (
        "objective_name",
        "control_name",
        "constraint_name",
    )
    value_columns = (
        "total_objective_value",
        "control_value",
        "objective_value",
        "constraint_value",
        "scaled_constraint_value",
        "scaled_objective_value",
        "perturbed_control_value",
        "perturbed_objective_value",
        "perturbed_constraint_value",
        "scaled_perturbed_objective_value",
        "scaled_perturbed_constraint_value",
        "scaled_control_value",
    )
    schema = {
        **dict.fromkeys(id_columns, polars.UInt16),
        **dict.fromkeys(name_columns, polars.String),
        **dict.fromkeys(value_columns, polars.Float64),
    }

    present = set(df.columns)

    # Refuse frames carrying columns we have no dtype for, so that new
    # columns cannot slip through without an explicit entry above.
    unaccounted_cols = present.difference(schema)
    if unaccounted_cols:
        raise KeyError(
            f"Expected all keys to have a specified dtype, found {unaccounted_cols}"
        )

    # Only cast the columns this particular frame actually has.
    casts = {name: dtype for name, dtype in schema.items() if name in present}
    return df.cast(casts)

def _store_gradient_results(self, results: FunctionResults) -> _GradientResults:
perturbation_objectives = polars.from_pandas(
results.to_dataframe("evaluations").reset_index()
Expand All @@ -628,8 +671,10 @@ def _store_gradient_results(self, results: FunctionResults) -> _GradientResults:
if c.lower().startswith("scaled")
)
batch_objective_gradient = self._rename_columns(batch_objective_gradient)
batch_objective_gradient = self._enforce_dtypes(batch_objective_gradient)

perturbation_objectives = self._rename_columns(perturbation_objectives)
perturbation_objectives = self._rename_columns(perturbation_objectives)

if "constraint_name" in perturbation_objectives:
perturbation_constraints = (
Expand Down Expand Up @@ -769,7 +814,9 @@ def _handle_finished_batch_event(self, event: Event):
if isinstance(item, FunctionResults):
eval_results = self._store_function_results(item)

_batches[item.batch_id]["batch_controls"] = eval_results.batch_controls
_batches[item.batch_id]["realization_controls"] = (
eval_results.realization_controls
)
_batches[item.batch_id]["batch_objectives"] = (
eval_results.batch_objectives
)
Expand Down Expand Up @@ -809,7 +856,7 @@ def _handle_finished_batch_event(self, event: Event):
self.data.batches.append(
BatchDataFrames(
batch_id=batch_id,
batch_controls=info.get("batch_controls"),
realization_controls=info.get("realization_controls"),
batch_objectives=info.get("batch_objectives"),
realization_objectives=info.get("realization_objectives"),
batch_constraints=info.get("batch_constraints"),
Expand Down Expand Up @@ -871,13 +918,15 @@ def find_best_batch(filter_by, sort_by):

matching_batches.sort(key=sort_by)
_batch = matching_batches[0]
_controls_dict = _batch.batch_controls.drop(
_controls_dict = _batch.realization_controls.drop(
[
"result_id",
"batch_id",
"simulation_id",
"realization",
*[
c
for c in _batch.batch_controls.columns
for c in _batch.realization_controls.columns
if c.endswith(".scaled") # don't need scaled control values
],
]
Expand Down
Loading

0 comments on commit db4c6dd

Please sign in to comment.