forked from yandexdataschool/flavours-of-physics-start
-
Notifications
You must be signed in to change notification settings - Fork 0
/
evaluation.py
133 lines (106 loc) · 5.78 KB
/
evaluation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import numpy
from sklearn.metrics import roc_curve, auc
def __rolling_window(data, window_size):
    """
    Rolling window: slide a window of fixed size along the last axis of the array.

    The result is built with stride tricks, so it is a view that shares memory with
    ``data`` (overlapping windows alias the same elements); treat it as read-only.

    :param data: array-like (anything accepted by numpy.asarray)
    :param window_size: size of each window, must not exceed the length of the last axis
    :return: the sequence of windows, shape ``data.shape[:-1] + (n_windows, window_size)``
    :raises ValueError: if data is a scalar, or window_size is negative or larger than
        the last axis of data

    Example: data = array(1, 2, 3, 4, 5, 6), window_size = 4
        Then this function return array(array(1, 2, 3, 4), array(2, 3, 4, 5), array(3, 4, 5, 6))
    """
    # Accept lists / tuples as promised by "array-like"; ndarray input is passed through uncopied.
    data = numpy.asarray(data)
    if data.ndim == 0:
        raise ValueError('data must have at least one dimension')

    n_windows = data.shape[-1] - window_size + 1
    if window_size < 0 or n_windows < 0:
        # Either case would request a negative dimension from as_strided; fail with a clear message.
        raise ValueError(
            'window_size={} is not valid for data with last axis of length {}'.format(
                window_size, data.shape[-1]))

    shape = data.shape[:-1] + (n_windows, window_size)
    # The new trailing axis advances by one element, exactly like the axis it slides along.
    strides = data.strides + (data.strides[-1],)
    return numpy.lib.stride_tricks.as_strided(data, shape=shape, strides=strides)
def __cvm(subindices, total_events):
    """
    Cramer-von Mises distance between a subset and the full, ascending-ordered sample.

    The full sample is represented only by its size: its empirical cdf is the uniform
    staircase 1/N, 2/N, ..., 1. The subset is given by the positions (ranks) of its
    events inside the full sample.

    :param subindices: indices of the events that form the subset (first distribution)
    :param total_events: number of events in the full sample (second distribution)
    :return: mean squared difference between the two empirical cdfs
    """
    # Empirical cdf of the subset over the positions 0 .. total_events - 1.
    hits_per_position = numpy.bincount(subindices, minlength=total_events)
    subset_cdf = numpy.cumsum(hits_per_position, dtype='float')
    subset_cdf /= 1.0 * subset_cdf[-1]

    # Empirical cdf of the full sample.
    reference_cdf = numpy.arange(1, total_events + 1, dtype='float') / total_events

    squared_gap = (reference_cdf - subset_cdf) ** 2
    return numpy.mean(squared_gap)
def compute_cvm(predictions, masses, n_neighbours=200, step=50):
    """
    Average Cramer-von Mises (cvm) metric over mass bins, intended for background events.

    Events are ordered by mass; every bin is a run of ``n_neighbours`` consecutive events,
    and bins start every ``step`` events. Inside each bin the cdf of the predictions is
    compared with the cdf of the predictions over all events.

    :param predictions: array-like, predictions
    :param masses: array-like, in case of Kaggle tau23mu this is reconstructed mass
    :param n_neighbours: count of neighbours for event to define mass bin
    :param step: step through sorted mass-array to define next center of bin
    :return: average cvm value
    """
    predictions = numpy.array(predictions)
    masses = numpy.array(masses)
    assert len(predictions) == len(masses)

    # Order events by mass so that neighbouring entries are neighbours in mass.
    mass_order = numpy.argsort(masses)
    predictions_by_mass = predictions[mass_order]

    # Replace each prediction by its rank among all predictions (double argsort).
    ranks = numpy.argsort(numpy.argsort(predictions_by_mass, kind='mergesort'), kind='mergesort')
    n_events = len(ranks)

    # Each selected window of ranks is one mass bin contributing one cvm value.
    mass_bins = __rolling_window(ranks, window_size=n_neighbours)[::step]
    cvms = [__cvm(subindices=mass_bin, total_events=n_events) for mass_bin in mass_bins]
    return numpy.mean(cvms)
def __roc_curve_splitted(data_zero, data_one, sample_weights_zero, sample_weights_one):
    """
    Build a weighted roc curve from two samples that are supplied separately.

    The first sample is labelled 0 and the second is labelled 1; both are then
    concatenated (in that order) and handed to ``roc_curve``.

    :param data_zero: 0-labeled data
    :param data_one: 1-labeled data
    :param sample_weights_zero: weights for 0-labeled data
    :param sample_weights_one: weights for 1-labeled data
    :return: roc curve as a tuple (fpr, tpr)
    """
    n_zero, n_one = len(data_zero), len(data_one)

    labels = [0] * n_zero
    labels.extend([1] * n_one)

    scores = numpy.concatenate([data_zero, data_one])
    sample_weight = numpy.concatenate([sample_weights_zero, sample_weights_one])

    # roc_curve returns (fpr, tpr, thresholds); the thresholds are not needed here.
    return roc_curve(labels, scores, sample_weight=sample_weight)[:2]
