diff --git a/spark/jobs/utils/chi2.py b/spark/jobs/utils/chi2.py index 8620365f..082f9ed8 100644 --- a/spark/jobs/utils/chi2.py +++ b/spark/jobs/utils/chi2.py @@ -104,7 +104,9 @@ def __concatenate_columns(self) -> pyspark.sql.DataFrame: return concatenated_data - def __numeric_casting(self, concatenated_data) -> pyspark.sql.DataFrame: + def __numeric_casting( + self, concatenated_data, reference_column, current_column + ) -> pyspark.sql.DataFrame: """ Performs numeric casting on the concatenated data. @@ -118,18 +120,20 @@ def __numeric_casting(self, concatenated_data) -> pyspark.sql.DataFrame: StringIndexer(inputCol=column, outputCol=column + "_index").fit( concatenated_data ) - for column in [self.reference_column, self.current_column] + for column in [reference_column, current_column] ] pipeline = Pipeline(stages=indexers) return ( pipeline.fit(concatenated_data) .transform(concatenated_data) - .drop(self.reference_column, self.current_column) - .withColumnRenamed(self.reference_column + "_index", self.reference_column) - .withColumnRenamed(self.current_column + "_index", self.current_column) + .drop(reference_column, current_column) + .withColumnRenamed(reference_column + "_index", reference_column) + .withColumnRenamed(current_column + "_index", current_column) ) - def __current_column_to_vector(self, data) -> pyspark.sql.DataFrame: + def __current_column_to_vector( + self, data, reference_column, current_column + ) -> pyspark.sql.DataFrame: """ Converts the current column data to a vector using VectorAssembler. @@ -140,13 +144,15 @@ def __current_column_to_vector(self, data) -> pyspark.sql.DataFrame: - pyspark.sql.DataFrame: The DataFrame with the current column data converted to a vector. """ vector_assembler = VectorAssembler( - inputCols=[self.current_column], outputCol=f"{self.current_column}_vector" + inputCols=[current_column], outputCol=f"{current_column}_vector" ) return vector_assembler.transform(data).select( - self.reference_column, f"{self.current_column}_vector" + reference_column, f"{current_column}_vector" ) - def __prepare_data_for_test(self) -> pyspark.sql.DataFrame: + def __prepare_data_for_test( + self, reference_column, current_column + ) -> pyspark.sql.DataFrame: """ Prepares the data for the chi-square test by concatenating columns, performing numeric casting, and converting the current column data to a vector. @@ -156,12 +162,16 @@ def __prepare_data_for_test(self) -> pyspark.sql.DataFrame: """ concatenated_data = self.__concatenate_columns() numeric_concatenated_data = self.__numeric_casting( - concatenated_data=concatenated_data + concatenated_data=concatenated_data, + reference_column=reference_column, + current_column=current_column, ) - vector_data = self.__current_column_to_vector(data=numeric_concatenated_data) - return vector_data.select( - self.reference_column, f"{self.current_column}_vector" + vector_data = self.__current_column_to_vector( + data=numeric_concatenated_data, + reference_column=reference_column, + current_column=current_column, ) + return vector_data.select(reference_column, f"{current_column}_vector") def test(self, reference_column, current_column) -> Dict: """ @@ -186,14 +196,14 @@ def test(self, reference_column, current_column) -> Dict: .drop(*[current_column]) .na.drop() ) - self.reference_column = f"{reference_column}_reference" - self.current_column = f"{current_column}_current" + reference_column = f"{reference_column}_reference" + current_column = f"{current_column}_current" self.reference_size = self.reference.count() self.current_size = self.current.count() result = ChiSquareTest.test( - self.__prepare_data_for_test(), - f"{self.current_column}_vector", - self.reference_column, + self.__prepare_data_for_test(reference_column, current_column), + f"{current_column}_vector", + reference_column, True, ) diff --git a/spark/jobs/utils/current_binary.py b/spark/jobs/utils/current_binary.py index 366c9d3f..20939c45 100644 --- a/spark/jobs/utils/current_binary.py +++ b/spark/jobs/utils/current_binary.py @@ -388,7 +388,7 @@ def calculate_drift(self): result_tmp["pValue"] ) feature_dict_to_append["drift_calc"]["has_drift"] = bool( - result_tmp["pValue"] >= 0.05 + result_tmp["pValue"] <= 0.05 ) else: feature_dict_to_append["drift_calc"]["value"] = None @@ -402,7 +402,7 @@ def calculate_drift(self): reference_data=self.reference, current_data=self.current, alpha=0.05, - beta=0.000001, + phi=0.004, ) for column in numerical_features: @@ -414,10 +414,10 @@ def calculate_drift(self): } result_tmp = ks.test(column, column) feature_dict_to_append["drift_calc"]["value"] = float( - result_tmp["ks_Statistic"] + result_tmp["ks_statistic"] ) feature_dict_to_append["drift_calc"]["has_drift"] = bool( - result_tmp["ks_Statistic"] > result_tmp["critical_value"] + result_tmp["ks_statistic"] > result_tmp["critical_value"] ) drift_result["feature_metrics"].append(feature_dict_to_append) diff --git a/spark/jobs/utils/ks.py b/spark/jobs/utils/ks.py index e6b2cc80..4cfadf80 100644 --- a/spark/jobs/utils/ks.py +++ b/spark/jobs/utils/ks.py @@ -9,7 +9,7 @@ class KolmogorovSmirnovTest: It is designed to compare two sample distributions and determine if they differ significantly. """ - def __init__(self, reference_data, current_data, alpha, beta) -> None: + def __init__(self, reference_data, current_data, alpha, phi) -> None: """ Initializes the KolmogorovSmirnovTest with the provided data and parameters. @@ -17,11 +17,12 @@ def __init__(self, reference_data, current_data, alpha, beta) -> None: - reference_data (DataFrame): The reference data as a Spark DataFrame. - current_data (DataFrame): The current data as a Spark DataFrame. - alpha (float): The significance level for the hypothesis test. + - phi (float): ϕ defines the precision of the KS test statistic. """ self.reference_data = reference_data self.current_data = current_data self.alpha = alpha - self.beta = beta + self.phi = phi self.reference_size = self.reference_data.count() self.current_size = self.current_data.count() @@ -38,35 +39,23 @@ def __num_probs(n, delta: float, epsilon: float) -> int: """Calculate number of probability points for approx quantiles; at most this is the number of data points. Returns: - - int_ the number of probability points + - int: the number of probability points """ a = 1 / (delta - epsilon) + 1 return min(ceil(a), n) - def __critical_value_2(self, _alpha): - """Compute the critical value for the KS test at a given alpha level.""" + def __critical_value(self, significance_level) -> float: + """Compute the critical value for the KS test at a given alpha level. + Returns: + - float: the critical value for a given alpha + """ - return np.sqrt(-0.5 * np.log(_alpha / 2)) * np.sqrt( + return np.sqrt(-0.5 * np.log(significance_level / 2)) * np.sqrt( (self.reference_size + self.current_size) / (self.reference_size * self.current_size) ) - def __compute_phi(self, alpha, beta): - """Compute the value of phi for the given parameters.""" - - D_crit = self.__critical_value_2(self.alpha) - D_crit_upper = self.__critical_value_2(alpha + beta) - D_crit_lower = self.__critical_value_2(alpha - beta) - - # Compute the absolute differences - delta_upper = abs(D_crit_upper - D_crit) - delta_lower = abs(D_crit_lower - D_crit) - - # Phi is the minimum of these differences - phi = min(delta_upper, delta_lower) - return phi - def test(self, reference_column, current_column) -> dict: """Approximates two-sample KS distance with precision phi between columns of Spark DataFrames. @@ -76,28 +65,34 @@ def test(self, reference_column, current_column) -> dict: - current_column (str): The column name in the current data. """ - phi = self.__compute_phi(alpha=self.alpha, beta=self.beta) - delta = phi / 2 - eps45x = self.__eps45(delta, self.reference_size) - eps45y = self.__eps45(delta, self.current_size) - ax = self.__num_probs(self.reference_size, delta, eps45x) - ay = self.__num_probs(self.current_size, delta, eps45y) + delta = self.phi / 2 + + eps45x = self.__eps45(delta=delta, n=self.reference_size) + eps45y = self.__eps45(delta=delta, n=self.current_size) + + ax = self.__num_probs(n=self.reference_size, delta=delta, epsilon=eps45x) + ay = self.__num_probs(n=self.current_size, delta=delta, epsilon=eps45y) + pxi = linspace(1 / self.reference_size, 1, ax) pyj = linspace(1 / self.current_size, 1, ay) + xi = self.reference_data.approxQuantile(reference_column, list(pxi), eps45x) yj = self.current_data.approxQuantile(current_column, list(pyj), eps45y) + f_xi = pxi f_yi = interp(xi, yj, pyj) + f_yj = pyj f_xj = interp(yj, xi, pxi) + d_i = max(abs(f_xi - f_yi)) d_j = max(abs(f_xj - f_yj)) d_ks = max(d_i, d_j) - critical_value = self.__critical_value_2(_alpha=self.alpha) + + critical_value = self.__critical_value(significance_level=self.alpha) return { "critical_value": critical_value, - "ks_Statistic": round(d_ks, 10), + "ks_statistic": round(d_ks, 10), "alpha": self.alpha, - "phi": phi, } diff --git a/spark/tests/binary_drift_test.py b/spark/tests/binary_drift_test.py index 42f94ec4..2d1f6818 100644 --- a/spark/tests/binary_drift_test.py +++ b/spark/tests/binary_drift_test.py @@ -120,7 +120,7 @@ def test_drift(spark_fixture, drift_dataset): "drift_calc": { "type": "CHI2", "value": 0.0004993992273872871, - "has_drift": False, + "has_drift": True, }, }, { @@ -128,7 +128,7 @@ def test_drift(spark_fixture, drift_dataset): "drift_calc": { "type": "CHI2", "value": 0.49015296041582523, - "has_drift": True, + "has_drift": False, }, }, { @@ -284,7 +284,7 @@ def test_drift_boolean(spark_fixture, drift_dataset_bool): "drift_calc": { "type": "CHI2", "value": 0.0012340980408668267, - "has_drift": False, + "has_drift": True, }, }, { @@ -292,7 +292,7 @@ def test_drift_boolean(spark_fixture, drift_dataset_bool): "drift_calc": { "type": "CHI2", "value": 0.002699796063260207, - "has_drift": False, + "has_drift": True, }, }, { @@ -370,7 +370,7 @@ def test_drift_bigger_file(spark_fixture, drift_dataset_bigger_file): "drift_calc": { "type": "CHI2", "value": 0.26994857272252293, - "has_drift": True, + "has_drift": False, }, }, { @@ -378,7 +378,7 @@ def test_drift_bigger_file(spark_fixture, drift_dataset_bigger_file): "drift_calc": { "type": "CHI2", "value": 0.3894236957350261, - "has_drift": True, + "has_drift": False, }, }, {