Skip to content

Commit

Permalink
Merge pull request #24 from IBM/get_dataset_fixes
Browse files Browse the repository at this point in the history
Get dataset fixes
  • Loading branch information
wgifford authored Apr 4, 2024
2 parents f260c92 + 2e66f31 commit 547f0fb
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 83 deletions.
21 changes: 21 additions & 0 deletions hacking/datasets_from_tsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,24 @@
split_config = {"train": [0, 0.7], "valid": [0.7, 0.9], "test": [0.9, 1]}

train, valid, test = tsp.get_datasets(df, split_config)


# %%

df = pd.read_csv("/Users/wmgifford/Downloads/weather.csv", parse_dates=["date"])

tsp = TimeSeriesPreprocessor(
timestamp_column="date",
id_columns=[],
target_columns=[],
prediction_length=96,
context_length=512,
)

a, b, c = tsp.get_datasets(
df,
split_config={
"train": 0.7,
"test": 0.2,
},
)
15 changes: 14 additions & 1 deletion tests/toolkit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def ts_data():
"id2": nreps(["XX", "YY", "ZZ"], 50),
"timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 3,
"value1": range(150),
"value2": np.arange(150) / 3 + 10,
"value2": np.arange(150) ** 2 / 3 + 10,
}
)
return df
Expand All @@ -35,3 +35,16 @@ def sample_data():
}
)
return df


@pytest.fixture(scope="module")
def ts_data_runs():
df = pd.DataFrame(
{
"run_id": nreps(["1", "2", "3", "4"], 50),
"asset_id": nreps(["foo", "bar", "foo", "bar"], 50),
"timestamp": [datetime(2021, 1, 1) + timedelta(days=i) for i in range(50)] * 4,
"value1": range(200),
}
)
return df
60 changes: 60 additions & 0 deletions tests/toolkit/test_time_series_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from tsfm_public.toolkit.time_series_forecasting_pipeline import (
TimeSeriesForecastingPipeline,
)
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from tsfm_public.toolkit.util import select_by_index


Expand Down Expand Up @@ -79,3 +80,62 @@ def test_forecasting_pipeline_forecasts():

forecasts_exploded = forecast_pipeline(test_data)
assert forecasts_exploded.shape == (prediction_length, len(target_columns) + 1)


def test_forecasting_pipeline_forecasts_with_preprocessor():
timestamp_column = "date"
id_columns = []
target_columns = ["HUFL", "HULL", "MUFL", "MULL", "LUFL", "LULL", "OT"]
prediction_length = 96

model_path = "ibm/patchtst-etth1-forecasting"
model = PatchTSTForPrediction.from_pretrained(model_path)
context_length = model.config.context_length

tsp = TimeSeriesPreprocessor(
timestamp_column=timestamp_column,
id_columns=id_columns,
target_columns=target_columns,
context_length=context_length,
prediction_length=prediction_length,
freq="1h",
)

forecast_pipeline = TimeSeriesForecastingPipeline(
model=model,
timestamp_column=timestamp_column,
id_columns=id_columns,
target_columns=target_columns,
freq="1h",
feature_extractor=tsp,
explode_forecasts=False,
)

dataset_path = "https://raw.githubusercontent.com/zhouhaoyi/ETDataset/main/ETT-small/ETTh2.csv"
data = pd.read_csv(
dataset_path,
parse_dates=[timestamp_column],
)
test_end_index = 12 * 30 * 24 + 8 * 30 * 24
test_start_index = test_end_index - context_length - 4

data = pd.read_csv(
dataset_path,
parse_dates=[timestamp_column],
)

test_data = select_by_index(
data,
id_columns=id_columns,
start_index=test_start_index,
end_index=test_end_index,
)

forecasts = forecast_pipeline(test_data)

assert forecasts.shape == (
test_end_index - test_start_index - context_length + 1,
2 * len(target_columns) + 1,
)

# to do: add check on the scaling
162 changes: 161 additions & 1 deletion tests/toolkit/test_time_series_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,75 @@ def test_time_series_preprocessor_encodes(sample_data):
assert sample_prep[c].dtype == float


def test_time_series_preprocessor_scales(ts_data):
df = ts_data

tsp = TimeSeriesPreprocessor(
timestamp_column="timestamp",
prediction_length=2,
context_length=5,
id_columns=["id", "id2"],
target_columns=["value1", "value2"],
scaling=True,
)

tsp.train(df)

# check scaled result
out = tsp.preprocess(df)
assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x)), 0.0)
assert np.allclose(out.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x)), 1.0)

# check inverse scale result
out_inv = tsp.inverse_scale_targets(out)
assert np.all(
out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
== df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
)
assert np.all(
out_inv.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x))
== df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.std(x))
)

# check inverse scale result, with suffix

suffix = "_foo"
targets_suffix = [f"{c}{suffix}" for c in tsp.target_columns]
out.columns = [f"{c}{suffix}" if c in tsp.target_columns else c for c in out.columns]
out_inv = tsp.inverse_scale_targets(out, suffix=suffix)
assert np.all(
out_inv.groupby(tsp.id_columns)[targets_suffix].apply(lambda x: np.mean(x))
== df.groupby(tsp.id_columns)[tsp.target_columns].apply(lambda x: np.mean(x))
)


def test_time_series_preprocessor_inv_scales_lists(ts_data):
df = ts_data

