This repository has been archived by the owner on Mar 29, 2023. It is now read-only.

Commit

Fixed S3 prefix bug. Added a parameter that allows specification of columns that should be lower-cased, with spaces replaced by '_'.
shawn-davis committed Aug 9, 2022
1 parent b9aced5 commit 21224ae
Showing 1 changed file with 41 additions and 23 deletions: dfp/preprocess.py
@@ -42,12 +42,12 @@ def _if_dir_not_exists(directory):
         os.makedirs(directory)
 
 
-def _explode_raw(df):
-    df2 = pd.json_normalize(df['_raw'].apply(json.loads))
+def _explode_raw(df, sep):
+    df2 = pd.json_normalize(df['_raw'].apply(json.loads), sep=sep)
     return df2
 
 
-def _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column):
+def _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column, normalize_strings):
     pdf = df.copy()
     pdf['time'] = pd.to_datetime(pdf[timestamp_column], errors='coerce')
     pdf['day'] = pdf['time'].dt.date
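
Editor's note, not part of the commit: the effect of the new sep argument is easiest to see on a toy record. pd.json_normalize flattens nested JSON keys and joins the levels with sep (default '.'), so callers can pick a separator that will not clash with downstream column handling. The record below is hypothetical.

import pandas as pd

record = {"user": {"name": "alice", "location": {"city": "Reno"}}}
print(pd.json_normalize(record).columns.tolist())           # ['user.name', 'user.location.city']
print(pd.json_normalize(record, sep='_').columns.tolist())  # ['user_name', 'user_location_city']
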
@@ -66,7 +66,11 @@ def _derived_features(df, timestamp_column, city_column, state_column, country_c
     pdf['appincrement'] = pdf.groupby('day')['app_cat'].expanding(1).max().droplevel(0)
     pdf.drop('app_cat', inplace=True, axis=1)
     pdf["logcount"]=pdf.groupby('day').cumcount()
-
+    if normalize_strings:
+        for feature_col in normalize_strings:
+            if feature_col in pdf.columns:
+                pdf[feature_col] = pdf[feature_col].str.lower()
+                pdf[feature_col] = pdf[feature_col].str.replace(" ", "_")
     return pdf
 
 
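Editor's note, not part of the commit: a minimal sketch of what the new normalize_strings loop does to a listed column; the column name and values are hypothetical.

import pandas as pd

pdf = pd.DataFrame({'appDisplayName': ['Azure Portal', 'Office 365 Exchange Online']})
for feature_col in ['appDisplayName']:          # a hypothetical normalize_strings list
    if feature_col in pdf.columns:
        pdf[feature_col] = pdf[feature_col].str.lower()
        pdf[feature_col] = pdf[feature_col].str.replace(" ", "_")
print(pdf['appDisplayName'].tolist())           # ['azure_portal', 'office_365_exchange_online']
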
@@ -82,33 +86,34 @@ def _parse_time(df, timestamp_column):
     return pdf
 
 
-def _s3_load(access, secret, token, bucket, key, filetype, explode_raw, delimiter):
+def _s3_load(access, secret, token, bucket, key, filetype, explode_raw, delimiter, sep):
     session = boto3.Session(aws_access_key_id=access, aws_secret_access_key=secret, aws_session_token=token)
     client = session.client('s3')
     data = client.get_object(Bucket=bucket, Key=key)
     contents = data['Body']
     if filetype.startswith('json'):
         log = json.load(contents)
         if explode_raw:
-            pdf = pd.json_normalize(log['_raw'])
+            pdf = pd.json_normalize(log['_raw'], sep=sep)
         else:
-            pdf = pd.json_normalize(log)
+            pdf = pd.json_normalize(log, sep=sep)
     else:
         pdf = pd.read_csv(contents, delimiter=delimiter).fillna
     return pdf
 
 
-def _load_json(file):
+def _load_json(file, sep):
     with open(file) as json_in:
         log = json.load(json_in)
-        pdf = pd.json_normalize(log)
+        pdf = pd.json_normalize(log, sep=sep)
     return pdf
 
 
 def proc_logs(files,
               save_dir,
               log_source = 'duo',
               filetype = 'csv',
+              sep = '.',
               s3 = False,
               aws_key = None,
               aws_secret = None,
@@ -121,6 +126,7 @@ def proc_logs(files,
               state_column = None,
               country_column = None,
               application_column = None,
+              normalize_strings = None,
               output_grouping = None,
               extension=None,
               min_records = 0):
@@ -137,23 +143,31 @@ def proc_logs(files,
         The source of the logs. Used primarily for tracing training data provenance.
     filetype: str, default='csv'
         'csv', 'json', or 'jsonline'
-    storage_options: dict
-        any arguments to pass to dask if trying to access data from remote locations such as AWS
+    sep: str, default='.'
+        The character to delimit nested json keys.
+    s3: bool
+        Flag to indicate data should be loaded from s3
+    aws_key: str
+        AWS Access Key
+    aws_secret: str
+        AWS Secret Key
+    aws_token: str
+        AWS Token
+    explode_raw: bool
+        This indicates that the data is in a nested jsonlines object with the _raw key
     delimiter: str, default=','
         The csv delimiter
-    groupby: str, default='userPrincipalName'
+    groupby: str
         The column name to aggregate over for derived feature creation.
     timestamp_column: str, default='createdDateTime
         The column name containing the timestamp
-    city_column: str, default='location.city'
+    city_column: str
         The column name containing the city location data
-    state_column: str, default='location.state'
+    state_column: str
         The column name containing the state location data
-    country_column: str, default='location.countryOrRegion
+    country_column: str
         The column name containing the country location data
-    application_column: str, default='appDisplayName'
+    application_column: str
         The column name containing the app name data
     output_grouping: str, optional
         The column to aggregate the output training data. If None, this defaults to the aggregation level specified in the groupby parameter.
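
Editor's note, not part of the commit: a hedged usage sketch of the updated signature. The bucket path, column names, and save directory are hypothetical, and the import path is assumed from this repository's layout; a bare string also works for normalize_strings, since the function wraps it in a list.

from dfp.preprocess import proc_logs

proc_logs(
    'my-log-bucket/azuread/2022',              # hypothetical 'bucket/prefix' path; the prefix is now joined correctly
    './training_data',
    log_source='azure_ad',
    filetype='json',
    sep='_',                                   # flatten nested json keys with '_' instead of the default '.'
    s3=True,
    aws_key='<ACCESS_KEY>',
    aws_secret='<SECRET_KEY>',
    groupby='userPrincipalName',
    timestamp_column='createdDateTime',
    city_column='location_city',               # nested keys arrive flattened with sep='_'
    state_column='location_state',
    country_column='location_countryOrRegion',
    application_column='appDisplayName',
    normalize_strings=['appDisplayName'],      # new: lower-case these columns and replace spaces with '_'
    min_records=100,
)
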
@@ -173,14 +187,18 @@
 
     if output_grouping is None:
         output_grouping = groupby
+    if isinstance(normalize_strings, str):
+        normalize_strings = [normalize_strings]
+    if not isinstance(normalize_strings, list):
+        normalize_strings = None
 
     _if_dir_not_exists(save_dir)
 
     if s3:
         if '/' in files:
             split_bucket = files.split('/')
             bucket = split_bucket[0]
-            prefix = split_bucket[1:]
+            prefix = '/'.join(split_bucket[1:])
         else:
             bucket = files
             prefix = None
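
Editor's note, not part of the commit: the prefix fix is easy to see in isolation. The old code kept a list slice as the S3 prefix, while the key-listing call further down (presumably boto3's list_objects/list_objects_v2) expects Prefix to be a string; joining the remaining path segments restores a usable 'prefix/with/slashes'. The path below is hypothetical.

files = 'my-log-bucket/duo/logs/2022'
split_bucket = files.split('/')
bucket = split_bucket[0]                  # 'my-log-bucket'
old_prefix = split_bucket[1:]             # ['duo', 'logs', '2022'] -- a list, not a usable Prefix
new_prefix = '/'.join(split_bucket[1:])   # 'duo/logs/2022'
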
@@ -200,7 +218,7 @@
         if extension is not None:
             keys = [key for key in keys if key.endswith(extension)]
         assert len(keys) > 0, 'Please pass a directory, a file-path, or a list of file-paths containing the files to be processed'
-        dfs = [dask.delayed(_s3_load)(aws_key, aws_secret, aws_token, bucket, k, filetype, explode_raw, delimiter) for k in keys]
+        dfs = [dask.delayed(_s3_load)(aws_key, aws_secret, aws_token, bucket, k, filetype, explode_raw, delimiter, sep) for k in keys]
         ddfs = [dd.from_delayed(df) for df in dfs]
         logs = dd.concat(ddfs).fillna('nan')
     else:
@@ -218,15 +236,15 @@
         if filetype == 'jsonline':
             if explode_raw:
                 nested_logs = dd.read_json(files, lines=True)
-                meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0])).iloc[:0,:].copy()
-                logs = nested_logs.map_partitions(lambda df: _explode_raw(df), meta=meta).fillna('nan')
+                meta = pd.json_normalize(json.loads(nested_logs.head(1)['_raw'].to_list()[0]), sep=sep).iloc[:0,:].copy()
+                logs = nested_logs.map_partitions(lambda df: _explode_raw(df, sep), meta=meta).fillna('nan')
             else:
-                dfs = [dask.delayed(_load_json)(x) for x in files]
+                dfs = [dask.delayed(_load_json)(x, sep) for x in files]
                 # logs = dd.from_delayed(dfs, verify_meta=False)
                 ddfs = [dd.from_delayed(df) for df in dfs]
                 logs = dd.concat(ddfs).fillna('nan')
         elif filetype == 'json':
-            dfs = [dask.delayed(_load_json)(x) for x in files]
+            dfs = [dask.delayed(_load_json)(x, sep) for x in files]
             # logs = dd.from_delayed(dfs, verify_meta=False)
             ddfs = [dd.from_delayed(df) for df in dfs]
             logs = dd.concat(ddfs).fillna('nan')
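
Editor's note, not part of the commit: the same sep has to appear in both the meta construction and _explode_raw because dask uses the empty meta frame as the expected schema for every partition, so the flattened column names must match exactly. The sample record is hypothetical.

import json
import pandas as pd

sample = '{"user": {"name": "alice"}, "result": "SUCCESS"}'
sep = '_'
meta = pd.json_normalize(json.loads(sample), sep=sep).iloc[:0, :]
print(meta.columns.tolist())   # ['user_name', 'result'] -- matches what _explode_raw(df, sep) produces
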
@@ -242,7 +260,7 @@
     logs_meta['appincrement'] = 'int'
     logs_meta['logcount'] = 'int'
 
-    derived_logs = logs.groupby(groupby).apply(lambda df: _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column), meta=logs_meta).reset_index(drop=True)
+    derived_logs = logs.groupby(groupby).apply(lambda df: _derived_features(df, timestamp_column, city_column, state_column, country_column, application_column, normalize_strings), meta=logs_meta).reset_index(drop=True)
 
     # derived_meta = derived_logs.head(1).iloc[:0,:].copy()
 
