From 45baad93ac0c1cef0fb94c3e48a465adc04968d0 Mon Sep 17 00:00:00 2001
From: Simon Mazenoux
Date: Fri, 8 Sep 2023 14:00:14 +0200
Subject: [PATCH] feat: implement jobloggingdb

---
 run_local.sh                               |   1 +
 setup.cfg                                  |   1 +
 src/diracx/core/models.py                  |  47 +++
 src/diracx/core/utils.py                   |  19 +-
 src/diracx/db/__init__.py                  |   4 +-
 src/diracx/db/jobs/db.py                   | 171 ++++++++-
 src/diracx/db/jobs/schema.py               |  40 ++-
 src/diracx/db/jobs/status_utility.py       | 148 ++++++++
 src/diracx/routers/dependencies.py         |   3 +
 src/diracx/routers/job_manager/__init__.py | 188 +++++++---
 tests/conftest.py                          |   1 +
 tests/db/{ => jobs}/test_jobDB.py          |   0
 tests/db/jobs/test_jobLoggingDB.py         |  38 ++
 tests/routers/test_job_manager.py          | 391 ++++++++++++++++++++-
 14 files changed, 968 insertions(+), 84 deletions(-)
 create mode 100644 src/diracx/db/jobs/status_utility.py
 rename tests/db/{ => jobs}/test_jobDB.py (100%)
 create mode 100644 tests/db/jobs/test_jobLoggingDB.py

diff --git a/run_local.sh b/run_local.sh
index d53ef40c..5fe2f2e5 100755
--- a/run_local.sh
+++ b/run_local.sh
@@ -16,6 +16,7 @@ dirac internal generate-cs "${tmp_dir}/cs_store/initialRepo" --vo=diracAdmin --u
 export DIRACX_CONFIG_BACKEND_URL="git+file://${tmp_dir}/cs_store/initialRepo"
 export DIRACX_DB_URL_AUTHDB="sqlite+aiosqlite:///:memory:"
 export DIRACX_DB_URL_JOBDB="sqlite+aiosqlite:///:memory:"
+export DIRACX_DB_URL_JOBLOGGINGDB="sqlite+aiosqlite:///:memory:"
 export DIRACX_SERVICE_AUTH_TOKEN_KEY="file://${tmp_dir}/signing-key/rs256.key"
 export DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS='["http://'$(hostname| tr -s '[:upper:]' '[:lower:]')':8000/docs/oauth2-redirect"]'
diff --git a/setup.cfg b/setup.cfg
index cebde400..f70bd5d1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -76,6 +76,7 @@ diracx =
 diracx.dbs =
     AuthDB = diracx.db:AuthDB
     JobDB = diracx.db:JobDB
+    JobLoggingDB = diracx.db:JobLoggingDB
     SandboxMetadataDB = diracx.db:SandboxMetadataDB
     #DummyDB = diracx.db:DummyDB
 diracx.services =
diff --git a/src/diracx/core/models.py b/src/diracx/core/models.py
index abd85444..fa604801 100644
--- a/src/diracx/core/models.py
+++ b/src/diracx/core/models.py
@@ -1,8 +1,13 @@
 from __future__ import annotations

+from datetime import datetime
 from enum import Enum
 from typing import Literal, TypedDict

+from pydantic import BaseModel, Field
+
+from diracx.core.utils import JobStatus
+

 class ScalarSearchOperator(str, Enum):
     EQUAL = "eq"
@@ -35,4 +40,46 @@ class VectorSearchSpec(TypedDict):
     values: list[str]


+class JobStatusUpdate(BaseModel):
+    status: JobStatus | None = Field(
+        default=None,
+        alias="Status",
+    )
+    minor_status: str | None = Field(
+        default=None,
+        alias="MinorStatus",
+        serialization_alias="minorStatus",
+    )
+    application_status: str | None = Field(
+        default=None,
+        alias="ApplicationStatus",
+        serialization_alias="applicationStatus",
+    )
+    status_source: str = Field(
+        alias="StatusSource",
+        default="Unknown",
+    )
+
+
+class LimitedJobStatusReturn(BaseModel):
+    status: JobStatus = Field(alias="Status")
+    minor_status: str = Field(alias="MinorStatus")
+    application_status: str = Field(alias="ApplicationStatus")
+
+
+class JobStatusReturn(LimitedJobStatusReturn):
+    status_time: datetime = Field(alias="StatusTime")
+    status_source: str = Field(alias="StatusSource")
+
+
+class SetJobStatusReturn(BaseModel):
+    status: JobStatus | None = Field(alias="Status")
+    minor_status: str | None = Field(alias="MinorStatus")
+    application_status: str | None = Field(alias="ApplicationStatus")
+    heartbeat_time: datetime | None = Field(alias="HeartBeatTime")
+    start_exec_time: datetime | None = Field(alias="StartExecTime")
+    end_exec_time: datetime | None = Field(alias="EndExecTime")
+    last_update_time: datetime | None = Field(alias="LastUpdateTime")
+
+
 SearchSpec = ScalarSearchSpec | VectorSearchSpec
diff --git a/src/diracx/core/utils.py b/src/diracx/core/utils.py
index b4ff7e4d..09052eff 100644
--- a/src/diracx/core/utils.py
+++ b/src/diracx/core/utils.py
@@ -6,12 +6,21 @@


 class JobStatus(str, Enum):
-    Running = "Running"
-    Stalled = "Stalled"
-    Killed = "Killed"
-    Failed = "Failed"
-    RECEIVED = "RECEIVED"
     SUBMITTING = "Submitting"
+    RECEIVED = "Received"
+    CHECKING = "Checking"
+    STAGING = "Staging"
+    WAITING = "Waiting"
+    MATCHED = "Matched"
+    RUNNING = "Running"
+    STALLED = "Stalled"
+    COMPLETING = "Completing"
+    DONE = "Done"
+    COMPLETED = "Completed"
+    FAILED = "Failed"
+    DELETED = "Deleted"
+    KILLED = "Killed"
+    RESCHEDULED = "Rescheduled"


 def dotenv_files_from_environment(prefix: str) -> list[str]:
diff --git a/src/diracx/db/__init__.py b/src/diracx/db/__init__.py
index e1ff898a..3dd13c3c 100644
--- a/src/diracx/db/__init__.py
+++ b/src/diracx/db/__init__.py
@@ -1,7 +1,7 @@
-__all__ = ("AuthDB", "JobDB", "SandboxMetadataDB")
+__all__ = ("AuthDB", "JobDB", "JobLoggingDB", "SandboxMetadataDB")

 from .auth.db import AuthDB
-from .jobs.db import JobDB
+from .jobs.db import JobDB, JobLoggingDB
 from .sandbox_metadata.db import SandboxMetadataDB

 # from .dummy.db import DummyDB
diff --git a/src/diracx/db/jobs/db.py b/src/diracx/db/jobs/db.py
index d1de12f8..388ec6be 100644
--- a/src/diracx/db/jobs/db.py
+++ b/src/diracx/db/jobs/db.py
@@ -1,16 +1,25 @@
 from __future__ import annotations

+import time
 from datetime import datetime, timezone
 from typing import Any

-from sqlalchemy import func, insert, select, update
+from sqlalchemy import delete, func, insert, select, update
+from sqlalchemy.exc import NoResultFound

 from diracx.core.exceptions import InvalidQueryError
+from diracx.core.models import JobStatusReturn, LimitedJobStatusReturn
 from diracx.core.utils import JobStatus

 from ..utils import BaseDB, apply_search_filters
-from .schema import Base as JobDBBase
-from .schema import InputData, JobJDLs, Jobs
+from .schema import (
+    InputData,
+    JobDBBase,
+    JobJDLs,
+    JobLoggingDBBase,
+    Jobs,
+    LoggingInfo,
+)


 class JobDB(BaseDB):
@@ -185,7 +194,7 @@ async def insert(
         class_ad_job = ClassAd(jobJDL)
         class_ad_req = ClassAd("[]")
         if not class_ad_job.isOK():
-            job_attrs["Status"] = JobStatus.Failed
+            job_attrs["Status"] = JobStatus.FAILED
             job_attrs["MinorStatus"] = "Error in JDL syntax"
@@ -242,3 +251,157 @@ async def insert(
             "MinorStatus": initial_minor_status,
             "TimeStamp": datetime.now(tz=timezone.utc),
         }
+
+    async def get_job_status(self, job_id: int) -> LimitedJobStatusReturn:
+        stmt = select(Jobs.Status, Jobs.MinorStatus, Jobs.ApplicationStatus).where(
+            Jobs.JobID == job_id
+        )
+        return LimitedJobStatusReturn(
+            **dict((await self.conn.execute(stmt)).one()._mapping)
+        )
+
+
+# Arbitrary offset subtracted from Unix epoch times before they are stored, so
+# that the values fit in the Numeric(12, 3) StatusTimeOrder column.
+MAGIC_EPOC_NUMBER = 1270000000
+
+
+class JobLoggingDB(BaseDB):
+    """Frontend for the JobLoggingDB. Provides the ability to store job status changes with timestamps."""
+
+    # This needs to be here for the BaseDB to create the engine
+    metadata = JobLoggingDBBase.metadata
+
+    async def insert_record(
+        self,
+        job_id: int,
+        status: JobStatus,
+        minor_status: str,
+        application_status: str,
+        date: datetime,
+        source: str,
+    ):
+        """
+        Add a new entry to the JobLoggingDB table. One, two or all three status
+        components (status, minor_status, application_status) can carry a new
+        value; callers pass "idem" for components that do not change.
+        The timestamp of the status change must be provided as a
+        datetime.datetime object (callers use timezone-aware UTC datetimes).
+        """
+
+        seqnum_stmt = (
+            select(func.coalesce(func.max(LoggingInfo.SeqNum) + 1, 1))
+            .where(LoggingInfo.JobID == job_id)
+            .scalar_subquery()
+        )
+
+        epoc = (
+            time.mktime(date.timetuple())
+            + date.microsecond / 1000000.0
+            - MAGIC_EPOC_NUMBER
+        )
+
+        stmt = insert(LoggingInfo).values(
+            JobID=int(job_id),
+            SeqNum=seqnum_stmt,
+            Status=status,
+            MinorStatus=minor_status,
+            ApplicationStatus=application_status[:255],
+            StatusTime=date,
+            StatusTimeOrder=epoc,
+            StatusSource=source[:32],
+        )
+        await self.conn.execute(stmt)
+
+    async def get_records(self, job_id: int) -> list[JobStatusReturn]:
+        """Returns a (Status, MinorStatus, ApplicationStatus, StatusTime,
+        StatusSource) entry for each record found for the job specified by its
+        jobID, in historical order.
+        """
+
+        stmt = (
+            select(
+                LoggingInfo.Status,
+                LoggingInfo.MinorStatus,
+                LoggingInfo.ApplicationStatus,
+                LoggingInfo.StatusTime,
+                LoggingInfo.StatusSource,
+            )
+            .where(LoggingInfo.JobID == int(job_id))
+            .order_by(LoggingInfo.StatusTimeOrder, LoggingInfo.StatusTime)
+        )
+        rows = await self.conn.execute(stmt)
+
+        values = []
+        for (
+            status,
+            minor_status,
+            application_status,
+            status_time,
+            status_source,
+        ) in rows:
+            values.append(
+                [
+                    status,
+                    minor_status,
+                    application_status,
+                    status_time.replace(tzinfo=timezone.utc),
+                    status_source,
+                ]
+            )
+
+        # If no value has been set for the application status in the first place,
+        # we set it to "Unknown"
+        res = []
+        if values:
+            if values[0][2] == "idem":
+                values[0][2] = "Unknown"
+
+            # We replace "idem" values by the value previously stated
+            for i in range(1, len(values)):
+                for j in range(3):
+                    if values[i][j] == "idem":
+                        values[i][j] = values[i - 1][j]
+
+            # And we convert the lists into JobStatusReturn objects
+            for (
+                status,
+                minor_status,
+                application_status,
+                status_time,
+                status_source,
+            ) in values:
+                res.append(
+                    JobStatusReturn(
+                        Status=status,
+                        MinorStatus=minor_status,
+                        ApplicationStatus=application_status,
+                        StatusTime=status_time,
+                        StatusSource=status_source,
+                    )
+                )
+
+        return res
+
+    async def delete_records(self, job_id: int):
+        """Delete all logging records for the given job."""
+
+        stmt = delete(LoggingInfo).where(LoggingInfo.JobID == job_id)
+        await self.conn.execute(stmt)
+
+    async def get_wms_time_stamps(self, job_id):
+        """Get the timestamps of the job's major state transitions,
+        returned as a {State: timestamp} dictionary.
+        """
+
+        result = {}
+        stmt = select(
+            LoggingInfo.Status,
+            LoggingInfo.StatusTimeOrder,
+        ).where(LoggingInfo.JobID == job_id)
+        rows = await self.conn.execute(stmt)
+        if not rows.rowcount:
+            raise NoResultFound(f"No Logging Info for job {job_id}")
+
+        for event, etime in rows:
+            result[event] = str(etime + MAGIC_EPOC_NUMBER)
+
+        return result
diff --git a/src/diracx/db/jobs/schema.py b/src/diracx/db/jobs/schema.py
index 6dc228d1..d13cf500 100644
--- a/src/diracx/db/jobs/schema.py
+++ b/src/diracx/db/jobs/schema.py
@@ -6,15 +6,17 @@
     ForeignKeyConstraint,
     Index,
     Integer,
+    Numeric,
     PrimaryKeyConstraint,
     String,
     Text,
 )
 from sqlalchemy.orm import declarative_base

-from ..utils import Column, NullColumn
+from ..utils import Column, DateNowColumn, NullColumn

-Base = declarative_base()
+JobDBBase = declarative_base()
+JobLoggingDBBase = declarative_base()


 class EnumBackedBool(types.TypeDecorator):
@@ -43,7 +45,7 @@ def process_result_value(self, value, dialect) -> bool:
         raise NotImplementedError(f"Unknown {value=}")


-class JobJDLs(Base):
+class JobJDLs(JobDBBase):
     __tablename__ = "JobJDLs"
     JobID = Column(Integer, autoincrement=True)
     JDL = Column(Text)
@@ -52,7 +54,7 @@
     __table_args__ = (PrimaryKeyConstraint("JobID"),)


-class Jobs(Base):
+class Jobs(JobDBBase):
     __tablename__ = "Jobs"

     JobID = Column("JobID", Integer, primary_key=True, default=0)
@@ -112,7 +114,7 @@
     )


-class InputData(Base):
+class InputData(JobDBBase):
     __tablename__ = "InputData"
     JobID = Column(Integer, primary_key=True)
     LFN = Column(String(255), default="", primary_key=True)
@@ -120,7 +122,7 @@
     __table_args__ = (ForeignKeyConstraint(["JobID"], ["Jobs.JobID"]),)


-class JobParameters(Base):
+class JobParameters(JobDBBase):
     __tablename__ = "JobParameters"
     JobID = Column(Integer, primary_key=True)
     Name = Column(String(100), primary_key=True)
@@ -128,7 +130,7 @@
     __table_args__ = (ForeignKeyConstraint(["JobID"], ["Jobs.JobID"]),)


-class OptimizerParameters(Base):
+class OptimizerParameters(JobDBBase):
     __tablename__ = "OptimizerParameters"
     JobID = Column(Integer, primary_key=True)
     Name = Column(String(100), primary_key=True)
@@ -136,7 +138,7 @@
     __table_args__ = (ForeignKeyConstraint(["JobID"], ["Jobs.JobID"]),)


-class AtticJobParameters(Base):
+class AtticJobParameters(JobDBBase):
     __tablename__ = "AtticJobParameters"
     JobID = Column(Integer, ForeignKey("Jobs.JobID"), primary_key=True)
     Name = Column(String(100), primary_key=True)
@@ -144,7 +146,7 @@
     RescheduleCycle = Column(Integer)


-class SiteMask(Base):
+class SiteMask(JobDBBase):
     __tablename__ = "SiteMask"
     Site = Column(String(64), primary_key=True)
     Status = Column(String(64))
@@ -153,7 +155,7 @@
     Comment = Column(Text)


-class SiteMaskLogging(Base):
+class SiteMaskLogging(JobDBBase):
     __tablename__ = "SiteMaskLogging"
     Site = Column(String(64), primary_key=True)
     UpdateTime = Column(DateTime, primary_key=True)
@@ -162,7 +164,7 @@
     Comment = Column(Text)


-class HeartBeatLoggingInfo(Base):
+class HeartBeatLoggingInfo(JobDBBase):
     __tablename__ = "HeartBeatLoggingInfo"
     JobID = Column(Integer, primary_key=True)
     Name = Column(String(100), primary_key=True)
@@ -172,7 +174,7 @@
     __table_args__ = (ForeignKeyConstraint(["JobID"], ["Jobs.JobID"]),)


-class JobCommands(Base):
+class JobCommands(JobDBBase):
     __tablename__ = "JobCommands"
     JobID = Column(Integer, primary_key=True)
     Command = Column(String(100))
@@ -182,3 +184,17 @@
     ExecutionTime = NullColumn(DateTime)

     __table_args__ = (ForeignKeyConstraint(["JobID"], ["Jobs.JobID"]),)
+
+
+class LoggingInfo(JobLoggingDBBase):
+    __tablename__ = "LoggingInfo"
+    JobID = Column(Integer)
+    SeqNum = Column(Integer)
+    Status = Column(String(32), default="")
+    MinorStatus = Column(String(128), default="")
+    ApplicationStatus = Column(String(255), default="")
+    StatusTime = DateNowColumn()
+    # TODO: Check that this corresponds to the DOUBLE(12,3) type in MySQL
+    StatusTimeOrder = Column(Numeric(precision=12, scale=3), default=0)
+    StatusSource = Column(String(32), default="Unknown")
+    __table_args__ = (PrimaryKeyConstraint("JobID", "SeqNum"),)
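
To make the StatusTimeOrder/MAGIC_EPOC_NUMBER arithmetic in db.py above concrete, here is a small worked example (the date is arbitrary, and the exact result depends on the machine's timezone, since time.mktime() interprets the timetuple as local time):

# Sketch: why subtracting MAGIC_EPOC_NUMBER keeps StatusTimeOrder in Numeric(12, 3).
import time
from datetime import datetime, timezone

MAGIC_EPOC_NUMBER = 1270000000  # same constant as in db.py

date = datetime(2023, 9, 8, 14, 0, 14, 500000, tzinfo=timezone.utc)
epoc = (
    time.mktime(date.timetuple())
    + date.microsecond / 1000000.0
    - MAGIC_EPOC_NUMBER
)
# A raw 2023 epoch time is ~1.69e9: ten integer digits, which together with
# three decimals exceeds Numeric(precision=12, scale=3). After subtracting the
# ~1.27e9 offset the value has nine integer digits and fits exactly.
print(epoc)  # ~4.2e8, modulo the local-time offset introduced by mktime()
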
diff --git a/src/diracx/db/jobs/status_utility.py b/src/diracx/db/jobs/status_utility.py
new file mode 100644
index 00000000..c83c05e0
--- /dev/null
+++ b/src/diracx/db/jobs/status_utility.py
@@ -0,0 +1,148 @@
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+from sqlalchemy.exc import NoResultFound
+
+from diracx.core.models import (
+    JobStatusUpdate,
+    ScalarSearchOperator,
+    SetJobStatusReturn,
+)
+from diracx.core.utils import JobStatus
+from diracx.db.jobs.db import JobDB, JobLoggingDB
+
+
+async def set_job_status(
+    job_id: int,
+    status: dict[datetime, JobStatusUpdate],
+    job_db: JobDB,
+    job_logging_db: JobLoggingDB,
+    force: bool = False,
+) -> SetJobStatusReturn:
+    """Set various status fields for the job specified by its jobID.
+    Only the last status is stored in the JobDB, while every update is
+    recorded in the JobLoggingDB. The status argument maps datetimes to
+    status information dictionaries.
+    """
+
+    from DIRAC.Core.Utilities import TimeUtilities
+    from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
+    from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import (
+        getNewStatus,
+        getStartAndEndTime,
+    )
+
+    # transform JobStatusUpdate objects into dicts
+    statusDict = {}
+    for key, value in status.items():
+        statusDict[key] = value.dict(by_alias=True)
+
+    res = await job_db.search(
+        parameters=["Status", "StartExecTime", "EndExecTime"],
+        search=[
+            {
+                "parameter": "JobID",
+                "operator": ScalarSearchOperator.EQUAL,
+                "value": str(job_id),
+            }
+        ],
+        sorts=[],
+    )
+    if not res:
+        raise NoResultFound(f"Job {job_id} not found")
+
+    currentStatus = res[0]["Status"]
+    startTime = res[0]["StartExecTime"]
+    endTime = res[0]["EndExecTime"]
+
+    # If the current status is Stalled and we get an update, it should probably be "Running"
+    if currentStatus == JobStatus.STALLED:
+        currentStatus = JobStatus.RUNNING
+
+    # Get the latest time stamps of major status updates
+    # (raises NoResultFound if the job has no logging records)
+    result = await job_logging_db.get_wms_time_stamps(job_id)
+
+    # This is more precise than "LastTime". timeStamps is a sorted list of tuples...
+    timeStamps = sorted((float(t), s) for s, t in result.items())
+    lastTime = TimeUtilities.fromEpoch(timeStamps[-1][0]).replace(tzinfo=timezone.utc)
+
+    # Get chronological order of new updates
+    updateTimes = sorted(statusDict)
+
+    newStartTime, newEndTime = getStartAndEndTime(
+        startTime, endTime, updateTimes, timeStamps, statusDict
+    )
+
+    job_data = {}
+    if updateTimes[-1] >= lastTime:
+        new_status, new_minor, new_application = returnValueOrRaise(
+            getNewStatus(
+                job_id,
+                updateTimes,
+                lastTime,
+                statusDict,
+                currentStatus,
+                force,
+                MagicMock(),
+            )
+        )
+
+        if new_status:
+            job_data["Status"] = new_status
+            job_data["LastUpdateTime"] = datetime.now(timezone.utc)
+        if new_minor:
+            job_data["MinorStatus"] = new_minor
+        if new_application:
+            job_data["ApplicationStatus"] = new_application
+
+        # TODO: implement elasticJobParametersDB ?
+        # if cls.elasticJobParametersDB:
+        #     result = cls.elasticJobParametersDB.setJobParameter(int(jobID), "Status", status)
+        #     if not result["OK"]:
+        #         return result
+
+    for updTime in updateTimes:
+        if statusDict[updTime]["StatusSource"].startswith("Job"):
+            job_data["HeartBeatTime"] = updTime
+
+    if not startTime and newStartTime:
+        job_data["StartExecTime"] = newStartTime
+
+    if not endTime and newEndTime:
+        job_data["EndExecTime"] = newEndTime
+
+    if job_data:
+        await job_db.setJobAttributes(job_id, job_data)
+
+    # Update the JobLoggingDB records
+    # TODO: Because I really didn't like the fact that the input field is called
+    # "Source" while the output field is called "StatusSource",
+    # I changed the name of the input field to "StatusSource".
+    # Meaning this change must be added to the transformation layer for DIRAC.
+
+    for updTime in updateTimes:
+        sDict = statusDict[updTime]
+        if not sDict["Status"]:
+            sDict["Status"] = "idem"
+        if not sDict["MinorStatus"]:
+            sDict["MinorStatus"] = "idem"
+        if not sDict["ApplicationStatus"]:
+            sDict["ApplicationStatus"] = "idem"
+        if not sDict["StatusSource"]:
+            sDict["StatusSource"] = "Unknown"
+
+        await job_logging_db.insert_record(
+            job_id,
+            sDict["Status"],
+            sDict["MinorStatus"],
+            sDict["ApplicationStatus"],
+            updTime,
+            sDict["StatusSource"],
+        )
+
+    return SetJobStatusReturn(**job_data)
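
For orientation, this is roughly how the routers further down drive this helper (the job ID and field values are illustrative; the DB objects come from the dependency layer below):

# Sketch: calling set_job_status the way the routers do (values illustrative).
from datetime import datetime, timezone

from diracx.core.models import JobStatusUpdate
from diracx.db.jobs.status_utility import set_job_status

async def example(job_db, job_logging_db):
    updates = {
        datetime.now(timezone.utc): JobStatusUpdate(
            Status="Checking", MinorStatus="JobPath", StatusSource="JobManager"
        )
    }
    # Writes the latest state to JobDB and one record per timestamp to JobLoggingDB
    return await set_job_status(42, updates, job_db, job_logging_db, force=False)
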
diff --git a/src/diracx/routers/dependencies.py b/src/diracx/routers/dependencies.py
index 9df412a1..17d4cf83 100644
--- a/src/diracx/routers/dependencies.py
+++ b/src/diracx/routers/dependencies.py
@@ -4,6 +4,7 @@
     "Config",
     "AuthDB",
     "JobDB",
+    "JobLoggingDB",
     "add_settings_annotation",
     "AvailableSecurityProperties",
 )
@@ -17,6 +18,7 @@
 from diracx.core.properties import SecurityProperty
 from diracx.db import AuthDB as _AuthDB
 from diracx.db import JobDB as _JobDB
+from diracx.db import JobLoggingDB as _JobLoggingDB

 T = TypeVar("T")

@@ -29,6 +31,7 @@ def add_settings_annotation(cls: T) -> T:
 # Databases
 AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)]
 JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)]
+JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)]

 # Miscellaneous
 Config = Annotated[_Config, Depends(ConfigSource.create)]
diff --git a/src/diracx/routers/job_manager/__init__.py b/src/diracx/routers/job_manager/__init__.py
index 5f77fa3a..2f06a475 100644
--- a/src/diracx/routers/job_manager/__init__.py
+++ b/src/diracx/routers/job_manager/__init__.py
@@ -2,20 +2,32 @@

 import asyncio
 import logging
-from datetime import datetime
+from datetime import datetime, timezone
 from http import HTTPStatus
 from typing import Annotated, Any, TypedDict

 from fastapi import Body, Depends, HTTPException, Query
 from pydantic import BaseModel, root_validator
+from sqlalchemy.exc import NoResultFound

 from diracx.core.config import Config, ConfigSource
-from diracx.core.models import ScalarSearchOperator, SearchSpec, SortSpec
+from diracx.core.models import (
+    JobStatusReturn,
+    JobStatusUpdate,
+    LimitedJobStatusReturn,
+    ScalarSearchOperator,
+    SearchSpec,
+    SetJobStatusReturn,
+    SortSpec,
+)
 from diracx.core.properties import JOB_ADMINISTRATOR, NORMAL_USER
 from diracx.core.utils import JobStatus
+from diracx.db.jobs.status_utility import (
+    set_job_status,
+)

 from ..auth import UserInfo, has_properties, verify_dirac_access_token
-from ..dependencies import JobDB
+from ..dependencies import JobDB, JobLoggingDB
 from ..fastapi_classes import DiracxRouter

 MAX_PARAMETRIC_JOBS = 20
@@ -64,16 +76,6 @@ class JobID(BaseModel):
     job_id: int


-class JobStatusUpdate(BaseModel):
-    job_id: int
-    status: JobStatus
-
-
-class JobStatusReturn(TypedDict):
-    job_id: int
-    status: JobStatus
-
-
 EXAMPLE_JDLS = {
     "Simple JDL": [
         """Arguments = "jobDescription.xml -o LogLevel=INFO";
@@ -102,6 +104,7 @@ async def submit_bulk_jobs(
     # FIXME: Using multiple doesn't work with swagger?
     job_definitions: Annotated[list[str], Body(example=EXAMPLE_JDLS["Simple JDL"])],
     job_db: JobDB,
+    job_logging_db: JobLoggingDB,
     user_info: Annotated[UserInfo, Depends(verify_dirac_access_token)],
 ) -> list[InsertedJob]:
     from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
@@ -178,7 +181,7 @@ def __init__(self, user_info: UserInfo, allInfo: bool = True):
             detail=f"Normal user cannot submit more than {MAX_PARAMETRIC_JOBS} jobs at once",
         )

-    jobIDList = []
+    result = []

     if parametricJob:
         initialStatus = JobStatus.SUBMITTING
@@ -192,7 +195,7 @@
     ) in (
         jobDescList
     ):  # jobDescList because there might be a list generated by a parametric job
-        job_id = await job_db.insert(
+        res = await job_db.insert(
             jobDescription,
             user_info.preferred_username,
             user_info.dirac_group,
@@ -201,22 +204,23 @@
             user_info.vo,
         )

+        job_id = res["JobID"]
         logging.debug(
             f'Job added to the JobDB", "{job_id} for {user_info.preferred_username}/{user_info.dirac_group}'
         )

-        # TODO comment out for test just now
-        # self.jobLoggingDB.addLoggingRecord(
-        #     jobID,
-        #     result["Status"],
-        #     result["MinorStatus"],
-        #     date=result["TimeStamp"],
-        #     source="JobManager",
-        # )
+        await job_logging_db.insert_record(
+            int(job_id),
+            initialStatus,
+            initialMinorStatus,
+            "Unknown",
+            datetime.now(timezone.utc),
+            "JobManager",
+        )

-        jobIDList.append(job_id)
+        result.append(res)

-    return jobIDList
+    return result
+
+    # TODO: is this needed ?
+    # if not parametricJob:
+    #     self.__sendJobsToOptimizationMind(jobIDList)
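
The net effect of the insert_record call above is that every submitted job immediately gets its first record in the logging table. Roughly, from the client side (`client` stands for an authenticated HTTP client, cf. normal_user_client in the tests below; the values in comments are examples):

# Sketch: what a submission returns and the first record it logs.
def submit_and_inspect(client) -> None:
    r = client.post("/jobs/", json=['Executable = "echo";'])
    inserted = r.json()[0]
    # e.g. {"JobID": 1, "Status": "Received", "MinorStatus": "Job accepted", ...}

    r = client.get("/jobs/status/history", params={"job_ids": [inserted["JobID"]]})
    first = r.json()[str(inserted["JobID"])][0]
    assert first["StatusSource"] == "JobManager"  # written by insert_record above
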
@@ -233,44 +237,59 @@ async def delete_bulk_jobs(job_ids: Annotated[list[int], Query()]):
     return job_ids


-@router.get("/{job_id}")
-async def get_single_job(job_id: int):
-    return f"This job {job_id}"
-
-
-@router.delete("/{job_id}")
-async def delete_single_job(job_id: int):
-    return f"I am deleting {job_id}"
-
-
-@router.post("/{job_id}/kill", dependencies=[has_properties(JOB_ADMINISTRATOR)])
-async def kill_single_job(job_id: int):
-    return f"I am killing {job_id}"
-
-
-@router.get("/{job_id}/status")
-async def get_single_job_status(job_id: int) -> JobStatus:
-    return JobStatus.Stalled
-
-
-@router.post("/{job_id}/status")
-async def set_single_job_status(job_id: int, status: JobStatus):
-    return f"Updating Job {job_id} to {status}"
-
-
 @router.post("/kill")
-async def kill_bulk_jobs(job_ids: Annotated[list[int], Query()]):
+async def kill_bulk_jobs(
+    job_ids: Annotated[list[int], Query()],
+):
     return job_ids


 @router.get("/status")
-async def get_bulk_job_status(job_ids: Annotated[list[int], Query(max_items=10)]):
-    return [{"job_id": jid, "status": JobStatus.Running} for jid in job_ids]
+async def get_job_status_bulk(
+    job_ids: Annotated[list[int], Query()], job_db: JobDB
+) -> dict[int, LimitedJobStatusReturn]:
+    try:
+        result = await asyncio.gather(
+            *(job_db.get_job_status(job_id) for job_id in job_ids)
+        )
+        return {job_id: status for job_id, status in zip(job_ids, result)}
+    except NoResultFound as e:
+        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail=str(e)) from e


-@router.post("/status")
-async def set_status_bulk(job_update: list[JobStatusUpdate]) -> list[JobStatusReturn]:
-    return [{"job_id": job.job_id, "status": job.status} for job in job_update]
+@router.put("/status")
+async def set_job_status_bulk(
+    job_update: dict[int, dict[datetime, JobStatusUpdate]],
+    job_db: JobDB,
+    job_logging_db: JobLoggingDB,
+    force: bool = False,
+) -> dict[int, SetJobStatusReturn]:
+    # check that the datetime contains timezone info
+    for status in job_update.values():
+        for dt in status:
+            if dt.tzinfo is None:
+                raise HTTPException(
+                    status_code=HTTPStatus.BAD_REQUEST,
+                    detail=f"Timestamp {dt} is not timezone aware",
+                )
+
+    res = await asyncio.gather(
+        *(
+            set_job_status(job_id, status, job_db, job_logging_db, force)
+            for job_id, status in job_update.items()
+        )
+    )
+    return {job_id: status for job_id, status in zip(job_update.keys(), res)}
+
+
+@router.get("/status/history")
+async def get_job_status_history_bulk(
+    job_ids: Annotated[list[int], Query()], job_logging_db: JobLoggingDB
+) -> dict[int, list[JobStatusReturn]]:
+    result = await asyncio.gather(
+        *(job_logging_db.get_records(job_id) for job_id in job_ids)
+    )
+    return {job_id: status for job_id, status in zip(job_ids, result)}


 EXAMPLE_SEARCHES = {
@@ -380,3 +399,60 @@ async def summary(
         }
     )
     return await job_db.summary(body.grouping, body.search)
+
+
+@router.get("/{job_id}")
+async def get_single_job(job_id: int):
+    return f"This job {job_id}"
+
+
+@router.get("/{job_id}/status")
+async def get_single_job_status(
+    job_id: int, job_db: JobDB
+) -> dict[int, LimitedJobStatusReturn]:
+    try:
+        status = await job_db.get_job_status(job_id)
+    except NoResultFound as e:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND, detail=f"Job {job_id} not found"
+        ) from e
+    return {job_id: status}
+
+
+@router.put("/{job_id}/status")
+async def set_single_job_status(
+    job_id: int,
+    status: Annotated[dict[datetime, JobStatusUpdate], Body()],
+    job_db: JobDB,
+    job_logging_db: JobLoggingDB,
+    force: bool = False,
+) -> dict[int, SetJobStatusReturn]:
+    # check that the datetime contains timezone info
+    for dt in status:
+        if dt.tzinfo is None:
+            raise HTTPException(
+                status_code=HTTPStatus.BAD_REQUEST,
+                detail=f"Timestamp {dt} is not timezone aware",
+            )
+
+    try:
+        latest_status = await set_job_status(
+            job_id, status, job_db, job_logging_db, force
+        )
+    except NoResultFound as e:
+        raise HTTPException(status_code=HTTPStatus.NOT_FOUND, detail=str(e)) from e
+    return {job_id: latest_status}
+
+
+@router.get("/{job_id}/status/history")
+async def get_single_job_status_history(
+    job_id: int,
+    job_logging_db: JobLoggingDB,
+) -> dict[int, list[JobStatusReturn]]:
+    try:
+        status = await job_logging_db.get_records(job_id)
+    except NoResultFound as e:
+        raise HTTPException(
+            status_code=HTTPStatus.NOT_FOUND, detail="Job not found"
+        ) from e
+    return {job_id: status}
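
The status payloads above are keyed by ISO-8601 timestamps, and the routers reject naive ones with a 400. A small client-side sketch of the accepted shape (`client` and `job_id` are placeholders):

# Sketch: timestamp keys must be timezone aware.
from datetime import datetime, timezone

def update_status(client, job_id: int) -> None:
    stamp = datetime.now(timezone.utc).isoformat()  # aware: accepted
    # datetime.utcnow().isoformat() would be naive and rejected with 400
    r = client.put(
        f"/jobs/{job_id}/status",
        json={stamp: {"Status": "Checking", "MinorStatus": "JobPath"}},
    )
    assert r.status_code == 200, r.json()
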
diff --git a/tests/conftest.py b/tests/conftest.py
index a852c376..24eacb6b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -73,6 +73,7 @@ def with_app(test_auth_settings, with_config_repo):
         all_service_settings=[test_auth_settings],
         database_urls={
             "JobDB": "sqlite+aiosqlite:///:memory:",
+            "JobLoggingDB": "sqlite+aiosqlite:///:memory:",
             "AuthDB": "sqlite+aiosqlite:///:memory:",
             "SandboxMetadataDB": "sqlite+aiosqlite:///:memory:",
         },
diff --git a/tests/db/test_jobDB.py b/tests/db/jobs/test_jobDB.py
similarity index 100%
rename from tests/db/test_jobDB.py
rename to tests/db/jobs/test_jobDB.py
diff --git a/tests/db/jobs/test_jobLoggingDB.py b/tests/db/jobs/test_jobLoggingDB.py
new file mode 100644
index 00000000..1048c7d2
--- /dev/null
+++ b/tests/db/jobs/test_jobLoggingDB.py
@@ -0,0 +1,38 @@
+from datetime import datetime, timezone
+
+import pytest
+
+from diracx.core.utils import JobStatus
+from diracx.db import JobLoggingDB
+
+
+@pytest.fixture
+async def job_logging_db():
+    job_logging_db = JobLoggingDB("sqlite+aiosqlite:///:memory:")
+    async with job_logging_db.engine_context():
+        yield job_logging_db
+
+
+async def test_insert_record(job_logging_db: JobLoggingDB):
+    async with job_logging_db as job_logging_db:
+        # Arrange
+        date = datetime.now(timezone.utc)
+
+        # Act
+        await job_logging_db.insert_record(
+            1,
+            status=JobStatus.RECEIVED.value,
+            minor_status="minor_status",
+            application_status="application_status",
+            date=date,
+            source="pytest",
+        )
+
+        # Assert
+        res = await job_logging_db.get_records(1)
+        assert len(res) == 1
+        assert res[0].status == JobStatus.RECEIVED.value
+        assert res[0].minor_status == "minor_status"
+        assert res[0].application_status == "application_status"
+        assert res[0].status_time == date
+        assert res[0].status_source == "pytest"
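
JobLoggingDB.get_wms_time_stamps is not covered by the tests in this patch; a possible companion test in the same style, assuming `import time` and `import pytest` alongside the imports above, could look like:

# Sketch: a round-trip test for get_wms_time_stamps (not part of this patch).
async def test_get_wms_time_stamps(job_logging_db: JobLoggingDB):
    async with job_logging_db as job_logging_db:
        date = datetime.now(timezone.utc)
        await job_logging_db.insert_record(
            1,
            status=JobStatus.RECEIVED.value,
            minor_status="minor_status",
            application_status="application_status",
            date=date,
            source="pytest",
        )

        result = await job_logging_db.get_wms_time_stamps(1)
        # Values are epoch seconds as strings; insert_record computed them with
        # time.mktime(), which interprets the timetuple in *local* time, so we
        # compare against the same expression rather than date.timestamp().
        expected = time.mktime(date.timetuple()) + date.microsecond / 1e6
        assert float(result[JobStatus.RECEIVED.value]) == pytest.approx(
            expected, abs=1e-3
        )
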
diff --git a/tests/routers/test_job_manager.py b/tests/routers/test_job_manager.py
index 96c712d0..f42a1f1d 100644
--- a/tests/routers/test_job_manager.py
+++ b/tests/routers/test_job_manager.py
@@ -1,6 +1,10 @@
+from datetime import datetime, timezone
 from http import HTTPStatus

 import pytest
+from fastapi.testclient import TestClient
+
+from diracx.core.utils import JobStatus

 TEST_JDL = """
 Arguments = "jobDescription.xml -o LogLevel=INFO";
@@ -134,7 +138,13 @@ def test_insert_and_search(normal_user_client):
     r = normal_user_client.post(
         "/jobs/search",
         json={
-            "search": [{"parameter": "Status", "operator": "eq", "value": "RECEIVED"}]
+            "search": [
+                {
+                    "parameter": "Status",
+                    "operator": "eq",
+                    "value": JobStatus.RECEIVED.value,
+                }
+            ]
         },
     )
     assert r.status_code == 200, r.json()
@@ -145,7 +155,7 @@ def test_insert_and_search(normal_user_client):
     )
     assert r.status_code == 200, r.json()
     assert r.json() == [
-        {"JobID": jid, "Status": "RECEIVED"} for jid in submitted_job_ids
+        {"JobID": jid, "Status": JobStatus.RECEIVED.value} for jid in submitted_job_ids
     ]

     # Test /jobs/summary
@@ -153,17 +163,26 @@ def test_insert_and_search(normal_user_client):
         "/jobs/summary", json={"grouping": ["Status", "OwnerGroup"]}
     )
     assert r.status_code == 200, r.json()
-    assert r.json() == [{"Status": "RECEIVED", "OwnerGroup": "test_group", "count": 1}]
+
+    assert r.json() == [
+        {"Status": JobStatus.RECEIVED.value, "OwnerGroup": "test_group", "count": 1}
+    ]

     r = normal_user_client.post(
         "/jobs/summary",
         json={
             "grouping": ["Status"],
-            "search": [{"parameter": "Status", "operator": "eq", "value": "RECEIVED"}],
+            "search": [
+                {
+                    "parameter": "Status",
+                    "operator": "eq",
+                    "value": JobStatus.RECEIVED.value,
+                }
+            ],
         },
     )
     assert r.status_code == 200, r.json()
-    assert r.json() == [{"Status": "RECEIVED", "count": 1}]
+    assert r.json() == [{"Status": JobStatus.RECEIVED.value, "count": 1}]

     r = normal_user_client.post(
         "/jobs/summary",
@@ -208,3 +227,365 @@ def test_user_cannot_submit_multiple_jdl_if_at_least_one_of_them_is_parametric(
 def test_user_without_the_normal_user_property_cannot_submit_job(admin_user_client):
     res = admin_user_client.post("/jobs/", json=[TEST_JDL])
     assert res.status_code == HTTPStatus.FORBIDDEN, res.json()
+
+
+def test_get_job_status(normal_user_client: TestClient):
+    """Test that the job status is returned correctly."""
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+
+    # Act
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    # TODO: should we return camel case here (and everywhere else) ?
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+def test_get_status_of_nonexistent_job(normal_user_client: TestClient):
+    """Test that a 404 is returned for a job that does not exist."""
+    # Act
+    r = normal_user_client.get("/jobs/1/status")
+
+    # Assert
+    assert r.status_code == 404, r.json()
+    assert r.json() == {"detail": "Job 1 not found"}
+
+
+def test_get_job_status_in_bulk(normal_user_client: TestClient):
+    """Test that we can get the status of multiple jobs in one request."""
+    # Arrange
+    job_definitions = [TEST_PARAMETRIC_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 3  # Parameters.JOB_ID is 3
+    submitted_job_ids = sorted([job_dict["JobID"] for job_dict in r.json()])
+    assert isinstance(submitted_job_ids, list)
+    assert all(
+        isinstance(submitted_job_id, int) for submitted_job_id in submitted_job_ids
+    )
+
+    # Act
+    r = normal_user_client.get("/jobs/status", params={"job_ids": submitted_job_ids})
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 3  # Parameters.JOB_ID is 3
+    for job_id in submitted_job_ids:
+        assert str(job_id) in r.json()
+        assert r.json()[str(job_id)]["Status"] == JobStatus.SUBMITTING.value
+        assert r.json()[str(job_id)]["MinorStatus"] == "Bulk transaction confirmation"
+        assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+async def test_get_job_status_history(normal_user_client: TestClient):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    before = datetime.now(timezone.utc)
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    after = datetime.now(timezone.utc)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+    NEW_STATUS = JobStatus.CHECKING.value
+    NEW_MINOR_STATUS = "JobPath"
+    beforebis = datetime.now(timezone.utc)
+    r = normal_user_client.put(
+        f"/jobs/{job_id}/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": NEW_STATUS,
+                "MinorStatus": NEW_MINOR_STATUS,
+            }
+        },
+    )
+    afterbis = datetime.now(timezone.utc)
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+
+    # Act
+    r = normal_user_client.get(
+        f"/jobs/{job_id}/status/history",
+    )
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    assert len(r.json()[str(job_id)]) == 2
+    assert r.json()[str(job_id)][0]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)][0]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)][0]["ApplicationStatus"] == "Unknown"
+    assert (
+        before < datetime.fromisoformat(r.json()[str(job_id)][0]["StatusTime"]) < after
+    )
+    assert r.json()[str(job_id)][0]["StatusSource"] == "JobManager"
+
+    assert r.json()[str(job_id)][1]["Status"] == JobStatus.CHECKING.value
+    assert r.json()[str(job_id)][1]["MinorStatus"] == "JobPath"
+    assert r.json()[str(job_id)][1]["ApplicationStatus"] == "Unknown"
+    assert (
+        beforebis
+        < datetime.fromisoformat(r.json()[str(job_id)][1]["StatusTime"])
+        < afterbis
+    )
+    assert r.json()[str(job_id)][1]["StatusSource"] == "Unknown"
+
+
+def test_get_job_status_history_in_bulk(normal_user_client: TestClient):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+    # Act
+    r = normal_user_client.get("/jobs/status/history", params={"job_ids": [job_id]})
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    assert r.json()[str(job_id)][0]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)][0]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)][0]["ApplicationStatus"] == "Unknown"
+    assert datetime.fromisoformat(r.json()[str(job_id)][0]["StatusTime"])
+    assert r.json()[str(job_id)][0]["StatusSource"] == "JobManager"
+
+
+def test_set_job_status(normal_user_client: TestClient):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+    # Act
+    NEW_STATUS = JobStatus.CHECKING.value
+    NEW_MINOR_STATUS = "JobPath"
+    r = normal_user_client.put(
+        f"/jobs/{job_id}/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": NEW_STATUS,
+                "MinorStatus": NEW_MINOR_STATUS,
+            }
+        },
+    )
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+def test_set_job_status_invalid_job(normal_user_client: TestClient):
+    # Act
+    r = normal_user_client.put(
+        "/jobs/1/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": JobStatus.CHECKING.value,
+                "MinorStatus": "JobPath",
+            }
+        },
+    )
+
+    # Assert
+    assert r.status_code == 404, r.json()
+    assert r.json() == {"detail": "Job 1 not found"}
+
+
+def test_set_job_status_offset_naive_datetime_return_bad_request(
+    normal_user_client: TestClient,
+):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+
+    # Act
+    date = datetime.utcnow().isoformat(sep=" ")
+    r = normal_user_client.put(
+        f"/jobs/{job_id}/status",
+        json={
+            date: {
+                "Status": JobStatus.CHECKING.value,
+                "MinorStatus": "JobPath",
+            }
+        },
+    )
+
+    # Assert
+    assert r.status_code == HTTPStatus.BAD_REQUEST, r.json()
+    assert r.json() == {"detail": f"Timestamp {date} is not timezone aware"}
+
+
+def test_set_job_status_cannot_make_impossible_transitions(
+    normal_user_client: TestClient,
+):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+    # Act
+    NEW_STATUS = JobStatus.RUNNING.value
+    NEW_MINOR_STATUS = "JobPath"
+    r = normal_user_client.put(
+        f"/jobs/{job_id}/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": NEW_STATUS,
+                "MinorStatus": NEW_MINOR_STATUS,
+            }
+        },
+    )
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] != NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] != NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+def test_set_job_status_force(normal_user_client: TestClient):
+    # Arrange
+    job_definitions = [TEST_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 1
+    job_id = r.json()[0]["JobID"]
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == JobStatus.RECEIVED.value
+    assert r.json()[str(job_id)]["MinorStatus"] == "Job accepted"
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+    # Act
+    NEW_STATUS = JobStatus.RUNNING.value
+    NEW_MINOR_STATUS = "JobPath"
+    r = normal_user_client.put(
+        f"/jobs/{job_id}/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": NEW_STATUS,
+                "MinorStatus": NEW_MINOR_STATUS,
+            }
+        },
+        params={"force": True},
+    )
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+
+    r = normal_user_client.get(f"/jobs/{job_id}/status")
+    assert r.status_code == 200, r.json()
+    assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+    assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+    assert r.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+def test_set_job_status_bulk(normal_user_client: TestClient):
+    # Arrange
+    job_definitions = [TEST_PARAMETRIC_JDL]
+    r = normal_user_client.post("/jobs/", json=job_definitions)
+    assert r.status_code == 200, r.json()
+    assert len(r.json()) == 3
+    job_ids = sorted([job_dict["JobID"] for job_dict in r.json()])
+
+    for job_id in job_ids:
+        r = normal_user_client.get(f"/jobs/{job_id}/status")
+        assert r.status_code == 200, r.json()
+        assert r.json()[str(job_id)]["Status"] == JobStatus.SUBMITTING.value
+        assert r.json()[str(job_id)]["MinorStatus"] == "Bulk transaction confirmation"
+
+    # Act
+    NEW_STATUS = JobStatus.CHECKING.value
+    NEW_MINOR_STATUS = "JobPath"
+    r = normal_user_client.put(
+        "/jobs/status",
+        json={
+            job_id: {
+                datetime.now(timezone.utc).isoformat(): {
+                    "Status": NEW_STATUS,
+                    "MinorStatus": NEW_MINOR_STATUS,
+                }
+            }
+            for job_id in job_ids
+        },
+    )
+
+    # Assert
+    assert r.status_code == 200, r.json()
+    for job_id in job_ids:
+        assert r.json()[str(job_id)]["Status"] == NEW_STATUS
+        assert r.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+
+        r_get = normal_user_client.get(f"/jobs/{job_id}/status")
+        assert r_get.status_code == 200, r_get.json()
+        assert r_get.json()[str(job_id)]["Status"] == NEW_STATUS
+        assert r_get.json()[str(job_id)]["MinorStatus"] == NEW_MINOR_STATUS
+        assert r_get.json()[str(job_id)]["ApplicationStatus"] == "Unknown"
+
+
+def test_set_job_status_with_invalid_job_id(normal_user_client: TestClient):
+    # Act
+    r = normal_user_client.put(
+        "/jobs/999999999/status",
+        json={
+            datetime.now(tz=timezone.utc).isoformat(): {
+                "Status": JobStatus.CHECKING.value,
+                "MinorStatus": "JobPath",
+            },
+        },
+    )
+
+    # Assert
+    assert r.status_code == 404, r.json()
+    assert r.json() == {"detail": "Job 999999999 not found"}
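
The two transition tests above hinge on DIRAC's JobStatusUtility.getNewStatus, which set_job_status delegates to: an update the state machine refuses leaves the stored Status untouched unless force=True is passed. Condensed into one sketch (`client` and `job_id`, a freshly submitted job in "Received", are placeholders):

# Sketch: the force flag versus the DIRAC state machine, as exercised above.
from datetime import datetime, timezone

def try_impossible_transition(client, job_id: int) -> None:
    payload = {datetime.now(timezone.utc).isoformat(): {"Status": "Running"}}

    r = client.put(f"/jobs/{job_id}/status", json=payload)
    assert r.json()[str(job_id)]["Status"] != "Running"  # refused by getNewStatus

    r = client.put(f"/jobs/{job_id}/status", json=payload, params={"force": True})
    assert r.json()[str(job_id)]["Status"] == "Running"  # check bypassed
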