-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[v8.0] Introduce Scout Agent and Optimizer #7251
base: integration
Are you sure you want to change the base?
Changes from all commits
e1baf3d
7c96e60
15a765e
1437ffd
0f8adab
84090c7
3040d20
370bf71
c1129d6
f980634
be35c40
8ab1c8f
3a8b1e1
e9f41ca
ab7447b
01706ff
c274e6c
c7a61d0
cd8fc37
bfd069a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,227 @@ | ||||||
""" Agent for scout job framework to monitor scout status | ||||||
and update main job status according to scout status. | ||||||
""" | ||||||
from DIRAC import S_OK, S_ERROR | ||||||
from DIRAC.Core.Base.AgentModule import AgentModule | ||||||
from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||||||
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations | ||||||
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient | ||||||
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB | ||||||
|
||||||
|
||||||
class ScoutingJobStatusAgent(AgentModule): | ||||||
""" | ||||||
This agent checks for jobs with Scouting status | ||||||
and manages the job status in relation to associated scout jobs | ||||||
""" | ||||||
|
||||||
def __init__(self, *args, **kwargs):
    """Construct the agent.

    All positional and keyword arguments are forwarded unchanged to
    AgentModule. The two service handles are only declared here; they
    are assigned in initialize().
    """
    AgentModule.__init__(self, *args, **kwargs)

    # Placeholders until initialize() runs.
    self.jobDB = self.logDB = None
|
||||||
############################################################################# | ||||||
def initialize(self):
    """Instantiate the handles used by the agent.

    ``self.jobDB`` becomes a JobStateUpdateClient and ``self.logDB`` a
    JobLoggingDB.

    :return: S_OK()
    """
    jobStateClient = JobStateUpdateClient()
    self.jobDB = jobStateClient

    loggingDB = JobLoggingDB()
    self.logDB = loggingDB

    return S_OK()
|
||||||
############################################################################# | ||||||
def beginExecution(self):
    """Load the scouting thresholds from the Operations configuration.

    For each of Failed / Succeeded / Stalled two values are read from
    ``WorkloadManagement/Scouting``: a rate (fraction of ``totalScoutJobs``)
    and an absolute job count. The count defaults to
    ``int(totalScoutJobs * rate)``. When an explicitly configured count is
    lower than the count implied by the rate, the rate is recomputed from
    the count so that both describe the same threshold.

    :return: S_OK()
    """
    # NOTE(review): reviewers asked whether these values could be read from
    # the Agent options instead of Operations (unless they are also used in
    # other places), and pointed out that the agent has to be added to the
    # ConfigTemplate file in any case.
    ops = Operations()
    section = 'WorkloadManagement/Scouting'

    self.totalScoutJobs = ops.getValue(f'{section}/totalScoutJobs', 10)
    self.criteriaFailedRate = ops.getValue(f'{section}/criteriaFailedRate', 0.5)
    self.criteriaSucceededRate = ops.getValue(f'{section}/criteriaSucceededRate', 0.3)
    self.criteriaStalledRate = ops.getValue(f'{section}/criteriaStalledRate', 1.0)

    total = self.totalScoutJobs
    self.criteriaFailed = ops.getValue(f'{section}/criteriaFailed',
                                       int(total * self.criteriaFailedRate))
    self.criteriaSucceeded = ops.getValue(f'{section}/criteriaSucceeded',
                                          int(total * self.criteriaSucceededRate))
    self.criteriaStalled = ops.getValue(f'{section}/criteriaStalled',
                                        int(total * self.criteriaStalledRate))

    def _consistentRate(rate, count):
        """Return ``rate``, lowered to ``count / total`` if the count is stricter."""
        # True division on purpose: truncating the quotient with int() would
        # collapse every fractional rate to 0.
        if total > 0 and int(total * rate) > count:
            return count / total
        return rate

    self.criteriaFailedRate = _consistentRate(self.criteriaFailedRate, self.criteriaFailed)
    self.criteriaSucceededRate = _consistentRate(self.criteriaSucceededRate, self.criteriaSucceeded)
    self.criteriaStalledRate = _consistentRate(self.criteriaStalledRate, self.criteriaStalled)

    self.log.info(
        f'Scouting parameters: Total: {self.totalScoutJobs}, '
        f'Succeeded: {self.criteriaSucceeded}({self.criteriaSucceededRate}), '
        f'Failed: {self.criteriaFailed}({self.criteriaFailedRate}), '
        f'Stalled: {self.criteriaStalled}({self.criteriaStalledRate})'
    )

    return S_OK()
