From 21fadcf7ee2b942f70d2226c05383550a2563995 Mon Sep 17 00:00:00 2001
From: Erika Salomon
Date: Mon, 15 May 2017 16:43:02 +0000
Subject: [PATCH] Select Entities from Entities Table

If merged, this commit will:
- change the logic for creating entity-date sets so that, for train sets,
  it filters the labels against an entities table and, for test sets, it
  creates a cross join of all entities in the entities table and the dates
  for the matrix [Resolves #69]
- create entity date tables as temporary tables [Resolves #60],
  [Resolves #51]
- reformat entity date table names so that the uuid is last [Resolves #50]
---
 tests/test_architect.py | 123 +++++++++++++++++++++--------
 tests/utils.py          |  16 +++-
 timechop/builders.py    | 166 +++++++++++++++++++---------------
 3 files changed, 183 insertions(+), 122 deletions(-)

diff --git a/tests/test_architect.py b/tests/test_architect.py
index 26fd760..d328b96 100644
--- a/tests/test_architect.py
+++ b/tests/test_architect.py
@@ -1,19 +1,21 @@
 from timechop.architect import Architect
 from timechop import builders
-from tests.utils import create_features_and_labels_schemas
+from tests.utils import create_schemas
 from tests.utils import create_entity_date_df
 from tests.utils import convert_string_column_to_date
 from tests.utils import NamedTempFile
 from tests.utils import TemporaryDirectory
+
 import testing.postgresql
 import csv
 import datetime
 import pandas as pd
 import os
-from sqlalchemy import create_engine
 from unittest import TestCase
+from sqlalchemy import create_engine
 from metta import metta_io as metta
 from mock import Mock
+import pytest
 
 
 # make some fake features data
@@ -132,6 +134,7 @@
     'features_schema_name': 'features',
     'labels_schema_name': 'labels',
     'labels_table_name': 'labels',
+    'entities_table_name': 'staging.entities'
 }
 
 def test_build_labels_query():
@@ -141,11 +144,12 @@ def test_build_labels_query():
     # set up labeling config variables
     dates = [datetime.datetime(2016, 1, 1, 0, 0),
              datetime.datetime(2016, 2, 1, 0, 0)]
+    entities = [0, 1, 3]
 
     with testing.postgresql.Postgresql() as postgresql:
         # create an engine and generate a table with fake feature data
        engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(engine, features_tables, labels, entities)
 
         # make a dataframe of labels to test against
         labels_df = pd.DataFrame(
@@ -160,11 +164,17 @@ def test_build_labels_query():
             ]
         )
         labels_df['as_of_date'] = convert_string_column_to_date(labels_df['as_of_date'])
-
+        labels_df = labels_df[labels_df['entity_id'].isin(entities)]
+
     # create an engine and generate a table with fake feature data
     with testing.postgresql.Postgresql() as postgresql:
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            entities=entities
+        )
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
                 beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
@@ -208,10 +218,16 @@ def test_write_to_csv():
     """ Test the write_to_csv function by checking whether the csv contains the correct number of lines.
""" + entities = [0, 2, 3] with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -239,7 +255,6 @@ def test_write_to_csv(): reader = csv.reader(f) assert(len([row for row in reader]) == len(table) + 1) - def test_make_entity_date_table(): """ Test that the make_entity_date_table function contains the correct values. @@ -248,11 +263,14 @@ def test_make_entity_date_table(): datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)] + entities = [0, 1, 2] + # make a dataframe of entity ids and dates to test against ids_dates = create_entity_date_df( dates, labels, dates, + entities, 'booking', 'binary', '1 month' @@ -261,7 +279,12 @@ def test_make_entity_date_table(): with testing.postgresql.Postgresql() as postgresql: # create an engine and generate a table with fake feature data engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( @@ -281,7 +304,6 @@ def test_make_entity_date_table(): as_of_times=dates, label_type='binary', label_name='booking', - feature_table_names=['features0', 'features1'], matrix_uuid='my_uuid', matrix_type='train', label_window='1 month' @@ -289,7 +311,7 @@ def test_make_entity_date_table(): # read in the table result = pd.read_sql( - "select * from features.{} order by entity_id, as_of_date".format(entity_date_table_name), + "select * from {} order by entity_id, as_of_date".format(entity_date_table_name), engine ) labels_df = pd.read_sql('select * from labels.labels', engine) @@ -305,17 +327,28 @@ def test_make_entity_date_table(): print(test) assert(test.all().all()) + # test that the table disappears after session closes + engine.dispose() + engine2 = create_engine(postgresql.url()) + try: + engine2.execute('select * from {}'.format(entity_date_table_name)) + except: + programmingerror = True + assert(programmingerror) + def test_build_outer_join_query(): """ """ dates = [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0)] + entities = [1, 2, 3] # make dataframe for entity ids and dates ids_dates = create_entity_date_df( dates, labels, dates, + entities, 'booking', 'binary', '1 month' @@ -342,8 +375,12 @@ def test_build_outer_join_query(): # create an engine and generate a table with fake feature data with testing.postgresql.Postgresql() as postgresql: engine = create_engine(postgresql.url()) - create_features_and_labels_schemas(engine, features_tables, labels) - + create_schemas( + engine=engine, + features_tables=features_tables, + labels=labels, + entities=entities + ) with TemporaryDirectory() as temp_dir: architect = Architect( beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0), @@ -360,7 +397,6 @@ def test_build_outer_join_query(): as_of_times=dates, label_type='binary', label_name='booking', - feature_table_names=['features0', 'features1'], matrix_type='train', matrix_uuid='my_uuid', label_window='1 month' @@ -373,7 +409,7 @@ def test_build_outer_join_query(): query = architect.builder.build_outer_join_query( as_of_times = dates, right_table_name = 
-                entity_date_table_name = 'features.{}'.format(entity_date_table_name),
+                entity_date_table_name = entity_date_table_name,
                 right_column_selections = architect.builder._format_imputations(
                     features[table_number]
                 )
@@ -596,14 +632,21 @@ def test_generate_plans():
 
 class TestBuildMatrix(object):
     def test_train_matrix(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
+        entities = [0, 1, 2]
         with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                entities=entities
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
@@ -644,17 +687,25 @@ def test_train_matrix(self):
                 )
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 12)
+                    assert(len([row for row in reader]) == 9)
 
     def test_test_matrix(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
+        entities = [0, 1, 3]
+
         with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                entities=entities
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
@@ -702,17 +753,25 @@
 
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 13)
+                    assert(len([row for row in reader]) == 10)
 
     def test_replace(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
+        entities = [0, 2, 3]
+
        with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                entities=entities
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
@@ -761,7 +820,7 @@
 
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 13)
+                    assert(len([row for row in reader]) == 10)
 
                 # rerun
                 architect.builder.make_entity_date_table = Mock()
diff --git a/tests/utils.py b/tests/utils.py
index e912228..10eb938 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -11,7 +11,7 @@ def convert_string_column_to_date(column):
         [datetime.datetime.strptime(date, '%Y-%m-%d').date() for date in column]
     )
 
-def create_features_and_labels_schemas(engine, features_tables, labels):
+def create_schemas(engine, features_tables, labels, entities):
     """ This function makes a features schema and
     populates it with the fake data from above.
@@ -41,6 +41,18 @@ def create_features_and_labels_schemas(engine, features_tables, labels):
             'insert into labels.labels values (%s, %s, %s, %s, %s, %s)',
             row
         )
 
+    # create entities table
+    engine.execute('drop schema if exists staging cascade; create schema staging;')
+    engine.execute(
+        """
+        create table staging.entities (
+            entity_id int
+        )
+        """
+    )
+    for entity in entities:
+        engine.execute('insert into staging.entities values (%s)', entity)
+
 
 def create_features_table(table_number, table, engine):
     engine.execute(
@@ -62,6 +74,7 @@
 def create_entity_date_df(
     dates,
     labels,
     as_of_dates,
+    entities,
     label_name,
     label_type,
     label_window
@@ -89,6 +102,7 @@ def create_entity_date_df(
         '%Y-%m-%d'
     ).date() for date in ids_dates['as_of_date']]
     ids_dates = ids_dates[ids_dates['as_of_date'].isin(dates)]
+    ids_dates = ids_dates[ids_dates['entity_id'].isin(entities)]
     print(ids_dates)
     print(dates)
 
diff --git a/timechop/builders.py b/timechop/builders.py
index b7fa196..e9a6e68 100644
--- a/timechop/builders.py
+++ b/timechop/builders.py
@@ -64,6 +64,7 @@ def build_labels_query(
             SELECT entity_id, as_of_date{labels}
             FROM {labels_schema_name}.{labels_table_name}
+            INNER JOIN {entities_table} USING(entity_id)
             WHERE as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND
                   label_name = '{l_name}' AND
                   label_type = '{l_type}' AND
@@ -71,13 +72,14 @@
             ORDER BY entity_id, as_of_date
         """.format(
-            labels = final_column,
-            labels_schema_name = self.db_config['labels_schema_name'],
-            labels_table_name = self.db_config['labels_table_name'],
-            times = as_of_time_strings,
-            l_name = label_name,
-            l_type = label_type,
-            window = label_window
+            labels=final_column,
+            labels_schema_name=self.db_config['labels_schema_name'],
+            labels_table_name=self.db_config['labels_table_name'],
+            entities_table=self.db_config['entities_table_name'],
+            times=as_of_time_strings,
+            l_name=label_name,
+            l_type=label_type,
+            window=label_window
         )
 
         return(query)
@@ -138,7 +140,6 @@ def make_entity_date_table(
         as_of_times,
         label_name,
         label_type,
-        feature_table_names,
         matrix_type,
         matrix_uuid,
         label_window
@@ -164,19 +165,17 @@ def make_entity_date_table(
             )
         elif matrix_type == 'test':
             indices_query = self.get_all_valid_entity_date_combos(
-                as_of_times=as_of_times,
-                feature_table_names=feature_table_names
+                as_of_times=as_of_times
             )
         else:
             raise ValueError('Unknown matrix type passed: {}'.format(matrix_type))
 
-        table_name = '_'.join([matrix_uuid, 'tmp_entity_date'])
+        table_name = '_'.join(['tmp_entity_date', matrix_uuid])
         query = """
-            DROP TABLE IF EXISTS {features_schema_name}."{table_name}";
-            CREATE TABLE {features_schema_name}."{table_name}"
+            DROP TABLE IF EXISTS "{table_name}";
+            CREATE TEMPORARY TABLE "{table_name}"
             AS ({index_query})
         """.format(
-            features_schema_name=self.db_config['features_schema_name'],
             table_name=table_name,
             index_query=indices_query
         )
 
         return table_name
 
-    def get_all_valid_entity_date_combos(self, as_of_times, feature_table_names):
-        as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times]
-        query_list = []
-        for index, table in enumerate(feature_table_names):
-            union = ''
-            if index != 0:
-                union = 'UNION'
-            subquery = """ {u}
-                SELECT DISTINCT entity_id, as_of_date
-                FROM {schema_name}.{table_name}
-                WHERE as_of_date IN (SELECT (UNNEST (ARRAY{dates}::timestamp[])))
-            """.format(
-                u = union,
-                table_name = table,
-                dates = as_of_time_strings,
-                schema_name = self.db_config['features_schema_name']
-            )
-            query_list.append(subquery)
-
-        return(''.join(query_list))
+    def get_all_valid_entity_date_combos(self, as_of_times):
+        as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times]
+        query = """
+            SELECT entity_id, as_of_date
+            FROM (
+                SELECT DISTINCT entity_id
+                FROM {entities_table}
+            ) AS e
+            CROSS JOIN (
+                SELECT as_of_date
+                FROM (SELECT (UNNEST (ARRAY{times}::timestamp[]))) t(as_of_date)
+            ) AS d
+            ORDER BY entity_id, as_of_date
+        """.format(
+            entities_table=self.db_config['entities_table_name'],
+            times=as_of_time_strings
+        )
+        return(query)
 
 
 class CSVBuilder(BuilderBase):
@@ -256,63 +253,54 @@ def build_matrix(
             as_of_times,
             label_name,
             label_type,
-            feature_dictionary.keys(),
             matrix_type,
             matrix_uuid,
             matrix_metadata['label_window']
         )
+        logging.info('Writing feature group data')
+        features_csv_names = self.write_features_data(
+            as_of_times,
+            feature_dictionary,
+            entity_date_table_name,
+            matrix_uuid
+        )
         try:
-            logging.info('Writing feature group data')
-            features_csv_names = self.write_features_data(
+            logging.info('Writing label data')
+            labels_csv_name = self.write_labels_data(
                 as_of_times,
-                feature_dictionary,
+                label_name,
+                label_type,
+                matrix_type,
                 entity_date_table_name,
+                matrix_uuid,
+                matrix_metadata['label_window']
+            )
+            features_csv_names.insert(0, labels_csv_name)
+
+            # stitch together the csvs
+            logging.info('Merging features data')
+            output = self.merge_feature_csvs(
+                features_csv_names,
+                matrix_directory,
                 matrix_uuid
             )
-            try:
-                logging.info('Writing label data')
-                labels_csv_name = self.write_labels_data(
-                    as_of_times,
-                    label_name,
-                    label_type,
-                    matrix_type,
-                    entity_date_table_name,
-                    matrix_uuid,
-                    matrix_metadata['label_window']
-                )
-                features_csv_names.insert(0, labels_csv_name)
-
-                # stitch together the csvs
-                logging.info('Merging features data')
-                output = self.merge_feature_csvs(
-                    features_csv_names,
-                    matrix_directory,
-                    matrix_uuid
-                )
-            finally:
-                # clean up files and database before finishing
-                for csv_name in features_csv_names:
-                    self.remove_file(csv_name)
-            try:
-                # store the matrix
-                logging.info('Archiving matrix with metta')
-                metta.archive_matrix(
-                    matrix_config=matrix_metadata,
-                    df_matrix=output,
-                    overwrite=True,
-                    directory=self.matrix_directory,
-                    format='csv'
-                )
-            finally:
-                if isinstance(output, str):
-                    os.remove(output)
         finally:
-            self.engine.execute(
-                'drop table "{}"."{}";'.format(
-                    self.db_config['features_schema_name'],
-                    entity_date_table_name
-                )
+            # clean up files and database before finishing
+            for csv_name in features_csv_names:
+                self.remove_file(csv_name)
+            try:
+                # store the matrix
+                logging.info('Archiving matrix with metta')
+                metta.archive_matrix(
+                    matrix_config=matrix_metadata,
+                    df_matrix=output,
+                    overwrite=True,
+                    directory=self.matrix_directory,
+                    format='csv'
                 )
+            finally:
+                if isinstance(output, str):
+                    os.remove(output)
 
     def write_labels_data(
@@ -350,10 +338,7 @@ def write_labels_data(
                 schema=self.db_config['labels_schema_name'],
                 table=self.db_config['labels_table_name']
             ),
-            entity_date_table_name='"{schema}"."{table}"'.format(
-                schema=self.db_config['features_schema_name'],
-                table=entity_date_table_name
-            ),
+            entity_date_table_name='"{}"'.format(entity_date_table_name),
             right_column_selections=', r.label as {}'.format(label_name),
             additional_conditions='''AND
                 r.label_name = '{name}' AND
@@ -372,7 +357,13 @@
         self.write_to_csv(labels_query, csv_name)
         return(csv_name)
 
-    def write_features_data(self, as_of_times, feature_dictionary, entity_date_table_name, matrix_uuid):
+    def write_features_data(
+        self,
+        as_of_times,
+        feature_dictionary,
+        entity_date_table_name,
+        matrix_uuid
+    ):
         """ Loop over tables in features schema, writing the data from each
         to a csv. Return the full list of feature csv names and the list of
         all features.
@@ -394,10 +385,7 @@ def write_features_data(self, as_of_times, feature_dictionary, entity_date_table
                     schema = self.db_config['features_schema_name'],
                     table = feature_table_name
                 ),
-                entity_date_table_name = '{schema}."{table}"'.format(
-                    schema = self.db_config['features_schema_name'],
-                    table = entity_date_table_name
-                ),
+                entity_date_table_name = '"{}"'.format(entity_date_table_name),
                 right_column_selections = self._format_imputations(feature_names)
             )
             self.write_to_csv(features_query, csv_name)
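
Note on the new entity-date logic: a train matrix keeps only the (entity_id,
as_of_date) pairs whose labels survive the new INNER JOIN against the entities
table, while a test matrix takes the full cross product of distinct entities
and the requested as-of dates, so every known entity is scored on every date.
A minimal sketch of the two query shapes, written against the staging.entities
fixture the tests build; the dates are illustrative and the label-name/type/
window conditions of the real labels query are elided:

    # Illustrative only -- mirrors build_labels_query (train) and
    # get_all_valid_entity_date_combos (test) from the diff above.
    train_entity_dates = """
        SELECT entity_id, as_of_date
        FROM labels.labels
        INNER JOIN staging.entities USING(entity_id)
        WHERE as_of_date IN (SELECT (UNNEST (ARRAY['2016-01-01', '2016-02-01']::timestamp[])))
        ORDER BY entity_id, as_of_date
    """
    test_entity_dates = """
        SELECT entity_id, as_of_date
        FROM (
            SELECT DISTINCT entity_id
            FROM staging.entities
        ) AS e
        CROSS JOIN (
            SELECT as_of_date
            FROM (SELECT (UNNEST (ARRAY['2016-01-01', '2016-02-01']::timestamp[]))) t(as_of_date)
        ) AS d
        ORDER BY entity_id, as_of_date
    """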
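
That cross join is also where the updated TestBuildMatrix expectations come
from. A back-of-envelope check for the test-matrix count (the train-matrix
count of 9 additionally depends on the label fixtures, which the hunks above
elide):

    # test_test_matrix: every entity crossed with every as-of date,
    # plus the csv header row.
    entities = [0, 1, 3]
    dates = ['2016-01-01', '2016-02-01', '2016-03-01']
    assert len(entities) * len(dates) + 1 == 10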
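
And on the temporary-table change: a Postgres TEMPORARY table is scoped to
the session that created it, and SQLAlchemy's connection pool can keep that
session alive after individual statements finish, which is why the test
calls engine.dispose() before probing with a fresh engine. A sketch of the
behavior the test relies on, assuming a reachable database url (the url and
table name here are placeholders; the engine.execute() style matches the
SQLAlchemy 1.x API used elsewhere in this codebase):

    from sqlalchemy import create_engine

    url = 'postgresql://localhost/scratch'  # hypothetical database
    engine = create_engine(url)
    engine.execute('CREATE TEMPORARY TABLE "tmp_entity_date_abc123" (entity_id int)')

    engine.dispose()  # close pooled connections, ending the creating session

    engine2 = create_engine(url)
    # The table died with its session, so this raises an error
    # (ProgrammingError: relation "tmp_entity_date_abc123" does not exist).
    engine2.execute('SELECT * FROM "tmp_entity_date_abc123"')

This is also why build_matrix no longer issues an explicit DROP TABLE in its
finally block: the entity-date table now cleans itself up when the session ends.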