def compute_ks(data_prediction, mc_prediction, weights_data, weights_mc):
    """
    Compute Kolmogorov-Smirnov (ks) distance between real data predictions cdf and Monte Carlo one.

    Real data is treated as the 0-labeled sample and Monte Carlo as the 1-labeled sample of a
    weighted roc curve; the ks distance is the largest absolute gap between the two rates
    (fpr and tpr) along that curve.

    :param data_prediction: array-like, real data predictions
    :param mc_prediction: array-like, Monte Carlo data predictions
    :param weights_data: array-like, real data weights (integer or floating)
    :param weights_mc: array-like, Monte Carlo weights (integer or floating)
    :return: ks value
    :raises AssertionError: if a prediction array and its weights differ in length, or
        if any prediction lies outside [0, 1]
    """
    assert len(data_prediction) == len(weights_data), 'Data length and weight one must be the same'
    assert len(mc_prediction) == len(weights_mc), 'Data length and weight one must be the same'

    data_prediction, mc_prediction = numpy.array(data_prediction), numpy.array(mc_prediction)
    weights_data, weights_mc = numpy.array(weights_data), numpy.array(weights_mc)

    assert numpy.all(data_prediction >= 0.) and numpy.all(data_prediction <= 1.), 'Data predictions are out of range [0, 1]'
    assert numpy.all(mc_prediction >= 0.) and numpy.all(mc_prediction <= 1.), 'MC predictions are out of range [0, 1]'

    # Normalize each sample's weights to sum to one. The division is deliberately NOT done
    # in place: "/=" on an integer array raises a casting error, so integer weights
    # (e.g. a list of ones) would crash. Out-of-place division promotes them to float.
    weights_data = weights_data / numpy.sum(weights_data)
    weights_mc = weights_mc / numpy.sum(weights_mc)

    fpr, tpr = __roc_curve_splitted(data_prediction, mc_prediction, weights_data, weights_mc)

    return numpy.max(numpy.abs(fpr - tpr))
def roc_auc_truncated(labels, predictions, tpr_thresholds=(0.2, 0.4, 0.6, 0.8),
                      roc_weights=(4, 3, 2, 1, 0)):
    """
    Compute weighted area under ROC curve.

    The tpr axis is cut into segments [0, t1], [t1, t2], ..., [tk, 1] by ``tpr_thresholds``.
    The area that the roc curve accumulates inside each segment is multiplied by the
    matching entry of ``roc_weights``; the weighted sum is then normalized so that an
    ideal classifier scores 1.

    :param labels: array-like, true labels
    :param predictions: array-like, predictions
    :param tpr_thresholds: array-like, true positive rate thresholds delimiting the ROC segments
    :param roc_weights: array-like, weights for true positive rate segments
    :return: weighted AUC
    :raises AssertionError: if any prediction lies outside [0, 1], or if the number of
        weights is not one more than the number of thresholds
    """
    # Plain Python lists do not support elementwise comparison with a float.
    predictions = numpy.asarray(predictions)
    assert numpy.all(predictions >= 0.) and numpy.all(predictions <= 1.), 'Data predictions are out of range [0, 1]'
    assert len(tpr_thresholds) + 1 == len(roc_weights), 'Incompatible lengths of thresholds and weights'

    fpr, tpr, _ = roc_curve(labels, predictions)

    segment_bounds = [0.] + list(tpr_thresholds) + [1.]

    # roc_curve returns fpr in non-decreasing order, so auc() can integrate it directly.
    # The former ``reorder=True`` argument is not passed: it was removed from
    # sklearn.metrics.auc and raises TypeError on current scikit-learn releases.
    area = 0.
    area_below_lower = auc(fpr, numpy.minimum(tpr, segment_bounds[0]))
    for weight, upper_bound in zip(roc_weights, segment_bounds[1:]):
        # Area under the curve clipped at the segment's upper tpr bound; the difference
        # with the area clipped at the lower bound is the segment's own contribution.
        area_below_upper = auc(fpr, numpy.minimum(tpr, upper_bound))
        area += weight * (area_below_upper - area_below_lower)
        # The upper bound of this segment is the lower bound of the next one.
        area_below_lower = area_below_upper

    segment_bounds = numpy.array(segment_bounds)
    # roc auc normalization to be 1 for an ideal classifier
    area /= numpy.sum((segment_bounds[1:] - segment_bounds[:-1]) * numpy.array(roc_weights))
    return area