diff --git a/hacking/datasets_from_tsp.py b/hacking/datasets_from_tsp.py index bb4312e7..2f5be66f 100644 --- a/hacking/datasets_from_tsp.py +++ b/hacking/datasets_from_tsp.py @@ -39,3 +39,24 @@ split_config = {"train": [0, 0.7], "valid": [0.7, 0.9], "test": [0.9, 1]} train, valid, test = tsp.get_datasets(df, split_config) + + +# %% + +df = pd.read_csv("/Users/wmgifford/Downloads/weather.csv", parse_dates=["date"]) + +tsp = TimeSeriesPreprocessor( + timestamp_column="date", + id_columns=[], + target_columns=[], + prediction_length=96, + context_length=512, +) + +a, b, c = tsp.get_datasets( + df, + split_config={ + "train": 0.7, + "test": 0.2, + }, +) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index aeb7c883..1ae61a6a 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -18,7 +18,7 @@ def ts_data(): "id2": nreps(["XX", "YY", "ZZ"], 50), "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, "value1": range(150), - "value2": np.arange(150) / 3 + 10, + "value2": np.arange(150) ** 2 / 3 + 10, } ) return df @@ -35,3 +35,16 @@ def sample_data(): } ) return df + + +@pytest.fixture(scope="module") +def ts_data_runs(): + df = pd.DataFrame( + { + "run_id": nreps(["1", "2", "3", "4"], 50), + "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, + "value1": range(200), + } + ) + return df diff --git a/tests/toolkit/test_time_series_forecasting_pipeline.py b/tests/toolkit/test_time_series_forecasting_pipeline.py index 7b87dec5..87e8753c 100644 --- a/tests/toolkit/test_time_series_forecasting_pipeline.py +++ b/tests/toolkit/test_time_series_forecasting_pipeline.py @@ -8,6 +8,7 @@ from tsfm_public.toolkit.time_series_forecasting_pipeline import ( TimeSeriesForecastingPipeline, ) +from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor from tsfm_public.toolkit.util import select_by_index @@ -79,3 +80,62 @@ def test_forecasting_pipeline_forecasts(): forecasts_exploded = forecast_pipeline(test_data) assert forecasts_exploded.shape == (prediction_length, len(target_columns) + 1) + + +def test_forecasting_pipeline_forecasts_with_preprocessor(): + timestamp_column = "date" + id_columns = [] + target_columns = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"] + prediction_length = 96 + + model_path = "ibm/patchtst-etth1-forecasting" + model = PatchTSTForPrediction.from_pretrained(model_path) + context_length = model.config.context_length + + tsp = TimeSeriesPreprocessor( + timestamp_column=timestamp_column, + id_columns=id_columns, + target_columns=target_columns, + context_length=context_length, + prediction_length=prediction_length, + freq="1h", + ) + + forecast_pipeline = TimeSeriesForecastingPipeline( + model=model, + timestamp_column=timestamp_column, + id_columns=id_columns, + target_columns=target_columns, + freq="1h", + feature_extractor=tsp, + explode_forecasts=False, + ) + + dataset_path = "https://raw.githubusercontent.com/zhouhaoyi/ETDataset/main/ETT-small/ETTh2.csv" + data = pd.read_csv( + dataset_path, + parse_dates=[timestamp_column], + ) + test_end_index = 12 * 30 * 24 + 8 * 30 * 24 + test_start_index = test_end_index - context_length - 4 + + data = pd.read_csv( + dataset_path, + parse_dates=[timestamp_column], + ) + + test_data = select_by_index( + data, + id_columns=id_columns, + start_index=test_start_index, + end_index=test_end_index, + ) + + forecasts = forecast_pipeline(test_data) + + assert forecasts.shape == ( + 
test_end_index - test_start_index - context_length + 1, + 2 * len(target_columns) + 1, + ) + + # to do: add check on the scaling diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 73ac6bf6..9c052fec 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -84,6 +84,75 @@ def test_time_series_preprocessor_encodes(sample_data): assert sample_prep[c].dtype == float +def test_time_series_preprocessor_scales(ts_data): + df = ts_data + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["id", "id2"], + target_columns=["value1", "value2"], + scaling=True, + ) + + tsp.train(df) + + # check scaled result + out = tsp.preprocess(df) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) + + # check inverse scale result + out_inv = tsp.inverse_scale_targets(out) + assert np.all( + out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + ) + assert np.all( + out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)) + ) + + # check inverse scale result, with suffix + + suffix = "_foo" + targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] + out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns] + out_inv = tsp.inverse_scale_targets(out, suffix=suffix) + assert np.all( + out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + ) + + +def test_time_series_preprocessor_inv_scales_lists(ts_data): + df = ts_data + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["id", "id2"], + target_columns=["value1", "value2"], + scaling=True, + ) + + tsp.train(df) + + # check scaled result + out = tsp.preprocess(df) + + # construct artificial result + out["value1"] = out["value1"].apply(lambda x: np.array([x] * 3)) + out["value2"] = out["value2"].apply(lambda x: np.array([x] * 3)) + + out_inv = tsp.inverse_scale_targets(out) + + assert out_inv["value1"].mean()[0] == df["value1"].mean() + assert out_inv["value2"].mean()[0] == df["value2"].mean() + + def test_augment_time_series(ts_data): periods = 5 a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) @@ -197,7 +266,9 @@ def test_get_datasets(ts_data): ) # new train length should be 20% of 100, minus the usual for context length and prediction length - fewshot_train_size = int(100 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1 + fewshot_train_size = ( + int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(train) == fewshot_train_size assert len(valid) == len(test) @@ -225,3 +296,92 @@ def test_get_datasets(ts_data): assert len(train) == fewshot_train_size assert len(valid) == len(test) + + # fraction splits + # no id columns, so treat as one big time series + tsp = TimeSeriesPreprocessor( + id_columns=[], + target_columns=["value1", "value2"], + prediction_length=5, + context_length=10, + ) + + 
train, valid, test = tsp.get_datasets( + ts_data, + split_config={ + "train": 0.7, + "test": 0.2, + }, + ) + + assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + + assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 + + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 + + +def test_train_without_targets(ts_data): + # no targets or other columns specified + tsp = TimeSeriesPreprocessor(id_columns=["id", "id2"], timestamp_column="timestamp") + tsp.train(ts_data) + + assert tsp.target_columns == ["value1", "value2"] + + # some other args specified + for arg in [ + "control_columns", + "conditional_columns", + "observable_columns", + "static_categorical_columns", + ]: + tsp = TimeSeriesPreprocessor( + id_columns=["id", "id2"], + timestamp_column="timestamp", + **{arg: ["value2"]}, + ) + tsp.train(ts_data) + + assert tsp.target_columns == ["value1"] + + # test targets honored + tsp = TimeSeriesPreprocessor( + id_columns=["id", "id2"], + timestamp_column="timestamp", + target_columns=["value2"], + ) + tsp.train(ts_data) + + assert tsp.target_columns == ["value2"] + + +def test_get_datasets_without_targets(ts_data): + ts_data = ts_data.drop(columns=["id", "id2"]) + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + ) + + train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2}) + + assert train.datasets[0].target_columns == ["value1", "value2"] + + +def test_id_columns_and_scaling_id_columns(ts_data_runs): + df = ts_data_runs + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["asset_id", "run_id"], + scaling_id_columns=["asset_id"], + target_columns=["value1"], + scaling=True, + ) + + ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) + + assert len(tsp.target_scaler_dict) == 2 + assert len(ds_train.datasets) == 4 diff --git a/tests/toolkit/test_util.py b/tests/toolkit/test_util.py index 9c01321b..c2ff1169 100644 --- a/tests/toolkit/test_util.py +++ b/tests/toolkit/test_util.py @@ -1,8 +1,9 @@ """Tests for util functions""" +import pandas as pd import pytest -from tsfm_public.toolkit.util import get_split_params +from tsfm_public.toolkit.util import get_split_params, train_test_split split_cases = [ @@ -24,3 +25,27 @@ def test_get_split_params(left_arg, right_arg, expected): split_config, split_function = get_split_params({"train": [left_arg, right_arg], "valid": [0, 1], "test": [0, 1]}) assert split_function["train"].__name__ == expected + + +def test_train_test_split(): + n = 100 + df = pd.DataFrame({"date": range(n), "value": range(n)}) + + train, valid, test = train_test_split(df, train=0.7, test=0.2) + + assert len(train) == int(n * 0.7) + assert len(test) == int(n * 0.2) + valid_len_100 = n - int(n * 0.7) - int(n * 0.2) + assert len(valid) == valid_len_100 + + n = 101 + df = pd.DataFrame({"date": range(n), "value": range(n)}) + + train, valid, test = train_test_split(df, train=0.7, test=0.2) + + assert len(train) == int(n * 0.7) + assert len(test) == int(n * 0.2) + valid_len_101 = n - int(n * 0.7) - int(n * 0.2) + assert len(valid) == valid_len_101 + + assert valid_len_100 + 1 == valid_len_101 diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 7c010f88..69d3ce4a 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ 
b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -35,9 +35,12 @@ build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False) ) class TimeSeriesForecastingPipeline(Pipeline): - """Hugging Face Pipeline for Time Series Forecasting""" + """Hugging Face Pipeline for Time Series Forecasting - # has_feature_extractor means we can pass feature_extractor=TimeSeriesPreprocessor + feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time + series should be prepared. If this is provided, any of the other options below will be automatically + populated from this instance. + """ def __init__( self, @@ -128,40 +131,54 @@ def __call__( produces predictions. Args: - time_series (Union["pandas.DataFrame", str]): A pandas dataframe or a referce to a location - from where a pandas datarame can be loaded containing the time series on which to perform inference. + time_series (Union["pandas.DataFrame", str]): A pandas dataframe containing the time series on + which to perform inference. - future_time_series (Union["pandas.DataFrame", str]): A pandas dataframe or a referce to a location - from where a pandas datarame can be loaded containing future values, i.e., exogenous or supporting features - which are known in advance. + Keyword arguments: + future_time_series (Union["pandas.DataFrame", str]): A pandas dataframe containing future values, + i.e., exogenous or supporting features which are known in advance. - To do: describe batch vs. single and the need for future_time_series + feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time + series should be prepared. If this is provided, any of the other options below will be automatically + populated from this instance. + timestamp_column (str): The name of the column containing the timestamp of the time series. - kwargs + id_columns (List[str]): List of column names which identify different time series in a multi-time series input. - future_time_series: Optional[Union["pandas.DataFrame", str]] = None, - prediction_length - context_length + target_columns (List[str]): List of column names which identify the target channels in the input; these are the + columns that will be forecasted. - timestamp_column (str): the column containing the date / timestamp - id_columns (List[str]): the list of columns containing ID information. If no ids are present, pass []. + observable_columns (List[str]): List of column names which identify the observable channels in the input. + Observable channels are channels which we have knowledge about in the past and future. For example, weather + conditions such as temperature or precipitation may be known or estimated in the future, but cannot be + changed. - "target_columns", - "observable_columns", - "control_columns", - "conditional_columns", - "static_categorical_columns", + control_columns (List[str]): List of column names which identify the control channels in the input. Control + channels are similar to observable channels, except that future values may be controlled. For example, discount + percentage of a particular product is known and controllable in the future. + conditional_columns (List[str]): List of column names which identify the conditional channels in the input. + Conditional channels are channels which we know in the past, but do not know in the future. - # OLD - input_columns (List[str]): the columns that are used as to create the inputs to the forecasting model.
- These values are used to select data in the input dataframe. - output_columns (List[str]): the column names that are used to label the outputs of the forecasting model. - If omitted, it is assumed that the model will forecast values for all the input columns. + static_categorical_columns (List[str]): List of column names which identify categorical-valued channels in the input + which are fixed over time. + freq (str): A frequency indicator for the given `timestamp_column`. See + https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#period-aliases for a description of the + allowed values. If not provided, we will attempt to infer the frequency from the values in + `timestamp_column`. - Return: + prediction_length (int): The length of the desired forecast. Currently, this value must not exceed the maximum value + supported by the model. If not specified, the maximum value supported by the model is used. + + context_length (int): Specifies the length of the context windows extracted from the historical data for feeding into + the model. + + explode_forecasts (bool): If true, forecasts are returned one value per row of the pandas dataframe. If false, the + forecast over the prediction length will be contained as a list in a single row of the pandas dataframe. + + Return (pandas dataframe): A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the original input feature values and the output forecast for each input column. The output forecast is a list containing all the values over the prediction horizon. @@ -326,4 +343,9 @@ def postprocess(self, input, **kwargs): cols_ordered.extend([c for c in cols if c not in cols_ordered]) out = out[cols_ordered] + + # inverse scale if we have a feature extractor + if self.feature_extractor is not None: + out = self.feature_extractor.inverse_scale_targets(out, suffix="_prediction") + return out diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 89dbe279..b88dcad7 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -2,11 +2,11 @@ # """Preprocessor for time series data preparation""" +import copy import datetime import enum import json from collections import defaultdict -from datetime import timedelta from typing import Any, Dict, Generator, List, Optional, Tuple, Union from warnings import warn @@ -116,10 +116,11 @@ def __init__( scaling: bool = False, # scale_outputs: bool = False, scaler_type: ScalerType = ScalerType.STANDARD.value, + scaling_id_columns: Optional[List[str]] = None, encode_categorical: bool = True, time_series_task: str = TimeSeriesTask.FORECASTING.value, frequency_mapping: Dict[str, int] = DEFAULT_FREQUENCY_MAPPING, - freq: Optional[Union[int, float, timedelta, pd.Timedelta, str]] = None, + freq: Optional[Union[int, str]] = None, **kwargs, ): # note base class __init__ methods sets all arguments as attributes @@ -127,6 +128,9 @@ if not isinstance(id_columns, list): raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}") + if isinstance(timestamp_column, list): + raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") + self.id_columns = id_columns self.timestamp_column = timestamp_column self.target_columns = list(target_columns) @@ -142,6 +146,7 @@ self.time_series_task = time_series_task # self.scale_outputs = scale_outputs
self.scaler_type = scaler_type + self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -152,7 +157,7 @@ def __init__( kwargs["processor_class"] = self.__class__.__name__ - self._validate_columns() + # self._validate_columns() super().__init__(**kwargs) @@ -351,8 +356,10 @@ def _get_groups( Yields: Generator[Any, pd.DataFrame]: Group name and resulting pandas dataframe for the group. """ - if self.id_columns: - group_by_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + group_by_columns = ( + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -463,6 +470,19 @@ def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): if dataset is None or len(dataset) == 0: raise ValueError("Input dataset must not be null or zero length.") + def _set_targets(self, dataset: pd.DataFrame) -> None: + if self.target_columns == []: + skip_columns = copy.copy(self.id_columns) + if self.timestamp_column: + skip_columns.append(self.timestamp_column) + + skip_columns.extend(self.observable_columns) + skip_columns.extend(self.control_columns) + skip_columns.extend(self.conditional_columns) + skip_columns.extend(self.static_categorical_columns) + + self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] + def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: if self.id_columns: @@ -474,6 +494,10 @@ def _estimate_frequency(self, df: pd.DataFrame): # to do: make more robust self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] + + if not isinstance(self.freq, (str, int)): + self.freq = str(self.freq) + else: # no timestamp, assume sequential count? self.freq = 1 @@ -494,8 +518,9 @@ def train( """ self._check_dataset(dataset) - df = self._standardize_dataframe(dataset) + self._set_targets(df) + self._validate_columns() if self.freq is None: self._estimate_frequency(df) @@ -508,27 +533,51 @@ def train( return self - def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Dataset: + def inverse_scale_targets( + self, dataset: Union[Dataset, pd.DataFrame], suffix: Optional[str] = None + ) -> Union[Dataset, pd.DataFrame]: + self._check_dataset(dataset) df = self._standardize_dataframe(dataset) - if not self.scaling or len(self.target_scaler_dict) == 0: + if not self.scaling: + return dataset + + if len(self.target_scaler_dict) == 0: # trying to inverse scale but this preprocessor is not set up for scaling raise RuntimeError( - "Attempt to perform inverse scaling, but time series preprocess is not configured for scaling or scaler has not yet been trained. Please run the `train` method first." + "Attempt to perform inverse scaling, but time series preprocessor has not yet been trained. Please run the `train` method first." 
) cols_to_scale = self.target_columns + if suffix is not None: + cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale] + + col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + + def explode_row(df_row, name, columns): + df = pd.DataFrame(df_row[columns].to_dict()) + inv_scale = self.target_scaler_dict[name].inverse_transform(df) + df_out = df_row.copy() + for idx, c in enumerate(columns): + df_out[c] = inv_scale[:, idx] + return df_out def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) + + if not np.any(col_has_list): + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) + else: + grp[cols_to_scale] = grp[cols_to_scale].apply( + lambda x: explode_row(x, name, cols_to_scale), axis="columns" + ) return grp - if self.id_columns: - id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] else: id_columns = INTERNAL_ID_COLUMN @@ -548,7 +597,6 @@ def preprocess( # 2) incremental / batch based processing of datasets to minimize memory impact self._check_dataset(dataset) - df = self._standardize_dataframe(dataset) if self.scaling: @@ -573,8 +621,10 @@ def scale_func(grp, id_columns): return grp - if self.id_columns: - id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + id_columns = ( + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + ) else: id_columns = INTERNAL_ID_COLUMN @@ -595,7 +645,7 @@ def scale_func(grp, id_columns): def get_datasets( self, dataset: Union[Dataset, pd.DataFrame], - split_config: Dict[str, Any], + split_config: Dict[str, Union[List[Union[int, float]], float]], fewshot_fraction: Optional[float] = None, fewshot_location: str = FractionLocation.LAST.value, ) -> Tuple[Any]: @@ -604,17 +654,24 @@ def get_datasets( Args: dataset (Union[Dataset, pd.DataFrame]): Loaded pandas dataframe - split_config (Dict[str, Any]): Dictionary of dictionaries containing - split parameters. For example: + split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary of dictionaries containing + split parameters. Two configurations are possible: + 1. Specify train/valid/test indices or relative fractions { train: [0, 50], valid: [50, 70], test: [70, 100] } end value is not inclusive + 2. Specify train/test fractions: + { + train: 0.7 + test: 0.2 + } + A valid split should not be specified directly; the above implies valid = 0.1 + fewshot_fraction (float, optional): When non-null, return this percent of the original training - dataset. This is done to support fewshot fine-tuning. The fraction of data chosen is at the - end of the training dataset. + dataset. This is done to support fewshot fine-tuning. fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last" as described in the enum FewshotLocation. Default is to choose the fewshot data at the end of the training dataset (i.e., "last"). 
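As an illustration of the two `split_config` forms described above (a minimal sketch, not part of the patch; it assumes `tsp` is a TimeSeriesPreprocessor constructed with non-null `context_length` and `prediction_length`, and `df` is a pandas dataframe holding the full history):

# Form 1: explicit boundaries per split, given as indices or relative fractions; the end value is exclusive.
train, valid, test = tsp.get_datasets(df, split_config={"train": [0, 0.7], "valid": [0.7, 0.9], "test": [0.9, 1]})

# Form 2: train/test fractions only; the remaining 10% implicitly becomes the validation split.
train, valid, test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})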
@@ -625,11 +682,25 @@ def get_datasets( data = self._standardize_dataframe(dataset) - # get split_params - # split_params = get_split_params(config, self.context_length, len(data)) + if not self.context_length: + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") + if not self.prediction_length: + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") + # get split_params split_params, split_function = get_split_params(split_config, context_length=self.context_length) + # split data + if isinstance(split_function, dict): + train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) + else: + train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) + + # data preprocessing + self.train(train_data) + # specify columns column_specifiers = { "id_columns": self.id_columns, @@ -641,14 +712,6 @@ def get_datasets( "static_categorical_columns": self.static_categorical_columns, } - # split data - train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) - valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) - test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) - - # data preprocessing - self.train(train_data) - # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): @@ -659,6 +722,7 @@ def get_datasets( id_columns=self.id_columns, fraction=fewshot_fraction, location=fewshot_location, + minimum_size=self.context_length, ) params = column_specifiers @@ -695,6 +759,9 @@ def create_timestamps( # more complex logic is required to support all edge cases if isinstance(freq, (pd.Timedelta, datetime.timedelta, str)): + if isinstance(freq, str): + freq = pd._libs.tslibs.timedeltas.Timedelta(freq) + return pd.date_range( last_timestamp, freq=freq, diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 71359b1a..57b2e844 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -158,6 +158,7 @@ def select_by_fixed_fraction( id_columns: Optional[List[str]] = None, fraction: float = 1.0, location: str = FractionLocation.FIRST.value, + minimum_size: Optional[int] = 0, ) -> pd.DataFrame: """Select a portion of a dataset based on a fraction of the data. Fraction can either be located at the start (location = FractionLocation.FIRST) or at the end (location = FractionLocation.LAST) @@ -167,9 +168,10 @@ def select_by_fixed_fraction( id_columns (List[str], optional): Columns which specify the IDs in the dataset. Defaults to None. fraction (float): The fraction to select. location (str): Location of where to select the fraction Defaults to FractionLocation.FIRST.value. + minimum_size (int, optional): Minimum size of the split. Defaults to None. Raises: - ValueError: Raised when the + ValueError: Raised when the fraction is not within the range [0,1]. Returns: pd.DataFrame: Subset of the dataframe. 
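A small worked example of the new `minimum_size` argument (hypothetical numbers, not taken from the tests): `get_datasets` passes `minimum_size=self.context_length`, so the fewshot fraction is applied only to the rows beyond one context window and the selection can never be shorter than `context_length`.

# 100 training rows, context_length=10, fewshot_fraction=0.2:
# the kept size is int(0.2 * (100 - 10)) + 10 = 28 rows rather than int(0.2 * 100) = 20.
rows, context_length, fraction = 100, 10, 0.2
fewshot_rows = int(fraction * (rows - context_length)) + context_length
assert fewshot_rows == 28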
@@ -180,9 +182,7 @@ def select_by_fixed_fraction( if not id_columns: return _split_group_by_fixed_fraction( - df, - fraction=fraction, - location=location, + df, fraction=fraction, location=location, minimum_size=minimum_size ).copy() groups = df.groupby(_get_groupby_columns(id_columns)) @@ -194,12 +194,75 @@ def select_by_fixed_fraction( name=name, fraction=fraction, location=location, + minimum_size=minimum_size, ) ) return pd.concat(result) +def train_test_split( + df: pd.DataFrame, + id_columns: Optional[List[str]] = None, + train: Union[int, float] = 0.7, + test: Union[int, float] = 0.2, + valid_test_offset: int = 0, +): + # to do: add validation + + if not id_columns: + return tuple( + [ + tmp.copy() + for tmp in _split_group_train_test(df, train=train, test=test, valid_test_offset=valid_test_offset) + ] + ) + + groups = df.groupby(_get_groupby_columns(id_columns)) + result = [] + for name, group in groups: + result.append( + _split_group_train_test( + group, + name=name, + train=train, + test=test, + valid_test_offset=valid_test_offset, + ) + ) + + result_train, result_valid, result_test = zip(*result) + return pd.concat(result_train), pd.concat(result_valid), pd.concat(result_test) + + +def _split_group_train_test( + group_df: pd.DataFrame, + name: Optional[str] = None, + train: Union[int, float] = 0.7, + test: Union[int, float] = 0.2, + valid_test_offset: int = 0, +): + l = len(group_df) + + train_size = int(l * train) + test_size = int(l * test) + + valid_size = l - train_size - test_size + + train_df = _split_group_by_index(group_df, name, start_index=0, end_index=train_size) + + valid_df = _split_group_by_index( + group_df, + name, + start_index=train_size - valid_test_offset, + end_index=train_size + valid_size, + ) + + test_df = _split_group_by_index(group_df, name, start_index=train_size + valid_size - valid_test_offset) + + return train_df, valid_df, test_df + + def _get_groupby_columns(id_columns: List[str]) -> Union[List[str], str]: if not isinstance(id_columns, (List)): raise ValueError("id_columns must be a list") @@ -216,6 +279,7 @@ def _split_group_by_index( start_index: Optional[int] = None, end_index: Optional[int] = None, ) -> pd.DataFrame: + """Helper function for splitting by index.""" if start_index and (start_index >= len(group_df)): msg = "Selection would result in an empty time series, please check start_index and time series length" msg = msg + f" (id = {name})" if name else msg @@ -239,6 +303,7 @@ def _split_group_by_fraction( start_offset: Optional[int] = 0, end_fraction: Optional[float] = None, ) -> pd.DataFrame: + """Helper function for splitting by relative fraction.""" length = len(group_df) if start_fraction is not None: @@ -265,9 +330,11 @@ def _split_group_by_fixed_fraction( name: Optional[str] = None, fraction: float = 1.0, location: Optional[str] = None, + minimum_size: Optional[int] = 0, ): + """Helper function for splitting by fixed fraction.""" l = len(group_df) - fraction_size = int(fraction * l) + fraction_size = int(fraction * (l - minimum_size)) + minimum_size if location == FractionLocation.FIRST.value: start_index = 0 @@ -415,15 +482,19 @@ def convert_tsf_to_dataframe( def get_split_params( - split_config: Dict[str, List[Union[int, float]]], + split_config: Dict[str, Union[float, List[Union[int, float]]]], context_length: Optional[int] = None, ) -> Tuple[Dict[str, Dict[str, Union[int, float]]], Dict[str, Callable]]: """Get split parameters Args: - split_config (Dict[str, List[int, float]]): Dictionary containing keys for - train, valid, 
test. Each value consists of a list of length two, indicating + split_config (Dict[str, Union[float, List[Union[int, float]]]]): Dictionary containing keys which + define the splits. Two options are possible: + 1. Specify train, valid, test. Each value consists of a list of length two, indicating the boundaries of a split. + 2. Specify train, test. Each value consists of a single floating point number specifying the + fraction of data to use. Valid is populated using the remaining data. + context_length (int, optional): Context length, used only when offseting the split so predictions can be made for all elements of split. Defaults to None. @@ -435,23 +506,32 @@ split_params = {} split_function = {} - for group in ["train", "test", "valid"]: - if ((split_config[group][0] < 1) and (split_config[group][0] != 0)) or (split_config[group][1] < 1): - split_params[group] = { - "start_fraction": split_config[group][0], - "end_fraction": split_config[group][1], - "start_offset": (context_length if (context_length and group != "train") else 0), - } - split_function[group] = select_by_relative_fraction - else: - split_params[group] = { - "start_index": ( - split_config[group][0] - (context_length if (context_length and group != "train") else 0) - ), - "end_index": split_config[group][1], - } - split_function[group] = select_by_index - + if "valid" in split_config: + for group in ["train", "test", "valid"]: + if ((split_config[group][0] < 1) and (split_config[group][0] != 0)) or (split_config[group][1] < 1): + split_params[group] = { + "start_fraction": split_config[group][0], + "end_fraction": split_config[group][1], + "start_offset": (context_length if (context_length and group != "train") else 0), + } + split_function[group] = select_by_relative_fraction + else: + split_params[group] = { + "start_index": ( + split_config[group][0] - (context_length if (context_length and group != "train") else 0) + ), + "end_index": split_config[group][1], + } + split_function[group] = select_by_index - + return split_params, split_function + + # no valid, assume train/test split + split_function = train_test_split + split_params = { + "train": split_config["train"], + "test": split_config["test"], + "valid_test_offset": context_length if context_length else 0, + } return split_params, split_function
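To close the loop on the fraction-based path above, a worked sketch (illustrative only, assuming the patch is applied) of what `get_split_params` hands back and how `train_test_split` sizes the pieces: train and test sizes are floored fractions of the series length, the remainder becomes valid, and the valid/test windows are extended backwards by `context_length` rows so forecasts can be made from their first timestamps.

import pandas as pd

from tsfm_public.toolkit.util import get_split_params, train_test_split

df = pd.DataFrame({"date": range(100), "value": range(100)})

# With no "valid" key, get_split_params returns train_test_split itself plus its kwargs.
split_params, split_function = get_split_params({"train": 0.7, "test": 0.2}, context_length=10)
assert split_function is train_test_split
assert split_params["valid_test_offset"] == 10

train, valid, test = split_function(df, **split_params)
assert len(train) == 70  # int(100 * 0.7)
assert len(valid) == 20  # the 10 leftover rows plus 10 rows of overlap with train
assert len(test) == 30   # int(100 * 0.2) plus 10 rows of overlap with valid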