feat: add statistics for multiclassification + refactoring and improvements (#35)

* fix: refactoring for statistics

* feat: statistics multiclass and refactoring

* fix: refactoring

* refactor: cleaned code

* feat: improved spark docker image

* fix: readme

* feat: add preload image

* fix: regenerate poetry.lock

* fix: change action
rivamarco authored Jun 26, 2024
1 parent f8fdb5a commit 18588ea
Showing 23 changed files with 901 additions and 855 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/trivy-scan.yaml
@@ -54,7 +54,7 @@ jobs:
scan-spark:
needs: [changes]
if: ${{ github.event_name == 'push' || ( github.event_name == 'pull_request' && contains(needs.changes.outputs.changed_files, 'spark/poetry.lock') ) }}
uses: radicalbit/radicalbit-github-workflows/.github/workflows/trivy-fs-scan.yaml@v1
uses: radicalbit/radicalbit-github-workflows/.github/workflows/trivy-fs-scan.yaml@main
with:
directory: ./spark
prcomment: ${{ github.event_name == 'pull_request' && contains(needs.changes.outputs.changed_files, 'spark/poetry.lock') }}
3 changes: 2 additions & 1 deletion .gitignore
@@ -106,4 +106,5 @@ ui/*.sw?
## K3S SPECIFICS #
#####################

docker/k3s_data/kubeconfig/
docker/k3s_data/kubeconfig/
docker/k3s_data/images/
2 changes: 2 additions & 0 deletions docker-compose.yaml
@@ -150,6 +150,8 @@ services:
- ./docker/k3s_data/manifests/spark-init.yaml:/var/lib/rancher/k3s/server/manifests/spark-init.yaml
# Mount entrypoint
- ./docker/k3s_data/init/entrypoint.sh:/opt/entrypoint/entrypoint.sh
# Preload docker images
- ./docker/k3s_data/images:/var/lib/rancher/k3s/agent/images
expose:
- "6443" # Kubernetes API Server
- "80" # Ingress controller port 80
18 changes: 16 additions & 2 deletions spark/Dockerfile
@@ -1,5 +1,16 @@
FROM python:3.10.14-slim AS build

WORKDIR /build
COPY poetry.lock pyproject.toml ./
RUN pip install --no-cache-dir poetry==1.8.3 && \
poetry export -f requirements.txt -o requirements.txt


FROM spark:3.5.1-scala2.12-java17-python3-ubuntu

# Requirements from previous step
COPY --from=build /build/requirements.txt .

# Adding needed jar
RUN curl -o /opt/spark/jars/bcprov-jdk15on-1.70.jar https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk15on/1.70/bcprov-jdk15on-1.70.jar && \
curl -o /opt/spark/jars/bcpkix-jdk15on-1.70.jar https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk15on/1.70/bcpkix-jdk15on-1.70.jar && \
@@ -9,8 +20,11 @@ RUN curl -o /opt/spark/jars/bcprov-jdk15on-1.70.jar https://repo1.maven.org/mave

USER root

# Adding needed python libs that will be used by pyspark jobs
RUN pip install numpy pydantic pandas psycopg2-binary orjson scipy
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc libpq-dev python3-dev

# Install requirements coming from pyproject
RUN pip install --no-cache-dir -r requirements.txt

USER spark

28 changes: 24 additions & 4 deletions spark/README.md
@@ -2,15 +2,35 @@

This folder contains files to create the Spark docker image that will be used to calculate metrics.

The custom image is created from the `Dockerfile`: it is a base Spark image with some additional dependencies installed and loaded with the custom jobs located in the `jobs` folder.

To create an additional job, add a `.py` file in the `jobs` folder (take `reference_job.py` as an example for the boilerplate).
The custom image is created from the `Dockerfile`: it is a base Spark image with additional dependencies installed and loaded with the custom jobs located in the `jobs` folder.

### Development

This is a poetry project that can be used to develop and test the jobs before putting them in the docker image.

NB: if additional Python dependencies are needed, please add them to the `Dockerfile` accordingly, and not only to the `pyproject.toml`
To create an additional job, add a `.py` file in the `jobs` folder (take `reference_job.py` as an example for the boilerplate) and write unit tests; a minimal skeleton is sketched below.
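
The sketch below is a hypothetical minimal job, modelled on the `current_job.py` changes in this commit. The CLI contract (a JSON-serialized model followed by a dataset path), the `ModelOut` deserialization, and the exact import paths are assumptions for illustration; a real job would also persist its results through `utils.db` as the existing jobs do.

```python
# Hypothetical minimal job skeleton (a sketch only, modelled on current_job.py in this commit).
import sys

import orjson
from pyspark.sql import SparkSession

from metrics.statistics import calculate_statistics_reference
from models.reference_dataset import ReferenceDataset
from utils.models import ModelOut


def main(spark_session: SparkSession, model: ModelOut, dataset_path: str) -> None:
    # Read the raw CSV and wrap it in the typed dataset helper introduced by this commit.
    raw_dataframe = spark_session.read.csv(dataset_path, header=True)
    reference_dataset = ReferenceDataset(model=model, raw_dataframe=raw_dataframe)

    # Compute the statistics; a real job would also compute the other metrics
    # and persist everything via utils.db (see current_job.py).
    statistics = calculate_statistics_reference(reference_dataset)
    print(statistics)


if __name__ == "__main__":
    # Assumed for illustration: argv[1] is the JSON-serialized model, argv[2] the dataset path.
    spark = SparkSession.builder.appName("my_new_job").getOrCreate()
    main(spark, ModelOut(**orjson.loads(sys.argv[1])), sys.argv[2])
```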

### End-to-end testing

Before publishing the image, it is possible to test the platform with new developments or improvements made to the Spark image.

From this project folder, run

```bash
docker build . -t radicalbit-spark-py:develop && docker save radicalbit-spark-py:develop -o ../docker/k3s_data/images/radicalbit-spark-py:develop.tar
```

This will build the new image and save it to `docker/k3s_data/images/` in the repository root.

To use this image in the Radicalbit Platform, the docker compose must be modified by adding the following environment variable to the `api` container:

```yaml
SPARK_IMAGE: "radicalbit-spark-py:develop"
```

When the k3s cluster inside the docker compose starts, it will automatically load the saved image, which can then be used to test the code during development.

NB: when a new image is built and saved, the k3s container must be restarted

#### Formatting and linting

25 changes: 10 additions & 15 deletions spark/jobs/current_job.py
@@ -6,9 +6,11 @@
import orjson
from pyspark.sql.types import StructType, StructField, StringType

from jobs.metrics.statistics import calculate_statistics_current
from jobs.models.current_dataset import CurrentDataset
from jobs.models.reference_dataset import ReferenceDataset
from utils.current import CurrentMetricsService
from utils.models import JobStatus, ModelOut
from utils.spark import apply_schema_to_dataframe
from utils.db import update_job_status, write_to_db

from pyspark.sql import SparkSession
@@ -42,22 +44,15 @@ def main(
"fs.s3a.connection.ssl.enabled", "false"
)

current_schema = model.to_current_spark_schema()
current_dataset = spark_session.read.csv(current_dataset_path, header=True)
current_dataset = apply_schema_to_dataframe(current_dataset, current_schema)
current_dataset = current_dataset.select(
*[c for c in current_schema.names if c in current_dataset.columns]
)
reference_schema = model.to_reference_spark_schema()
reference_dataset = spark_session.read.csv(reference_dataset_path, header=True)
reference_dataset = apply_schema_to_dataframe(reference_dataset, reference_schema)
reference_dataset = reference_dataset.select(
*[c for c in reference_schema.names if c in reference_dataset.columns]
)
raw_current = spark_session.read.csv(current_dataset_path, header=True)
current_dataset = CurrentDataset(model=model, raw_dataframe=raw_current)
raw_reference = spark_session.read.csv(reference_dataset_path, header=True)
reference_dataset = ReferenceDataset(model=model, raw_dataframe=raw_reference)

metrics_service = CurrentMetricsService(
spark_session, current_dataset, reference_dataset, model=model
spark_session, current_dataset.current, reference_dataset.reference, model=model
)
statistics = metrics_service.calculate_statistics()
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
model_quality = metrics_service.calculate_model_quality_with_group_by_timestamp()
drift = metrics_service.calculate_drift()
Empty file added spark/jobs/metrics/__init__.py
Empty file.
148 changes: 148 additions & 0 deletions spark/jobs/metrics/statistics.py
@@ -0,0 +1,148 @@
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset
import pyspark.sql.functions as F

N_VARIABLES = "n_variables"
N_OBSERVATION = "n_observations"
MISSING_CELLS = "missing_cells"
MISSING_CELLS_PERC = "missing_cells_perc"
DUPLICATE_ROWS = "duplicate_rows"
DUPLICATE_ROWS_PERC = "duplicate_rows_perc"
NUMERIC = "numeric"
CATEGORICAL = "categorical"
DATETIME = "datetime"


# FIXME use pydantic struct like data quality
def calculate_statistics_reference(
reference_dataset: ReferenceDataset,
) -> dict[str, float]:
number_of_variables = len(reference_dataset.get_all_variables())
number_of_observations = reference_dataset.reference_count
number_of_numerical = len(reference_dataset.get_numerical_variables())
number_of_categorical = len(reference_dataset.get_categorical_variables())
number_of_datetime = len(reference_dataset.get_datetime_variables())
reference_columns = reference_dataset.reference.columns

stats = (
reference_dataset.reference.select(
[
F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
if t not in ("datetime", "date", "timestamp", "bool", "boolean")
else F.count(F.when(F.col(c).isNull(), c)).alias(c)
for c, t in reference_dataset.reference.dtypes
]
)
.withColumn(MISSING_CELLS, sum([F.col(c) for c in reference_columns]))
.withColumn(
MISSING_CELLS_PERC,
(F.col(MISSING_CELLS) / (number_of_variables * number_of_observations))
* 100,
)
.withColumn(
DUPLICATE_ROWS,
F.lit(
number_of_observations
- reference_dataset.reference.dropDuplicates(
[
c
for c in reference_columns
if c != reference_dataset.model.timestamp.name
]
).count()
),
)
.withColumn(
DUPLICATE_ROWS_PERC,
(F.col(DUPLICATE_ROWS) / number_of_observations) * 100,
)
.withColumn(N_VARIABLES, F.lit(number_of_variables))
.withColumn(N_OBSERVATION, F.lit(number_of_observations))
.withColumn(NUMERIC, F.lit(number_of_numerical))
.withColumn(CATEGORICAL, F.lit(number_of_categorical))
.withColumn(DATETIME, F.lit(number_of_datetime))
.select(
*[
MISSING_CELLS,
MISSING_CELLS_PERC,
DUPLICATE_ROWS,
DUPLICATE_ROWS_PERC,
N_VARIABLES,
N_OBSERVATION,
NUMERIC,
CATEGORICAL,
DATETIME,
]
)
.toPandas()
.to_dict(orient="records")[0]
)

return stats


def calculate_statistics_current(
current_dataset: CurrentDataset,
) -> dict[str, float]:
number_of_variables = len(current_dataset.get_all_variables())
number_of_observations = current_dataset.current_count
number_of_numerical = len(current_dataset.get_numerical_variables())
number_of_categorical = len(current_dataset.get_categorical_variables())
number_of_datetime = len(current_dataset.get_datetime_variables())
current_columns = current_dataset.current.columns

stats = (
current_dataset.current.select(
[
F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
if t not in ("datetime", "date", "timestamp", "bool", "boolean")
else F.count(F.when(F.col(c).isNull(), c)).alias(c)
for c, t in current_dataset.current.dtypes
]
)
.withColumn(MISSING_CELLS, sum([F.col(c) for c in current_columns]))
.withColumn(
MISSING_CELLS_PERC,
(F.col(MISSING_CELLS) / (number_of_variables * number_of_observations))
* 100,
)
.withColumn(
DUPLICATE_ROWS,
F.lit(
number_of_observations
- current_dataset.current.dropDuplicates(
[
c
for c in current_columns
if c != current_dataset.model.timestamp.name
]
).count()
),
)
.withColumn(
DUPLICATE_ROWS_PERC,
(F.col(DUPLICATE_ROWS) / number_of_observations) * 100,
)
.withColumn(N_VARIABLES, F.lit(number_of_variables))
.withColumn(N_OBSERVATION, F.lit(number_of_observations))
.withColumn(NUMERIC, F.lit(number_of_numerical))
.withColumn(CATEGORICAL, F.lit(number_of_categorical))
.withColumn(DATETIME, F.lit(number_of_datetime))
.select(
*[
MISSING_CELLS,
MISSING_CELLS_PERC,
DUPLICATE_ROWS,
DUPLICATE_ROWS_PERC,
N_VARIABLES,
N_OBSERVATION,
NUMERIC,
CATEGORICAL,
DATETIME,
]
)
.toPandas()
.to_dict(orient="records")[0]
)

return stats
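
Both helpers return a flat dict keyed by the constants declared at the top of the module (`missing_cells`, `missing_cells_perc`, `duplicate_rows`, `duplicate_rows_perc`, `n_variables`, `n_observations`, `numeric`, `categorical`, `datetime`). A hedged usage sketch, mirroring the call site in `current_job.py` above (the `ModelOut` instance and the dataset path are assumed to come from the job's arguments):

```python
# Hedged usage sketch for the statistics helpers; mirrors the call site in current_job.py above.
from pyspark.sql import SparkSession

from metrics.statistics import calculate_statistics_current
from models.current_dataset import CurrentDataset
from utils.models import ModelOut


def current_statistics(spark: SparkSession, model: ModelOut, current_path: str) -> dict[str, float]:
    # Wrap the raw CSV in CurrentDataset so schema enforcement and counts are derived from the model.
    raw_current = spark.read.csv(current_path, header=True)
    return calculate_statistics_current(CurrentDataset(model=model, raw_dataframe=raw_current))
```
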
Empty file added spark/jobs/models/__init__.py
Empty file.
97 changes: 97 additions & 0 deletions spark/jobs/models/current_dataset.py
@@ -0,0 +1,97 @@
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StructField, StructType

from utils.models import ModelOut, ModelType, ColumnDefinition
from utils.spark import apply_schema_to_dataframe


class CurrentDataset:
def __init__(self, model: ModelOut, raw_dataframe: DataFrame):
current_schema = self.spark_schema(model)
current_dataset = apply_schema_to_dataframe(raw_dataframe, current_schema)

self.model = model
self.current = current_dataset.select(
*[c for c in current_schema.names if c in current_dataset.columns]
)
self.current_count = self.current.count()

# FIXME this must exclude target when we will have separate current and ground truth
@staticmethod
def spark_schema(model: ModelOut):
all_features = (
model.features + [model.target] + [model.timestamp] + model.outputs.output
)
if model.outputs.prediction_proba and model.model_type == ModelType.BINARY:
enforce_float = [
model.target.name,
model.outputs.prediction.name,
model.outputs.prediction_proba.name,
]
elif model.model_type == ModelType.BINARY:
enforce_float = [model.target.name, model.outputs.prediction.name]
else:
enforce_float = []
return StructType(
[
StructField(
name=feature.name,
dataType=model.convert_types(feature.type),
nullable=False,
)
if feature.name not in enforce_float
else StructField(
name=feature.name,
dataType=DoubleType(),
nullable=False,
)
for feature in all_features
]
)

def get_numerical_features(self) -> List[ColumnDefinition]:
return [feature for feature in self.model.features if feature.is_numerical()]

def get_categorical_features(self) -> List[ColumnDefinition]:
return [feature for feature in self.model.features if feature.is_categorical()]

# FIXME this must exclude target when we will have separate current and ground truth
def get_numerical_variables(self) -> List[ColumnDefinition]:
all_features = (
self.model.features
+ [self.model.target]
+ [self.model.timestamp]
+ self.model.outputs.output
)
return [feature for feature in all_features if feature.is_numerical()]

# FIXME this must exclude target when we will have separate current and ground truth
def get_categorical_variables(self) -> List[ColumnDefinition]:
all_features = (
self.model.features
+ [self.model.target]
+ [self.model.timestamp]
+ self.model.outputs.output
)
return [feature for feature in all_features if feature.is_categorical()]

# FIXME this must exclude target when we will have separate current and ground truth
def get_datetime_variables(self) -> List[ColumnDefinition]:
all_features = (
self.model.features
+ [self.model.target]
+ [self.model.timestamp]
+ self.model.outputs.output
)
return [feature for feature in all_features if feature.is_datetime()]

# FIXME this must exclude target when we will have separate current and ground truth
def get_all_variables(self) -> List[ColumnDefinition]:
return (
self.model.features
+ [self.model.target]
+ [self.model.timestamp]
+ self.model.outputs.output
)
