From 70a19f9c7390c27b6e888cdf7644ddd861431899 Mon Sep 17 00:00:00 2001 From: Theo Brown <7982453+theo-brown@users.noreply.github.com> Date: Tue, 23 Jan 2024 10:35:28 +0000 Subject: [PATCH] Add support for objective value constraints --- jetto_mobo/acquisition.py | 16 ++++++ jetto_mobo/surrogate.py | 106 ++++++++++++++++++++++++-------------- 2 files changed, 82 insertions(+), 40 deletions(-) diff --git a/jetto_mobo/acquisition.py b/jetto_mobo/acquisition.py index 58c4603..0480c66 100644 --- a/jetto_mobo/acquisition.py +++ b/jetto_mobo/acquisition.py @@ -9,6 +9,7 @@ from botorch.acquisition.multi_objective.monte_carlo import ( qNoisyExpectedHypervolumeImprovement, ) +from botorch.acquisition.multi_objective.objective import IdentityMCMultiOutputObjective from botorch.models.gpytorch import GPyTorchModel, ModelListGPyTorchModel from botorch.optim import optimize_acqf from botorch.sampling.normal import SobolQMCNormalSampler @@ -50,6 +51,7 @@ def generate_trial_candidates( bounds: torch.Tensor, model: Union[GPyTorchModel, ModelListGPyTorchModel], acquisition_function: AcquisitionFunction, + n_constraints: int = 0, device: Union[torch.device, None] = None, dtype: Optional[torch.dtype] = None, n_mc_samples: int = 256, @@ -76,6 +78,8 @@ def generate_trial_candidates( Trained surrogate model. acquisition_function : AcquisitionFunction Acquisition function to use for generating trial candidates. We recommend using either ``jetto_mobo.acquisition.qNoisyExpectedImprovement`` for single-objective optimisation and ``jetto_mobo.acquisition.qNoisyExpectedHypervolumeImprovement`` for multi-objective optimisation. + n_constraints : int, optional, default = 0 + Number of constraints. If nonzero, the last ``n_constraints`` outputs of the model will be treated as constraints. device : Union[str, torch.device, None], default = None Torch device to use for optimising the acquisition function. If None, optimisation will be performed using the device the model is on. dtype : Optional[torch.dtype], default = None @@ -139,11 +143,23 @@ def generate_trial_candidates( device=device, dtype=dtype ) + # Handle constraints + if n_constraints == 0: + objective = None + constraints = None + else: + objective = IdentityMCMultiOutputObjective( + outcomes=list(range(model_.num_outputs - n_constraints)) + ) + constraints = [lambda Y: Y[..., -i] for i in range(1, n_constraints + 1)] + # Set up acquisition function acqf = acquisition_function( model=model_, X_baseline=observed_inputs_, sampler=SobolQMCNormalSampler(sample_shape=torch.Size([n_mc_samples])), + objective=objective, + constraints=constraints, **acqf_kwargs, ) diff --git a/jetto_mobo/surrogate.py b/jetto_mobo/surrogate.py index 78b8644..b57ec98 100644 --- a/jetto_mobo/surrogate.py +++ b/jetto_mobo/surrogate.py @@ -1,23 +1,22 @@ # TODO: write module-level docstring -from typing import Literal, Optional, Union +from typing import Optional, Union import torch from botorch import fit_gpytorch_mll from botorch.models import SingleTaskGP from botorch.models.model_list_gp_regression import ModelListGP -from botorch.models.transforms.outcome import Standardize from botorch.models.transforms.input import Normalize -from gpytorch.mlls import ExactMarginalLogLikelihood +from botorch.models.transforms.outcome import Standardize from gpytorch.mlls.sum_marginal_log_likelihood import SumMarginalLogLikelihood def fit_surrogate_model( - X: torch.Tensor, - X_bounds: torch.Tensor, - Y: torch.Tensor, + inputs: torch.Tensor, + input_bounds: torch.Tensor, + objective_values: torch.Tensor, + constraint_values: Optional[torch.Tensor] = None, device: Union[str, torch.device, None] = None, dtype: Optional[torch.dtype] = None, - mode: Literal["independent", "joint"] = "joint", normalise: bool = True, standardise: bool = True, ) -> Union[SingleTaskGP, ModelListGP]: @@ -26,22 +25,23 @@ def fit_surrogate_model( Parameters ---------- - X : torch.Tensor + inputs : torch.Tensor Input data. Shape is ``(N, D)``, where ``N`` is the number of data points and ``D`` is the number of input dimensions. - X_bounds : torch.Tensor + input_bounds : torch.Tensor Bounds on the input space, shape ``(D, 2)``. ``bounds[i, 0]`` is the lower bound and ``bounds[i, 1]`` is the upper bound for the ``i``th input. - Y : torch.Tensor - Output data. Shape is ``(N, M)``, where ``N`` is the number of data points and ``M`` is the number of output dimensions. + objective_values : torch.Tensor + Output data. Shape is ``(N, M)``, where ``N`` is the number of data points and ``M`` is the number of objectives. + constraint_values : Optional[torch.Tensor], default = None + Constraint data. Constraint functions should be of the form ``g(x) <= 0``, which means ``constraint_values`` will be negative if the constraint is satisfied. + Shape is ``(N, C)``, where ``N`` is the number of data points and ``C`` is the number of constraints. device : Optional[Union[str, torch.device]], default = None Device to use for fitting the surrogate model. If None, the device of the input data (``X``) will be used. dtype : Optional[torch.dtype], default = None Data type to use for fitting the surrogate model. If None, the data type of the input data (``X``) will be used. - mode : Literal["independent", "joint"], default = "joint" - Type of surrogate model to use. If ``"joint"``, all outputs are modelled jointly. If ``"independent"``, each output is modelled independently. normalise : bool, default = True Whether to normalise the input data before fitting the surrogate model. This normally results in improved performance. standardise : bool, default = True - Whether to standardise the output data before fitting the surrogate model. This normally results in improved performance. + Whether to standardise the objective data before fitting the surrogate model. This normally results in improved performance. Returns ------- @@ -49,50 +49,76 @@ def fit_surrogate_model( Fitted surrogate model. """ if device is None: - device = X.device + device = inputs.device if dtype is None: - dtype = X.dtype + dtype = inputs.dtype # Convert to correct device and data type - X_ = X.to(device=device, dtype=dtype) - Y_ = Y.to(device=device, dtype=dtype) - X_bounds_ = X_bounds.to(device=device, dtype=dtype) + inputs_ = inputs.to(device=device, dtype=dtype) + objective_values_ = objective_values.to(device=device, dtype=dtype) + constraint_values_ = constraint_values.to(device=device, dtype=dtype) + input_bounds_ = input_bounds.to(device=device, dtype=dtype) # Check that dimensions match - if not X.shape[0] == Y.shape[0]: + if not inputs.shape[0] == objective_values_.shape[0]: raise ValueError( - f"Shape of input and output data must match in the first dimension (got input shape {X.shape} and output shape {Y.shape})." + f"Shape of input and output data must match in the first dimension (got input shape {inputs.shape} and output shape {objective_values_.shape})." + ) + if constraint_values is not None: + if not inputs.shape[0] == constraint_values_.shape[0]: + raise ValueError( + f"Shape of input and constraint data must match in the first dimension (got input shape {inputs.shape} and constraint shape {constraint_values_.shape})." + ) + if not inputs.shape[1] == input_bounds_.shape[0]: + raise ValueError( + f"Number of input dimensions must match the number of bounds (got input shape {inputs.shape} and bounds shape {input_bounds_.shape})." ) # Transforms - input_transform = Normalize(d=X_.size(-1), bounds=X_bounds) if normalise else None - output_transform = Standardize(m=Y.size(-1)) if standardise else None + input_transform = ( + Normalize(d=inputs_.size(-1), bounds=input_bounds_) if normalise else None + ) + output_transform = ( + Standardize(m=objective_values_.size(-1)) if standardise else None + ) - # Select model - if mode == "joint": - model = SingleTaskGP( - X_, - Y_, - input_transform=input_transform, - outcome_transform=output_transform, - ) - mll = ExactMarginalLogLikelihood(model.likelihood, model) - elif mode == "independent": + # Initialise model + if constraint_values is not None: + objective_models = [ + SingleTaskGP( + inputs_, + objective_values_[:, i].unsqueeze(1), + input_transform=input_transform, + outcome_transform=output_transform, + ) + for i in range(objective_values_.shape[1]) + ] + constraint_models = [ + SingleTaskGP( + inputs_, + constraint_values_[:, i].unsqueeze(1), + input_transform=input_transform, + outcome_transform=output_transform, + ) + for i in range(constraint_values_.shape[1]) + ] + models = objective_models + constraint_models + model = ModelListGP(*models) + mll = SumMarginalLogLikelihood(model.likelihood, model) + else: model = ModelListGP( *[ SingleTaskGP( - X_, - Y_[:, i].unsqueeze(1), + inputs_, + objective_values_[:, i].unsqueeze(1), input_transform=input_transform, outcome_transform=output_transform, ) - for i in range(Y_.shape[1]) + for i in range(objective_values_.shape[1]) ] ) - mll = SumMarginalLogLikelihood(model.likelihood, model) - else: - raise ValueError(f"Unknown mode {mode}. Must be 'joint' or 'list'.") - # Return fitted model + # Fit model + mll = SumMarginalLogLikelihood(model.likelihood, model) fit_gpytorch_mll(mll) return model