Commit

first set of dask conversions, added param for single-threading/multi-threading Marcnuth#20
hokiegeek2 committed Oct 12, 2018
1 parent e0c81e0 commit cd0a4e7
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions anomaly_detection/anomaly_detect_ts.py
@@ -64,6 +64,9 @@
period_override: Override the auto-generated period
Defaults to None
+ multithreaded: whether to use multi-threaded implementation of DataFrame operations
+ Defaults to False
Details:
@@ -138,6 +141,10 @@
import numpy as np
import scipy as sp
import pandas as pd
+ from multiprocessing import cpu_count
+ from dask import dataframe as ddf
+ from dask.dataframe.core import Series as DaskSeries

import datetime
import statsmodels.api as sm
import logging
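The imports added here back the Dask-based execution path introduced further down. A minimal sketch of the underlying pattern, assuming a pandas Series with a DatetimeIndex; the series ts and the seven-day window are illustrative only, not part of this commit:

ts = pd.Series(np.random.randn(1000),
               index=pd.date_range('2018-01-01', periods=1000, freq='H'))
# one Dask partition per CPU core, mirroring npartitions=cpu_count() used below
dask_series = ddf.from_pandas(ts, npartitions=cpu_count())
# apply a per-partition pandas operation in parallel, then gather back to pandas
first_week = dask_series.map_partitions(
    lambda part: part.loc[part.index < ts.index[0] + datetime.timedelta(days=7)]).compute()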
@@ -279,24 +286,39 @@ def _get_max_anoms(data, max_anoms):
piecewise_median_period_weeks : int
used to determine days and observations per period
'''
- def _process_long_term_data(data, period, granularity, piecewise_median_period_weeks):
+ def _process_long_term_data(raw_data, data, period, granularity, piecewise_median_period_weeks):
# Pre-allocate list with size equal to the number of piecewise_median_period_weeks chunks in x + any left over chunk
# handle edge cases for daily and single column data period lengths
num_obs_in_period = period * piecewise_median_period_weeks + 1 if granularity == 'day' else period * 7 * piecewise_median_period_weeks
num_days_in_period = (7 * piecewise_median_period_weeks) + 1 if granularity == 'day' else (7 * piecewise_median_period_weeks)

+ dask_df = _get_dask_dataframe(data)

all_data = []
# Subset x into piecewise_median_period_weeks chunks
for i in range(1, data.size + 1, num_obs_in_period):
start_date = data.index[i]
# if there is at least 14 days left, subset it, otherwise subset last_date - 14 days
end_date = start_date + datetime.timedelta(days=num_days_in_period)

if end_date < data.index[-1]:
- all_data.append(data.loc[lambda x: (x.index >= start_date) & (x.index <= end_date)])
+ all_data.append(_execute_dask_task(lambda data: data.loc[lambda raw_data: (raw_data.index >= start_date) & (raw_data.index <= end_date)], dask_df))
else:
- all_data.append(data.loc[lambda x: x.index >= data.index[-1] - datetime.timedelta(days=num_days_in_period)])
+ all_data.append(_execute_dask_task(lambda data: data.loc[lambda raw_data: raw_data.index >= data.index[-1] - datetime.timedelta(days=num_days_in_period)], dask_df))
return all_data
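# Worked example of the chunk sizing computed above; the period value is an illustrative
# assumption (e.g. 24 observations per period for hourly data), not taken from this diff.
granularity, period, piecewise_median_period_weeks = 'hr', 24, 2
num_obs_in_period = period * piecewise_median_period_weeks + 1 if granularity == 'day' else period * 7 * piecewise_median_period_weeks  # 24 * 7 * 2 = 336 observations per chunk
num_days_in_period = (7 * piecewise_median_period_weeks) + 1 if granularity == 'day' else (7 * piecewise_median_period_weeks)  # 14 days per chunk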

+ def _execute_dask_task(function, dask_df):
+ return dask_df.map_partitions(function).compute()
+
+ def _execute_dataframe_function(function, data, dask_df=None):
+ if dask_df is None:
+ return function
+ else:
+ return dask_df.map_partitions(lambda data: function).compute()
+
+ def _get_dask_dataframe(df):
+ return ddf.from_pandas(df, npartitions=cpu_count())
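# Illustrative use of the helpers above (the toy series is an assumption, not part of
# the commit): _get_dask_dataframe splits a pandas object into one partition per CPU
# core, and _execute_dask_task maps a callable over every partition and gathers the
# result back into a single pandas object via compute().
example = pd.Series(range(100), index=pd.date_range('2018-01-01', periods=100, freq='D'))
example_ddf = _get_dask_dataframe(example)
first_two_weeks = _execute_dask_task(lambda part: part.loc[part.index <= example.index[0] + datetime.timedelta(days=13)], example_ddf)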

'''
Returns the results from the last day or hour only
@@ -390,17 +412,17 @@ def _get_decomposed_data_tuple(data, num_obs_per_period):
data = data - decomposed.seasonal - data.mean()
return (data, smoothed)

- def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=None,
+ def anomaly_detect_ts(raw_data, max_anoms=0.1, direction="pos", alpha=0.05, only_last=None,
threshold=None, e_value=False, longterm=False, piecewise_median_period_weeks=2,
plot=False, y_log=False, xlabel="", ylabel="count", title='shesd output: ', verbose=False,
- dropna=False, resampling=False, period_override=None):
+ dropna=False, resampling=False, period_override=None, multithreaded=False):

if verbose:
logging.info("Validating input parameters")
# validation
- assert isinstance(x, pd.Series), 'Data must be a series(Pandas.Series)'
- assert x.values.dtype in [int, float], 'Values of the series must be number'
- assert x.index.dtype == np.dtype('datetime64[ns]'), 'Index of the series must be datetime'
+ assert isinstance(raw_data, pd.Series), 'Data must be a series(Pandas.Series)'
+ assert raw_data.values.dtype in [int, float], 'Values of the series must be number'
+ assert raw_data.index.dtype == np.dtype('datetime64[ns]'), 'Index of the series must be datetime'
assert max_anoms <= 0.49 and max_anoms >= 0, 'max_anoms must be non-negative and less than 50% '
assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both'
assert only_last in [None, 'day', 'hr'], 'only_last options: None | day | hr'
@@ -412,9 +434,9 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N
if alpha < 0.01 or alpha > 0.1:
logging.warn('alpha is the statistical significance and is usually between 0.01 and 0.1')

- # TODO Allow x.index to be number, here we can convert it to datetime
+ # TODO Allow raw_data.index to be number, here we can convert it to datetime

- data, period, granularity = _get_data_tuple(x, period_override, resampling)
+ data, period, granularity = _get_data_tuple(raw_data, period_override, resampling)

if granularity is 'day':
num_days_per_line = 7
@@ -425,7 +447,7 @@ def anomaly_detect_ts(x, max_anoms=0.1, direction="pos", alpha=0.05, only_last=N

# If longterm is enabled, break the data into subset data frames and store in all_data
if longterm:
- all_data = _process_long_term_data(data, period, granularity, piecewise_median_period_weeks)
+ all_data = _process_long_term_data(raw_data, data, period, granularity, piecewise_median_period_weeks)
else:
all_data = [data]
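The updated signature accepts the new multithreaded flag alongside the existing options. A minimal usage sketch with synthetic hourly data; the series and parameter values are illustrative assumptions, and the sketch only relies on the signature shown above, not on what multithreaded=True changes internally:

raw_data = pd.Series(np.random.randn(24 * 90),
                     index=pd.date_range('2018-01-01', periods=24 * 90, freq='H'))
results = anomaly_detect_ts(raw_data, max_anoms=0.05, direction='both',
                            longterm=True, piecewise_median_period_weeks=2,
                            multithreaded=True)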

