diff --git a/api/README.md b/api/README.md
index c7cb4ca3..9d6c2618 100644
--- a/api/README.md
+++ b/api/README.md
@@ -140,3 +140,12 @@ SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES: 2
 ```
 
 Adjust these variables as needed to allocate more resources or modify the number of executor instances for your specific use case.
+
+### Pod annotations
+
+If for some reason you need to add annotations to the driver and executor pods, you can set the following environment variables in the backend container, passing valid JSON as in the example:
+
+```
+SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS: '{"my.annotation/driver": "my-value"}'
+SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS: '{"my.annotation/executor": "my-value"}'
+```
diff --git a/api/app/main.py b/api/app/main.py
index 82f49984..b40f4914 100644
--- a/api/app/main.py
+++ b/api/app/main.py
@@ -29,7 +29,6 @@
 from app.routes.infer_schema_route import InferSchemaRoute
 from app.routes.metrics_route import MetricsRoute
 from app.routes.model_route import ModelRoute
-from app.routes.spark_job_route import SparkJobRoute
 from app.routes.upload_dataset_route import UploadDatasetRoute
 from app.services.file_service import FileService
 from app.services.metrics_service import MetricsService
@@ -122,7 +121,6 @@ async def lifespan(fastapi: FastAPI):
 app.include_router(UploadDatasetRoute.get_router(file_service), prefix='/api/models')
 app.include_router(InferSchemaRoute.get_router(file_service), prefix='/api/schema')
 app.include_router(MetricsRoute.get_router(metrics_service), prefix='/api/models')
-app.include_router(SparkJobRoute.get_router(spark_k8s_service), prefix='/api/jobs')
 app.include_router(HealthcheckRoute.get_healthcheck_route())
diff --git a/api/app/routes/spark_job_route.py b/api/app/routes/spark_job_route.py
deleted file mode 100644
index ed53cce0..00000000
--- a/api/app/routes/spark_job_route.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from fastapi import APIRouter, status
-
-from app.services.spark_k8s_service import SparkK8SService
-
-
-class SparkJobRoute:
-    @staticmethod
-    def get_router(spark_k8s_service: SparkK8SService) -> APIRouter:
-        router = APIRouter()
-
-        @router.post('/run', status_code=status.HTTP_200_OK)
-        def run_job(job_name: str):
-            return spark_k8s_service.run_job(job_name)
-
-        return router
diff --git a/api/app/services/file_service.py b/api/app/services/file_service.py
index 254f3f2c..e2c78707 100644
--- a/api/app/services/file_service.py
+++ b/api/app/services/file_service.py
@@ -10,7 +10,8 @@
 from fastapi import HTTPException, UploadFile
 from fastapi_pagination import Page, Params
 import pandas as pd
-from spark_on_k8s.client import SparkOnK8S
+from spark_on_k8s.client import ExecutorInstances, PodResources, SparkOnK8S
+from spark_on_k8s.utils.configuration import Configuration
 
 from app.core.config.config import create_secrets, get_config
 from app.db.dao.current_dataset_dao import CurrentDatasetDAO
@@ -108,8 +109,8 @@ def upload_reference_file(
         logger.debug('File %s has been correctly stored in the db', inserted_file)
 
         spark_config = get_config().spark_config
-        self.spark_k8s_client.submit_app(
-            image=spark_config.spark_image,
+        self.__submit_app(
+            app_name=str(model_out.uuid),
             app_path=spark_config.spark_reference_app_path,
             app_arguments=[
                 model_out.model_dump_json(),
                 str(inserted_file.uuid),
                 ReferenceDatasetMetrics.__tablename__,
             ],
-            app_name=str(model_out.uuid),
-            namespace=spark_config.spark_namespace,
-            service_account=spark_config.spark_service_account,
-            image_pull_policy=spark_config.spark_image_pull_policy,
-            app_waiter='no_wait',
-            secret_values=create_secrets(),
         )
 
         return ReferenceDatasetDTO.from_reference_dataset(inserted_file)
@@ -163,8 +158,8 @@ def bind_reference_file(
         logger.debug('File %s has been correctly stored in the db', inserted_file)
 
         spark_config = get_config().spark_config
-        self.spark_k8s_client.submit_app(
-            image=spark_config.spark_image,
+        self.__submit_app(
+            app_name=str(model_out.uuid),
             app_path=spark_config.spark_reference_app_path,
             app_arguments=[
                 model_out.model_dump_json(),
                 str(inserted_file.uuid),
                 ReferenceDatasetMetrics.__tablename__,
             ],
-            app_name=str(model_out.uuid),
-            namespace=spark_config.spark_namespace,
-            service_account=spark_config.spark_service_account,
-            image_pull_policy=spark_config.spark_image_pull_policy,
-            app_waiter='no_wait',
-            secret_values=create_secrets(),
         )
 
         return ReferenceDatasetDTO.from_reference_dataset(inserted_file)
@@ -252,8 +241,8 @@ def upload_current_file(
         logger.debug('File %s has been correctly stored in the db', inserted_file)
 
         spark_config = get_config().spark_config
-        self.spark_k8s_client.submit_app(
-            image=spark_config.spark_image,
+        self.__submit_app(
+            app_name=str(model_out.uuid),
             app_path=spark_config.spark_current_app_path,
             app_arguments=[
                 model_out.model_dump_json(),
                 str(inserted_file.uuid),
                 reference_dataset.path.replace('s3://', 's3a://'),
                 CurrentDatasetMetrics.__tablename__,
             ],
-            app_name=str(model_out.uuid),
-            namespace=spark_config.spark_namespace,
-            service_account=spark_config.spark_service_account,
-            image_pull_policy=spark_config.spark_image_pull_policy,
-            app_waiter='no_wait',
-            secret_values=create_secrets(),
         )
 
         return CurrentDatasetDTO.from_current_dataset(inserted_file)
@@ -311,8 +294,8 @@ def bind_current_file(
         logger.debug('File %s has been correctly stored in the db', inserted_file)
 
         spark_config = get_config().spark_config
-        self.spark_k8s_client.submit_app(
-            image=spark_config.spark_image,
+        self.__submit_app(
+            app_name=str(model_out.uuid),
             app_path=spark_config.spark_current_app_path,
             app_arguments=[
                 model_out.model_dump_json(),
                 str(inserted_file.uuid),
                 reference_dataset.path.replace('s3://', 's3a://'),
                 CurrentDatasetMetrics.__tablename__,
             ],
-            app_name=str(model_out.uuid),
-            namespace=spark_config.spark_namespace,
-            service_account=spark_config.spark_service_account,
-            image_pull_policy=spark_config.spark_image_pull_policy,
-            app_waiter='no_wait',
-            secret_values=create_secrets(),
         )
 
         return CurrentDatasetDTO.from_current_dataset(inserted_file)
@@ -471,3 +448,36 @@
         csv_file.file.flush()
         csv_file.file.seek(0)
+
+    def __submit_app(
+        self, app_name: str, app_path: str, app_arguments: List[str]
+    ) -> None:
+        spark_config = get_config().spark_config
+        self.spark_k8s_client.submit_app(
+            image=spark_config.spark_image,
+            app_path=app_path,
+            app_arguments=app_arguments,
+            app_name=app_name,
+            namespace=spark_config.spark_namespace,
+            service_account=spark_config.spark_service_account,
+            image_pull_policy=spark_config.spark_image_pull_policy,
+            app_waiter='no_wait',
+            secret_values=create_secrets(),
+            driver_annotations=Configuration.SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS,
+            executor_annotations=Configuration.SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS,
+            driver_resources=PodResources(
+                cpu=Configuration.SPARK_ON_K8S_DRIVER_CPU,
+                memory=Configuration.SPARK_ON_K8S_DRIVER_MEMORY,
+                memory_overhead=Configuration.SPARK_ON_K8S_DRIVER_MEMORY_OVERHEAD,
+            ),
+            executor_resources=PodResources(
+                cpu=Configuration.SPARK_ON_K8S_EXECUTOR_CPU,
+                memory=Configuration.SPARK_ON_K8S_EXECUTOR_MEMORY,
+                memory_overhead=Configuration.SPARK_ON_K8S_EXECUTOR_MEMORY_OVERHEAD,
+            ),
+            executor_instances=ExecutorInstances(
+                min=Configuration.SPARK_ON_K8S_EXECUTOR_MIN_INSTANCES,
+                max=Configuration.SPARK_ON_K8S_EXECUTOR_MAX_INSTANCES,
+                initial=Configuration.SPARK_ON_K8S_EXECUTOR_INITIAL_INSTANCES,
+            ),
+        )
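The new `__submit_app` helper does not hard-code any of these values: it reads them from `spark_on_k8s`'s `Configuration`, which is populated from the environment variables documented in the README hunk above. A minimal sketch of how to check inside the backend container that the annotation variables are picked up; it assumes `Configuration` parses the JSON values into dicts, as the README example implies, and reads the environment when the class is first imported:

```python
# Sketch only (not part of this patch): verify that the annotation
# environment variables documented in api/README.md reach Configuration.
import os

# Set the variables before importing Configuration, since the class
# may read the environment when it is first loaded.
os.environ['SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS'] = '{"my.annotation/driver": "my-value"}'
os.environ['SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS'] = '{"my.annotation/executor": "my-value"}'

from spark_on_k8s.utils.configuration import Configuration

# These are the same attributes __submit_app passes to submit_app()
# as driver_annotations and executor_annotations.
print(Configuration.SPARK_ON_K8S_SPARK_DRIVER_ANNOTATIONS)
print(Configuration.SPARK_ON_K8S_SPARK_EXECUTOR_ANNOTATIONS)
```

Since all four upload/bind call sites now funnel through `__submit_app`, the annotations, pod resources, and executor-instance settings apply uniformly to every submitted Spark app instead of being repeated at each call site.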