Skip to content

Commit

Permalink
Merge pull request #57 from simon-mazenoux/feat-implement-jobloggingdb
Browse files Browse the repository at this point in the history
Implement JobLoggingDB
  • Loading branch information
chrisburr authored Sep 8, 2023
2 parents 9fc4464 + 45baad9 commit 53f83de
Show file tree
Hide file tree
Showing 14 changed files with 968 additions and 84 deletions.
1 change: 1 addition & 0 deletions run_local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dirac internal generate-cs "${tmp_dir}/cs_store/initialRepo" --vo=diracAdmin --u
export DIRACX_CONFIG_BACKEND_URL="git+file://${tmp_dir}/cs_store/initialRepo"
export DIRACX_DB_URL_AUTHDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBDB="sqlite+aiosqlite:///:memory:"
export DIRACX_DB_URL_JOBLOGGINGDB="sqlite+aiosqlite:///:memory:"
export DIRACX_SERVICE_AUTH_TOKEN_KEY="file://${tmp_dir}/signing-key/rs256.key"
export DIRACX_SERVICE_AUTH_ALLOWED_REDIRECTS='["http://'$(hostname| tr -s '[:upper:]' '[:lower:]')':8000/docs/oauth2-redirect"]'

Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ diracx =
diracx.dbs =
AuthDB = diracx.db:AuthDB
JobDB = diracx.db:JobDB
JobLoggingDB = diracx.db:JobLoggingDB
SandboxMetadataDB = diracx.db:SandboxMetadataDB
#DummyDB = diracx.db:DummyDB
diracx.services =
Expand Down
47 changes: 47 additions & 0 deletions src/diracx/core/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum
from typing import Literal, TypedDict

from pydantic import BaseModel, Field

from diracx.core.utils import JobStatus


class ScalarSearchOperator(str, Enum):
EQUAL = "eq"
Expand Down Expand Up @@ -35,4 +40,46 @@ class VectorSearchSpec(TypedDict):
values: list[str]


class JobStatusUpdate(BaseModel):
    """Partial status update for a job.

    All status components are optional; unset fields stay ``None`` (or the
    ``"Unknown"`` default for *status_source*).  The ``alias`` values are the
    legacy DIRAC CamelCase names accepted on input.
    """

    status: JobStatus | None = Field(
        default=None,
        alias="Status",
    )
    minor_status: str | None = Field(
        default=None,
        alias="MinorStatus",
        # camelCase name used on output (pydantic v2 serialization_alias).
        serialization_alias="minorStatus",
    )
    application_status: str | None = Field(
        default=None,
        alias="ApplicationStatus",
        serialization_alias="applicationStatus",
    )
    # NOTE(review): unlike the two fields above this one has no
    # serialization_alias — confirm whether "statusSource" was intended.
    status_source: str = Field(
        alias="StatusSource",
        default="Unknown",
    )


class LimitedJobStatusReturn(BaseModel):
    """Current status triple of a job, without timing information."""

    status: JobStatus = Field(alias="Status")
    minor_status: str = Field(alias="MinorStatus")
    application_status: str = Field(alias="ApplicationStatus")


class JobStatusReturn(LimitedJobStatusReturn):
    """One historical status record: the status triple plus when it was set
    and which component reported it."""

    status_time: datetime = Field(alias="StatusTime")
    status_source: str = Field(alias="StatusSource")


class SetJobStatusReturn(BaseModel):
    """Snapshot of a job's status and timing fields after a set-status call."""

    # NOTE(review): Field(alias=...) with no default makes these fields
    # *required* in pydantic v2 even though the annotation allows None —
    # confirm callers always supply every field, or add default=None.
    status: JobStatus | None = Field(alias="Status")
    minor_status: str | None = Field(alias="MinorStatus")
    application_status: str | None = Field(alias="ApplicationStatus")
    heartbeat_time: datetime | None = Field(alias="HeartBeatTime")
    start_exec_time: datetime | None = Field(alias="StartExecTime")
    end_exec_time: datetime | None = Field(alias="EndExecTime")
    last_update_time: datetime | None = Field(alias="LastUpdateTime")


SearchSpec = ScalarSearchSpec | VectorSearchSpec
19 changes: 14 additions & 5 deletions src/diracx/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@


