Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline enhancement #90

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions tsfm_public/toolkit/time_series_forecasting_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para
dataset = self.preprocess(inputs, **preprocess_params)

batch_size = forward_params["batch_size"]
num_workers = forward_params["num_workers"]
signature = inspect.signature(self.model.forward)
signature_columns = list(signature.parameters.keys())

Expand All @@ -64,10 +65,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para
signature_columns=signature_columns,
logger=None,
description=None,
model_name=self.model.__class__.__name__,
model_name=None,
)
dataloader = DataLoader(
dataset, num_workers=1, batch_size=batch_size, collate_fn=remove_columns_collator, shuffle=False
dataset, batch_size=batch_size, num_workers=num_workers, collate_fn=remove_columns_collator, shuffle=False
)

# iterate over dataloader
Expand Down Expand Up @@ -103,11 +104,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para
build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False)
)
class TimeSeriesForecastingPipeline(TimeSeriesPipeline):
"""Hugging Face Pipeline for Time Series Forecasting
"""
Time Series Forecasting using HF time series forecasting models. This pipeline consumes a `pandas.DataFrame`
containing the time series data and produces a new `pandas.DataFrame` containing the forecasts.

feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time
series should be prepared. If this is provided, and of the other options below will be automatically
populated from this instance.
"""

def __init__(
Expand All @@ -116,11 +116,13 @@ def __init__(
freq: Optional[str] = None,
explode_forecasts: bool = False,
inverse_scale_outputs: bool = True,
add_known_ground_truth: bool = True,
**kwargs,
):
kwargs["freq"] = freq
kwargs["explode_forecasts"] = explode_forecasts
kwargs["inverse_scale_outputs"] = inverse_scale_outputs
kwargs["add_known_ground_truth"] = add_known_ground_truth
super().__init__(*args, **kwargs)

if self.framework == "tf":
Expand All @@ -132,14 +134,30 @@ def _sanitize_parameters(
self,
**kwargs,
):
"""Assign parameters to the different parts of the process.
"""Assigns parameters to the different steps of the process. If context_length and prediction_length
are not provided they are taken from the model config.

For expected parameters see the call method below.
"""

context_length = kwargs.get("context_length", self.model.config.context_length)
prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length)

# autopopulate from feature extractor
if self.feature_extractor:
for p in [
"id_columns",
"timestamp_column",
"target_columns",
"observable_columns",
"control_columns",
"conditional_columns",
"static_categorical_columns",
"freq",
]:
if p not in kwargs:
kwargs[p] = getattr(self.feature_extractor, p)

preprocess_kwargs = {
"prediction_length": prediction_length,
"context_length": context_length,
Expand Down Expand Up @@ -170,6 +188,7 @@ def _sanitize_parameters(
"freq",
"explode_forecasts",
"inverse_scale_outputs",
"add_known_ground_truth",
]

for c in preprocess_params:
Expand All @@ -188,7 +207,14 @@ def _sanitize_parameters(
else:
batch_size = self._batch_size

forward_kwargs = {"batch_size": batch_size}
num_workers = kwargs.get("num_workers", self._num_workers)
if num_workers is None:
if self._num_workers is None:
num_workers = 0
else:
num_workers = self._num_workers

forward_kwargs = {"batch_size": batch_size, "num_workers": num_workers}

# if "id_columns" in kwargs:
# preprocess_kwargs["id_columns"] = kwargs["id_columns"]
Expand Down Expand Up @@ -225,7 +251,7 @@ def __call__(
i.e., exogenous or supporting features which are known in advance.

feature_extractor (TimeSeriesPreprocessor): A time series preprocessor object that specifies how the time
series should be prepared. If this is provided, and of the other options below will be automatically
series should be prepared. If this is provided, any of the other options below will be automatically
populated from this instance.

timestamp_column (str): The name of the column containing the timestamp of the time series.
Expand Down Expand Up @@ -266,6 +292,10 @@ def __call__(

inverse_scale_outputs (bool): If true and a valid feature extractor is provided, the outputs will be inverse scaled.

add_known_ground_truth (bool): If True, add columns containing the ground truth data; prediction columns will have a
suffix of "_prediction". Default True. If False, only columns containing predictions are produced and no suffix is
added.

Return (pandas dataframe):
A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the original
input feature values and the output forecast for each input column. The output forecast is a list containing
Expand All @@ -285,6 +315,8 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li
id_columns = kwargs.get("id_columns")
# context_length = kwargs.get("context_length")

# use the feature extractor here

if isinstance(time_series, str):
time_series = pd.read_csv(
time_series,
Expand Down Expand Up @@ -326,17 +358,6 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li
**kwargs,
)

# # stack all the outputs
# # torch tensors are stacked, but other values are passed through as a list
# first = dataset[0]
# full_output = {}
# for k, v in first.items():
# if isinstance(v, torch.Tensor):
# full_output[k] = torch.stack(tuple(r[k] for r in dataset))
# else:
# full_output[k] = [r[k] for r in dataset]

# return full_output
return dataset

def _forward(self, model_inputs, **kwargs):
Expand All @@ -350,29 +371,6 @@ def _forward(self, model_inputs, **kwargs):
original input keys.
"""

# Eventually we should use inspection somehow
# inspect.signature(model_forward).parameters.keys()
# model_input_keys = {
# "past_values",
# "static_categorical_values",
# "freq_token",
# } # todo: this should not be hardcoded

# signature = inspect.signature(self.model.forward)
# model_input_keys = list(signature.parameters.keys())

# model_inputs_only = {}
# for k in model_input_keys:
# if k in model_inputs:
# model_inputs_only[k] = model_inputs[k]

# model_outputs = self.model(**model_inputs_only)

# # copy the other inputs
# copy_inputs = True
# for k in [akey for akey in model_inputs.keys() if (akey not in model_input_keys) or copy_inputs]:
# model_outputs[k] = model_inputs[k]

model_outputs = self.model(**model_inputs)

return model_outputs
Expand All @@ -392,14 +390,16 @@ def postprocess(self, input, **kwargs):

# name the predictions of target columns
# outputs should only have size equal to target columns

prediction_columns = []
for i, c in enumerate(kwargs["target_columns"]):
prediction_columns.append(f"{c}_prediction")
prediction_columns.append(f"{c}_prediction" if kwargs["add_known_ground_truth"] else c)
out[prediction_columns[-1]] = input[model_output_key][:, :, i].numpy().tolist()
# provide the ground truth values for the targets
# when future is unknown, we will have augmented the provided dataframe with NaN values to cover the future
for i, c in enumerate(kwargs["target_columns"]):
out[c] = input["future_values"][:, :, i].numpy().tolist()
if kwargs["add_known_ground_truth"]:
for i, c in enumerate(kwargs["target_columns"]):
out[c] = input["future_values"][:, :, i].numpy().tolist()

if "timestamp_column" in kwargs:
out[kwargs["timestamp_column"]] = input["timestamp"]
Expand Down
Loading