|
||||||
def execute(self):
    """Run one agent cycle over all jobs currently in 'Scouting' status.

    For every selected job the 'ScoutID' job parameter is looked up, the
    state of the corresponding scout jobs is evaluated (once per ScoutID
    and cycle) and, unless the scouts are still 'NotComplete', the job
    status is updated accordingly. Problems with a single job are logged
    as warnings and do not stop the cycle.

    :return: S_OK(), or the failed result of the job selection
    """
    # NOTE(review): a reviewer pointed out that JobStateUpdateClient does not
    # seem to expose selectJobs(); getJobs() of JobMonitoringHandler was
    # mentioned as a possible alternative -- TODO confirm which call to use.
    result = self.jobDB.selectJobs({'Status': 'Scouting'})
    if not result['OK']:
        return result

    joblist = result['Value']
    if not joblist:
        self.log.info('No Jobs with scouting status. Skipping this cycle')
        return S_OK()

    self.log.info(f'Check {len(joblist)} scouting jobs')
    self.log.debug('joblist: ', joblist)

    # ScoutID -> scout status dict, filled lazily so that each group of
    # scout jobs is only evaluated once per cycle.
    scoutIDdict = {}
    for jobID in joblist:
        jobIDInt = int(jobID)
        # ScoutID has the form "<lowest jobID>:<highest jobID>"
        result = self.jobDB.getJobParameters(jobIDInt, ['ScoutID'])
        if not result['OK']:
            self.log.warn(result['Message'])
            continue

        # Skip jobs without parameters or without a ScoutID instead of
        # letting a KeyError abort the whole cycle.
        scoutID = (result['Value'].get(jobIDInt) or {}).get('ScoutID')
        if not scoutID:
            continue

        scoutStatus = scoutIDdict.get(scoutID)
        if not scoutStatus:
            result = self.__getScoutStatus(scoutID)
            if not result['OK']:
                self.log.warn(result['Message'])
                continue
            scoutStatus = result['Value']
            scoutIDdict[scoutID] = scoutStatus

        if scoutStatus['Status'] == 'NotComplete':
            self.log.verbose(f"{jobID}: skipping since corresponding scout does not complete yet.")
            continue

        result = self.__updateJobStatus(jobID, status=scoutStatus['Status'],
                                        minorstatus=scoutStatus['MinorStatus'],
                                        appstatus=scoutStatus['appstatus'])
        if not result['OK']:
            self.log.warn(result['Message'])

    self.log.info('final scoutIDdict:%s' % scoutIDdict)
    return S_OK()