class JobStatus(str, Enum):
    """Major states of a job's lifecycle in the WMS.

    Inherits from ``str`` so members compare equal to (and serialize as)
    their plain string values.
    """

    # Defect fixed: the visible span mixed the five pre-change members
    # (Running, Stalled, Killed, Failed, RECEIVED = "RECEIVED") with the new
    # definition, leaving a duplicate RECEIVED member — duplicate enum member
    # names raise TypeError at class creation.  This is the post-change enum,
    # consistent with the JobStatus.FAILED usage elsewhere in this commit.
    SUBMITTING = "Submitting"
    RECEIVED = "Received"
    CHECKING = "Checking"
    STAGING = "Staging"
    WAITING = "Waiting"
    MATCHED = "Matched"
    RUNNING = "Running"
    STALLED = "Stalled"
    COMPLETING = "Completing"
    DONE = "Done"
    COMPLETED = "Completed"
    FAILED = "Failed"
    DELETED = "Deleted"
    KILLED = "Killed"
    RESCHEDULED = "Rescheduled"


def dotenv_files_from_environment(prefix: str) -> list[str]:
Expand Down
4 changes: 2 additions & 2 deletions src/diracx/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__all__ = ("AuthDB", "JobDB", "SandboxMetadataDB")
__all__ = ("AuthDB", "JobDB", "JobLoggingDB", "SandboxMetadataDB")

from .auth.db import AuthDB
from .jobs.db import JobDB
from .jobs.db import JobDB, JobLoggingDB
from .sandbox_metadata.db import SandboxMetadataDB

# from .dummy.db import DummyDB
171 changes: 167 additions & 4 deletions src/diracx/db/jobs/db.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
from __future__ import annotations

import time
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import func, insert, select, update
from sqlalchemy import delete, func, insert, select, update
from sqlalchemy.exc import NoResultFound

from diracx.core.exceptions import InvalidQueryError
from diracx.core.models import JobStatusReturn, LimitedJobStatusReturn
from diracx.core.utils import JobStatus

from ..utils import BaseDB, apply_search_filters
from .schema import Base as JobDBBase
from .schema import InputData, JobJDLs, Jobs
from .schema import (
InputData,
JobDBBase,
JobJDLs,
JobLoggingDBBase,
Jobs,
LoggingInfo,
)


