# %%
# Exploratory script: load the ETTh1 benchmark CSV and build train/valid/test
# datasets through TimeSeriesPreprocessor.get_datasets.
import pandas as pd

from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor


timestamp_column = "date"
dataset_path = "https://raw.githubusercontent.com/zhouhaoyi/ETDataset/main/ETT-small/ETTh1.csv"

# First variant: split boundaries are whole numbers >= 1.
split_config = {
    "train": [0, 8640],
    "valid": [8640, 11520],
    "test": [11520, 14400],
}

df = pd.read_csv(dataset_path, parse_dates=[timestamp_column])

# Every non-timestamp column is treated as a forecasting target; no id columns
# or exogenous/static columns are declared.
target_columns = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"]

tsp = TimeSeriesPreprocessor(
    id_columns=[],
    timestamp_column=timestamp_column,
    target_columns=target_columns,
    observable_columns=[],
    control_columns=[],
    conditional_columns=[],
    static_categorical_columns=[],
    scaling=True,
    scaler_type="standard",
    encode_categorical=False,
    prediction_length=10,
    context_length=96,
)

train, valid, test = tsp.get_datasets(df, split_config)

# %%
# Second variant: the same preprocessor and dataframe, with boundaries
# expressed as fractions in [0, 1].
split_config = {
    "train": [0, 0.7],
    "valid": [0.7, 0.9],
    "test": [0.9, 1],
}

