diff --git a/catwalk/storage.py b/catwalk/storage.py
index 53e263c3d..e4acd8089 100644
--- a/catwalk/storage.py
+++ b/catwalk/storage.py
@@ -111,10 +111,52 @@ def get_store(self, model_hash):
 class MatrixStore(object):
-    matrix = None
-    metadata = None
     _labels = None
+    def __init__(self, matrix_path=None, metadata_path=None):
+        self.matrix_path = matrix_path
+        self.metadata_path = metadata_path
+        self._matrix = None
+        self._metadata = None
+        self._head_of_matrix = None
+
+    @property
+    def matrix(self):
+        if self._matrix is None:
+            self._load()
+        return self._matrix
+
+    @property
+    def metadata(self):
+        if self._metadata is None:
+            self._load()
+        return self._metadata
+
+    @property
+    def head_of_matrix(self):
+        if self._head_of_matrix is None:
+            self._get_head_of_matrix()
+        return self._head_of_matrix
+
+    @property
+    def empty(self):
+        if not os.path.isfile(self.matrix_path):
+            return True
+        else:
+            head_of_matrix = self.head_of_matrix
+            return head_of_matrix.empty
+
+    def columns(self, include_label=False):
+        head_of_matrix = self.head_of_matrix
+        columns = head_of_matrix.columns.tolist()
+        if include_label:
+            return columns
+        else:
+            return [
+                col for col in columns
+                if col != self.metadata['label_name']
+            ]
+
     def labels(self):
         if self._labels is not None:
             logging.debug('using stored labels')
@@ -128,15 +170,6 @@ def labels(self):
     def uuid(self):
         return self.metadata['metta-uuid']
 
-    def columns(self, include_label=False):
-        columns = self.matrix.columns.tolist()
-        if include_label:
-            return columns
-        else:
-            return [
-                col for col in columns
-                if col != self.metadata['label_name']
-            ]
 
     def matrix_with_sorted_columns(self, columns):
         columnset = set(self.columns())
@@ -160,50 +193,33 @@ def matrix_with_sorted_columns(self, columns):
            ''', columnset ^ desired_columnset)
 
 
-class MettaMatrixStore(MatrixStore):
-    def __init__(self, matrix_path, metadata_path):
-        self.matrix = pandas.read_hdf(matrix_path)
-        with open(metadata_path) as f:
-            self.metadata = yaml.load(f)
+class HDFMatrixStore(MatrixStore):
+    def _get_head_of_matrix(self):
+        try:
+            hdf = pandas.HDFStore(self.matrix_path)
+            key = hdf.keys()[0]
+            head_of_matrix = hdf.select(key, start=0, stop=1)
+            head_of_matrix.set_index(self.metadata['indices'], inplace=True)
+            self._head_of_matrix = head_of_matrix
+        except pandas.errors.EmptyDataError:
+            self._head_of_matrix = None
 
-class MettaCSVMatrixStore(MatrixStore):
-    def __init__(self, matrix_path, metadata_path):
-        self.matrix_path = matrix_path
-        self.metadata_path = metadata_path
-        self._matrix = None
-        self._metadata = None
-
-    @property
-    def matrix(self):
-        if self._matrix is None:
-            self._load()
-        return self._matrix
-
-    @property
-    def metadata(self):
-        if self._metadata is None:
-            self._load()
-        return self._metadata
+    def _load(self):
+        self._matrix = pandas.read_hdf(self.matrix_path, mode='r+')
+        with open(self.metadata_path) as f:
+            self._metadata = yaml.load(f)
+        self._matrix.set_index(self.metadata['indices'], inplace=True)
 
-    @property
-    def empty(self):
-        if not os.path.isfile(self.matrix_path):
-            return True
-        else:
-            return pandas.read_csv(self.matrix_path, nrows=1).empty
 
-    def columns(self, include_label=False):
-        head_of_matrix = pandas.read_csv(self.matrix_path, nrows=1)
-        head_of_matrix.set_index(self.metadata['indices'], inplace=True)
-        columns = head_of_matrix.columns.tolist()
-        if include_label:
-            return columns
-        else:
-            return [
-                col for col in columns
-                if col != self.metadata['label_name']
-            ]
+class CSVMatrixStore(MatrixStore):
+    def _get_head_of_matrix(self):
+        try:
+            head_of_matrix = pandas.read_csv(self.matrix_path, nrows=1)
+            head_of_matrix.set_index(self.metadata['indices'], inplace=True)
+            self._head_of_matrix = head_of_matrix
+        except pandas.errors.EmptyDataError:
+            self._head_of_matrix = None
 
     def _load(self):
         self._matrix = pandas.read_csv(self.matrix_path)
@@ -214,6 +230,21 @@ def _load(self):
 
 class InMemoryMatrixStore(MatrixStore):
     def __init__(self, matrix, metadata, labels=None):
-        self.matrix = matrix
-        self.metadata = metadata
+        self._matrix = matrix
+        self._metadata = metadata
         self._labels = labels
+        self._head_of_matrix = None
+
+    def _get_head_of_matrix(self):
+        self._head_of_matrix = self.matrix.head(n=1)
+
+    @property
+    def empty(self):
+        head_of_matrix = self.head_of_matrix
+        return head_of_matrix.empty
+
+    @property
+    def matrix(self):
+        if self._metadata['indices'][0] in self._matrix.columns:
+            self._matrix.set_index(self._metadata['indices'], inplace=True)
+        return self._matrix
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 6503a5328..a64d26a44 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -39,6 +39,7 @@ def test_integration():
             'label_window': '1y',
             'feature_names': ['ft1', 'ft2'],
             'metta-uuid': '1234',
+            'indices': ['entity_id'],
         }
         train_store = InMemoryMatrixStore(train_matrix, train_metadata)
 
@@ -55,12 +56,13 @@ def test_integration():
                 'feature_one': [8],
                 'feature_two': [5],
                 'label': [5]
-            }).set_index('entity_id'),
+            }),
             {
                 'label_name': 'label',
                 'label_window': '1y',
                 'end_time': as_of_date,
                 'metta-uuid': '1234',
+                'indices': ['entity_id'],
             }
         )
         for as_of_date in as_of_dates
diff --git a/tests/test_model_trainers.py b/tests/test_model_trainers.py
index 2889d069d..40afab0ee 100644
--- a/tests/test_model_trainers.py
+++ b/tests/test_model_trainers.py
@@ -48,7 +48,8 @@ def test_model_trainer():
             'label_name': 'label',
             'label_window': '1y',
             'metta-uuid': '1234',
-            'feature_names': ['ft1', 'ft2']
+            'feature_names': ['ft1', 'ft2'],
+            'indices': ['entity_id'],
         }
         project_path = 'econ-dev/inspections'
         model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
@@ -72,7 +73,7 @@ def test_model_trainer():
            row for row in
            engine.execute('select * from results.feature_importances')
        ]
-        assert len(records) == 4 * 3  # maybe exclude entity_id?
+        assert len(records) == 4 * 2  # maybe exclude entity_id? yes
 
        records = [
            row for row in
@@ -106,6 +107,9 @@ def test_model_trainer():
            'feature_one': [4, 4],
            'feature_two': [6, 5],
        })
+
+        test_matrix = InMemoryMatrixStore(matrix=test_matrix, metadata=metadata).matrix
+
        for model_pickle in model_pickles:
            predictions = model_pickle.predict(test_matrix)
            assert len(predictions) == 2
@@ -142,7 +146,7 @@ def test_model_trainer():
            row for row in
            engine.execute('select * from results.feature_importances')
        ]
-        assert len(records) == 4 * 3  # maybe exclude entity_id?
+        assert len(records) == 4 * 2  # maybe exclude entity_id? yes
 
        # 7. if the cache is missing but the metadata is still there, reuse the metadata
        for row in engine.execute('select model_hash from results.models'):
@@ -207,7 +211,8 @@ def test_n_jobs_not_new_model():
                'beginning_of_time': datetime.date(2012, 12, 20),
                'label_name': 'label',
                'metta-uuid': '1234',
-                'feature_names': ['ft1', 'ft2']
+                'feature_names': ['ft1', 'ft2'],
+                'indices': ['entity_id'],
            })
        )
        assert len(train_tasks) == 35  # 32+3, would be (32*2)+3 if we didn't remove
@@ -237,7 +242,7 @@ def test_retry_max(self):
        trainer = None
        # set up a basic model training run
        # TODO abstract the setup of a basic model training run where
-        # we don't worry about the specific values used? it would make 
+        # we don't worry about the specific values used? it would make
        # tests like this require a bit less noise to read past
        with testing.postgresql.Postgresql() as postgresql:
            engine = create_engine(postgresql.url())
@@ -262,7 +267,8 @@ def test_retry_max(self):
                'beginning_of_time': datetime.date(2012, 12, 20),
                'label_name': 'label',
                'metta-uuid': '1234',
-                'feature_names': ['ft1', 'ft2']
+                'feature_names': ['ft1', 'ft2'],
+                'indices': ['entity_id'],
            })
        # the postgres server goes out of scope here and thus no longer exists
        with patch('time.sleep') as time_mock:
@@ -306,7 +312,8 @@ def test_retry_recovery(self):
                'beginning_of_time': datetime.date(2012, 12, 20),
                'label_name': 'label',
                'metta-uuid': '1234',
-                'feature_names': ['ft1', 'ft2']
+                'feature_names': ['ft1', 'ft2'],
+                'indices': ['entity_id'],
            })
 
        # start without a database server
diff --git a/tests/test_predictors.py b/tests/test_predictors.py
index 2cd380cf3..5aa15ffac 100644
--- a/tests/test_predictors.py
+++ b/tests/test_predictors.py
@@ -49,6 +49,7 @@ def test_predictor():
            'end_time': AS_OF_DATE,
            'label_window': '3month',
            'metta-uuid': '1234',
+            'indices': ['entity_id'],
        }
        matrix_store = InMemoryMatrixStore(matrix, metadata)
 
@@ -95,6 +96,7 @@ def test_predictor():
            'end_time': AS_OF_DATE + datetime.timedelta(days=1),
            'label_window': '3month',
            'metta-uuid': '1234',
+            'indices': ['entity_id'],
        }
        new_matrix_store = InMemoryMatrixStore(new_matrix, new_metadata)
        predictor.predict(
@@ -146,6 +148,7 @@ def test_predictor_composite_index():
            'end_time': AS_OF_DATE,
            'label_window': '3month',
            'metta-uuid': '1234',
+            'indices': ['entity_id'],
        }
        matrix_store = InMemoryMatrixStore(matrix, metadata)
        predict_proba = predictor.predict(
@@ -235,6 +238,7 @@ def test_predictor_retrieve():
            'end_time': AS_OF_DATE,
            'label_window': '3month',
            'metta-uuid': '1234',
+            'indices': ['entity_id'],
        }
        matrix_store = InMemoryMatrixStore(matrix, metadata)
        predict_proba = predictor.predict(
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 499a81bae..a42af9a56 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -1,4 +1,4 @@
-from catwalk.storage import S3Store, FSStore, MemoryStore, InMemoryMatrixStore
+from catwalk.storage import S3Store, FSStore, MemoryStore, InMemoryMatrixStore, HDFMatrixStore, CSVMatrixStore
 from moto import mock_s3
 import tempfile
 import boto3
@@ -6,7 +6,7 @@ import pandas
 from collections import OrderedDict
 import unittest
-
+import yaml
 
 class SomeClass(object):
     def __init__(self, val):
@@ -64,56 +64,96 @@ def matrix_store(self):
            'label_name': 'label',
            'indices': ['entity_id'],
        }
-        matrix_store = InMemoryMatrixStore(matrix=df, metadata=metadata)
+
+        inmemory = InMemoryMatrixStore(matrix=df, metadata=metadata)
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            tmpcsv = os.path.join(tmpdir, 'df.csv')
+            tmpyaml = os.path.join(tmpdir, 'metadata.yaml')
+            tmphdf = os.path.join(tmpdir, 'df.h5')
+            with open(tmpyaml, 'w') as outfile:
+                yaml.dump(metadata, outfile, default_flow_style=False)
+            df.to_csv(tmpcsv, index=False)
+            df.to_hdf(tmphdf, 'test')
+            csv = CSVMatrixStore(matrix_path=tmpcsv, metadata_path=tmpyaml)
+            hdf = HDFMatrixStore(matrix_path=tmphdf, metadata_path=tmpyaml)
+
+            assert csv.matrix.to_dict() == inmemory.matrix.to_dict()
+            assert hdf.matrix.to_dict() == inmemory.matrix.to_dict()
+
+            assert csv.metadata == inmemory.metadata
+            assert hdf.metadata == inmemory.metadata
+
+            assert csv.head_of_matrix.to_dict() == inmemory.head_of_matrix.to_dict()
+            assert hdf.head_of_matrix.to_dict() == inmemory.head_of_matrix.to_dict()
+
+            assert csv.empty == inmemory.empty
+            assert hdf.empty == inmemory.empty
+
+            assert csv.labels().to_dict() == inmemory.labels().to_dict()
+            assert hdf.labels().to_dict() == inmemory.labels().to_dict()
+
+
+            matrix_store = [inmemory, hdf, csv]
        return matrix_store
 
    def test_MatrixStore_resort_columns(self):
-        result = self.matrix_store().\
-            matrix_with_sorted_columns(
-                ['entity_id', 'm_feature', 'k_feature']
-            )\
-            .values\
-            .tolist()
-        expected = [
-            [1, 0.4, 0.5],
-            [2, 0.5, 0.4]
-        ]
-        self.assertEqual(expected, result)
+        matrix_store_list = self.matrix_store()
+        for matrix_store in matrix_store_list:
+            result = matrix_store.\
+                matrix_with_sorted_columns(
+                    ['m_feature', 'k_feature']
+                )\
+                .values\
+                .tolist()
+            expected = [
+                [0.4, 0.5],
+                [0.5, 0.4]
+            ]
+            self.assertEqual(expected, result)
 
    def test_MatrixStore_already_sorted_columns(self):
-        result = self.matrix_store().\
-            matrix_with_sorted_columns(
-                ['entity_id', 'k_feature', 'm_feature']
-            )\
-            .values\
-            .tolist()
-        expected = [
-            [1, 0.5, 0.4],
-            [2, 0.4, 0.5]
-        ]
-        self.assertEqual(expected, result)
+        matrix_store_list = self.matrix_store()
+        for matrix_store in matrix_store_list:
+            result = matrix_store.\
+                matrix_with_sorted_columns(
+                    ['k_feature', 'm_feature']
+                )\
+                .values\
+                .tolist()
+            expected = [
+                [0.5, 0.4],
+                [0.4, 0.5]
+            ]
+            self.assertEqual(expected, result)
 
    def test_MatrixStore_sorted_columns_subset(self):
        with self.assertRaises(ValueError):
-            self.matrix_store().\
-                matrix_with_sorted_columns(['entity_id', 'm_feature'])\
-                .values\
-                .tolist()
+            matrix_store_list = self.matrix_store()
+            for matrix_store in matrix_store_list:
+                matrix_store.\
+                    matrix_with_sorted_columns(['m_feature'])\
+                    .values\
+                    .tolist()
 
    def test_MatrixStore_sorted_columns_superset(self):
        with self.assertRaises(ValueError):
-            self.matrix_store().\
-                matrix_with_sorted_columns(
-                    ['entity_id', 'k_feature', 'l_feature', 'm_feature']
-                )\
-                .values\
-                .tolist()
+            matrix_store_list = self.matrix_store()
+            for matrix_store in matrix_store_list:
+                matrix_store.\
+                    matrix_with_sorted_columns(
+                        ['k_feature', 'l_feature', 'm_feature']
+                    )\
+                    .values\
+                    .tolist()
 
    def test_MatrixStore_sorted_columns_mismatch(self):
        with self.assertRaises(ValueError):
-            self.matrix_store().\
-                matrix_with_sorted_columns(
-                    ['entity_id', 'k_feature', 'l_feature']
-                )\
-                .values\
-                .tolist()
+            matrix_store_list = self.matrix_store()
+            for matrix_store in matrix_store_list:
+                matrix_store.\
+                    matrix_with_sorted_columns(
+                        ['k_feature', 'l_feature']
+                    )\
+                    .values\
+                    .tolist()
diff --git a/tests/utils.py b/tests/utils.py
index 34a734bf6..542708631 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -8,7 +8,7 @@ from sqlalchemy.orm import sessionmaker
 import datetime
 import os
-from catwalk.storage import MettaCSVMatrixStore
+from catwalk.storage import CSVMatrixStore, HDFMatrixStore
 from metta import metta_io as metta
 from collections import OrderedDict
 
@@ -116,12 +116,73 @@ def sample_metta_csv_diff_order(directory):
        format='csv'
    )
 
-    train_store = MettaCSVMatrixStore(
+    train_store = CSVMatrixStore(
        matrix_path=os.path.join(directory, '{}.csv'.format(train_uuid)),
        metadata_path=os.path.join(directory, '{}.yaml'.format(train_uuid))
    )
-    test_store = MettaCSVMatrixStore(
+    test_store = CSVMatrixStore(
        matrix_path=os.path.join(directory, '{}.csv'.format(test_uuid)),
        metadata_path=os.path.join(directory, '{}.yaml'.format(test_uuid))
    )
    return train_store, test_store
+
+
+def sample_metta_hdf_diff_order(directory):
+    """Stores matrix and metadata in a metta-data-like form
+
+    The train and test matrices will have different column orders
+
+    Args:
+        directory (str)
+    """
+    train_dict = OrderedDict([
+        ('entity_id', [1, 2]),
+        ('k_feature', [0.5, 0.4]),
+        ('m_feature', [0.4, 0.5]),
+        ('label', [0, 1])
+    ])
+    train_matrix = pandas.DataFrame.from_dict(train_dict)
+    train_metadata = {
+        'beginning_of_time': datetime.date(2014, 1, 1),
+        'end_time': datetime.date(2015, 1, 1),
+        'matrix_id': 'train_matrix',
+        'label_name': 'label',
+        'label_window': '3month',
+        'indices': ['entity_id'],
+    }
+
+    test_dict = OrderedDict([
+        ('entity_id', [3, 4]),
+        ('m_feature', [0.4, 0.5]),
+        ('k_feature', [0.5, 0.4]),
+        ('label', [0, 1])
+    ])
+
+    test_matrix = pandas.DataFrame.from_dict(test_dict)
+    test_metadata = {
+        'beginning_of_time': datetime.date(2015, 1, 1),
+        'end_time': datetime.date(2016, 1, 1),
+        'matrix_id': 'test_matrix',
+        'label_name': 'label',
+        'label_window': '3month',
+        'indices': ['entity_id'],
+    }
+
+    train_uuid, test_uuid = metta.archive_train_test(
+        train_config=train_metadata,
+        df_train=train_matrix,
+        test_config=test_metadata,
+        df_test=test_matrix,
+        directory=directory,
+        format='hdf'
+    )
+
+    train_store = HDFMatrixStore(
+        matrix_path=os.path.join(directory, '{}.h5'.format(train_uuid)),
+        metadata_path=os.path.join(directory, '{}.yaml'.format(train_uuid))
+    )
+    test_store = HDFMatrixStore(
+        matrix_path=os.path.join(directory, '{}.h5'.format(test_uuid)),
+        metadata_path=os.path.join(directory, '{}.yaml'.format(test_uuid))
+    )
+    return train_store, test_store
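
Usage note (not part of the patch): a minimal sketch of how the refactored stores are meant to be exercised, assuming a CSV matrix and a metadata YAML (with at least 'indices', 'label_name', and 'metta-uuid' keys) already exist on disk. The file names below are hypothetical.

    # Illustrative sketch only: paths are hypothetical, not produced by this diff.
    from catwalk.storage import CSVMatrixStore

    store = CSVMatrixStore(
        matrix_path='train.csv',     # hypothetical matrix file
        metadata_path='train.yaml',  # hypothetical metadata file
    )

    # Nothing is read from disk until a property is accessed: 'empty' and
    # 'columns()' only read the head of the matrix; 'matrix' loads it fully.
    if not store.empty:
        feature_columns = store.columns()  # label column excluded by default
        features = store.matrix[feature_columns]
        labels = store.labels()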