class JobDB(BaseDB):
Expand Down Expand Up @@ -185,7 +194,7 @@ async def insert(
class_ad_job = ClassAd(jobJDL)
class_ad_req = ClassAd("[]")
if not class_ad_job.isOK():
job_attrs["Status"] = JobStatus.Failed
job_attrs["Status"] = JobStatus.FAILED

job_attrs["MinorStatus"] = "Error in JDL syntax"

Expand Down Expand Up @@ -242,3 +251,157 @@ async def insert(
"MinorStatus": initial_minor_status,
"TimeStamp": datetime.now(tz=timezone.utc),
}

async def get_job_status(self, job_id: int) -> LimitedJobStatusReturn:
stmt = select(Jobs.Status, Jobs.MinorStatus, Jobs.ApplicationStatus).where(
Jobs.JobID == job_id
)
return LimitedJobStatusReturn(
**dict((await self.conn.execute(stmt)).one()._mapping)
)


MAGIC_EPOC_NUMBER = 1270000000


class JobLoggingDB(BaseDB):
    """Frontend for the JobLoggingDB. Provides the ability to store changes
    of job statuses together with timestamps."""

    # This needs to be here for the BaseDB to create the engine
    metadata = JobLoggingDBBase.metadata

    async def insert_record(
        self,
        job_id: int,
        status: JobStatus,
        minor_status: str,
        application_status: str,
        date: datetime,
        source: str,
    ) -> None:
        """
        Add a new entry to the LoggingInfo table for *job_id*.

        All three status components (status, minor_status, application_status)
        are required here, together with the timestamp *date* of the status
        change and the *source* component that reported it.
        """

        # Next per-job sequence number: max(SeqNum) + 1, or 1 for the first
        # record.  Built as a scalar subquery so the database evaluates it
        # inside the INSERT statement itself.
        seqnum_stmt = (
            select(func.coalesce(func.max(LoggingInfo.SeqNum) + 1, 1))
            .where(LoggingInfo.JobID == job_id)
            .scalar_subquery()
        )

        # Fractional seconds relative to MAGIC_EPOC_NUMBER, stored in
        # StatusTimeOrder as a compact sortable form of the timestamp.
        # NOTE(review): time.mktime() interprets the timetuple in *local*
        # time — confirm callers consistently pass UTC datetimes.
        epoc = (
            time.mktime(date.timetuple())
            + date.microsecond / 1000000.0
            - MAGIC_EPOC_NUMBER
        )

        stmt = insert(LoggingInfo).values(
            JobID=int(job_id),
            SeqNum=seqnum_stmt,
            Status=status,
            MinorStatus=minor_status,
            # Truncated — presumably to fit the column sizes of the
            # LoggingInfo schema; verify against the table definition.
            ApplicationStatus=application_status[:255],
            StatusTime=date,
            StatusTimeOrder=epoc,
            StatusSource=source[:32],
        )
        await self.conn.execute(stmt)

    async def get_records(self, job_id: int) -> list[JobStatusReturn]:
        """Returns a Status,MinorStatus,ApplicationStatus,StatusTime,StatusSource
        record for each entry found for the job specified by its jobID, in
        historical order.  "idem" placeholders stored in the table are
        resolved to the value of the preceding record.
        """

        stmt = (
            select(
                LoggingInfo.Status,
                LoggingInfo.MinorStatus,
                LoggingInfo.ApplicationStatus,
                LoggingInfo.StatusTime,
                LoggingInfo.StatusSource,
            )
            .where(LoggingInfo.JobID == int(job_id))
            # StatusTime alone may collide; StatusTimeOrder keeps sub-second
            # ordering of rapid transitions.
            .order_by(LoggingInfo.StatusTimeOrder, LoggingInfo.StatusTime)
        )
        rows = await self.conn.execute(stmt)

        # Collect rows as mutable lists so "idem" values can be patched below.
        values = []
        for (
            status,
            minor_status,
            application_status,
            status_time,
            status_source,
        ) in rows:
            values.append(
                [
                    status,
                    minor_status,
                    application_status,
                    # Stored naive; re-attach UTC.  NOTE(review): assumes the
                    # DB stores UTC — see the time.mktime note in
                    # insert_record.
                    status_time.replace(tzinfo=timezone.utc),
                    status_source,
                ]
            )

        # If no value has been set for the application status in the first place,
        # we put this status to "Unknown".
        res = []
        if values:
            if values[0][2] == "idem":
                values[0][2] = "Unknown"

        # We replace "idem" values by the value previously stated
        for i in range(1, len(values)):
            for j in range(3):
                if values[i][j] == "idem":
                    values[i][j] = values[i - 1][j]

        # And we convert the patched lists into JobStatusReturn models
        for (
            status,
            minor_status,
            application_status,
            status_time,
            status_source,
        ) in values:
            res.append(
                JobStatusReturn(
                    Status=status,
                    MinorStatus=minor_status,
                    ApplicationStatus=application_status,
                    StatusTime=status_time,
                    StatusSource=status_source,
                )
            )

        return res

    async def delete_records(self, job_id: int) -> None:
        """Delete all logging records of the given job."""

        stmt = delete(LoggingInfo).where(LoggingInfo.JobID == job_id)
        await self.conn.execute(stmt)

    async def get_wms_time_stamps(self, job_id: int) -> dict[str, str]:
        """Get TimeStamps for job MajorState transitions.

        Returns a {State: timestamp} dictionary, where the timestamp is the
        absolute epoch time rendered as a string.

        Raises NoResultFound if the job has no logging records.
        """

        result = {}
        stmt = select(
            LoggingInfo.Status,
            LoggingInfo.StatusTimeOrder,
        ).where(LoggingInfo.JobID == job_id)
        rows = await self.conn.execute(stmt)
        if not rows.rowcount:
            raise NoResultFound(f"No Logging Info for job {job_id}")

        # StatusTimeOrder is stored relative to MAGIC_EPOC_NUMBER; add the
        # offset back to recover an absolute epoch timestamp.  Later rows
        # with the same Status overwrite earlier ones.
        for event, etime in rows:
            result[event] = str(etime + MAGIC_EPOC_NUMBER)

        return result
Loading

0 comments on commit 53f83de

Please sign in to comment.