Skip to content

Commit

Permalink
Resolved merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
maocorte committed Jun 21, 2024
2 parents 1c0e32a + 7513b39 commit 8e4c015
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 28 deletions.
29 changes: 12 additions & 17 deletions spark/jobs/utils/current.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ def __init__(
self.spark_session = spark_session
self.current = current
self.reference = reference
self.current_count = self.current.count()
self.reference_count = self.reference.count()
self.model = model

# FIXME use pydantic struct like data quality
def calculate_statistics(self) -> dict[str, float]:
number_of_variables = len(self.model.get_all_variables_current())
number_of_observations = self.current.count()
number_of_observations = self.current_count
number_of_numerical = len(self.model.get_numerical_variables_current())
number_of_categorical = len(self.model.get_categorical_variables_current())
number_of_datetime = len(self.model.get_datetime_variables_current())
Expand Down Expand Up @@ -188,12 +190,11 @@ def split_dict(dictionary):
for x in numerical_features
]

# FIXME maybe don't self.current.count()
missing_values_perc_agg = [
(
(
f.count(f.when(f.col(x).isNull() | f.isnan(x), x))
/ self.current.count()
/ self.current_count
)
* 100
).alias(f"{x}-missing_values_perc")
Expand All @@ -207,11 +208,10 @@ def split_dict(dictionary):
for x in numerical_features
]

# FIXME don't use self.current.count()
freq_agg = [
(
f.count(f.when(f.col(x).isNotNull() & ~f.isnan(x), True))
/ self.current.count()
/ self.current_count
).alias(f"{x}-frequency")
for x in numerical_features
]
Expand Down Expand Up @@ -329,11 +329,10 @@ def split_dict(dictionary):
for x in categorical_features
]

# FIXME maybe don't self.current.count()
missing_values_perc_agg = [
(
(f.count(f.when(f.col(x).isNull(), x)) / self.current.count()) * 100
).alias(f"{x}-missing_values_perc")
((f.count(f.when(f.col(x).isNull(), x)) / self.current_count) * 100).alias(
f"{x}-missing_values_perc"
)
for x in categorical_features
]

Expand All @@ -351,7 +350,6 @@ def split_dict(dictionary):

# FIXME by design this is not efficient
# FIXME understand if we want to divide by whole or by number of not null
# FIXME don't use self.reference.count()

