from collections import OrderedDict
from collate.collate import Aggregate, Categorical, Compare
from collate.spacetime import SpacetimeAggregation
from architect.utils import convert_str_to_relativedelta
import sqlalchemy
import logging
class FeatureGenerator(object):
def __init__(
self,
db_engine,
features_schema_name,
replace=True,
feature_start_time=None
):
"""Generates aggregate features using collate
Args:
            db_engine (sqlalchemy.engine.Engine)
            features_schema_name (string) Name of schema where feature
                tables should be written to
            replace (boolean, optional) Whether or not existing features
                should be replaced
            feature_start_time (string/datetime, optional) point in time
                before which data should not be included in features
"""
self.db_engine = db_engine
self.features_schema_name = features_schema_name
self.categorical_cache = {}
self.replace = replace
self.feature_start_time = feature_start_time
self.entity_id_column = 'entity_id'
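
    # A construction sketch (the engine URL, schema name, and start time
    # below are hypothetical, not defaults of this class):
    #
    #     from sqlalchemy import create_engine
    #     generator = FeatureGenerator(
    #         db_engine=create_engine('postgresql://localhost/mydb'),
    #         features_schema_name='features',
    #         replace=False,
    #         feature_start_time='2010-01-01',
    #     )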
def _validate_keys(self, aggregation_config):
for key in ['from_obj', 'intervals', 'groups', 'knowledge_date_column', 'prefix']:
if key not in aggregation_config:
raise ValueError(
                    '{} required as key in aggregation config: {}'
.format(key, aggregation_config)
)
def _validate_aggregates(self, aggregation_config):
if 'aggregates' not in aggregation_config \
and 'categoricals' not in aggregation_config \
and 'array_categoricals' not in aggregation_config:
raise ValueError(
'Need either aggregates, categoricals, or array_categoricals' +
' in {}'.format(aggregation_config)
)
def _validate_categoricals(self, categoricals):
for categorical in categoricals:
if 'choice_query' in categorical:
logging.info('Validating choice query')
try:
with self.db_engine.begin() as conn:
conn.execute('explain {}'.format(categorical['choice_query']))
except Exception as exc:
raise ValueError(
'choice query does not run. \n'
'choice query: "{}"\n'
'Full error: {}'
.format(categorical['choice_query'], exc)
)
def _validate_from_obj(self, from_obj):
logging.info('Validating from_obj')
try:
with self.db_engine.begin() as conn:
conn.execute('explain select * from {}'.format(from_obj))
except Exception as exc:
raise ValueError(
'from_obj query does not run. \n'
'from_obj: "{}"\n'
'Full error: {}'
.format(from_obj, exc)
)
def _validate_time_intervals(self, intervals):
logging.info('Validating time intervals')
for interval in intervals:
if interval != 'all':
convert_str_to_relativedelta(interval)
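
    # Aside from the literal 'all', each interval must be a string that
    # convert_str_to_relativedelta can parse; for example (assumed formats):
    #
    #     self._validate_time_intervals(['1 year', '6 months', 'all'])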
def _validate_groups(self, groups):
if 'entity_id' not in groups:
raise ValueError('One of the aggregation groups is required to be entity_id')
def _validate_imputation_rule(self, aggregate_type, impute_rule):
"""Validate the imputation rule for a given aggregation type."""
# dictionary of imputation type : required parameters
valid_imputations = {
'all': {
'mean': [],
'constant': ['value'],
'zero': [],
'zero_noflag': [],
'error': []
},
'aggregates': {
'binary_mode': [],
},
'categoricals': {
'null_category': []
}
}
valid_imputations['array_categoricals'] = valid_imputations['categoricals']
# the valid imputation rules for the specific aggregation type being checked
valid_types = dict(valid_imputations['all'], **valid_imputations[aggregate_type])
# no imputation rule was specified
if 'type' not in impute_rule.keys():
raise ValueError('Imputation type must be specified')
# a rule was specified, but not valid for this type of aggregate
if impute_rule['type'] not in valid_types.keys():
raise ValueError('Invalid imputation type %s for %s'
% (impute_rule['type'], aggregate_type))
# check that all required parameters exist in the keys of the imputation rule
required_params = valid_types[impute_rule['type']]
for param in required_params:
if param not in impute_rule.keys():
raise ValueError('Missing param %s for %s' % (param, impute_rule['type']))
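
    # For example, given the table above, these rules validate for the
    # 'aggregates' type (illustrative dicts):
    #
    #     {'type': 'mean'}
    #     {'type': 'constant', 'value': 0}   # 'constant' requires 'value'
    #     {'type': 'binary_mode'}            # aggregates-only rule
    #
    # whereas {'type': 'null_category'} raises a ValueError here, since it
    # is only valid for categoricals and array_categoricals.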
def _validate_imputations(self, aggregation_config):
"""Validate the imputation rules in an aggregation config, looping
through all three types of aggregates. Most of the work here is
done by _validate_imputation_rule() to check the requirements of
each imputation rule found
"""
agg_types = ['aggregates', 'categoricals', 'array_categoricals']
for agg_type in agg_types:
            # base_imp holds the top-level rules, such as 'aggregates_imputation'
base_imp = aggregation_config.get(agg_type+'_imputation', {})
# loop through the individual aggregates
for agg in aggregation_config.get(agg_type, []):
# combine any aggregate-level imputation rules with top-level ones
imp_dict = dict(base_imp, **agg.get('imputation', {}))
# imputation rules are metric-specific, so check each metric's rule
for metric in agg['metrics']:
# metric rules may be defined by the metric name (e.g., 'max')
# or with the 'all' catch-all, with named metrics taking
# precedence. If we fall back to {}, the rule validator will
# error out on no metric found.
impute_rule = imp_dict.get(metric, imp_dict.get('all', {}))
self._validate_imputation_rule(agg_type, impute_rule)
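
    # A sketch of the precedence being enforced (hypothetical config
    # fragment):
    #
    #     {
    #         'aggregates_imputation': {'all': {'type': 'mean'}},
    #         'aggregates': [{
    #             'quantity': 'quantity_one',
    #             'metrics': ['sum', 'max'],
    #             'imputation': {'max': {'type': 'constant', 'value': 0}},
    #         }],
    #     }
    #
    # Here 'max' resolves to the aggregate-level constant rule, while 'sum'
    # falls back to the top-level 'all' rule.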
def _validate_aggregation(self, aggregation_config):
logging.info('Validating aggregation config %s', aggregation_config)
self._validate_keys(aggregation_config)
self._validate_aggregates(aggregation_config)
self._validate_categoricals(aggregation_config.get('categoricals', []))
self._validate_from_obj(aggregation_config['from_obj'])
self._validate_time_intervals(aggregation_config['intervals'])
self._validate_groups(aggregation_config['groups'])
self._validate_imputations(aggregation_config)
def validate(self, feature_aggregation_config):
"""Validate a feature aggregation config applied to this object
        The validations range from basic type and key-presence checks
        to validating the SQL in from_obj queries.
Args:
feature_aggregation_config (list) all values, except for feature
date, necessary to instantiate a collate.SpacetimeAggregation
Raises: ValueError if any part of the config is found to be invalid
"""
for aggregation in feature_aggregation_config:
self._validate_aggregation(aggregation)
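
    # A config that should pass validation (hypothetical table and column
    # names):
    #
    #     feature_generator.validate([{
    #         'prefix': 'incidents',
    #         'from_obj': 'incident_events',
    #         'knowledge_date_column': 'event_date',
    #         'intervals': ['1 year', 'all'],
    #         'groups': ['entity_id'],
    #         'categoricals_imputation': {'all': {'type': 'null_category'}},
    #         'categoricals': [{
    #             'column': 'incident_type',
    #             'choices': ['fire', 'flood'],
    #             'metrics': ['sum'],
    #         }],
    #     }])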
def _compute_choices(self, choice_query):
if choice_query not in self.categorical_cache:
with self.db_engine.begin() as conn:
self.categorical_cache[choice_query] = [
row[0] for row in conn.execute(choice_query)
]
logging.info(
'Computed list of categoricals: %s for choice query: %s',
self.categorical_cache[choice_query],
choice_query
)
return self.categorical_cache[choice_query]
def _build_choices(self, categorical):
logging.info(
'Building categorical choices for column %s, metrics %s',
categorical['column'],
categorical['metrics']
)
if 'choices' in categorical:
logging.info('Found list of configured choices: %s', categorical['choices'])
return categorical['choices']
else:
return self._compute_choices(categorical['choice_query'])
def _build_categoricals(self, categorical_config, impute_rules):
# TODO: only include null flag where necessary
return [
Categorical(
col=categorical['column'],
choices=self._build_choices(categorical),
function=categorical['metrics'],
impute_rules=dict(
impute_rules,
coltype='categorical',
**categorical.get('imputation', {})
),
include_null=True
)
for categorical in categorical_config
]
def _build_array_categoricals(self, categorical_config, impute_rules):
# TODO: only include null flag where necessary
return [
Compare(
col=categorical['column'],
op='@>',
choices={
choice: "array['{}'::varchar]".format(choice)
for choice in
self._build_choices(categorical)
},
function=categorical['metrics'],
impute_rules=dict(
impute_rules,
coltype='array_categorical',
**categorical.get('imputation', {})
),
op_in_name=False,
quote_choices=False,
include_null=True
)
for categorical in categorical_config
]
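
    # With choice 'a' on an array column 'tags', the Compare above should
    # render a containment predicate along the lines of (sketch, not
    # verified collate output):
    #
    #     tags @> array['a'::varchar]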
def _aggregation(self, aggregation_config, feature_dates, state_table):
logging.info(
'Building collate.SpacetimeAggregation for config %s and as_of_dates %s',
aggregation_config,
feature_dates
)
# read top-level imputation rules from the aggregation config; we'll allow
# these to be overridden by imputation rules at the individual feature
# level as those get parsed as well
agimp = aggregation_config.get('aggregates_imputation', {})
catimp = aggregation_config.get('categoricals_imputation', {})
arrcatimp = aggregation_config.get('array_categoricals_imputation', {})
aggregates = [
Aggregate(
aggregate['quantity'],
aggregate['metrics'],
dict(agimp, coltype='aggregate', **aggregate.get('imputation', {}))
)
for aggregate in aggregation_config.get('aggregates', [])
]
logging.info('Found %s quantity aggregates', len(aggregates))
categoricals = self._build_categoricals(
aggregation_config.get('categoricals', []),
catimp
)
logging.info('Found %s categorical aggregates', len(categoricals))
array_categoricals = self._build_array_categoricals(
aggregation_config.get('array_categoricals', []),
arrcatimp
)
logging.info('Found %s array categorical aggregates', len(array_categoricals))
return SpacetimeAggregation(
aggregates + categoricals + array_categoricals,
from_obj=aggregation_config['from_obj'],
intervals=aggregation_config['intervals'],
groups=aggregation_config['groups'],
dates=feature_dates,
state_table=state_table,
state_group=self.entity_id_column,
date_column=aggregation_config['knowledge_date_column'],
output_date_column='as_of_date',
input_min_date=self.feature_start_time,
schema=self.features_schema_name,
prefix=aggregation_config['prefix']
)
def aggregations(self, feature_aggregation_config, feature_dates, state_table):
"""Creates collate.SpacetimeAggregations from the given arguments
Args:
feature_aggregation_config (list) all values, except for feature
date, necessary to instantiate a collate.SpacetimeAggregation
feature_dates (list) dates to generate features as of
state_table (string) schema.table_name for state table with all entity/date pairs
Returns: (list) collate.SpacetimeAggregations
"""
return [
self._aggregation(aggregation_config, feature_dates, state_table)
for aggregation_config in feature_aggregation_config
]
def generate_all_table_tasks(self, aggregations, task_type):
"""Generates SQL commands for creating, populating, and indexing
feature group tables
Args:
aggregations (list) collate.SpacetimeAggregation objects
            task_type (str) either 'aggregation' or 'imputation'
Returns: (dict) keys are group table names, values are themselves dicts,
each with keys for different stages of table creation (prepare, inserts, finalize)
and with values being lists of SQL commands
"""
logging.debug('---------------------')
# pick the method to use for generating tasks depending on whether we're
# building the aggregations or imputations
if task_type == 'aggregation':
task_generator = self._generate_agg_table_tasks_for
logging.debug('---------FEATURE GENERATION------------')
elif task_type == 'imputation':
task_generator = self._generate_imp_table_tasks_for
logging.debug('---------FEATURE IMPUTATION------------')
else:
raise ValueError('Table task type must be aggregation or imputation')
logging.debug('---------------------')
table_tasks = OrderedDict()
for aggregation in aggregations:
table_tasks.update(task_generator(aggregation))
logging.info('Created %s tables', len(table_tasks.keys()))
return table_tasks
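
    # The returned mapping has one entry per table; each value follows the
    # prepare/inserts/finalize structure, e.g. (shape only, names
    # illustrative):
    #
    #     {
    #         'prefix_entity_id': {
    #             'prepare': ['DROP TABLE ...', 'CREATE TABLE ...'],
    #             'inserts': ['INSERT INTO ...', ...],
    #             'finalize': ['CREATE INDEX ...'],
    #         },
    #     }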
def create_all_tables(self, feature_aggregation_config, feature_dates, state_table):
"""Create all feature tables.
        First builds the aggregation tables, then performs imputation on
        any null values (a two-step process is required to determine which
        columns contain nulls after the initial aggregation tables are
        built).
Args:
feature_aggregation_config (list) all values, except for
feature date, necessary to instantiate a
`collate.SpacetimeAggregation`
feature_dates (list) dates to generate features as of
state_table (string) schema.table_name for state table with
all entity/date pairs
Returns: (list) table names
"""
aggs = self.aggregations(
feature_aggregation_config,
feature_dates,
state_table
)
# first, generate and run table tasks for aggregations
table_tasks_aggregate = self.generate_all_table_tasks(
aggs,
task_type='aggregation'
)
self.process_table_tasks(table_tasks_aggregate)
# second, perform the imputations (this will query the tables
# constructed above to identify features containing nulls)
table_tasks_impute = self.generate_all_table_tasks(
aggs,
task_type='imputation'
)
impute_keys = self.process_table_tasks(table_tasks_impute)
# double-check that the imputation worked and no nulls remain
# in the data:
nullcols = []
with self.db_engine.begin() as conn:
for agg in aggs:
results = conn.execute(agg.find_nulls(imputed=True))
null_counts = results.first().items()
nullcols += [col for (col, val) in null_counts if val > 0]
if len(nullcols) > 0:
raise ValueError(
"Imputation failed for {} columns. Null values remain in: {}"
.format(len(nullcols), nullcols)
)
return impute_keys
def process_table_tasks(self, table_tasks):
for table_name, task in table_tasks.items():
logging.info('Running feature table queries for %s', table_name)
self.run_commands(task.get('prepare', []))
self.run_commands(task.get('inserts', []))
self.run_commands(task.get('finalize', []))
return table_tasks.keys()
def _explain_selects(self, aggregations):
with self.db_engine.begin() as conn:
for aggregation in aggregations:
for selectlist in aggregation.get_selects().values():
for select in selectlist:
query = 'explain ' + str(select)
results = list(conn.execute(query))
logging.debug(str(select))
logging.debug(results)
def _clean_table_name(self, table_name):
# remove the schema and quotes from the name
return table_name.split('.')[1].replace('"', "")
def _table_exists(self, table_name):
try:
with self.db_engine.begin() as conn:
conn.execute('select 1 from {}.{} limit 1'
.format(self.features_schema_name,
table_name)
).first()
except sqlalchemy.exc.ProgrammingError:
return False
else:
return True
def run_commands(self, command_list):
with self.db_engine.begin() as conn:
for command in command_list:
logging.debug('Executing feature generation query: %s', command)
conn.execute(command)
def _aggregation_index_query(self, aggregation, imputed=False):
return 'CREATE INDEX ON {} ({}, {})'.format(
aggregation.get_table_name(imputed=imputed),
self.entity_id_column,
aggregation.output_date_column
)
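
    # Given the entity_id_column and output_date_column defaults, the
    # emitted statement looks like (illustrative table name):
    #
    #     CREATE INDEX ON features.prefix_aggregation_imputed (entity_id, as_of_date)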
def _aggregation_index_columns(self, aggregation):
        return sorted(
            list(aggregation.groups.keys()) + [aggregation.output_date_column]
        )
def index_column_lookup(self, aggregations, imputed=True):
return dict((
self._clean_table_name(aggregation.get_table_name(imputed=imputed)),
self._aggregation_index_columns(aggregation)
) for aggregation in aggregations)
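
    # e.g., for a single aggregation grouped by entity_id, this returns
    # (illustrative table name; columns are sorted):
    #
    #     {'prefix_aggregation_imputed': ['as_of_date', 'entity_id']}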
def _generate_agg_table_tasks_for(self, aggregation):
"""Generates SQL commands for preparing, populating, and finalizing
each feature group table in the given aggregation
Args:
aggregation (collate.SpacetimeAggregation)
Returns: (dict) of structure {
'prepare': list of commands to prepare table for population
'inserts': list of commands to populate table
'finalize': list of commands to finalize table after population
}
"""
create_schema = aggregation.get_create_schema()
creates = aggregation.get_creates()
drops = aggregation.get_drops()
indexes = aggregation.get_indexes()
inserts = aggregation.get_inserts()
if create_schema is not None:
with self.db_engine.begin() as conn:
conn.execute(create_schema)
table_tasks = OrderedDict()
for group in aggregation.groups:
group_table = self._clean_table_name(
aggregation.get_table_name(group=group)
)
imputed_table = self._clean_table_name(
aggregation.get_table_name(imputed=True)
)
if self.replace or (
not self._table_exists(group_table)
and
not self._table_exists(imputed_table)
):
table_tasks[group_table] = {
'prepare': [drops[group], creates[group]],
'inserts': inserts[group],
'finalize': [indexes[group]],
}
logging.info('Created table tasks for %s', group_table)
else:
logging.info(
'Skipping feature table creation for %s',
group_table
)
table_tasks[group_table] = {}
logging.info('Created table tasks for aggregation')
if self.replace or (
not self._table_exists(
self._clean_table_name(aggregation.get_table_name())
)
and
not self._table_exists(
self._clean_table_name(aggregation.get_table_name(imputed=True))
)
):
table_tasks[self._clean_table_name(aggregation.get_table_name())] = {
'prepare': [aggregation.get_drop(), aggregation.get_create()],
'inserts': [],
'finalize': [self._aggregation_index_query(aggregation)],
}
else:
table_tasks[self._clean_table_name(aggregation.get_table_name())] = {}
return table_tasks
def _generate_imp_table_tasks_for(self, aggregation, drop_preagg=True):
"""Generate SQL statements for preparing, populating, and
finalizing imputations, for each feature group table in the
given aggregation.
        Requires the existence of the underlying feature and aggregation
tables defined in `_generate_agg_table_tasks_for()`.
Args:
aggregation (collate.SpacetimeAggregation)
drop_preagg: boolean to specify dropping pre-imputation
tables
Returns: (dict) of structure {
'prepare': list of commands to prepare table for population
'inserts': list of commands to populate table
'finalize': list of commands to finalize table after population
}
"""
table_tasks = OrderedDict()
imp_tbl_name = self._clean_table_name(
aggregation.get_table_name(imputed=True)
)
if not self.replace and self._table_exists(imp_tbl_name):
logging.info('Skipping imputation table creation for %s',
imp_tbl_name)
table_tasks[imp_tbl_name] = {}
return table_tasks
        # execute query to find columns with null values and create lists of columns
# that do and do not need imputation when creating the imputation table
with self.db_engine.begin() as conn:
results = conn.execute(aggregation.find_nulls())
null_counts = results.first().items()
impute_cols = [col for (col, val) in null_counts if val > 0]
nonimpute_cols = [col for (col, val) in null_counts if val == 0]
# table tasks for imputed aggregation table, most of the work is done here
# by collate's get_impute_create()
table_tasks[imp_tbl_name] = {
'prepare': [
aggregation.get_drop(imputed=True),
aggregation.get_impute_create(
impute_cols=impute_cols,
nonimpute_cols=nonimpute_cols
)
],
'inserts': [],
'finalize': [self._aggregation_index_query(aggregation, imputed=True)]
}
logging.info('Created table tasks for imputation: %s', imp_tbl_name)
# do some cleanup:
# drop the group-level and aggregation tables, just leaving the
# imputation table if drop_preagg=True
if drop_preagg:
drops = aggregation.get_drops()
table_tasks[imp_tbl_name]['finalize'] += (list(drops.values()) +
[aggregation.get_drop()])
logging.info('Added drop table cleanup tasks: %s', imp_tbl_name)
return table_tasks
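

# A minimal end-to-end sketch of how this class is meant to be driven.
# The database URL, schema, table, and column names below are hypothetical
# placeholders, not values shipped with this module.
if __name__ == '__main__':
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://localhost/mydb')  # assumed URL
    generator = FeatureGenerator(
        db_engine=engine,
        features_schema_name='features',
    )
    config = [{
        'prefix': 'incidents',
        'from_obj': 'incident_events',          # assumed source table
        'knowledge_date_column': 'event_date',  # assumed date column
        'intervals': ['1 year', 'all'],         # assumed interval format
        'groups': ['entity_id'],
        'aggregates_imputation': {'all': {'type': 'mean'}},
        'aggregates': [{'quantity': 'severity', 'metrics': ['max', 'avg']}],
    }]
    # raises ValueError if any part of the config is invalid
    generator.validate(config)
    # builds aggregation tables, then imputed tables, erroring if any
    # nulls remain after imputation
    generator.create_all_tables(
        feature_aggregation_config=config,
        feature_dates=['2016-01-01'],
        state_table='staging.states',           # assumed entity/date table
    )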