在本教程中,您将了解如何开发网格搜索所有 SARIMA 模型超参数的框架,以进行单变量时间序列预测。
- 如何使用前向验证从零开始开发网格搜索 SARIMA 模型的框架。
- 如何为出生日常时间序列数据网格搜索 SARIMA 模型超参数。
- 如何对洗发水销售,汽车销售和温度的月度时间序列数据进行网格搜索 SARIMA 模型超参数。
如何在 Python 中搜索用于时间序列预测的 SARIMA 模型超参数 Thomas 的照片,保留一些权利。
- SARIMA 用于时间序列预测
- 开发网格搜索框架
- 案例研究 1:没有趋势或季节性
- 案例研究 2:趋势
- 案例研究 3:季节性
- 案例研究 4:趋势和季节性
季节性自回归整合移动平均线,SARIMA 或季节性 ARIMA,是 ARIMA 的扩展,明确支持具有季节性成分的单变量时间序列数据。
通过在 ARIMA 中包含额外的季节性术语来形成季节性 ARIMA 模型[...]模型的季节性部分由与模型的非季节性组成非常相似的术语组成,但它们涉及季节性时段的后移。
- 第 242 页,预测:原则和实践,2013。
配置 SARIMA 需要为系列的趋势和季节性元素选择超参数。
它们与 ARIMA 模型相同;特别:
- p :趋势自动回归顺序。
- d :趋势差异顺序。
- q :趋势均线。
有四个不属于 ARIMA 的季节性元素必须配置;他们是:
- P :季节性自回归顺序。
- D :季节性差异顺序。
- Q :季节性移动平均线。
- m :单个季节性时段的时间步数。
同时,SARIMA 模型的表示法指定为:
SARIMA 模型可以通过模型配置参数包含 ARIMA,ARMA,AR 和 MA 模型。
季节性 ARIMA 模型可能具有大量参数和术语组合。因此,在拟合数据时尝试各种模型并使用适当的标准选择最佳拟合模型是合适的...
- 第 143-144 页,介绍时间序列与 R ,2009 年。
在本节中,我们将针对给定的单变量时间序列预测问题开发网格搜索 SARIMA 模型超参数的框架。
我们将使用 statsmodels 库提供的 SARIMA 的实现。
- order :用于趋势建模的元组 p,d 和 q 参数。
- sesonal_order :用于建模季节性的 P,D,Q 和 m 参数元组
- 趋势:用于将确定性趋势模型控制为“n”,“c”,“t”,“ct”之一的参数,无趋势,常数,线性和常数,线性趋势,分别。
我们还尝试通过放宽约束来使模型健壮,例如数据必须是静止的并且 MA 变换是可逆的。
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
我们可以使用给定指定大小的分割的切片来分割列表或 NumPy 数据数组,例如,从测试集中的数据中使用的时间步数。
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
时间序列预测有许多流行的错误分数。在这种情况下,我们将使用均方根误差(RMSE),但您可以将其更改为您的首选度量,例如 MAPE,MAE 等
函数将根据实际(测试集)和预测值列表计算 RMSE。
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
的调用,并更改 _ 中的错误计算 measure_rmse()_ 功能。
我们可以使用不同的模型配置列表重复调用 walk_forward_validation()。
此外,某些型号还可能会对某些数据发出警告,例如:来自 statsmodels 库调用的线性代数库。
中,并使用 try-except 和 block 来忽略警告。我们还可以添加调试支持来禁用这些保护,以防我们想要查看实际情况。最后,如果确实发生了错误,我们可以返回 None 结果,否则我们可以打印一些有关每个模型评估技能的信息。当评估大量模型时,这很有用。
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
通过并行评估模型配置,我们可以大大加快网格搜索过程。一种方法是使用 Joblib 库。
我们可以定义一个 Parallel 对象,其中包含要使用的核心数,并将其设置为硬件中检测到的分数。
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
最后,我们可以使用 Parallel 对象并行执行任务列表。
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
评估配置列表的结果将是元组列表,每个元组都有一个名称,该名称总结了特定的模型配置,并且使用该配置评估的模型的错误为 RMSE,如果出现错误则为 None。
scores = [r for r in scores if r[1] != None]
函数实现此行为。可选的 _ 并行 _ 参数允许对所有内核的模型进行开启或关闭调整,默认情况下处于打开状态。
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
这些配置假设趋势和季节性的每个 AR,MA 和 I 分量都是低阶的,例如,关(0)或[1,2]。如果您认为订单可能更高,则可能需要扩展这些范围。可以指定季节性时段的可选列表,您甚至可以更改该功能以指定您可能了解的有关时间序列的其他元素。
理论上,有 1,296 种可能的模型配置需要评估,但在实践中,许多模型配置无效并会导致我们将陷入和忽略的错误。
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
我们现在有一个网格搜索 SARIMA 模型超参数的框架,通过一步前进验证。
它是通用的,适用于作为列表或 NumPy 数组提供的任何内存中单变量时间序列。
我们可以通过在人为设计的 10 步数据集上进行测试来确保所有部分协同工作。
# grid search sarima hyperparameters
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
if`_name_`== '__main__':
# define dataset
data = [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
# data split
n_test = 4
# model configs
cfg_list = sarima_configs()
# grid search
scores = grid_search(data, cfg_list, n_test)
# list top 3 configs
for cfg, error in scores[:3]:
print(cfg, error)
[10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0]
> Model[[(2, 0, 0), (2, 0, 0, 0), 'ct']] 0.001
> Model[[(2, 0, 0), (2, 0, 1, 0), 'ct']] 0.000
> Model[[(2, 0, 1), (0, 0, 0, 0), 'n']] 0.000
> Model[[(2, 0, 1), (0, 0, 1, 0), 'n']] 0.000
[(2, 1, 0), (1, 0, 0, 0), 'n'] 0.0
[(2, 1, 0), (2, 0, 0, 0), 'n'] 0.0
[(2, 1, 1), (1, 0, 1, 0), 'n'] 0.0
现在我们有一个强大的网格搜索 SARIMA 模型超参数框架,让我们在一套标准的单变量时间序列数据集上进行测试。
选择数据集用于演示目的;我并不是说 SARIMA 模型是每个数据集的最佳方法;在某些情况下,或许 ETS 或其他更合适的东西。
“每日女性分娩”数据集总结了 1959 年美国加利福尼亚州每日女性总分娩数。
您可以从 DataMarket 了解有关数据集的更多信息。
在当前工作目录中使用文件名“ daily-total-female-births.csv ”保存文件。
将此数据集作为 Pandas 系列加载。
series = read_csv('daily-total-female-births.csv', header=0, index_col=0)
数据集有一年或 365 个观测值。我们将使用前 200 个进行训练,将剩余的 165 个作为测试集。
# grid search sarima hyperparameters for daily female dataset
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from pandas import read_csv
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
if`_name_`== '__main__':
# load dataset
series = read_csv('daily-total-female-births.csv', header=0, index_col=0)
data = series.values
# data split
n_test = 165
# model configs
cfg_list = sarima_configs()
# grid search
scores = grid_search(data, cfg_list, n_test)
# list top 3 configs
for cfg, error in scores[:3]:
print(cfg, error)
在评估模型时打印模型配置和 RMSE 在运行结束时报告前三个模型配置及其错误。
我们可以看到最好的结果是大约 6.77 个出生的 RMSE,具有以下配置:
- 订单 :( 1,0,2)
- 季节性命令 :( 1,0,1,0)
- 趋势参数:'t'表示线性趋势
令人惊讶的是,具有一些季节性元素的配置导致最低的错误。我不会猜到这种配置,可能会坚持使用 ARIMA 模型。
> Model[[(2, 1, 2), (1, 0, 1, 0), 'ct']] 6.905
> Model[[(2, 1, 2), (2, 0, 0, 0), 'ct']] 7.031
> Model[[(2, 1, 2), (2, 0, 1, 0), 'ct']] 6.985
> Model[[(2, 1, 2), (1, 0, 2, 0), 'ct']] 6.941
> Model[[(2, 1, 2), (2, 0, 2, 0), 'ct']] 7.056
[(1, 0, 2), (1, 0, 1, 0), 't'] 6.770349800255089
[(0, 1, 2), (1, 0, 2, 0), 'ct'] 6.773217122759515
[(2, 1, 1), (2, 0, 2, 0), 'ct'] 6.886633191752254
您可以从 DataMarket 了解有关数据集的更多信息。
将此数据集作为 Pandas 系列加载。
# parse dates
def custom_parser(x):
return datetime.strptime('195'+x, '%Y-%m')
# load dataset
series = read_csv('shampoo.csv', header=0, index_col=0, date_parser=custom_parser)
数据集有三年,或 36 个观测值。我们将使用前 24 个用于训练,其余 12 个用作测试集。
# grid search sarima hyperparameters for monthly shampoo sales dataset
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from pandas import read_csv
from pandas import datetime
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
# parse dates
def custom_parser(x):
return datetime.strptime('195'+x, '%Y-%m')
if`_name_`== '__main__':
# load dataset
series = read_csv('shampoo.csv', header=0, index_col=0, date_parser=custom_parser)
data = series.values
# data split
n_test = 12
# model configs
cfg_list = sarima_configs()
# grid search
scores = grid_search(data, cfg_list, n_test)
# list top 3 configs
for cfg, error in scores[:3]:
print(cfg, error)
在评估模型时打印模型配置和 RMSE 在运行结束时报告前三个模型配置及其错误。
我们可以看到最好的结果是 RMSE 约为 54.76,具有以下配置:
- 趋势订单 :( 0,1,2)
- 季节性命令 :( 2,0,2,0)
- 趋势参数:'t'(线性趋势)
> Model[[(2, 1, 2), (1, 0, 1, 0), 'ct']] 68.891
> Model[[(2, 1, 2), (2, 0, 0, 0), 'ct']] 75.406
> Model[[(2, 1, 2), (1, 0, 2, 0), 'ct']] 80.908
> Model[[(2, 1, 2), (2, 0, 1, 0), 'ct']] 78.734
> Model[[(2, 1, 2), (2, 0, 2, 0), 'ct']] 82.958
[(0, 1, 2), (2, 0, 2, 0), 't'] 54.767582003072874
[(0, 1, 1), (2, 0, 2, 0), 'ct'] 58.69987083057107
[(1, 1, 2), (0, 0, 1, 0), 't'] 58.709089340600094
“月平均温度”数据集总结了 1920 至 1939 年华氏诺丁汉城堡的月平均气温,以华氏度为单位。
您可以从 DataMarket 了解有关数据集的更多信息。
在当前工作目录中使用文件名“ monthly-mean-temp.csv ”保存文件。
将此数据集作为 Pandas 系列加载。
series = read_csv('monthly-mean-temp.csv', header=0, index_col=0)
数据集有 20 年,或 240 个观测值。我们将数据集修剪为过去五年的数据(60 个观测值),以加快模型评估过程,并使用去年或 12 个观测值来测试集。
# trim dataset to 5 years
data = data[-(5*12):]
季节性成分的周期约为一年,或 12 个观测值。在准备模型配置时,我们将此作为调用sarima_configs()
# model configs
cfg_list = sarima_configs(seasonal=[0, 12])
# grid search sarima hyperparameters for monthly mean temp dataset
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from pandas import read_csv
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
if`_name_`== '__main__':
# load dataset
series = read_csv('monthly-mean-temp.csv', header=0, index_col=0)
data = series.values
# trim dataset to 5 years
data = data[-(5*12):]
# data split
n_test = 12
# model configs
cfg_list = sarima_configs(seasonal=[0, 12])
# grid search
scores = grid_search(data, cfg_list, n_test)
# list top 3 configs
for cfg, error in scores[:3]:
print(cfg, error)
在评估模型时打印模型配置和 RMSE 在运行结束时报告前三个模型配置及其错误。
我们可以看到最好的结果是大约 1.5 度的 RMSE,具有以下配置:
- 趋势订单 :( 0,0,0)
- 季节性命令 :( 1,0,1,12)
- 趋势参数:'n'(无趋势)
正如我们所料,该模型没有趋势组件和 12 个月的季节性 ARMA 组件。
> Model[[(2, 1, 2), (2, 1, 0, 12), 't']] 4.599
> Model[[(2, 1, 2), (1, 1, 0, 12), 'ct']] 2.477
> Model[[(2, 1, 2), (2, 0, 0, 12), 'ct']] 2.548
> Model[[(2, 1, 2), (2, 0, 1, 12), 'ct']] 2.893
> Model[[(2, 1, 2), (2, 1, 0, 12), 'ct']] 5.404
[(0, 0, 0), (1, 0, 1, 12), 'n'] 1.5577613610905712
[(0, 0, 0), (1, 1, 0, 12), 'n'] 1.6469530713847962
[(0, 0, 0), (2, 0, 0, 12), 'n'] 1.7314448163607488
“月度汽车销售”数据集总结了 1960 年至 1968 年间加拿大魁北克省的月度汽车销量。
您可以从 DataMarket 了解有关数据集的更多信息。
在当前工作目录中使用文件名“ monthly-car-sales.csv ”保存文件。
将此数据集作为 Pandas 系列加载。
series = read_csv('monthly-car-sales.csv', header=0, index_col=0)
数据集有 9 年或 108 个观测值。我们将使用去年或 12 个观测值作为测试集。
季节性成分的期限可能是六个月或 12 个月。在准备模型配置时,我们将尝试将两者作为调用sarima_configs()
# model configs
cfg_list = sarima_configs(seasonal=[0,6,12])
# grid search sarima hyperparameters for monthly car sales dataset
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
from pandas import read_csv
# one-step sarima forecast
def sarima_forecast(history, config):
order, sorder, trend = config
# define model
model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
# fit model
model_fit = model.fit(disp=False)
# make one step forecast
yhat = model_fit.predict(len(history), len(history))
return yhat[0]
# root mean squared error or rmse
def measure_rmse(actual, predicted):
return sqrt(mean_squared_error(actual, predicted))
# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
return data[:-n_test], data[-n_test:]
# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
predictions = list()
# split dataset
train, test = train_test_split(data, n_test)
# seed history with training dataset
history = [x for x in train]
# step over each time-step in the test set
for i in range(len(test)):
# fit model and make forecast for history
yhat = sarima_forecast(history, cfg)
# store forecast in list of predictions
# add actual observation to history for the next loop
# estimate prediction error
error = measure_rmse(test, predictions)
return error
# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
result = None
# convert config to a key
key = str(cfg)
# show all warnings and fail on exception if debugging
if debug:
result = walk_forward_validation(data, n_test, cfg)
# one failure during model validation suggests an unstable config
# never show warnings when grid searching, too noisy
with catch_warnings():
result = walk_forward_validation(data, n_test, cfg)
error = None
# check for an interesting result
if result is not None:
print(' > Model[%s] %.3f' % (key, result))
return (key, result)
# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
scores = None
if parallel:
# execute configs in parallel
executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
scores = executor(tasks)
scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
# remove empty results
scores = [r for r in scores if r[1] != None]
# sort configs by error, asc
scores.sort(key=lambda tup: tup[1])
return scores
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
models = list()
# define config lists
p_params = [0, 1, 2]
d_params = [0, 1]
q_params = [0, 1, 2]
t_params = ['n','c','t','ct']
P_params = [0, 1, 2]
D_params = [0, 1]
Q_params = [0, 1, 2]
m_params = seasonal
# create config instances
for p in p_params:
for d in d_params:
for q in q_params:
for t in t_params:
for P in P_params:
for D in D_params:
for Q in Q_params:
for m in m_params:
cfg = [(p,d,q), (P,D,Q,m), t]
return models
if`_name_`== '__main__':
# load dataset
series = read_csv('monthly-car-sales.csv', header=0, index_col=0)
data = series.values
# data split
n_test = 12
# model configs
cfg_list = sarima_configs(seasonal=[0,6,12])
# grid search
scores = grid_search(data, cfg_list, n_test)
# list top 3 configs
for cfg, error in scores[:3]:
print(cfg, error)
在评估模型时打印模型配置和 RMSE 在运行结束时报告前三个模型配置及其错误。
我们可以看到最好的结果是 RMSE 大约 1,551 销售,具有以下配置:
- 趋势订单 :( 0,0,0)
- 季节性命令 :( 1,1,0,12)
- 趋势参数:'t'(线性趋势)
> Model[[(2, 1, 2), (2, 1, 1, 6), 'ct']] 2246.248
> Model[[(2, 1, 2), (2, 0, 2, 12), 'ct']] 10710.462
> Model[[(2, 1, 2), (2, 1, 2, 6), 'ct']] 2183.568
> Model[[(2, 1, 2), (2, 1, 0, 12), 'ct']] 2105.800
> Model[[(2, 1, 2), (2, 1, 1, 12), 'ct']] 2330.361
> Model[[(2, 1, 2), (2, 1, 2, 12), 'ct']] 31580326686.803
[(0, 0, 0), (1, 1, 0, 12), 't'] 1551.8423920342414
[(0, 0, 0), (2, 1, 1, 12), 'c'] 1557.334614575545
[(0, 0, 0), (1, 1, 0, 12), 'c'] 1559.3276311282675
- 数据转换。更新框架以支持可配置的数据转换,例如规范化和标准化。
- 地块预测。更新框架以重新拟合具有最佳配置的模型并预测整个测试数据集,然后将预测与测试集中的实际观察值进行比较。
- 调整历史数量。更新框架以调整用于拟合模型的历史数据量(例如,在 10 年最高温度数据的情况下)。
- Statsmodels 状态空间方法的时间序列分析
- statsmodels.tsa.statespace.sarimax.SARIMAX API
- statsmodels.tsa.statespace.sarimax.SARIMAXResults API
- Statsmodels SARIMAX 笔记本
- Joblib:运行 Python 函数作为管道作业
在本教程中,您了解了如何开发网格搜索所有 SARIMA 模型超参数的框架,以进行单变量时间序列预测。
- 如何使用前向验证从零开始开发网格搜索 SARIMA 模型的框架。
- 如何为出生日常时间序列数据网格搜索 SARIMA 模型超参数。
- 如何针对洗发水销售,汽车销售和温度的月度时间序列数据网格搜索 SARIMA 模型超参数。
你有任何问题吗? 在下面的评论中提出您的问题,我会尽力回答。