count_distinct_categories = {
column: dict(
Expand All @@ -361,7 +359,7 @@ def split_dict(dictionary):
.agg(*[f.count(check_not_null(column)).alias("count")])
.withColumn(
"freq",
f.col("count") / self.current.count(),
f.col("count") / self.current_count,
)
.toPandas()
.set_index(column)
Expand Down Expand Up @@ -393,7 +391,7 @@ def calculate_class_metrics(self) -> List[ClassMetrics]:
number_of_true = number_true_and_false.get(1.0, 0)
number_of_false = number_true_and_false.get(0.0, 0)

number_of_observations = self.current.count()
number_of_observations = self.current_count

return [
ClassMetrics(
Expand All @@ -415,7 +413,7 @@ def calculate_data_quality(self) -> BinaryClassDataQuality:
if self.model.get_categorical_features():
feature_metrics.extend(self.calculate_data_quality_categorical())
return BinaryClassDataQuality(
n_observations=self.current.count(),
n_observations=self.current_count,
class_metrics=self.calculate_class_metrics(),
feature_metrics=feature_metrics,
)
Expand Down Expand Up @@ -742,9 +740,6 @@ def calculate_drift(self):
drift_result = dict()
drift_result["feature_metrics"] = []

ref_count = self.reference.count()
cur_count = self.current.count()

categorical_features = [
categorical.name for categorical in self.model.get_categorical_features()
]
Expand All @@ -761,7 +756,7 @@ def calculate_drift(self):
"type": "CHI2",
},
}
if ref_count > 5 and cur_count > 5:
if self.reference_count > 5 and self.current_count > 5:
result_tmp = chi2.test(column, column)
feature_dict_to_append["drift_calc"]["value"] = float(
result_tmp["pValue"]
Expand Down
19 changes: 8 additions & 11 deletions spark/jobs/utils/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ReferenceMetricsService:
def __init__(self, reference: DataFrame, model: ModelOut):
    """Initialize the reference-metrics service with its dataset and model metadata.

    Args:
        reference: Spark DataFrame holding the reference dataset.
        model: model metadata (ModelOut) describing features/variables.
    """
    self.model = model
    self.reference = reference
    # Materialize the row count once up front: Spark's count() is an action
    # that re-scans the DataFrame each call, so downstream metric methods
    # reuse this cached value instead of re-triggering the job.
    self.reference_count = self.reference.count()

def __evaluate_binary_classification(
self, dataset: DataFrame, metric_name: str
Expand Down Expand Up @@ -99,7 +100,7 @@ def __calc_mc_metrics(self) -> dict[str, float]:
# FIXME use pydantic struct like data quality
def calculate_statistics(self) -> dict[str, float]:
number_of_variables = len(self.model.get_all_variables_reference())
number_of_observations = self.reference.count()
number_of_observations = self.reference_count
number_of_numerical = len(self.model.get_numerical_variables_reference())
number_of_categorical = len(self.model.get_categorical_variables_reference())
number_of_datetime = len(self.model.get_datetime_variables_reference())
Expand Down Expand Up @@ -256,12 +257,11 @@ def split_dict(dictionary):
for x in numerical_features
]

# FIXME maybe don't self.reference.count()
missing_values_perc_agg = [
(
(
f.count(f.when(f.col(x).isNull() | f.isnan(x), x))
/ self.reference.count()
/ self.reference_count
)
* 100
).alias(f"{x}-missing_values_perc")
Expand All @@ -275,11 +275,10 @@ def split_dict(dictionary):
for x in numerical_features
]

# FIXME don't use self.reference.count()
freq_agg = [
(
f.count(f.when(f.col(x).isNotNull() & ~f.isnan(x), True))
/ self.reference.count()
/ self.reference_count
).alias(f"{x}-frequency")
for x in numerical_features
]
Expand Down Expand Up @@ -403,10 +402,9 @@ def split_dict(dictionary):
for x in categorical_features
]

# FIXME maybe don't self.reference.count()
missing_values_perc_agg = [
(
(f.count(f.when(f.col(x).isNull(), x)) / self.reference.count()) * 100
(f.count(f.when(f.col(x).isNull(), x)) / self.reference_count) * 100
).alias(f"{x}-missing_values_perc")
for x in categorical_features
]
Expand All @@ -425,7 +423,6 @@ def split_dict(dictionary):

# FIXME by design this is not efficient
# FIXME understand if we want to divide by whole or by number of not null
# FIXME don't use self.reference.count()

count_distinct_categories = {
column: dict(
Expand All @@ -435,7 +432,7 @@ def split_dict(dictionary):
.agg(*[f.count(check_not_null(column)).alias("count")])
.withColumn(
"freq",
f.col("count") / self.reference.count(),
f.col("count") / self.reference_count,
)
.toPandas()
.set_index(column)
Expand Down Expand Up @@ -467,7 +464,7 @@ def calculate_class_metrics(self) -> List[ClassMetrics]:
number_of_true = number_true_and_false.get(1.0, 0)
number_of_false = number_true_and_false.get(0.0, 0)

number_of_observations = self.reference.count()
number_of_observations = self.reference_count

return [
ClassMetrics(
Expand All @@ -489,7 +486,7 @@ def calculate_data_quality(self) -> BinaryClassDataQuality:
if self.model.get_categorical_features():
feature_metrics.extend(self.calculate_data_quality_categorical())
return BinaryClassDataQuality(
n_observations=self.reference.count(),
n_observations=self.reference_count,
class_metrics=self.calculate_class_metrics(),
feature_metrics=feature_metrics,
)

0 comments on commit 8e4c015

Please sign in to comment.