tsp = TimeSeriesPreprocessor(
timestamp_column="timestamp",
prediction_length=2,
context_length=5,
id_columns=["id", "id2"],
target_columns=["value1", "value2"],
scaling=True,
)

tsp.train(df)

# check scaled result
out = tsp.preprocess(df)

# construct artificial result
out["value1"] = out["value1"].apply(lambda x: np.array([x] * 3))
out["value2"] = out["value2"].apply(lambda x: np.array([x] * 3))

out_inv = tsp.inverse_scale_targets(out)

assert out_inv["value1"].mean()[0] == df["value1"].mean()
assert out_inv["value2"].mean()[0] == df["value2"].mean()


def test_augment_time_series(ts_data):
periods = 5
a = extend_time_series(ts_data, timestamp_column="timestamp", grouping_columns=["id"], periods=periods)
Expand Down Expand Up @@ -197,7 +266,9 @@ def test_get_datasets(ts_data):
)

# new train length should be 20% of 100, minus the usual for context length and prediction length
fewshot_train_size = int(100 * 0.2) - (tsp.context_length + tsp.prediction_length) + 1
fewshot_train_size = (
int((100 - tsp.context_length) * 0.2) + tsp.context_length - (tsp.context_length + tsp.prediction_length) + 1
)
assert len(train) == fewshot_train_size

assert len(valid) == len(test)
Expand Down Expand Up @@ -225,3 +296,92 @@ def test_get_datasets(ts_data):
assert len(train) == fewshot_train_size

assert len(valid) == len(test)

# fraction splits
# no id columns, so treat as one big time series
tsp = TimeSeriesPreprocessor(
id_columns=[],
target_columns=["value1", "value2"],
prediction_length=5,
context_length=10,
)

train, valid, test = tsp.get_datasets(
ts_data,
split_config={
"train": 0.7,
"test": 0.2,
},
)

assert len(train) == int(150 * 0.7) - (tsp.context_length + tsp.prediction_length) + 1

assert len(test) == int(150 * 0.2) - tsp.prediction_length + 1

assert len(valid) == 150 - int(150 * 0.2) - int(150 * 0.7) - tsp.prediction_length + 1


def test_train_without_targets(ts_data):
# no targets or other columns specified
tsp = TimeSeriesPreprocessor(id_columns=["id", "id2"], timestamp_column="timestamp")
tsp.train(ts_data)

assert tsp.target_columns == ["value1", "value2"]

# some other args specified
for arg in [
"control_columns",
"conditional_columns",
"observable_columns",
"static_categorical_columns",
]:
tsp = TimeSeriesPreprocessor(
id_columns=["id", "id2"],
timestamp_column="timestamp",
**{arg: ["value2"]},
)
tsp.train(ts_data)

assert tsp.target_columns == ["value1"]

# test targets honored
tsp = TimeSeriesPreprocessor(
id_columns=["id", "id2"],
timestamp_column="timestamp",
target_columns=["value2"],
)
tsp.train(ts_data)

assert tsp.target_columns == ["value2"]


def test_get_datasets_without_targets(ts_data):
ts_data = ts_data.drop(columns=["id", "id2"])
tsp = TimeSeriesPreprocessor(
timestamp_column="timestamp",
prediction_length=2,
context_length=5,
)

train, _, _ = tsp.get_datasets(ts_data, split_config={"train": 0.7, "test": 0.2})

train.datasets[0].target_columns == ["value1", "value2"]


def test_id_columns_and_scaling_id_columns(ts_data_runs):
df = ts_data_runs

tsp = TimeSeriesPreprocessor(
timestamp_column="timestamp",
prediction_length=2,
context_length=5,
id_columns=["asset_id", "run_id"],
scaling_id_columns=["asset_id"],
target_columns=["value1"],
scaling=True,
)

ds_train, ds_valid, ds_test = tsp.get_datasets(df, split_config={"train": 0.7, "test": 0.2})

assert len(tsp.target_scaler_dict) == 2
assert len(ds_train.datasets) == 4
27 changes: 26 additions & 1 deletion tests/toolkit/test_util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Tests for util functions"""

import pandas as pd
import pytest

from tsfm_public.toolkit.util import get_split_params
from tsfm_public.toolkit.util import get_split_params, train_test_split


split_cases = [
Expand All @@ -24,3 +25,27 @@ def test_get_split_params(left_arg, right_arg, expected):
split_config, split_function = get_split_params({"train": [left_arg, right_arg], "valid": [0, 1], "test": [0, 1]})

assert split_function["train"].__name__ == expected


def test_train_test_split():
n = 100
df = pd.DataFrame({"date": range(n), "value": range(n)})

train, valid, test = train_test_split(df, train=0.7, test=0.2)

assert len(train) == int(n * 0.7)
assert len(test) == int(n * 0.2)
valid_len_100 = n - int(n * 0.7) - int(n * 0.2)
assert len(valid) == valid_len_100

n = 101
df = pd.DataFrame({"date": range(n), "value": range(n)})

train, valid, test = train_test_split(df, train=0.7, test=0.2)

assert len(train) == int(n * 0.7)
assert len(test) == int(n * 0.2)
valid_len_101 = n - int(n * 0.7) - int(n * 0.2)
assert len(valid) == valid_len_101

assert valid_len_100 + 1 == valid_len_101
Loading

0 comments on commit 547f0fb

Please sign in to comment.