From efd6e61a8d01498990e1922661bff41ce72bc32a Mon Sep 17 00:00:00 2001 From: Tristan Crockett Date: Mon, 17 Jul 2017 16:49:54 -0500 Subject: [PATCH] Import code from triage - Add all modules and tests from triage that are not related to the Pipeline wrapper --- catwalk/db.py | 24 ++ catwalk/estimators/__init__.py | 0 catwalk/estimators/classifiers.py | 78 ++++ catwalk/estimators/dsapp_scaler.org | 556 ++++++++++++++++++++++++++++ catwalk/estimators/transformers.py | 71 ++++ catwalk/evaluation.py | 362 ++++++++++++++++++ catwalk/feature_importances.py | 59 +++ catwalk/model_trainers.py | 425 +++++++++++++++++++++ catwalk/predictors.py | 255 +++++++++++++ catwalk/storage.py | 219 +++++++++++ catwalk/utils.py | 180 +++++++++ requirements.txt | 14 + requirements_dev.txt | 4 + setup.py | 2 + tests/test_estimators.py | 76 ++++ tests/test_evaluation.py | 178 +++++++++ tests/test_feature_importances.py | 45 +++ tests/test_integration.py | 159 ++++++++ tests/test_model_trainers.py | 327 ++++++++++++++++ tests/test_predictors.py | 285 ++++++++++++++ tests/test_storage.py | 119 ++++++ tests/test_utils.py | 106 ++++++ tests/utils.py | 127 +++++++ 23 files changed, 3671 insertions(+) create mode 100644 catwalk/db.py create mode 100644 catwalk/estimators/__init__.py create mode 100644 catwalk/estimators/classifiers.py create mode 100644 catwalk/estimators/dsapp_scaler.org create mode 100644 catwalk/estimators/transformers.py create mode 100644 catwalk/evaluation.py create mode 100644 catwalk/feature_importances.py create mode 100644 catwalk/model_trainers.py create mode 100644 catwalk/predictors.py create mode 100644 catwalk/storage.py create mode 100644 catwalk/utils.py create mode 100644 tests/test_estimators.py create mode 100644 tests/test_evaluation.py create mode 100644 tests/test_feature_importances.py create mode 100644 tests/test_integration.py create mode 100644 tests/test_model_trainers.py create mode 100644 tests/test_predictors.py create mode 100644 tests/test_storage.py create mode 100644 tests/test_utils.py create mode 100644 tests/utils.py diff --git a/catwalk/db.py b/catwalk/db.py new file mode 100644 index 0000000..3d80f37 --- /dev/null +++ b/catwalk/db.py @@ -0,0 +1,24 @@ +import yaml +from sqlalchemy import create_engine +from sqlalchemy.engine.url import URL +from sqlalchemy.pool import QueuePool + +from results_schema import * + + +def ensure_db(engine): + Base.metadata.create_all(engine) + + +def connect(poolclass=QueuePool): + with open('database.yaml') as f: + profile = yaml.load(f) + dbconfig = { + 'host': profile['host'], + 'username': profile['user'], + 'database': profile['db'], + 'password': profile['pass'], + 'port': profile['port'], + } + dburl = URL('postgres', **dbconfig) + return create_engine(dburl, poolclass=poolclass) diff --git a/catwalk/estimators/__init__.py b/catwalk/estimators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/catwalk/estimators/classifiers.py b/catwalk/estimators/classifiers.py new file mode 100644 index 0000000..0ee7cf0 --- /dev/null +++ b/catwalk/estimators/classifiers.py @@ -0,0 +1,78 @@ +# coding: utf-8 + +from sklearn.base import BaseEstimator, ClassifierMixin +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import MinMaxScaler +from sklearn.linear_model import LogisticRegression + +from catwalk.estimators.transformers import CutOff + +class ScaledLogisticRegression(BaseEstimator, ClassifierMixin): + """ + An in-place replacement for the scikit-learn's LogisticRegression. 
+ + It incorporates the MaxMinScaler, and the CutOff as preparations + for the logistic regression. + """ + def __init__(self, penalty='l2', dual=False, tol=1e-4, C=1.0, + fit_intercept=True, intercept_scaling=1, class_weight=None, + random_state=None, solver='liblinear', max_iter=100, + multi_class='ovr', verbose=0, warm_start=False, n_jobs=1): + + + self.penalty = penalty + self.dual = dual + self.tol = tol + self.C = C + self.fit_intercept = fit_intercept + self.intercept_scaling = intercept_scaling + self.class_weight = class_weight + self.random_state = random_state + self.solver = solver + self.max_iter = max_iter + self.multi_class = multi_class + self.verbose = verbose + self.warm_start = warm_start + self.n_jobs = n_jobs + + self.minmax_scaler = MinMaxScaler() + self.dsapp_cutoff = CutOff() + self.lr = LogisticRegression(penalty=penalty, dual=dual, tol=tol, C=C, + fit_intercept=fit_intercept, intercept_scaling=intercept_scaling, class_weight=class_weight, + random_state=random_state, solver=solver, max_iter=max_iter, + multi_class=multi_class, verbose=verbose, warm_start=warm_start, n_jobs=n_jobs) + + self.pipeline =Pipeline([ + ('minmax_scaler', self.minmax_scaler), + ('dsapp_cutoff', self.dsapp_cutoff), + ('lr', self.lr) + ]) + + + def fit(self, X, y = None): + self.pipeline.fit(X, y) + + self.min_ = self.pipeline.named_steps['minmax_scaler'].min_ + self.scale_ = self.pipeline.named_steps['minmax_scaler'].scale_ + self.data_min_ = self.pipeline.named_steps['minmax_scaler'].data_min_ + self.data_max_ = self.pipeline.named_steps['minmax_scaler'].data_max_ + self.data_range_ = self.pipeline.named_steps['minmax_scaler'].data_range_ + + self.coef_ = self.pipeline.named_steps['lr'].coef_ + self.intercept_ = self.pipeline.named_steps['lr'].intercept_ + + self.classes_ = self.pipeline.named_steps['lr'].classes_ + + return self + + def predict_proba(self, X): + return self.pipeline.predict_proba(X) + + def predict_log_proba(self, X): + return self.pipeline.predict_log_proba(X) + + def predict(self, X): + return self.pipeline.predict(X) + + def score(self, X, y): + return self.pipeline.score(X,y) diff --git a/catwalk/estimators/dsapp_scaler.org b/catwalk/estimators/dsapp_scaler.org new file mode 100644 index 0000000..2ac8eea --- /dev/null +++ b/catwalk/estimators/dsapp_scaler.org @@ -0,0 +1,556 @@ +* CutOff Transformer + :PROPERTIES: + :header-args: :session + :END: + + +** The problem + + + #+BEGIN_SRC ipython + import pandas as pd + import numpy as np + from sklearn import preprocessing + from sklearn import datasets + #+END_SRC + + #+RESULTS: + + #+BEGIN_SRC ipython + dataset = datasets.load_breast_cancer() + + X = dataset.data + + y = dataset.target + #+END_SRC + + #+RESULTS: + + + #+BEGIN_SRC ipython + from sklearn.model_selection import train_test_split + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=12345) + + #+END_SRC + + #+RESULTS: + + + #+BEGIN_SRC ipython + minmax_scaler = preprocessing.MinMaxScaler().fit(X_train) + + X_train_minmax = minmax_scaler.transform(X_train) + #+END_SRC + + #+RESULTS: + + #+BEGIN_SRC ipython + np.amin(X_train_minmax, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., + : 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., + : 0., 0., 0., 0.]) + + + #+BEGIN_SRC ipython + np.amax(X_train_minmax, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., + : 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., + : 1., 1., 1., 
1.]) + + + #+BEGIN_SRC ipython + X_test_minmax = minmax_scaler.transform(X_test) + + np.amin(X_test_minmax, axis=0) + #+END_SRC + + #+RESULTS: + : array([-0.0382801 , 0.04605809, -0.02937829, -0.01616379, 0.08919383, + : -0.01261026, 0. , 0. , 0.0540404 , 0.00643586, + : 0.00137606, 0.01617751, -0.00067897, 0.00262048, 0.04413193, + : -0.00778784, 0. , 0. , 0.03388304, 0.00349414, + : -0.03772888, 0.07462687, -0.03207402, -0.01367747, 0.06846431, + : 0.0077201 , 0. , 0. , 0.00902827, 0.03623343]) + + + + #+BEGIN_SRC ipython + np.amax(X_test_minmax, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 0.95626536, 1.22697095, 0.95447432, 0.89181034, 0.81132075, + : 0.80898248, 0.96251172, 0.95079523, 0.77626263, 1.05370617, + : 0.39670469, 0.56219059, 0.38171308, 0.40586255, 1.36082713, + : 1.35660122, 2.57980456, 1.29070905, 0.72814769, 1.37603636, + : 0.89257236, 0.87553305, 0.88743254, 0.74588306, 1.02852679, + : 1.13188961, 1.30308077, 0.94707904, 1.20527441, 1.62954254]) + + +** Proposed solution + + Implement a /transformer/ + + #+BEGIN_SRC ipython :tangle transformers.py + # coding: utf-8 + + import warnings + + import numpy as np + + from sklearn.base import BaseEstimator, TransformerMixin + from sklearn.utils import check_array + from sklearn.utils.validation import FLOAT_DTYPES + + DEPRECATION_MSG_1D = ( + "Passing 1d arrays as data is deprecated in 0.17 and will " + "raise ValueError in 0.19. Reshape your data either using " + "X.reshape(-1, 1) if your data has a single feature or " + "X.reshape(1, -1) if it contains a single sample." + ) + + class CutOff(BaseEstimator, TransformerMixin): + """ + Transforms features cutting values out of established range + + + Args: + feature_range: Range of allowed values, default=`(0,1)` + + Usage: + The recommended way of using this is:: + + from sklearn.pipeline import Pipeline + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + lr = linear_model.LogisticRegression() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff), + ('lr', lr) + ]) + + pipeline.fit(X_train, y_train) + pipeline.predict(X_test) + + """ + def __init__(self, feature_range=(0,1), copy=True): + self.feature_range = feature_range + self.copy = copy + + def fit(self, X, y = None): + return self + + def transform(self, X): + feature_range = self.feature_range + + X = check_array(X, copy=self.copy, ensure_2d=False, dtype=FLOAT_DTYPES) + + if X.ndim == 1: + warnings.warn(DEPRECATION_MSG_1D, DeprecationWarning) + + + if np.any(X > feature_range[1]) or np.any(X < feature_range[0]): + warnings.warn(f"You got data that are out of the range:{feature_range}") + + X[X > feature_range[1]] = feature_range[1] + X[X < feature_range[0]] = feature_range[0] + + return X + #+END_SRC + #+RESULTS: + +*** Tests + + #+BEGIN_SRC ipython :tangle ../../tests/test_estimators.py + import numpy as np + + import warnings + + import pytest + + from catwalk.estimators.transformers import CutOff + from catwalk.estimators.classifiers import ScaledLogisticRegression + + from sklearn import linear_model + + from sklearn import datasets + from sklearn import preprocessing + from sklearn.pipeline import Pipeline + from sklearn.model_selection import train_test_split + + @pytest.fixture + def data(): + dataset = datasets.load_breast_cancer() + X = dataset.data + y = dataset.target + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=12345) + + return {'X_train':X_train, 'X_test':X_test, 'y_train':y_train, 
'y_test':y_test} + + def test_cutoff_warning(): + X_data = [1, 2, 0.5, 0.7, 100, -1, -23, 0] + + cutoff = CutOff() + + with pytest.warns(UserWarning): + cutoff.fit_transform(X_data) + + + def test_cutoff_transformer(): + cutoff = CutOff() + + X_data = [1, 2, 0.5, 0.7, 100, -1, -23, 0] + + assert np.all(cutoff.fit_transform(X_data) == [1, 1, 0.5, 0.7, 1, 0, 0, 0]) + + def test_cutoff_inside_a_pipeline(data): + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff) + ]) + + pipeline.fit(data['X_train'], data['y_train']) + + X_fake_new_data = data['X_test'][-1,:] + 0.5 + + mms = preprocessing.MinMaxScaler().fit(data['X_train']) + + assert np.all(( mms.transform(X_fake_new_data) > 1 ) == (pipeline.transform(X_fake_new_data) == 1)) + #+END_SRC + + #+BEGIN_SRC ipython + from sklearn.pipeline import Pipeline + from sklearn import linear_model + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff) + ]) + #+END_SRC + +*** Using in a full pipeline + + #+BEGIN_SRC ipython + from sklearn.pipeline import Pipeline + from sklearn import linear_model + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + lr = linear_model.LogisticRegression() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff), + ('lr', lr) + ]) + #+END_SRC + + #+RESULTS: + + #+BEGIN_SRC ipython + pipeline.fit(X_train, y_train) + #+END_SRC + + #+RESULTS: + : Pipeline(steps=[('minmax_scaler', MinMaxScaler(copy=True, feature_range=(0, 1))), ('dsapp_cutoff', CutOff(copy=True, feature_range=(0, 1))), ('lr', LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, + : intercept_scaling=1, max_iter=100, multi_class='ovr', n_jobs=1, + : penalty='l2', random_state=None, solver='liblinear', tol=0.0001, + : verbose=0, warm_start=False))]) + + + #+BEGIN_SRC ipython + pipeline.predict(X_test) + #+END_SRC + + #+RESULTS: + : array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, + : 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, + : 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, + : 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, + : 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, + : 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, + : 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, + : 0, 1, 1, 1, 0, 1, 1, 1, 0, 1]) + + + #+BEGIN_SRC ipython + pipeline.score(X_test, y_test) + #+END_SRC + + #+RESULTS: + : 0.9590643274853801 + + #+BEGIN_SRC ipython + pipeline.predict(X_test[-1,:]) + #+END_SRC + + #+RESULTS: + : array([1]) + +*** Storing the pipeline + + + #+BEGIN_SRC ipython + from sklearn.externals import joblib + joblib.dump(pipeline, 'dsapp_pipeline.plk') + #+END_SRC + + #+RESULTS: + | dsapp_pipeline.plk | + + + #+BEGIN_SRC ipython + pipeline_reloaded =joblib.load('dsapp_pipeline.plk') + pipeline_reloaded.transform(X_fake_new_data + 0.5) + #+END_SRC + + #+RESULTS: + : array([ 0.27960688, 0.28257261, 0.2540902 , 0.14362069, 1. , + : 1. , 1. , 1. , 1. , 1. , + : 0.21651276, 0.19965523, 0.0452458 , 0.0212328 , 1. , + : 1. , 1. , 1. , 1. , 1. , + : 0.19521559, 0.22414712, 0.16787458, 0.08615063, 1. , + : 0.74621408, 0.7536428 , 1. , 1. , 1. 
]) + + +* "ScaledLogisticRegression" model + :PROPERTIES: + :header-args: :session + :END: + + We could encapsulate the functionality of the previous pipeline in a + class, so, we can forget about all this details and just use it as a + inplace replacemente for =scikit-learn= =Logisticregression= class. + + #+BEGIN_SRC ipython :tangle classifiers.py + # coding: utf-8 + + from sklearn.base import BaseEstimator, ClassifierMixin + from sklearn.pipeline import Pipeline + from sklearn.preprocessing import MinMaxScaler + from sklearn.linear_model import LogisticRegression + + from catwalk.estimators.transformers import CutOff + + class ScaledLogisticRegression(BaseEstimator, ClassifierMixin): + """ + An in-place replacement for the scikit-learn's LogisticRegression. + + It incorporates the MaxMinScaler, and the CutOff as preparations + for the logistic regression. + """ + def __init__(self, penalty='l2', dual=False, tol=1e-4, C=1.0, + fit_intercept=True, intercept_scaling=1, class_weight=None, + random_state=None, solver='liblinear', max_iter=100, + multi_class='ovr', verbose=0, warm_start=False, n_jobs=1): + + + self.penalty = penalty + self.dual = dual + self.tol = tol + self.C = C + self.fit_intercept = fit_intercept + self.intercept_scaling = intercept_scaling + self.class_weight = class_weight + self.random_state = random_state + self.solver = solver + self.max_iter = max_iter + self.multi_class = multi_class + self.verbose = verbose + self.warm_start = warm_start + self.n_jobs = n_jobs + + self.minmax_scaler = MinMaxScaler() + self.dsapp_cutoff = CutOff() + self.lr = LogisticRegression(penalty=penalty, dual=dual, tol=tol, C=C, + fit_intercept=fit_intercept, intercept_scaling=intercept_scaling, class_weight=class_weight, + random_state=random_state, solver=solver, max_iter=max_iter, + multi_class=multi_class, verbose=verbose, warm_start=warm_start, n_jobs=n_jobs) + + self.pipeline =Pipeline([ + ('minmax_scaler', self.minmax_scaler), + ('dsapp_cutoff', self.dsapp_cutoff), + ('lr', self.lr) + ]) + + + def fit(self, X, y = None): + self.pipeline.fit(X, y) + + self.min_ = self.pipeline.named_steps['minmax_scaler'].min_ + self.scale_ = self.pipeline.named_steps['minmax_scaler'].scale_ + self.data_min_ = self.pipeline.named_steps['minmax_scaler'].data_min_ + self.data_max_ = self.pipeline.named_steps['minmax_scaler'].data_max_ + self.data_range_ = self.pipeline.named_steps['minmax_scaler'].data_range_ + + self.coef_ = self.pipeline.named_steps['lr'].coef_ + self.intercept_ = self.pipeline.named_steps['lr'].intercept_ + + self.classes_ = self.pipeline.named_steps['lr'].classes_ + + return self + + def predict_proba(self, X): + return self.pipeline.predict_proba(X) + + def predict_log_proba(self, X): + return self.pipeline.predict_log_proba(X) + + def predict(self, X): + return self.pipeline.predict(X) + + def score(self, X, y): + return self.pipeline.score(X,y) + #+END_SRC + + #+RESULTS: + + + We can see that, this class reproduces the behaviour that the =pipeline= + of the last section. 
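+
+  A quick interactive check (a minimal sketch, assuming the
+  =ScaledLogisticRegression= block above has been evaluated in this
+  session and that =X_train=, =y_train=, =X_test=, =y_test= from the
+  first section are still in scope):
+
+  #+BEGIN_SRC ipython
+    # Fit the wrapper directly; the MinMaxScaler and CutOff run inside fit()
+    dsapp_lr = ScaledLogisticRegression()
+    dsapp_lr.fit(X_train, y_train)
+
+    # Accuracy should match the hand-built pipeline from the previous section
+    dsapp_lr.score(X_test, y_test)
+  #+END_SRC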
+ + + #+BEGIN_SRC ipython :tangle ../../tests/test_estimators.py + def test_dsapp_lr(data): + dsapp_lr = ScaledLogisticRegression() + dsapp_lr.fit(data['X_train'], data['y_train']) + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + lr = linear_model.LogisticRegression() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff), + ('lr', lr) + ]) + + pipeline.fit(data['X_train'], data['y_train']) + + assert np.all(dsapp_lr.predict(data['X_test']) == pipeline.predict(data['X_test'])) + #+END_SRC + + #+RESULTS: + + + +* Another approach (abandoned) + + #+BEGIN_SRC ipython + class DsappMinMaxScaler(preprocessing.MinMaxScaler): + def transform(self, X): + X_bad = super(DsappMinMaxScaler, self).transform(X) + X_bad[X_bad > 1] = 1 + X_bad[X_bad < 0] = 0 + return X_bad + #+END_SRC + + + #+BEGIN_SRC ipython + dsapp_scaler = DsappMinMaxScaler().fit(X_train) + X_train_dsapp = dsapp_scaler.transform(X_train) + np.amin(X_train_dsapp, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., + : 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., + : 0., 0., 0., 0.]) + + #+BEGIN_SRC ipython + X_test_dsapp = dsapp_scaler.transform(X_test) + np.amin(X_test_dsapp, axis=0) + #+END_SRC + + + #+RESULTS: + : array([ 0.95626536, 1.22697095, 0.95447432, 0.89181034, 0.81132075, + : 0.80898248, 0.96251172, 0.95079523, 0.77626263, 1.05370617, + : 0.39670469, 0.56219059, 0.38171308, 0.40586255, 1.36082713, + : 1.35660122, 2.57980456, 1.29070905, 0.72814769, 1.37603636, + : 0.89257236, 0.87553305, 0.88743254, 0.74588306, 1.02852679, + : 1.13188961, 1.30308077, 0.94707904, 1.20527441, 1.62954254]) + + #+RESULTS: + : array([ 0. , 0.04605809, 0. , 0. , 0.08919383, + : 0. , 0. , 0. , 0.0540404 , 0.00643586, + : 0.00137606, 0.01617751, 0. , 0.00262048, 0.04413193, + : 0. , 0. , 0. , 0.03388304, 0.00349414, + : 0. , 0.07462687, 0. , 0. , 0.06846431, + : 0.0077201 , 0. , 0. , 0.00902827, 0.03623343]) + + + #+BEGIN_SRC ipython + np.amax(X_test_dsapp, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 0.95626536, 1. , 0.95447432, 0.89181034, 0.81132075, + : 0.80898248, 0.96251172, 0.95079523, 0.77626263, 1. , + : 0.39670469, 0.56219059, 0.38171308, 0.40586255, 1. , + : 1. , 1. , 1. , 0.72814769, 1. , + : 0.89257236, 0.87553305, 0.88743254, 0.74588306, 1. , + : 1. , 1. , 0.94707904, 1. , 1. 
]) + + #+BEGIN_SRC ipython + np.amax(X_test_minmax, axis=0) + #+END_SRC + + #+RESULTS: + : array([ 0.95626536, 1.22697095, 0.95447432, 0.89181034, 0.81132075, + : 0.80898248, 0.96251172, 0.95079523, 0.77626263, 1.05370617, + : 0.39670469, 0.56219059, 0.38171308, 0.40586255, 1.36082713, + : 1.35660122, 2.57980456, 1.29070905, 0.72814769, 1.37603636, + : 0.89257236, 0.87553305, 0.88743254, 0.74588306, 1.02852679, + : 1.13188961, 1.30308077, 0.94707904, 1.20527441, 1.62954254]) + + + + #+BEGIN_SRC ipython + X_fake_new_data = X_test[-1,:] + 0.5 + X_fake_new_data + #+END_SRC + + #+RESULTS: + #+begin_example + array([ 1.34500000e+01, 1.65200000e+01, 8.36400000e+01, + 5.14200000e+02, 6.00500000e-01, 5.79430000e-01, + 5.61550000e-01, 5.33700000e-01, 6.73000000e-01, + 5.64700000e-01, 7.09400000e-01, 1.26360000e+00, + 1.73100000e+00, 1.81700000e+01, 5.08725000e-01, + 5.20030000e-01, 5.23350000e-01, 5.11320000e-01, + 5.26250000e-01, 5.04726000e-01, 1.42400000e+01, + 2.04300000e+01, 8.93100000e+01, 5.85900000e+02, + 6.48300000e-01, 7.06800000e-01, 7.24100000e-01, + 6.05600000e-01, 8.38000000e-01, 5.95840000e-01]) + #+end_example + + + #+BEGIN_SRC ipython + dsapp_scaler.transform(X_fake_new_data) + #+END_SRC + + #+RESULTS: + : array([ 0.25503686, 0.26182573, 0.2505335 , 0.14340517, 0.43215672, + : 0.17390359, 0.14421275, 0.16749503, 0.33838384, 0.3271194 , + : 0.03545175, 0.08915311, 0.02167045, 0.02029892, 0.32437434, + : 0.1743862 , 0.15211726, 0.27677262, 0.25845669, 0.18213281, + : 0.17675724, 0.2108209 , 0.16530455, 0.08602606, 0.52387421, + : 0.19713159, 0.23324313, 0.3628866 , 0.43121882, 0.4360838 ]) diff --git a/catwalk/estimators/transformers.py b/catwalk/estimators/transformers.py new file mode 100644 index 0000000..bcf75c2 --- /dev/null +++ b/catwalk/estimators/transformers.py @@ -0,0 +1,71 @@ +# coding: utf-8 + +import warnings + +import numpy as np + +from sklearn.base import BaseEstimator, TransformerMixin +from sklearn.utils import check_array +from sklearn.utils.validation import FLOAT_DTYPES + +DEPRECATION_MSG_1D = ( + "Passing 1d arrays as data is deprecated in 0.17 and will " + "raise ValueError in 0.19. Reshape your data either using " + "X.reshape(-1, 1) if your data has a single feature or " + "X.reshape(1, -1) if it contains a single sample." 
+) + +class CutOff(BaseEstimator, TransformerMixin): + """ + Transforms features cutting values out of established range + + + Args: + feature_range: Range of allowed values, default=`(0,1)` + + Usage: + The recommended way of using this is:: + + from sklearn.pipeline import Pipeline + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + lr = linear_model.LogisticRegression() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff), + ('lr', lr) + ]) + + pipeline.fit(X_train, y_train) + pipeline.predict(X_test) + + """ + def __init__(self, feature_range=(0,1), copy=True): + self.feature_range = feature_range + self.copy = copy + + def fit(self, X, y = None): + return self + + def transform(self, X): + feature_range = self.feature_range + + X = check_array(X, copy=self.copy, ensure_2d=False, dtype=FLOAT_DTYPES) + + if X.ndim == 1: + warnings.warn(DEPRECATION_MSG_1D, DeprecationWarning) + + + if np.any(X > feature_range[1]) or np.any(X < feature_range[0]): + warnings.warn( + "You got data that are out of the range: {}" + .format(feature_range) + ) + + + X[X > feature_range[1]] = feature_range[1] + X[X < feature_range[0]] = feature_range[0] + + return X diff --git a/catwalk/evaluation.py b/catwalk/evaluation.py new file mode 100644 index 0000000..6ed12e4 --- /dev/null +++ b/catwalk/evaluation.py @@ -0,0 +1,362 @@ +import numpy +from sklearn import metrics +from results_schema import Evaluation +from catwalk.utils import db_retry, sort_predictions_and_labels +from sqlalchemy.orm import sessionmaker +import logging +import time + + +"""Metric definitions + +Mostly just wrappers around sklearn.metrics functions, these functions +implement a generalized interface to metric calculations that can be stored +as a scalar in the database. 
+ +All functions should take four parameters: +predictions_proba (1d array-like) Prediction probabilities +predictions_binary (1d array-like) Binarized predictions +labels (1d array-like) Ground truth target values +parameters (dict) Any needed hyperparameters in the implementation + +All functions should return: (float) the resulting score + +Functions defined here are meant to be used in ModelEvaluator.available_metrics +""" + + +def precision(_, predictions_binary, labels, parameters): + return metrics.precision_score(labels, predictions_binary, **parameters) + + +def recall(_, predictions_binary, labels, parameters): + return metrics.recall_score(labels, predictions_binary, **parameters) + + +def fbeta(_, predictions_binary, labels, parameters): + return metrics.fbeta_score(labels, predictions_binary, **parameters) + + +def f1(_, predictions_binary, labels, parameters): + return metrics.f1_score(labels, predictions_binary, **parameters) + + +def accuracy(_, predictions_binary, labels, parameters): + return metrics.accuracy_score(labels, predictions_binary, **parameters) + + +def roc_auc(predictions_proba, _, labels, parameters): + return metrics.roc_auc_score(labels, predictions_proba) + + +def avg_precision(predictions_proba, _, labels, parameters): + return metrics.average_precision_score(labels, predictions_proba) + + +def true_positives(_, predictions_binary, labels, parameters): + tp = [1 if x == 1 and y == 1 else 0 + for (x, y) in zip(predictions_binary, labels)] + return int(numpy.sum(tp)) + + +def false_positives(_, predictions_binary, labels, parameters): + fp = [1 if x == 1 and y == 0 else 0 + for (x, y) in zip(predictions_binary, labels)] + return int(numpy.sum(fp)) + + +def true_negatives(_, predictions_binary, labels, parameters): + tn = [1 if x == 0 and y == 0 else 0 + for (x, y) in zip(predictions_binary, labels)] + return int(numpy.sum(tn)) + + +def false_negatives(_, predictions_binary, labels, parameters): + fn = [1 if x == 0 and y == 1 else 0 + for (x, y) in zip(predictions_binary, labels)] + return int(numpy.sum(fn)) + + +def fpr(_, predictions_binary, labels, parameters): + fp = false_positives(_, predictions_binary, labels, parameters) + return float(fp / labels.count(0)) + + +class UnknownMetricError(ValueError): + """Signifies that a metric name was passed, but no matching computation + function is available + """ + pass + + +def generate_binary_at_x(test_predictions, x_value, unit='top_n'): + """Generate subset of predictions based on top% or absolute + + Args: + test_predictions (list) A list of predictions, sorted by risk desc + test_labels (list) A list of labels, sorted by risk desc + x_value (int) The percentile or absolute value desired + unit (string, default 'top_n') The subsetting method desired, + either percentile or top_n + + Returns: (list) The predictions subset + """ + if unit == 'percentile': + cutoff_index = int(len(test_predictions) * (x_value / 100.00)) + else: + cutoff_index = x_value + test_predictions_binary = [ + 1 if x < cutoff_index else 0 + for x in range(len(test_predictions)) + ] + return test_predictions_binary + + +class ModelEvaluator(object): + """An object that can score models based on its known metrics""" + + """Available metric calculation functions + + Each value is expected to be a function that takes in the following params + (predictions_proba, predictions_binary, labels, parameters) + and return a numeric score + """ + available_metrics = { + 'precision@': precision, + 'recall@': recall, + 'fbeta@': fbeta, + 'f1': f1, + 
'accuracy': accuracy, + 'roc_auc': roc_auc, + 'average precision score': avg_precision, + 'true positives@': true_positives, + 'true negatives@': true_negatives, + 'false positives@': false_positives, + 'false negatives@': false_negatives, + 'fpr@': fpr, + } + + def __init__(self, metric_groups, db_engine, sort_seed=None, custom_metrics=None): + """ + Args: + metric_groups (list) A list of groups of metric/configurations + to use for evaluating all given models + + Each entry is a dict, with a list of metrics, and potentially + thresholds and parameter lists. Each metric is expected to + be a key in self.available_metrics + + Examples: + + metric_groups = [{ + 'metrics': ['precision@', 'recall@'], + 'thresholds': { + 'percentiles': [5.0, 10.0], + 'top_n': [5, 10] + } + }, { + 'metrics': ['f1'], + }, { + 'metrics': ['fbeta@'], + 'parameters': [{'beta': 0.75}, {'beta': 1.25}] + }] + + db_engine (sqlalchemy.engine) + custom_metrics (dict) Functions to generate metrics + not available by default + Each function is expected take in the following params: + (predictions_proba, predictions_binary, labels, parameters) + and return a numeric score + """ + self.metric_groups = metric_groups + self.db_engine = db_engine + self.sort_seed = sort_seed or int(time.time()) + if custom_metrics: + self.available_metrics.update(custom_metrics) + if self.db_engine: + self.sessionmaker = sessionmaker(bind=self.db_engine) + + def _generate_evaluations( + self, + metrics, + parameters, + threshold_config, + predictions_proba, + predictions_binary, + labels, + ): + """Generate evaluations based on config and create ORM objects to hold them + + Args: + metrics (list) names of metric to compute + parameters (list) dicts holding parameters to pass to metrics + threshold_config (dict) Unit type and value referring to how any + thresholds were computed. 
Combined with parameter string + to make a unique identifier for the parameter in the database + predictions_proba (list) Probability predictions + predictions_binary (list) Binary predictions + labels (list) True labels + + Returns: (list) results_schema.Evaluation objects + Raises: UnknownMetricError if a given metric is not present in + self.available_metrics + """ + evaluations = [] + num_labeled_examples = len(labels) + num_labeled_above_threshold = predictions_binary.count(1) + num_positive_labels = labels.count(1) + for metric in metrics: + if metric in self.available_metrics: + for parameter_combination in parameters: + value = self.available_metrics[metric]( + predictions_proba, + predictions_binary, + labels, + parameter_combination + ) + + full_params = parameter_combination.copy() + full_params.update(threshold_config) + parameter_string = '/'.join([ + '{}_{}'.format(val, key) + for key, val in full_params.items() + ]) + evaluations.append(Evaluation( + metric=metric, + parameter=parameter_string, + value=value, + num_labeled_examples=num_labeled_examples, + num_labeled_above_threshold=num_labeled_above_threshold, + num_positive_labels=num_positive_labels, + sort_seed=self.sort_seed + )) + else: + raise UnknownMetricError() + return evaluations + + def evaluate( + self, + predictions_proba, + labels, + model_id, + evaluation_start_time, + evaluation_end_time, + example_frequency + ): + """Evaluate a model based on predictions, and save the results + + Args: + predictions_proba (numpy.array) List of prediction probabilities + labels (numpy.array) The true labels for the prediction set + model_id (int) The database identifier of the model + evaluation_start_time (datetime.datetime) The time of the first prediction being evaluated + evaluation_end_time (datetime.datetime) The time of the last prediction being evaluated + example_frequency (string) How frequently predictions were generated + """ + predictions_proba_sorted, labels_sorted = sort_predictions_and_labels( + predictions_proba, + labels, + self.sort_seed + ) + labels_sorted = numpy.array(labels_sorted) + + evaluations = [] + for group in self.metric_groups: + parameters = group.get('parameters', [{}]) + if 'thresholds' not in group: + evaluations = evaluations + self._generate_evaluations( + group['metrics'], + parameters, + {}, + predictions_proba, + generate_binary_at_x( + predictions_proba_sorted, + 100, + unit='percentile' + ), + labels_sorted.tolist(), + ) + + logging.info('thresholding predictions for group') + for pct_thresh in group.get('thresholds', {}).get('percentiles', []): + predicted_classes = numpy.array(generate_binary_at_x( + predictions_proba_sorted, + pct_thresh, + unit='percentile' + )) + nan_mask = numpy.isfinite(labels_sorted) + predicted_classes = (predicted_classes[nan_mask]).tolist() + present_labels_sorted = (labels_sorted[nan_mask]).tolist() + evaluations = evaluations + self._generate_evaluations( + group['metrics'], + parameters, + {'pct': pct_thresh}, + None, + predicted_classes, + present_labels_sorted, + ) + + for abs_thresh in group.get('thresholds', {}).get('top_n', []): + predicted_classes = numpy.array(generate_binary_at_x( + predictions_proba_sorted, + abs_thresh, + unit='top_n' + )) + nan_mask = numpy.isfinite(labels_sorted) + predicted_classes = (predicted_classes[nan_mask]).tolist() + present_labels_sorted = (labels_sorted[nan_mask]).tolist() + evaluations = evaluations + self._generate_evaluations( + group['metrics'], + parameters, + {'abs': abs_thresh}, + None, + 
predicted_classes, + present_labels_sorted, + ) + + logging.info('Writing metrics to db') + self._write_to_db( + model_id, + evaluation_start_time, + evaluation_end_time, + example_frequency, + evaluations + ) + logging.info('Done writing metrics to db') + + @db_retry + def _write_to_db( + self, + model_id, + evaluation_start_time, + evaluation_end_time, + example_frequency, + evaluations + ): + """Write evaluation objects to the database + + Binds the model_id as as_of_date to the given ORM objects + and writes them to the database + + Args: + model_id (int) primary key of the model + as_of_date (datetime.date) Date the predictions were made as of + evaluations (list) results_schema.Evaluation objects + """ + session = self.sessionmaker() + session.query(Evaluation)\ + .filter_by( + model_id=model_id, + evaluation_start_time=evaluation_start_time, + evaluation_end_time=evaluation_end_time, + example_frequency=example_frequency + ).delete() + + for evaluation in evaluations: + evaluation.model_id = model_id + evaluation.evaluation_start_time = evaluation_start_time + evaluation.evaluation_end_time = evaluation_end_time + evaluation.example_frequency = example_frequency + session.add(evaluation) + session.commit() + session.close() diff --git a/catwalk/feature_importances.py b/catwalk/feature_importances.py new file mode 100644 index 0000000..cad9c76 --- /dev/null +++ b/catwalk/feature_importances.py @@ -0,0 +1,59 @@ +import warnings + +import numpy as np + +import sklearn + +def _ad_hoc_feature_importances(model): + """ + Get the "ad-hoc feature importances" for scikit-learn's models + lacking the `feature_importances_` attribute + + Args: + model: A trained model that has not a `feature_importances_` attribute + + Returns: + At this moment, this method only returns the odds ratio of both the + intercept and the coefficients given by sklearn's implementation of + the LogisticRegression. + The order of the odds ratio list is the standard + of the statistical packages (like R, SAS, etc) i.e. (intercept, coefficients) + """ + feature_importances = None + + if isinstance(model, (sklearn.linear_model.logistic.LogisticRegression)): + coef_odds_ratio = np.exp(model.coef_) + #intercept_odds_ratio = np.exp(model.intercept_[:,np.newaxis]) + ## We are ignoring the intercept + + ## NOTE: We need to squeeze this array so it have the correct dimensions + feature_importances = coef_odds_ratio.squeeze() + + return feature_importances + +def get_feature_importances(model): + """ + Get feature importances (from scikit-learn) of a trained model. + + Args: + model: Trained model + + Returns: + Feature importances, or failing that, None + """ + + feature_importances = None + + if hasattr(model, 'feature_importances_'): + feature_importances = model.feature_importances_ + else: + warnings.warn( + "\nThe selected algorithm, doesn't support a standard way" + "\nof calculate the importance of each feature used." + "\nFalling back to ad-hoc methods" + "\n(e.g. 
in LogisticRegression we will return Odd Ratios instead coefficients)" + ) + + feature_importances = _ad_hoc_feature_importances(model) + + return feature_importances diff --git a/catwalk/model_trainers.py b/catwalk/model_trainers.py new file mode 100644 index 0000000..b785437 --- /dev/null +++ b/catwalk/model_trainers.py @@ -0,0 +1,425 @@ +import sklearn +from sklearn.model_selection import ParameterGrid + +from sqlalchemy.orm import sessionmaker + +import importlib +import json +import logging +import datetime +import copy +import pandas +import numpy as np +import warnings + +from catwalk.utils import \ + filename_friendly_hash, \ + retrieve_model_id_from_hash, \ + db_retry + +from results_schema import Model, FeatureImportance + +from catwalk.feature_importances import \ + _ad_hoc_feature_importances, \ + get_feature_importances + +class ModelTrainer(object): + """Trains a series of classifiers using the same training set + Args: + project_path (string) path to project folder, + under which to cache model pickles + experiment_hash (string) foreign key to the results.experiments table + model_storage_engine (catwalk.storage.ModelStorageEngine) + db_engine (sqlalchemy.engine) + replace (bool) whether or not to replace existing versions of models + """ + def __init__( + self, + project_path, + experiment_hash, + model_storage_engine, + db_engine, + model_group_keys, + replace=True + ): + self.project_path = project_path + self.experiment_hash = experiment_hash + self.model_storage_engine = model_storage_engine + self.db_engine = db_engine + self.sessionmaker = sessionmaker(bind=self.db_engine) + self.model_group_keys = model_group_keys + self.replace = replace + + def unique_parameters(self, parameters): + return { + key: parameters[key] + for key in parameters.keys() + if key != 'n_jobs' + } + + def _model_hash(self, matrix_metadata, class_path, parameters): + """Generates a unique identifier for a trained model + based on attributes of the model that together define + equivalence; in other words, if we train a second model with these + same attributes there would be no reason to keep the old one) + + Args: + class_path (string): a full class path for the classifier + parameters (dict): all hyperparameters to be passed to the classifier + + Returns: (string) a unique identifier + """ + + unique = { + 'className': class_path, + 'parameters': self.unique_parameters(parameters), + 'project_path': self.project_path, + 'training_metadata': matrix_metadata + } + return filename_friendly_hash(unique) + + def _generate_model_configs(self, grid_config): + """Flattens a model/parameter grid configuration into individually + trainable model/parameter pairs + + Yields: (tuple) classpath and parameters + """ + for class_path, parameter_config in grid_config.items(): + for parameters in ParameterGrid(parameter_config): + yield class_path, parameters + + def _train(self, matrix_store, class_path, parameters): + """Fit a model to a training set. 
Works on any modeling class that + is available in this package's environment and implements .fit + + Args: + class_path (string) A full classpath to the model class + parameters (dict) hyperparameters to give to the model constructor + + Returns: + tuple of (fitted model, list of column names without label) + """ + module_name, class_name = class_path.rsplit(".", 1) + module = importlib.import_module(module_name) + cls = getattr(module, class_name) + instance = cls(**parameters) + y = matrix_store.labels() + + return instance.fit(matrix_store.matrix, y), matrix_store.matrix.columns + + @db_retry + def _write_model_to_db( + self, + class_path, + parameters, + feature_names, + model_hash, + trained_model, + model_group_id, + misc_db_parameters + ): + """Writes model and feature importance data to a database + Will overwrite the data of any previous versions + (any existing model that shares a hash) + + Args: + class_path (string) A full classpath to the model class + parameters (dict) hyperparameters to give to the model constructor + feature_names (list) feature names in order given to model + model_hash (string) a unique id for the model + trained_model (object) a trained model object + misc_db_parameters (dict) params to pass through to the database + """ + saved_model_id = retrieve_model_id_from_hash(self.db_engine, model_hash) + if saved_model_id: + # logging.warning('deleting existing model %s', existing_model.model_id) + # existing_model.delete(session) + # session.commit() + logging.warning('model meta data already stored %s', saved_model_id) + return saved_model_id + + session = self.sessionmaker() + model = Model( + model_hash=model_hash, + model_type=class_path, + model_parameters=parameters, + model_group_id=model_group_id, + experiment_hash=self.experiment_hash, + **misc_db_parameters + ) + session.add(model) + + feature_importance = get_feature_importances(trained_model) + temp_df = pandas.DataFrame({'feature_importance': feature_importance}) + features_index = temp_df.index.tolist() + rankings_abs = temp_df['feature_importance'].rank(method='dense', ascending=False) + rankings_pct = temp_df['feature_importance'].rank(method='dense', ascending=False, pct=True) + for feature_index, importance, rank_abs, rank_pct in zip( + features_index, + feature_importance, + rankings_abs, + rankings_pct): + feature_importance = FeatureImportance( + model=model, + feature_importance=round(float(importance), 10), + feature=feature_names[feature_index], + rank_abs=int(rank_abs), + rank_pct=round(float(rank_pct), 10) + ) + session.add(feature_importance) + session.commit() + model_id = model.model_id + session.close() + return model_id + + def _train_and_store_model( + self, + matrix_store, + class_path, + parameters, + model_hash, + model_store, + misc_db_parameters + ): + """Train a model, cache it, and write metadata to a database + + Args: + matrix_store(catwalk.storage.MatrixStore) a matrix and metadata + class_path (string) A full classpath to the model class + parameters (dict) hyperparameters to give to the model constructor + model_hash (string) a unique id for the model + model_store (catwalk.storage.Store) the place in which to store the model + misc_db_parameters (dict) params to pass through to the database + + Returns: (int) a database id for the model + """ + misc_db_parameters['run_time'] = datetime.datetime.now().isoformat() + trained_model, feature_names = self._train( + matrix_store, + class_path, + parameters, + ) + + unique_parameters = self.unique_parameters(parameters) + + 
model_group_id = self._get_model_group_id( + class_path, + unique_parameters, + matrix_store.metadata, + ) + logging.info('Trained model: %s', model_hash) + model_store.write(trained_model) + logging.info('Cached model: %s', model_hash) + model_id = self._write_model_to_db( + class_path, + unique_parameters, + feature_names, + model_hash, + trained_model, + model_group_id, + misc_db_parameters + ) + logging.info('Wrote model to db: %s', model_hash) + return model_id + + def _get_model_group_id( + self, + class_path, + parameters, + matrix_metadata, + ): + """ + Returns model group id using store procedure 'get_model_group_id' which will + return the same value for models with the same class_path, parameters, + features, and model_config + + Args: + class_path (string) A full classpath to the model class + parameters (dict) hyperparameters to give to the model constructor + matrix_metadata (dict) stored metadata about the train matrix + + Returns: (int) a database id for the model group id + """ + feature_names = matrix_metadata['feature_names'] + model_config = {} + for model_group_key in self.model_group_keys: + model_config[model_group_key] = matrix_metadata[model_group_key] + db_conn = self.db_engine.raw_connection() + cur = db_conn.cursor() + cur.execute( "SELECT EXISTS ( " + " SELECT * " + " FROM pg_catalog.pg_proc " + " WHERE proname = 'get_model_group_id' ) ") + condition = cur.fetchone() + confition = condition[0] + + if condition: + query = ("SELECT get_model_group_id( " + " '{class_path}'::TEXT, " + " '{parameters}'::JSONB, " + " ARRAY{feature_names}::TEXT [] , " + " '{model_config}'::JSONB )" + .format(class_path=class_path, + parameters=json.dumps(parameters), + feature_names=feature_names, + model_config=json.dumps(model_config, sort_keys=True))) + + cur.execute(query) + db_conn.commit() + model_group_id = cur.fetchone() + model_group_id = model_group_id[0] + + else: + logging.info("Could not found store procedure public.pivot_table") + model_group_id = None + db_conn.close() + + logging.debug('Model_group_id = {}'.format(model_group_id)) + return model_group_id + + def generate_trained_models( + self, + grid_config, + misc_db_parameters, + matrix_store + ): + """Train and store configured models, yielding the ids one by one + + Args: + grid_config (dict) of format {classpath: hyperparameter dicts} + example: { 'sklearn.ensemble.RandomForestClassifier': { + 'n_estimators': [1,10,100,1000,10000], + 'max_depth': [1,5,10,20,50,100], + 'max_features': ['sqrt','log2'], + 'min_samples_split': [2,5,10] + } } + misc_db_parameters (dict) params to pass through to the database + matrix_store (catwalk.storage.MatrixStore) a matrix and metadata + + Yields: (int) model ids + """ + for train_task in self.generate_train_tasks( + grid_config, + misc_db_parameters, + matrix_store + ): + yield self.process_train_task(**train_task) + + def train_models( + self, + grid_config, + misc_db_parameters, + matrix_store + ): + """Train and store configured models + + Args: + grid_config (dict) of format {classpath: hyperparameter dicts} + example: { 'sklearn.ensemble.RandomForestClassifier': { + 'n_estimators': [1,10,100,1000,10000], + 'max_depth': [1,5,10,20,50,100], + 'max_features': ['sqrt','log2'], + 'min_samples_split': [2,5,10] + } } + misc_db_parameters (dict) params to pass through to the database + matrix_store(catwalk.storage.MatrixStore) a matrix and metadata + + Returns: + (list) of model ids + """ + return [ + model_id for model_id in self.generate_trained_models( + grid_config, + 
misc_db_parameters, + matrix_store + ) + ] + + def process_train_task( + self, + matrix_store, + class_path, + parameters, + model_hash, + misc_db_parameters + ): + """Trains and stores a model, or skips it and returns the existing id + + Args: + matrix_store (catwalk.storage.MatrixStore) a matrix and metadata + class_path (string): a full class path for the classifier + parameters (dict): all hyperparameters to be passed to the classifier + model_hash (string) a unique id for the model + misc_db_parameters (dict) params to pass through to the database + Returns: (int) model id + """ + model_store = self.model_storage_engine.get_store(model_hash) + saved_model_id = retrieve_model_id_from_hash(self.db_engine, model_hash) + if not self.replace and model_store.exists() and saved_model_id: + logging.info('Skipping %s/%s', class_path, parameters) + return saved_model_id + + if self.replace: + reason = 'replace flag has been set' + elif not model_store.exists(): + reason = 'model pickle not found in store' + elif not saved_model_id: + reason = 'model metadata not found' + + logging.info('Training %s/%s: %s', class_path, parameters, reason) + model_id = self._train_and_store_model( + matrix_store, + class_path, + parameters, + model_hash, + model_store, + misc_db_parameters + ) + return model_id + + def generate_train_tasks( + self, + grid_config, + misc_db_parameters, + matrix_store=None + ): + """Train and store configured models, yielding the ids one by one + + Args: + grid_config (dict) of format {classpath: hyperparameter dicts} + example: { 'sklearn.ensemble.RandomForestClassifier': { + 'n_estimators': [1,10,100,1000,10000], + 'max_depth': [1,5,10,20,50,100], + 'max_features': ['sqrt','log2'], + 'min_samples_split': [2,5,10] + } } + misc_db_parameters (dict) params to pass through to the database + + Returns: (list) training task definitions, suitable for process_train_task kwargs + """ + matrix_store = matrix_store or self.matrix_store + misc_db_parameters = copy.deepcopy(misc_db_parameters) + misc_db_parameters['batch_run_time'] = datetime.datetime.now().isoformat() + misc_db_parameters['train_end_time'] = matrix_store.metadata['end_time'] + misc_db_parameters['train_label_window'] = matrix_store.metadata['label_window'] + misc_db_parameters['train_matrix_uuid'] = matrix_store.uuid + + tasks = [] + + for class_path, parameters in self._generate_model_configs(grid_config): + model_hash = self._model_hash(matrix_store.metadata, class_path, parameters) + + if any(task['model_hash'] == model_hash for task in tasks): + logging.warning('Skipping model_hash %s because another' \ + 'equivalent one found in this batch.' 
\ + 'Classpath: %s -- Hyperparameters: %s', + model_hash, class_path, parameters) + continue + tasks.append({ + 'matrix_store': matrix_store, + 'class_path': class_path, + 'parameters': parameters, + 'model_hash': model_hash, + 'misc_db_parameters': misc_db_parameters, + }) + return tasks diff --git a/catwalk/predictors.py b/catwalk/predictors.py new file mode 100644 index 0000000..90e51ab --- /dev/null +++ b/catwalk/predictors.py @@ -0,0 +1,255 @@ +from results_schema import Model, Prediction +from sqlalchemy.orm import sessionmaker +from datetime import datetime +import pandas +import logging +import math +import numpy +import tempfile +import csv +import postgres_copy +from catwalk.utils import db_retry + + +class ModelNotFoundError(ValueError): + pass + + +class Predictor(object): + expected_matrix_ts_format = '%Y-%m-%d %H:%M:%S' + + def __init__( + self, + project_path, + model_storage_engine, + db_engine, + replace=True + ): + """Encapsulates the task of generating predictions on an arbitrary + dataset and storing the results + + Args: + project_path (string) the path under which to store project data + model_storage_engine (catwalk.storage.ModelStorageEngine) + db_engine (sqlalchemy.engine) + """ + self.project_path = project_path + self.model_storage_engine = model_storage_engine + self.db_engine = db_engine + if self.db_engine: + self.sessionmaker = sessionmaker(bind=self.db_engine) + self.replace = replace + + @db_retry + def _retrieve_model_hash(self, model_id): + """Retrieves the model hash associated with a given model id + + Args: + model_id (int) The id of a given model in the database + + Returns: (str) the stored hash of the model + """ + try: + session = self.sessionmaker() + model_hash = session.query(Model).get(model_id).model_hash + finally: + session.close() + return model_hash + + @db_retry + def load_model(self, model_id): + """Downloads the cached model associated with a given model id + + Args: + model_id (int) The id of a given model in the database + + Returns: + A python object which implements .predict() + """ + + model_hash = self._retrieve_model_hash(model_id) + logging.info('Checking for model_hash %s in store', model_hash) + model_store = self.model_storage_engine.get_store(model_hash) + if model_store.exists(): + return model_store.load() + + @db_retry + def delete_model(self, model_id): + """Deletes the cached model associated with a given model id + + Args: + model_id (int) The id of a given model in the database + """ + model_hash = self._retrieve_model_hash(model_id) + model_store = self.model_storage_engine.get_store(model_hash) + model_store.delete() + + @db_retry + def _existing_predictions(self, session, model_id, matrix_store): + return session.query(Prediction)\ + .filter_by(model_id=model_id)\ + .filter(Prediction.as_of_date.in_(self._as_of_dates(matrix_store))) + + def _as_of_dates(self, matrix_store): + matrix = matrix_store.matrix + if 'as_of_date' in matrix.index.names: + return matrix.index.levels[ + matrix.index.names.index('as_of_date') + ].tolist() + else: + return [matrix_store.metadata['end_time']] + + @db_retry + def _load_saved_predictions(self, existing_predictions, matrix_store): + index = matrix_store.matrix.index + score_lookup = {} + for prediction in existing_predictions: + score_lookup[( + prediction.entity_id, + prediction.as_of_date.date().isoformat() + )] = prediction.score + if 'as_of_date' in index.names: + score_iterator = (score_lookup[(entity_id, datetime.strptime(dt, 
self.expected_matrix_ts_format).date().isoformat())] for entity_id, dt in index) + else: + as_of_date = matrix_store.metadata['end_time'].date().isoformat() + score_iterator = (score_lookup[(row, as_of_date)] for row in index) + return numpy.fromiter(score_iterator, float) + + @db_retry + def _write_to_db( + self, + model_id, + matrix_store, + predictions, + labels, + misc_db_parameters, + ): + """Writes given predictions to database + + entity_ids, predictions, labels are expected to be in the same order + + Args: + model_id (int) the id of the model associated with the given predictions + matrix_store (catwalk.storage.MatrixStore) the matrix and metadata + entity_ids (iterable) entity ids that predictions were made on + predictions (iterable) predicted values + labels (iterable) labels of prediction set (int) the id of the model to predict based off of + """ + session = self.sessionmaker() + self._existing_predictions(session, model_id, matrix_store)\ + .delete(synchronize_session=False) + session.expire_all() + db_objects = [] + test_label_window = matrix_store.metadata['label_window'] + logging.warning(test_label_window) + + if 'as_of_date' in matrix_store.matrix.index.names: + session.commit() + session.close() + with tempfile.TemporaryFile(mode='w+') as f: + writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL) + for index, score, label in zip( + matrix_store.matrix.index, + predictions, + labels + ): + entity_id, as_of_date = index + prediction = Prediction( + model_id=int(model_id), + entity_id=int(entity_id), + as_of_date=as_of_date, + score=float(score), + label_value=int(label) if not math.isnan(label) else None, + matrix_uuid=matrix_store.uuid, + test_label_window=test_label_window, + **misc_db_parameters + ) + writer.writerow([ + prediction.model_id, + prediction.entity_id, + prediction.as_of_date, + prediction.score, + prediction.label_value, + prediction.rank_abs, + prediction.rank_pct, + prediction.matrix_uuid, + prediction.test_label_window + ]) + f.seek(0) + postgres_copy.copy_from(f, Prediction, self.db_engine, format='csv') + else: + temp_df = pandas.DataFrame({'score': predictions}) + rankings_abs = temp_df['score'].rank(method='dense', ascending=False) + rankings_pct = temp_df['score'].rank(method='dense', ascending=False, pct=True) + for entity_id, score, label, rank_abs, rank_pct in zip( + matrix_store.matrix.index, + predictions, + labels, + rankings_abs, + rankings_pct + ): + db_objects.append(Prediction( + model_id=int(model_id), + entity_id=int(entity_id), + as_of_date=matrix_store.metadata['end_time'], + score=round(float(score), 10), + label_value=int(label) if not math.isnan(label) else None, + rank_abs=int(rank_abs), + rank_pct=round(float(rank_pct), 10), + matrix_uuid=matrix_store.uuid, + test_label_window=test_label_window, + **misc_db_parameters + )) + + session.bulk_save_objects(db_objects) + session.commit() + session.close() + + + def predict(self, model_id, matrix_store, misc_db_parameters, train_matrix_columns): + """Generate predictions and store them in the database + + Args: + model_id (int) the id of the trained model to predict based off of + matrix_store (catwalk.storage.MatrixStore) a wrapper for the + prediction matrix and metadata + misc_db_parameters (dict): attributes and values to add to each + Prediction object in the results schema + train_matrix_columns (list): The order of columns that the model + was trained on + + Returns: + (numpy.Array) the generated prediction values + """ + session = self.sessionmaker() + if not self.replace: + 
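+            # replace=False: reuse predictions already stored for this model
+            # and matrix rather than recomputing them.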
existing_predictions = self._existing_predictions( + session, + model_id, + matrix_store + ) + index = matrix_store.matrix.index + if existing_predictions.count() == len(index): + logging.info('Found predictions, returning saved versions') + return self._load_saved_predictions( + existing_predictions, + matrix_store + ) + + model = self.load_model(model_id) + if not model: + raise ModelNotFoundError('Model id {} not found'.format(model_id)) + + labels = matrix_store.labels() + predictions_proba = model.predict_proba( + matrix_store.matrix_with_sorted_columns(train_matrix_columns) + ) + + self._write_to_db( + model_id, + matrix_store, + predictions_proba[:,1], + labels, + misc_db_parameters + ) + return predictions_proba[:,1] diff --git a/catwalk/storage.py b/catwalk/storage.py new file mode 100644 index 0000000..53e263c --- /dev/null +++ b/catwalk/storage.py @@ -0,0 +1,219 @@ +from .utils import upload_object_to_key, key_exists, model_cache_key, download_object +import os +import pickle +import pandas +import yaml +import logging + + +class Store(object): + def __init__(self, path): + self.path = path + + def exists(self): + raise NotImplementedError + + def load(self): + raise NotImplementedError + + def write(self, obj): + raise NotImplementedError + + +class S3Store(Store): + def exists(self): + return key_exists(self.path) + + def write(self, obj): + upload_object_to_key(obj, self.path) + + def load(self): + return download_object(self.path) + + def delete(self): + self.path.delete() + + +class FSStore(Store): + def exists(self): + return os.path.isfile(self.path) + + def write(self, obj): + os.makedirs(os.path.dirname(self.path), exist_ok=True) + with open(self.path, 'w+b') as f: + pickle.dump(obj, f) + + def load(self): + with open(self.path, 'rb') as f: + return pickle.load(f) + + def delete(self): + os.remove(self.path) + + +class MemoryStore(Store): + store = None + + def exists(self): + return self.store is not None + + def write(self, obj): + self.store = obj + + def load(self): + return self.store + + def delete(self): + self.store = None + + +class ModelStorageEngine(object): + def __init__(self, project_path): + self.project_path = project_path + + def get_store(self, model_hash): + pass + + +class S3ModelStorageEngine(ModelStorageEngine): + def __init__(self, s3_conn, *args, **kwargs): + super(S3ModelStorageEngine, self).__init__(*args, **kwargs) + self.s3_conn = s3_conn + + def get_store(self, model_hash): + return S3Store(model_cache_key( + self.project_path, + model_hash, + self.s3_conn + )) + + +class FSModelStorageEngine(ModelStorageEngine): + def __init__(self, *args, **kwargs): + super(FSModelStorageEngine, self).__init__(*args, **kwargs) + os.makedirs(os.path.join(self.project_path, 'trained_models'), exist_ok=True) + + def get_store(self, model_hash): + return FSStore('/'.join([ + self.project_path, + 'trained_models', + model_hash + ])) + + +class InMemoryModelStorageEngine(ModelStorageEngine): + stores = {} + + def get_store(self, model_hash): + if model_hash not in self.stores: + self.stores[model_hash] = MemoryStore(model_hash) + return self.stores[model_hash] + + +class MatrixStore(object): + matrix = None + metadata = None + _labels = None + + def labels(self): + if self._labels is not None: + logging.debug('using stored labels') + return self._labels + else: + logging.debug('popping labels from matrix') + self._labels = self.matrix.pop(self.metadata['label_name']) + return self._labels + + @property + def uuid(self): + return self.metadata['metta-uuid'] + + 
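+    # Column names of the matrix; the label column is excluded unless
+    # include_label=True.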
def columns(self, include_label=False): + columns = self.matrix.columns.tolist() + if include_label: + return columns + else: + return [ + col for col in columns + if col != self.metadata['label_name'] + ] + + def matrix_with_sorted_columns(self, columns): + columnset = set(self.columns()) + desired_columnset = set(columns) + if columnset == desired_columnset: + if self.columns() != columns: + logging.warning('Column orders not the same, re-ordering') + return self.matrix[columns] + else: + if columnset.issuperset(desired_columnset): + raise ValueError(''' + Columnset is superset of desired columnset. Extra items: %s + ''', columnset - desired_columnset) + elif columnset.issubset(desired_columnset): + raise ValueError(''' + Columnset is subset of desired columnset. Extra items: %s + ''', desired_columnset - columnset) + else: + raise ValueError(''' + Columnset and desired columnset mismatch. Unique items: %s + ''', columnset ^ desired_columnset) + + +class MettaMatrixStore(MatrixStore): + def __init__(self, matrix_path, metadata_path): + self.matrix = pandas.read_hdf(matrix_path) + with open(metadata_path) as f: + self.metadata = yaml.load(f) + + +class MettaCSVMatrixStore(MatrixStore): + def __init__(self, matrix_path, metadata_path): + self.matrix_path = matrix_path + self.metadata_path = metadata_path + self._matrix = None + self._metadata = None + + @property + def matrix(self): + if self._matrix is None: + self._load() + return self._matrix + + @property + def metadata(self): + if self._metadata is None: + self._load() + return self._metadata + + @property + def empty(self): + if not os.path.isfile(self.matrix_path): + return True + else: + return pandas.read_csv(self.matrix_path, nrows=1).empty + + def columns(self, include_label=False): + head_of_matrix = pandas.read_csv(self.matrix_path, nrows=1) + head_of_matrix.set_index(self.metadata['indices'], inplace=True) + columns = head_of_matrix.columns.tolist() + if include_label: + return columns + else: + return [ + col for col in columns + if col != self.metadata['label_name'] + ] + + def _load(self): + self._matrix = pandas.read_csv(self.matrix_path) + with open(self.metadata_path) as f: + self._metadata = yaml.load(f) + self._matrix.set_index(self.metadata['indices'], inplace=True) + + +class InMemoryMatrixStore(MatrixStore): + def __init__(self, matrix, metadata, labels=None): + self.matrix = matrix + self.metadata = metadata + self._labels = labels diff --git a/catwalk/utils.py b/catwalk/utils.py new file mode 100644 index 0000000..a8c6e84 --- /dev/null +++ b/catwalk/utils.py @@ -0,0 +1,180 @@ +import datetime +import pickle +import tempfile +import hashlib +import botocore +import pandas +import random +import yaml +import json +from results_schema import Experiment, Model +from retrying import retry +from sqlalchemy.orm import sessionmaker +import sqlalchemy + + +def split_s3_path(path): + """ + Args: + path: a string representing an s3 path including a bucket + (bucket_name/prefix/prefix2) + Returns: + A tuple containing the bucket name and full prefix) + """ + return path.split('/', 1) + + +def upload_object_to_key(obj, cache_key): + """Pickles object and uploads it to the given s3 key + + Args: + obj (object) any picklable Python object + cache_key (boto3.s3.Object) an s3 key + """ + with tempfile.NamedTemporaryFile('w+b') as f: + pickle.dump(obj, f) + f.seek(0) + cache_key.upload_file(f.name) + + +def download_object(cache_key): + with tempfile.NamedTemporaryFile() as f: + cache_key.download_fileobj(f) + f.seek(0) + return 
pickle.load(f) + + +def model_cache_key(project_path, model_id, s3_conn): + """Generates an s3 key for a given model_id + + Args: + model_id (string) a unique model id + + Returns: + (boto3.s3.Object) an s3 key, which may or may not have contents + """ + bucket_name, prefix = split_s3_path(project_path) + path = '/'.join([prefix, 'trained_models', model_id]) + return s3_conn.Object(bucket_name, path) + + +def key_exists(key): + try: + key.load() + except botocore.exceptions.ClientError as e: + if e.response['Error']['Code'] == "404": + return False + else: + raise + else: + return True + + +def get_matrix_and_metadata(matrix_path, metadata_path): + """Retrieve a matrix in hdf format and + metadata about the matrix in yaml format + + Returns: (tuple) matrix, metadata + """ + matrix = pandas.read_hdf(matrix_path) + with open(metadata_path) as f: + metadata = yaml.load(f) + return matrix, metadata + + +def filename_friendly_hash(inputs): + def dt_handler(x): + if isinstance(x, datetime.datetime) or isinstance(x, datetime.date): + return x.isoformat() + raise TypeError("Unknown type") + return hashlib.md5( + json.dumps(inputs, default=dt_handler, sort_keys=True) + .encode('utf-8') + ).hexdigest() + + +def retry_if_db_error(exception): + return isinstance(exception, sqlalchemy.exc.OperationalError) + + +DEFAULT_RETRY_KWARGS = { + 'retry_on_exception': retry_if_db_error, + 'wait_exponential_multiplier': 1000, # wait 2^x*1000ms between each retry + 'stop_max_attempt_number': 14, + # with this configuration, last wait will be ~2 hours + # for a total of ~4.5 hours waiting +} + + +db_retry = retry(**DEFAULT_RETRY_KWARGS) + + +@db_retry +def save_experiment_and_get_hash(config, db_engine): + experiment_hash = filename_friendly_hash(config) + session = sessionmaker(bind=db_engine)() + session.merge(Experiment( + experiment_hash=experiment_hash, + config=config + )) + session.commit() + session.close() + return experiment_hash + + +class Batch: + # modified from + # http://codereview.stackexchange.com/questions/118883/split-up-an-iterable-into-batches + def __init__(self, iterable, limit=None): + self.iterator = iter(iterable) + self.limit = limit + try: + self.current = next(self.iterator) + except StopIteration: + self.on_going = False + else: + self.on_going = True + + def group(self): + yield self.current + # start enumerate at 1 because we already yielded the last saved item + for num, item in enumerate(self.iterator, 1): + self.current = item + if num == self.limit: + break + yield item + else: + self.on_going = False + + def __iter__(self): + while self.on_going: + yield self.group() + + +def sort_predictions_and_labels(predictions_proba, labels, sort_seed): + random.seed(sort_seed) + predictions_proba_sorted, labels_sorted = zip(*sorted( + zip(predictions_proba, labels), + key=lambda pair: (pair[0], random.random()), reverse=True) + ) + return predictions_proba_sorted, labels_sorted + + +@db_retry +def retrieve_model_id_from_hash(db_engine, model_hash): + """Retrieves a model id from the database that matches the given hash + + Args: + db_engine (sqlalchemy.engine) A database engine + model_hash (str) The model hash to lookup + + Returns: (int) The model id (if found in DB), None (if not) + """ + session = sessionmaker(bind=db_engine)() + try: + saved = session.query(Model)\ + .filter_by(model_hash=model_hash)\ + .one_or_none() + return saved.model_id if saved else None + finally: + session.close() diff --git a/requirements.txt b/requirements.txt index e69de29..ecf71f9 100644 --- 
a/requirements.txt +++ b/requirements.txt @@ -0,0 +1,14 @@ +SQLAlchemy +PyYAML +psycopg2 +python-dateutil +scipy +sklearn +tables==3.3.0 +pandas +boto3 +inflection +numpy>=1.12 +sqlalchemy-postgres-copy +retrying +results-schema diff --git a/requirements_dev.txt b/requirements_dev.txt index f526889..df26e1e 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,5 +1,7 @@ -r requirements.txt +git+git://github.com/dssg/metta-data.git + pip==9.0.1 bumpversion==0.5.3 flake8==3.3.0 @@ -8,3 +10,5 @@ pytest==3.0.7 codecov==2.0.9 pytest-cov==2.5.1 testing.postgresql==1.3.0 + +moto<1.0 diff --git a/setup.py b/setup.py index 865a591..9ff1767 100644 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ url='https://github.com/dssg/catwalk', packages=[ 'catwalk', + 'catwalk.estimators' ], include_package_data=True, install_requires=requirements, @@ -36,6 +37,7 @@ 'Intended Audience :: Developers', 'Natural Language :: English', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', ], test_suite='tests', tests_require=test_requirements diff --git a/tests/test_estimators.py b/tests/test_estimators.py new file mode 100644 index 0000000..47dea4b --- /dev/null +++ b/tests/test_estimators.py @@ -0,0 +1,76 @@ +import numpy as np + +import warnings + +import pytest + +from catwalk.estimators.transformers import CutOff +from catwalk.estimators.classifiers import ScaledLogisticRegression + +from sklearn import linear_model + +from sklearn import datasets +from sklearn import preprocessing +from sklearn.pipeline import Pipeline +from sklearn.model_selection import train_test_split + +@pytest.fixture +def data(): + dataset = datasets.load_breast_cancer() + X = dataset.data + y = dataset.target + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=12345) + + return {'X_train':X_train, 'X_test':X_test, 'y_train':y_train, 'y_test':y_test} + +def test_cutoff_warning(): + X_data = [1, 2, 0.5, 0.7, 100, -1, -23, 0] + + cutoff = CutOff() + + with pytest.warns(UserWarning): + cutoff.fit_transform(X_data) + + +def test_cutoff_transformer(): + cutoff = CutOff() + + X_data = [1, 2, 0.5, 0.7, 100, -1, -23, 0] + + assert np.all(cutoff.fit_transform(X_data) == [1, 1, 0.5, 0.7, 1, 0, 0, 0]) + +def test_cutoff_inside_a_pipeline(data): + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff) + ]) + + pipeline.fit(data['X_train'], data['y_train']) + + X_fake_new_data = data['X_test'][-1,:] + 0.5 + + mms = preprocessing.MinMaxScaler().fit(data['X_train']) + + assert np.all(( mms.transform(X_fake_new_data) > 1 ) == (pipeline.transform(X_fake_new_data) == 1)) + +def test_dsapp_lr(data): + dsapp_lr = ScaledLogisticRegression() + dsapp_lr.fit(data['X_train'], data['y_train']) + + minmax_scaler = preprocessing.MinMaxScaler() + dsapp_cutoff = CutOff() + lr = linear_model.LogisticRegression() + + pipeline =Pipeline([ + ('minmax_scaler',minmax_scaler), + ('dsapp_cutoff', dsapp_cutoff), + ('lr', lr) + ]) + + pipeline.fit(data['X_train'], data['y_train']) + + assert np.all(dsapp_lr.predict(data['X_test']) == pipeline.predict(data['X_test'])) diff --git a/tests/test_evaluation.py b/tests/test_evaluation.py new file mode 100644 index 0000000..2881901 --- /dev/null +++ b/tests/test_evaluation.py @@ -0,0 +1,178 @@ +from catwalk.evaluation import ModelEvaluator, generate_binary_at_x, fpr +import testing.postgresql + +import numpy +from sqlalchemy import create_engine 
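+# Added note (inferred from the test below): ModelEvaluator accepts a
+# custom_metrics dict that maps a metric name to a callable taking
+# (predictions_proba, predictions_binary, labels, parameters) and returning
+# a float; always_half, defined below, is the simplest such callable.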
+from catwalk.db import ensure_db +from tests.utils import fake_labels, fake_trained_model +from catwalk.storage import InMemoryModelStorageEngine +import datetime + + +def always_half(predictions_proba, predictions_binary, labels, parameters): + return 0.5 + + +def test_evaluating_early_warning(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + metric_groups = [{ + 'metrics': ['precision@', + 'recall@', + 'true positives@', + 'true negatives@', + 'false positives@', + 'false negatives@'], + 'thresholds': { + 'percentiles': [5.0, 10.0], + 'top_n': [5, 10] + } + }, { + 'metrics': ['f1', + 'mediocre', + 'accuracy', + 'roc_auc', + 'average precision score'], + }, { + 'metrics': ['fbeta@'], + 'parameters': [{'beta': 0.75}, {'beta': 1.25}] + }] + + custom_metrics = {'mediocre': always_half} + + model_evaluator = ModelEvaluator( + metric_groups, + db_engine, + custom_metrics=custom_metrics + ) + + trained_model, model_id = fake_trained_model( + 'myproject', + InMemoryModelStorageEngine('myproject'), + db_engine + ) + + labels = fake_labels(5) + as_of_date = datetime.date(2016, 5, 5) + model_evaluator.evaluate( + trained_model.predict_proba(labels)[:, 1], + labels, + model_id, + as_of_date, + as_of_date, + '1y' + ) + + # assert + # that all of the records are there + records = [ + row[0] for row in + db_engine.execute( + '''select distinct(metric || parameter) + from results.evaluations + where model_id = %s and + evaluation_start_time = %s order by 1''', + (model_id, as_of_date) + ) + ] + assert records == [ + 'accuracy', + 'average precision score', + 'f1', + 'false negatives@10.0_pct', + 'false negatives@10_abs', + 'false negatives@5.0_pct', + 'false negatives@5_abs', + 'false positives@10.0_pct', + 'false positives@10_abs', + 'false positives@5.0_pct', + 'false positives@5_abs', + 'fbeta@0.75_beta', + 'fbeta@1.25_beta', + 'mediocre', + 'precision@10.0_pct', + 'precision@10_abs', + 'precision@5.0_pct', + 'precision@5_abs', + 'recall@10.0_pct', + 'recall@10_abs', + 'recall@5.0_pct', + 'recall@5_abs', + 'roc_auc', + 'true negatives@10.0_pct', + 'true negatives@10_abs', + 'true negatives@5.0_pct', + 'true negatives@5_abs', + 'true positives@10.0_pct', + 'true positives@10_abs', + 'true positives@5.0_pct', + 'true positives@5_abs' + ] + + +def test_model_scoring_inspections(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + metric_groups = [{ + 'metrics': ['precision@', 'recall@', 'fpr@'], + 'thresholds': {'percentiles': [50.0], 'top_n': [3]} + }] + + model_evaluator = ModelEvaluator(metric_groups, db_engine) + + _, model_id = fake_trained_model( + 'myproject', + InMemoryModelStorageEngine('myproject'), + db_engine + ) + + labels = numpy.array([True, False, numpy.nan, True, False]) + prediction_probas = numpy.array([0.56, 0.4, 0.55, 0.5, 0.3]) + evaluation_start = datetime.datetime(2016, 4, 1) + evaluation_end = datetime.datetime(2016, 7, 1) + example_frequency = '1d' + model_evaluator.evaluate( + prediction_probas, + labels, + model_id, + evaluation_start, + evaluation_end, + example_frequency + ) + + for record in db_engine.execute( + '''select * from results.evaluations + where model_id = %s and evaluation_start_time = %s order by 1''', + (model_id, evaluation_start) + ): + assert record['num_labeled_examples'] == 4 + assert record['num_positive_labels'] == 2 + if 'pct' in record['parameter']: + assert record['num_labeled_above_threshold'] == 
1 + else: + assert record['num_labeled_above_threshold'] == 2 + + +def test_generate_binary_at_x(): + input_list = [0.9, 0.8, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.7, 0.6] + + # bug can arise when the same value spans both sides of threshold + assert generate_binary_at_x(input_list, 50, 'percentile') == \ + [1, 1, 1, 1, 1, 0, 0, 0, 0, 0] + + assert generate_binary_at_x(input_list, 2) == \ + [1, 1, 0, 0, 0, 0, 0, 0, 0, 0] + + +def test_fpr(): + predictions_binary = \ + [1, 1, 1, 0, 0, 0, 0, 0] + labels = \ + [1, 1, 0, 1, 0, 0, 0, 1] + + result = fpr([], predictions_binary, labels, []) + # false positives = 1 + # total negatives = 4 + assert result == 0.25 diff --git a/tests/test_feature_importances.py b/tests/test_feature_importances.py new file mode 100644 index 0000000..14e3ce7 --- /dev/null +++ b/tests/test_feature_importances.py @@ -0,0 +1,45 @@ +import numpy + +import warnings + +import pytest + +from catwalk.feature_importances import _ad_hoc_feature_importances, get_feature_importances + +from sklearn import datasets +from sklearn.linear_model import LogisticRegression, LinearRegression +from sklearn.ensemble import RandomForestClassifier + +from sklearn.model_selection import train_test_split + +@pytest.fixture +def trained_models(): + dataset = datasets.load_breast_cancer() + X = dataset.data + y = dataset.target + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=12345) + + rf = RandomForestClassifier() + rf.fit(X_train, y_train) + + lr = LogisticRegression() + lr.fit(X_train, y_train) + + return {'RF':rf, 'LR':lr} + +def test_throwing_warning_if_lr(trained_models): + with pytest.warns(UserWarning): + get_feature_importances(trained_models['LR']) + + +def test_correct_feature_importances_for_lr(trained_models): + feature_importances = get_feature_importances(trained_models['LR']) + + ## It returns the intercept, too + assert feature_importances.shape == (30,) + +def test_correct_feature_importances_for_rf(trained_models): + feature_importances = get_feature_importances(trained_models['RF']) + + assert feature_importances.shape == (30,) diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..6503a53 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,159 @@ +from catwalk.model_trainers import ModelTrainer +from catwalk.predictors import Predictor +from catwalk.evaluation import ModelEvaluator +from catwalk.utils import save_experiment_and_get_hash + +import boto3 +import testing.postgresql + +from moto import mock_s3 +from sqlalchemy import create_engine +from catwalk.db import ensure_db + +from catwalk.storage import S3ModelStorageEngine, InMemoryMatrixStore +import datetime +import pandas + + +def test_integration(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + + with mock_s3(): + s3_conn = boto3.resource('s3') + s3_conn.create_bucket(Bucket='econ-dev') + project_path = 'econ-dev/inspections' + + # create train and test matrices + train_matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': [7, 8] + }).set_index('entity_id') + train_metadata = { + 'beginning_of_time': datetime.date(2012, 12, 20), + 'end_time': datetime.date(2016, 12, 20), + 'label_name': 'label', + 'label_window': '1y', + 'feature_names': ['ft1', 'ft2'], + 'metta-uuid': '1234', + } + + train_store = InMemoryMatrixStore(train_matrix, train_metadata) + + as_of_dates = [ + 
datetime.date(2016, 12, 21), + datetime.date(2017, 1, 21) + ] + + test_stores = [ + InMemoryMatrixStore( + pandas.DataFrame.from_dict({ + 'entity_id': [3], + 'feature_one': [8], + 'feature_two': [5], + 'label': [5] + }).set_index('entity_id'), + { + 'label_name': 'label', + 'label_window': '1y', + 'end_time': as_of_date, + 'metta-uuid': '1234', + } + ) + for as_of_date in as_of_dates + ] + + model_storage_engine = S3ModelStorageEngine(s3_conn, project_path) + + experiment_hash = save_experiment_and_get_hash({}, db_engine) + # instantiate pipeline objects + trainer = ModelTrainer( + project_path=project_path, + experiment_hash=experiment_hash, + model_storage_engine=model_storage_engine, + db_engine=db_engine, + model_group_keys=['label_name', 'label_window'] + ) + predictor = Predictor( + project_path, + model_storage_engine, + db_engine + ) + model_evaluator = ModelEvaluator( + [{'metrics': ['precision@'], 'thresholds': {'top_n': [5]}}], + db_engine + ) + + # run the pipeline + grid_config = { + 'sklearn.linear_model.LogisticRegression': { + 'C': [0.00001, 0.0001], + 'penalty': ['l1', 'l2'], + 'random_state': [2193] + } + } + model_ids = trainer.train_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=train_store + ) + + for model_id in model_ids: + for as_of_date, test_store in zip(as_of_dates, test_stores): + predictions_proba = predictor.predict( + model_id, + test_store, + misc_db_parameters=dict(), + train_matrix_columns=['feature_one', 'feature_two'] + ) + + model_evaluator.evaluate( + predictions_proba, + test_store.labels(), + model_id, + as_of_date, + as_of_date, + '6month' + ) + + # assert + # 1. that the predictions table entries are present and + # can be linked to the original models + records = [ + row for row in + db_engine.execute('''select entity_id, model_id, as_of_date + from results.predictions + join results.models using (model_id) + order by 3, 2''') + ] + assert records == [ + (3, 1, datetime.datetime(2016, 12, 21)), + (3, 2, datetime.datetime(2016, 12, 21)), + (3, 3, datetime.datetime(2016, 12, 21)), + (3, 4, datetime.datetime(2016, 12, 21)), + (3, 1, datetime.datetime(2017, 1, 21)), + (3, 2, datetime.datetime(2017, 1, 21)), + (3, 3, datetime.datetime(2017, 1, 21)), + (3, 4, datetime.datetime(2017, 1, 21)), + ] + + # that evaluations are there + records = [ + row for row in + db_engine.execute(''' + select model_id, evaluation_start_time, metric, parameter + from results.evaluations order by 2, 1''') + ] + assert records == [ + (1, datetime.datetime(2016, 12, 21), 'precision@', '5_abs'), + (2, datetime.datetime(2016, 12, 21), 'precision@', '5_abs'), + (3, datetime.datetime(2016, 12, 21), 'precision@', '5_abs'), + (4, datetime.datetime(2016, 12, 21), 'precision@', '5_abs'), + (1, datetime.datetime(2017, 1, 21), 'precision@', '5_abs'), + (2, datetime.datetime(2017, 1, 21), 'precision@', '5_abs'), + (3, datetime.datetime(2017, 1, 21), 'precision@', '5_abs'), + (4, datetime.datetime(2017, 1, 21), 'precision@', '5_abs'), + ] diff --git a/tests/test_model_trainers.py b/tests/test_model_trainers.py new file mode 100644 index 0000000..2889d06 --- /dev/null +++ b/tests/test_model_trainers.py @@ -0,0 +1,327 @@ +import boto3 +import pandas +import pickle +import testing.postgresql +import datetime +import sqlalchemy +from sqlalchemy.orm import sessionmaker +import unittest +from unittest.mock import patch + +from moto import mock_s3 +from sqlalchemy import create_engine +from catwalk.db import ensure_db +from catwalk.utils import model_cache_key + 
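+# Added note (inferred from the assertions below): grid_config maps a
+# scikit-learn class path to a parameter grid, and ModelTrainer.train_models
+# trains one model per parameter combination, so the two C values and two
+# penalties used in test_model_trainer produce four models.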
+from catwalk.model_trainers import ModelTrainer +from catwalk.storage import InMemoryModelStorageEngine,\ + S3ModelStorageEngine, InMemoryMatrixStore + + +def test_model_trainer(): + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + ensure_db(engine) + + grid_config = { + 'sklearn.linear_model.LogisticRegression': { + 'C': [0.00001, 0.0001], + 'penalty': ['l1', 'l2'], + 'random_state': [2193] + } + } + + with mock_s3(): + s3_conn = boto3.resource('s3') + s3_conn.create_bucket(Bucket='econ-dev') + + # create training set + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': ['good', 'bad'] + }) + metadata = { + 'beginning_of_time': datetime.date(2012, 12, 20), + 'end_time': datetime.date(2016, 12, 20), + 'label_name': 'label', + 'label_window': '1y', + 'metta-uuid': '1234', + 'feature_names': ['ft1', 'ft2'] + } + project_path = 'econ-dev/inspections' + model_storage_engine = S3ModelStorageEngine(s3_conn, project_path) + trainer = ModelTrainer( + project_path=project_path, + experiment_hash=None, + model_storage_engine=model_storage_engine, + db_engine=engine, + model_group_keys=['label_name', 'label_window'] + ) + matrix_store = InMemoryMatrixStore(matrix, metadata) + model_ids = trainer.train_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=matrix_store + ) + + # assert + # 1. that the models and feature importances table entries are present + records = [ + row for row in + engine.execute('select * from results.feature_importances') + ] + assert len(records) == 4 * 3 # maybe exclude entity_id? + + records = [ + row for row in + engine.execute('select model_hash from results.models') + ] + assert len(records) == 4 + + cache_keys = [ + model_cache_key(project_path, model_row[0], s3_conn) + for model_row in records + ] + + # 2. that the model groups are distinct + records = [ + row for row in + engine.execute('select distinct model_group_id from results.models') + ] + assert len(records) == 4 + + # 3. that all four models are cached + model_pickles = [ + pickle.loads(cache_key.get()['Body'].read()) + for cache_key in cache_keys + ] + assert len(model_pickles) == 4 + assert len([x for x in model_pickles if x is not None]) == 4 + + # 4. that their results can have predictions made on it + test_matrix = pandas.DataFrame.from_dict({ + 'entity_id': [3, 4], + 'feature_one': [4, 4], + 'feature_two': [6, 5], + }) + for model_pickle in model_pickles: + predictions = model_pickle.predict(test_matrix) + assert len(predictions) == 2 + + # 5. when run again, same models are returned + new_model_ids = trainer.train_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=matrix_store + ) + assert len([ + row for row in + engine.execute('select model_hash from results.models') + ]) == 4 + assert model_ids == new_model_ids + + # 6. 
if metadata is deleted but the cache is still there, + # retrains that one and replaces the feature importance records + engine.execute('delete from results.feature_importances where model_id = 3') + engine.execute('delete from results.models where model_id = 3') + new_model_ids = trainer.train_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=matrix_store + ) + expected_model_ids = [1, 2, 4, 5] + assert expected_model_ids == sorted(new_model_ids) + assert [ + row['model_id'] for row in + engine.execute('select model_id from results.models order by 1 asc') + ] == expected_model_ids + + records = [ + row for row in + engine.execute('select * from results.feature_importances') + ] + assert len(records) == 4 * 3 # maybe exclude entity_id? + + # 7. if the cache is missing but the metadata is still there, reuse the metadata + for row in engine.execute('select model_hash from results.models'): + model_storage_engine.get_store(row[0]).delete() + expected_model_ids = [1, 2, 4, 5] + new_model_ids = trainer.train_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=matrix_store + ) + assert expected_model_ids == sorted(new_model_ids) + + # 8. that the generator interface works the same way + new_model_ids = trainer.generate_trained_models( + grid_config=grid_config, + misc_db_parameters=dict(), + matrix_store=matrix_store + ) + assert expected_model_ids == \ + sorted([model_id for model_id in new_model_ids]) + +def test_n_jobs_not_new_model(): + grid_config = { + 'sklearn.ensemble.AdaBoostClassifier': { + 'n_estimators': [10, 100, 1000] + }, + 'sklearn.ensemble.RandomForestClassifier': { + 'n_estimators': [10, 100], + 'max_features': ['sqrt', 'log2'], + 'max_depth': [5, 10, 15, 20], + 'criterion': ['gini', 'entropy'], + 'n_jobs': [12, 24], + } + } + + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + ensure_db(engine) + with mock_s3(): + s3_conn = boto3.resource('s3') + s3_conn.create_bucket(Bucket='econ-dev') + trainer = ModelTrainer( + project_path='econ-dev/inspections', + experiment_hash=None, + model_storage_engine=S3ModelStorageEngine(s3_conn, 'econ-dev/inspections'), + db_engine=engine, + model_group_keys=['label_name', 'label_window'] + ) + + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': ['good', 'bad'] + }) + train_tasks = trainer.generate_train_tasks( + grid_config, + dict(), + InMemoryMatrixStore(matrix, { + 'label_window': '1d', + 'end_time': datetime.datetime.now(), + 'beginning_of_time': datetime.date(2012, 12, 20), + 'label_name': 'label', + 'metta-uuid': '1234', + 'feature_names': ['ft1', 'ft2'] + }) + ) + assert len(train_tasks) == 35 # 32+3, would be (32*2)+3 if we didn't remove + assert len([ + task for task in train_tasks + if 'n_jobs' in task['parameters'] + ]) == 32 + + for train_task in train_tasks: + trainer.process_train_task(**train_task) + + for row in engine.execute( + 'select model_parameters from results.model_groups' + ): + assert 'n_jobs' not in row[0] + + +class RetryTest(unittest.TestCase): + def test_retry_max(self): + grid_config = { + 'sklearn.ensemble.AdaBoostClassifier': { + 'n_estimators': [10] + }, + } + + engine = None + trainer = None + # set up a basic model training run + # TODO abstract the setup of a basic model training run where + # we don't worry about the specific values used? 
it would make + # tests like this require a bit less noise to read past + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + ensure_db(engine) + trainer = ModelTrainer( + project_path='econ-dev/inspections', + experiment_hash=None, + model_storage_engine=InMemoryModelStorageEngine(project_path=''), + db_engine=engine, + model_group_keys=['label_name', 'label_window'] + ) + + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': ['good', 'bad'] + }) + matrix_store = InMemoryMatrixStore(matrix, { + 'label_window': '1d', + 'end_time': datetime.datetime.now(), + 'beginning_of_time': datetime.date(2012, 12, 20), + 'label_name': 'label', + 'metta-uuid': '1234', + 'feature_names': ['ft1', 'ft2'] + }) + # the postgres server goes out of scope here and thus no longer exists + with patch('time.sleep') as time_mock: + with self.assertRaises(sqlalchemy.exc.OperationalError): + trainer.train_models(grid_config, dict(), matrix_store) + # we want to make sure that we are using the retrying module sanely + # as opposed to matching the exact # of calls specified by the code + assert len(time_mock.mock_calls) > 5 + + def test_retry_recovery(self): + grid_config = { + 'sklearn.ensemble.AdaBoostClassifier': { + 'n_estimators': [10] + }, + } + + engine = None + trainer = None + port = None + with testing.postgresql.Postgresql() as postgresql: + port = postgresql.settings['port'] + engine = create_engine(postgresql.url()) + ensure_db(engine) + trainer = ModelTrainer( + project_path='econ-dev/inspections', + experiment_hash=None, + model_storage_engine=InMemoryModelStorageEngine(project_path=''), + db_engine=engine, + model_group_keys=['label_name', 'label_window'] + ) + + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': ['good', 'bad'] + }) + matrix_store = InMemoryMatrixStore(matrix, { + 'label_window': '1d', + 'end_time': datetime.datetime.now(), + 'beginning_of_time': datetime.date(2012, 12, 20), + 'label_name': 'label', + 'metta-uuid': '1234', + 'feature_names': ['ft1', 'ft2'] + }) + + # start without a database server + # then bring it back up after the first sleep + # use self so it doesn't go out of scope too early and shut down + self.new_server = None + def replace_db(arg): + self.new_server = testing.postgresql.Postgresql(port=port) + engine = create_engine(self.new_server.url()) + ensure_db(engine) + with patch('time.sleep') as time_mock: + time_mock.side_effect = replace_db + try: + trainer.train_models(grid_config, dict(), matrix_store) + finally: + if self.new_server is not None: + self.new_server.stop() + assert len(time_mock.mock_calls) == 1 diff --git a/tests/test_predictors.py b/tests/test_predictors.py new file mode 100644 index 0000000..2cd380c --- /dev/null +++ b/tests/test_predictors.py @@ -0,0 +1,285 @@ +import boto3 +import testing.postgresql + +from moto import mock_s3 +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.session import make_transient +from results_schema import Prediction +from catwalk.db import ensure_db +import pandas + +from catwalk.predictors import Predictor +from tests.utils import fake_trained_model, sample_metta_csv_diff_order +from catwalk.storage import \ + InMemoryModelStorageEngine,\ + S3ModelStorageEngine,\ + InMemoryMatrixStore +import datetime + +from unittest.mock import Mock +from numpy.testing import assert_array_equal 
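+# Added note (mirrors the calls below): Predictor.predict(model_id, matrix_store,
+# misc_db_parameters, train_matrix_columns) reorders the prediction matrix to the
+# training column order, writes rows to results.predictions, and returns the
+# positive-class probabilities; with replace=False, saved predictions covering
+# the same matrix are reloaded instead of recomputed.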
+import tempfile + +AS_OF_DATE = datetime.date(2016, 12, 21) + + +def test_predictor(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + + with mock_s3(): + s3_conn = boto3.resource('s3') + s3_conn.create_bucket(Bucket='econ-dev') + project_path = 'econ-dev/inspections' + model_storage_engine = S3ModelStorageEngine(s3_conn, project_path) + _, model_id = \ + fake_trained_model(project_path, model_storage_engine, db_engine) + predictor = Predictor(project_path, model_storage_engine, db_engine) + # create prediction set + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': [7, 8] + }).set_index('entity_id') + metadata = { + 'label_name': 'label', + 'end_time': AS_OF_DATE, + 'label_window': '3month', + 'metta-uuid': '1234', + } + + matrix_store = InMemoryMatrixStore(matrix, metadata) + train_matrix_columns = ['feature_one', 'feature_two'] + predict_proba = predictor.predict( + model_id, + matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=train_matrix_columns + ) + + # assert + # 1. that the returned predictions are of the desired length + assert len(predict_proba) == 2 + + # 2. that the predictions table entries are present and + # can be linked to the original models + records = [ + row for row in + db_engine.execute('''select entity_id, as_of_date + from results.predictions + join results.models using (model_id)''') + ] + assert len(records) == 2 + + # 3. that the contained as_of_dates match what we sent in + for record in records: + assert record[1].date() == AS_OF_DATE + + # 4. that the entity ids match the given dataset + assert sorted([record[0] for record in records]) == [1, 2] + + # 5. running with same model_id, different as of date + # then with same as of date only replaces the records + # with the same date + new_matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2], + 'feature_one': [3, 4], + 'feature_two': [5, 6], + 'label': [7, 8] + }).set_index('entity_id') + new_metadata = { + 'label_name': 'label', + 'end_time': AS_OF_DATE + datetime.timedelta(days=1), + 'label_window': '3month', + 'metta-uuid': '1234', + } + new_matrix_store = InMemoryMatrixStore(new_matrix, new_metadata) + predictor.predict( + model_id, + new_matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=train_matrix_columns + ) + predictor.predict( + model_id, + matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=train_matrix_columns + ) + records = [ + row for row in + db_engine.execute('''select entity_id, as_of_date + from results.predictions + join results.models using (model_id)''') + ] + assert len(records) == 4 + + # 6. 
That we can delete the model when done prediction on it + predictor.delete_model(model_id) + assert predictor.load_model(model_id) == None + + +def test_predictor_composite_index(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + project_path = 'econ-dev/inspections' + model_storage_engine = InMemoryModelStorageEngine(project_path) + _, model_id = \ + fake_trained_model(project_path, model_storage_engine, db_engine) + predictor = Predictor(project_path, model_storage_engine, db_engine) + dayone = datetime.datetime(2011, 1, 1) + daytwo = datetime.datetime(2011, 1, 2) + # create prediction set + matrix = pandas.DataFrame.from_dict({ + 'entity_id': [1, 2, 1, 2], + 'as_of_date': [dayone, dayone, daytwo, daytwo], + 'feature_one': [3, 4, 5, 6], + 'feature_two': [5, 6, 7, 8], + 'label': [7, 8, 8, 7] + }).set_index(['entity_id', 'as_of_date']) + metadata = { + 'label_name': 'label', + 'end_time': AS_OF_DATE, + 'label_window': '3month', + 'metta-uuid': '1234', + } + matrix_store = InMemoryMatrixStore(matrix, metadata) + predict_proba = predictor.predict( + model_id, + matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=['feature_one', 'feature_two'] + ) + + # assert + # 1. that the returned predictions are of the desired length + assert len(predict_proba) == 4 + + # 2. that the predictions table entries are present and + # can be linked to the original models + records = [ + row for row in + db_engine.execute('''select entity_id, as_of_date + from results.predictions + join results.models using (model_id)''') + ] + assert len(records) == 4 + + +def test_predictor_get_train_columns(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + project_path = 'econ-dev/inspections' + with tempfile.TemporaryDirectory() as temp_dir: + train_store, test_store = sample_metta_csv_diff_order(temp_dir) + + model_storage_engine = InMemoryModelStorageEngine(project_path) + _, model_id = \ + fake_trained_model( + project_path, + model_storage_engine, + db_engine, + train_matrix_uuid=train_store.uuid + ) + predictor = Predictor(project_path, model_storage_engine, db_engine) + + predict_proba = predictor.predict( + model_id, + test_store, + misc_db_parameters=dict(), + train_matrix_columns=train_store.columns() + ) + # assert + # 1. that we calculated predictions + assert len(predict_proba) > 0 + + # 2. 
that the predictions table entries are present and + # can be linked to the original models + records = [ + row for row in + db_engine.execute('''select entity_id, as_of_date + from results.predictions + join results.models using (model_id)''') + ] + assert len(records) > 0 + + +def test_predictor_retrieve(): + with testing.postgresql.Postgresql() as postgresql: + db_engine = create_engine(postgresql.url()) + ensure_db(db_engine) + project_path = 'econ-dev/inspections' + model_storage_engine = InMemoryModelStorageEngine(project_path) + _, model_id = \ + fake_trained_model(project_path, model_storage_engine, db_engine) + predictor = Predictor(project_path, model_storage_engine, db_engine, replace=False) + dayone = datetime.date(2011, 1, 1).strftime(predictor.expected_matrix_ts_format) + daytwo = datetime.date(2011, 1, 2).strftime(predictor.expected_matrix_ts_format) + # create prediction set + matrix_data = { + 'entity_id': [1, 2, 1, 2], + 'as_of_date': [dayone, dayone, daytwo, daytwo], + 'feature_one': [3, 4, 5, 6], + 'feature_two': [5, 6, 7, 8], + 'label': [7, 8, 8, 7] + } + matrix = pandas.DataFrame.from_dict(matrix_data)\ + .set_index(['entity_id', 'as_of_date']) + metadata = { + 'label_name': 'label', + 'end_time': AS_OF_DATE, + 'label_window': '3month', + 'metta-uuid': '1234', + } + matrix_store = InMemoryMatrixStore(matrix, metadata) + predict_proba = predictor.predict( + model_id, + matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=['feature_one', 'feature_two'] + ) + + # When run again, the predictions retrieved from the database + # should match. + # + # Some trickiness here. Let's explain: + # + # If we are not careful, retrieving predictions from the database and + # presenting them as a numpy array can result in a bad ordering, + # since the given matrix may not be 'ordered' by some criteria + # that can be easily represented by an ORDER BY clause. + # + # It will sometimes work, because without ORDER BY you will get + # it back in the table's physical order, which unless something has + # happened to the table will be the order you inserted it, + # which could very well be the order in the matrix. + # So it's not a bug that would necessarily immediately show itself, + # but when it does go wrong your scores will be garbage. + # + # So we simulate a table order mutation that can happen over time: + # Remove the first row and put it at the end. 
+ # If the Predictor doesn't explicitly reorder the results, this will fail + session = sessionmaker(bind=db_engine)() + obj = session.query(Prediction).first() + session.delete(obj) + session.commit() + + make_transient(obj) + session = sessionmaker(bind=db_engine)() + session.add(obj) + session.commit() + + predictor.load_model = Mock() + new_predict_proba = predictor.predict( + model_id, + matrix_store, + misc_db_parameters=dict(), + train_matrix_columns=['feature_one', 'feature_two'] + ) + assert_array_equal(new_predict_proba, predict_proba) + assert not predictor.load_model.called diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..499a81b --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,119 @@ +from catwalk.storage import S3Store, FSStore, MemoryStore, InMemoryMatrixStore +from moto import mock_s3 +import tempfile +import boto3 +import os +import pandas +from collections import OrderedDict +import unittest + + +class SomeClass(object): + def __init__(self, val): + self.val = val + + +def test_S3Store(): + with mock_s3(): + s3_conn = boto3.resource('s3') + s3_conn.create_bucket(Bucket='a-bucket') + store = S3Store(s3_conn.Object('a-bucket', 'a-path')) + assert not store.exists() + store.write(SomeClass('val')) + assert store.exists() + newVal = store.load() + assert newVal.val == 'val' + store.delete() + assert not store.exists() + + +def test_FSStore(): + with tempfile.TemporaryDirectory() as tmpdir: + tmpfile = os.path.join(tmpdir, 'tmpfile') + store = FSStore(tmpfile) + assert not store.exists() + store.write(SomeClass('val')) + assert store.exists() + newVal = store.load() + assert newVal.val == 'val' + store.delete() + assert not store.exists() + + +def test_MemoryStore(): + store = MemoryStore(None) + assert not store.exists() + store.write(SomeClass('val')) + assert store.exists() + newVal = store.load() + assert newVal.val == 'val' + store.delete() + assert not store.exists() + + +class MatrixStoreTest(unittest.TestCase): + def matrix_store(self): + data_dict = OrderedDict([ + ('entity_id', [1, 2]), + ('k_feature', [0.5, 0.4]), + ('m_feature', [0.4, 0.5]), + ('label', [0, 1]) + ]) + df = pandas.DataFrame.from_dict(data_dict) + metadata = { + 'label_name': 'label', + 'indices': ['entity_id'], + } + matrix_store = InMemoryMatrixStore(matrix=df, metadata=metadata) + return matrix_store + + def test_MatrixStore_resort_columns(self): + result = self.matrix_store().\ + matrix_with_sorted_columns( + ['entity_id', 'm_feature', 'k_feature'] + )\ + .values\ + .tolist() + expected = [ + [1, 0.4, 0.5], + [2, 0.5, 0.4] + ] + self.assertEqual(expected, result) + + def test_MatrixStore_already_sorted_columns(self): + result = self.matrix_store().\ + matrix_with_sorted_columns( + ['entity_id', 'k_feature', 'm_feature'] + )\ + .values\ + .tolist() + expected = [ + [1, 0.5, 0.4], + [2, 0.4, 0.5] + ] + self.assertEqual(expected, result) + + def test_MatrixStore_sorted_columns_subset(self): + with self.assertRaises(ValueError): + self.matrix_store().\ + matrix_with_sorted_columns(['entity_id', 'm_feature'])\ + .values\ + .tolist() + + def test_MatrixStore_sorted_columns_superset(self): + with self.assertRaises(ValueError): + self.matrix_store().\ + matrix_with_sorted_columns( + ['entity_id', 'k_feature', 'l_feature', 'm_feature'] + )\ + .values\ + .tolist() + + def test_MatrixStore_sorted_columns_mismatch(self): + with self.assertRaises(ValueError): + self.matrix_store().\ + matrix_with_sorted_columns( + ['entity_id', 'k_feature', 'l_feature'] + )\ + 
.values\ + .tolist() diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..aecf051 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,106 @@ +from catwalk.utils import filename_friendly_hash, \ + save_experiment_and_get_hash, \ + sort_predictions_and_labels +from catwalk.db import ensure_db +from sqlalchemy import create_engine +import testing.postgresql +import datetime +import logging +import re + + +def test_filename_friendly_hash(): + data = { + 'stuff': 'stuff', + 'other_stuff': 'more_stuff', + 'a_datetime': datetime.datetime(2015, 1, 1), + 'a_date': datetime.date(2016, 1, 1), + 'a_number': 5.0 + } + output = filename_friendly_hash(data) + assert isinstance(output, str) + assert re.match('^[\w]+$', output) is not None + + # make sure ordering keys differently doesn't change the hash + new_output = filename_friendly_hash({ + 'other_stuff': 'more_stuff', + 'stuff': 'stuff', + 'a_datetime': datetime.datetime(2015, 1, 1), + 'a_date': datetime.date(2016, 1, 1), + 'a_number': 5.0 + }) + assert new_output == output + + # make sure new data hashes to something different + new_output = filename_friendly_hash({ + 'stuff': 'stuff', + 'a_number': 5.0 + }) + assert new_output != output + + +def test_filename_friendly_hash_stability(): + nested_data = { + 'one': 'two', + 'three': { + 'four': 'five', + 'six': 'seven' + } + } + output = filename_friendly_hash(nested_data) + # 1. we want to make sure this is stable across different runs + # so hardcode an expected value + assert output == '9a844a7ebbfd821010b1c2c13f7391e6' + other_nested_data = { + 'one': 'two', + 'three': { + 'six': 'seven', + 'four': 'five' + } + } + new_output = filename_friendly_hash(other_nested_data) + assert output == new_output + + +def test_save_experiment_and_get_hash(): + # no reason to make assertions on the config itself, use a basic dict + experiment_config = {'one': 'two'} + with testing.postgresql.Postgresql() as postgresql: + engine = create_engine(postgresql.url()) + ensure_db(engine) + exp_hash = save_experiment_and_get_hash(experiment_config, engine) + assert isinstance(exp_hash, str) + new_hash = save_experiment_and_get_hash(experiment_config, engine) + assert new_hash == exp_hash + +def test_sort_predictions_and_labels(): + predictions = [ + 0.5, + 0.4, + 0.6, + 0.5, + ] + + labels = [ + False, + False, + True, + True + ] + + sorted_predictions, sorted_labels = sort_predictions_and_labels( + predictions, + labels, + 8 + ) + assert sorted_predictions == (0.6, 0.5, 0.5, 0.4) + assert sorted_labels == (True, True, False, False) + + + sorted_predictions, sorted_labels = sort_predictions_and_labels( + predictions, + labels, + 12345 + ) + assert sorted_predictions == (0.6, 0.5, 0.5, 0.4) + assert sorted_labels == (True, False, True, False) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..34a734b --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,127 @@ +from contextlib import contextmanager +import pandas +import tempfile +import yaml +import numpy +import random +from results_schema import Model +from sqlalchemy.orm import sessionmaker +import datetime +import os +from catwalk.storage import MettaCSVMatrixStore +from metta import metta_io as metta +from collections import OrderedDict + + +@contextmanager +def fake_metta(matrix_dict, metadata): + """Stores matrix and metadata in a metta-data-like form + + Args: + matrix_dict (dict) of form { columns: values }. + Expects an entity_id to be present which it will use as the index + metadata (dict). 
Any metadata that should be set + + Yields: + tuple of filenames for matrix and metadata + """ + matrix = pandas.DataFrame.from_dict(matrix_dict).set_index('entity_id') + with tempfile.NamedTemporaryFile() as matrix_file: + with tempfile.NamedTemporaryFile('w') as metadata_file: + hdf = pandas.HDFStore(matrix_file.name) + hdf.put('title', matrix, data_columns=True) + matrix_file.seek(0) + + yaml.dump(metadata, metadata_file) + metadata_file.seek(0) + yield (matrix_file.name, metadata_file.name) + + +def fake_labels(length): + return numpy.array([random.choice([True, False]) for i in range(0, length)]) + + +class MockTrainedModel(object): + def predict_proba(self, dataset): + return numpy.random.rand(len(dataset), len(dataset)) + + +def fake_trained_model(project_path, model_storage_engine, db_engine, train_matrix_uuid='efgh'): + """Creates and stores a trivial trained model + + Args: + project_path (string) a desired fs/s3 project path + model_storage_engine (catwalk.storage.ModelStorageEngine) + db_engine (sqlalchemy.engine) + + Returns: + (int) model id for database retrieval + """ + trained_model = MockTrainedModel() + model_storage_engine.get_store('abcd').write(trained_model) + session = sessionmaker(db_engine)() + db_model = Model(model_hash='abcd', train_matrix_uuid=train_matrix_uuid) + session.add(db_model) + session.commit() + return trained_model, db_model.model_id + + +def sample_metta_csv_diff_order(directory): + """Stores matrix and metadata in a metta-data-like form + + The train and test matrices will have different column orders + + Args: + directory (str) + """ + train_dict = OrderedDict([ + ('entity_id', [1, 2]), + ('k_feature', [0.5, 0.4]), + ('m_feature', [0.4, 0.5]), + ('label', [0, 1]) + ]) + train_matrix = pandas.DataFrame.from_dict(train_dict) + train_metadata = { + 'beginning_of_time': datetime.date(2014, 1, 1), + 'end_time': datetime.date(2015, 1, 1), + 'matrix_id': 'train_matrix', + 'label_name': 'label', + 'label_window': '3month', + 'indices': ['entity_id'], + } + + test_dict = OrderedDict([ + ('entity_id', [3, 4]), + ('m_feature', [0.4, 0.5]), + ('k_feature', [0.5, 0.4]), + ('label', [0, 1]) + ]) + + test_matrix = pandas.DataFrame.from_dict(test_dict) + test_metadata = { + 'beginning_of_time': datetime.date(2015, 1, 1), + 'end_time': datetime.date(2016, 1, 1), + 'matrix_id': 'test_matrix', + 'label_name': 'label', + 'label_window': '3month', + 'indices': ['entity_id'], + } + + train_uuid, test_uuid = metta.archive_train_test( + train_config=train_metadata, + df_train=train_matrix, + test_config=test_metadata, + df_test=test_matrix, + directory=directory, + format='csv' + ) + + train_store = MettaCSVMatrixStore( + matrix_path=os.path.join(directory, '{}.csv'.format(train_uuid)), + metadata_path=os.path.join(directory, '{}.yaml'.format(train_uuid)) + ) + test_store = MettaCSVMatrixStore( + matrix_path=os.path.join(directory, '{}.csv'.format(test_uuid)), + metadata_path=os.path.join(directory, '{}.yaml'.format(test_uuid)) + ) + return train_store, test_store
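
To show how the imported pieces are intended to compose (train, predict, evaluate), here is a rough end-to-end sketch modeled on tests/test_integration.py. It is illustrative only: the connection string, project path, matrix contents, and metadata values are placeholder assumptions, and the training matrix doubles as the prediction matrix purely to keep the example short; a real run would target a reachable Postgres database and a separate test matrix.

import datetime

import pandas
from sqlalchemy import create_engine

from catwalk.db import ensure_db
from catwalk.evaluation import ModelEvaluator
from catwalk.model_trainers import ModelTrainer
from catwalk.predictors import Predictor
from catwalk.storage import FSModelStorageEngine, InMemoryMatrixStore
from catwalk.utils import save_experiment_and_get_hash

# Placeholders: point these at a real Postgres instance and a writable path.
db_engine = create_engine('postgresql://user:pass@localhost/catwalk')
ensure_db(db_engine)  # creates the results schema tables
project_path = '/tmp/catwalk-demo'

# A toy training matrix; the metadata keys mirror those used in the tests.
train_store = InMemoryMatrixStore(
    pandas.DataFrame.from_dict({
        'entity_id': [1, 2],
        'feature_one': [3, 4],
        'feature_two': [5, 6],
        'label': [0, 1],
    }).set_index('entity_id'),
    {
        'beginning_of_time': datetime.date(2012, 12, 20),
        'end_time': datetime.date(2016, 12, 20),
        'label_name': 'label',
        'label_window': '1y',
        'feature_names': ['feature_one', 'feature_two'],
        'metta-uuid': '1234',
    },
)

model_storage_engine = FSModelStorageEngine(project_path)
experiment_hash = save_experiment_and_get_hash({}, db_engine)

trainer = ModelTrainer(
    project_path=project_path,
    experiment_hash=experiment_hash,
    model_storage_engine=model_storage_engine,
    db_engine=db_engine,
    model_group_keys=['label_name', 'label_window'],
)
model_ids = trainer.train_models(
    grid_config={
        'sklearn.linear_model.LogisticRegression': {'C': [0.1, 1.0], 'penalty': ['l2']},
    },
    misc_db_parameters=dict(),
    matrix_store=train_store,
)

predictor = Predictor(project_path, model_storage_engine, db_engine)
evaluator = ModelEvaluator(
    [{'metrics': ['precision@'], 'thresholds': {'top_n': [5]}}],
    db_engine,
)

for model_id in model_ids:
    # The training store stands in for a test matrix here, only to keep the
    # sketch short; scores are the positive-class probabilities.
    scores = predictor.predict(
        model_id,
        train_store,
        misc_db_parameters=dict(),
        train_matrix_columns=['feature_one', 'feature_two'],
    )
    evaluator.evaluate(
        scores,
        train_store.labels(),
        model_id,
        datetime.date(2016, 12, 21),
        datetime.date(2016, 12, 21),
        '1y',
    )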