"""Auto ARIMA transformer is a time series transformer that predicts target using ARIMA models"""
# For more information about the python ARIMA package
# please visit https://www.alkaline-ml.com/pmdarima/index.html
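#
# For orientation, this is the small subset of the pmdarima API the transformer
# relies on (a sketch only; 'train_series', 'steps' and 'new_obs' are
# placeholder names, not part of this recipe):
#
#   model = pmdarima.auto_arima(train_series, error_action='ignore')
#   model.predict_in_sample()        # fitted values over the training window
#   model.predict(n_periods=steps)   # out-of-sample forecast
#   model.update(new_obs)            # refresh parameters with new observations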
import importlib
from h2oaicore.transformer_utils import CustomTimeSeriesTransformer
import datatable as dt
import numpy as np
import pandas as pd
from h2oaicore.systemutils import make_experiment_logger, loggerinfo, loggerwarning


class MyAutoArimaTransformer(CustomTimeSeriesTransformer):
    _binary = False
    _multiclass = False
    _modules_needed_by_name = ['pmdarima']
    _included_model_classes = None

    @staticmethod
    def get_default_properties():
        return dict(col_type="time_column", min_cols=1, max_cols=1, relative_importance=1)

    def fit(self, X: dt.Frame, y: np.array = None):
        """
        Fits ARIMA models (one per time group) using historical target values contained in y
        :param X: Datatable frame containing the features
        :param y: numpy array containing the historical values of the target
        :return: self
        """
        # Import the ARIMA python module
        pm = importlib.import_module('pmdarima')
        # Init models
        self.models = {}
        # Convert to pandas
        X = X.to_pandas()
        XX = X[self.tgc].copy()
        XX['y'] = np.array(y)
        self.nan_value = np.mean(y)
        self.ntrain = X.shape[0]
        # Group the input by TGC (time group columns) excluding the time column itself
        tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
        if len(tgc_wo_time) > 0:
            XX_grp = XX.groupby(tgc_wo_time)
        else:
            XX_grp = [([None], XX)]
        # Get the logger if it exists
        logger = None
        if self.context and self.context.experiment_id:
            logger = make_experiment_logger(
                experiment_id=self.context.experiment_id,
                tmp_dir=self.context.tmp_dir,
                experiment_tmp_dir=self.context.experiment_tmp_dir
            )
        # Build one ARIMA model per time group
        nb_groups = len(XX_grp)
        for _i_g, (key, X) in enumerate(XX_grp):
            # Just say where we are in the fitting process
            if (_i_g + 1) % max(1, nb_groups // 20) == 0:
                loggerinfo(logger, "Auto ARIMA : %d%% of groups fitted" % (100 * (_i_g + 1) // nb_groups))
            key = key if isinstance(key, list) else [key]
            grp_hash = '_'.join(map(str, key))
            # print("auto arima - fitting on data of shape: %s for group: %s" % (str(X.shape), grp_hash))
            order = np.argsort(X[self.time_column])
            try:
                model = pm.auto_arima(X['y'].values[order], error_action='ignore')
            except Exception:
                model = None
            self.models[grp_hash] = model
        return self
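
    # Note on grouping: when the time group columns contain only the time column
    # itself, tgc_wo_time is empty, the whole frame is handled as a single group
    # keyed by [None] (grp_hash == 'None'), and exactly one ARIMA model is fit.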

    def transform(self, X: dt.Frame):
        """
        Uses fitted models (one per time group) to predict the target.
        If self.is_train exists, it means we are doing in-sample predictions;
        if it does not, ARIMA is used to predict the future.
        :param X: Datatable Frame containing the features
        :return: ARIMA predictions
        """
        X = X.to_pandas()
        XX = X[self.tgc].copy()
        tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
        if len(tgc_wo_time) > 0:
            XX_grp = XX.groupby(tgc_wo_time)
        else:
            XX_grp = [([None], XX)]
        # Get the logger if it exists
        logger = None
        if self.context and self.context.experiment_id:
            logger = make_experiment_logger(
                experiment_id=self.context.experiment_id,
                tmp_dir=self.context.tmp_dir,
                experiment_tmp_dir=self.context.experiment_tmp_dir
            )
        nb_groups = len(XX_grp)
        preds = []
        for _i_g, (key, X) in enumerate(XX_grp):
            # Just say where we are in the transform process
            if (_i_g + 1) % max(1, nb_groups // 20) == 0:
                loggerinfo(logger, "Auto ARIMA : %d%% of groups transformed" % (100 * (_i_g + 1) // nb_groups))
            key = key if isinstance(key, list) else [key]
            grp_hash = '_'.join(map(str, key))
            # print("auto arima - transforming data of shape: %s for group: %s" % (str(X.shape), grp_hash))
            order = np.argsort(X[self.time_column])
            if grp_hash in self.models:
                model = self.models[grp_hash]
                if model is not None:
                    yhat = model.predict_in_sample() \
                        if hasattr(self, 'is_train') else model.predict(n_periods=X.shape[0])
                    yhat = yhat[order]
                    XX = pd.DataFrame(yhat, columns=['yhat'])
                else:
                    XX = pd.DataFrame(np.full((X.shape[0], 1), self.nan_value), columns=['yhat'])  # invalid model
            else:
                XX = pd.DataFrame(np.full((X.shape[0], 1), self.nan_value), columns=['yhat'])  # unseen groups
            XX.index = X.index
            preds.append(XX)
        XX = pd.concat(tuple(preds), axis=0).sort_index()
        return XX
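
    # Note: each per-group prediction frame keeps the row index of the incoming
    # frame, so the final concat + sort_index() restores the original row order.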

    def fit_transform(self, X: dt.Frame, y: np.array = None):
        """
        Fits the ARIMA models (one per time group) and outputs the corresponding predictions
        :param X: Datatable Frame
        :param y: Target to be used to fit the ARIMA models and predict in-sample
        :return: in-sample ARIMA predictions
        """
        # Flag the fact we are doing in-sample predictions
        self.is_train = True
        ret = self.fit(X, y).transform(X)
        del self.is_train
        return ret

    def update_history(self, X: dt.Frame, y: np.array = None):
        """
        Update the model fit with additional observed endog/exog values.
        Updating an ARIMA adds new observations to the model, updating the MLE of the parameters
        accordingly by performing several new iterations (maxiter) from the existing model parameters.
        :param X: Datatable Frame containing input features
        :param y: Numpy array containing new observations to update the ARIMA model
        :return: self
        """
        X = X.to_pandas()
        XX = X[self.tgc].copy()
        XX['y'] = np.array(y)
        tgc_wo_time = list(np.setdiff1d(self.tgc, self.time_column))
        if len(tgc_wo_time) > 0:
            XX_grp = XX.groupby(tgc_wo_time)
        else:
            XX_grp = [([None], XX)]
        for key, X in XX_grp:
            key = key if isinstance(key, list) else [key]
            grp_hash = '_'.join(map(str, key))
            print("auto arima - update history with data of shape: %s for group: %s" % (str(X.shape), grp_hash))
            order = np.argsort(X[self.time_column])
            if grp_hash in self.models:
                model = self.models[grp_hash]
                if model is not None:
                    model.update(X['y'].values[order])
        return self
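

if __name__ == '__main__':
    # Minimal standalone sketch of the per-group Auto ARIMA pattern used above,
    # runnable outside Driverless AI when pmdarima is installed. The synthetic
    # data and the column names ('store', 'date', 'y') are hypothetical and only
    # illustrate the fit / forecast flow; they are not part of the recipe API.
    import pmdarima as pm

    rng = np.random.default_rng(42)
    dates = pd.date_range('2020-01-01', periods=60, freq='D')
    toy = pd.concat(
        [
            pd.DataFrame({'store': store,
                          'date': dates,
                          'y': 100 + trend * np.arange(60) + rng.normal(0, 2, 60)})
            for store, trend in [('A', 0.5), ('B', -0.3)]
        ],
        ignore_index=True
    )

    # One ARIMA model per time group, mirroring MyAutoArimaTransformer.fit
    models = {}
    for store, grp in toy.groupby('store'):
        grp = grp.sort_values('date')
        models[store] = pm.auto_arima(grp['y'].values, error_action='ignore')

    # Seven-step-ahead forecast per group, mirroring the out-of-sample branch of transform
    for store, model in models.items():
        print(store, model.predict(n_periods=7))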