Skip to content

Commit

Permalink
feat: added regression line calculation (#109)
Browse files Browse the repository at this point in the history
* feat: added regression line calculation

* feat: fixed tests for abalone dataset
  • Loading branch information
SteZamboni authored Jul 16, 2024
1 parent cb9a699 commit c5511ed
Show file tree
Hide file tree
Showing 7 changed files with 13,086 additions and 21,447 deletions.
27 changes: 27 additions & 0 deletions spark/jobs/metrics/model_quality_regression_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import StandardScaler, VectorAssembler, Bucketizer
from pyspark.ml.stat import KolmogorovSmirnovTest
from pyspark.ml.regression import LinearRegression

from models.regression_model_quality import (
RegressionMetricType,
Expand Down Expand Up @@ -181,6 +182,29 @@ def create_histogram(dataframe: DataFrame, feature: str):
res = result_df.select("value_count").rdd.flatMap(lambda x: x).collect()
return Histogram(buckets=buckets_spacing, values=res)

@staticmethod
def get_regression_line(model: ModelOut, dataframe: DataFrame):
dataframe_clean = dataframe.filter(
is_not_null(model.outputs.prediction.name) & is_not_null(model.target.name)
).select(model.outputs.prediction.name, model.target.name)

va = VectorAssembler(
inputCols=[model.outputs.prediction.name], outputCol="features"
)

data_va = va.transform(dataframe_clean)

train_data = data_va.select("features", model.target.name)

lr = LinearRegression(labelCol=model.target.name)

# Fit the model to the data and call this model lrModel
lr_model = lr.fit(train_data)
c = lr_model.coefficients[0]
i = lr_model.intercept

return [[0, i], [1, c + i]]

@staticmethod
def residual_metrics(model: ModelOut, dataframe: DataFrame):
residual_df_norm = ModelQualityRegressionCalculator.residual_calculation(
Expand Down Expand Up @@ -209,4 +233,7 @@ def residual_metrics(model: ModelOut, dataframe: DataFrame):
"targets": residual_df_norm.select(model.target.name)
.rdd.flatMap(lambda x: x)
.collect(),
"regression_line": ModelQualityRegressionCalculator.get_regression_line(
model, dataframe
),
}
40 changes: 22 additions & 18 deletions spark/tests/regression_current_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@ def reference_bike_dataframe(spark_fixture, test_data_dir):


@pytest.fixture()
def current_test_fe(spark_fixture, test_data_dir):
def current_test_abalone(spark_fixture, test_data_dir):
yield (
spark_fixture.read.csv(
f"{test_data_dir}/current/regression/regression_reference_test_FE.csv",
f"{test_data_dir}/current/regression/regression_abalone_current1.csv",
header=True,
)
)


@pytest.fixture()
def reference_test_fe(spark_fixture, test_data_dir):
def reference_test_abalone(spark_fixture, test_data_dir):
yield (
spark_fixture.read.csv(
f"{test_data_dir}/reference/regression/regression_reference_test_FE.csv",
f"{test_data_dir}/reference/regression/regression_abalone_reference.csv",
header=True,
)
)
Expand Down Expand Up @@ -95,7 +95,7 @@ def model():


@pytest.fixture()
def model_test_fe():
def model_test_abalone():
output = OutputType(
prediction=ColumnDefinition(name="prediction", type=SupportedTypes.int),
prediction_proba=None,
Expand Down Expand Up @@ -134,18 +134,18 @@ def model_test_fe():


@pytest.fixture()
def current_dataset_fe(current_test_fe, model_test_fe):
def current_dataset_abalone(current_test_abalone, model_test_abalone):
yield CurrentDataset(
raw_dataframe=current_test_fe,
model=model_test_fe,
raw_dataframe=current_test_abalone,
model=model_test_abalone,
)


@pytest.fixture()
def reference_dataset_fe(reference_test_fe, model_test_fe):
def reference_dataset_abalone(reference_test_abalone, model_test_abalone):
yield ReferenceDataset(
raw_dataframe=reference_test_fe,
model=model_test_fe,
raw_dataframe=reference_test_abalone,
model=model_test_abalone,
)


Expand Down Expand Up @@ -227,18 +227,20 @@ def test_model_quality(spark_fixture, current_dataset, reference_dataset):
)


def test_model_quality_test_fe(spark_fixture, current_dataset_fe, reference_dataset_fe):
def test_model_quality_abalone(
spark_fixture, current_dataset_abalone, reference_dataset_abalone
):
metrics_service = CurrentMetricsRegressionService(
spark_session=spark_fixture,
current=current_dataset_fe,
reference=reference_dataset_fe,
current=current_dataset_abalone,
reference=reference_dataset_abalone,
)

model_quality = metrics_service.calculate_model_quality()

assert not deepdiff.DeepDiff(
model_quality,
res.test_model_quality_test_fe_res,
res.test_model_quality_abalone_res,
ignore_order=True,
ignore_type_subclasses=True,
)
Expand All @@ -261,11 +263,13 @@ def test_drift_regression(spark_fixture, current_dataset, reference_dataset):
)


def test_drift_regression_chi(spark_fixture, current_dataset_fe, reference_dataset_fe):
def test_drift_regression_chi(
spark_fixture, current_dataset_abalone, reference_dataset_abalone
):
metrics_service = CurrentMetricsRegressionService(
spark_session=spark_fixture,
current=current_dataset_fe,
reference=reference_dataset_fe,
current=current_dataset_abalone,
reference=reference_dataset_abalone,
)

drift = metrics_service.calculate_drift()
Expand Down
14 changes: 7 additions & 7 deletions spark/tests/regression_reference_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ def reference_bike_nulls(spark_fixture, test_data_dir):


@pytest.fixture()
def reference_test_fe(spark_fixture, test_data_dir):
def reference_abalone(spark_fixture, test_data_dir):
yield spark_fixture.read.csv(
f"{test_data_dir}/reference/regression/regression_reference_test_FE.csv",
f"{test_data_dir}/reference/regression/regression_abalone_reference.csv",
header=True,
)

Expand Down Expand Up @@ -135,7 +135,7 @@ def reference_dataset_nulls(spark_fixture, reference_bike_nulls):


@pytest.fixture()
def reference_dataset_test_fe(spark_fixture, reference_test_fe):
def reference_dataset_abalone(spark_fixture, reference_abalone):
output = OutputType(
prediction=ColumnDefinition(name="prediction", type=SupportedTypes.int),
prediction_proba=None,
Expand Down Expand Up @@ -173,7 +173,7 @@ def reference_dataset_test_fe(spark_fixture, reference_test_fe):
)

yield ReferenceDataset(
raw_dataframe=reference_test_fe,
raw_dataframe=reference_abalone,
model=model,
)

Expand All @@ -192,16 +192,16 @@ def test_model_quality_metrics(reference_dataset):
)


def test_model_quality_fe(reference_dataset_test_fe):
def test_model_quality_abalone(reference_dataset_abalone):
metrics_service = ReferenceMetricsRegressionService(
reference=reference_dataset_test_fe,
reference=reference_dataset_abalone,
)

model_quality = metrics_service.calculate_model_quality()

assert not deepdiff.DeepDiff(
model_quality,
res.test_model_quality_fe_res,
res.test_model_quality_abalone_res,
ignore_order=True,
ignore_type_subclasses=True,
)
Expand Down
Loading

0 comments on commit c5511ed

Please sign in to comment.