diff --git a/spark/jobs/completion_job.py b/spark/jobs/completion_job.py index d601c223..003ef2d9 100644 --- a/spark/jobs/completion_job.py +++ b/spark/jobs/completion_job.py @@ -1,8 +1,10 @@ import sys import os import uuid - +import orjson from pyspark.sql.types import StructField, StructType, StringType + +from metrics.completion_metrics import LLMMetrics from utils.models import JobStatus from utils.db import update_job_status, write_to_db @@ -13,7 +15,11 @@ def compute_metrics(df: DataFrame) -> dict: complete_record = {} - # TODO: compute model quality metrics + completion_service = LLMMetrics() + model_quality = completion_service.extract_metrics(df) + complete_record["MODEL_QUALITY"] = orjson.dumps(model_quality.model_dump(serialize_as_any=True)).decode( + "utf-8" + ) return complete_record @@ -43,6 +49,7 @@ def main( spark_context._jsc.hadoopConfiguration().set( "fs.s3a.connection.ssl.enabled", "false" ) + print(completion_dataset_path) df = spark_session.read.option("multiline", "true").json(completion_dataset_path) complete_record = compute_metrics(df)