From bd243a8dd1acfa060cb365daec32f2758c2c6fa0 Mon Sep 17 00:00:00 2001 From: qroa Date: Mon, 16 Dec 2024 12:25:06 +0100 Subject: [PATCH] REFACTOR: restucture the MapieQuantileRegressor Fit - Split the fit into prefit_estimators, fit_estimators and conformalize --- mapie/regression/quantile_regression.py | 194 +++++++++++++++--------- 1 file changed, 123 insertions(+), 71 deletions(-) diff --git a/mapie/regression/quantile_regression.py b/mapie/regression/quantile_regression.py index ed19e3e1b..558ab45de 100644 --- a/mapie/regression/quantile_regression.py +++ b/mapie/regression/quantile_regression.py @@ -1,7 +1,7 @@ from __future__ import annotations import warnings -from typing import Iterable, List, Optional, Tuple, Union, cast +from typing import Iterable, Dict, List, Optional, Tuple, Union, cast import numpy as np from sklearn.base import RegressorMixin, clone @@ -548,92 +548,144 @@ def fit( The model itself. """ self.cv = self._check_cv(cast(str, self.cv)) - - # Initialization + self.alpha_np = self._check_alpha(self.alpha) self.estimators_: List[RegressorMixin] = [] - if self.cv == "prefit": - estimator = cast(List, self.estimator) - alpha = self._check_alpha(self.alpha) - self._check_prefit_params(estimator) - X_calib, y_calib = indexable(X, y) - self.n_calib_samples = _num_samples(y_calib) - y_calib_preds = np.full( - shape=(3, self.n_calib_samples), - fill_value=np.nan - ) - for i, est in enumerate(estimator): - self.estimators_.append(est) - y_calib_preds[i] = est.predict(X_calib).ravel() - self.single_estimator_ = self.estimators_[2] + if self.cv == "prefit": + X_calib, y_calib = self.prefit_estimators(X, y) else: - # Checks - self._check_parameters() - checked_estimator = self._check_estimator(self.estimator) - alpha = self._check_alpha(self.alpha) - X, y = indexable(X, y) - random_state = check_random_state(random_state) - results = self._check_calib_set( - X, - y, - sample_weight, - X_calib, - y_calib, - calib_size, - random_state, - shuffle, - stratify, + X_calib, y_calib = self.fit_estimators( + X=X, + y=y, + sample_weight=sample_weight, + groups=groups, + X_calib=X_calib, + y_calib=y_calib, + calib_size=calib_size, + random_state=random_state, + shuffle=shuffle, + stratify=stratify, + **fit_params, ) - X_train, y_train, X_calib, y_calib, sample_weight_train = results - X_train, y_train = indexable(X_train, y_train) - X_calib, y_calib = indexable(X_calib, y_calib) - y_train, y_calib = _check_y(y_train), _check_y(y_calib) - self.n_calib_samples = _num_samples(y_calib) - check_alpha_and_n_samples(self.alpha, self.n_calib_samples) - sample_weight_train, X_train, y_train = check_null_weight( - sample_weight_train, + + self.conformalize(X_calib, y_calib) + + return self + + def prefit_estimators( + self, + X: ArrayLike, + y: ArrayLike + ) -> Tuple[ArrayLike, ArrayLike]: + + estimator = cast(List, self.estimator) + self._check_prefit_params(estimator) + + for i, est in enumerate(estimator): + self.estimators_.append(est) + self.single_estimator_ = self.estimators_[2] + + X_calib, y_calib = indexable(X, y) + return X_calib, y_calib + + def fit_estimators( + self, + X: ArrayLike, + y: ArrayLike, + sample_weight: Optional[ArrayLike] = None, + groups: Optional[ArrayLike] = None, + X_calib: Optional[ArrayLike] = None, + y_calib: Optional[ArrayLike] = None, + calib_size: Optional[float] = 0.3, + random_state: Optional[Union[int, np.random.RandomState]] = None, + shuffle: Optional[bool] = True, + stratify: Optional[ArrayLike] = None, + **fit_params, + ): + + self._check_parameters() + checked_estimator = self._check_estimator(self.estimator) + random_state = check_random_state(random_state) + X, y = indexable(X, y) + + results = self._check_calib_set( + X, + y, + sample_weight, + X_calib, + y_calib, + calib_size, + random_state, + shuffle, + stratify, + ) + + X_train, y_train, X_calib, y_calib, sample_weight_train = results + X_train, y_train = indexable(X_train, y_train) + X_calib, y_calib = indexable(X_calib, y_calib) + y_train, y_calib = _check_y(y_train), _check_y(y_calib) + self.n_calib_samples = _num_samples(y_calib) + check_alpha_and_n_samples(self.alpha, self.n_calib_samples) + sample_weight_train, X_train, y_train = check_null_weight( + sample_weight_train, + X_train, + y_train + ) + y_train = cast(NDArray, y_train) + + if isinstance(checked_estimator, Pipeline): + estimator = checked_estimator[-1] + else: + estimator = checked_estimator + name_estimator = estimator.__class__.__name__ + alpha_name = self.quantile_estimator_params[ + name_estimator + ]["alpha_name"] + for i, alpha_ in enumerate(self.alpha_np): + cloned_estimator_ = clone(checked_estimator) + params = {alpha_name: alpha_} + if isinstance(checked_estimator, Pipeline): + cloned_estimator_[-1].set_params(**params) + else: + cloned_estimator_.set_params(**params) + self.estimators_.append(fit_estimator( + cloned_estimator_, X_train, - y_train + y_train, + sample_weight_train, + **fit_params, + ) ) - y_train = cast(NDArray, y_train) + self.single_estimator_ = self.estimators_[2] - y_calib_preds = np.full( + return X_calib, y_calib + + def conformalize( + self, + X_conf: ArrayLike, + y_conf: ArrayLike, + sample_weight: Optional[ArrayLike] = None, + predict_params: Dict = {}, + ): + + X_conf, y_conf = indexable(X_conf, y_conf) + self.n_calib_samples = _num_samples(y_conf) + + y_calib_preds = np.full( shape=(3, self.n_calib_samples), fill_value=np.nan ) - if isinstance(checked_estimator, Pipeline): - estimator = checked_estimator[-1] - else: - estimator = checked_estimator - name_estimator = estimator.__class__.__name__ - alpha_name = self.quantile_estimator_params[ - name_estimator - ]["alpha_name"] - for i, alpha_ in enumerate(alpha): - cloned_estimator_ = clone(checked_estimator) - params = {alpha_name: alpha_} - if isinstance(checked_estimator, Pipeline): - cloned_estimator_[-1].set_params(**params) - else: - cloned_estimator_.set_params(**params) - self.estimators_.append(fit_estimator( - cloned_estimator_, - X_train, - y_train, - sample_weight_train, - **fit_params, - ) - ) - y_calib_preds[i] = self.estimators_[-1].predict(X_calib) - self.single_estimator_ = self.estimators_[2] + for i, est in enumerate(self.estimators_): + y_calib_preds[i] = est.predict(X_conf, **predict_params).ravel() self.conformity_scores_ = np.full( shape=(3, self.n_calib_samples), fill_value=np.nan ) - self.conformity_scores_[0] = y_calib_preds[0] - y_calib - self.conformity_scores_[1] = y_calib - y_calib_preds[1] + + self.conformity_scores_[0] = y_calib_preds[0] - y_conf + self.conformity_scores_[1] = y_conf - y_calib_preds[1] self.conformity_scores_[2] = np.max( [ self.conformity_scores_[0],