From fea9b607bf285055743dc4a348188e1f19b47047 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 28 Mar 2024 09:51:58 -0400 Subject: [PATCH 01/21] update selection to include minimum Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/util.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 71359b1a..83852b16 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -158,6 +158,7 @@ def select_by_fixed_fraction( id_columns: Optional[List[str]] = None, fraction: float = 1.0, location: str = FractionLocation.FIRST.value, + minimum_size: Optional[int] = 0, ) -> pd.DataFrame: """Select a portion of a dataset based on a fraction of the data. Fraction can either be located at the start (location = FractionLocation.FIRST) or at the end (location = FractionLocation.LAST) @@ -180,9 +181,7 @@ def select_by_fixed_fraction( if not id_columns: return _split_group_by_fixed_fraction( - df, - fraction=fraction, - location=location, + df, fraction=fraction, location=location, minimum_size=minimum_size ).copy() groups = df.groupby(_get_groupby_columns(id_columns)) @@ -194,6 +193,7 @@ def select_by_fixed_fraction( name=name, fraction=fraction, location=location, + minimum_size=minimum_size, ) ) @@ -265,9 +265,28 @@ def _split_group_by_fixed_fraction( name: Optional[str] = None, fraction: float = 1.0, location: Optional[str] = None, + minimum_size: Optional[int] = 0, ): + """_summary_ + Split by a fixed fraction, but ensure a minimum size. Size of the fraction is based on: + + int(fraction * (l-minimum)) + minimum + + Args: + group_df (pd.DataFrame): _description_ + name (Optional[str], optional): _description_. Defaults to None. + fraction (float, optional): _description_. Defaults to 1.0. + location (Optional[str], optional): _description_. Defaults to None. + minimum_size (Optional[int], optional): _description_. Defaults to 0. + + Raises: + ValueError: _description_ + + Returns: + _type_: _description_ + """ l = len(group_df) - fraction_size = int(fraction * l) + fraction_size = int(fraction * (l - minimum_size)) + minimum_size if location == FractionLocation.FIRST.value: start_index = 0 From 39628ad1c7595110f98d90097a8ede419d515ae9 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 28 Mar 2024 10:24:43 -0400 Subject: [PATCH 02/21] update docstrings Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/util.py | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 83852b16..c8a434f9 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -168,9 +168,10 @@ def select_by_fixed_fraction( id_columns (List[str], optional): Columns which specify the IDs in the dataset. Defaults to None. fraction (float): The fraction to select. location (str): Location of where to select the fraction Defaults to FractionLocation.FIRST.value. + minimum_size (int, optional): Minimum size of the split. Defaults to None. Raises: - ValueError: Raised when the + ValueError: Raised when the fraction is not within the range [0,1]. Returns: pd.DataFrame: Subset of the dataframe. 
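# A small illustrative sketch, not part of the patch series: the new
# minimum_size arithmetic in _split_group_by_fixed_fraction guarantees that the
# selected slice is never shorter than minimum_size rows. Assuming a series of
# length l = 100, fraction = 0.2 and minimum_size = 64 (for example a model
# context length, which PATCH 03 passes in later), the old int(fraction * l)
# would give only 20 rows, too short for a single context window, while the new
# formula gives:
l, fraction, minimum_size = 100, 0.2, 64
fraction_size = int(fraction * (l - minimum_size)) + minimum_size  # int(7.2) + 64
assert fraction_size == 71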
@@ -216,6 +217,7 @@ def _split_group_by_index( start_index: Optional[int] = None, end_index: Optional[int] = None, ) -> pd.DataFrame: + """Helper function for splitting by index.""" if start_index and (start_index >= len(group_df)): msg = "Selection would result in an empty time series, please check start_index and time series length" msg = msg + f" (id = {name})" if name else msg @@ -239,6 +241,7 @@ def _split_group_by_fraction( start_offset: Optional[int] = 0, end_fraction: Optional[float] = None, ) -> pd.DataFrame: + """Helper function for splitting by relative fraction.""" length = len(group_df) if start_fraction is not None: @@ -267,24 +270,7 @@ def _split_group_by_fixed_fraction( location: Optional[str] = None, minimum_size: Optional[int] = 0, ): - """_summary_ - Split by a fixed fraction, but ensure a minimum size. Size of the fraction is based on: - - int(fraction * (l-minimum)) + minimum - - Args: - group_df (pd.DataFrame): _description_ - name (Optional[str], optional): _description_. Defaults to None. - fraction (float, optional): _description_. Defaults to 1.0. - location (Optional[str], optional): _description_. Defaults to None. - minimum_size (Optional[int], optional): _description_. Defaults to 0. - - Raises: - ValueError: _description_ - - Returns: - _type_: _description_ - """ + """Helper function for splitting by fixed fraction.""" l = len(group_df) fraction_size = int(fraction * (l - minimum_size)) + minimum_size From 4905d9e7f7635d6de707d120a07597dca1260512 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 28 Mar 2024 10:25:30 -0400 Subject: [PATCH 03/21] pass minimum Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/time_series_preprocessor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 89dbe279..172a0dc0 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -659,6 +659,7 @@ def get_datasets( id_columns=self.id_columns, fraction=fewshot_fraction, location=fewshot_location, + minimum_size=self.context_length, ) params = column_specifiers From e072c641010fc0a08abade02d55da23b946c413b Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Thu, 28 Mar 2024 11:04:02 -0400 Subject: [PATCH 04/21] update test Signed-off-by: Wesley M. Gifford --- tests/toolkit/test_time_series_preprocessor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 73ac6bf6..f5f2c12d 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -197,7 +197,9 @@ def test_get_datasets(ts_data): ) # new train length should be 20% of 100, minus the usual for context length and prediction length - fewshot_train_size = int(100 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1 + fewshot_train_size = ( + int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(train) == fewshot_train_size assert len(valid) == len(test) From 67b1f48466ebefb5f8056967edd56711f912ca4e Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 09:19:18 -0400 Subject: [PATCH 05/21] add more standard train/test split function Signed-off-by: Wesley M. 
Gifford --- .../toolkit/test_time_series_preprocessor.py | 23 ++++ tests/toolkit/test_util.py | 27 ++++- .../toolkit/time_series_preprocessor.py | 9 +- tsfm_public/toolkit/util.py | 100 +++++++++++++++--- 4 files changed, 138 insertions(+), 21 deletions(-) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index f5f2c12d..9f599287 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -227,3 +227,26 @@ def test_get_datasets(ts_data): assert len(train) == fewshot_train_size assert len(valid) == len(test) + + # fraction splits + # no id columns, so treat as one big time series + tsp = TimeSeriesPreprocessor( + id_columns=[], + target_columns=["value1", "value2"], + prediction_length=5, + context_length=10, + ) + + train, valid, test = tsp.get_datasets( + ts_data, + split_config={ + "train": 0.7, + "test": 0.2, + }, + ) + + assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + + assert len(test) == int(150 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1 + + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 diff --git a/tests/toolkit/test_util.py b/tests/toolkit/test_util.py index 9c01321b..c2ff1169 100644 --- a/tests/toolkit/test_util.py +++ b/tests/toolkit/test_util.py @@ -1,8 +1,9 @@ """Tests for util functions""" +import pandas as pd import pytest -from tsfm_public.toolkit.util import get_split_params +from tsfm_public.toolkit.util import get_split_params, train_test_split split_cases = [ @@ -24,3 +25,27 @@ def test_get_split_params(left_arg, right_arg, expected): split_config, split_function = get_split_params({"train": [left_arg, right_arg], "valid": [0, 1], "test": [0, 1]}) assert split_function["train"].__name__ == expected + + +def test_train_test_split(): + n = 100 + df = pd.DataFrame({"date": range(n), "value": range(n)}) + + train, valid, test = train_test_split(df, train=0.7, test=0.2) + + assert len(train) == int(n * 0.7) + assert len(test) == int(n * 0.2) + valid_len_100 = n - int(n * 0.7) - int(n * 0.2) + assert len(valid) == valid_len_100 + + n = 101 + df = pd.DataFrame({"date": range(n), "value": range(n)}) + + train, valid, test = train_test_split(df, train=0.7, test=0.2) + + assert len(train) == int(n * 0.7) + assert len(test) == int(n * 0.2) + valid_len_101 = n - int(n * 0.7) - int(n * 0.2) + assert len(valid) == valid_len_101 + + assert valid_len_100 + 1 == valid_len_101 diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 172a0dc0..61aba7db 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -642,9 +642,12 @@ def get_datasets( } # split data - train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) - valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) - test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) + if isinstance(split_function, dict): + train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) + else: + train_data, valid_data, test_data = split_function(data, 
id_columns=self.id_columns, **split_params) # data preprocessing self.train(train_data) diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index c8a434f9..7d2a2612 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -201,6 +201,63 @@ def select_by_fixed_fraction( return pd.concat(result) +def train_test_split( + df: pd.DataFrame, + id_columns: Optional[List[str]] = None, + train: Union[int, float] = 0.7, + test: Union[int, float] = 0.2, + valid_test_offset: int = 0, +): + # to do: add validation + + if not id_columns: + return tuple([tmp.copy() for tmp in _split_group_train_test(df, train=train, test=test)]) + + groups = df.groupby(_get_groupby_columns(id_columns)) + result = [] + for name, group in groups: + result.append( + _split_group_train_test( + group, + name=name, + train=train, + test=test, + valid_test_offset=valid_test_offset, + ) + ) + + result_train, result_valid, result_test = zip(**result) + return pd.concat(result_train), pd.concat(result_valid), pd.concat(result_test) + + +def _split_group_train_test( + group_df: pd.DataFrame, + name: Optional[str] = None, + train: Union[int, float] = 0.7, + test: Union[int, float] = 0.2, + valid_test_offset: int = 0, +): + l = len(group_df) + + train_size = int(l * train) + test_size = int(l * test) + + valid_size = l - train_size - test_size + + train_df = _split_group_by_index(group_df, name, start_index=0, end_index=train_size) + + valid_df = _split_group_by_index( + group_df, + name, + start_index=train_size - valid_test_offset, + end_index=train_size + valid_size, + ) + + test_df = _split_group_by_index(group_df, name, start_index=train_size + valid_size - valid_test_offset) + + return train_df, valid_df, test_df + + def _get_groupby_columns(id_columns: List[str]) -> Union[List[str], str]: if not isinstance(id_columns, (List)): raise ValueError("id_columns must be a list") @@ -440,23 +497,32 @@ def get_split_params( split_params = {} split_function = {} - for group in ["train", "test", "valid"]: - if ((split_config[group][0] < 1) and (split_config[group][0] != 0)) or (split_config[group][1] < 1): - split_params[group] = { - "start_fraction": split_config[group][0], - "end_fraction": split_config[group][1], - "start_offset": (context_length if (context_length and group != "train") else 0), - } - split_function[group] = select_by_relative_fraction - else: - split_params[group] = { - "start_index": ( - split_config[group][0] - (context_length if (context_length and group != "train") else 0) - ), - "end_index": split_config[group][1], - } - split_function[group] = select_by_index - + if "valid" in split_config: + for group in ["train", "test", "valid"]: + if ((split_config[group][0] < 1) and (split_config[group][0] != 0)) or (split_config[group][1] < 1): + split_params[group] = { + "start_fraction": split_config[group][0], + "end_fraction": split_config[group][1], + "start_offset": (context_length if (context_length and group != "train") else 0), + } + split_function[group] = select_by_relative_fraction + else: + split_params[group] = { + "start_index": ( + split_config[group][0] - (context_length if (context_length and group != "train") else 0) + ), + "end_index": split_config[group][1], + } + split_function[group] = select_by_index + return split_params, split_function + + # no valid, assume train/test split + split_function = train_test_split + split_params = { + "train": split_config["train"], + "test": split_config["test"], + "valid_test_offset": context_length if context_length else 0, 
+ } return split_params, split_function From b1b981d272f376f07aecc78ecb411e9d06ec65ea Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 09:49:00 -0400 Subject: [PATCH 06/21] update docstrings Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/time_series_preprocessor.py | 17 ++++++++++++----- tsfm_public/toolkit/util.py | 10 +++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 61aba7db..c0c47dc4 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -595,7 +595,7 @@ def scale_func(grp, id_columns): def get_datasets( self, dataset: Union[Dataset, pd.DataFrame], - split_config: Dict[str, Any], + split_config: Dict[str, Union[List[Union[int, float]], float]], fewshot_fraction: Optional[float] = None, fewshot_location: str = FractionLocation.LAST.value, ) -> Tuple[Any]: @@ -604,17 +604,24 @@ def get_datasets( Args: dataset (Union[Dataset, pd.DataFrame]): Loaded pandas dataframe - split_config (Dict[str, Any]): Dictionary of dictionaries containing - split parameters. For example: + split_config (Dict[str, Union[List[Union[int, float]], float]]): Dictionary of dictionaries containing + split parameters. Two configurations are possible: + 1. Specify train/valid/test indices or relative fractions { train: [0, 50], valid: [50, 70], test: [70, 100] } end value is not inclusive + 2. Specify train/test fractions: + { + train: 0.7 + test: 0.2 + } + A valid split should not be specified directly; the above implies valid = 0.1 + fewshot_fraction (float, optional): When non-null, return this percent of the original training - dataset. This is done to support fewshot fine-tuning. The fraction of data chosen is at the - end of the training dataset. + dataset. This is done to support fewshot fine-tuning. fewshot_location (str): Determines where the fewshot data is chosen. Valid options are "first" and "last" as described in the enum FewshotLocation. Default is to choose the fewshot data at the end of the training dataset (i.e., "last"). diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 7d2a2612..235e979e 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -477,15 +477,19 @@ def convert_tsf_to_dataframe( def get_split_params( - split_config: Dict[str, List[Union[int, float]]], + split_config: Dict[str, Union[float, List[Union[int, float]]]], context_length: Optional[int] = None, ) -> Tuple[Dict[str, Dict[str, Union[int, float]]], Dict[str, Callable]]: """Get split parameters Args: - split_config (Dict[str, List[int, float]]): Dictionary containing keys for - train, valid, test. Each value consists of a list of length two, indicating + split_config ( Dict[str, Union[float, List[Union[int, float]]]]): Dictionary containing keys which + define the splits. Two options are possible: + 1. Specifiy train, valid, test. Each value consists of a list of length two, indicating the boundaries of a split. + 2. Specify train, test. Each value consists of a single floating point number specifying the + fraction of data to use. Valid is populated using the remaining data. + context_length (int, optional): Context length, used only when offseting the split so predictions can be made for all elements of split. Defaults to None. From 11509f0dab0c7bf0557577a0a401123f2bb29362 Mon Sep 17 00:00:00 2001 From: "Wesley M. 
Gifford" Date: Fri, 29 Mar 2024 10:19:46 -0400 Subject: [PATCH 07/21] allow auto-determination of targets Signed-off-by: Wesley M. Gifford --- .../toolkit/test_time_series_preprocessor.py | 34 +++++++++++++++++++ .../toolkit/time_series_preprocessor.py | 19 +++++++++-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 9f599287..0eac1b5f 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -250,3 +250,37 @@ def test_get_datasets(ts_data): assert len(test) == int(150 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1 assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + + +def test_train_without_targets(ts_data): + # no targets or other columns specified + tsp = TimeSeriesPreprocessor(id_columns=["id", "id2"], timestamp_column="timestamp") + tsp.train(ts_data) + + assert tsp.target_columns == ["value1", "value2"] + + # some other args specified + for arg in [ + "control_columns", + "conditional_columns", + "observable_columns", + "static_categorical_columns", + ]: + tsp = TimeSeriesPreprocessor( + id_columns=["id", "id2"], + timestamp_column="timestamp", + **{arg: ["value2"]}, + ) + tsp.train(ts_data) + + assert tsp.target_columns == ["value1"] + + # test targets honored + tsp = TimeSeriesPreprocessor( + id_columns=["id", "id2"], + timestamp_column="timestamp", + target_columns=["value2"], + ) + tsp.train(ts_data) + + assert tsp.target_columns == ["value2"] diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index c0c47dc4..5bb2625e 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -2,6 +2,7 @@ # """Preprocessor for time series data preparation""" +import copy import datetime import enum import json @@ -152,7 +153,7 @@ def __init__( kwargs["processor_class"] = self.__class__.__name__ - self._validate_columns() + # self._validate_columns() super().__init__(**kwargs) @@ -463,6 +464,19 @@ def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): if dataset is None or len(dataset) == 0: raise ValueError("Input dataset must not be null or zero length.") + def _set_targets(self, dataset: pd.DataFrame) -> None: + if self.target_columns == []: + skip_columns = copy.copy(self.id_columns) + if self.timestamp_column: + skip_columns.append(self.timestamp_column) + + skip_columns.extend(self.observable_columns) + skip_columns.extend(self.control_columns) + skip_columns.extend(self.conditional_columns) + skip_columns.extend(self.static_categorical_columns) + + self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] + def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: if self.id_columns: @@ -494,8 +508,8 @@ def train( """ self._check_dataset(dataset) - df = self._standardize_dataframe(dataset) + self._set_targets(df) if self.freq is None: self._estimate_frequency(df) @@ -509,6 +523,7 @@ def train( return self def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Dataset: + self._check_dataset(dataset) df = self._standardize_dataframe(dataset) if not self.scaling or len(self.target_scaler_dict) == 0: From 5d6178ffca2e820b9047b6f650e6d54d980b2ca2 Mon Sep 17 00:00:00 2001 From: "Wesley M. 
Gifford" Date: Fri, 29 Mar 2024 10:38:37 -0400 Subject: [PATCH 08/21] Add tests, update get_dataset for target columns Signed-off-by: Wesley M. Gifford --- .../toolkit/test_time_series_preprocessor.py | 13 ++++++++ .../toolkit/time_series_preprocessor.py | 31 ++++++++++--------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 0eac1b5f..57424e31 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -284,3 +284,16 @@ def test_train_without_targets(ts_data): tsp.train(ts_data) assert tsp.target_columns == ["value2"] + + +def test_get_datasets_without_targets(ts_data): + ts_data = ts_data.drop(columns=["id", "id2"]) + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + ) + + train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2}) + + train.datasets[0].target_columns == ["value1", "value2"] diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 5bb2625e..ce801755 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -510,6 +510,7 @@ def train( self._check_dataset(dataset) df = self._standardize_dataframe(dataset) self._set_targets(df) + self._validate_columns() if self.freq is None: self._estimate_frequency(df) @@ -563,7 +564,6 @@ def preprocess( # 2) incremental / batch based processing of datasets to minimize memory impact self._check_dataset(dataset) - df = self._standardize_dataframe(dataset) if self.scaling: @@ -647,22 +647,14 @@ def get_datasets( data = self._standardize_dataframe(dataset) - # get split_params - # split_params = get_split_params(config, self.context_length, len(data)) + if not self.context_length: + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") + if not self.prediction_length: + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") + # get split_params split_params, split_function = get_split_params(split_config, context_length=self.context_length) - # specify columns - column_specifiers = { - "id_columns": self.id_columns, - "timestamp_column": self.timestamp_column, - "target_columns": self.target_columns, - "observable_columns": self.observable_columns, - "control_columns": self.control_columns, - "conditional_columns": self.conditional_columns, - "static_categorical_columns": self.static_categorical_columns, - } - # split data if isinstance(split_function, dict): train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) @@ -674,6 +666,17 @@ def get_datasets( # data preprocessing self.train(train_data) + # specify columns + column_specifiers = { + "id_columns": self.id_columns, + "timestamp_column": self.timestamp_column, + "target_columns": self.target_columns, + "observable_columns": self.observable_columns, + "control_columns": self.control_columns, + "conditional_columns": self.conditional_columns, + "static_categorical_columns": self.static_categorical_columns, + } + # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): From 4176fa35fd8261b4f52b83042417dca31fb319fc Mon Sep 17 00:00:00 2001 From: "Wesley M. 
Gifford" Date: Fri, 29 Mar 2024 11:04:06 -0400 Subject: [PATCH 09/21] pass parameters Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/util.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 235e979e..643b16df 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -211,7 +211,12 @@ def train_test_split( # to do: add validation if not id_columns: - return tuple([tmp.copy() for tmp in _split_group_train_test(df, train=train, test=test)]) + return tuple( + [ + tmp.copy() + for tmp in _split_group_train_test(df, train=train, test=test, valid_test_offset=valid_test_offset) + ] + ) groups = df.groupby(_get_groupby_columns(id_columns)) result = [] From 869c1f22f06247fa2e7a2ef12ed742a43f84d7fa Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 11:06:58 -0400 Subject: [PATCH 10/21] update tests Signed-off-by: Wesley M. Gifford --- hacking/datasets_from_tsp.py | 21 +++++++++++++++++++ .../toolkit/test_time_series_preprocessor.py | 4 ++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/hacking/datasets_from_tsp.py b/hacking/datasets_from_tsp.py index bb4312e7..2f5be66f 100644 --- a/hacking/datasets_from_tsp.py +++ b/hacking/datasets_from_tsp.py @@ -39,3 +39,24 @@ split_config = {"train": [0, 0.7], "valid": [0.7, 0.9], "test": [0.9, 1]} train, valid, test = tsp.get_datasets(df, split_config) + + +# %% + +df = pd.read_csv("/Users/wmgifford/Downloads/weather.csv", parse_dates=["date"]) + +tsp = TimeSeriesPreprocessor( + timestamp_column="date", + id_columns=[], + target_columns=[], + prediction_length=96, + context_length=512, +) + +a, b, c = tsp.get_datasets( + df, + split_config={ + "train": 0.7, + "test": 0.2, + }, +) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 57424e31..86c33d55 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -247,9 +247,9 @@ def test_get_datasets(ts_data): assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 - assert len(test) == int(150 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1 + assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 def test_train_without_targets(ts_data): From 67ff1b3524094c373a2fbaca2afbcb04df8f183c Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 13:55:04 -0400 Subject: [PATCH 11/21] keep freq as string or int Signed-off-by: Wesley M. 
Gifford --- tsfm_public/toolkit/time_series_preprocessor.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index ce801755..369a9250 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -7,7 +7,6 @@ import enum import json from collections import defaultdict -from datetime import timedelta from typing import Any, Dict, Generator, List, Optional, Tuple, Union from warnings import warn @@ -120,7 +119,7 @@ def __init__( encode_categorical: bool = True, time_series_task: str = TimeSeriesTask.FORECASTING.value, frequency_mapping: Dict[str, int] = DEFAULT_FREQUENCY_MAPPING, - freq: Optional[Union[int, float, timedelta, pd.Timedelta, str]] = None, + freq: Optional[Union[int, str]] = None, **kwargs, ): # note base class __init__ methods sets all arguments as attributes @@ -488,6 +487,10 @@ def _estimate_frequency(self, df: pd.DataFrame): # to do: make more robust self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] + + if not isinstance(self.freq, (str, int)): + self.freq = str(self.freq) + else: # no timestamp, assume sequential count? self.freq = 1 @@ -724,6 +727,9 @@ def create_timestamps( # more complex logic is required to support all edge cases if isinstance(freq, (pd.Timedelta, datetime.timedelta, str)): + if isinstance(freq, str): + freq = pd._libs.tslibs.timedeltas.Timedelta(freq) + return pd.date_range( last_timestamp, freq=freq, From a008fb33ec946e4630b6249875dac78c2f164c55 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 14:14:28 -0400 Subject: [PATCH 12/21] check timestamp_column Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/time_series_preprocessor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 369a9250..c0b9345d 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -127,6 +127,9 @@ def __init__( if not isinstance(id_columns, list): raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}") + if isinstance(timestamp_column, list): + raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") + self.id_columns = id_columns self.timestamp_column = timestamp_column self.target_columns = list(target_columns) From e1f28a46233d5d96117d2cdb6302d0d41ee81856 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Fri, 29 Mar 2024 16:43:34 -0400 Subject: [PATCH 13/21] be less strict Signed-off-by: Wesley M. 
Gifford --- tsfm_public/toolkit/time_series_preprocessor.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index c0b9345d..6f5451a6 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -529,14 +529,17 @@ def train( return self - def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Dataset: + def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[Dataset, pd.DataFrame]: self._check_dataset(dataset) df = self._standardize_dataframe(dataset) - if not self.scaling or len(self.target_scaler_dict) == 0: + if not self.scaling: + return df + + if len(self.target_scaler_dict) == 0: # trying to inverse scale but this preprocessor is not set up for scaling raise RuntimeError( - "Attempt to perform inverse scaling, but time series preprocess is not configured for scaling or scaler has not yet been trained. Please run the `train` method first." + "Attempt to perform inverse scaling, but time series preprocessor has not yet been trained. Please run the `train` method first." ) cols_to_scale = self.target_columns From 1d70242dca4e24b015f714660dd459261bfdbe80 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Mon, 1 Apr 2024 17:03:01 -0400 Subject: [PATCH 14/21] add scaling_id_columns and test Signed-off-by: Wesley M. Gifford --- tests/toolkit/conftest.py | 13 +++++++++++++ .../toolkit/test_time_series_preprocessor.py | 19 +++++++++++++++++++ .../toolkit/time_series_preprocessor.py | 18 ++++++++++++------ 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index aeb7c883..fc5a0960 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -35,3 +35,16 @@ def sample_data(): } ) return df + + +@pytest.fixture(scope="module") +def ts_data_runs(): + df = pd.DataFrame( + { + "run_id": nreps(["1", "2", "3", "4"], 50), + "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, + "value1": range(200), + } + ) + return df diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 86c33d55..6a113dfe 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -297,3 +297,22 @@ def test_get_datasets_without_targets(ts_data): train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2}) train.datasets[0].target_columns == ["value1", "value2"] + + +def test_id_columns_and_scaling_id_columns(ts_data_runs): + df = ts_data_runs + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["asset_id", "run_id"], + scaling_id_columns=["asset_id"], + target_columns=["value1"], + scaling=True, + ) + + ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) + + assert len(tsp.target_scaler_dict) == 2 + assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index 6f5451a6..e12bc1cb 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -116,6 +116,7 @@ def __init__( scaling: bool = False, # scale_outputs: bool = False, scaler_type: ScalerType = 
ScalerType.STANDARD.value, + scaling_id_columns: Optional[List[str]] = None, encode_categorical: bool = True, time_series_task: str = TimeSeriesTask.FORECASTING.value, frequency_mapping: Dict[str, int] = DEFAULT_FREQUENCY_MAPPING, @@ -145,6 +146,7 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type + self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -354,8 +356,10 @@ def _get_groups( Yields: Generator[Any, pd.DataFrame]: Group name and resulting pandas dataframe for the group. """ - if self.id_columns: - group_by_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + group_by_columns = ( + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -552,8 +556,8 @@ def inverse_scale_func(grp, id_columns): grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) return grp - if self.id_columns: - id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] else: id_columns = INTERNAL_ID_COLUMN @@ -597,8 +601,10 @@ def scale_func(grp, id_columns): return grp - if self.id_columns: - id_columns = self.id_columns if len(self.id_columns) > 1 else self.id_columns[0] + if self.scaling_id_columns: + id_columns = ( + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + ) else: id_columns = INTERNAL_ID_COLUMN From 4f209380a0b1b17e9dd4764cf76c23fc6d2b536d Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Mon, 1 Apr 2024 17:03:30 -0400 Subject: [PATCH 15/21] fix unzip bug Signed-off-by: Wesley M. Gifford --- tsfm_public/toolkit/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsfm_public/toolkit/util.py b/tsfm_public/toolkit/util.py index 643b16df..57b2e844 100644 --- a/tsfm_public/toolkit/util.py +++ b/tsfm_public/toolkit/util.py @@ -231,7 +231,7 @@ def train_test_split( ) ) - result_train, result_valid, result_test = zip(**result) + result_train, result_valid, result_test = zip(*result) return pd.concat(result_train), pd.concat(result_valid), pd.concat(result_test) From f94dbcf3518ac65428b52394d2077be5c2b6618d Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 09:13:55 -0400 Subject: [PATCH 16/21] allow sufix during inverse scaling, add tests Signed-off-by: Wesley M. 
Gifford --- .../toolkit/test_time_series_preprocessor.py | 42 +++++++++++++++++++ .../toolkit/time_series_preprocessor.py | 8 +++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 6a113dfe..3d56e400 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -84,6 +84,48 @@ def test_time_series_preprocessor_encodes(sample_data): assert sample_prep[c].dtype == float +def test_time_series_preprocessor_scales(ts_data): + df = ts_data + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["id", "id2"], + target_columns=["value1", "value2"], + scaling=True, + ) + + tsp.train(df) + + # check scaled result + out = tsp.preprocess(df) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) + + # check inverse scale result + out_inv = tsp.inverse_scale_targets(out) + assert np.all( + out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + ) + assert np.all( + out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)) + ) + + # check inverse scale result, with suffix + + suffix = "_foo" + targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] + out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns] + out_inv = tsp.inverse_scale_targets(out, suffix=suffix) + assert np.all( + out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) + == df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)) + ) + + def test_augment_time_series(ts_data): periods = 5 a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index e12bc1cb..e31ca5a8 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -533,12 +533,14 @@ def train( return self - def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[Dataset, pd.DataFrame]: + def inverse_scale_targets( + self, dataset: Union[Dataset, pd.DataFrame], suffix: Optional[str] = None + ) -> Union[Dataset, pd.DataFrame]: self._check_dataset(dataset) df = self._standardize_dataframe(dataset) if not self.scaling: - return df + return dataset if len(self.target_scaler_dict) == 0: # trying to inverse scale but this preprocessor is not set up for scaling @@ -547,6 +549,8 @@ def inverse_scale_targets(self, dataset: Union[Dataset, pd.DataFrame]) -> Union[ ) cols_to_scale = self.target_columns + if suffix is not None: + cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale] def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): From 2c147ec0e9cf6ede82bd14e7be1938e80d2648ed Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 18:45:06 -0400 Subject: [PATCH 17/21] improve inverse scaling, wip Signed-off-by: Wesley M. 
Gifford --- tests/toolkit/conftest.py | 8 +- .../toolkit/test_time_series_preprocessor.py | 69 +++++++-- .../time_series_forecasting_pipeline.py | 5 + .../toolkit/time_series_preprocessor.py | 131 ++++++++++++++---- 4 files changed, 170 insertions(+), 43 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index fc5a0960..7a143e5b 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -16,9 +16,10 @@ def ts_data(): { "id": nreps(["A", "B", "C"], 50), "id2": nreps(["XX", "YY", "ZZ"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] + * 3, "value1": range(150), - "value2": np.arange(150) / 3 + 10, + "value2": np.arange(150) ** 2 / 3 + 10, } ) return df @@ -43,7 +44,8 @@ def ts_data_runs(): { "run_id": nreps(["1", "2", "3", "4"], 50), "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] + * 4, "value1": range(200), } ) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index 3d56e400..c6439e16 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -27,9 +27,9 @@ def test_standard_scaler(sample_data): # check shape preserved result = scaler.fit_transform(sample_data[columns]) assert result.shape == sample_data[columns].shape - expected = (sample_data[columns].values - np.mean(sample_data[columns].values, axis=0)) / np.std( - sample_data[columns].values, axis=0 - ) + expected = ( + sample_data[columns].values - np.mean(sample_data[columns].values, axis=0) + ) / np.std(sample_data[columns].values, axis=0) np.testing.assert_allclose(result, expected) # check serialization @@ -100,8 +100,12 @@ def test_time_series_preprocessor_scales(ts_data): # check scaled result out = tsp.preprocess(df) - assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) - assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) + assert np.allclose( + out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0 + ) + assert np.allclose( + out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0 + ) # check inverse scale result out_inv = tsp.inverse_scale_targets(out) @@ -118,7 +122,9 @@ def test_time_series_preprocessor_scales(ts_data): suffix = "_foo" targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] - out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns] + out.columns = [ + f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns + ] out_inv = tsp.inverse_scale_targets(out, suffix=suffix) assert np.all( out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) @@ -126,9 +132,37 @@ def test_time_series_preprocessor_scales(ts_data): ) +def test_time_series_preprocessor_inv_scales_lists(ts_data): + df = ts_data + + tsp = TimeSeriesPreprocessor( + timestamp_column="timestamp", + prediction_length=2, + context_length=5, + id_columns=["id", "id2"], + target_columns=["value1", "value2"], + scaling=True, + ) + + tsp.train(df) + + # check scaled result + out = tsp.preprocess(df) + + # construct artificial result + out["value1"] = out["value1"].apply(lambda x: np.array([x, x])) + out["value2"] = 
out["value2"].apply(lambda x: np.array([x, x])) + + out_inv = tsp.inverse_scale_targets(out) + + 1 + + def test_augment_time_series(ts_data): periods = 5 - a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) + a = extend_time_series( + ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods + ) # check that length increases by periods for each id assert a.shape[0] == ts_data.shape[0] + 3 * periods @@ -215,7 +249,9 @@ def test_get_datasets(ts_data): ) # 3 time series of length 50 - assert len(train) == 3 * (int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1) + assert len(train) == 3 * ( + int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(valid) == len(test) @@ -240,7 +276,10 @@ def test_get_datasets(ts_data): # new train length should be 20% of 100, minus the usual for context length and prediction length fewshot_train_size = ( - int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 + int((100 - tsp.context_length) * 0.2) + + tsp.context_length + - (tsp.context_length + tsp.prediction_length) + + 1 ) assert len(train) == fewshot_train_size @@ -287,11 +326,15 @@ def test_get_datasets(ts_data): }, ) - assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + assert ( + len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 + ) assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 + assert ( + len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 + ) def test_train_without_targets(ts_data): @@ -354,7 +397,9 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs): scaling=True, ) - ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) + ds_train, ds_valid, ds_test = tsp.get_datasets( + df, split_config={"train": 0.7, "test": 0.2} + ) assert len(tsp.target_scaler_dict) == 2 assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 7c010f88..04a758cc 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -326,4 +326,9 @@ def postprocess(self, input, **kwargs): cols_ordered.extend([c for c in cols if c not in cols_ordered]) out = out[cols_ordered] + + # inverse scale if we have a feature extractor + if self.feature_extractor is not None: + out = self.feature_extractor.inverse_scale_targets(out, column_suffix="_prediction") + return out diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index e31ca5a8..db6c8f82 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -56,7 +56,9 @@ def to_json(self) -> str: return json.dumps(self.to_dict()) @classmethod - def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "SKLearnFeatureExtractionBase": + def from_dict( + cls, feature_extractor_dict: Dict[str, Any], **kwargs + ) -> "SKLearnFeatureExtractionBase": """ """ t = cls() @@ -126,10 +128,14 @@ def __init__( # note base class __init__ methods sets all arguments as attributes if not isinstance(id_columns, list): - raise ValueError(f"Invalid argument provided for `id_columns`: 
{id_columns}") + raise ValueError( + f"Invalid argument provided for `id_columns`: {id_columns}" + ) if isinstance(timestamp_column, list): - raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") + raise ValueError( + f"`timestamp_column` should not be a list, received: {timestamp_column}" + ) self.id_columns = id_columns self.timestamp_column = timestamp_column @@ -146,7 +152,11 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type - self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) + self.scaling_id_columns = ( + scaling_id_columns + if scaling_id_columns is not None + else copy.copy(id_columns) + ) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -223,7 +233,10 @@ def recursive_check_ndarray(dictionary): elif isinstance(value, np.int64): dictionary[key] = int(value) elif isinstance(value, list): - dictionary[key] = [vv.tolist() if isinstance(vv, np.ndarray) else vv for vv in value] + dictionary[key] = [ + vv.tolist() if isinstance(vv, np.ndarray) else vv + for vv in value + ] elif isinstance(value, dict): dictionary[key] = recursive_check_ndarray(value) return dictionary @@ -239,7 +252,9 @@ def recursive_check_ndarray(dictionary): return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" @classmethod - def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "PreTrainedFeatureExtractor": + def from_dict( + cls, feature_extractor_dict: Dict[str, Any], **kwargs + ) -> "PreTrainedFeatureExtractor": """ Instantiates a type of [`~feature_extraction_utils.FeatureExtractionMixin`] from a Python dictionary of parameters. @@ -358,7 +373,9 @@ def _get_groups( """ if self.scaling_id_columns: group_by_columns = ( - self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -423,7 +440,9 @@ def get_frequency_token(self, token_name: str): token = self.frequency_mapping.get(token_name, None) if token is None: - warn(f"Frequency token {token_name} was not found in the frequncy token mapping.") + warn( + f"Frequency token {token_name} was not found in the frequncy token mapping." + ) token = self.frequency_mapping["oov"] return token @@ -456,7 +475,11 @@ def exogenous_channel_indices(self) -> List[int]: @property def prediction_channel_indices(self) -> List[int]: - return [i for i, c in enumerate(self._get_real_valued_dynamic_channels()) if c in self.target_columns] + return [ + i + for i, c in enumerate(self._get_real_valued_dynamic_channels()) + if c in self.target_columns + ] def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): """Basic checks for input dataset. 
@@ -481,7 +504,9 @@ def _set_targets(self, dataset: pd.DataFrame) -> None: skip_columns.extend(self.conditional_columns) skip_columns.extend(self.static_categorical_columns) - self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] + self.target_columns = [ + c for c in dataset.columns.to_list() if c not in skip_columns + ] def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: @@ -493,7 +518,10 @@ def _estimate_frequency(self, df: pd.DataFrame): df_subset = df # to do: make more robust - self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] + self.freq = ( + df_subset[self.timestamp_column].iloc[-1] + - df_subset[self.timestamp_column].iloc[-2] + ) if not isinstance(self.freq, (str, int)): self.freq = str(self.freq) @@ -552,16 +580,33 @@ def inverse_scale_targets( if suffix is not None: cols_to_scale = [f"{c}{suffix}" for c in cols_to_scale] + col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) + + if not np.any(col_has_list): + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform( + grp[cols_to_scale] + ) + else: + # resort to rowwise transformation + # need converstion to array of list to single 2D array, then back to array of list + # at this point, all column elements should have same length + grp[cols_to_scale] = grp[cols_to_scale].apply( + lambda x: self.target_scaler_dict[name].inverse_transform(x) + ) return grp if self.scaling_id_columns: - id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + id_columns = ( + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] + ) else: id_columns = INTERNAL_ID_COLUMN @@ -599,15 +644,21 @@ def scale_func(grp, id_columns): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[self.target_columns] = self.target_scaler_dict[name].transform(grp[self.target_columns]) + grp[self.target_columns] = self.target_scaler_dict[name].transform( + grp[self.target_columns] + ) if other_cols_to_scale: - grp[other_cols_to_scale] = self.scaler_dict[name].transform(grp[other_cols_to_scale]) + grp[other_cols_to_scale] = self.scaler_dict[name].transform( + grp[other_cols_to_scale] + ) return grp if self.scaling_id_columns: id_columns = ( - self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] + self.scaling_id_columns + if len(self.scaling_id_columns) > 1 + else self.scaling_id_columns[0] ) else: id_columns = INTERNAL_ID_COLUMN @@ -621,7 +672,9 @@ def scale_func(grp, id_columns): cols_to_encode = self._get_columns_to_encode() if self.encode_categorical and cols_to_encode: if not self.categorical_encoder: - raise RuntimeError("Attempt to encode categorical columns, but the encoder has not been trained yet.") + raise RuntimeError( + "Attempt to encode categorical columns, but the encoder has not been trained yet." 
+ ) df[cols_to_encode] = self.categorical_encoder.transform(df[cols_to_encode]) return df @@ -667,20 +720,34 @@ def get_datasets( data = self._standardize_dataframe(dataset) if not self.context_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") + raise ValueError( + "TimeSeriesPreprocessor must be instantiated with non-null context_length" + ) if not self.prediction_length: - raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") + raise ValueError( + "TimeSeriesPreprocessor must be instantiated with non-null prediction_length" + ) # get split_params - split_params, split_function = get_split_params(split_config, context_length=self.context_length) + split_params, split_function = get_split_params( + split_config, context_length=self.context_length + ) # split data if isinstance(split_function, dict): - train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) - valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) - test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) + train_data = split_function["train"]( + data, id_columns=self.id_columns, **split_params["train"] + ) + valid_data = split_function["valid"]( + data, id_columns=self.id_columns, **split_params["valid"] + ) + test_data = split_function["test"]( + data, id_columns=self.id_columns, **split_params["test"] + ) else: - train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) + train_data, valid_data, test_data = split_function( + data, id_columns=self.id_columns, **split_params + ) # data preprocessing self.train(train_data) @@ -699,7 +766,9 @@ def get_datasets( # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") + raise ValueError( + f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}" + ) train_data = select_by_fixed_fraction( train_data, @@ -729,13 +798,17 @@ def get_datasets( def create_timestamps( last_timestamp: Union[datetime.datetime, pd.Timestamp], freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None, - time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None, + time_sequence: Optional[ + Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]] + ] = None, periods: int = 1, ): """Simple utility to create a list of timestamps based on start, delta and number of periods""" if freq is None and time_sequence is None: - raise ValueError("Neither `freq` nor `time_sequence` provided, cannot determine frequency.") + raise ValueError( + "Neither `freq` nor `time_sequence` provided, cannot determine frequency." 
+ ) if freq is None: # to do: make more robust @@ -801,7 +874,9 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]): if grouping_columns == []: new_time_series = augment_one_series(time_series) else: - new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False) + new_time_series = time_series.groupby(grouping_columns).apply( + augment_one_series, include_groups=False + ) idx_names = list(new_time_series.index.names) idx_names[-1] = "__delete" new_time_series = new_time_series.reset_index(names=idx_names) From f10005cfc1511bf40d0bf8f2639503446357fbb6 Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 20:13:41 -0400 Subject: [PATCH 18/21] handle scaling of lists, update tests Signed-off-by: Wesley M. Gifford --- tests/toolkit/conftest.py | 6 +- .../toolkit/test_time_series_preprocessor.py | 50 +++---- .../toolkit/time_series_preprocessor.py | 133 +++++------------- 3 files changed, 55 insertions(+), 134 deletions(-) diff --git a/tests/toolkit/conftest.py b/tests/toolkit/conftest.py index 7a143e5b..1ae61a6a 100644 --- a/tests/toolkit/conftest.py +++ b/tests/toolkit/conftest.py @@ -16,8 +16,7 @@ def ts_data(): { "id": nreps(["A", "B", "C"], 50), "id2": nreps(["XX", "YY", "ZZ"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 3, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3, "value1": range(150), "value2": np.arange(150) ** 2 / 3 + 10, } @@ -44,8 +43,7 @@ def ts_data_runs(): { "run_id": nreps(["1", "2", "3", "4"], 50), "asset_id": nreps(["foo", "bar", "foo", "bar"], 50), - "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] - * 4, + "timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4, "value1": range(200), } ) diff --git a/tests/toolkit/test_time_series_preprocessor.py b/tests/toolkit/test_time_series_preprocessor.py index c6439e16..9c052fec 100644 --- a/tests/toolkit/test_time_series_preprocessor.py +++ b/tests/toolkit/test_time_series_preprocessor.py @@ -27,9 +27,9 @@ def test_standard_scaler(sample_data): # check shape preserved result = scaler.fit_transform(sample_data[columns]) assert result.shape == sample_data[columns].shape - expected = ( - sample_data[columns].values - np.mean(sample_data[columns].values, axis=0) - ) / np.std(sample_data[columns].values, axis=0) + expected = (sample_data[columns].values - np.mean(sample_data[columns].values, axis=0)) / np.std( + sample_data[columns].values, axis=0 + ) np.testing.assert_allclose(result, expected) # check serialization @@ -100,12 +100,8 @@ def test_time_series_preprocessor_scales(ts_data): # check scaled result out = tsp.preprocess(df) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0 - ) - assert np.allclose( - out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0 - ) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0) + assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0) # check inverse scale result out_inv = tsp.inverse_scale_targets(out) @@ -122,9 +118,7 @@ def test_time_series_preprocessor_scales(ts_data): suffix = "_foo" targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns] - out.columns = [ - f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns - ] + out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in 
out.columns] out_inv = tsp.inverse_scale_targets(out, suffix=suffix) assert np.all( out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x)) @@ -150,19 +144,18 @@ def test_time_series_preprocessor_inv_scales_lists(ts_data): out = tsp.preprocess(df) # construct artificial result - out["value1"] = out["value1"].apply(lambda x: np.array([x, x])) - out["value2"] = out["value2"].apply(lambda x: np.array([x, x])) + out["value1"] = out["value1"].apply(lambda x: np.array([x] * 3)) + out["value2"] = out["value2"].apply(lambda x: np.array([x] * 3)) out_inv = tsp.inverse_scale_targets(out) - 1 + assert out_inv["value1"].mean()[0] == df["value1"].mean() + assert out_inv["value2"].mean()[0] == df["value2"].mean() def test_augment_time_series(ts_data): periods = 5 - a = extend_time_series( - ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods - ) + a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods) # check that length increases by periods for each id assert a.shape[0] == ts_data.shape[0] + 3 * periods @@ -249,9 +242,7 @@ def test_get_datasets(ts_data): ) # 3 time series of length 50 - assert len(train) == 3 * ( - int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == 3 * (int((1 / 3) * 50) - (tsp.context_length + tsp.prediction_length) + 1) assert len(valid) == len(test) @@ -276,10 +267,7 @@ def test_get_datasets(ts_data): # new train length should be 20% of 100, minus the usual for context length and prediction length fewshot_train_size = ( - int((100 - tsp.context_length) * 0.2) - + tsp.context_length - - (tsp.context_length + tsp.prediction_length) - + 1 + int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1 ) assert len(train) == fewshot_train_size @@ -326,15 +314,11 @@ def test_get_datasets(ts_data): }, ) - assert ( - len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 - ) + assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1 assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1 - assert ( - len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 - ) + assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1 def test_train_without_targets(ts_data): @@ -397,9 +381,7 @@ def test_id_columns_and_scaling_id_columns(ts_data_runs): scaling=True, ) - ds_train, ds_valid, ds_test = tsp.get_datasets( - df, split_config={"train": 0.7, "test": 0.2} - ) + ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2}) assert len(tsp.target_scaler_dict) == 2 assert len(ds_train.datasets) == 4 diff --git a/tsfm_public/toolkit/time_series_preprocessor.py b/tsfm_public/toolkit/time_series_preprocessor.py index db6c8f82..b88dcad7 100644 --- a/tsfm_public/toolkit/time_series_preprocessor.py +++ b/tsfm_public/toolkit/time_series_preprocessor.py @@ -56,9 +56,7 @@ def to_json(self) -> str: return json.dumps(self.to_dict()) @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "SKLearnFeatureExtractionBase": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "SKLearnFeatureExtractionBase": """ """ t = cls() @@ -128,14 +126,10 @@ def __init__( # note base class __init__ methods sets all arguments as attributes if not isinstance(id_columns, list): - raise ValueError( - f"Invalid argument provided for `id_columns`: 
{id_columns}" - ) + raise ValueError(f"Invalid argument provided for `id_columns`: {id_columns}") if isinstance(timestamp_column, list): - raise ValueError( - f"`timestamp_column` should not be a list, received: {timestamp_column}" - ) + raise ValueError(f"`timestamp_column` should not be a list, received: {timestamp_column}") self.id_columns = id_columns self.timestamp_column = timestamp_column @@ -152,11 +146,7 @@ def __init__( self.time_series_task = time_series_task # self.scale_outputs = scale_outputs self.scaler_type = scaler_type - self.scaling_id_columns = ( - scaling_id_columns - if scaling_id_columns is not None - else copy.copy(id_columns) - ) + self.scaling_id_columns = scaling_id_columns if scaling_id_columns is not None else copy.copy(id_columns) # we maintain two scalers per time series to facilitate inverse scaling of the targets self.scaler_dict = {} @@ -233,10 +223,7 @@ def recursive_check_ndarray(dictionary): elif isinstance(value, np.int64): dictionary[key] = int(value) elif isinstance(value, list): - dictionary[key] = [ - vv.tolist() if isinstance(vv, np.ndarray) else vv - for vv in value - ] + dictionary[key] = [vv.tolist() if isinstance(vv, np.ndarray) else vv for vv in value] elif isinstance(value, dict): dictionary[key] = recursive_check_ndarray(value) return dictionary @@ -252,9 +239,7 @@ def recursive_check_ndarray(dictionary): return json.dumps(dictionary, indent=2, sort_keys=True) + "\n" @classmethod - def from_dict( - cls, feature_extractor_dict: Dict[str, Any], **kwargs - ) -> "PreTrainedFeatureExtractor": + def from_dict(cls, feature_extractor_dict: Dict[str, Any], **kwargs) -> "PreTrainedFeatureExtractor": """ Instantiates a type of [`~feature_extraction_utils.FeatureExtractionMixin`] from a Python dictionary of parameters. @@ -373,9 +358,7 @@ def _get_groups( """ if self.scaling_id_columns: group_by_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: group_by_columns = INTERNAL_ID_COLUMN @@ -440,9 +423,7 @@ def get_frequency_token(self, token_name: str): token = self.frequency_mapping.get(token_name, None) if token is None: - warn( - f"Frequency token {token_name} was not found in the frequncy token mapping." - ) + warn(f"Frequency token {token_name} was not found in the frequncy token mapping.") token = self.frequency_mapping["oov"] return token @@ -475,11 +456,7 @@ def exogenous_channel_indices(self) -> List[int]: @property def prediction_channel_indices(self) -> List[int]: - return [ - i - for i, c in enumerate(self._get_real_valued_dynamic_channels()) - if c in self.target_columns - ] + return [i for i, c in enumerate(self._get_real_valued_dynamic_channels()) if c in self.target_columns] def _check_dataset(self, dataset: Union[Dataset, pd.DataFrame]): """Basic checks for input dataset. 
@@ -504,9 +481,7 @@ def _set_targets(self, dataset: pd.DataFrame) -> None: skip_columns.extend(self.conditional_columns) skip_columns.extend(self.static_categorical_columns) - self.target_columns = [ - c for c in dataset.columns.to_list() if c not in skip_columns - ] + self.target_columns = [c for c in dataset.columns.to_list() if c not in skip_columns] def _estimate_frequency(self, df: pd.DataFrame): if self.timestamp_column: @@ -518,10 +493,7 @@ def _estimate_frequency(self, df: pd.DataFrame): df_subset = df # to do: make more robust - self.freq = ( - df_subset[self.timestamp_column].iloc[-1] - - df_subset[self.timestamp_column].iloc[-2] - ) + self.freq = df_subset[self.timestamp_column].iloc[-1] - df_subset[self.timestamp_column].iloc[-2] if not isinstance(self.freq, (str, int)): self.freq = str(self.freq) @@ -582,6 +554,14 @@ def inverse_scale_targets( col_has_list = [df[c].dtype == np.dtype("O") for c in cols_to_scale] + def explode_row(df_row, name, columns): + df = pd.DataFrame(df_row[columns].to_dict()) + inv_scale = self.target_scaler_dict[name].inverse_transform(df) + df_out = df_row.copy() + for idx, c in enumerate(columns): + df_out[c] = inv_scale[:, idx] + return df_out + def inverse_scale_func(grp, id_columns): if isinstance(id_columns, list): name = tuple(grp.iloc[0][id_columns].tolist()) @@ -589,24 +569,15 @@ def inverse_scale_func(grp, id_columns): name = grp.iloc[0][id_columns] if not np.any(col_has_list): - grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform( - grp[cols_to_scale] - ) + grp[cols_to_scale] = self.target_scaler_dict[name].inverse_transform(grp[cols_to_scale]) else: - # resort to rowwise transformation - # need converstion to array of list to single 2D array, then back to array of list - # at this point, all column elements should have same length grp[cols_to_scale] = grp[cols_to_scale].apply( - lambda x: self.target_scaler_dict[name].inverse_transform(x) + lambda x: explode_row(x, name, cols_to_scale), axis="columns" ) return grp if self.scaling_id_columns: - id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] - ) + id_columns = self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] else: id_columns = INTERNAL_ID_COLUMN @@ -644,21 +615,15 @@ def scale_func(grp, id_columns): name = tuple(grp.iloc[0][id_columns].tolist()) else: name = grp.iloc[0][id_columns] - grp[self.target_columns] = self.target_scaler_dict[name].transform( - grp[self.target_columns] - ) + grp[self.target_columns] = self.target_scaler_dict[name].transform(grp[self.target_columns]) if other_cols_to_scale: - grp[other_cols_to_scale] = self.scaler_dict[name].transform( - grp[other_cols_to_scale] - ) + grp[other_cols_to_scale] = self.scaler_dict[name].transform(grp[other_cols_to_scale]) return grp if self.scaling_id_columns: id_columns = ( - self.scaling_id_columns - if len(self.scaling_id_columns) > 1 - else self.scaling_id_columns[0] + self.scaling_id_columns if len(self.scaling_id_columns) > 1 else self.scaling_id_columns[0] ) else: id_columns = INTERNAL_ID_COLUMN @@ -672,9 +637,7 @@ def scale_func(grp, id_columns): cols_to_encode = self._get_columns_to_encode() if self.encode_categorical and cols_to_encode: if not self.categorical_encoder: - raise RuntimeError( - "Attempt to encode categorical columns, but the encoder has not been trained yet." 
- ) + raise RuntimeError("Attempt to encode categorical columns, but the encoder has not been trained yet.") df[cols_to_encode] = self.categorical_encoder.transform(df[cols_to_encode]) return df @@ -720,34 +683,20 @@ def get_datasets( data = self._standardize_dataframe(dataset) if not self.context_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null context_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null context_length") if not self.prediction_length: - raise ValueError( - "TimeSeriesPreprocessor must be instantiated with non-null prediction_length" - ) + raise ValueError("TimeSeriesPreprocessor must be instantiated with non-null prediction_length") # get split_params - split_params, split_function = get_split_params( - split_config, context_length=self.context_length - ) + split_params, split_function = get_split_params(split_config, context_length=self.context_length) # split data if isinstance(split_function, dict): - train_data = split_function["train"]( - data, id_columns=self.id_columns, **split_params["train"] - ) - valid_data = split_function["valid"]( - data, id_columns=self.id_columns, **split_params["valid"] - ) - test_data = split_function["test"]( - data, id_columns=self.id_columns, **split_params["test"] - ) + train_data = split_function["train"](data, id_columns=self.id_columns, **split_params["train"]) + valid_data = split_function["valid"](data, id_columns=self.id_columns, **split_params["valid"]) + test_data = split_function["test"](data, id_columns=self.id_columns, **split_params["test"]) else: - train_data, valid_data, test_data = split_function( - data, id_columns=self.id_columns, **split_params - ) + train_data, valid_data, test_data = split_function(data, id_columns=self.id_columns, **split_params) # data preprocessing self.train(train_data) @@ -766,9 +715,7 @@ def get_datasets( # handle fewshot operation if fewshot_fraction is not None: if not ((fewshot_fraction <= 1.0) and (fewshot_fraction > 0.0)): - raise ValueError( - f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}" - ) + raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}") train_data = select_by_fixed_fraction( train_data, @@ -798,17 +745,13 @@ def get_datasets( def create_timestamps( last_timestamp: Union[datetime.datetime, pd.Timestamp], freq: Optional[Union[int, float, datetime.timedelta, pd.Timedelta, str]] = None, - time_sequence: Optional[ - Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]] - ] = None, + time_sequence: Optional[Union[List[int], List[float], List[datetime.datetime], List[pd.Timestamp]]] = None, periods: int = 1, ): """Simple utility to create a list of timestamps based on start, delta and number of periods""" if freq is None and time_sequence is None: - raise ValueError( - "Neither `freq` nor `time_sequence` provided, cannot determine frequency." 
- ) + raise ValueError("Neither `freq` nor `time_sequence` provided, cannot determine frequency.") if freq is None: # to do: make more robust @@ -874,9 +817,7 @@ def augment_one_series(group: Union[pd.Series, pd.DataFrame]): if grouping_columns == []: new_time_series = augment_one_series(time_series) else: - new_time_series = time_series.groupby(grouping_columns).apply( - augment_one_series, include_groups=False - ) + new_time_series = time_series.groupby(grouping_columns).apply(augment_one_series, include_groups=False) idx_names = list(new_time_series.index.names) idx_names[-1] = "__delete" new_time_series = new_time_series.reset_index(names=idx_names) From 4878db69ead3a307d52c780e89ddd40928fcac8e Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Tue, 2 Apr 2024 21:44:16 -0400 Subject: [PATCH 19/21] add tests, update pipline for inverse scale Signed-off-by: Wesley M. Gifford --- .../test_time_series_forecasting_pipeline.py | 60 +++++++++++++++++++ .../time_series_forecasting_pipeline.py | 2 +- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/tests/toolkit/test_time_series_forecasting_pipeline.py b/tests/toolkit/test_time_series_forecasting_pipeline.py index 7b87dec5..87e8753c 100644 --- a/tests/toolkit/test_time_series_forecasting_pipeline.py +++ b/tests/toolkit/test_time_series_forecasting_pipeline.py @@ -8,6 +8,7 @@ from tsfm_public.toolkit.time_series_forecasting_pipeline import ( TimeSeriesForecastingPipeline, ) +from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor from tsfm_public.toolkit.util import select_by_index @@ -79,3 +80,62 @@ def test_forecasting_pipeline_forecasts(): forecasts_exploded = forecast_pipeline(test_data) assert forecasts_exploded.shape == (prediction_length, len(target_columns) + 1) + + +def test_forecasting_pipeline_forecasts_with_preprocessor(): + timestamp_column = "date" + id_columns = [] + target_columns = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"] + prediction_length = 96 + + model_path = "ibm/patchtst-etth1-forecasting" + model = PatchTSTForPrediction.from_pretrained(model_path) + context_length = model.config.context_length + + tsp = TimeSeriesPreprocessor( + timestamp_column=timestamp_column, + id_columns=id_columns, + target_columns=target_columns, + context_length=context_length, + prediction_length=prediction_length, + freq="1h", + ) + + forecast_pipeline = TimeSeriesForecastingPipeline( + model=model, + timestamp_column=timestamp_column, + id_columns=id_columns, + target_columns=target_columns, + freq="1h", + feature_extractor=tsp, + explode_forecasts=False, + ) + + dataset_path = "https://raw.githubusercontent.com/zhouhaoyi/ETDataset/main/ETT-small/ETTh2.csv" + data = pd.read_csv( + dataset_path, + parse_dates=[timestamp_column], + ) + test_end_index = 12 * 30 * 24 + 8 * 30 * 24 + test_start_index = test_end_index - context_length - 4 + + data = pd.read_csv( + dataset_path, + parse_dates=[timestamp_column], + ) + + test_data = select_by_index( + data, + id_columns=id_columns, + start_index=test_start_index, + end_index=test_end_index, + ) + + forecasts = forecast_pipeline(test_data) + + assert forecasts.shape == ( + test_end_index - test_start_index - context_length + 1, + 2 * len(target_columns) + 1, + ) + + # to do: add check on the scaling diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 04a758cc..9118a373 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ 
b/tsfm_public/toolkit/time_series_forecasting_pipeline.py
@@ -329,6 +329,6 @@ def postprocess(self, input, **kwargs):

         # inverse scale if we have a feature extractor
         if self.feature_extractor is not None:
-            out = self.feature_extractor.inverse_scale_targets(out, column_suffix="_prediction")
+            out = self.feature_extractor.inverse_scale_targets(out, suffix="_prediction")

         return out

From 2dbd15d9eaa5fda25a2ecf4541d43acef30480b3 Mon Sep 17 00:00:00 2001
From: "Wesley M. Gifford" 
Date: Wed, 3 Apr 2024 18:07:12 -0400
Subject: [PATCH 20/21] docstrings

Signed-off-by: Wesley M. Gifford 
---
 .../time_series_forecasting_pipeline.py      | 107 ++++++++++++------
 1 file changed, 73 insertions(+), 34 deletions(-)

diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py
index 9118a373..445b22d8 100644
--- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py
+++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py
@@ -32,12 +32,17 @@


 @add_end_docstrings(
-    build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False)
+    build_pipeline_init_args(
+        has_tokenizer=False, has_feature_extractor=True, has_image_processor=False
+    )
 )
 class TimeSeriesForecastingPipeline(Pipeline):
-    """Hugging Face Pipeline for Time Series Forecasting"""
+    """Hugging Face Pipeline for Time Series Forecasting

-    # has_feature_extractor means we can pass feature_extractor=TimeSeriesPreprocessor
+    feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time
+        series should be prepared. If this is provided, any of the other options below will be automatically
+        populated from this instance.
+    """

     def __init__(
         self,
@@ -62,7 +67,9 @@ def _sanitize_parameters(self, **kwargs):
         """

         context_length = kwargs.get("context_length", self.model.config.context_length)
-        prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length)
+        prediction_length = kwargs.get(
+            "prediction_length", self.model.config.prediction_length
+        )

         preprocess_kwargs = {
             "prediction_length": prediction_length,
@@ -128,40 +135,54 @@ def __call__(
         produces predictions.

         Args:
-            time_series (Union["pandas.DataFrame", str]): A pandas dataframe or a referce to a location
-                from where a pandas datarame can be loaded containing the time series on which to perform inference.
+            time_series (Union["pandas.DataFrame", str]): A pandas dataframe containing the time series on
+                which to perform inference.

-            future_time_series (Union["pandas.DataFrame", str]): A pandas dataframe or a referce to a location
-                from where a pandas datarame can be loaded containing future values, i.e., exogenous or supporting features
-                which are known in advance.
+        Keyword arguments:
+            future_time_series (Union["pandas.DataFrame", str]): A pandas dataframe containing future values,
+                i.e., exogenous or supporting features which are known in advance.

-            To do: describe batch vs. single and the need for future_time_series
+            feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time
+                series should be prepared. If this is provided, any of the other options below will be automatically
+                populated from this instance.

+            timestamp_column (str): The name of the column containing the timestamp of the time series.

-            kwargs
+            id_columns (List[str]): List of column names which identify different time series in a multi-time series input. 
-            future_time_series: Optional[Union["pandas.DataFrame", str]] = None,
-            prediction_length
-            context_length
+            target_columns (List[str]): List of column names which identify the target channels in the input; these are the
+                columns that will be forecasted.

-            timestamp_column (str): the column containing the date / timestamp
-            id_columns (List[str]): the list of columns containing ID information. If no ids are present, pass [].
+            observable_columns (List[str]): List of column names which identify the observable channels in the input.
+                Observable channels are channels which we have knowledge about in the past and future. For example, weather
+                conditions such as temperature or precipitation may be known or estimated in the future, but cannot be
+                changed.

-            "target_columns",
-            "observable_columns",
-            "control_columns",
-            "conditional_columns",
-            "static_categorical_columns",
+            control_columns (List[str]): List of column names which identify the control channels in the input. Control
+                channels are similar to observable channels, except that future values may be controlled. For example, discount
+                percentage of a particular product is known and controllable in the future.
+
+            conditional_columns (List[str]): List of column names which identify the conditional channels in the input.
+                Conditional channels are channels which we know in the past, but do not know in the future.

+            static_categorical_columns (List[str]): List of column names which identify categorical-valued channels in the input
+                which are fixed over time.

-            # OLD
-            input_columns (List[str]): the columns that are used as to create the inputs to the forecasting model.
-                These values are used to select data in the input dataframe.
-            output_columns (List[str]): the column names that are used to label the outputs of the forecasting model.
-                If omitted, it is assumed that the model will forecast values for all the input columns.
+            freq (str): A frequency indicator for the given `timestamp_column`. See
+                https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#period-aliases for a description of the
+                allowed values. If not provided, we will attempt to infer the frequency from the data in `timestamp_column`.

+            prediction_length (int): The length of the desired forecast. Currently, this value must not exceed the maximum value
+                supported by the model. If not specified, the maximum value supported by the model is used.

-        Return:
+            context_length (int): Specifies the length of the context windows extracted from the historical data for feeding into
+                the model.
+
+            explode_forecasts (bool): If true, forecasts are returned one value per row of the pandas dataframe. If false, the
+                forecast over the prediction length will be contained as a list in a single row of the pandas dataframe.
+
+        Return (pandas dataframe): A new pandas dataframe containing the forecasts. Each row will contain the
            id, timestamp, the original input feature values and the output forecast for each input column. The
            output forecast is a list containing all the values over the prediction horizon.
@@ -170,7 +191,9 @@ def __call__(

         return super().__call__(time_series, **kwargs)

-    def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, List[Any]]]:
+    def preprocess(
+        self, time_series, **kwargs
+    ) -> Dict[str, Union[GenericTensor, List[Any]]]:
        """Preprocess step
        Load the data, if not already loaded, and then generate a pytorch dataset. 
""" @@ -198,12 +221,16 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li # do we need to check the timestamp column? pass else: - raise ValueError(f"`future_time_series` of type {type(future_time_series)} is not supported.") + raise ValueError( + f"`future_time_series` of type {type(future_time_series)} is not supported." + ) # stack the time series for c in future_time_series.columns: if c not in time_series.columns: - raise ValueError(f"Future time series input contains an unknown column {c}.") + raise ValueError( + f"Future time series input contains an unknown column {c}." + ) time_series = pd.concat((time_series, future_time_series), axis=0) else: @@ -264,7 +291,11 @@ def _forward(self, model_inputs, **kwargs): # copy the other inputs copy_inputs = True - for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]: + for k in [ + akey + for akey in model_inputs.keys() + if (akey not in model_input_keys) or copy_inputs + ]: model_outputs[k] = model_inputs[k] return model_outputs @@ -276,14 +307,20 @@ def postprocess(self, input, **kwargs): """ out = {} - model_output_key = "prediction_outputs" if "prediction_outputs" in input.keys() else "prediction_logits" + model_output_key = ( + "prediction_outputs" + if "prediction_outputs" in input.keys() + else "prediction_logits" + ) # name the predictions of target columns # outputs should only have size equal to target columns prediction_columns = [] for i, c in enumerate(kwargs["target_columns"]): prediction_columns.append(f"{c}_prediction") - out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist() + out[prediction_columns[-1]] = ( + input[model_output_key][:, :, i].numpy().tolist() + ) # provide the ground truth values for the targets # when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future for i, c in enumerate(kwargs["target_columns"]): @@ -329,6 +366,8 @@ def postprocess(self, input, **kwargs): # inverse scale if we have a feature extractor if self.feature_extractor is not None: - out = self.feature_extractor.inverse_scale_targets(out, suffix="_prediction") + out = self.feature_extractor.inverse_scale_targets( + out, suffix="_prediction" + ) return out From 2e66f31f739d2e37aacd37f1ff0f02fca06a0dbd Mon Sep 17 00:00:00 2001 From: "Wesley M. Gifford" Date: Wed, 3 Apr 2024 18:24:22 -0400 Subject: [PATCH 21/21] quality Signed-off-by: Wesley M. 
Gifford --- .../time_series_forecasting_pipeline.py | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py index 445b22d8..69d3ce4a 100644 --- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py +++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py @@ -32,9 +32,7 @@ @add_end_docstrings( - build_pipeline_init_args( - has_tokenizer=False, has_feature_extractor=True, has_image_processor=False - ) + build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False) ) class TimeSeriesForecastingPipeline(Pipeline): """Hugging Face Pipeline for Time Series Forecasting @@ -67,9 +65,7 @@ def _sanitize_parameters(self, **kwargs): """ context_length = kwargs.get("context_length", self.model.config.context_length) - prediction_length = kwargs.get( - "prediction_length", self.model.config.prediction_length - ) + prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length) preprocess_kwargs = { "prediction_length": prediction_length, @@ -191,9 +187,7 @@ def __call__( return super().__call__(time_series, **kwargs) - def preprocess( - self, time_series, **kwargs - ) -> Dict[str, Union[GenericTensor, List[Any]]]: + def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, List[Any]]]: """Preprocess step Load the data, if not already loaded, and then generate a pytorch dataset. """ @@ -221,16 +215,12 @@ def preprocess( # do we need to check the timestamp column? pass else: - raise ValueError( - f"`future_time_series` of type {type(future_time_series)} is not supported." - ) + raise ValueError(f"`future_time_series` of type {type(future_time_series)} is not supported.") # stack the time series for c in future_time_series.columns: if c not in time_series.columns: - raise ValueError( - f"Future time series input contains an unknown column {c}." 
- ) + raise ValueError(f"Future time series input contains an unknown column {c}.") time_series = pd.concat((time_series, future_time_series), axis=0) else: @@ -291,11 +281,7 @@ def _forward(self, model_inputs, **kwargs): # copy the other inputs copy_inputs = True - for k in [ - akey - for akey in model_inputs.keys() - if (akey not in model_input_keys) or copy_inputs - ]: + for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]: model_outputs[k] = model_inputs[k] return model_outputs @@ -307,20 +293,14 @@ def postprocess(self, input, **kwargs): """ out = {} - model_output_key = ( - "prediction_outputs" - if "prediction_outputs" in input.keys() - else "prediction_logits" - ) + model_output_key = "prediction_outputs" if "prediction_outputs" in input.keys() else "prediction_logits" # name the predictions of target columns # outputs should only have size equal to target columns prediction_columns = [] for i, c in enumerate(kwargs["target_columns"]): prediction_columns.append(f"{c}_prediction") - out[prediction_columns[-1]] = ( - input[model_output_key][:, :, i].numpy().tolist() - ) + out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist() # provide the ground truth values for the targets # when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future for i, c in enumerate(kwargs["target_columns"]): @@ -366,8 +346,6 @@ def postprocess(self, input, **kwargs): # inverse scale if we have a feature extractor if self.feature_extractor is not None: - out = self.feature_extractor.inverse_scale_targets( - out, suffix="_prediction" - ) + out = self.feature_extractor.inverse_scale_targets(out, suffix="_prediction") return out
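
For reference, a minimal usage sketch of the behavior these patches exercise in test_forecasting_pipeline_forecasts_with_preprocessor: passing a TimeSeriesPreprocessor as feature_extractor is what triggers the inverse_scale_targets(out, suffix="_prediction") call in postprocess. Model checkpoint, column names, and freq are taken from that test; the CSV path and data-loading step are illustrative assumptions, not part of the patches.

import pandas as pd
from transformers import PatchTSTForPrediction

from tsfm_public.toolkit.time_series_forecasting_pipeline import TimeSeriesForecastingPipeline
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor

# assumed input: an ETTh2-style dataframe with a parsed "date" column (path is illustrative)
df = pd.read_csv("ETTh2.csv", parse_dates=["date"])

target_columns = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"]
model = PatchTSTForPrediction.from_pretrained("ibm/patchtst-etth1-forecasting")

tsp = TimeSeriesPreprocessor(
    timestamp_column="date",
    id_columns=[],
    target_columns=target_columns,
    context_length=model.config.context_length,
    prediction_length=96,
    freq="1h",
)

pipeline = TimeSeriesForecastingPipeline(
    model=model,
    timestamp_column="date",
    id_columns=[],
    target_columns=target_columns,
    freq="1h",
    feature_extractor=tsp,  # postprocess will call inverse_scale_targets on the "*_prediction" columns
)

# one row per context window; each "*_prediction" cell holds a list covering the prediction horizon
forecasts = pipeline(df)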