From 21fadcf7ee2b942f70d2226c05383550a2563995 Mon Sep 17 00:00:00 2001
From: Erika Salomon
Date: Mon, 15 May 2017 16:43:02 +0000
Subject: [PATCH 1/2] Select Entities from Entities Table: If merged, this commit will:
- change the logic for creating entity-date sets so that for train sets, it filters the labels against an entities table, and for test sets, it creates a cross join of all entities in the entities table and the dates for the matrix [Resolves #69]
- create entity date tables as temporary tables [Resolves #60], [Resolves #51]
- reformat entity date table names so that the uuid is last [Resolves #50]
---
 tests/test_architect.py | 123 +++++++++++++++++++++--------
 tests/utils.py          |  16 +++-
 timechop/builders.py    | 166 +++++++++++++++++++---------------------
 3 files changed, 183 insertions(+), 122 deletions(-)

diff --git a/tests/test_architect.py b/tests/test_architect.py
index 26fd760..d328b96 100644
--- a/tests/test_architect.py
+++ b/tests/test_architect.py
@@ -1,19 +1,21 @@
 from timechop.architect import Architect
 from timechop import builders
-from tests.utils import create_features_and_labels_schemas
+from tests.utils import create_schemas
 from tests.utils import create_entity_date_df
 from tests.utils import convert_string_column_to_date
 from tests.utils import NamedTempFile
 from tests.utils import TemporaryDirectory
+
 import testing.postgresql
 import csv
 import datetime
 import pandas as pd
 import os
-from sqlalchemy import create_engine
 from unittest import TestCase
+from sqlalchemy import create_engine
 from metta import metta_io as metta
 from mock import Mock
+import pytest
 
 
 # make some fake features data
@@ -132,6 +134,7 @@
     'features_schema_name': 'features',
     'labels_schema_name': 'labels',
     'labels_table_name': 'labels',
+    'entities_table_name': 'staging.entities'
 }
 
 def test_build_labels_query():
@@ -141,11 +144,12 @@ def test_build_labels_query():
     # set up labeling config variables
     dates = [datetime.datetime(2016, 1, 1, 0, 0),
              datetime.datetime(2016, 2, 1, 0, 0)]
+    entities = [0, 1, 3]
 
     with testing.postgresql.Postgresql() as postgresql:
         # create an engine and generate a table with fake feature data
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(engine, features_tables, labels, entities)
 
     # make a dataframe of labels to test against
     labels_df = pd.DataFrame(
@@ -160,11 +164,17 @@ def test_build_labels_query():
         ]
     )
     labels_df['as_of_date'] = convert_string_column_to_date(labels_df['as_of_date'])
-
+    labels_df = labels_df[labels_df['entity_id'].isin(entities)]
+
     # create an engine and generate a table with fake feature data
     with testing.postgresql.Postgresql() as postgresql:
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            entities=entities
+        )
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
                 beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
@@ -208,10 +218,16 @@ def test_write_to_csv():
     """ Test the write_to_csv function by checking whether the csv contains
     the correct number of lines.
""" + entities = [0, 2, 3] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -239,7 +255,6 @@ def test_write_to_csv(): reader = csv.reader(f) assert(len([row for row in reader]) == len(table) + 1) - def test_make_entity_date_table(): """ Test that the make_entity_date_table function contains the correct values. @@ -248,11 +263,14 @@ def test_make_entity_date_table(): datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)] + entities = [0, 1, 2] + # make a dataframe of entity ids and dates to test against ids_dates = create_entity_date_df( dates, labels, dates, + entities, 'booking', 'binary', '1 month' @@ -261,7 +279,12 @@ def test_make_entity_date_table(): with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -281,7 +304,6 @@ def test_make_entity_date_table(): as_of_times=dates, label_type='binary', label_name='booking', - feature_table_names=['features0', 'features1'], matrix_uuid='my_uuid', matrix_type='train', label_window='1 month' @@ -289,7 +311,7 @@ def test_make_entity_date_table(): # read in the table result = pd.read_sql( - "select * from features.{} order by entity_id, as_of_date".format(entity_date_table_name), + "select * from {} order by entity_id, as_of_date".format(entity_date_table_name), engine ) labels_df = pd.read_sql('select * from labels.labels', engine) @@ -305,17 +327,28 @@ def test_make_entity_date_table(): print(test) assert(test.all().all()) + # test that the table disappears after session closes + engine.dispose() + engine2 = create_engine(postgresql.url()) + try: + engine2.execute('select * from {}'.format(entity_date_table_name)) + except: + programmingerror = True + assert(programmingerror) + def test_build_outer_join_query(): """ """ dates = [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0)] + entities = [1, 2, 3] # make dataframe for entity ids and dates ids_dates = create_entity_date_df( dates, labels, dates, + entities, 'booking', 'binary', '1 month' @@ -342,8 +375,12 @@ def test_build_outer_join_query(): # create an engine and generate a table with fake feature data with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) - + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), @@ -360,7 +397,6 @@ def test_build_outer_join_query(): as_of_times=dates, label_type='binary', label_name='booking', - feature_table_names=['features0', 'features1'], matrix_type='train', matrix_uuid='my_uuid', label_window='1 month' @@ -373,7 +409,7 @@ def test_build_outer_join_query(): query = architect.builder.build_outer_join_query( as_of_times = dates, right_table_name = 
'features.{}'.format(table_name), - entity_date_table_name = 'features.{}'.format(entity_date_table_name), + entity_date_table_name = entity_date_table_name, right_column_selections = architect.builder._format_imputations( features[table_number] ) @@ -596,14 +632,21 @@ def test_generate_plans(): class TestBuildMatrix(object): def test_train_matrix(self): + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0) + ] + entities = [0, 1, 2] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) - - dates = [datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0)] + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -644,17 +687,25 @@ def test_train_matrix(self): ) with open(matrix_filename, 'r') as f: reader = csv.reader(f) - assert(len([row for row in reader]) == 12) + assert(len([row for row in reader]) == 9) def test_test_matrix(self): + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0) + ] + entities = [0, 1, 3] + with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) - - dates = [datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0)] + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -702,17 +753,25 @@ def test_test_matrix(self): with open(matrix_filename, 'r') as f: reader = csv.reader(f) - assert(len([row for row in reader]) == 13) + assert(len([row for row in reader]) == 10) def test_replace(self): + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0) + ] + entities = [0, 2, 3] + with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) - - dates = [datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0)] + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -761,7 +820,7 @@ def test_replace(self): with open(matrix_filename, 'r') as f: reader = csv.reader(f) - assert(len([row for row in reader]) == 13) + assert(len([row for row in reader]) == 10) # rerun architect.builder.make_entity_date_table = Mock() diff --git a/tests/utils.py b/tests/utils.py index e912228..10eb938 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,7 +11,7 @@ def convert_string_column_to_date(column): [datetime.datetime.strptime(date, '%Y-%m-%d').date() for date in column] ) -def create_features_and_labels_schemas(engine, features_tables, labels): +def create_schemas(engine, features_tables, labels, entities): """ This function makes a features schema and 
populates it with the fake data from above. @@ -41,6 +41,18 @@ def create_features_and_labels_schemas(engine, features_tables, labels): 'insert into labels.labels values (%s, %s, %s, %s, %s, %s)', row ) + # create entities table + engine.execute('drop table if exists staging cascade; create schema staging;') + engine.execute( + """ + create table staging.entities ( + entity_id int + ) + """ + ) + for entity in entities: + engine.execute('insert into staging.entities values (%s)', entity) + def create_features_table(table_number, table, engine): engine.execute( @@ -62,6 +74,7 @@ def create_entity_date_df( dates, labels, as_of_dates, + entities, label_name, label_type, label_window @@ -89,6 +102,7 @@ def create_entity_date_df( '%Y-%m-%d' ).date() for date in ids_dates['as_of_date']] ids_dates = ids_dates[ids_dates['as_of_date'].isin(dates)] + ids_dates = ids_dates[ids_dates['entity_id'].isin(entities)] print(ids_dates) print(dates) diff --git a/timechop/builders.py b/timechop/builders.py index b7fa196..e9a6e68 100644 --- a/timechop/builders.py +++ b/timechop/builders.py @@ -64,6 +64,7 @@ def build_labels_query( SELECT entity_id, as_of_date{labels} FROM {labels_schema_name}.{labels_table_name} + INNER JOIN {entities_table} USING(entity_id) WHERE as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND label_name = '{l_name}' AND label_type = '{l_type}' AND @@ -71,13 +72,14 @@ def build_labels_query( ORDER BY entity_id, as_of_date """.format( - labels = final_column, - labels_schema_name = self.db_config['labels_schema_name'], - labels_table_name = self.db_config['labels_table_name'], - times = as_of_time_strings, - l_name = label_name, - l_type = label_type, - window = label_window + labels=final_column, + labels_schema_name=self.db_config['labels_schema_name'], + labels_table_name=self.db_config['labels_table_name'], + entities_table=self.db_config['entities_table_name'], + times=as_of_time_strings, + l_name=label_name, + l_type=label_type, + window=label_window ) return(query) @@ -138,7 +140,6 @@ def make_entity_date_table( as_of_times, label_name, label_type, - feature_table_names, matrix_type, matrix_uuid, label_window @@ -164,19 +165,17 @@ def make_entity_date_table( ) elif matrix_type == 'test': indices_query = self.get_all_valid_entity_date_combos( - as_of_times=as_of_times, - feature_table_names=feature_table_names + as_of_times=as_of_times ) else: raise ValueError('Unknown matrix type passed: {}'.format(matrix_type)) - table_name = '_'.join([matrix_uuid, 'tmp_entity_date']) + table_name = '_'.join(['tmp_entity_date', matrix_uuid]) query = """ - DROP TABLE IF EXISTS {features_schema_name}."{table_name}"; - CREATE TABLE {features_schema_name}."{table_name}" + DROP TABLE IF EXISTS "{table_name}"; + CREATE TEMPORARY TABLE "{table_name}" AS ({index_query}) """.format( - features_schema_name=self.db_config['features_schema_name'], table_name=table_name, index_query=indices_query ) @@ -184,26 +183,24 @@ def make_entity_date_table( return table_name - def get_all_valid_entity_date_combos(self, as_of_times, feature_table_names): - as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times] - query_list = [] - for index, table in enumerate(feature_table_names): - union = '' - if index != 0: - union = 'UNION' - subquery = """ {u} - SELECT DISTINCT entity_id, as_of_date - FROM {schema_name}.{table_name} - WHERE as_of_date IN (SELECT (UNNEST (ARRAY{dates}::timestamp[]))) - """.format( - u = union, - table_name = table, - dates = as_of_time_strings, - schema_name = 
self.db_config['features_schema_name'] - ) - query_list.append(subquery) - - return(''.join(query_list)) + def get_all_valid_entity_date_combos(self, as_of_times): + as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times] + query = """ + SELECT entity_id, as_of_date + FROM ( + SELECT DISTINCT entity_id + FROM {entities_table} + ) AS e + CROSS JOIN ( + SELECT as_of_date + FROM (SELECT (UNNEST (ARRAY{times}::timestamp[]))) t(as_of_date) + ) AS d + ORDER BY entity_id, as_of_date + """.format( + entities_table=self.db_config['entities_table_name'], + times=as_of_time_strings + ) + return(query) class CSVBuilder(BuilderBase): @@ -256,63 +253,54 @@ def build_matrix( as_of_times, label_name, label_type, - feature_dictionary.keys(), matrix_type, matrix_uuid, matrix_metadata['label_window'] ) + logging.info('Writing feature group data') + features_csv_names = self.write_features_data( + as_of_times, + feature_dictionary, + entity_date_table_name, + matrix_uuid + ) try: - logging.info('Writing feature group data') - features_csv_names = self.write_features_data( + logging.info('Writing label data') + labels_csv_name = self.write_labels_data( as_of_times, - feature_dictionary, + label_name, + label_type, + matrix_type, entity_date_table_name, + matrix_uuid, + matrix_metadata['label_window'] + ) + features_csv_names.insert(0, labels_csv_name) + + # stitch together the csvs + logging.info('Merging features data') + output = self.merge_feature_csvs( + features_csv_names, + matrix_directory, matrix_uuid ) - try: - logging.info('Writing label data') - labels_csv_name = self.write_labels_data( - as_of_times, - label_name, - label_type, - matrix_type, - entity_date_table_name, - matrix_uuid, - matrix_metadata['label_window'] - ) - features_csv_names.insert(0, labels_csv_name) - - # stitch together the csvs - logging.info('Merging features data') - output = self.merge_feature_csvs( - features_csv_names, - matrix_directory, - matrix_uuid - ) - finally: - # clean up files and database before finishing - for csv_name in features_csv_names: - self.remove_file(csv_name) - try: - # store the matrix - logging.info('Archiving matrix with metta') - metta.archive_matrix( - matrix_config=matrix_metadata, - df_matrix=output, - overwrite=True, - directory=self.matrix_directory, - format='csv' - ) - finally: - if isinstance(output, str): - os.remove(output) finally: - self.engine.execute( - 'drop table "{}"."{}";'.format( - self.db_config['features_schema_name'], - entity_date_table_name - ) + # clean up files and database before finishing + for csv_name in features_csv_names: + self.remove_file(csv_name) + try: + # store the matrix + logging.info('Archiving matrix with metta') + metta.archive_matrix( + matrix_config=matrix_metadata, + df_matrix=output, + overwrite=True, + directory=self.matrix_directory, + format='csv' ) + finally: + if isinstance(output, str): + os.remove(output) def write_labels_data( @@ -350,10 +338,7 @@ def write_labels_data( schema=self.db_config['labels_schema_name'], table=self.db_config['labels_table_name'] ), - entity_date_table_name='"{schema}"."{table}"'.format( - schema=self.db_config['features_schema_name'], - table=entity_date_table_name - ), + entity_date_table_name='"{}"'.format(entity_date_table_name), right_column_selections=', r.label as {}'.format(label_name), additional_conditions='''AND r.label_name = '{name}' AND @@ -372,7 +357,13 @@ def write_labels_data( self.write_to_csv(labels_query, csv_name) return(csv_name) - def write_features_data(self, as_of_times, 
feature_dictionary, entity_date_table_name, matrix_uuid):
+    def write_features_data(
+        self,
+        as_of_times,
+        feature_dictionary,
+        entity_date_table_name,
+        matrix_uuid
+    ):
         """ Loop over tables in features schema, writing the data from each
         to a csv. Return the full list of feature csv names and the list of all
         features.
@@ -394,10 +385,7 @@ def write_features_data(self, as_of_times, feature_dictionary, entity_date_table
                     schema = self.db_config['features_schema_name'],
                     table = feature_table_name
                 ),
-                entity_date_table_name = '{schema}."{table}"'.format(
-                    schema = self.db_config['features_schema_name'],
-                    table = entity_date_table_name
-                ),
+                entity_date_table_name = '"{}"'.format(entity_date_table_name),
                 right_column_selections = self._format_imputations(feature_names)
             )
             self.write_to_csv(features_query, csv_name)

From 38ca3de044c5a1cb0b4c3222b2dfae0f3113f7cb Mon Sep 17 00:00:00 2001
From: Erika Salomon
Date: Thu, 25 May 2017 01:13:33 +0000
Subject: [PATCH 2/2] First pass at a state table

---
 tests/test_architect.py | 318 ++++++++++++++++++++++++++--------------
 tests/utils.py          |  40 +++--
 timechop/architect.py   |  34 ++++-
 timechop/builders.py    |  55 +++----
 4 files changed, 293 insertions(+), 154 deletions(-)

diff --git a/tests/test_architect.py b/tests/test_architect.py
index d328b96..6704e8b 100644
--- a/tests/test_architect.py
+++ b/tests/test_architect.py
@@ -20,6 +20,39 @@
 
 # make some fake features data
+states = [
+    [0, '2016-01-01', False, True],
+    [0, '2016-02-01', False, True],
+    [0, '2016-03-01', False, True],
+    [0, '2016-04-01', False, True],
+    [0, '2016-05-01', False, True],
+    [1, '2016-01-01', True, False],
+    [1, '2016-02-01', True, False],
+    [1, '2016-03-01', True, False],
+    [1, '2016-04-01', True, False],
+    [1, '2016-05-01', True, False],
+    [2, '2016-01-01', True, False],
+    [2, '2016-02-01', True, True],
+    [2, '2016-03-01', True, False],
+    [2, '2016-04-01', True, True],
+    [2, '2016-05-01', True, False],
+    [3, '2016-01-01', False, True],
+    [3, '2016-02-01', True, True],
+    [3, '2016-03-01', False, True],
+    [3, '2016-04-01', True, True],
+    [3, '2016-05-01', False, True],
+    [4, '2016-01-01', True, True],
+    [4, '2016-02-01', True, True],
+    [4, '2016-03-01', True, True],
+    [4, '2016-04-01', True, True],
+    [4, '2016-05-01', True, True],
+    [5, '2016-01-01', False, False],
+    [5, '2016-02-01', False, False],
+    [5, '2016-03-01', False, False],
+    [5, '2016-04-01', False, False],
+    [5, '2016-05-01', False, False]
+]
+
 features0 = [
     [0, '2016-01-01', 2, 0],
     [1, '2016-01-01', 1, 2],
@@ -39,7 +72,10 @@
     [0, '2016-03-01', 3, 3],
     [1, '2016-03-01', 3, 4],
     [2, '2016-03-01', 3, 3],
-    [3, '2016-03-01', 3, 4]
+    [3, '2016-03-01', 3, 4],
+    [0, '2016-03-01', 3, 3],
+    [4, '2016-03-01', 1, 4],
+    [5, '2016-03-01', 2, 4]
 ]
 
 features_tables = [features0, features1]
@@ -86,6 +122,26 @@
     [3, '2016-03-01', '1 month', 'ems', 'binary', 0],
     [3, '2016-04-01', '1 month', 'ems', 'binary', 1],
     [3, '2016-05-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-01-01', '1 month', 'booking', 'binary', 1],
+    [4, '2016-02-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-03-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-04-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-05-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-01-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-02-01', '1 month', 'ems', 'binary', 1],
+    [4, '2016-03-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-04-01', '1 month', 'ems', 'binary', 1],
+    [4, '2016-05-01', '1 month', 'ems', 'binary', 1],
+    [5, '2016-01-01', '1 month', 'booking', 'binary', 
1],
+    [5, '2016-02-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-03-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-04-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-05-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-01-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-02-01', '1 month', 'ems', 'binary', 1],
+    [5, '2016-03-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-04-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-05-01', '1 month', 'ems', 'binary', 0],
     [0, '2016-02-01', '3 month', 'booking', 'binary', 0],
     [0, '2016-03-01', '3 month', 'booking', 'binary', 0],
     [0, '2016-04-01', '3 month', 'booking', 'binary', 0],
@@ -124,7 +180,27 @@
     [3, '2016-02-01', '3 month', 'ems', 'binary', 0],
     [3, '2016-03-01', '3 month', 'ems', 'binary', 0],
     [3, '2016-04-01', '3 month', 'ems', 'binary', 1],
-    [3, '2016-05-01', '3 month', 'ems', 'binary', 0]
+    [3, '2016-05-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-01-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-02-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-03-01', '3 month', 'booking', 'binary', 1],
+    [4, '2016-04-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-05-01', '3 month', 'booking', 'binary', 1],
+    [4, '2016-01-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-02-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-03-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-04-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-05-01', '3 month', 'ems', 'binary', 1],
+    [5, '2016-01-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-02-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-03-01', '3 month', 'booking', 'binary', 1],
+    [5, '2016-04-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-05-01', '3 month', 'booking', 'binary', 1],
+    [5, '2016-01-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-02-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-03-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-04-01', '3 month', 'ems', 'binary', 1],
+    [5, '2016-05-01', '3 month', 'ems', 'binary', 0]
 ]
 
 label_name = 'booking'
@@ -134,7 +210,7 @@
     'features_schema_name': 'features',
     'labels_schema_name': 'labels',
     'labels_table_name': 'labels',
-    'entities_table_name': 'staging.entities'
+    'sparse_state_table_name': 'staging.sparse_states'
 }
 
 def test_build_labels_query():
@@ -144,12 +220,6 @@ def test_build_labels_query():
     # set up labeling config variables
     dates = [datetime.datetime(2016, 1, 1, 0, 0),
              datetime.datetime(2016, 2, 1, 0, 0)]
-    entities = [0, 1, 3]
-
-    with testing.postgresql.Postgresql() as postgresql:
-        # create an engine and generate a table with fake feature data
-        engine = create_engine(postgresql.url())
-        create_schemas(engine, features_tables, labels, entities)
 
     # make a dataframe of labels to test against
     labels_df = pd.DataFrame(
@@ -163,8 +233,22 @@ def test_build_labels_query():
             'label'
         ]
     )
+    states_df = pd.DataFrame(
+        states,
+        columns = [
+            'entity_id',
+            'as_of_date',
+            'state_one',
+            'state_two'
+        ]
+    )
+    valid_states = states_df[states_df['state_one'] & states_df['state_two']]
+    labels_df = labels_df.merge(
+        valid_states[['entity_id', 'as_of_date']],
+        on=['entity_id', 'as_of_date']
+    )
     labels_df['as_of_date'] = convert_string_column_to_date(labels_df['as_of_date'])
-    labels_df = labels_df[labels_df['entity_id'].isin(entities)]
 
     # create an engine and generate a table with fake feature data
     with testing.postgresql.Postgresql() as postgresql:
@@ -173,17 +257,18 @@ def test_build_labels_query():
             engine=engine,
             features_tables=features_tables,
             labels=labels,
-
entities=entities + states=states ) with TemporaryDirectory() as temp_dir: architect = Architect( - beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), - label_names = ['booking'], - label_types = ['binary'], - db_config = db_config, - matrix_directory = temp_dir, - user_metadata = {}, - engine = engine + beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0), + label_names=['booking'], + label_types=['binary'], + states=['state_one AND state_two'], + db_config=db_config, + matrix_directory=temp_dir, + user_metadata={}, + engine=engine ) # get the queries and test them @@ -218,7 +303,6 @@ def test_write_to_csv(): """ Test the write_to_csv function by checking whether the csv contains the correct number of lines. """ - entities = [0, 2, 3] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) @@ -226,19 +310,19 @@ def test_write_to_csv(): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) - with TemporaryDirectory() as temp_dir: architect = Architect( - beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), - label_names = ['booking'], - label_types = ['binary'], - db_config = db_config, - matrix_directory = temp_dir, - user_metadata = {}, - engine = engine, - builder_class = builders.LowMemoryCSVBuilder + beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0), + label_names=['booking'], + label_types=['binary'], + states=['state_one AND state_two'], + db_config=db_config, + matrix_directory=temp_dir, + user_metadata={}, + engine=engine, + builder_class=builders.LowMemoryCSVBuilder ) # for each table, check that corresponding csv has the correct # of rows @@ -259,21 +343,22 @@ def test_make_entity_date_table(): """ Test that the make_entity_date_table function contains the correct values. 
""" - dates = [datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0)] - - entities = [0, 1, 2] + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0), + datetime.datetime(2016, 3, 1, 0, 0) + ] # make a dataframe of entity ids and dates to test against ids_dates = create_entity_date_df( - dates, - labels, - dates, - entities, - 'booking', - 'binary', - '1 month' + labels=labels, + states=states, + as_of_dates=dates, + state_one=True, + state_two=True, + label_name='booking', + label_type='binary', + label_window='1 month' ) with testing.postgresql.Postgresql() as postgresql: @@ -283,27 +368,29 @@ def test_make_entity_date_table(): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) with TemporaryDirectory() as temp_dir: architect = Architect( - beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), - label_names = ['booking'], - label_types = ['binary'], - db_config = db_config, - matrix_directory = temp_dir, - user_metadata = {}, - engine = engine + beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0), + label_names=['booking'], + label_types=['binary'], + states=['state_one AND state_two'], + db_config=db_config, + matrix_directory=temp_dir, + user_metadata={}, + engine=engine ) engine.execute( - 'CREATE TABLE features.tmp_entity_date (a int, b date);' + 'CREATE TEMPORARY TABLE tmp_entity_date (a int, b date);' ) # call the function to test the creation of the table entity_date_table_name = architect.builder.make_entity_date_table( as_of_times=dates, label_type='binary', label_name='booking', + state='state_one AND state_two', matrix_uuid='my_uuid', matrix_type='train', label_window='1 month' @@ -339,19 +426,21 @@ def test_make_entity_date_table(): def test_build_outer_join_query(): """ """ - dates = [datetime.datetime(2016, 1, 1, 0, 0), - datetime.datetime(2016, 2, 1, 0, 0)] + dates = [ + datetime.datetime(2016, 1, 1, 0, 0), + datetime.datetime(2016, 2, 1, 0, 0) + ] - entities = [1, 2, 3] # make dataframe for entity ids and dates ids_dates = create_entity_date_df( - dates, - labels, - dates, - entities, - 'booking', - 'binary', - '1 month' + labels=labels, + states=states, + as_of_dates=dates, + state_one=True, + state_two=True, + label_name='booking', + label_type='binary', + label_window='1 month' ) features = [['f1', 'f2'], ['f3', 'f4']] @@ -379,17 +468,18 @@ def test_build_outer_join_query(): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) with TemporaryDirectory() as temp_dir: architect = Architect( - beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), - label_names = ['booking'], - label_types = ['binary'], - db_config = db_config, - matrix_directory = temp_dir, - user_metadata = {}, - engine = engine + beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0), + label_names=['booking'], + label_types=['binary'], + states=['state_one AND state_two'], + db_config=db_config, + matrix_directory=temp_dir, + user_metadata={}, + engine=engine ) # make the entity-date table @@ -397,6 +487,7 @@ def test_build_outer_join_query(): as_of_times=dates, label_type='binary', label_name='booking', + state='state_one AND state_two', matrix_type='train', matrix_uuid='my_uuid', label_window='1 month' @@ -406,6 +497,7 @@ def test_build_outer_join_query(): for table_number, df in enumerate(features_dfs): table_name = 'features{}'.format(table_number) df = df.fillna(0) + df = df.reset_index() query = 
architect.builder.build_outer_join_query( as_of_times = dates, right_table_name = 'features.{}'.format(table_name), @@ -414,7 +506,7 @@ def test_build_outer_join_query(): features[table_number] ) ) - result = pd.read_sql(query, engine) + result = pd.read_sql(query, engine).reset_index() test = (result == df) assert(test.all().all()) @@ -425,6 +517,7 @@ def test_merge_feature_csvs_lowmem(self): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, matrix_directory = temp_dir, user_metadata = {}, @@ -493,6 +586,7 @@ def test_badinput(self): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, matrix_directory = temp_dir, user_metadata = {}, @@ -603,6 +697,7 @@ def test_generate_plans(): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, user_metadata = {}, matrix_directory = '', # this test won't write anything @@ -637,7 +732,6 @@ def test_train_matrix(self): datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0) ] - entities = [0, 1, 2] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) @@ -645,7 +739,7 @@ def test_train_matrix(self): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) with TemporaryDirectory() as temp_dir: @@ -653,6 +747,7 @@ def test_train_matrix(self): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, matrix_directory = temp_dir, user_metadata = {}, @@ -667,18 +762,19 @@ def test_train_matrix(self): 'label_name': 'booking', 'end_time': datetime.datetime(2016, 3, 1, 0, 0), 'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0), - 'label_window': '1 month' + 'label_window': '1 month', + 'state': 'state_one AND state_two' } uuid = metta.generate_uuid(matrix_metadata) architect.build_matrix( - as_of_times = dates, - label_name = 'booking', - label_type = 'binary', - feature_dictionary = feature_dictionary, - matrix_directory = temp_dir, - matrix_metadata = matrix_metadata, - matrix_uuid = uuid, - matrix_type = 'train' + as_of_times=dates, + label_name='booking', + label_type='binary', + feature_dictionary=feature_dictionary, + matrix_directory=temp_dir, + matrix_metadata=matrix_metadata, + matrix_uuid=uuid, + matrix_type='train' ) matrix_filename = os.path.join( @@ -695,7 +791,6 @@ def test_test_matrix(self): datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0) ] - entities = [0, 1, 3] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data @@ -704,7 +799,7 @@ def test_test_matrix(self): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) with TemporaryDirectory() as temp_dir: @@ -712,6 +807,7 @@ def test_test_matrix(self): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, matrix_directory = temp_dir, user_metadata = {}, @@ -732,18 +828,19 @@ def test_test_matrix(self): 'label_name': 
'booking', 'end_time': datetime.datetime(2016, 3, 1, 0, 0), 'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0), - 'label_window': '1 month' + 'label_window': '1 month', + 'state': 'state_one AND state_two' } uuid = metta.generate_uuid(matrix_metadata) architect.build_matrix( - as_of_times = dates, - label_name = 'booking', - label_type = 'binary', - feature_dictionary = feature_dictionary, - matrix_directory = temp_dir, - matrix_metadata = matrix_metadata, - matrix_uuid = uuid, - matrix_type = 'test' + as_of_times=dates, + label_name='booking', + label_type='binary', + feature_dictionary=feature_dictionary, + matrix_directory=temp_dir, + matrix_metadata=matrix_metadata, + matrix_uuid=uuid, + matrix_type='test' ) print(os.listdir(temp_dir)) matrix_filename = os.path.join( @@ -761,7 +858,6 @@ def test_replace(self): datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0) ] - entities = [0, 2, 3] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data @@ -770,7 +866,7 @@ def test_replace(self): engine=engine, features_tables=features_tables, labels=labels, - entities=entities + states=states ) with TemporaryDirectory() as temp_dir: @@ -778,6 +874,7 @@ def test_replace(self): beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), label_names = ['booking'], label_types = ['binary'], + states = ['state_one AND state_two'], db_config = db_config, matrix_directory = temp_dir, user_metadata = {}, @@ -799,18 +896,19 @@ def test_replace(self): 'label_name': 'booking', 'end_time': datetime.datetime(2016, 3, 1, 0, 0), 'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0), - 'label_window': '1 month' + 'label_window': '1 month', + 'state': 'state_one AND state_two' } uuid = metta.generate_uuid(matrix_metadata) architect.build_matrix( - as_of_times = dates, - label_name = 'booking', - label_type = 'binary', - feature_dictionary = feature_dictionary, - matrix_directory = temp_dir, - matrix_metadata = matrix_metadata, - matrix_uuid = uuid, - matrix_type = 'test' + as_of_times=dates, + label_name='booking', + label_type='binary', + feature_dictionary=feature_dictionary, + matrix_directory=temp_dir, + matrix_metadata=matrix_metadata, + matrix_uuid=uuid, + matrix_type='test' ) matrix_filename = os.path.join( @@ -825,13 +923,13 @@ def test_replace(self): # rerun architect.builder.make_entity_date_table = Mock() architect.builder.build_matrix( - as_of_times = dates, - label_name = 'booking', - label_type = 'binary', - feature_dictionary = feature_dictionary, - matrix_directory = temp_dir, - matrix_metadata = matrix_metadata, - matrix_uuid = uuid, - matrix_type = 'test' + as_of_times=dates, + label_name='booking', + label_type='binary', + feature_dictionary=feature_dictionary, + matrix_directory=temp_dir, + matrix_metadata=matrix_metadata, + matrix_uuid=uuid, + matrix_type='test' ) assert not architect.builder.make_entity_date_table.called diff --git a/tests/utils.py b/tests/utils.py index 10eb938..9db8965 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -11,7 +11,7 @@ def convert_string_column_to_date(column): [datetime.datetime.strptime(date, '%Y-%m-%d').date() for date in column] ) -def create_schemas(engine, features_tables, labels, entities): +def create_schemas(engine, features_tables, labels, states): """ This function makes a features schema and populates it with the fake data from above. 
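The hunk that follows swaps the single-column staging.entities fixture for a sparse state table keyed on (entity_id, as_of_date), and the `states` strings passed to Architect (for example, 'state_one AND state_two') are interpolated into the builders' SQL as predicates over its boolean columns. As a minimal sketch, mirroring the fixture DDL below and the query in get_all_valid_entity_date_combos rather than adding anything new, the layout and the way a state predicate picks out valid entity-date pairs look like:

    create table staging.sparse_states (
        entity_id int,
        as_of_date date,
        state_one bool,
        state_two bool
    );

    -- entity-date pairs where the configured state predicate holds
    select entity_id, as_of_date
    from staging.sparse_states
    where state_one and state_two
    order by entity_id, as_of_date;
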
@@ -42,16 +42,22 @@ def create_schemas(engine, features_tables, labels, entities):
             row
         )
-    # create entities table
-    engine.execute('drop table if exists staging cascade; create schema staging;')
+    # create sparse states table
+    engine.execute('drop schema if exists staging cascade; create schema staging;')
     engine.execute(
         """
-        create table staging.entities (
-            entity_id int
+        create table staging.sparse_states (
+            entity_id int,
+            as_of_date date,
+            state_one bool,
+            state_two bool
         )
         """
     )
-    for entity in entities:
-        engine.execute('insert into staging.entities values (%s)', entity)
+    for row in states:
+        engine.execute(
+            'insert into staging.sparse_states values (%s, %s, %s, %s)',
+            row
+        )
 
 
 def create_features_table(table_number, table, engine):
@@ -71,10 +77,11 @@ def create_features_table(table_number, table, engine):
     )
 
 def create_entity_date_df(
-    dates,
     labels,
+    states,
     as_of_dates,
-    entities,
+    state_one,
+    state_two,
     label_name,
     label_type,
     label_window
@@ -91,20 +98,29 @@ def create_entity_date_df(
         'label_type',
         'label'
     ])
-    dates = [date.date() for date in dates]
+    states_table = pd.DataFrame(states, columns = [
+        'entity_id',
+        'as_of_date',
+        'state_one',
+        'state_two'
+    ]).set_index(['entity_id', 'as_of_date'])
+    as_of_dates = [date.date() for date in as_of_dates]
     labels_table = labels_table[labels_table['label_name'] == label_name]
     labels_table = labels_table[labels_table['label_type'] == label_type]
     labels_table = labels_table[labels_table['label_window'] == label_window]
+    labels_table = labels_table.join(
+        other=states_table,
+        on=['entity_id', 'as_of_date'],
+    )
+    labels_table = labels_table[labels_table['state_one'] & labels_table['state_two']]
     ids_dates = labels_table[['entity_id', 'as_of_date']]
     ids_dates = ids_dates.sort_values(['entity_id', 'as_of_date'])
     ids_dates['as_of_date'] = [datetime.datetime.strptime(
         date,
         '%Y-%m-%d'
     ).date() for date in ids_dates['as_of_date']]
-    ids_dates = ids_dates[ids_dates['as_of_date'].isin(dates)]
-    ids_dates = ids_dates[ids_dates['entity_id'].isin(entities)]
+    ids_dates = ids_dates[ids_dates['as_of_date'].isin(as_of_dates)]
     print(ids_dates)
-    print(dates)
 
     return(ids_dates.reset_index(drop = True))

diff --git a/timechop/architect.py b/timechop/architect.py
index 1a0394f..c6faa39 100644
--- a/timechop/architect.py
+++ b/timechop/architect.py
@@ -11,12 +11,23 @@
 
 class Architect(object):
 
-    def __init__(self, beginning_of_time, label_names, label_types, db_config,
-                 matrix_directory, user_metadata, engine,
-                 builder_class=builders.HighMemoryCSVBuilder, replace=True):
+    def __init__(
+        self,
+        beginning_of_time,
+        label_names,
+        label_types,
+        states,
+        db_config,
+        matrix_directory,
+        user_metadata,
+        engine,
+        builder_class=builders.HighMemoryCSVBuilder,
+        replace=True
+    ):
         self.beginning_of_time = beginning_of_time # earliest time included in features
         self.label_names = label_names
         self.label_types = label_types
+        self.states = states
         self.db_config = db_config
         self.matrix_directory = matrix_directory
         self.user_metadata = user_metadata
@@ -47,8 +58,15 @@ def _generate_build_task(
             'matrix_type': matrix_metadata['matrix_type']
         }
 
-    def _make_metadata(self, matrix_definition, feature_dictionary, label_name,
-                       label_type, matrix_type):
+    def _make_metadata(
+        self,
+        matrix_definition,
+        feature_dictionary,
+        label_name,
+        label_type,
+        state,
+        matrix_type
+    ):
         """ Generate dictionary of matrix metadata.
:param matrix_definition: temporal definition of matrix
@@ -86,6 +104,7 @@ def _make_metadata(self, matrix_definition, feature_dictionary, label_name,
 
             # other information
             'label_type': label_type,
+            'state': state,
             'matrix_id': matrix_id,
             'matrix_type': matrix_type
 
@@ -113,9 +132,10 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
         build_tasks = dict()
         for matrix_set in matrix_set_definitions:
             train_matrix = matrix_set['train_matrix']
-            for label_name, label_type, feature_dictionary in itertools.product(
+            for label_name, label_type, state, feature_dictionary in itertools.product(
                 self.label_names,
                 self.label_types,
+                self.states,
                 feature_dictionaries
             ):
                 matrix_set_clone = copy.deepcopy(matrix_set)
@@ -125,6 +145,7 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
                     feature_dictionary,
                     label_name,
                     label_type,
+                    state,
                     'train',
                 )
                 train_uuid = metta.generate_uuid(train_metadata)
@@ -144,6 +165,7 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
                         feature_dictionary,
                         label_name,
                         label_type,
+                        state,
                         'test',
                     )
                     test_uuid = metta.generate_uuid(test_metadata)

diff --git a/timechop/builders.py b/timechop/builders.py
index e9a6e68..e76396d 100644
--- a/timechop/builders.py
+++ b/timechop/builders.py
@@ -44,6 +44,7 @@ def build_labels_query(
         final_column,
         label_name,
         label_type,
+        state,
         label_window
     ):
         """ Given a table, schema, and list of dates, write a query to get the
@@ -61,21 +62,24 @@ def build_labels_query(
         """
         as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times]
         query = """
-            SELECT entity_id,
-                   as_of_date{labels}
-            FROM {labels_schema_name}.{labels_table_name}
-            INNER JOIN {entities_table} USING(entity_id)
-            WHERE as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND
+            SELECT l.entity_id,
+                   l.as_of_date{labels}
+            FROM {labels_schema_name}.{labels_table_name} l
+            JOIN {states_table} s
+            ON l.entity_id = s.entity_id AND l.as_of_date = s.as_of_date
+            WHERE l.as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND
                   label_name = '{l_name}' AND
                   label_type = '{l_type}' AND
-                  label_window = '{window}'
-            ORDER BY entity_id,
-                     as_of_date
+                  label_window = '{window}' AND
+                  {state_string}
+            ORDER BY l.entity_id,
+                     l.as_of_date
         """.format(
             labels=final_column,
             labels_schema_name=self.db_config['labels_schema_name'],
             labels_table_name=self.db_config['labels_table_name'],
-            entities_table=self.db_config['entities_table_name'],
+            states_table=self.db_config['sparse_state_table_name'],
+            state_string=state,
             times=as_of_time_strings,
             l_name=label_name,
             l_type=label_type,
@@ -140,6 +144,7 @@ def make_entity_date_table(
         as_of_times,
         label_name,
         label_type,
+        state,
         matrix_type,
         matrix_uuid,
         label_window
@@ -161,12 +166,11 @@ def make_entity_date_table(
                 final_column='',
                 label_name=label_name,
                 label_type=label_type,
+                state=state,
                 label_window=label_window
             )
         elif matrix_type == 'test':
-            indices_query = self.get_all_valid_entity_date_combos(
-                as_of_times=as_of_times
-            )
+            indices_query = self.get_all_valid_entity_date_combos(state=state)
         else:
             raise ValueError('Unknown matrix type passed: {}'.format(matrix_type))
 
@@ -183,22 +187,15 @@ def make_entity_date_table(
 
         return table_name
 
-    def get_all_valid_entity_date_combos(self, as_of_times):
-        as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times]
+    def get_all_valid_entity_date_combos(self, state):
         query = """
             SELECT entity_id, as_of_date
-            FROM (
-                SELECT DISTINCT entity_id
-                FROM {entities_table}
-            ) AS e
-            CROSS JOIN (
-                SELECT as_of_date
-                FROM (SELECT (UNNEST 
(ARRAY{times}::timestamp[]))) t(as_of_date) - ) AS d + FROM {states_table} + WHERE {state_string} ORDER BY entity_id, as_of_date """.format( - entities_table=self.db_config['entities_table_name'], - times=as_of_time_strings + states_table=self.db_config['sparse_state_table_name'], + state_string=state ) return(query) @@ -253,6 +250,7 @@ def build_matrix( as_of_times, label_name, label_type, + matrix_metadata['state'], matrix_type, matrix_uuid, matrix_metadata['label_window'] @@ -270,6 +268,7 @@ def build_matrix( as_of_times, label_name, label_type, + matrix_metadata['state'], matrix_type, entity_date_table_name, matrix_uuid, @@ -308,6 +307,7 @@ def write_labels_data( as_of_times, label_name, label_type, + state, matrix_type, entity_date_table_name, matrix_uuid, @@ -329,16 +329,19 @@ def write_labels_data( final_column=', label as {}'.format(label_name), label_name=label_name, label_type=label_type, + state=state, label_window=label_window ) elif matrix_type == 'test': - labels_query=self.build_outer_join_query( + labels_query = self.build_outer_join_query( as_of_times=as_of_times, right_table_name='{schema}.{table}'.format( schema=self.db_config['labels_schema_name'], table=self.db_config['labels_table_name'] ), - entity_date_table_name='"{}"'.format(entity_date_table_name), + entity_date_table_name='"{table}"'.format( + table=entity_date_table_name + ), right_column_selections=', r.label as {}'.format(label_name), additional_conditions='''AND r.label_name = '{name}' AND