diff --git a/tests/test_architect.py b/tests/test_architect.py
index 26fd760..6704e8b 100644
--- a/tests/test_architect.py
+++ b/tests/test_architect.py
@@ -1,23 +1,58 @@
 from timechop.architect import Architect
 from timechop import builders
-from tests.utils import create_features_and_labels_schemas
+from tests.utils import create_schemas
 from tests.utils import create_entity_date_df
 from tests.utils import convert_string_column_to_date
 from tests.utils import NamedTempFile
 from tests.utils import TemporaryDirectory
+
 import testing.postgresql
 import csv
 import datetime
 import pandas as pd
 import os
-from sqlalchemy import create_engine
 from unittest import TestCase
+from sqlalchemy import create_engine
 from metta import metta_io as metta
 from mock import Mock
+import pytest
 
 # make some fake features data
+states = [
+    [0, '2016-02-01', False, True],
+    [0, '2016-02-01', False, True],
+    [0, '2016-03-01', False, True],
+    [0, '2016-04-01', False, True],
+    [0, '2016-05-01', False, True],
+    [1, '2016-01-01', True, False],
+    [1, '2016-02-01', True, False],
+    [1, '2016-03-01', True, False],
+    [1, '2016-04-01', True, False],
+    [1, '2016-05-01', True, False],
+    [2, '2016-01-01', True, False],
+    [2, '2016-02-01', True, True],
+    [2, '2016-03-01', True, False],
+    [2, '2016-04-01', True, True],
+    [2, '2016-05-01', True, False],
+    [3, '2016-01-01', False, True],
+    [3, '2016-02-01', True, True],
+    [3, '2016-03-01', False, True],
+    [3, '2016-04-01', True, True],
+    [3, '2016-05-01', False, True],
+    [4, '2016-01-01', True, True],
+    [4, '2016-02-01', True, True],
+    [4, '2016-03-01', True, True],
+    [4, '2016-04-01', True, True],
+    [4, '2016-05-01', True, True],
+    [5, '2016-01-01', False, False],
+    [5, '2016-02-01', False, False],
+    [5, '2016-03-01', False, False],
+    [5, '2016-04-01', False, False],
+    [5, '2016-05-01', False, False]
+]
+
 features0 = [
     [0, '2016-01-01', 2, 0],
     [1, '2016-01-01', 1, 2],
@@ -37,7 +72,10 @@
     [0, '2016-03-01', 3, 3],
     [1, '2016-03-01', 3, 4],
     [2, '2016-03-01', 3, 3],
-    [3, '2016-03-01', 3, 4]
+    [3, '2016-03-01', 3, 4],
+    [0, '2016-03-01', 3, 3],
+    [4, '2016-03-01', 1, 4],
+    [5, '2016-03-01', 2, 4]
 ]
 
 features_tables = [features0, features1]
@@ -84,6 +122,26 @@
     [3, '2016-03-01', '1 month', 'ems', 'binary', 0],
     [3, '2016-04-01', '1 month', 'ems', 'binary', 1],
     [3, '2016-05-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-01-01', '1 month', 'booking', 'binary', 1],
+    [4, '2016-02-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-03-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-04-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-05-01', '1 month', 'booking', 'binary', 0],
+    [4, '2016-01-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-02-01', '1 month', 'ems', 'binary', 1],
+    [4, '2016-03-01', '1 month', 'ems', 'binary', 0],
+    [4, '2016-04-01', '1 month', 'ems', 'binary', 1],
+    [4, '2016-05-01', '1 month', 'ems', 'binary', 1],
+    [5, '2016-01-01', '1 month', 'booking', 'binary', 1],
+    [5, '2016-02-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-03-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-04-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-05-01', '1 month', 'booking', 'binary', 0],
+    [5, '2016-01-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-02-01', '1 month', 'ems', 'binary', 1],
+    [5, '2016-03-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-04-01', '1 month', 'ems', 'binary', 0],
+    [5, '2016-05-01', '1 month', 'ems', 'binary', 0],
     [0, '2016-02-01', '3 month', 'booking', 'binary', 0],
     [0, '2016-03-01', '3 month', 'booking', 'binary', 0],
     [0, '2016-04-01', '3 month', 'booking', 'binary', 0],
@@ -122,7 +180,27 @@
     [3, '2016-02-01', '3 month', 'ems', 'binary', 0],
     [3, '2016-03-01', '3 month', 'ems', 'binary', 0],
     [3, '2016-04-01', '3 month', 'ems', 'binary', 1],
-    [3, '2016-05-01', '3 month', 'ems', 'binary', 0]
+    [3, '2016-05-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-01-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-02-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-03-01', '3 month', 'booking', 'binary', 1],
+    [4, '2016-04-01', '3 month', 'booking', 'binary', 0],
+    [4, '2016-05-01', '3 month', 'booking', 'binary', 1],
+    [4, '2016-01-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-02-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-03-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-04-01', '3 month', 'ems', 'binary', 0],
+    [4, '2016-05-01', '3 month', 'ems', 'binary', 1],
+    [5, '2016-01-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-02-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-03-01', '3 month', 'booking', 'binary', 1],
+    [5, '2016-04-01', '3 month', 'booking', 'binary', 0],
+    [5, '2016-05-01', '3 month', 'booking', 'binary', 1],
+    [5, '2016-01-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-02-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-03-01', '3 month', 'ems', 'binary', 0],
+    [5, '2016-04-01', '3 month', 'ems', 'binary', 1],
+    [5, '2016-05-01', '3 month', 'ems', 'binary', 0]
 ]
 
 label_name = 'booking'
@@ -132,6 +210,7 @@
     'features_schema_name': 'features',
     'labels_schema_name': 'labels',
     'labels_table_name': 'labels',
+    'sparse_state_table_name': 'staging.sparse_states'
 }
 
 def test_build_labels_query():
@@ -142,11 +221,6 @@ def test_build_labels_query():
     dates = [datetime.datetime(2016, 1, 1, 0, 0),
              datetime.datetime(2016, 2, 1, 0, 0)]
 
-    with testing.postgresql.Postgresql() as postgresql:
-        # create an engine and generate a table with fake feature data
-        engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
-
     # make a dataframe of labels to test against
     labels_df = pd.DataFrame(
         labels,
@@ -159,21 +233,42 @@ def test_build_labels_query():
             'label'
         ]
    )
+    states_df = pd.DataFrame(
+        states,
+        columns=[
+            'entity_id',
+            'as_of_date',
+            'state_one',
+            'state_two'
+        ]
+    )
+    # keep only the labels whose entity-date pairs are in both states
+    in_states = states_df[states_df['state_one'] & states_df['state_two']]
+    labels_df = labels_df.merge(
+        in_states[['entity_id', 'as_of_date']],
+        on=['entity_id', 'as_of_date']
+    )
     labels_df['as_of_date'] = convert_string_column_to_date(labels_df['as_of_date'])
-
+    # create an engine and generate a table with fake feature data
     with testing.postgresql.Postgresql() as postgresql:
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            states=states
+        )
 
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
-                beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
-                label_names = ['booking'],
-                label_types = ['binary'],
-                db_config = db_config,
-                matrix_directory = temp_dir,
-                user_metadata = {},
-                engine = engine
+                beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0),
+                label_names=['booking'],
+                label_types=['binary'],
+                states=['state_one AND state_two'],
+                db_config=db_config,
+                matrix_directory=temp_dir,
+                user_metadata={},
+                engine=engine
             )
 
             # get the queries and test them
@@ -211,18 +306,23 @@ def test_write_to_csv():
     with testing.postgresql.Postgresql() as postgresql:
         # create an engine and generate a table with fake feature data
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
-
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            states=states
+        )
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
-                beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
-                label_names = ['booking'],
-                label_types = ['binary'],
-                db_config = db_config,
-                matrix_directory = temp_dir,
-                user_metadata = {},
-                engine = engine,
-                builder_class = builders.LowMemoryCSVBuilder
+                beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0),
+                label_names=['booking'],
+                label_types=['binary'],
+                states=['state_one AND state_two'],
+                db_config=db_config,
+                matrix_directory=temp_dir,
+                user_metadata={},
+                engine=engine,
+                builder_class=builders.LowMemoryCSVBuilder
             )
 
             # for each table, check that corresponding csv has the correct # of rows
@@ -239,49 +339,58 @@ def test_write_to_csv():
                     reader = csv.reader(f)
                     assert(len([row for row in reader]) == len(table) + 1)
 
-
 def test_make_entity_date_table():
     """ Test that the make_entity_date_table function contains the correct
     values.
     """
-    dates = [datetime.datetime(2016, 1, 1, 0, 0),
-             datetime.datetime(2016, 2, 1, 0, 0),
-             datetime.datetime(2016, 3, 1, 0, 0)]
+    dates = [
+        datetime.datetime(2016, 1, 1, 0, 0),
+        datetime.datetime(2016, 2, 1, 0, 0),
+        datetime.datetime(2016, 3, 1, 0, 0)
+    ]
 
     # make a dataframe of entity ids and dates to test against
     ids_dates = create_entity_date_df(
-        dates,
-        labels,
-        dates,
-        'booking',
-        'binary',
-        '1 month'
+        labels=labels,
+        states=states,
+        as_of_dates=dates,
+        state_one=True,
+        state_two=True,
+        label_name='booking',
+        label_type='binary',
+        label_window='1 month'
     )
 
     with testing.postgresql.Postgresql() as postgresql:
         # create an engine and generate a table with fake feature data
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            states=states
+        )
 
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
-                beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
-                label_names = ['booking'],
-                label_types = ['binary'],
-                db_config = db_config,
-                matrix_directory = temp_dir,
-                user_metadata = {},
-                engine = engine
+                beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0),
+                label_names=['booking'],
+                label_types=['binary'],
+                states=['state_one AND state_two'],
+                db_config=db_config,
+                matrix_directory=temp_dir,
+                user_metadata={},
+                engine=engine
            )
            engine.execute(
-                'CREATE TABLE features.tmp_entity_date (a int, b date);'
+                'CREATE TEMPORARY TABLE tmp_entity_date (a int, b date);'
            )
            # call the function to test the creation of the table
            entity_date_table_name = architect.builder.make_entity_date_table(
                as_of_times=dates,
                label_type='binary',
                label_name='booking',
-                feature_table_names=['features0', 'features1'],
+                state='state_one AND state_two',
                matrix_uuid='my_uuid',
                matrix_type='train',
                label_window='1 month'
@@ -289,7 +398,7 @@ def test_make_entity_date_table():
 
             # read in the table
             result = pd.read_sql(
-                "select * from features.{} order by entity_id, as_of_date".format(entity_date_table_name),
+                "select * from {} order by entity_id, as_of_date".format(entity_date_table_name),
                 engine
             )
             labels_df = pd.read_sql('select * from labels.labels', engine)
@@ -305,20 +414,33 @@ def test_make_entity_date_table():
             print(test)
             assert(test.all().all())
 
+            # test that the table disappears after session closes
+            engine.dispose()
+            engine2 = create_engine(postgresql.url())
+            with pytest.raises(Exception):
+                engine2.execute(
+                    'select * from {}'.format(entity_date_table_name)
+                )
+
 
 def test_build_outer_join_query():
     """
     """
-    dates = [datetime.datetime(2016, 1, 1, 0, 0),
-             datetime.datetime(2016, 2, 1, 0, 0)]
+    dates = [
+        datetime.datetime(2016, 1, 1, 0, 0),
+        datetime.datetime(2016, 2, 1, 0, 0)
+    ]
 
     # make dataframe for entity ids and dates
     ids_dates = create_entity_date_df(
-        dates,
-        labels,
-        dates,
-        'booking',
-        'binary',
-        '1 month'
+        labels=labels,
+        states=states,
+        as_of_dates=dates,
+        state_one=True,
+        state_two=True,
+        label_name='booking',
+        label_type='binary',
+        label_window='1 month'
     )
 
     features = [['f1', 'f2'], ['f3', 'f4']]
@@ -342,17 +464,22 @@ def test_build_outer_join_query():
     # create an engine and generate a table with fake feature data
     with testing.postgresql.Postgresql() as postgresql:
         engine = create_engine(postgresql.url())
-        create_features_and_labels_schemas(engine, features_tables, labels)
-
+        create_schemas(
+            engine=engine,
+            features_tables=features_tables,
+            labels=labels,
+            states=states
+        )
         with TemporaryDirectory() as temp_dir:
             architect = Architect(
-                beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
-                label_names = ['booking'],
-                label_types = ['binary'],
-                db_config = db_config,
-                matrix_directory = temp_dir,
-                user_metadata = {},
-                engine = engine
+                beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0),
+                label_names=['booking'],
+                label_types=['binary'],
+                states=['state_one AND state_two'],
+                db_config=db_config,
+                matrix_directory=temp_dir,
+                user_metadata={},
+                engine=engine
             )
 
             # make the entity-date table
@@ -360,7 +487,7 @@ def test_build_outer_join_query():
                 as_of_times=dates,
                 label_type='binary',
                 label_name='booking',
-                feature_table_names=['features0', 'features1'],
+                state='state_one AND state_two',
                 matrix_type='train',
                 matrix_uuid='my_uuid',
                 label_window='1 month'
@@ -370,15 +497,16 @@ def test_build_outer_join_query():
             for table_number, df in enumerate(features_dfs):
                 table_name = 'features{}'.format(table_number)
                 df = df.fillna(0)
+                df = df.reset_index()
                 query = architect.builder.build_outer_join_query(
                     as_of_times = dates,
                     right_table_name = 'features.{}'.format(table_name),
-                    entity_date_table_name = 'features.{}'.format(entity_date_table_name),
+                    entity_date_table_name = entity_date_table_name,
                     right_column_selections = architect.builder._format_imputations(
                         features[table_number]
                     )
                 )
-                result = pd.read_sql(query, engine)
+                result = pd.read_sql(query, engine).reset_index()
                 test = (result == df)
                 assert(test.all().all())
 
@@ -389,6 +517,7 @@ def test_merge_feature_csvs_lowmem(self):
             beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
             label_names = ['booking'],
             label_types = ['binary'],
+            states = ['state_one AND state_two'],
             db_config = db_config,
             matrix_directory = temp_dir,
             user_metadata = {},
@@ -457,6 +586,7 @@ def test_badinput(self):
             beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
             label_names = ['booking'],
             label_types = ['binary'],
+            states = ['state_one AND state_two'],
             db_config = db_config,
             matrix_directory = temp_dir,
             user_metadata = {},
@@ -567,6 +697,7 @@ def test_generate_plans():
         beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
         label_names = ['booking'],
         label_types = ['binary'],
+        states = ['state_one AND state_two'],
         db_config = db_config,
         user_metadata = {},
         matrix_directory = '', # this test won't write anything
@@ -596,20 +727,27 @@ def test_generate_plans():
 
 
 class TestBuildMatrix(object):
     def test_train_matrix(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
         with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                states=states
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
                     beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
                     label_names = ['booking'],
                     label_types = ['binary'],
+                    states = ['state_one AND state_two'],
                     db_config = db_config,
                     matrix_directory = temp_dir,
                     user_metadata = {},
@@ -624,18 +762,19 @@ def test_train_matrix(self):
                     'label_name': 'booking',
                     'end_time': datetime.datetime(2016, 3, 1, 0, 0),
                     'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0),
-                    'label_window': '1 month'
+                    'label_window': '1 month',
+                    'state': 'state_one AND state_two'
                 }
                 uuid = metta.generate_uuid(matrix_metadata)
                 architect.build_matrix(
-                    as_of_times = dates,
-                    label_name = 'booking',
-                    label_type = 'binary',
-                    feature_dictionary = feature_dictionary,
-                    matrix_directory = temp_dir,
-                    matrix_metadata = matrix_metadata,
-                    matrix_uuid = uuid,
-                    matrix_type = 'train'
+                    as_of_times=dates,
+                    label_name='booking',
+                    label_type='binary',
+                    feature_dictionary=feature_dictionary,
+                    matrix_directory=temp_dir,
+                    matrix_metadata=matrix_metadata,
+                    matrix_uuid=uuid,
+                    matrix_type='train'
                 )
 
                 matrix_filename = os.path.join(
@@ -644,23 +783,31 @@ def test_train_matrix(self):
                 )
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 12)
+                    assert(len([row for row in reader]) == 9)
 
     def test_test_matrix(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
+
         with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                states=states
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
                     beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
                     label_names = ['booking'],
                     label_types = ['binary'],
+                    states = ['state_one AND state_two'],
                     db_config = db_config,
                     matrix_directory = temp_dir,
                     user_metadata = {},
@@ -681,18 +828,19 @@ def test_test_matrix(self):
                     'label_name': 'booking',
                     'end_time': datetime.datetime(2016, 3, 1, 0, 0),
                     'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0),
-                    'label_window': '1 month'
+                    'label_window': '1 month',
+                    'state': 'state_one AND state_two'
                 }
                 uuid = metta.generate_uuid(matrix_metadata)
                 architect.build_matrix(
-                    as_of_times = dates,
-                    label_name = 'booking',
-                    label_type = 'binary',
-                    feature_dictionary = feature_dictionary,
-                    matrix_directory = temp_dir,
-                    matrix_metadata = matrix_metadata,
-                    matrix_uuid = uuid,
-                    matrix_type = 'test'
+                    as_of_times=dates,
+                    label_name='booking',
+                    label_type='binary',
+                    feature_dictionary=feature_dictionary,
+                    matrix_directory=temp_dir,
+                    matrix_metadata=matrix_metadata,
+                    matrix_uuid=uuid,
+                    matrix_type='test'
                 )
                 print(os.listdir(temp_dir))
                 matrix_filename = os.path.join(
@@ -702,23 +850,31 @@ def test_test_matrix(self):
 
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 13)
+                    assert(len([row for row in reader]) == 10)
 
     def test_replace(self):
+        dates = [
+            datetime.datetime(2016, 1, 1, 0, 0),
+            datetime.datetime(2016, 2, 1, 0, 0),
+            datetime.datetime(2016, 3, 1, 0, 0)
+        ]
+
         with testing.postgresql.Postgresql() as postgresql:
             # create an engine and generate a table with fake feature data
             engine = create_engine(postgresql.url())
-            create_features_and_labels_schemas(engine, features_tables, labels)
-
-            dates = [datetime.datetime(2016, 1, 1, 0, 0),
-                     datetime.datetime(2016, 2, 1, 0, 0),
-                     datetime.datetime(2016, 3, 1, 0, 0)]
+            create_schemas(
+                engine=engine,
+                features_tables=features_tables,
+                labels=labels,
+                states=states
+            )
 
             with TemporaryDirectory() as temp_dir:
                 architect = Architect(
                     beginning_of_time = datetime.datetime(2010, 1, 1, 0, 0),
                     label_names = ['booking'],
                     label_types = ['binary'],
+                    states = ['state_one AND state_two'],
                     db_config = db_config,
                     matrix_directory = temp_dir,
                     user_metadata = {},
@@ -740,18 +896,19 @@ def test_replace(self):
                     'label_name': 'booking',
                     'end_time': datetime.datetime(2016, 3, 1, 0, 0),
                     'beginning_of_time': datetime.datetime(2016, 1, 1, 0, 0),
-                    'label_window': '1 month'
+                    'label_window': '1 month',
+                    'state': 'state_one AND state_two'
                 }
                 uuid = metta.generate_uuid(matrix_metadata)
                 architect.build_matrix(
-                    as_of_times = dates,
-                    label_name = 'booking',
-                    label_type = 'binary',
-                    feature_dictionary = feature_dictionary,
-                    matrix_directory = temp_dir,
-                    matrix_metadata = matrix_metadata,
-                    matrix_uuid = uuid,
-                    matrix_type = 'test'
+                    as_of_times=dates,
+                    label_name='booking',
+                    label_type='binary',
+                    feature_dictionary=feature_dictionary,
+                    matrix_directory=temp_dir,
+                    matrix_metadata=matrix_metadata,
+                    matrix_uuid=uuid,
+                    matrix_type='test'
                 )
 
                 matrix_filename = os.path.join(
@@ -761,18 +918,18 @@ def test_replace(self):
 
                 with open(matrix_filename, 'r') as f:
                     reader = csv.reader(f)
-                    assert(len([row for row in reader]) == 13)
+                    assert(len([row for row in reader]) == 10)
 
                 # rerun
                 architect.builder.make_entity_date_table = Mock()
                 architect.builder.build_matrix(
-                    as_of_times = dates,
-                    label_name = 'booking',
-                    label_type = 'binary',
-                    feature_dictionary = feature_dictionary,
-                    matrix_directory = temp_dir,
-                    matrix_metadata = matrix_metadata,
-                    matrix_uuid = uuid,
-                    matrix_type = 'test'
+                    as_of_times=dates,
+                    label_name='booking',
+                    label_type='binary',
+                    feature_dictionary=feature_dictionary,
+                    matrix_directory=temp_dir,
+                    matrix_metadata=matrix_metadata,
+                    matrix_uuid=uuid,
+                    matrix_type='test'
                 )
                 assert not architect.builder.make_entity_date_table.called
diff --git a/tests/utils.py b/tests/utils.py
index e912228..9db8965 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -11,7 +11,7 @@ def convert_string_column_to_date(column):
         [datetime.datetime.strptime(date, '%Y-%m-%d').date() for date in column]
     )
 
-def create_features_and_labels_schemas(engine, features_tables, labels):
+def create_schemas(engine, features_tables, labels, states):
     """ This function makes a features schema and populates it with the
     fake data from above.
@@ -41,6 +41,24 @@ def create_features_and_labels_schemas(engine, features_tables, labels):
             'insert into labels.labels values (%s, %s, %s, %s, %s, %s)',
             row
         )
+    # create the sparse states table
+    engine.execute('drop schema if exists staging cascade; create schema staging;')
+    engine.execute(
+        """
+        create table staging.sparse_states (
+            entity_id int,
+            as_of_date date,
+            state_one bool,
+            state_two bool
+        )
+        """
+    )
+    for row in states:
+        engine.execute(
+            'insert into staging.sparse_states values (%s, %s, %s, %s)',
+            row
+        )
+
 
 def create_features_table(table_number, table, engine):
     engine.execute(
@@ -59,9 +77,11 @@ def create_features_table(table_number, table, engine):
     )
 
 def create_entity_date_df(
-    dates,
     labels,
+    states,
     as_of_dates,
+    state_one,
+    state_two,
     label_name,
     label_type,
     label_window
@@ -78,19 +98,29 @@ def create_entity_date_df(
         'label_type',
         'label'
     ])
-    dates = [date.date() for date in dates]
+    states_table = pd.DataFrame(states, columns = [
+        'entity_id',
+        'as_of_date',
+        'state_one',
+        'state_two'
+    ]).set_index(['entity_id', 'as_of_date'])
+    as_of_dates = [date.date() for date in as_of_dates]
     labels_table = labels_table[labels_table['label_name'] == label_name]
     labels_table = labels_table[labels_table['label_type'] == label_type]
     labels_table = labels_table[labels_table['label_window'] == label_window]
+    labels_table = labels_table.join(
+        other=states_table,
+        on=['entity_id', 'as_of_date']
+    )
+    labels_table = labels_table[
+        (labels_table['state_one'] == state_one) &
+        (labels_table['state_two'] == state_two)
+    ]
     ids_dates = labels_table[['entity_id', 'as_of_date']]
     ids_dates = ids_dates.sort_values(['entity_id', 'as_of_date'])
     ids_dates['as_of_date'] = [datetime.datetime.strptime(
         date,
         '%Y-%m-%d'
     ).date() for date in ids_dates['as_of_date']]
-    ids_dates = ids_dates[ids_dates['as_of_date'].isin(dates)]
+    ids_dates = ids_dates[ids_dates['as_of_date'].isin(as_of_dates)]
     print(ids_dates)
-    print(dates)
 
     return(ids_dates.reset_index(drop = True))
diff --git a/timechop/architect.py b/timechop/architect.py
index 1a0394f..c6faa39 100644
--- a/timechop/architect.py
+++ b/timechop/architect.py
@@ -11,12 +11,23 @@ class Architect(object):
 
-    def __init__(self, beginning_of_time, label_names, label_types, db_config,
-                 matrix_directory, user_metadata, engine,
-                 builder_class=builders.HighMemoryCSVBuilder, replace=True):
+    def __init__(
+        self,
+        beginning_of_time,
+        label_names,
+        label_types,
+        states,
+        db_config,
+        matrix_directory,
+        user_metadata,
+        engine,
+        builder_class=builders.HighMemoryCSVBuilder,
+        replace=True
+    ):
         self.beginning_of_time = beginning_of_time # earliest time included in features
         self.label_names = label_names
         self.label_types = label_types
+        self.states = states
         self.db_config = db_config
         self.matrix_directory = matrix_directory
         self.user_metadata = user_metadata
@@ -47,8 +58,15 @@ def _generate_build_task(
             'matrix_type': matrix_metadata['matrix_type']
         }
 
-    def _make_metadata(self, matrix_definition, feature_dictionary, label_name,
-                       label_type, matrix_type):
+    def _make_metadata(
+        self,
+        matrix_definition,
+        feature_dictionary,
+        label_name,
+        label_type,
+        state,
+        matrix_type
+    ):
         """ Generate dictionary of matrix metadata.
         :param matrix_definition: temporal definition of matrix
@@ -86,6 +104,7 @@ def _make_metadata(self, matrix_definition, feature_dictionary, label_name,
 
             # other information
             'label_type': label_type,
+            'state': state,
             'matrix_id': matrix_id,
             'matrix_type': matrix_type
 
@@ -113,9 +132,10 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
         build_tasks = dict()
         for matrix_set in matrix_set_definitions:
             train_matrix = matrix_set['train_matrix']
-            for label_name, label_type, feature_dictionary in itertools.product(
+            for label_name, label_type, state, feature_dictionary in itertools.product(
                 self.label_names,
                 self.label_types,
+                self.states,
                 feature_dictionaries
             ):
                 matrix_set_clone = copy.deepcopy(matrix_set)
@@ -125,6 +145,7 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
                     feature_dictionary,
                     label_name,
                     label_type,
+                    state,
                     'train',
                 )
                 train_uuid = metta.generate_uuid(train_metadata)
@@ -144,6 +165,7 @@ def generate_plans(self, matrix_set_definitions, feature_dictionaries):
                         feature_dictionary,
                         label_name,
                         label_type,
+                        state,
                         'test',
                     )
                     test_uuid = metta.generate_uuid(test_metadata)
diff --git a/timechop/builders.py b/timechop/builders.py
index b7fa196..e76396d 100644
--- a/timechop/builders.py
+++ b/timechop/builders.py
@@ -44,6 +44,7 @@ def build_labels_query(
         final_column,
         label_name,
         label_type,
+        state,
         label_window
     ):
         """ Given a table, schema, and list of dates, write a query to get the
@@ -61,23 +62,28 @@ def build_labels_query(
         """
         as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times]
         query = """
-            SELECT entity_id,
-                   as_of_date{labels}
-            FROM {labels_schema_name}.{labels_table_name}
-            WHERE as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND
+            SELECT l.entity_id,
+                   l.as_of_date{labels}
+            FROM {labels_schema_name}.{labels_table_name} l
+            JOIN {states_table} s
+            ON l.entity_id = s.entity_id AND
+               l.as_of_date = s.as_of_date
+            WHERE l.as_of_date IN (SELECT (UNNEST (ARRAY{times}::timestamp[]))) AND
                   label_name = '{l_name}' AND
                   label_type = '{l_type}' AND
-                  label_window = '{window}'
-            ORDER BY entity_id,
-                     as_of_date
+                  label_window = '{window}' AND
+                  {state_string}
+            ORDER BY l.entity_id,
+                     l.as_of_date
         """.format(
-            labels = final_column,
-            labels_schema_name = self.db_config['labels_schema_name'],
-            labels_table_name = self.db_config['labels_table_name'],
-            times = as_of_time_strings,
-            l_name = label_name,
-            l_type = label_type,
-            window = label_window
+            labels=final_column,
+            labels_schema_name=self.db_config['labels_schema_name'],
+            labels_table_name=self.db_config['labels_table_name'],
+            states_table=self.db_config['sparse_state_table_name'],
+            state_string=state,
+            times=as_of_time_strings,
+            l_name=label_name,
+            l_type=label_type,
+            window=label_window
         )
         return(query)
 
@@ -138,7 +144,7 @@ def make_entity_date_table(
         as_of_times,
         label_name,
         label_type,
-        feature_table_names,
+        state,
         matrix_type,
         matrix_uuid,
         label_window
@@ -160,23 +166,20 @@ def make_entity_date_table(
                 final_column='',
                 label_name=label_name,
                 label_type=label_type,
+                state=state,
                 label_window=label_window
             )
         elif matrix_type == 'test':
-            indices_query = self.get_all_valid_entity_date_combos(
-                as_of_times=as_of_times,
-                feature_table_names=feature_table_names
-            )
+            indices_query = self.get_all_valid_entity_date_combos(state=state)
         else:
             raise ValueError('Unknown matrix type passed: {}'.format(matrix_type))
 
-        table_name = '_'.join([matrix_uuid, 'tmp_entity_date'])
+        table_name = '_'.join(['tmp_entity_date', matrix_uuid])
         query = """
{features_schema_name}."{table_name}"; - CREATE TABLE {features_schema_name}."{table_name}" + DROP TABLE IF EXISTS "{table_name}"; + CREATE TEMPORARY TABLE "{table_name}" AS ({index_query}) """.format( - features_schema_name=self.db_config['features_schema_name'], table_name=table_name, index_query=indices_query ) @@ -184,26 +187,17 @@ def make_entity_date_table( return table_name - def get_all_valid_entity_date_combos(self, as_of_times, feature_table_names): - as_of_time_strings = [str(as_of_time) for as_of_time in as_of_times] - query_list = [] - for index, table in enumerate(feature_table_names): - union = '' - if index != 0: - union = 'UNION' - subquery = """ {u} - SELECT DISTINCT entity_id, as_of_date - FROM {schema_name}.{table_name} - WHERE as_of_date IN (SELECT (UNNEST (ARRAY{dates}::timestamp[]))) - """.format( - u = union, - table_name = table, - dates = as_of_time_strings, - schema_name = self.db_config['features_schema_name'] - ) - query_list.append(subquery) - - return(''.join(query_list)) + def get_all_valid_entity_date_combos(self, state): + query = """ + SELECT entity_id, as_of_date + FROM {states_table} + WHERE {state_string} + ORDER BY entity_id, as_of_date + """.format( + states_table=self.db_config['sparse_state_table_name'], + state_string=state + ) + return(query) class CSVBuilder(BuilderBase): @@ -256,63 +250,56 @@ def build_matrix( as_of_times, label_name, label_type, - feature_dictionary.keys(), + matrix_metadata['state'], matrix_type, matrix_uuid, matrix_metadata['label_window'] ) + logging.info('Writing feature group data') + features_csv_names = self.write_features_data( + as_of_times, + feature_dictionary, + entity_date_table_name, + matrix_uuid + ) try: - logging.info('Writing feature group data') - features_csv_names = self.write_features_data( + logging.info('Writing label data') + labels_csv_name = self.write_labels_data( as_of_times, - feature_dictionary, + label_name, + label_type, + matrix_metadata['state'], + matrix_type, entity_date_table_name, + matrix_uuid, + matrix_metadata['label_window'] + ) + features_csv_names.insert(0, labels_csv_name) + + # stitch together the csvs + logging.info('Merging features data') + output = self.merge_feature_csvs( + features_csv_names, + matrix_directory, matrix_uuid ) - try: - logging.info('Writing label data') - labels_csv_name = self.write_labels_data( - as_of_times, - label_name, - label_type, - matrix_type, - entity_date_table_name, - matrix_uuid, - matrix_metadata['label_window'] - ) - features_csv_names.insert(0, labels_csv_name) - - # stitch together the csvs - logging.info('Merging features data') - output = self.merge_feature_csvs( - features_csv_names, - matrix_directory, - matrix_uuid - ) - finally: - # clean up files and database before finishing - for csv_name in features_csv_names: - self.remove_file(csv_name) - try: - # store the matrix - logging.info('Archiving matrix with metta') - metta.archive_matrix( - matrix_config=matrix_metadata, - df_matrix=output, - overwrite=True, - directory=self.matrix_directory, - format='csv' - ) - finally: - if isinstance(output, str): - os.remove(output) finally: - self.engine.execute( - 'drop table "{}"."{}";'.format( - self.db_config['features_schema_name'], - entity_date_table_name - ) + # clean up files and database before finishing + for csv_name in features_csv_names: + self.remove_file(csv_name) + try: + # store the matrix + logging.info('Archiving matrix with metta') + metta.archive_matrix( + matrix_config=matrix_metadata, + df_matrix=output, + overwrite=True, 
+                directory=self.matrix_directory,
+                format='csv'
             )
+        finally:
+            if isinstance(output, str):
+                os.remove(output)
 
 
     def write_labels_data(
@@ -320,6 +307,7 @@ def write_labels_data(
         as_of_times,
         label_name,
         label_type,
+        state,
         matrix_type,
         entity_date_table_name,
         matrix_uuid,
@@ -341,17 +329,17 @@ def write_labels_data(
                 final_column=', label as {}'.format(label_name),
                 label_name=label_name,
                 label_type=label_type,
+                state=state,
                 label_window=label_window
             )
         elif matrix_type == 'test':
-            labels_query=self.build_outer_join_query(
+            labels_query = self.build_outer_join_query(
                 as_of_times=as_of_times,
                 right_table_name='{schema}.{table}'.format(
                     schema=self.db_config['labels_schema_name'],
                     table=self.db_config['labels_table_name']
                 ),
-                entity_date_table_name='"{schema}"."{table}"'.format(
-                    schema=self.db_config['features_schema_name'],
+                entity_date_table_name='"{table}"'.format(
                     table=entity_date_table_name
                 ),
                 right_column_selections=', r.label as {}'.format(label_name),
@@ -372,7 +360,13 @@ def write_labels_data(
         self.write_to_csv(labels_query, csv_name)
         return(csv_name)
 
-    def write_features_data(self, as_of_times, feature_dictionary, entity_date_table_name, matrix_uuid):
+    def write_features_data(
+        self,
+        as_of_times,
+        feature_dictionary,
+        entity_date_table_name,
+        matrix_uuid
+    ):
         """ Loop over tables in features schema, writing the data from each to
         a csv. Return the full list of feature csv names and the list of all
         features.
@@ -394,10 +388,7 @@ def write_features_data(
                     schema = self.db_config['features_schema_name'],
                     table = feature_table_name
                 ),
-                entity_date_table_name = '{schema}."{table}"'.format(
-                    schema = self.db_config['features_schema_name'],
-                    table = entity_date_table_name
-                ),
+                entity_date_table_name = '"{}"'.format(entity_date_table_name),
                 right_column_selections = self._format_imputations(feature_names)
            )
            self.write_to_csv(features_query, csv_name)
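
Reviewer note: for anyone trying the branch locally, below is a minimal sketch of how the new `states` argument flows through matrix building. The connection string and matrix directory are illustrative assumptions; the `db_config` keys and the state expression are taken from the tests above.

    import datetime

    from sqlalchemy import create_engine

    from timechop.architect import Architect

    # hypothetical database; it needs the features and labels schemas plus the
    # sparse state table (entity_id, as_of_date, state_one, state_two)
    engine = create_engine('postgresql://localhost/timechop_example')

    architect = Architect(
        beginning_of_time=datetime.datetime(2010, 1, 1, 0, 0),
        label_names=['booking'],
        label_types=['binary'],
        # one set of matrices is planned per entry; each entry is a SQL boolean
        # expression evaluated against the sparse state table's columns
        states=['state_one AND state_two'],
        db_config={
            'features_schema_name': 'features',
            'labels_schema_name': 'labels',
            'labels_table_name': 'labels',
            'sparse_state_table_name': 'staging.sparse_states',
        },
        matrix_directory='/tmp/matrices',  # illustrative path
        user_metadata={},
        engine=engine,
    )

Because `generate_plans` now includes `self.states` in its `itertools.product` call, every (label name, label type, state, feature dictionary) combination gets its own metadata and uuid, with the chosen state expression recorded under the metadata's `state` key. Note too that the entity-date staging table is now created as a `TEMPORARY` table, so it only lives as long as the builder's database session and no longer needs the explicit `drop table` cleanup the old `finally` block performed.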