diff --git a/CHANGELOG.md b/CHANGELOG.md index 3da9409727..354a9e45bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ but cannot always guarantee backwards compatibility. Changes that may **break co **Improved** - Improvements to `ForecastingModel`: Improved `start` handling for historical forecasts, backtest, residuals, and gridsearch. If `start` is not within the trainable / forecastable points, uses the closest valid start point that is a round multiple of `stride` ahead of start. Raises a ValueError, if no valid start point exists. This guarantees that all historical forecasts are `n * stride` points away from start, and will simplify many downstream tasks. [#2560](https://github.com/unit8co/darts/issues/2560) by [Dennis Bader](https://github.com/dennisbader). -- Added `data_transformers` argument to `historical_forecasts`, `backtest` and `gridsearch` that allows scaling of the series without data-leakage. [#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) and [Jan Fidor](https://github.com/JanFidor) +- Added `data_transformers` argument to `historical_forecasts`, `backtest`, `residuals`, and `gridsearch` that allows scaling of the series without data-leakage. [#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) and [Jan Fidor](https://github.com/JanFidor) +- Added `idx_params` argument to `DataTransformer` that allows users to use only a subset of the transformers when `global_fit=False` and several series are used. 
[#2529](https://github.com/unit8co/darts/pull/2529) by [Antoine Madrona](https://github.com/madtoinou) **Fixed** diff --git a/darts/dataprocessing/pipeline.py b/darts/dataprocessing/pipeline.py index 156825238b..5413e2b68c 100644 --- a/darts/dataprocessing/pipeline.py +++ b/darts/dataprocessing/pipeline.py @@ -5,7 +5,7 @@ from collections.abc import Iterator, Sequence from copy import deepcopy -from typing import Union +from typing import Optional, Union from darts import TimeSeries from darts.dataprocessing.transformers import ( @@ -158,7 +158,9 @@ def fit_transform( return data def transform( - self, data: Union[TimeSeries, Sequence[TimeSeries]] + self, + data: Union[TimeSeries, Sequence[TimeSeries]], + idx_params: Optional[Union[int, Sequence[int]]] = None, ) -> Union[TimeSeries, Sequence[TimeSeries]]: """ For each data transformer in pipeline transform data. Then transformed data is passed to next transformer. @@ -174,11 +176,14 @@ def transform( Transformed data. """ for transformer in self._transformers: - data = transformer.transform(data) + data = transformer.transform(data, idx_params=idx_params) return data def inverse_transform( - self, data: Union[TimeSeries, Sequence[TimeSeries]], partial: bool = False + self, + data: Union[TimeSeries, Sequence[TimeSeries]], + partial: bool = False, + idx_params: Optional[Union[int, Sequence[int]]] = None, ) -> Union[TimeSeries, Sequence[TimeSeries]]: """ For each data transformer in the pipeline, inverse-transform data. 
Then inverse transformed data is passed to @@ -207,12 +212,12 @@ def inverse_transform( ) for transformer in reversed(self._transformers): - data = transformer.inverse_transform(data) + data = transformer.inverse_transform(data, idx_params=idx_params) return data else: for transformer in reversed(self._transformers): if isinstance(transformer, InvertibleDataTransformer): - data = transformer.inverse_transform(data) + data = transformer.inverse_transform(data, idx_params=idx_params) return data @property diff --git a/darts/dataprocessing/transformers/base_data_transformer.py b/darts/dataprocessing/transformers/base_data_transformer.py index b8ab774651..0fcf118b4f 100644 --- a/darts/dataprocessing/transformers/base_data_transformer.py +++ b/darts/dataprocessing/transformers/base_data_transformer.py @@ -363,7 +363,10 @@ def transform( # Take note of original input for unmasking purposes: if isinstance(series, TimeSeries): data = [series] - transformer_selector = [0] + if idx_params: + transformer_selector = self._check_idx_params(idx_params) + else: + transformer_selector = [0] else: data = series if idx_params: diff --git a/darts/dataprocessing/transformers/fittable_data_transformer.py b/darts/dataprocessing/transformers/fittable_data_transformer.py index 86d470703d..9ef6a9c9f4 100644 --- a/darts/dataprocessing/transformers/fittable_data_transformer.py +++ b/darts/dataprocessing/transformers/fittable_data_transformer.py @@ -271,7 +271,8 @@ def fit( transformer_selector = range(len(series)) params_iterator = self._get_params( - transformer_selector=transformer_selector, calling_fit=True + transformer_selector=transformer_selector, + calling_fit=True, ) fit_iterator = ( zip(data, params_iterator) diff --git a/darts/dataprocessing/transformers/invertible_data_transformer.py b/darts/dataprocessing/transformers/invertible_data_transformer.py index 8daeb5fcdb..41457f370b 100644 --- a/darts/dataprocessing/transformers/invertible_data_transformer.py +++ 
b/darts/dataprocessing/transformers/invertible_data_transformer.py @@ -327,7 +327,10 @@ def inverse_transform( called_with_sequence_series = False if isinstance(series, TimeSeries): data = [series] - transformer_selector = [0] + if idx_params: + transformer_selector = self._check_idx_params(idx_params) + else: + transformer_selector = [0] called_with_single_series = True elif isinstance(series[0], TimeSeries): # Sequence[TimeSeries] data = series @@ -346,7 +349,6 @@ def inverse_transform( for idx, series_list in iterator_: data.extend(series_list) transformer_selector += [idx] * len(series_list) - input_iterator = _build_tqdm_iterator( zip(data, self._get_params(transformer_selector=transformer_selector)), verbose=self._verbose, diff --git a/darts/models/forecasting/forecasting_model.py b/darts/models/forecasting/forecasting_model.py index 154e3b811e..8ddd2084b2 100644 --- a/darts/models/forecasting/forecasting_model.py +++ b/darts/models/forecasting/forecasting_model.py @@ -911,8 +911,10 @@ def retrain_func( data_transformers=data_transformers, copy=True ) - # data transformer must already be fitted and can be directly applied to all the series + using_prefitted_transformers = False + # data transformer already fitted and can be directly applied to all the series if data_transformers and not retrain: + using_prefitted_transformers = True series, past_covariates, future_covariates = _apply_data_transformers( series=series, past_covariates=past_covariates, @@ -1097,6 +1099,7 @@ def retrain_func( train_series = train_series[-train_length_:] # when `retrain=True`, data transformers are also retrained between iterations to avoid data-leakage + # using a single series if data_transformers and retrain: train_series, past_covariates_, future_covariates_ = ( _apply_data_transformers( @@ -1187,11 +1190,11 @@ def retrain_func( **predict_kwargs, ) - # target transformer is either already fitted or fitted during the retraining forecast = _apply_inverse_data_transformers( 
series=train_series, forecasts=forecast, data_transformers=data_transformers, + idx_transformer=idx if using_prefitted_transformers else None, ) show_predict_warnings = False diff --git a/darts/tests/utils/historical_forecasts/test_historical_forecasts.py b/darts/tests/utils/historical_forecasts/test_historical_forecasts.py index ffed863bec..e752c24636 100644 --- a/darts/tests/utils/historical_forecasts/test_historical_forecasts.py +++ b/darts/tests/utils/historical_forecasts/test_historical_forecasts.py @@ -2,6 +2,7 @@ import logging from copy import deepcopy from itertools import product +from typing import Optional import numpy as np import pandas as pd @@ -33,6 +34,7 @@ from darts.tests.conftest import TORCH_AVAILABLE, tfm_kwargs from darts.utils import n_steps_between from darts.utils import timeseries_generation as tg +from darts.utils.ts_utils import SeriesType, get_series_seq_type if TORCH_AVAILABLE: import torch @@ -1257,7 +1259,7 @@ def test_optimized_historical_forecasts_regression(self, config): forecast_horizon=forecast_horizon, ) - self.helper_compare_hf(hist_fct, opti_hist_fct, last_points_only) + self.helper_compare_hf(hist_fct, opti_hist_fct) @pytest.mark.parametrize( "config", @@ -2647,6 +2649,7 @@ def helper_manual_scaling_prediction( retrain: bool, end_idx: int, ocl: int, + idx_transformer: Optional[int] = None, ): ts_copy = deepcopy(ts) hf_scaler_copy = deepcopy(hf_scaler) @@ -2661,7 +2664,9 @@ def helper_manual_scaling_prediction( if retrain: hf_scaler_copy[ts_name].fit(tmp_ts) # apply the scaler on the whole series - ts_copy[ts_name] = hf_scaler_copy[ts_name].transform(ts_copy[ts_name]) + ts_copy[ts_name] = hf_scaler_copy[ts_name].transform( + ts_copy[ts_name], idx_params=idx_transformer + ) series = ts_copy.pop("series")[:end_idx] if retrain: @@ -2677,23 +2682,33 @@ def helper_manual_scaling_prediction( # scale back the forecasts if isinstance(hf_scaler_copy.get("series"), InvertibleDataTransformer): - return 
hf_scaler_copy["series"].inverse_transform(pred) + return hf_scaler_copy["series"].inverse_transform( + pred, idx_params=idx_transformer + ) else: return pred - def helper_compare_hf(self, ts_A, ts_B, last_points_only: bool): - if last_points_only: - ts_A.time_index.equals(ts_B.time_index) - np.testing.assert_almost_equal( - ts_A.all_values(), - ts_B.all_values(), - ) - else: - for ts_a, ts_b in zip(ts_A, ts_B): - assert ts_a.time_index.equals(ts_b.time_index) + def helper_compare_hf(self, ts_A, ts_B): + """Helper method to compare all the entries between two historical forecasts""" + type_ts_a = get_series_seq_type(ts_A) + type_ts_b = get_series_seq_type(ts_B) + + assert type_ts_a == type_ts_b + assert len(ts_A) == len(ts_B) + + if type_ts_a == SeriesType.SINGLE: + ts_A = [[ts_A]] + ts_B = [[ts_B]] + elif type_ts_a == SeriesType.SEQ: + ts_A = [ts_A] + ts_B = [ts_B] + + for ts_a, ts_b in zip(ts_A, ts_B): + for ts_a_, ts_b_ in zip(ts_a, ts_b): + assert ts_a_.time_index.equals(ts_b_.time_index) np.testing.assert_almost_equal( - ts_a.all_values(), - ts_b.all_values(), + ts_a_.all_values(), + ts_b_.all_values(), ) def helper_get_model_params( @@ -2810,7 +2825,7 @@ def test_historical_forecasts_with_scaler(self, params): # verify that the results are identical when using single Scaler or a Pipeline assert len(hf_auto) == len(hf_auto_pipeline) == 2 - self.helper_compare_hf(hf_auto, hf_auto_pipeline, last_points_only) + self.helper_compare_hf(hf_auto, hf_auto_pipeline) # optimized historical forecast since horizon_length <= ocl and retrain=False if not retrain: @@ -2823,7 +2838,7 @@ def test_historical_forecasts_with_scaler(self, params): data_transformers=hf_scaler, ) assert len(opti_hf_auto) == len(hf_auto) == 2 - self.helper_compare_hf(hf_auto, opti_hf_auto, last_points_only) + self.helper_compare_hf(hf_auto, opti_hf_auto) # for 2nd to last historical forecast manual_hf_0 = self.helper_manual_scaling_prediction( @@ -2841,24 +2856,21 @@ def 
test_historical_forecasts_with_scaler(self, params): values=np.array([manual_hf_0.values()[-1], manual_hf_1.values()[-1]]), columns=manual_hf_0.components, ) - self.helper_compare_hf(tmp_ts, hf_auto, last_points_only) + self.helper_compare_hf(tmp_ts, hf_auto) else: - self.helper_compare_hf( - hf_auto, [manual_hf_0, manual_hf_1], last_points_only - ) + self.helper_compare_hf(hf_auto, [manual_hf_0, manual_hf_1]) def test_historical_forecasts_with_scaler_errors(self): """Check that the appropriate exception is raised when providing incorrect parameters.""" ocl = 2 - model = LinearRegressionModel(lags=5, output_chunk_length=ocl) - model.fit(self.sine_univariate1) - hf_args = { "start": -ocl - 1, "start_format": "position", "forecast_horizon": ocl, "verbose": False, } + model = LinearRegressionModel(lags=5, output_chunk_length=ocl) + model.fit(self.sine_univariate1) # retrain=False and unfitted data transformers with pytest.raises(ValueError) as err: @@ -2869,10 +2881,184 @@ def test_historical_forecasts_with_scaler_errors(self): retrain=False, ) assert str(err.value).startswith( - "All the fittable entries in `data_transformers` must already be fitted when `retrain=False`" + "All the fittable entries in `data_transformers` must already be fitted when `retrain=False`, the " + ) + + # retrain=False, multiple series not matching the fitted data transformers dimensions + with pytest.raises(ValueError) as err: + _ = model.historical_forecasts( + **hf_args, + series=[self.sine_univariate1] * 2, + data_transformers={ + "series": Scaler(global_fit=False).fit([self.sine_univariate1] * 3) + }, + retrain=False, + ) + assert str(err.value).startswith( + "When multiple series are provided, their number should match the number of fitted params in the " + "`data_transformers` defined with `global_fit=False`." 
+ ) + + # retrain=True, multiple series and unfitted data transformers with global_fit=True + with pytest.raises(ValueError) as err: + _ = model.historical_forecasts( + **hf_args, + series=[self.sine_univariate1, self.sine_univariate2], + data_transformers={"series": Scaler(global_fit=True)}, + retrain=True, + ) + assert str(err.value).startswith( + "When `retrain=True` and multiple series are provided, all the fittable `data_transformers` " + "must be defined with `global_fit=False" ) - # TODO: retrain=True and unfitted data transformers with global_fit=True + @pytest.mark.parametrize("params", product([True, False], [True, False])) + def test_historical_forecasts_with_scaler_multiple_series(self, params): + """Verify that the scaling in historical forecasts behave as expected when multiple series are used.""" + retrain, global_fit = params + unique_param_entry = retrain or global_fit + ocl = 2 + hf_args = { + "start": -ocl, + "start_format": "position", + "forecast_horizon": ocl, + "last_points_only": False, + "retrain": retrain, + "verbose": False, + } + series = [self.sine_univariate1, self.sine_univariate2, self.sine_univariate3] + + model = LinearRegressionModel(lags=5, output_chunk_length=ocl) + model.fit(series) + + def get_scaler(fit: bool): + if fit: + return Scaler(global_fit=global_fit).fit(series) + else: + return Scaler(global_fit=global_fit) + + # global fit is not supported with retrain and multiple series + if retrain and global_fit: + expected_msg = ( + "When `retrain=True` and multiple series are provided, all the fittable `data_transformers` must " + "be defined with `global_fit=False`." 
+ ) + with pytest.raises(ValueError) as err: + _ = model.historical_forecasts( + **hf_args, + series=series, + data_transformers={"series": get_scaler(fit=True)}, + ) + assert str(err.value) == expected_msg + + with pytest.raises(ValueError) as err: + _ = model.historical_forecasts( + **hf_args, + series=series, + data_transformers={"series": get_scaler(fit=False)}, + ) + assert str(err.value) == expected_msg + return + + # using all the series used to fit the scaler + hf = model.historical_forecasts( + **hf_args, + series=series, + data_transformers={"series": get_scaler(fit=True)}, + ) + manual_hf_0 = self.helper_manual_scaling_prediction( + model, + {"series": series[0]}, + {"series": get_scaler(fit=True)}, + retrain, + -ocl, + ocl, + idx_transformer=None if unique_param_entry else 0, + ) + manual_hf_1 = self.helper_manual_scaling_prediction( + model, + {"series": series[1]}, + {"series": get_scaler(fit=True)}, + retrain, + -ocl, + ocl, + idx_transformer=None if unique_param_entry else 1, + ) + manual_hf_2 = self.helper_manual_scaling_prediction( + model, + {"series": series[2]}, + {"series": get_scaler(fit=True)}, + retrain, + -ocl, + ocl, + idx_transformer=None if unique_param_entry else 2, + ) + self.helper_compare_hf(hf, [[manual_hf_0], [manual_hf_1], [manual_hf_2]]) + + # scaler fit on 3 series, historical forecast only over the first one + hf = model.historical_forecasts( + **hf_args, + series=series[0], + data_transformers={"series": get_scaler(fit=True)}, + ) + manual_hf_0 = self.helper_manual_scaling_prediction( + model, + {"series": series[0]}, + {"series": get_scaler(fit=True)}, + retrain, + -ocl, + ocl, + ) + self.helper_compare_hf(hf, [manual_hf_0]) + + # scaler fit on 3 series, historical forecast only over the last one, causing a mismatch + hf = model.historical_forecasts( + **hf_args, + series=series[2], + data_transformers={"series": get_scaler(fit=True)}, + ) + # note that the idx_transformer is not specified, only the first transformer is used 
(instead of the 3rd) + manual_hf_2 = self.helper_manual_scaling_prediction( + model, + {"series": series[2]}, + {"series": get_scaler(fit=True)}, + retrain, + -ocl, + ocl, + ) + self.helper_compare_hf(hf, [manual_hf_2]) + + if retrain: + hf = model.historical_forecasts( + **hf_args, + series=series, + data_transformers={"series": get_scaler(fit=False)}, + ) + manual_hf_0 = self.helper_manual_scaling_prediction( + model, + {"series": series[0]}, + {"series": get_scaler(fit=False)}, + retrain, + -ocl, + ocl, + ) + manual_hf_1 = self.helper_manual_scaling_prediction( + model, + {"series": series[1]}, + {"series": get_scaler(fit=False)}, + retrain, + -ocl, + ocl, + ) + manual_hf_2 = self.helper_manual_scaling_prediction( + model, + {"series": series[2]}, + {"series": get_scaler(fit=False)}, + retrain, + -ocl, + ocl, + ) + self.helper_compare_hf(hf, [[manual_hf_0], [manual_hf_1], [manual_hf_2]]) @pytest.mark.parametrize( "model_type,enable_optimization", diff --git a/darts/utils/historical_forecasts/utils.py b/darts/utils/historical_forecasts/utils.py index e97d264204..5a0122fd82 100644 --- a/darts/utils/historical_forecasts/utils.py +++ b/darts/utils/historical_forecasts/utils.py @@ -10,6 +10,7 @@ from darts.dataprocessing.pipeline import Pipeline from darts.dataprocessing.transformers import ( BaseDataTransformer, + FittableDataTransformer, ) from darts.logging import get_logger, raise_log from darts.timeseries import TimeSeries @@ -221,18 +222,22 @@ def _historical_forecasts_general_checks(model, series, kwargs): data_pipelines = _convert_data_transformers( data_transformers=n.data_transformers, copy=False ) - # identify the global fit pipelines (all transformers are defined with global_fit=True) + # extract pipelines containing at least one fittable element + fittable_pipelines = [ + transf_ for transf_ in data_pipelines.values() if transf_.fittable + ] + # extract pipelines where all the fittable transformer are fitted globally global_fit_pipelines = [ - transf_ 
for transf_ in data_pipelines.items() if transf_._global_fit + transf_ for transf_ in fittable_pipelines if transf_._global_fit ] if n.retrain: - # if more than one series is passed and the pipelines are retrained, they must all be global - if len(series) > 1 and len(global_fit_pipelines) != len(data_pipelines): + # if more than one series is passed and the pipelines are retrained, they cannot be global + if len(series) > 1 and len(global_fit_pipelines) > 0: raise_log( ValueError( "When `retrain=True` and multiple series are provided, all the fittable " - "`data_transformers` must be defined with `global_fit=True`." + "`data_transformers` must be defined with `global_fit=False`." ), logger, ) @@ -253,15 +258,20 @@ def _historical_forecasts_general_checks(model, series, kwargs): ) # extract the number of fitted params in each pipeline (already fitted) fitted_params_pipelines = [ - max(len(t._fitted_params) for t in pipeline if t.fittable) + max( + len(t._fitted_params) + for t in pipeline + if isinstance(t, FittableDataTransformer) + ) for pipeline in data_pipelines.values() ] - # if multiple series are passed, their number should match the number of fitted params in the - # non-global_fit pipelines + if len(series) > 1: - # the number of series and fitted params should match to avoid unexpected behavior when - # multiple series are used - if len(series) != max(fitted_params_pipelines): + # if multiple series are passed and the pipelines are not all globally fitted, the number of series must + # match the number of fitted params in the pipelines + if len(global_fit_pipelines) != len(fittable_pipelines) and len( + series + ) != max(fitted_params_pipelines): raise_log( ValueError( "When multiple series are provided, their number should match the number of fitted params " @@ -1161,12 +1171,11 @@ def _apply_data_transformers( If the Pipeline is fittable and `fit_transformers=True`, the series are sliced to correspond to the information available at model training time 
""" - # also, `global_fit`` is implicitly not supported + # `global_fit`` is not supported, requires too complex time indexes manipulation across series (slice and align) if fit_transformers and any( not (isinstance(ts, TimeSeries) or ts is None) for ts in [series, past_covariates, future_covariates] ): - # very complex to slice and align time indexes across series raise_log( ValueError( "Fitting the data transformers on multiple series is not supported, either provide trained " @@ -1205,15 +1214,21 @@ def _apply_inverse_data_transformers( series: Union[TimeSeries, Sequence[TimeSeries]], forecasts: Union[TimeSeries, list[TimeSeries], list[list[TimeSeries]]], data_transformers: dict[str, Pipeline], + idx_transformer: Optional[int] = None, ) -> Union[TimeSeries, list[TimeSeries], list[list[TimeSeries]]]: """ Apply the inverse transform to the forecasts when defined. + + `idx_transformer` is used to retrieve the appropriate transformer when the data transformer was + fitted with several series and global_fit=False. """ if "series" in data_transformers and data_transformers["series"].invertible: called_with_single_series = get_series_seq_type(series) == SeriesType.SINGLE if called_with_single_series: forecasts = [forecasts] - forecasts = data_transformers["series"].inverse_transform(forecasts) + forecasts = data_transformers["series"].inverse_transform( + forecasts, idx_params=idx_transformer + ) return forecasts[0] if called_with_single_series else forecasts else: return forecasts