train, valid, test = tsp.get_datasets(df, split_config)
def test_get_datasets(ts_data):
    """Check the sizes of the datasets produced by `TimeSeriesPreprocessor.get_datasets`.

    Covers a fractional split over several id-grouped series, and index-based
    splits combined with few-shot selection from either end of the training data.
    """
    target_columns = ["value1", "value2"]

    # Case 1: series grouped by "id", split boundaries given as fractions
    tsp = TimeSeriesPreprocessor(
        id_columns=["id"],
        target_columns=target_columns,
        prediction_length=5,
        context_length=10,
    )
    fractional_split = {"train": [0, 1 / 3], "valid": [1 / 3, 2 / 3], "test": [2 / 3, 1]}
    train, valid, test = tsp.get_datasets(ts_data, split_config=fractional_split)

    # 3 time series of length 50
    window_length = tsp.context_length + tsp.prediction_length
    assert len(train) == 3 * (int((1 / 3) * 50) - window_length + 1)
    assert len(valid) == len(test)

    # Cases 2 and 3: no id columns, so treat as one big time series; keep only
    # 20% of the training split, taken from the end and then from the start
    for location in (FractionLocation.LAST.value, FractionLocation.FIRST.value):
        tsp = TimeSeriesPreprocessor(
            id_columns=[],
            target_columns=target_columns,
            prediction_length=5,
            context_length=10,
        )

        train, valid, test = tsp.get_datasets(
            ts_data,
            split_config={
                "train": [0, 100],
                "valid": [100, 125],
                "test": [125, 150],
            },
            fewshot_fraction=0.2,
            fewshot_location=location,
        )

        # new train length should be 20% of 100, minus the usual for context length and prediction length
        fewshot_train_size = int(100 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1
        assert len(train) == fewshot_train_size
        assert len(valid) == len(test)
def get_datasets(
    self,
    dataset: Union[Dataset, pd.DataFrame],
    split_config: Dict[str, Any],
    fewshot_fraction: Optional[float] = None,
    fewshot_location: str = FractionLocation.LAST.value,
) -> Tuple[Any, Any, Any]:
    """Creates the preprocessed pytorch datasets needed for training and evaluation
    using the HuggingFace trainer

    The preprocessor is trained (via `self.train`) on the full training split; when
    `fewshot_fraction` is given, the training split is reduced afterwards.

    Args:
        dataset (Union[Dataset, pd.DataFrame]): Input data. It is passed through
            `self._standardize_dataframe` before being split.
        split_config (Dict[str, Any]): Dictionary with the keys "train", "valid" and "test",
            each mapping to a two-element list holding the boundaries of that split.
            For example:
            {
                train: [0, 50],
                valid: [50, 70],
                test:  [70, 100]
            }
            end value is not inclusive
        fewshot_fraction (float, optional): When non-null, keep only this fraction of the
            training split (a value in (0, 1]). This is done to support fewshot fine-tuning.
            Defaults to None, which keeps the whole training split.
        fewshot_location (str): Determines where the fewshot data is chosen. Valid options are
            "first" and "last" as described in the enum FractionLocation. Default is to choose
            the fewshot data at the end of the training dataset (i.e., "last").

    Raises:
        ValueError: If `fewshot_fraction` is given but is not in the interval (0, 1].

    Returns:
        Tuple of pytorch datasets, including: train, validation, test.
    """

    # Validate arguments first so an invalid request fails before the data is
    # split and before `self.train` changes the state of the preprocessor.
    if fewshot_fraction is not None and not (0.0 < fewshot_fraction <= 1.0):
        raise ValueError(f"Fewshot fraction should be between 0 and 1, received {fewshot_fraction}")

    data = self._standardize_dataframe(dataset)

    # per-split keyword arguments and the selection function to apply for each split
    split_params, split_function = get_split_params(split_config, context_length=self.context_length)

    # split data
    splits = {
        group: split_function[group](data, id_columns=self.id_columns, **split_params[group])
        for group in ("train", "valid", "test")
    }
    train_data = splits["train"]
    valid_data = splits["valid"]
    test_data = splits["test"]

    # data preprocessing: fit on the complete training split
    self.train(train_data)

    # handle fewshot operation
    if fewshot_fraction is not None:
        train_data = select_by_fixed_fraction(
            train_data,
            id_columns=self.id_columns,
            fraction=fewshot_fraction,
            location=fewshot_location,
        )

    # column roles and window sizes shared by all three torch datasets
    params = {
        "id_columns": self.id_columns,
        "timestamp_column": self.timestamp_column,
        "target_columns": self.target_columns,
        "observable_columns": self.observable_columns,
        "control_columns": self.control_columns,
        "conditional_columns": self.conditional_columns,
        "static_categorical_columns": self.static_categorical_columns,
        "context_length": self.context_length,
        "prediction_length": self.prediction_length,
    }

    # get torch datasets
    test_dataset = ForecastDFDataset(self.preprocess(test_data), **params)
    train_dataset = ForecastDFDataset(self.preprocess(train_data), **params)
    valid_dataset = ForecastDFDataset(self.preprocess(valid_data), **params)

    return train_dataset, valid_dataset, test_dataset
def get_split_params(
    split_config: Dict[str, List[Union[int, float]]],
    context_length: Optional[int] = None,
) -> Tuple[Dict[str, Dict[str, Union[int, float]]], Dict[str, Callable]]:
    """Get split parameters

    A split is treated as fraction-based when its start lies strictly between 0 and 1
    (any start below 1 other than 0) or when its end is below 1; otherwise its
    boundaries are treated as indices.

    Args:
        split_config (Dict[str, List[int, float]]): Dictionary containing keys for
            train, valid, test. Each value consists of a list of length two, indicating
            the boundaries of a split.
        context_length (int, optional): Context length, used only when offsetting
            the split so predictions can be made for all elements of split. The offset
            is applied to the valid and test splits, never to train. Defaults to None.

    Returns:
        Tuple[Dict[str, Dict[str, Union[int, float]]], Dict[str, Callable]]: Tuple of split parameters
            and split functions to use to split the data. Fraction-based entries hold
            `start_fraction`, `end_fraction` and `start_offset`; index-based entries hold
            integer `start_index` and `end_index`.
    """

    split_params = {}
    split_function = {}

    for group in ["train", "test", "valid"]:
        start = split_config[group][0]
        end = split_config[group][1]

        # valid/test start `context_length` rows early; train is never shifted
        offset = context_length if (context_length and group != "train") else 0

        if ((start < 1) and (start != 0)) or (end < 1):
            split_params[group] = {
                "start_fraction": start,
                "end_fraction": end,
                "start_offset": offset,
            }
            split_function[group] = select_by_relative_fraction
        else:
            # Boundaries such as 200.0 are accepted as indices, so normalize them to int.
            # The start is clamped at 0 because subtracting the context offset can push
            # it below zero.
            # NOTE(review): assumes select_by_index slices positionally, where float
            # bounds are rejected and a negative start would count from the end - confirm.
            split_params[group] = {
                "start_index": max(int(start) - offset, 0),
                "end_index": int(end),
            }
            split_function[group] = select_by_index

    return split_params, split_function