diff --git a/tsfm_public/toolkit/time_series_forecasting_pipeline.py b/tsfm_public/toolkit/time_series_forecasting_pipeline.py
index 39e3e0e5..6f9961da 100644
--- a/tsfm_public/toolkit/time_series_forecasting_pipeline.py
+++ b/tsfm_public/toolkit/time_series_forecasting_pipeline.py
@@ -101,11 +101,10 @@ def run_single(self, inputs, preprocess_params, forward_params, postprocess_para
     build_pipeline_init_args(has_tokenizer=False, has_feature_extractor=True, has_image_processor=False)
 )
 class TimeSeriesForecastingPipeline(TimeSeriesPipeline):
-    """Hugging Face Pipeline for Time Series Forecasting
+    """
+    Time Series Forecasting using HF time series forecasting models. This pipeline consumes a `pandas.DataFrame`
+    containing the time series data and produces a new `pandas.DataFrame` containing the forecasts.
 
-    feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time
-        series should be prepared. If this is provided, and of the other options below will be automatically
-        populated from this instance.
     """
 
     def __init__(
@@ -132,7 +131,8 @@ def _sanitize_parameters(
         self,
         **kwargs,
     ):
-        """Assign parameters to the different parts of the process.
+        """Assigns parameters to the different steps of the process. If context_length and prediction_length
+        are not provided, they are taken from the model config.
 
         For expected parameters see the call method below.
         """
@@ -140,6 +140,21 @@ def _sanitize_parameters(
         context_length = kwargs.get("context_length", self.model.config.context_length)
         prediction_length = kwargs.get("prediction_length", self.model.config.prediction_length)
 
+        # autopopulate from feature extractor
+        if self.feature_extractor:
+            for p in [
+                "id_columns",
+                "timestamp_column",
+                "target_columns",
+                "observable_columns",
+                "control_columns",
+                "conditional_columns",
+                "static_categorical_columns",
+                "freq",
+            ]:
+                if p not in kwargs:
+                    kwargs[p] = getattr(self.feature_extractor, p)
+
         preprocess_kwargs = {
             "prediction_length": prediction_length,
             "context_length": context_length,
@@ -226,7 +241,7 @@ def __call__(
                 i.e., exogenous or supporting features which are known in advance.
 
             feature_extractor (TimeSeriesPreprocessor): A time series preprpocessor object that specifies how the time
-                series should be prepared. If this is provided, and of the other options below will be automatically
+                series should be prepared. If this is provided, any of the other options below will be automatically
                 populated from this instance.
 
             timestamp_column (str): The name of the column containing the timestamp of the time series.
@@ -267,6 +282,10 @@ def __call__(
            inverse_scale_outputs (bool): If true and a valid feature extractor is provided, the outputs will be
                inverse scaled.
 
+            add_known_ground_truth (bool): If True, add columns containing the ground truth data. Prediction columns will have a
+                suffix of "_prediction". Default True. If False, only columns containing predictions are produced, no suffix is
+                added.
+
         Return (pandas dataframe): A new pandas dataframe containing the forecasts. Each row will contain the id, timestamp, the
             original input feature values and the output forecast for each input column. The output forecast is a list containing
@@ -286,6 +305,8 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li
         id_columns = kwargs.get("id_columns")
         # context_length = kwargs.get("context_length")
 
+        # use the feature extractor here
+
         if isinstance(time_series, str):
             time_series = pd.read_csv(
                 time_series,
@@ -327,17 +348,6 @@ def preprocess(self, time_series, **kwargs) -> Dict[str, Union[GenericTensor, Li
             **kwargs,
         )
 
-        # # stack all the outputs
-        # # torch tensors are stacked, but other values are passed through as a list
-        # first = dataset[0]
-        # full_output = {}
-        # for k, v in first.items():
-        #     if isinstance(v, torch.Tensor):
-        #         full_output[k] = torch.stack(tuple(r[k] for r in dataset))
-        #     else:
-        #         full_output[k] = [r[k] for r in dataset]
-
-        # return full_output
         return dataset
 
     def _forward(self, model_inputs, **kwargs):
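Below is a minimal usage sketch of the behavior this diff introduces: when a TimeSeriesPreprocessor is supplied as the pipeline's feature_extractor, _sanitize_parameters autopopulates the column and freq arguments from it, and add_known_ground_truth=False keeps only the prediction columns (no "_prediction" suffix). The model class, checkpoint name, data file, and column configuration are illustrative assumptions, not part of this diff.

    import pandas as pd

    from tsfm_public import TinyTimeMixerForPrediction
    from tsfm_public.toolkit.time_series_forecasting_pipeline import TimeSeriesForecastingPipeline
    from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor

    # Assumed checkpoint; any HF time series forecasting model should work.
    model = TinyTimeMixerForPrediction.from_pretrained("ibm-granite/granite-timeseries-ttm-r2")

    # Hypothetical data layout: one id column, one timestamp column, one target column.
    df = pd.read_csv("data.csv", parse_dates=["date"])

    tsp = TimeSeriesPreprocessor(
        timestamp_column="date",
        id_columns=["site_id"],
        target_columns=["load"],
    )

    # Because the preprocessor is passed as feature_extractor, id_columns,
    # timestamp_column, target_columns, freq, etc. are autopopulated and do not
    # need to be repeated when calling the pipeline.
    pipe = TimeSeriesForecastingPipeline(
        model=model,
        feature_extractor=tsp,
    )

    # Predictions only: no ground-truth columns and no "_prediction" suffix.
    forecasts = pipe(df, add_known_ground_truth=False)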