diff --git a/ganga/GangaCore/Core/MonitoringComponent/Local_GangaMC_Service.py b/ganga/GangaCore/Core/MonitoringComponent/Local_GangaMC_Service.py index 5ab32b59ce..19c70f1559 100755 --- a/ganga/GangaCore/Core/MonitoringComponent/Local_GangaMC_Service.py +++ b/ganga/GangaCore/Core/MonitoringComponent/Local_GangaMC_Service.py @@ -715,9 +715,9 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300): jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed Return: False, if the loop cannot be started or the timeout occured while waiting for monitoring termination - True, if the monitoring steps were successfully executed - Note: - This method is meant to be used in Ganga scripts to request monitoring on demand. + True, if the monitoring steps were successfully executed + Note: + This method is meant to be used in Ganga scripts to request monitoring on demand. """ log.debug("runMonitoring") @@ -726,14 +726,13 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300): # Detect New Jobs from other sessions. new_jobs = stripProxy(self.registry_slice).objects.repository.update_index(True, True) self.newly_discovered_jobs = list(set(self.newly_discovered_jobs) | set(new_jobs)) - # Only load jobs from disk which are in new state currently. for i in self.newly_discovered_jobs: j = stripProxy(self.registry_slice(i)) job_status = lazyLoadJobStatus(j) if job_status in ['new']: stripProxy(self.registry_slice).objects.repository.load([i]) - if not isType(steps, int) and steps < 0: + if not isType(steps, int) or steps <= 0: log.warning("The number of monitor steps should be a positive (non-zero) integer") return False @@ -757,8 +756,32 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300): log.error("Cannot run the monitoring loop. The following credentials are required: %s" % _missingCreds) return False - #log.debug("jobs: %s" % str(jobs)) - #log.debug("self.__mainLoopCond: %s" % str(self.__mainLoopCond)) + #log.debug("jobs: %s" % str(jobs)) + #log.debug("self.__mainLoopCond: %s" % str(self.__mainLoopCond)) + + # Handle the `jobs` input (new logic to handle int, list, job object) + if jobs is not None: + if isinstance(jobs, int): + # If jobs is a single int, convert it to a slice (equivalent to one job) + log.debug(f"Converting job ID {jobs} to a registry slice") + m_jobs = self.registry_slice(jobs) + elif isinstance(jobs, list): + # If jobs is a list of int, ensure all elements are int and convert to registry slice + if all(isinstance(job, int) for job in jobs): + log.debug(f"Converting job IDs {jobs} to a registry slice") + m_jobs = [self.registry_slice(job) for job in jobs] + else: + log.warning("List must contain integers representing job IDs") + return False + else: + # Handle job slices (existing behavior) + from GangaCore.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice + if not isinstance(jobs, RegistrySlice): + log.warning('jobs argument must be a registry slice, int, list of int, or job object') + return False + m_jobs = jobs + # Pass the new `m_jobs` (registry slice) for further processing + self.makeUpdateJobStatusFunction(jobSlice=m_jobs) with self.__mainLoopCond: log.debug('Monitoring loop lock acquired. Enabling mon loop') @@ -766,50 +789,27 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300): log.error("The monitoring loop is already running.") return False - if jobs is not None: - m_jobs = jobs - - # additional check if m_jobs is really a registry slice - # the underlying code is not prepared to handle correctly the - # situation if it is not - from GangaCore.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice - if not isType(m_jobs, RegistrySlice): - log.warning( - 'runMonitoring: jobs argument must be a registry slice such as a result of jobs.select() or jobs[i1:i2]') - return False - - #self.registry_slice = m_jobs - #log.debug("m_jobs: %s" % str(m_jobs)) - self.makeUpdateJobStatusFunction(jobSlice=m_jobs) - - log.debug("Enable Loop, Clear Iterators and setCallbackHook") - # enable mon loop + log.debug("Enable Loop, Clear Iterators, and setCallbackHook") self.enabled = True # set how many steps to run self.steps = steps - # enable job list iterators self.stopIter.clear() - # Start backend update timeout checking. self.setCallbackHook(UpdateDict.timeoutCheck, {'thisDict': self.updateDict_ts}, True) - log.debug("Waking up Main Loop") - # wake up the mon loop self.__mainLoopCond.notify_all() log.debug("Waiting to execute steps") - # wait to execute the steps self.__monStepsTerminatedEvent.wait() self.__monStepsTerminatedEvent.clear() log.debug("Test for timeout") - # wait the steps to be executed or timeout to occur if not self.__awaitTermination(timeout): log.warning("Monitoring loop started but did not complete in the given timeout.") - # force loops termination self.stopIter.set() return False return True + def enableMonitoring(self): """ Run the monitoring loop continuously diff --git a/ganga/GangaCore/test/GPI/Monitoring/TestMonitoring.py b/ganga/GangaCore/test/GPI/Monitoring/TestMonitoring.py index 96f0dd6edf..2b23ec8ad7 100644 --- a/ganga/GangaCore/test/GPI/Monitoring/TestMonitoring.py +++ b/ganga/GangaCore/test/GPI/Monitoring/TestMonitoring.py @@ -94,3 +94,28 @@ def test_f_reallyDisabled(self): dummySleep(j) self.assertEqual(j.status, 'completed') + + def test_g_runMonitoring_withJobIDList(self): + from GangaCore.GPI import enableMonitoring, Job, runMonitoring + + enableMonitoring() + job_ids = [] + for _ in range(2): + j = Job() + j.submit() + dummySleep(j) + job_ids.append(j.id) + + result = runMonitoring(jobs=job_ids) + self.assertTrue(result, "runMonitoring with list of job IDs failed to execute successfully.") + + def test_h_runMonitoring_withJobObject(self): + from GangaCore.GPI import enableMonitoring, Job, runMonitoring + + enableMonitoring() + j = Job() + j.submit() + dummySleep(j) + + result = runMonitoring(jobs=j) + self.assertTrue(result, "runMonitoring with job object failed to execute successfully.") \ No newline at end of file