Skip to content

Commit

Permalink
Add support for objective value constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
theo-brown committed Jan 23, 2024
1 parent e038e21 commit 70a19f9
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 40 deletions.
16 changes: 16 additions & 0 deletions jetto_mobo/acquisition.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from botorch.acquisition.multi_objective.monte_carlo import (
qNoisyExpectedHypervolumeImprovement,
)
from botorch.acquisition.multi_objective.objective import IdentityMCMultiOutputObjective
from botorch.models.gpytorch import GPyTorchModel, ModelListGPyTorchModel
from botorch.optim import optimize_acqf
from botorch.sampling.normal import SobolQMCNormalSampler
Expand Down Expand Up @@ -50,6 +51,7 @@ def generate_trial_candidates(
bounds: torch.Tensor,
model: Union[GPyTorchModel, ModelListGPyTorchModel],
acquisition_function: AcquisitionFunction,
n_constraints: int = 0,
device: Union[torch.device, None] = None,
dtype: Optional[torch.dtype] = None,
n_mc_samples: int = 256,
Expand All @@ -76,6 +78,8 @@ def generate_trial_candidates(
Trained surrogate model.
acquisition_function : AcquisitionFunction
Acquisition function to use for generating trial candidates. We recommend using either ``jetto_mobo.acquisition.qNoisyExpectedImprovement`` for single-objective optimisation and ``jetto_mobo.acquisition.qNoisyExpectedHypervolumeImprovement`` for multi-objective optimisation.
n_constraints : int, optional, default = 0
Number of constraints. If nonzero, the last ``n_constraints`` outputs of the model will be treated as constraints.
device : Union[str, torch.device, None], default = None
Torch device to use for optimising the acquisition function. If None, optimisation will be performed using the device the model is on.
dtype : Optional[torch.dtype], default = None
Expand Down Expand Up @@ -139,11 +143,23 @@ def generate_trial_candidates(
device=device, dtype=dtype
)

# Handle constraints
if n_constraints == 0:
objective = None
constraints = None
else:
objective = IdentityMCMultiOutputObjective(
outcomes=list(range(model_.num_outputs - n_constraints))
)
constraints = [lambda Y: Y[..., -i] for i in range(1, n_constraints + 1)]

# Set up acquisition function
acqf = acquisition_function(
model=model_,
X_baseline=observed_inputs_,
sampler=SobolQMCNormalSampler(sample_shape=torch.Size([n_mc_samples])),
objective=objective,
constraints=constraints,
**acqf_kwargs,
)

Expand Down
106 changes: 66 additions & 40 deletions jetto_mobo/surrogate.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
# TODO: write module-level docstring
from typing import Literal, Optional, Union
from typing import Optional, Union

import torch
from botorch import fit_gpytorch_mll
from botorch.models import SingleTaskGP
from botorch.models.model_list_gp_regression import ModelListGP
from botorch.models.transforms.outcome import Standardize
from botorch.models.transforms.input import Normalize
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.models.transforms.outcome import Standardize
from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood


def fit_surrogate_model(
X: torch.Tensor,
X_bounds: torch.Tensor,
Y: torch.Tensor,
inputs: torch.Tensor,
input_bounds: torch.Tensor,
objective_values: torch.Tensor,
constraint_values: Optional[torch.Tensor] = None,
device: Union[str, torch.device, None] = None,
dtype: Optional[torch.dtype] = None,
mode: Literal["independent", "joint"] = "joint",
normalise: bool = True,
standardise: bool = True,
) -> Union[SingleTaskGP, ModelListGP]:
Expand All @@ -26,73 +25,100 @@ def fit_surrogate_model(
Parameters
----------
X : torch.Tensor
inputs : torch.Tensor
Input data. Shape is ``(N, D)``, where ``N`` is the number of data points and ``D`` is the number of input dimensions.
X_bounds : torch.Tensor
input_bounds : torch.Tensor
Bounds on the input space, shape ``(D, 2)``. ``bounds[i, 0]`` is the lower bound and ``bounds[i, 1]`` is the upper bound for the ``i``th input.
Y : torch.Tensor
Output data. Shape is ``(N, M)``, where ``N`` is the number of data points and ``M`` is the number of output dimensions.
objective_values : torch.Tensor
Output data. Shape is ``(N, M)``, where ``N`` is the number of data points and ``M`` is the number of objectives.
constraint_values : Optional[torch.Tensor], default = None
Constraint data. Constraint functions should be of the form ``g(x) <= 0``, which means ``constraint_values`` will be negative if the constraint is satisfied.
Shape is ``(N, C)``, where ``N`` is the number of data points and ``C`` is the number of constraints.
device : Optional[Union[str, torch.device]], default = None
Device to use for fitting the surrogate model. If None, the device of the input data (``X``) will be used.
dtype : Optional[torch.dtype], default = None
Data type to use for fitting the surrogate model. If None, the data type of the input data (``X``) will be used.
mode : Literal["independent", "joint"], default = "joint"
Type of surrogate model to use. If ``"joint"``, all outputs are modelled jointly. If ``"independent"``, each output is modelled independently.
normalise : bool, default = True
Whether to normalise the input data before fitting the surrogate model. This normally results in improved performance.
standardise : bool, default = True
Whether to standardise the output data before fitting the surrogate model. This normally results in improved performance.
Whether to standardise the objective data before fitting the surrogate model. This normally results in improved performance.
Returns
-------
Union[SingleTaskGP, ModelListGP]
Fitted surrogate model.
"""
if device is None:
device = X.device
device = inputs.device
if dtype is None:
dtype = X.dtype
dtype = inputs.dtype

# Convert to correct device and data type
X_ = X.to(device=device, dtype=dtype)
Y_ = Y.to(device=device, dtype=dtype)
X_bounds_ = X_bounds.to(device=device, dtype=dtype)
inputs_ = inputs.to(device=device, dtype=dtype)
objective_values_ = objective_values.to(device=device, dtype=dtype)
constraint_values_ = constraint_values.to(device=device, dtype=dtype)
input_bounds_ = input_bounds.to(device=device, dtype=dtype)

# Check that dimensions match
if not X.shape[0] == Y.shape[0]:
if not inputs.shape[0] == objective_values_.shape[0]:
raise ValueError(
f"Shape of input and output data must match in the first dimension (got input shape {X.shape} and output shape {Y.shape})."
f"Shape of input and output data must match in the first dimension (got input shape {inputs.shape} and output shape {objective_values_.shape})."
)
if constraint_values is not None:
if not inputs.shape[0] == constraint_values_.shape[0]:
raise ValueError(
f"Shape of input and constraint data must match in the first dimension (got input shape {inputs.shape} and constraint shape {constraint_values_.shape})."
)
if not inputs.shape[1] == input_bounds_.shape[0]:
raise ValueError(
f"Number of input dimensions must match the number of bounds (got input shape {inputs.shape} and bounds shape {input_bounds_.shape})."
)

# Transforms
input_transform = Normalize(d=X_.size(-1), bounds=X_bounds) if normalise else None
output_transform = Standardize(m=Y.size(-1)) if standardise else None
input_transform = (
Normalize(d=inputs_.size(-1), bounds=input_bounds_) if normalise else None
)
output_transform = (
Standardize(m=objective_values_.size(-1)) if standardise else None
)

# Select model
if mode == "joint":
model = SingleTaskGP(
X_,
Y_,
input_transform=input_transform,
outcome_transform=output_transform,
)
mll = ExactMarginalLogLikelihood(model.likelihood, model)
elif mode == "independent":
# Initialise model
if constraint_values is not None:
objective_models = [
SingleTaskGP(
inputs_,
objective_values_[:, i].unsqueeze(1),
input_transform=input_transform,
outcome_transform=output_transform,
)
for i in range(objective_values_.shape[1])
]
constraint_models = [
SingleTaskGP(
inputs_,
constraint_values_[:, i].unsqueeze(1),
input_transform=input_transform,
outcome_transform=output_transform,
)
for i in range(constraint_values_.shape[1])
]
models = objective_models + constraint_models
model = ModelListGP(*models)
mll = SumMarginalLogLikelihood(model.likelihood, model)
else:
model = ModelListGP(
*[
SingleTaskGP(
X_,
Y_[:, i].unsqueeze(1),
inputs_,
objective_values_[:, i].unsqueeze(1),
input_transform=input_transform,
outcome_transform=output_transform,
)
for i in range(Y_.shape[1])
for i in range(objective_values_.shape[1])
]
)
mll = SumMarginalLogLikelihood(model.likelihood, model)
else:
raise ValueError(f"Unknown mode {mode}. Must be 'joint' or 'list'.")

# Return fitted model
# Fit model
mll = SumMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)
return model

1 comment on commit 70a19f9

@theo-brown
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this commit revoked support for joint modelling of objectives; now we use a ModelListGP only

Please sign in to comment.