diff --git a/.gitignore b/.gitignore
index 370a259..3983d6b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,5 @@ __pycache__/
 *.egg-info/
 MANIFEST
 dist/
+dfp/dask-worker-space/global.lock
+dfp/dask-worker-space/purge.lock
diff --git a/dfp/azure_proc.sh b/dfp/azure_proc.sh
new file mode 100644
index 0000000..f83393a
--- /dev/null
+++ b/dfp/azure_proc.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+FILES=$1
+ORIGIN="azure"
+SAVE_DIR="/home/nfs/sdavis/azure_test/20220730_script"
+FILETYPE="csv"
+DELIMITER="^"
+GROUPBY="userPrincipalName"
+TIMESTAMP="createdDateTime"
+APP="appDisplayName"
+CITY="location.city"
+STATE="location.state"
+COUNTRY="location.countryOrRegion"
+MANAGER="m_name"
+EXTENSION=".csv"
+MIN_RECORDS=0
+
+python preprocess.py --origin $ORIGIN \
+    --files $FILES \
+    --save_dir $SAVE_DIR \
+    --filetype $FILETYPE \
+    --delimiter $DELIMITER \
+    --groupby $GROUPBY \
+    --timestamp $TIMESTAMP \
+    --app $APP \
+    --manager $MANAGER \
+    --extension $EXTENSION \
+    --min_records $MIN_RECORDS
diff --git a/dfp/preprocess.py b/dfp/preprocess.py
index b40ac90..9e2d3cc 100644
--- a/dfp/preprocess.py
+++ b/dfp/preprocess.py
@@ -1,31 +1,34 @@
+import time
+import datetime
 import pandas as pd
 from dask import dataframe as dd
-
+from dask.distributed import Client
+import numpy as np
 import os
+import sys
+import argparse
 import json
 
-# _AZURE_RENAME_COLUMNS = {"location.countryOrRegion": "locationcountryOrRegion",
-#                          "location.state": "locationstate",
-#                          "location.city": "locationcity",
-#                          "createdDateTime":"time",
-#                          "deviceDetail.displayName":"deviceDetaildisplayName",
-#                          "deviceDetail.browser":"deviceDetailbrowser",
-#                          "deviceDetail.operatingSystem":"deviceDetailoperatingSystem",
-#                          "status.failureReason":"statusfailureReason"}
+parser = argparse.ArgumentParser(description="Process Duo or Azure logs for DFP")
+parser.add_argument('--origin', choices=['duo', 'azure'], default='duo', help='the type of logs to process: duo or azure')
+parser.add_argument('--files', default=None, help='The directory containing the files to process')
+parser.add_argument('--save_dir', default=None, help='The directory to save the processed files')
+parser.add_argument('--filetype', default='csv', choices=['csv', 'json', 'jsonline'], help='Switch between csv and jsonlines for processing Azure logs')
+parser.add_argument('--explode_raw', action='store_true', help='Option to explode the _raw key from a jsonline file')
+parser.add_argument('--delimiter', default=',', help='The CSV delimiter in the files to be processed')
+parser.add_argument('--groupby', default=None, help='The column to be aggregated over. Usually a username.')
+parser.add_argument('--timestamp', default=None, help='The name of the column containing the timing info')
+parser.add_argument('--city', default=None, help='The name of the column containing the city')
+parser.add_argument('--state', default=None, help="the name of the column containing the state")
+parser.add_argument('--country', default=None, help="The name of the column containing the country")
+parser.add_argument('--app', default='appDisplayName', help="The name of the column containing the application. Does not apply to Duo logs.")
+parser.add_argument('--manager', default=None, help='The column containing the manager name. Leave blank if you want user-level results')
+parser.add_argument('--extension', default=None, help='The extensions of the files to be loaded. Only needed if there are other files in the directory containing the files to be processed')
+parser.add_argument('--min_records', type=int, default=0, help='The minimum number of records needed for a processed user to be saved.')
+
 
 
-# _AZURE_PARED_COLUMNS = ["userPrincipalName",
-#                         "appDisplayName",
-#                         "clientAppUsed",
-#                         "time",
-#                         "riskEventTypes_v2",
-#                         "locationcity",
-#                         "locationstate",
-#                         "locationcountryOrRegion",
-#                         "deviceDetaildisplayName",
-#                         "deviceDetailbrowser",
-#                         "deviceDetailoperatingSystem",
-#                         "statusfailureReason"]
+_DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'
 
 
 def _if_dir_not_exists(directory):
@@ -37,30 +40,26 @@ def _explode_raw(df):
     return df2
 
 
-def _azure_derived_features(df, timestamp_column, city_column, state_column, country_column, application_column):
+def _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column):
     pdf = df.copy()
-    pdf['time'] = pd.to_datetime(pdf[timestamp_column])
-    pdf['day'] = pdf['time'].dt.date
-    pdf.sort_values(by=['time'], inplace=True)
-    pdf.fillna("nan", inplace=True)
-    pdf['overall_location'] = pdf[city_column] + ', ' + pdf[state_column] + ', ' + pdf[country_column]
-    pdf['locincrement'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
-    pdf['appincrement'] = pdf.groupby('day')[application_column].transform(lambda x: pd.factorize(x)[0] + 1)
-    pdf["logcount"]=pdf.groupby('day').cumcount()
-    pdf.drop('overall_location', inplace=True, axis = 1)
-    return pdf
-
-
-def _duo_derived_features(df, timestamp_column, city_column, state_column, country_column):
-    pdf = df.copy()
-    pdf['time'] = pd.to_datetime(pdf[timestamp_column])
+    pdf['time'] = pd.to_datetime(pdf[timestamp_column], errors='coerce')
     pdf['day'] = pdf['time'].dt.date
+    pdf.fillna({'time': pd.to_datetime(_DEFAULT_DATE), 'day': pd.to_datetime(_DEFAULT_DATE).date()}, inplace = True)
     pdf.sort_values(by=['time'], inplace=True)
-    pdf.fillna("nan", inplace=True)
-    pdf['overall_location'] = pdf[city_column] + ', ' + pdf[state_column] + ', ' + pdf[country_column]
-    pdf['locincrement'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
+    overall_location_columns = [col for col in [city_column, state_column, country_column] if col is not None]
+    if len(overall_location_columns) > 0:
+        pdf['overall_location'] = pdf[overall_location_columns].apply(lambda x: ', '.join(x), axis=1)
+        pdf['loc_cat'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
+        pdf.fillna({'loc_cat': 1}, inplace = True)
+        pdf['locincrement'] = pdf.groupby('day')['loc_cat'].expanding(1).max().droplevel(0)
+        pdf.drop(['overall_location', 'loc_cat'], inplace=True, axis=1)
+    if application_column is not None:
+        pdf['app_cat'] = pdf.groupby('day')[application_column].transform(lambda x: pd.factorize(x)[0] + 1)
+        pdf.fillna({'app_cat': 1}, inplace = True)
+        pdf['appincrement'] = pdf.groupby('day')['app_cat'].expanding(1).max().droplevel(0)
+        pdf.drop('app_cat', inplace=True, axis=1)
     pdf["logcount"]=pdf.groupby('day').cumcount()
-    pdf.drop('overall_location', inplace=True, axis=1)
+
     return pdf
 
 
@@ -69,22 +68,31 @@ def _save_groups(df, outdir, source):
     return df
 
 
-
-def proc_azure_logs(files,
-                    save_dir,
-                    filetype = 'csv',
-                    delimiter = ',',
-                    groupby = 'userPrincipalName',
-                    timestamp_column = 'createdDateTime',
-                    city_column = 'location.city',
-                    state_column = 'location.state',
-                    country_column = 'location.countryOrRegion',
-                    application_column = 'appDisplayName',
-                    output_grouping = None,
-                    extension=None,
-                    min_records = 0):
+def _parse_time(df, timestamp_column):
+    pdf = df.copy()
+    pdf['time'] = pd.to_datetime(pdf[timestamp_column])
+    pdf['day'] = pdf['time'].dt.date
+    return pdf
+
+
+def proc_logs(files,
+              save_dir,
+              log_source = 'duo',
+              filetype = 'csv',
+              storage_options = {},
+              explode_raw = False,
+              delimiter = ',',
+              groupby = 'userPrincipalName',
+              timestamp_column = 'createdDateTime',
+              city_column = None,
+              state_column = None,
+              country_column = None,
+              application_column = None,
+              output_grouping = None,
+              extension=None,
+              min_records = 0):
     """
-    Process Azure log files for DFP training.
+    Process log files for DFP training.
 
     Parameters
     ----------
@@ -92,8 +100,14 @@ def proc_azure_logs(files,
         A directory or filepath or list of filepaths
     save_dir: str
         The directory to save the training data
+    log_source: str
+        The source of the logs. Used primarily for tracing training data provenance.
     filetype: str, default='csv'
-        'csv' or 'json'
+        'csv', 'json', or 'jsonline'
+    storage_options: dict
+        any arguments to pass to dask if trying to access data from remote locations such as AWS
+    explode_raw: bool
+        This indicates that the data is in a nested jsonlines object with the _raw key
     delimiter: str, default=','
         The csv delimiter
     groupby: str, default='userPrincipalName'
@@ -139,127 +153,73 @@ def proc_azure_logs(files,
             files = []
     assert isinstance(files, list) and len(files) > 0, 'Please pass a directory, a file-path, or a list of file-paths containing the files to be processed'
 
-    if filetype == 'json':
-        nested_logs = dd.read_json(files, lines=True)
-        meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0])).iloc[:0,:].copy()
-        azure_logs = nested_logs.map_partitions(lambda df: _explode_raw(df), meta=meta)
+    start_time = time.perf_counter()
+
+    if filetype == 'jsonline':
+        if explode_raw:
+            nested_logs = dd.read_json(files, lines=True, storage_options=storage_options)
+            meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0])).iloc[:0,:].copy()
+            logs = nested_logs.map_partitions(lambda df: _explode_raw(df), meta=meta).fillna('nan')
+        else:
+            logs = dd.read_json(files, lines=True, storage_options=storage_options).fillna('nan')
+    elif filetype == 'json':
+        logs = dd.read_json(files, storage_options=storage_options).fillna('nan')
     else:
-        azure_logs = dd.read_csv(files, delimiter=delimiter, dtype='object')
+        logs = dd.read_csv(files, delimiter=delimiter, storage_options=storage_options, dtype='object').fillna('nan')
 
-    azure_meta = {c: v for c, v in zip(azure_logs._meta, azure_logs._meta.dtypes)}
-    azure_meta['time'] = 'datetime64[ns]'
-    azure_meta['day'] = 'datetime64[ns]'
-    azure_meta['locincrement'] = 'int'
-    azure_meta['appincrement'] = 'int'
-    azure_meta['logcount'] = 'int'
+    logs_meta = {c: v for c, v in zip(logs._meta, logs._meta.dtypes)}
+    logs_meta['time'] = 'datetime64[ns]'
+    logs_meta['day'] = 'datetime64[ns]'
+    if city_column is not None or state_column is not None or country_column is not None:
+        logs_meta['locincrement'] = 'int'
+    if application_column is not None:
+        logs_meta['appincrement'] = 'int'
+    logs_meta['logcount'] = 'int'
 
-    azure_logs.persist()
-
-    derived_azure = azure_logs.groupby(groupby).apply(lambda df: _azure_derived_features(df, timestamp_column, city_column, state_column, country_column, application_column), meta=azure_meta).reset_index(drop=True)
+    derived_logs = logs.groupby(groupby).apply(lambda df: _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column), meta=logs_meta).reset_index(drop=True)
 
     if min_records > 0:
-        user_entry_counts = azure_logs[[groupby, timestamp_column]].groupby(groupby).count().compute()
-        trainees = [user for user, count in user_entry_counts.to_dict()[timestamp_column].items() if count > min_records]
-        derived_azure[derived_azure[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "azure"), meta=derived_azure._meta).size.compute()
+        logs = logs.persist()
+        user_entry_counts = logs[[groupby, 'day']].groupby(groupby).count().compute()
+        trainees = [user for user, count in user_entry_counts.to_dict()['day'].items() if count > min_records]
+        derived_logs[derived_logs[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, log_source), meta=derived_logs._meta).size.compute()
    else:
-        derived_azure.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "azure"), meta=derived_azure._meta).size.compute()
+        derived_logs.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, log_source), meta=derived_logs._meta).size.compute()
+
+    timing = datetime.timedelta(seconds = time.perf_counter() - start_time)
 
-    num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_azure.csv')])
-    print("%i training files successfully created" % num_training_files)
+    num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_{log_source}.csv'.format(log_source=log_source))])
+    print("{num_files} training files successfully created in {time}".format(num_files=num_training_files, time=timing))
     if num_training_files > 0:
         return True
     else:
         return False
 
 
-def proc_duo_logs(files,
-                  save_dir,
-                  delimiter = ',',
-                  groupby = 'username',
-                  timestamp_column = 'isotimestamp',
-                  city_column = 'location.city',
-                  state_column = 'location.state',
-                  country_column = 'location.country',
-                  output_grouping = None,
-                  extension=None,
-                  min_records = 0):
-    """
-    Process Duo log files for DFP training.
+def _run():
+    opt = parser.parse_args()
+
+    client = Client()
+    client.restart()
+
+    print('Beginning {origin} pre-processing'.format(origin=opt.origin))
+    proc_logs(files=opt.files,
+              log_source=opt.origin,
+              save_dir=opt.save_dir,
+              filetype=opt.filetype,
+              explode_raw=opt.explode_raw,
+              delimiter=opt.delimiter,
+              groupby=opt.groupby or 'userPrincipalName',
+              timestamp_column=opt.timestamp or 'createdDateTime',
+              city_column=opt.city,
+              state_column=opt.state,
+              country_column=opt.country,
+              application_column=opt.app,
+              output_grouping=opt.manager,
+              extension=opt.extension,
+              min_records=opt.min_records)
 
-    Parameters
-    ----------
-    files: str or List[str]
-        A directory or filepath or list of filepaths
-    save_dir: str
-        The directory to save the training data
-    filetype: str, default='csv'
-        'csv' or 'json'
-    delimiter: str, default=','
-        The csv delimiter
-    groupby: str, default='userPrincipalName'
-        The column name to aggregate over for derived feature creation.
-    timestamp_column: str, default='createdDateTime
-        The column name containing the timestamp
-    city_column: str, default='location.city'
-        The column name containing the city location data
-    state_column: str, default='location.state'
-        The column name containing the state location data
-    country_column: str, default='location.countryOrRegion
-        The column name containing the country location data
-    output_grouping: str, optional
-        The column to aggregate the output training data. If None, this defaults to the aggregation level specified in the groupby parameter.
-        This is where you would specify the manager name column, if training is being done by manager group.
-    extension: str, optional
-        Specify the file extension to load, if the directory contains additional files that should not be loaded.
-    min_records: int, default=0
-        The minimum number of records that need to be observed to save the data for training. Setting this to 0 creates data for all users.
-
-    Returns
-    -------
-    bool
-        True if more than 1 training file is returned, else False is returned
+    client.close()
 
-    """
-
-    if output_grouping is None:
-        output_grouping = groupby
-
-    _if_dir_not_exists(save_dir)
-
-    if isinstance(files, str):
-        if os.path.isdir(files):
-            if extension is not None:
-                files = [os.path.join(files, file) for file in os.listdir(files) if file.endswith(extension)]
-            else:
-                files = [os.path.join(files, file) for file in os.listdir(files)]
-        elif os.path.isfile(files):
-            files = [files]
-        else:
-            files = []
-    assert isinstance(files, list) and len(files) > 0, 'Please pass a directory, a file-path, or a list of file-paths containing the files to be processed'
-
-    duo_logs = dd.read_csv(files, delimiter=delimiter, dtype='object')
-
-    duo_meta = {c: v for c, v in zip(duo_logs._meta, duo_logs._meta.dtypes)}
-    duo_meta['time'] = 'datetime64[ns]'
-    duo_meta['day'] = 'datetime64[ns]'
-    duo_meta['locincrement'] = 'int'
-    duo_meta['logcount'] = 'int'
-
-    duo_logs.persist()
-
-    derived_duo = duo_logs.groupby(groupby).apply(lambda df: _duo_derived_features(df, timestamp_column, city_column, state_column, country_column), meta=duo_meta).reset_index(drop=True)
-
-    if min_records > 0:
-        user_entry_counts = duo_logs[[groupby, timestamp_column]].groupby(groupby).count().compute()
-        trainees = [user for user, count in user_entry_counts.to_dict()[timestamp_column].items() if count > min_records]
-        derived_duo[derived_duo[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "duo"), meta=duo_meta).size.compute()
-    else:
-        derived_duo.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "duo"), meta=duo_meta).size.compute()
-
-    num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_duo.csv')])
-    print("%i training files successfully created" % num_training_files)
-    if num_training_files > 0:
-        return True
-    else:
-        return False
+if __name__ == '__main__':
+    _run()
\ No newline at end of file
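
For reference, the new entry point can also be driven from Python rather than through azure_proc.sh. The sketch below is illustrative only and is not part of this patch: the paths are placeholders, it assumes it is run from the dfp/ directory so that preprocess is importable, and the Duo column names are taken from the defaults of the removed proc_duo_logs().

# Hypothetical Python usage of the new proc_logs() API (not part of this patch).
# Paths are placeholders; Duo column names follow the removed proc_duo_logs() defaults.
from preprocess import proc_logs

proc_logs(files='/path/to/duo_logs',                  # placeholder directory of raw Duo CSVs
          save_dir='/path/to/duo_training_output',    # placeholder output directory
          log_source='duo',
          filetype='csv',
          groupby='username',
          timestamp_column='isotimestamp',
          city_column='location.city',
          state_column='location.state',
          country_column='location.country',
          application_column=None,                    # Duo logs carry no application column
          output_grouping=None,                       # None falls back to the groupby column
          extension='.csv',
          min_records=0)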