|
||||||
def __getScoutStatus(self, scoutid):
    """Evaluate the scout jobs identified by ``scoutid``.

    The Done / Failed / Stalled scout jobs are counted and compared, in
    that order, with the succeeded / failed / stalled criteria. A
    configured criterion larger than the number of scout jobs found is
    scaled down with the corresponding rate (minimum 1).

    :param str scoutid: job ID range in the form "<lowest jobID>:<highest jobID>"
    :return: S_OK(dict) where the dict has the key 'Status' and, unless the
             status is 'NotComplete', also 'MinorStatus' and 'appstatus';
             S_ERROR for a malformed ``scoutid``; the failed result of
             getJobsAttributes otherwise
    """
    try:
        ids = scoutid.split(':')
        scoutjoblist = list(range(int(ids[0]), int(ids[1]) + 1))
    except (AttributeError, IndexError, ValueError):
        # Report a bad parameter for this job instead of raising and
        # aborting the agent cycle.
        return S_ERROR(f'Malformed ScoutID: {scoutid!r}')

    result = self.jobDB.getJobsAttributes(scoutjoblist, ['Status', 'Site'])
    if not result['OK']:
        return result

    # NOTE(review): a reviewer warned that this is python3 code, where
    # dict.keys() returns a view; the result dict is therefore used directly.
    scoutjobs = result['Value']
    nScouts = len(scoutjobs)

    jobsByStatus = {JobStatus.DONE: [], JobStatus.FAILED: [], JobStatus.STALLED: []}
    for jobid, attributes in scoutjobs.items():
        bucket = jobsByStatus.get(attributes['Status'])
        if bucket is not None:
            bucket.append(jobid)
    donejoblist = jobsByStatus[JobStatus.DONE]
    failedjoblist = jobsByStatus[JobStatus.FAILED]
    stalledjoblist = jobsByStatus[JobStatus.STALLED]

    def _effectiveCriterion(name, configured, rate):
        """Return the threshold to apply for ``nScouts`` scout jobs."""
        if configured > nScouts:
            effective = max(int(nScouts * rate), 1)
            self.log.verbose(f'{name} = {effective}')
        else:
            effective = configured
            self.log.debug(f'{name} = {effective}')
        return effective

    criteriaSucceeded = _effectiveCriterion('criteriaSucceeded', self.criteriaSucceeded,
                                            self.criteriaSucceededRate)
    criteriaFailed = _effectiveCriterion('criteriaFailed', self.criteriaFailed,
                                         self.criteriaFailedRate)
    criteriaStalled = _effectiveCriterion('criteriaStalled', self.criteriaStalled,
                                          self.criteriaStalledRate)

    # NOTE(review): a reviewer suggested a change to the status dict below
    # "and similar in the lines below"; the suggestion text itself is not
    # available here. JobStatus constants are used, presumably as intended.
    if len(donejoblist) >= criteriaSucceeded:
        self.log.verbose(f'Scout (ID = {scoutid}) are done.')
        scoutStatus = {'Status': JobStatus.CHECKING, 'MinorStatus': 'Scouting',
                       'appstatus': 'Scout Complete'}

    elif len(failedjoblist) >= criteriaFailed:
        self.log.verbose(f'Scout (ID = {scoutid}) are failed.')
        msg = 'Failed scout job ' + str(failedjoblist)
        scoutStatus = {'Status': JobStatus.FAILED, 'MinorStatus': 'Failed in scouting',
                       'appstatus': msg}

    elif len(stalledjoblist) >= criteriaStalled:
        self.log.verbose(f'Scout (ID = {scoutid}) are stalled.')
        msg = 'Stalled scout job ' + str(stalledjoblist)
        scoutStatus = {'Status': JobStatus.STALLED, 'MinorStatus': 'Stalled in scouting',
                       'appstatus': msg}

    else:
        self.log.verbose(f'Scout (ID = {scoutid}) did not completed.')
        scoutStatus = {'Status': 'NotComplete'}

    return S_OK(scoutStatus)
