
Commit

fix: updated ks test with new phi value, fixed reference and current columns in class, fixed chi2 has_drift (#58)

SteZamboni authored Jul 1, 2024
1 parent 379ee62 commit 5b484e4
Showing 4 changed files with 63 additions and 58 deletions.
46 changes: 28 additions & 18 deletions spark/jobs/utils/chi2.py
@@ -104,7 +104,9 @@ def __concatenate_columns(self) -> pyspark.sql.DataFrame:
 
         return concatenated_data
 
-    def __numeric_casting(self, concatenated_data) -> pyspark.sql.DataFrame:
+    def __numeric_casting(
+        self, concatenated_data, reference_column, current_column
+    ) -> pyspark.sql.DataFrame:
         """
         Performs numeric casting on the concatenated data.
@@ -118,18 +120,20 @@ def __numeric_casting(self, concatenated_data) -> pyspark.sql.DataFrame:
             StringIndexer(inputCol=column, outputCol=column + "_index").fit(
                 concatenated_data
             )
-            for column in [self.reference_column, self.current_column]
+            for column in [reference_column, current_column]
         ]
         pipeline = Pipeline(stages=indexers)
         return (
             pipeline.fit(concatenated_data)
             .transform(concatenated_data)
-            .drop(self.reference_column, self.current_column)
-            .withColumnRenamed(self.reference_column + "_index", self.reference_column)
-            .withColumnRenamed(self.current_column + "_index", self.current_column)
+            .drop(reference_column, current_column)
+            .withColumnRenamed(reference_column + "_index", reference_column)
+            .withColumnRenamed(current_column + "_index", current_column)
         )
 
-    def __current_column_to_vector(self, data) -> pyspark.sql.DataFrame:
+    def __current_column_to_vector(
+        self, data, reference_column, current_column
+    ) -> pyspark.sql.DataFrame:
         """
         Converts the current column data to a vector using VectorAssembler.
@@ -140,13 +144,15 @@ def __current_column_to_vector(self, data) -> pyspark.sql.DataFrame:
         - pyspark.sql.DataFrame: The DataFrame with the current column data converted to a vector.
         """
         vector_assembler = VectorAssembler(
-            inputCols=[self.current_column], outputCol=f"{self.current_column}_vector"
+            inputCols=[current_column], outputCol=f"{current_column}_vector"
         )
         return vector_assembler.transform(data).select(
-            self.reference_column, f"{self.current_column}_vector"
+            reference_column, f"{current_column}_vector"
         )
 
-    def __prepare_data_for_test(self) -> pyspark.sql.DataFrame:
+    def __prepare_data_for_test(
+        self, reference_column, current_column
+    ) -> pyspark.sql.DataFrame:
         """
         Prepares the data for the chi-square test by concatenating columns, performing numeric casting, and converting
         the current column data to a vector.
@@ -156,12 +162,16 @@ def __prepare_data_for_test(self) -> pyspark.sql.DataFrame:
         """
         concatenated_data = self.__concatenate_columns()
         numeric_concatenated_data = self.__numeric_casting(
-            concatenated_data=concatenated_data
+            concatenated_data=concatenated_data,
+            reference_column=reference_column,
+            current_column=current_column,
         )
-        vector_data = self.__current_column_to_vector(data=numeric_concatenated_data)
-        return vector_data.select(
-            self.reference_column, f"{self.current_column}_vector"
-        )
+        vector_data = self.__current_column_to_vector(
+            data=numeric_concatenated_data,
+            reference_column=reference_column,
+            current_column=current_column,
+        )
+        return vector_data.select(reference_column, f"{current_column}_vector")
 
     def test(self, reference_column, current_column) -> Dict:
         """
@@ -186,14 +196,14 @@ def test(self, reference_column, current_column) -> Dict:
             .drop(*[current_column])
             .na.drop()
         )
-        self.reference_column = f"{reference_column}_reference"
-        self.current_column = f"{current_column}_current"
+        reference_column = f"{reference_column}_reference"
+        current_column = f"{current_column}_current"
         self.reference_size = self.reference.count()
         self.current_size = self.current.count()
         result = ChiSquareTest.test(
-            self.__prepare_data_for_test(),
-            f"{self.current_column}_vector",
-            self.reference_column,
+            self.__prepare_data_for_test(reference_column, current_column),
+            f"{current_column}_vector",
+            reference_column,
             True,
         )
 
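Note on the chi2.py change: the suffixed reference/current column names are now passed to the private helpers as parameters instead of being stored on the instance. A minimal sketch of the pitfall this avoids; the class and method here are illustrative, not the repository's API:

# Per-call state written to self survives the call and can leak into later ones.
class Stateful:
    def test(self, column):
        self.column = f"{column}_reference"  # mutates shared instance state
        return self.column

class Stateless:
    def test(self, column):
        column = f"{column}_reference"  # local: gone when the call returns
        return column

t = Stateful()
t.test("age")
print(t.column)  # "age_reference" is still visible to any later caller
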
8 changes: 4 additions & 4 deletions spark/jobs/utils/current_binary.py
@@ -388,7 +388,7 @@ def calculate_drift(self):
                 result_tmp["pValue"]
             )
             feature_dict_to_append["drift_calc"]["has_drift"] = bool(
-                result_tmp["pValue"] >= 0.05
+                result_tmp["pValue"] <= 0.05
             )
         else:
             feature_dict_to_append["drift_calc"]["value"] = None
@@ -402,7 +402,7 @@ def calculate_drift(self):
             reference_data=self.reference,
             current_data=self.current,
             alpha=0.05,
-            beta=0.000001,
+            phi=0.004,
         )
 
         for column in numerical_features:
@@ -414,10 +414,10 @@ def calculate_drift(self):
             }
             result_tmp = ks.test(column, column)
             feature_dict_to_append["drift_calc"]["value"] = float(
-                result_tmp["ks_Statistic"]
+                result_tmp["ks_statistic"]
             )
             feature_dict_to_append["drift_calc"]["has_drift"] = bool(
-                result_tmp["ks_Statistic"] > result_tmp["critical_value"]
+                result_tmp["ks_statistic"] > result_tmp["critical_value"]
             )
             drift_result["feature_metrics"].append(feature_dict_to_append)
 
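Note on the current_binary.py change: the chi-square null hypothesis is that reference and current follow the same distribution, so drift is evidenced by a small p-value; the corrected rule flags drift when the p-value falls at or below alpha. The KS helper is also now constructed with an explicit precision phi=0.004 instead of a beta from which phi used to be derived. A minimal sketch of the corrected decision rule, with an illustrative p-value:

alpha = 0.05
p_value = 0.003  # illustrative value
has_drift = p_value <= alpha  # True: reject the null, report drift
# The previous `p_value >= 0.05` flagged exactly the non-drifted features.
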
55 changes: 25 additions & 30 deletions spark/jobs/utils/ks.py
@@ -9,19 +9,20 @@ class KolmogorovSmirnovTest:
     It is designed to compare two sample distributions and determine if they differ significantly.
     """
 
-    def __init__(self, reference_data, current_data, alpha, beta) -> None:
+    def __init__(self, reference_data, current_data, alpha, phi) -> None:
         """
         Initializes the KolmogorovSmirnovTest with the provided data and parameters.
         Parameters:
         - reference_data (DataFrame): The reference data as a Spark DataFrame.
         - current_data (DataFrame): The current data as a Spark DataFrame.
         - alpha (float): The significance level for the hypothesis test.
+        - phi (float): ϕ defines the precision of the KS test statistic.
         """
         self.reference_data = reference_data
         self.current_data = current_data
         self.alpha = alpha
-        self.beta = beta
+        self.phi = phi
         self.reference_size = self.reference_data.count()
         self.current_size = self.current_data.count()
 
@@ -38,35 +39,23 @@ def __num_probs(n, delta: float, epsilon: float) -> int:
         """Calculate number of probability points for approx
         quantiles; at most this is the number of data points.
         Returns:
-        - int_ the number of probability points
+        - int: the number of probability points
         """
 
         a = 1 / (delta - epsilon) + 1
         return min(ceil(a), n)
 
-    def __critical_value_2(self, _alpha):
-        """Compute the critical value for the KS test at a given alpha level."""
+    def __critical_value(self, significance_level) -> float:
+        """Compute the critical value for the KS test at a given alpha level.
+        Returns:
+        - float: the critical value for a given alpha
+        """
 
-        return np.sqrt(-0.5 * np.log(_alpha / 2)) * np.sqrt(
+        return np.sqrt(-0.5 * np.log(significance_level / 2)) * np.sqrt(
             (self.reference_size + self.current_size)
             / (self.reference_size * self.current_size)
         )
 
-    def __compute_phi(self, alpha, beta):
-        """Compute the value of phi for the given parameters."""
-
-        D_crit = self.__critical_value_2(self.alpha)
-        D_crit_upper = self.__critical_value_2(alpha + beta)
-        D_crit_lower = self.__critical_value_2(alpha - beta)
-
-        # Compute the absolute differences
-        delta_upper = abs(D_crit_upper - D_crit)
-        delta_lower = abs(D_crit_lower - D_crit)
-
-        # Phi is the minimum of these differences
-        phi = min(delta_upper, delta_lower)
-        return phi
-
     def test(self, reference_column, current_column) -> dict:
         """Approximates two-sample KS distance with precision
         phi between columns of Spark DataFrames.
@@ -76,28 +65,34 @@ def test(self, reference_column, current_column) -> dict:
         - current_column (str): The column name in the current data.
         """
 
-        phi = self.__compute_phi(alpha=self.alpha, beta=self.beta)
-        delta = phi / 2
-        eps45x = self.__eps45(delta, self.reference_size)
-        eps45y = self.__eps45(delta, self.current_size)
-        ax = self.__num_probs(self.reference_size, delta, eps45x)
-        ay = self.__num_probs(self.current_size, delta, eps45y)
+        delta = self.phi / 2
+
+        eps45x = self.__eps45(delta=delta, n=self.reference_size)
+        eps45y = self.__eps45(delta=delta, n=self.current_size)
+
+        ax = self.__num_probs(n=self.reference_size, delta=delta, epsilon=eps45x)
+        ay = self.__num_probs(n=self.current_size, delta=delta, epsilon=eps45y)
 
         pxi = linspace(1 / self.reference_size, 1, ax)
         pyj = linspace(1 / self.current_size, 1, ay)
 
         xi = self.reference_data.approxQuantile(reference_column, list(pxi), eps45x)
         yj = self.current_data.approxQuantile(current_column, list(pyj), eps45y)
 
         f_xi = pxi
         f_yi = interp(xi, yj, pyj)
+
         f_yj = pyj
         f_xj = interp(yj, xi, pxi)
+
         d_i = max(abs(f_xi - f_yi))
         d_j = max(abs(f_xj - f_yj))
         d_ks = max(d_i, d_j)
-        critical_value = self.__critical_value_2(_alpha=self.alpha)
+
+        critical_value = self.__critical_value(significance_level=self.alpha)
 
         return {
             "critical_value": critical_value,
-            "ks_Statistic": round(d_ks, 10),
+            "ks_statistic": round(d_ks, 10),
             "alpha": self.alpha,
-            "phi": phi,
+            "phi": self.phi,
         }
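Note on the ks.py change: ϕ, the precision of the approximate KS statistic, is now a constructor argument (0.004 at the call site in current_binary.py) rather than being derived from alpha and beta by the removed __compute_phi; delta = phi / 2 then budgets that precision across the two approxQuantile passes. The critical value is the standard two-sample KS threshold c(α) · sqrt((n + m) / (n · m)) with c(α) = sqrt(-0.5 · ln(α / 2)). A self-contained sketch of that formula, with illustrative sample sizes:

import numpy as np

def ks_critical_value(n: int, m: int, alpha: float = 0.05) -> float:
    # c(alpha) = sqrt(-0.5 * ln(alpha / 2)), about 1.3581 for alpha = 0.05
    c_alpha = np.sqrt(-0.5 * np.log(alpha / 2))
    return c_alpha * np.sqrt((n + m) / (n * m))

# Drift is reported when the KS statistic exceeds this threshold.
print(ks_critical_value(10_000, 10_000))  # ~0.0192
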
12 changes: 6 additions & 6 deletions spark/tests/binary_drift_test.py
@@ -120,15 +120,15 @@ def test_drift(spark_fixture, drift_dataset):
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.0004993992273872871,
-                    "has_drift": False,
+                    "has_drift": True,
                 },
             },
             {
                 "feature_name": "cat2",
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.49015296041582523,
-                    "has_drift": True,
+                    "has_drift": False,
                 },
             },
             {
@@ -284,15 +284,15 @@ def test_drift_boolean(spark_fixture, drift_dataset_bool):
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.0012340980408668267,
-                    "has_drift": False,
+                    "has_drift": True,
                 },
             },
             {
                 "feature_name": "bool1",
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.002699796063260207,
-                    "has_drift": False,
+                    "has_drift": True,
                 },
             },
             {
@@ -370,15 +370,15 @@ def test_drift_bigger_file(spark_fixture, drift_dataset_bigger_file):
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.26994857272252293,
-                    "has_drift": True,
+                    "has_drift": False,
                 },
             },
             {
                 "feature_name": "cat2",
                 "drift_calc": {
                     "type": "CHI2",
                     "value": 0.3894236957350261,
-                    "has_drift": True,
+                    "has_drift": False,
                 },
             },
             {
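Note on the test changes: the flipped expectations follow mechanically from the corrected chi-square rule; each feature's unchanged p-value is simply re-compared against alpha = 0.05. A quick check using the p-values from the expectations above:

alpha = 0.05
for p_value in [
    0.0004993992273872871,  # cat1 -> drift (p <= alpha)
    0.49015296041582523,    # cat2 -> no drift
    0.26994857272252293,    # cat1, bigger file -> no drift
    0.3894236957350261,     # cat2, bigger file -> no drift
]:
    print(p_value <= alpha)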
