From cd0a4e7bb4cd467adee3e7e3b1c70e4de8c34b94 Mon Sep 17 00:00:00 2001
From: hokiegeek2
Date: Fri, 12 Oct 2018 14:11:29 -0400
Subject: [PATCH] first set of dask conversions, added param for
 single-threading/multi-threading #20

---
 anomaly_detection/anomaly_detect_ts.py | 44 +++++++++++++++++++-------
 1 file changed, 33 insertions(+), 11 deletions(-)

diff --git a/anomaly_detection/anomaly_detect_ts.py b/anomaly_detection/anomaly_detect_ts.py
index a1e0b36..896ea54 100644
--- a/anomaly_detection/anomaly_detect_ts.py
+++ b/anomaly_detection/anomaly_detect_ts.py
@@ -64,6 +64,9 @@
 
     period_override: Override the auto-generated period
         Defaults to None
+
+    multithreaded: whether to use a multi-threaded (Dask-based) implementation of DataFrame operations
+        Defaults to False
 
 Details:
 
@@ -138,6 +141,10 @@
 import numpy as np
 import scipy as sp
 import pandas as pd
+from multiprocessing import cpu_count
+from dask import dataframe as ddf
+from dask.dataframe.core import Series as DaskSeries
+
 import datetime
 import statsmodels.api as sm
 import logging
@@ -279,24 +286,39 @@ def _get_max_anoms(data, max_anoms):
     piecewise_median_period_weeks : int
         used to determine days and observations per period
 '''
 
-def _process_long_term_data(data, period, granularity, piecewise_median_period_weeks):
+def _process_long_term_data(raw_data, data, period, granularity, piecewise_median_period_weeks):
     # Pre-allocate list with size equal to the number of piecewise_median_period_weeks chunks in x + any left over chunk
     # handle edge cases for daily and single column data period lengths
     num_obs_in_period = period * piecewise_median_period_weeks + 1 if granularity == 'day' else period * 7 * piecewise_median_period_weeks
     num_days_in_period = (7 * piecewise_median_period_weeks) + 1 if granularity == 'day' else (7 * piecewise_median_period_weeks)
+    dask_df = _get_dask_dataframe(data)
+
     all_data = []
     # Subset x into piecewise_median_period_weeks chunks
     for i in range(1, data.size + 1, num_obs_in_period):
         start_date = data.index[i]
         # if there is at least 14 days left, subset it, otherwise subset last_date - 14 days
         end_date = start_date + datetime.timedelta(days=num_days_in_period)
+
         if end_date < data.index[-1]:
-            all_data.append(data.loc[lambda x: (x.index >= start_date) & (x.index <= end_date)])
+            all_data.append(_execute_dask_task(lambda part: part.loc[(part.index >= start_date) & (part.index <= end_date)], dask_df))
         else:
-            all_data.append(data.loc[lambda x: x.index >= data.index[-1] - datetime.timedelta(days=num_days_in_period)])
+            all_data.append(_execute_dask_task(lambda part: part.loc[part.index >= data.index[-1] - datetime.timedelta(days=num_days_in_period)], dask_df))
     return all_data
 
+def _execute_dask_task(function, dask_df):
+    # apply function to each Dask partition and materialize the result as a pandas object
+    return dask_df.map_partitions(function).compute()
+
+def _execute_dataframe_function(function, data, dask_df=None):
+    # run function directly on the pandas object, or per-partition when a Dask DataFrame is supplied
+    if dask_df is None:
+        return function(data)
+    else:
+        return dask_df.map_partitions(function).compute()
+
+def _get_dask_dataframe(df):
+    # partition across all available cores for multi-threaded execution
+    return ddf.from_pandas(df, npartitions=cpu_count())
+
 '''
 Returns the results from the last day or hour only
 
@@ -390,17 +412,17 @@ def _get_decomposed_data_tuple(data, num_obs_per_period):
     data = data - decomposed.seasonal - data.mean()
     return (data, smoothed)
 
-def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=None,
+def anomaly_detect_ts(raw_data, max_anoms=0.1, direction="pos", alpha=0.05, only_last=None,
                       threshold=None, e_value=False, longterm=False,
                      piecewise_median_period_weeks=2, plot=False, y_log=False,
                      xlabel="", ylabel="count", title='shesd output: ', verbose=False,
-                      dropna=False, resampling=False, period_override=None):
+                      dropna=False, resampling=False, period_override=None, multithreaded=False):
     if verbose:
         logging.info("Validating input parameters")
 
     # validation
-    assert isinstance(x, pd.Series), 'Data must be a series(Pandas.Series)'
-    assert x.values.dtype in [int, float], 'Values of the series must be number'
-    assert x.index.dtype == np.dtype('datetime64[ns]'), 'Index of the series must be datetime'
+    assert isinstance(raw_data, pd.Series), 'Data must be a series (pandas.Series)'
+    assert raw_data.values.dtype in [int, float], 'Values of the series must be numeric'
+    assert raw_data.index.dtype == np.dtype('datetime64[ns]'), 'Index of the series must be datetime'
     assert max_anoms <= 0.49 and max_anoms >= 0, 'max_anoms must be non-negative and less than 50% '
     assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both'
     assert only_last in [None, 'day', 'hr'], 'only_last options: None | day | hr'
@@ -412,9 +434,9 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
     if alpha < 0.01 or alpha > 0.1:
         logging.warn('alpha is the statistical significance and is usually between 0.01 and 0.1')
 
-    # TODO Allow x.index to be number, here we can convert it to datetime
+    # TODO: allow raw_data.index to be numeric and convert it to datetime here
 
-    data, period, granularity = _get_data_tuple(x, period_override, resampling)
+    data, period, granularity = _get_data_tuple(raw_data, period_override, resampling)
 
     if granularity is 'day':
         num_days_per_line = 7
@@ -425,7 +447,7 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
 
     # If longterm is enabled, break the data into subset data frames and store in all_data
     if longterm:
-        all_data = _process_long_term_data(data, period, granularity, piecewise_median_period_weeks)
+        all_data = _process_long_term_data(raw_data, data, period, granularity, piecewise_median_period_weeks)
     else:
         all_data = [data]
 
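A minimal usage sketch of the new multithreaded flag follows. It is not part of the
patch: the import path is assumed from the diffstat, and the example series, seed,
and values are fabricated for illustration.

    import numpy as np
    import pandas as pd

    from anomaly_detection.anomaly_detect_ts import anomaly_detect_ts

    # fabricated hourly series (28 days) with one injected spike
    index = pd.date_range('2018-10-01', periods=24 * 28, freq='H')
    np.random.seed(0)
    values = np.random.normal(100.0, 5.0, len(index))
    values[100] = 250.0  # injected anomaly
    ts = pd.Series(values, index=index)

    # longterm=True routes through _process_long_term_data, the code path this
    # patch converts to Dask; multithreaded=True is the flag added here
    results = anomaly_detect_ts(ts, max_anoms=0.05, direction='both',
                                longterm=True, multithreaded=True)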