|
||||||
def __updateJobStatus(self, job, status=None, minorstatus=None, appstatus=None):
    """Write the scouting outcome of ``job`` to the job DB and the logging DB.

    The updates are applied in this order: ApplicationStatus, MinorStatus,
    the 'ScoutFlag' job parameter (set to 1) and finally Status. For any
    of ``status`` / ``minorstatus`` / ``appstatus`` that is empty, the
    value currently stored for the job is read and written back. The
    first failing update stops the sequence and its result is returned.

    :param job: job ID
    :param str status: new Status
    :param str minorstatus: new MinorStatus
    :param str appstatus: new ApplicationStatus
    :return: S_OK('DisabledMode') if the 'Enable' option is False (nothing
             is written); otherwise the failed result of the first failing
             step, or the result of addLoggingRecord
    """
    self.log.info(f'Job {job} set Status="{status}", MinorStatus="{minorstatus}", ApplicationStatus="{appstatus}".')
    if not self.am_getOption('Enable', True):
        # Disabled mode must not touch the job: return before any write.
        return S_OK('DisabledMode')

    def _setAttribute(attribute, value):
        """Set ``attribute``, falling back to the stored value when ``value`` is empty."""
        if not value:
            current = self.jobDB.getJobAttributes(job, [attribute])
            if current['OK']:
                value = current['Value'][attribute]
        self.log.verbose(f"self.jobDB.setJobAttribute({job},'{attribute}','{value}',update=True)")
        return self.jobDB.setJobAttribute(job, attribute, value, update=True), value

    # Update ApplicationStatus
    result, appstatus = _setAttribute('ApplicationStatus', appstatus)
    if not result['OK']:
        return result

    # Update MinorStatus
    result, minorstatus = _setAttribute('MinorStatus', minorstatus)
    if not result['OK']:
        return result

    # Update ScoutFlag
    result = self.jobDB.setJobParameter(int(job), 'ScoutFlag', 1)
    if not result['OK']:
        return result

    # Update Status (keeps the stored Status when none was given)
    result, status = _setAttribute('Status', status)
    if not result['OK']:
        return result

    result = self.logDB.addLoggingRecord(job, status=status, minor=minorstatus,
                                         source='ScoutingJobStatusAgent')
    if not result['OK']:
        self.log.warn(result)

    return result
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,131 @@ | ||||||||||||
""" Executor to set status "Scouting" for a main job which has scout jobs | ||||||||||||
""" | ||||||||||||
|
||||||||||||
from DIRAC import S_OK, S_ERROR | ||||||||||||
|
||||||||||||
from DIRAC.WorkloadManagementSystem.Executor.Base.OptimizerExecutor import OptimizerExecutor | ||||||||||||
from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||||||||||||
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus | ||||||||||||
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient | ||||||||||||
|
||||||||||||
|
||||||||||||
class Scouting(OptimizerExecutor): | ||||||||||||
""" | ||||||||||||
The specific Optimizer must provide the following methods: | ||||||||||||
- optimizeJob() - the main method called for each job | ||||||||||||
and it can provide: | ||||||||||||
- initializeOptimizer() before each execution cycle | ||||||||||||
""" | ||||||||||||
|
||||||||||||
@classmethod
def initializeOptimizer(cls):
    """Create the JobStateUpdateClient and store it on the class.

    :return: S_OK()
    """
    client = JobStateUpdateClient()
    cls.__jobDB = client
    return S_OK()
|
||||||||||||
def optimizeJob(self, jid, jobState):
    """Put the job into the scouting status unless its scouts already completed.

    The scout parameters (ScoutID, ScoutFlag) are taken from the job
    parameters when present. Otherwise the ScoutID is read from the job
    manifest, the ScoutFlag is recovered from the attic job parameters of
    the previous reschedule cycle (default 0), and both are stored as job
    parameters. Jobs without a ScoutID, or with ScoutFlag equal to 1, are
    passed on to the next optimizer.

    :param jid: job ID
    :param jobState: job state object of the job being optimized
    :return: result of setNextOptimizer, the result of setting the scouting
             status, or the failed result / S_ERROR of a failing step
    """
    self.jobLog.info('Getting scoutparams from JobParameters')

    result = self.__jobDB.getJobParameters(jid, ['ScoutFlag', 'ScoutID'])
    if not result['OK']:
        return result

    if result['Value']:
        scoutparams = result['Value'].get(jid)
        self.jobLog.info('scoutparams: %s' % scoutparams)
        if not scoutparams:
            self.jobLog.info('Skipping optimizer, since scoutparams are abnormal')
            return self.setNextOptimizer(jobState)

        scoutID = scoutparams.get('ScoutID')
        # Default to 0 so that the int() conversion further down cannot
        # fail on None when only the ScoutID is stored.
        scoutFlag = scoutparams.get('ScoutFlag') or 0
        if not scoutID:
            self.jobLog.info('Skipping optimizers, since this job has not enough scoutparams.')
            return self.setNextOptimizer(jobState)

    else:
        result = jobState.getManifest()
        if not result['OK']:
            return result
        jobManifest = result['Value']
        scoutID = jobManifest.getOption('ScoutID', None)
        if not scoutID:
            # Single-line literal (review remark): a backslash continuation
            # inside the string would put the source indentation in the log.
            self.jobLog.info('Skipping optimizer, since no scout corresponding to this job group')
            return self.setNextOptimizer(jobState)

        scoutFlag = 0
        result = jobState.getAttribute('RescheduleCounter')
        if not result['OK']:
            return result
        if result['Value'] is None:
            return S_ERROR('Reschedule Counter not found')

        rCounter = result['Value']
        if int(rCounter) > 0:
            # Look at the parameters archived for the previous cycle.
            rCycle = int(rCounter) - 1
            result = self.__jobDB.getAtticJobParameters(jid, ['"ScoutFlag"'],
                                                        rescheduleCounter=rCycle)
            self.jobLog.info(f"From AtticJobParameter: {result}")
            if result['OK']:
                try:
                    scoutFlag = result['Value'].get(rCycle).get('ScoutFlag', 0) or 0
                except (AttributeError, KeyError, TypeError):
                    # Best effort: nothing usable archived for that cycle,
                    # keep the default flag.
                    pass
            else:
                self.jobLog.info(result['Message'])

        self.jobLog.info(f'Setting scoutparams (ID:{scoutID}, Flag:{scoutFlag}) to JobParamter')
        result = self.__setScoutparamsInJobParameters(jid, scoutID, scoutFlag, jobState)
        if not result['OK']:
            self.jobLog.info('Skipping, since failed in setting scoutparams of JobParameters.')
            return self.setNextOptimizer(jobState)

    if int(scoutFlag) == 1:
        self.jobLog.info(f'Skipping optimizer, since corresponding scout jobs complete (ScoutFlag = {scoutFlag})')
        return self.setNextOptimizer(jobState)

    self.jobLog.info(f'Job {jid} set scouting status')
    return self.__setScoutingStatus(jobState)
