Skip to content

Commit

Permalink
Matrix storage (#24)
Browse files Browse the repository at this point in the history
* Add HDFMatrixStore() class

* Solve #17

* refactor MatrixStore

* add unittest

* remove MettaMatrixStore

* test csv and hdf support

* make HDFMatrixStore, CSVMatrixStore and InMemoryMatrixStore have the same interface

* fix tests to be compatible with new matrixStore interface

* remove old columns() method and add more attribute testing
  • Loading branch information
tweddielin authored and thcrock committed Sep 14, 2017
1 parent 3f7e5c1 commit bda2dfa
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 105 deletions.
137 changes: 84 additions & 53 deletions catwalk/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,52 @@ def get_store(self, model_hash):


class MatrixStore(object):
matrix = None
metadata = None
_labels = None

def __init__(self, matrix_path=None, metadata_path=None):
self.matrix_path = matrix_path
self.metadata_path = metadata_path
self._matrix = None
self._metadata = None
self._head_of_matrix = None

@property
def matrix(self):
if self._matrix is None:
self._load()
return self._matrix

@property
def metadata(self):
if self._metadata is None:
self._load()
return self._metadata

@property
def head_of_matrix(self):
if self._head_of_matrix is None:
self._get_head_of_matrix()
return self._head_of_matrix

@property
def empty(self):
if not os.path.isfile(self.matrix_path):
return True
else:
head_of_matrix = self.head_of_matrix
return head_of_matrix.empty

def columns(self, include_label=False):
head_of_matrix = self.head_of_matrix
columns = head_of_matrix.columns.tolist()
if include_label:
return columns
else:
return [
col for col in columns
if col != self.metadata['label_name']
]

def labels(self):
if self._labels is not None:
logging.debug('using stored labels')
Expand All @@ -128,15 +170,6 @@ def labels(self):
def uuid(self):
return self.metadata['metta-uuid']

def columns(self, include_label=False):
columns = self.matrix.columns.tolist()
if include_label:
return columns
else:
return [
col for col in columns
if col != self.metadata['label_name']
]

def matrix_with_sorted_columns(self, columns):
columnset = set(self.columns())
Expand All @@ -160,50 +193,33 @@ def matrix_with_sorted_columns(self, columns):
''', columnset ^ desired_columnset)


class MettaMatrixStore(MatrixStore):
    # Eagerly loads everything at construction time, unlike the
    # lazy-loading MatrixStore subclasses elsewhere in this module.
    def __init__(self, matrix_path, metadata_path):
        """Read the matrix from matrix_path (HDF) and metadata from metadata_path (YAML)."""
        self.matrix = pandas.read_hdf(matrix_path)
        with open(metadata_path) as f:
            # NOTE(review): yaml.load without an explicit Loader can construct
            # arbitrary Python objects; prefer yaml.safe_load if the metadata
            # file can come from an untrusted source — confirm before changing.
            self.metadata = yaml.load(f)
class HDFMatrixStore(MatrixStore):
def _get_head_of_matrix(self):
    """Load only the first row of the HDF matrix into self._head_of_matrix.

    Sets self._head_of_matrix to None when no data can be read.
    """
    try:
        # Fix: use a context manager so the HDF file handle is always closed
        # (the original leaked the open HDFStore), and open read-only —
        # HDFStore's default mode would create the file if it were missing.
        with pandas.HDFStore(self.matrix_path, mode='r') as hdf:
            key = hdf.keys()[0]
            head_of_matrix = hdf.select(key, start=0, stop=1)
        head_of_matrix.set_index(self.metadata['indices'], inplace=True)
        self._head_of_matrix = head_of_matrix
    except pandas.errors.EmptyDataError:
        # Fix: the exception class lives in pandas.errors; `pandas.error`
        # does not exist, so the original handler raised AttributeError
        # instead of catching anything.
        self._head_of_matrix = None


class MettaCSVMatrixStore(MatrixStore):
def __init__(self, matrix_path, metadata_path):
    """Record file locations; matrix and metadata are read lazily by _load()."""
    # Empty caches first, then the paths they will be filled from.
    self._matrix = None
    self._metadata = None
    self.matrix_path = matrix_path
    self.metadata_path = metadata_path

@property
def matrix(self):
    """Lazily-loaded matrix; read from disk only on first access."""
    if self._matrix is not None:
        return self._matrix
    self._load()
    return self._matrix

@property
def metadata(self):
    """Lazily-loaded metadata mapping; read from disk only on first access."""
    if self._metadata is not None:
        return self._metadata
    self._load()
    return self._metadata
def _load(self):
    """Read the full HDF matrix and YAML metadata, then apply the index.

    Populates self._matrix and self._metadata.
    """
    # Fix: open read-only — 'r+' required write permission on the file
    # even though loading never writes to it.
    self._matrix = pandas.read_hdf(self.matrix_path, mode='r')
    with open(self.metadata_path) as f:
        # NOTE(review): yaml.load without a Loader is unsafe on untrusted
        # input; consider yaml.safe_load if metadata can come from outside.
        self._metadata = yaml.load(f)
    self._matrix.set_index(self.metadata['indices'], inplace=True)

@property
def empty(self):
    """True when the CSV file is missing or its first row holds no data."""
    if not os.path.isfile(self.matrix_path):
        return True
    return pandas.read_csv(self.matrix_path, nrows=1).empty

def columns(self, include_label=False):
    """Column names read from the CSV header (first row only).

    include_label: when False (default), the metadata['label_name']
    column is filtered out.
    """
    head = pandas.read_csv(self.matrix_path, nrows=1)
    head.set_index(self.metadata['indices'], inplace=True)
    names = head.columns.tolist()
    if include_label:
        return names
    label = self.metadata['label_name']
    return [name for name in names if name != label]
class CSVMatrixStore(MatrixStore):
def _get_head_of_matrix(self):
    """Read only the first CSV row into self._head_of_matrix.

    Sets self._head_of_matrix to None when the file has no parseable data.
    """
    try:
        head_of_matrix = pandas.read_csv(self.matrix_path, nrows=1)
        head_of_matrix.set_index(self.metadata['indices'], inplace=True)
        self._head_of_matrix = head_of_matrix
    except pandas.errors.EmptyDataError:
        # Fix: the exception class lives in pandas.errors; `pandas.error`
        # does not exist, so the original handler could never catch it.
        self._head_of_matrix = None

def _load(self):
self._matrix = pandas.read_csv(self.matrix_path)
Expand All @@ -214,6 +230,21 @@ def _load(self):

class InMemoryMatrixStore(MatrixStore):
def __init__(self, matrix, metadata, labels=None):
    """Wrap a matrix and metadata dict that are already in memory.

    labels: optional precomputed labels, cached for labels().
    """
    # Fix: assign the private attributes only. `matrix` is a setter-less
    # property on this class, so the stale duplicate assignments
    # `self.matrix = matrix` / `self.metadata = metadata` raised
    # AttributeError at construction time.
    self._matrix = matrix
    self._metadata = metadata
    self._labels = labels
    self._head_of_matrix = None

def _get_head_of_matrix(self):
    """Cache the first row of the in-memory matrix."""
    self._head_of_matrix = self.matrix.head(1)

@property
def empty(self):
    """True when the matrix holds no rows."""
    return self.head_of_matrix.empty

@property
def matrix(self):
    # Lazily applies the configured index before returning the matrix.
    # Mutates self._matrix in place on first access.
    # NOTE(review): only the first entry of metadata['indices'] is checked
    # against the columns, yet set_index is given the whole list — a
    # partially pre-applied composite index would raise KeyError; confirm
    # callers always pass either all or none of the index columns.
    if self._metadata['indices'][0] in self._matrix.columns:
        self._matrix.set_index(self._metadata['indices'], inplace=True)
    return self._matrix
4 changes: 3 additions & 1 deletion tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def test_integration():
'label_window': '1y',
'feature_names': ['ft1', 'ft2'],
'metta-uuid': '1234',
'indices': ['entity_id'],
}

train_store = InMemoryMatrixStore(train_matrix, train_metadata)
Expand All @@ -55,12 +56,13 @@ def test_integration():
'feature_one': [8],
'feature_two': [5],
'label': [5]
}).set_index('entity_id'),
}),
{
'label_name': 'label',
'label_window': '1y',
'end_time': as_of_date,
'metta-uuid': '1234',
'indices': ['entity_id'],
}
)
for as_of_date in as_of_dates
Expand Down
21 changes: 14 additions & 7 deletions tests/test_model_trainers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def test_model_trainer():
'label_name': 'label',
'label_window': '1y',
'metta-uuid': '1234',
'feature_names': ['ft1', 'ft2']
'feature_names': ['ft1', 'ft2'],
'indices': ['entity_id'],
}
project_path = 'econ-dev/inspections'
model_storage_engine = S3ModelStorageEngine(s3_conn, project_path)
Expand All @@ -72,7 +73,7 @@ def test_model_trainer():
row for row in
engine.execute('select * from results.feature_importances')
]
assert len(records) == 4 * 3 # maybe exclude entity_id?
assert len(records) == 4 * 2 # maybe exclude entity_id? yes

records = [
row for row in
Expand Down Expand Up @@ -106,6 +107,9 @@ def test_model_trainer():
'feature_one': [4, 4],
'feature_two': [6, 5],
})

test_matrix = InMemoryMatrixStore(matrix=test_matrix, metadata=metadata).matrix

for model_pickle in model_pickles:
predictions = model_pickle.predict(test_matrix)
assert len(predictions) == 2
Expand Down Expand Up @@ -142,7 +146,7 @@ def test_model_trainer():
row for row in
engine.execute('select * from results.feature_importances')
]
assert len(records) == 4 * 3 # maybe exclude entity_id?
assert len(records) == 4 * 2 # maybe exclude entity_id? yes

# 7. if the cache is missing but the metadata is still there, reuse the metadata
for row in engine.execute('select model_hash from results.models'):
Expand Down Expand Up @@ -207,7 +211,8 @@ def test_n_jobs_not_new_model():
'beginning_of_time': datetime.date(2012, 12, 20),
'label_name': 'label',
'metta-uuid': '1234',
'feature_names': ['ft1', 'ft2']
'feature_names': ['ft1', 'ft2'],
'indices': ['entity_id'],
})
)
assert len(train_tasks) == 35 # 32+3, would be (32*2)+3 if we didn't remove
Expand Down Expand Up @@ -237,7 +242,7 @@ def test_retry_max(self):
trainer = None
# set up a basic model training run
# TODO abstract the setup of a basic model training run where
# we don't worry about the specific values used? it would make
# we don't worry about the specific values used? it would make
# tests like this require a bit less noise to read past
with testing.postgresql.Postgresql() as postgresql:
engine = create_engine(postgresql.url())
Expand All @@ -262,7 +267,8 @@ def test_retry_max(self):
'beginning_of_time': datetime.date(2012, 12, 20),
'label_name': 'label',
'metta-uuid': '1234',
'feature_names': ['ft1', 'ft2']
'feature_names': ['ft1', 'ft2'],
'indices': ['entity_id'],
})
# the postgres server goes out of scope here and thus no longer exists
with patch('time.sleep') as time_mock:
Expand Down Expand Up @@ -306,7 +312,8 @@ def test_retry_recovery(self):
'beginning_of_time': datetime.date(2012, 12, 20),
'label_name': 'label',
'metta-uuid': '1234',
'feature_names': ['ft1', 'ft2']
'feature_names': ['ft1', 'ft2'],
'indices': ['entity_id'],
})

# start without a database server
Expand Down
4 changes: 4 additions & 0 deletions tests/test_predictors.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def test_predictor():
'end_time': AS_OF_DATE,
'label_window': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
}

matrix_store = InMemoryMatrixStore(matrix, metadata)
Expand Down Expand Up @@ -95,6 +96,7 @@ def test_predictor():
'end_time': AS_OF_DATE + datetime.timedelta(days=1),
'label_window': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
}
new_matrix_store = InMemoryMatrixStore(new_matrix, new_metadata)
predictor.predict(
Expand Down Expand Up @@ -146,6 +148,7 @@ def test_predictor_composite_index():
'end_time': AS_OF_DATE,
'label_window': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
}
matrix_store = InMemoryMatrixStore(matrix, metadata)
predict_proba = predictor.predict(
Expand Down Expand Up @@ -235,6 +238,7 @@ def test_predictor_retrieve():
'end_time': AS_OF_DATE,
'label_window': '3month',
'metta-uuid': '1234',
'indices': ['entity_id'],
}
matrix_store = InMemoryMatrixStore(matrix, metadata)
predict_proba = predictor.predict(
Expand Down
Loading

0 comments on commit bda2dfa

Please sign in to comment.