forked from DIRACGrid/DIRAC
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Partially implement FutureClient/JobStateUpdat
- Loading branch information
Showing
3 changed files
with
200 additions
and
37 deletions.
There are no files selected for viewing
108 changes: 95 additions & 13 deletions
108
src/DIRAC/WorkloadManagementSystem/FutureClient/JobStateUpdateClient.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,50 +1,132 @@ | ||
import functools | ||
from datetime import datetime, timezone | ||
|
||
|
||
from DIRAC.Core.Security.DiracX import DiracXClient | ||
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue | ||
from DIRAC.Core.Utilities.TimeUtilities import fromString | ||
|
||
|
||
def stripValueIfOK(func): | ||
"""Decorator to remove S_OK["Value"] from the return value of a function if it is OK. | ||
This is done as some update functions return the number of modified rows in | ||
the database. This likely not actually useful so it isn't supported in | ||
DiracX. Stripping the "Value" key of the dictionary means that we should | ||
get a fairly straight forward error if the assumption is incorrect. | ||
""" | ||
|
||
@functools.wraps(func) | ||
def wrapper(*args, **kwargs): | ||
result = func(*args, **kwargs) | ||
if result.get("OK"): | ||
assert result.pop("Value") is None, "Value should be None if OK" | ||
return result | ||
|
||
return wrapper | ||
|
||
|
||
class JobStateUpdateClient: | ||
@stripValueIfOK | ||
@convertToReturnValue | ||
def sendHeartBeat(self, jobID: str | int, dynamicData: dict, staticData: dict): | ||
raise NotImplementedError("TODO") | ||
print("HACK: This is a no-op until we decide what to do") | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobApplicationStatus(self, jobID: str | int, appStatus: str, source: str = "Unknown"): | ||
raise NotImplementedError("TODO") | ||
statusDict = { | ||
"application_status": appStatus, | ||
} | ||
if source: | ||
statusDict["Source"] = source | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_status( | ||
jobID, | ||
{datetime.now(tz=timezone.utc): statusDict}, | ||
) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobAttribute(self, jobID: str | int, attribute: str, value: str): | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_properties(jobID, "need to [patch the client to have a nice summer body ?") | ||
raise NotImplementedError("TODO") | ||
if attribute == "Status": | ||
api.jobs.set_single_job_status( | ||
jobID, | ||
{datetime.now(tz=timezone.utc): {"status": value}}, | ||
) | ||
else: | ||
api.jobs.set_single_job_properties(jobID, {attribute: value}) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobFlag(self, jobID: str | int, flag: str): | ||
raise NotImplementedError("TODO") | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_properties(jobID, {flag: True}) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobParameter(self, jobID: str | int, name: str, value: str): | ||
raise NotImplementedError("TODO") | ||
print("HACK: This is a no-op until we decide what to do") | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobParameters(self, jobID: str | int, parameters: list): | ||
raise NotImplementedError("TODO") | ||
print("HACK: This is a no-op until we decide what to do") | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobSite(self, jobID: str | int, site: str): | ||
raise NotImplementedError("TODO") | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_properties(jobID, {"Site": site}) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobStatus( | ||
self, | ||
jobID: str | int, | ||
status: str = "", | ||
minorStatus: str = "", | ||
source: str = "Unknown", | ||
datetime=None, | ||
datetime_=None, | ||
force=False, | ||
): | ||
raise NotImplementedError("TODO") | ||
statusDict = {} | ||
if status: | ||
statusDict["Status"] = status | ||
if minorStatus: | ||
statusDict["MinorStatus"] = minorStatus | ||
if source: | ||
statusDict["Source"] = source | ||
if datetime_ is None: | ||
datetime_ = datetime.utcnow() | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_status( | ||
jobID, | ||
{fromString(datetime_).replace(tzinfo=timezone.utc): statusDict}, | ||
force=force, | ||
) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobStatusBulk(self, jobID: str | int, statusDict: dict, force=False): | ||
raise NotImplementedError("TODO") | ||
statusDict = {fromString(k).replace(tzinfo=timezone.utc): v for k, v in statusDict.items()} | ||
with DiracXClient() as api: | ||
api.jobs.set_job_status_bulk( | ||
{jobID: statusDict}, | ||
force=force, | ||
) | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def setJobsParameter(self, jobsParameterDict: dict): | ||
raise NotImplementedError("TODO") | ||
print("HACK: This is a no-op until we decide what to do") | ||
|
||
@stripValueIfOK | ||
@convertToReturnValue | ||
def unsetJobFlag(self, jobID: str | int, flag: str): | ||
raise NotImplementedError("TODO") | ||
with DiracXClient() as api: | ||
api.jobs.set_single_job_properties(jobID, {flag: False}) | ||
|
||
def updateJobFromStager(self, jobID: str | int, status: str): | ||
raise NotImplementedError("TODO") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,47 @@ | ||
def compare_results(test_func): | ||
import time | ||
|
||
|
||
def compare_results(monkeypatch, test_func): | ||
"""Compare the results from DIRAC and DiracX based services for a reentrant function.""" | ||
ClientClass = test_func.func.__self__ | ||
assert ClientClass.diracxClient, "FutureClient is not set up!" | ||
compare_results2(monkeypatch, test_func, test_func) | ||
|
||
|
||
def compare_results2(monkeypatch, test_func1, test_func2): | ||
"""Compare the results from DIRAC and DiracX based services for two functions which should behave identically.""" | ||
# Get the result from the diracx-based handler | ||
future_result = test_func() | ||
start = time.monotonic() | ||
with monkeypatch.context() as m: | ||
m.setattr("DIRAC.Core.Tornado.Client.ClientSelector.useLegacyAdapter", lambda *_: True) | ||
try: | ||
future_result = test_func1() | ||
except Exception as e: | ||
future_result = e | ||
else: | ||
assert "rpcStub" not in future_result, "rpcStub should never be present when using DiracX!" | ||
diracx_duration = time.monotonic() - start | ||
|
||
# Get the result from the DIRAC-based handler | ||
diracxClient = ClientClass.diracxClient | ||
ClientClass.diracxClient = None | ||
try: | ||
old_result = test_func() | ||
finally: | ||
ClientClass.diracxClient = diracxClient | ||
# We don't care about the rpcStub | ||
start = time.monotonic() | ||
with monkeypatch.context() as m: | ||
m.setattr("DIRAC.Core.Tornado.Client.ClientSelector.useLegacyAdapter", lambda *_: False) | ||
old_result = test_func2() | ||
assert "rpcStub" in old_result, "rpcStub should always be present when using legacy DIRAC!" | ||
legacy_duration = time.monotonic() - start | ||
|
||
# We don't care about the rpcStub or Errno | ||
old_result.pop("rpcStub") | ||
old_result.pop("Errno", None) | ||
|
||
if not old_result["OK"]: | ||
assert not future_result["OK"], "FutureClient should have failed too!" | ||
elif "Value" in future_result: | ||
# Ensure the results match exactly | ||
assert old_result == future_result | ||
else: | ||
# See the "stripValueIfOK" decorator for explanation | ||
assert old_result["OK"] == future_result["OK"] | ||
# assert isinstance(old_result["Value"], int) | ||
|
||
# Ensure the results match | ||
assert old_result == future_result | ||
# if 3 * legacy_duration < diracx_duration: | ||
# print(f"Legacy DIRAC took {legacy_duration:.3f}s, FutureClient took {diracx_duration:.3f}s") | ||
# assert False, "FutureClient should be faster than legacy DIRAC!" |