|
||||||||||||
def __getIDandFlag(self, scoutparams):
    """Extract the scout identifiers from a parameter dictionary.

    :param dict scoutparams: mapping that may hold 'ScoutID' and 'ScoutFlag'
    :return: tuple (ScoutID, ScoutFlag); a missing entry is returned as None
    """
    return scoutparams.get('ScoutID'), scoutparams.get('ScoutFlag')
|
||||||||||||
def __setScoutparamsInJobParameters(self, jid, scoutID, scoutFlag, jobState=None):
    """Store ScoutID and ScoutFlag as job parameters of ``jid``.

    :param jid: job ID
    :param scoutID: value written to the 'ScoutID' parameter
    :param scoutFlag: value written to the 'ScoutFlag' parameter
    :param jobState: not used by this method; kept so that existing callers
                     passing it keep working
    :return: result of setJobParameters (a failure is also logged)
    """
    paramList = [('ScoutID', scoutID), ('ScoutFlag', scoutFlag)]
    result = self.__jobDB.setJobParameters(jid, paramList)
    if not result['OK']:
        self.jobLog.info('Skipping, since failed in recovering scoutparams of JobParameters.')

    return result
|
||||||||||||
def __setScoutingStatus(self, jobState=None):
    """Set the waiting-for-scouts status on the job.

    The status and minor status are taken from the 'WaitingStatus' and
    'WaitingMinorStatus' executor options, defaulting to
    JobStatus.SCOUTING and 'Waiting for Scout Job Completion'. The
    application status is set to "Unknown" and the optimizer name is used
    as source.

    :param jobState: job state object; when empty, the one held in
                     ``self.__jobData`` is used
    :return: S_OK() on success, otherwise the failed result of getStatus
             or setStatus
    """
    if not jobState:
        # NOTE(review): relies on a __jobData attribute being reachable from
        # this class under the private-name rules -- TODO confirm.
        jobState = self.__jobData.jobState

    # Queried once; only the OK flag is checked before the status is changed.
    result = jobState.getStatus()
    if not result['OK']:
        return result

    opName = self.ex_optimizerName()
    result = jobState.setStatus(self.ex_getOption('WaitingStatus', JobStatus.SCOUTING),
                                minorStatus=self.ex_getOption('WaitingMinorStatus',
                                                              'Waiting for Scout Job Completion'),
                                appStatus="Unknown",
                                source=opName)
    if not result['OK']:
        return result

    return S_OK()
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it was me not being clear, but this can't be a 1-to-1 substitution between `JobDB` and `JobStateUpdateClient`, as the two don't expose the same methods. For example, `JobStateUpdateClient` does not expose the `selectJobs` method used below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see. Does JobStateUpdateClient access only the methods found in JobStateUpdateHandler, or are there others?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I would do this, for added clarity:
then you'll need at least also