This repository has been archived by the owner on Mar 29, 2023. It is now read-only.

Merge pull request #2 from shawn-davis:increment_fix
Increment_fix
shawn-davis authored Aug 3, 2022
2 parents 0bcecfc + 5c8cb55 commit 53cbc9f
Showing 3 changed files with 155 additions and 165 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,3 +3,5 @@ __pycache__/
*.egg-info/
MANIFEST
dist/
dfp/dask-worker-space/global.lock
dfp/dask-worker-space/purge.lock
28 changes: 28 additions & 0 deletions dfp/azure_proc.sh
@@ -0,0 +1,28 @@
#!/bin/sh

FILES=$1
ORIGIN="azure"
SAVE_DIR="/home/nfs/sdavis/azure_test/20220730_script"
FILETYPE="csv"
DELIMITER="^"
GROUPBY="userPrincipalName"
TIMESTAMP="createdDateTime"
APP="appDisplayName"
CITY="location.city"
STATE="location.state"
COUNTRY="location.countryOrRegion"
MANAGER="m_name"
EXTENSION=".csv"
MIN_RECORDS=0

python preprocess.py --origin $ORIGIN \
--files $FILES \
--save_dir $SAVE_DIR \
--filetype $FILETYPE \
--delimiter $DELIMITER \
--groupby $GROUPBY \
--timestamp $TIMESTAMP \
--app $APP \
--manager $MANAGER \
--extension $EXTENSION \
--min_records $MIN_RECORDS
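
For context, this wrapper only forwards its variables to preprocess.py. Below is a minimal sketch (not part of the commit) of the equivalent direct call to proc_logs with the same values; the input path is a placeholder, and the script defines CITY, STATE, and COUNTRY but never passes them, so those columns are included here purely for illustration.

from preprocess import proc_logs  # assumes this runs from the dfp/ directory

proc_logs(files='/home/nfs/sdavis/azure_test/raw',  # placeholder input directory
          save_dir='/home/nfs/sdavis/azure_test/20220730_script',
          log_source='azure',
          filetype='csv',
          delimiter='^',
          groupby='userPrincipalName',
          timestamp_column='createdDateTime',
          city_column='location.city',
          state_column='location.state',
          country_column='location.countryOrRegion',
          application_column='appDisplayName',
          output_grouping='m_name',
          extension='.csv',
          min_records=0)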
290 changes: 125 additions & 165 deletions dfp/preprocess.py
@@ -1,31 +1,34 @@
import time
import datetime
import pandas as pd
from dask import dataframe as dd

from dask.distributed import Client
import numpy as np
import os
import sys
import argparse
import json


# _AZURE_RENAME_COLUMNS = {"location.countryOrRegion": "locationcountryOrRegion",
# "location.state": "locationstate",
# "location.city": "locationcity",
# "createdDateTime":"time",
# "deviceDetail.displayName":"deviceDetaildisplayName",
# "deviceDetail.browser":"deviceDetailbrowser",
# "deviceDetail.operatingSystem":"deviceDetailoperatingSystem",
# "status.failureReason":"statusfailureReason"}
parser = argparse.ArgumentParser(description="Process Duo or Azure logs for DFP")
parser.add_argument('--origin', choices=['duo', 'azure'], default='duo', help='The type of logs to process: duo or azure')
parser.add_argument('--files', default=None, help='The directory containing the files to process')
parser.add_argument('--save_dir', default=None, help='The directory to save the processed files')
parser.add_argument('--filetype', default='csv', choices=['csv', 'json', 'jsonline'], help='The input file format: csv, json, or jsonline')
parser.add_argument('--explode_raw', action='store_true', help='Option to explode the _raw key from a jsonline file')
parser.add_argument('--delimiter', default=',', help='The CSV delimiter in the files to be processed')
parser.add_argument('--groupby', default=None, help='The column to be aggregated over. Usually a username.')
parser.add_argument('--timestamp', default=None, help='The name of the column containing the timing info')
parser.add_argument('--city', default=None, help='The name of the column containing the city')
parser.add_argument('--state', default=None, help="The name of the column containing the state")
parser.add_argument('--country', default=None, help="The name of the column containing the country")
parser.add_argument('--app', default='appDisplayName', help="The name of the column containing the application. Does not apply to Duo logs.")
parser.add_argument('--manager', default=None, help='The column containing the manager name. Leave blank if you want user-level results')
parser.add_argument('--extension', default=None, help='The extension of the files to be loaded. Only needed if there are other files in the directory containing the files to be processed')
parser.add_argument('--min_records', type=int, default=0, help='The minimum number of records needed for a processed user to be saved.')


# _AZURE_PARED_COLUMNS = ["userPrincipalName",
# "appDisplayName",
# "clientAppUsed",
# "time",
# "riskEventTypes_v2",
# "locationcity",
# "locationstate",
# "locationcountryOrRegion",
# "deviceDetaildisplayName",
# "deviceDetailbrowser",
# "deviceDetailoperatingSystem",
# "statusfailureReason"]
_DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'


def _if_dir_not_exists(directory):
@@ -37,30 +40,26 @@ def _explode_raw(df):
return df2


def _azure_derived_features(df, timestamp_column, city_column, state_column, country_column, application_column):
def _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column):
pdf = df.copy()
pdf['time'] = pd.to_datetime(pdf[timestamp_column])
pdf['day'] = pdf['time'].dt.date
pdf.sort_values(by=['time'], inplace=True)
pdf.fillna("nan", inplace=True)
pdf['overall_location'] = pdf[city_column] + ', ' + pdf[state_column] + ', ' + pdf[country_column]
pdf['locincrement'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
pdf['appincrement'] = pdf.groupby('day')[application_column].transform(lambda x: pd.factorize(x)[0] + 1)
pdf["logcount"]=pdf.groupby('day').cumcount()
pdf.drop('overall_location', inplace=True, axis = 1)
return pdf


def _duo_derived_features(df, timestamp_column, city_column, state_column, country_column):
pdf = df.copy()
pdf['time'] = pd.to_datetime(pdf[timestamp_column])
pdf['time'] = pd.to_datetime(pdf[timestamp_column], errors='coerce')
pdf['day'] = pdf['time'].dt.date
pdf.fillna({'time': pd.to_datetime(_DEFAULT_DATE), 'day': pd.to_datetime(_DEFAULT_DATE).date()}, inplace = True)
pdf.sort_values(by=['time'], inplace=True)
pdf.fillna("nan", inplace=True)
pdf['overall_location'] = pdf[city_column] + ', ' + pdf[state_column] + ', ' + pdf[country_column]
pdf['locincrement'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
overall_location_columns = [col for col in [city_column, state_column, country_column] if col is not None]
if len(overall_location_columns) > 0:
pdf['overall_location'] = pdf[overall_location_columns].apply(lambda x: ', '.join(x), axis=1)
pdf['loc_cat'] = pdf.groupby('day')['overall_location'].transform(lambda x: pd.factorize(x)[0] + 1)
pdf.fillna({'loc_cat': 1}, inplace = True)
pdf['locincrement'] = pdf.groupby('day')['loc_cat'].expanding(1).max().droplevel(0)
pdf.drop(['overall_location', 'loc_cat'], inplace=True, axis=1)
if application_column is not None:
pdf['app_cat'] = pdf.groupby('day')[application_column].transform(lambda x: pd.factorize(x)[0] + 1)
pdf.fillna({'app_cat': 1}, inplace = True)
pdf['appincrement'] = pdf.groupby('day')['app_cat'].expanding(1).max().droplevel(0)
pdf.drop('app_cat', inplace=True, axis=1)
pdf["logcount"]=pdf.groupby('day').cumcount()
pdf.drop('overall_location', inplace=True, axis=1)

return pdf


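This hunk is the heart of the increment fix. The old transform gave every row the first-appearance ordinal of its value within the day, so a row that revisited an earlier location dropped back to a lower number; the new expanding max turns that ordinal into a running count of distinct values seen so far. A minimal pandas sketch (not part of the commit, with a made-up 'loc' column) showing the difference:

import pandas as pd

# One user, one day, locations in time order.
df = pd.DataFrame({'day': ['2022-08-03'] * 5,
                   'loc': ['NY', 'NY', 'LA', 'NY', 'SF']})

# Old behaviour: first-appearance ordinal per day.
# The 4th row revisits NY and drops back to 1.
old = df.groupby('day')['loc'].transform(lambda x: pd.factorize(x)[0] + 1)
print(old.tolist())  # [1, 1, 2, 1, 3]

# New behaviour: running max of that ordinal, i.e. the number of distinct
# locations seen so far in the day -- monotonically non-decreasing.
cat = df.groupby('day')['loc'].transform(lambda x: pd.factorize(x)[0] + 1)
new = cat.groupby(df['day']).expanding(1).max().droplevel(0)
print(new.tolist())  # [1.0, 1.0, 2.0, 2.0, 3.0]
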
@@ -69,31 +68,46 @@ def _save_groups(df, outdir, source):
return df


def proc_azure_logs(files,
save_dir,
filetype = 'csv',
delimiter = ',',
groupby = 'userPrincipalName',
timestamp_column = 'createdDateTime',
city_column = 'location.city',
state_column = 'location.state',
country_column = 'location.countryOrRegion',
application_column = 'appDisplayName',
output_grouping = None,
extension=None,
min_records = 0):
def _parse_time(df, timestamp_column):
pdf = df.copy()
pdf['time'] = pd.to_datetime(pdf[timestamp_column])
pdf['day'] = pdf['time'].dt.date
return pdf


def proc_logs(files,
save_dir,
log_source = 'duo',
filetype = 'csv',
storage_options = {},
explode_raw = False,
delimiter = ',',
groupby = 'userPrincipalName',
timestamp_column = 'createdDateTime',
city_column = None,
state_column = None,
country_column = None,
application_column = None,
output_grouping = None,
extension=None,
min_records = 0):
"""
Process Azure log files for DFP training.
Process log files for DFP training.
Parameters
----------
files: str or List[str]
A directory or filepath or list of filepaths
save_dir: str
The directory to save the training data
log_source: str
The source of the logs. Used primarily for tracing training data provenance.
filetype: str, default='csv'
'csv' or 'json'
'csv', 'json', or 'jsonline'
storage_options: dict
Any arguments to pass to Dask when accessing data in remote locations such as AWS S3
explode_raw: bool
Indicates that the data is nested in jsonline records under the _raw key
delimiter: str, default=','
The csv delimiter
groupby: str, default='userPrincipalName'
@@ -139,127 +153,73 @@ def proc_azure_logs(files,
files = []
assert isinstance(files, list) and len(files) > 0, 'Please pass a directory, a file-path, or a list of file-paths containing the files to be processed'

if filetype == 'json':
nested_logs = dd.read_json(files, lines=True)
meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0])).iloc[:0,:].copy()
azure_logs = nested_logs.map_partitions(lambda df: _explode_raw(df), meta=meta)
start_time = time.perf_counter()

if filetype == 'jsonline':
if explode_raw:
nested_logs = dd.read_json(files, lines=True, storage_options=storage_options)
meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0])).iloc[:0,:].copy()
logs = nested_logs.map_partitions(lambda df: _explode_raw(df), meta=meta).fillna('nan')
else:
logs = dd.read_json(files, lines=True, storage_options=storage_options).fillna('nan')
elif filetype == 'json':
logs = dd.read_json(files, storage_options=storage_options).fillna('nan')
else:
azure_logs = dd.read_csv(files, delimiter=delimiter, dtype='object')
logs = dd.read_csv(files, delimiter=delimiter, storage_options=storage_options, dtype='object').fillna('nan')

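The new storage_options argument is passed straight through to the dd.read_* calls, whose fsspec backends use it for remote filesystems. A hedged sketch (not part of the commit) of reading jsonlines from S3; the bucket path and credential values are placeholders, and s3fs must be installed:

from dask import dataframe as dd

logs = dd.read_json(
    's3://example-bucket/signin-logs/*.jsonl',  # placeholder bucket and glob
    lines=True,
    storage_options={'key': 'AKIA...', 'secret': '...'},  # placeholder credentials
).fillna('nan')
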
azure_meta = {c: v for c, v in zip(azure_logs._meta, azure_logs._meta.dtypes)}
azure_meta['time'] = 'datetime64[ns]'
azure_meta['day'] = 'datetime64[ns]'
azure_meta['locincrement'] = 'int'
azure_meta['appincrement'] = 'int'
azure_meta['logcount'] = 'int'
logs_meta = {c: v for c, v in zip(logs._meta, logs._meta.dtypes)}
logs_meta['time'] = 'datetime64[ns]'
logs_meta['day'] = 'datetime64[ns]'
if city_column is not None or state_column is not None or country_column is not None:
logs_meta['locincrement'] = 'int'
if application_column is not None:
logs_meta['appincrement'] = 'int'
logs_meta['logcount'] = 'int'

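logs_meta exists because Dask evaluates groupby().apply lazily and cannot cheaply infer the schema the applied function returns, so the expected columns and dtypes are declared up front. A small self-contained sketch of the same pattern (not part of the commit, with made-up column names):

import pandas as pd
from dask import dataframe as dd

pdf = pd.DataFrame({'user': ['a', 'a', 'b'], 'ts': ['t1', 't2', 't3']})
ddf = dd.from_pandas(pdf, npartitions=1)

def add_logcount(df):
    out = df.copy()
    out['logcount'] = range(len(out))  # running count within the group
    return out

# Input columns plus the derived one, with dtypes, mirroring logs_meta above.
meta = {'user': 'object', 'ts': 'object', 'logcount': 'int'}
result = ddf.groupby('user').apply(add_logcount, meta=meta)
print(result.compute().reset_index(drop=True))
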
azure_logs.persist()

derived_azure = azure_logs.groupby(groupby).apply(lambda df: _azure_derived_features(df, timestamp_column, city_column, state_column, country_column, application_column), meta=azure_meta).reset_index(drop=True)
derived_logs = logs.groupby(groupby).apply(lambda df: _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column), meta=logs_meta).reset_index(drop=True)

if min_records > 0:
user_entry_counts = azure_logs[[groupby, timestamp_column]].groupby(groupby).count().compute()
trainees = [user for user, count in user_entry_counts.to_dict()[timestamp_column].items() if count > min_records]
derived_azure[derived_azure[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "azure"), meta=derived_azure._meta).size.compute()
logs = logs.persist()
user_entry_counts = logs[[groupby, 'day']].groupby(groupby).count().compute()
trainees = [user for user, count in user_entry_counts.to_dict()['day'].items() if count > min_records]
derived_logs[derived_logs[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, log_source), meta=derived_logs._meta).size.compute()
else:
derived_azure.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "azure"), meta=derived_azure._meta).size.compute()
derived_logs.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, log_source), meta=derived_logs._meta).size.compute()

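When min_records is positive, the raw logs are counted per user and only users with more than min_records rows are written out. The same filter in plain pandas (not part of the commit), on hypothetical data:

import pandas as pd

logs = pd.DataFrame({'user': ['a', 'a', 'a', 'b'],
                     'day': ['d1', 'd1', 'd2', 'd1']})
min_records = 2

counts = logs[['user', 'day']].groupby('user').count()
trainees = [u for u, n in counts.to_dict()['day'].items() if n > min_records]
print(trainees)                           # ['a']  (3 records > 2; 'b' has only 1)
print(logs[logs['user'].isin(trainees)])  # only user 'a' rows survive
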
timing = datetime.timedelta(seconds = time.perf_counter() - start_time)

num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_azure.csv')])
print("%i training files successfully created" % num_training_files)
num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_{log_source}.csv'.format(log_source=log_source))])
print("{num_files} training files successfully created in {time}".format(num_files=num_training_files, time=timing))
if num_training_files > 0:
return True
else:
return False

def proc_duo_logs(files,
save_dir,
delimiter = ',',
groupby = 'username',
timestamp_column = 'isotimestamp',
city_column = 'location.city',
state_column = 'location.state',
country_column = 'location.country',
output_grouping = None,
extension=None,
min_records = 0):

"""
Process Duo log files for DFP training.
def _run():
opt = parser.parse_args()

client = Client()
client.restart()

print('Beginning {origin} pre-processing'.format(origin=opt.origin))
proc_logs(files=opt.files,
log_source=opt.origin,
save_dir=opt.save_dir,
filetype=opt.filetype,
explode_raw=opt.explode_raw,
delimiter=opt.delimiter,
groupby=opt.groupby or 'userPrincipalName',
timestamp_column=opt.timestamp or 'createdDateTime',
city_column=opt.city,
state_column=opt.state,
country_column=opt.country,
application_column=opt.app,
output_grouping=opt.manager,
extension=opt.extension,
min_records=opt.min_records)

Parameters
----------
files: str or List[str]
A directory or filepath or list of filepaths
save_dir: str
The directory to save the training data
filetype: str, default='csv'
'csv' or 'json'
delimiter: str, default=','
The csv delimiter
groupby: str, default='userPrincipalName'
The column name to aggregate over for derived feature creation.
timestamp_column: str, default='createdDateTime'
The column name containing the timestamp
city_column: str, default='location.city'
The column name containing the city location data
state_column: str, default='location.state'
The column name containing the state location data
country_column: str, default='location.countryOrRegion'
The column name containing the country location data
output_grouping: str, optional
The column to aggregate the output training data. If None, this defaults to the aggregation level specified in the groupby parameter.
This is where you would specify the manager name column, if training is being done by manager group.
extension: str, optional
Specify the file extension to load, if the directory contains additional files that should not be loaded.
min_records: int, default=0
The minimum number of records that need to be observed to save the data for training. Setting this to 0 creates data for all users.
Returns
-------
bool
True if at least one training file was created, else False
client.close()

"""

if output_grouping is None:
output_grouping = groupby

_if_dir_not_exists(save_dir)

if isinstance(files, str):
if os.path.isdir(files):
if extension is not None:
files = [os.path.join(files, file) for file in os.listdir(files) if file.endswith(extension)]
else:
files = [os.path.join(files, file) for file in os.listdir(files)]
elif os.path.isfile(files):
files = [files]
else:
files = []
assert isinstance(files, list) and len(files) > 0, 'Please pass a directory, a file-path, or a list of file-paths containing the files to be processed'

duo_logs = dd.read_csv(files, delimiter=delimiter, dtype='object')

duo_meta = {c: v for c, v in zip(duo_logs._meta, duo_logs._meta.dtypes)}
duo_meta['time'] = 'datetime64[ns]'
duo_meta['day'] = 'datetime64[ns]'
duo_meta['locincrement'] = 'int'
duo_meta['logcount'] = 'int'

duo_logs.persist()

derived_duo = duo_logs.groupby(groupby).apply(lambda df: _duo_derived_features(df, timestamp_column, city_column, state_column, country_column), meta=duo_meta).reset_index(drop=True)

if min_records > 0:
user_entry_counts = duo_logs[[groupby, timestamp_column]].groupby(groupby).count().compute()
trainees = [user for user, count in user_entry_counts.to_dict()[timestamp_column].items() if count > min_records]
derived_duo[derived_duo[groupby].isin(trainees)].groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "duo"), meta=duo_meta).size.compute()
else:
derived_duo.groupby(output_grouping).apply(lambda df: _save_groups(df, save_dir, "duo"), meta=duo_meta).size.compute()

num_training_files = len([file for file in os.listdir(save_dir) if file.endswith('_duo.csv')])
print("%i training files successfully created" % num_training_files)
if num_training_files > 0:
return True
else:
return False
if __name__ == '__main__':
_run()
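
_run stands up a dask.distributed Client, which registers itself as the default scheduler for the dd collections used in proc_logs. A bare-bones sketch of that lifecycle; the restart() mirrors the call above, and the try/finally is an assumption about tidy shutdown rather than part of this commit:

from dask.distributed import Client

if __name__ == '__main__':
    client = Client()   # local cluster; becomes the default scheduler
    client.restart()    # clear any prior worker state, as _run does
    try:
        pass            # dask work goes here, e.g. proc_logs(...)
    finally:
        client.close()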
