From e63a6e625ef83b8b2af6780afb26e241f2855a07 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 17 Dec 2024 09:35:23 +0100 Subject: [PATCH 1/5] registry dump hack --- src/radical/pilot/session.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index d0eec6b64..891fc625f 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -359,6 +359,18 @@ def _start_registry(self): self._cfg.reg_addr = self._reg_service.addr + def reg_watcher(): + reg = ru.zmq.RegistryClient(url=self._cfg.reg_addr) + while True: + time.sleep(60 * 10) + self._log.debug('===== registry dump %s', self.uid) + reg.dump(self.uid) + + self._reg_watcher = mt.Thread(target=reg_watcher) + self._reg_watcher.daemon = True + self._reg_watcher.start() + + # -------------------------------------------------------------------------- # def _connect_registry(self): From a4109aa97f99669580e24423005d03f47a3554ff Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 18 Dec 2024 20:40:38 +0100 Subject: [PATCH 2/5] snap --- src/radical/pilot/agent/agent_0.py | 4 +- src/radical/pilot/agent/executing/base.py | 46 +++-- src/radical/pilot/agent/executing/dragon.py | 2 +- src/radical/pilot/agent/executing/popen.py | 193 +++++++++--------- src/radical/pilot/agent/executing/sleep.py | 2 +- src/radical/pilot/agent/launch_method/base.py | 28 +++ src/radical/pilot/agent/launch_method/srun.py | 33 +++ .../pilot/agent/resource_manager/base.py | 14 +- src/radical/pilot/pilot_manager.py | 11 +- src/radical/pilot/session.py | 13 ++ src/radical/pilot/task_manager.py | 16 +- src/radical/pilot/utils/component.py | 8 + tests/unit_tests/test_executing/test_popen.py | 6 +- tests/unit_tests/test_rm/test_base.py | 5 +- 14 files changed, 250 insertions(+), 131 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index e0ad305f1..c62cda0e4 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -541,10 +541,12 @@ def _start_sub_agents(self): } # find a launcher to use - launcher = self._rm.find_launcher(agent_task) + launcher, lname = self._rm.find_launcher(agent_task) if not launcher: raise RuntimeError('no launch method found for sub agent') + self._log.debug('found sa launcher %s', lname) + # FIXME: set RP environment (as in Popen Executor) tmp = '#!%s\n\n' % self._shell diff --git a/src/radical/pilot/agent/executing/base.py b/src/radical/pilot/agent/executing/base.py index 4d23dee25..6f2b62eb1 100644 --- a/src/radical/pilot/agent/executing/base.py +++ b/src/radical/pilot/agent/executing/base.py @@ -137,12 +137,14 @@ def control_cb(self, topic, msg): self._log.info('cancel_tasks command (%s)', arg) for tid in arg['uids']: - self.cancel_task(tid) + task = self.get_task(tid) + if task: + self.cancel_task(task) # -------------------------------------------------------------------------- # - def cancel_task(self, uid): + def cancel_task(self, task): raise NotImplementedError('cancel_task is not implemented') @@ -156,24 +158,35 @@ def _to_watcher(self): `self._cancel_task(task)`. That has to be implemented by al executors. 
''' - while not self._term.is_set(): + # tasks to watch for timeout, sorted by absolut timeout timestamp + to_list = list() - # check once per second at most - time.sleep(1) + while not self._term.is_set(): - now = time.time() + # collect new tasks to watch with self._to_lock: - # running tasks for next check - to_list = list() - for to, start, task in self._to_tasks: - if now - start > to: - self._prof.prof('task_timeout', uid=task['uid']) - self.cancel_task(uid=task['uid']) - else: - to_list.append([to, start, task]) + for task, cancel_time in self._to_tasks: + to_list.append([task, cancel_time]) + + self._to_tasks = list() + + # avoid busy wait + if not to_list: + time.sleep(1) + continue + + # sort by timeout, smallest first + to_list.sort(key=lambda x: x[1]) - self._to_tasks = to_list + # cancel all tasks which have timed out + for task, cancel_time in to_list: + now = time.time() + if now > cancel_time: + self._prof.prof('task_timeout', uid=task['uid']) + self.cancel_task(task=task) + else: + break # -------------------------------------------------------------------------- @@ -184,7 +197,8 @@ def handle_timeout(self, task): if to > 0.0: with self._to_lock: - self._to_tasks.append([to, time.time(), task]) + cancel_time = time.time() + to + self._to_tasks.append([task, cancel_time]) # -------------------------------------------------------------------------- diff --git a/src/radical/pilot/agent/executing/dragon.py b/src/radical/pilot/agent/executing/dragon.py index ddc05b0ef..878cb9b53 100644 --- a/src/radical/pilot/agent/executing/dragon.py +++ b/src/radical/pilot/agent/executing/dragon.py @@ -90,7 +90,7 @@ def _launch_task(self, task): # -------------------------------------------------------------------------- # - def cancel_task(self, uid): + def cancel_task(self, task): ''' This method is called by the base class to actually cancel the task. diff --git a/src/radical/pilot/agent/executing/popen.py b/src/radical/pilot/agent/executing/popen.py index 8d020be07..8b7260828 100644 --- a/src/radical/pilot/agent/executing/popen.py +++ b/src/radical/pilot/agent/executing/popen.py @@ -47,10 +47,6 @@ def _kill(*args, **kwargs): # class Popen(AgentExecutingComponent): - # flags for the watcher queue - TO_WATCH = 0 - TO_CANCEL = 1 - # -------------------------------------------------------------------------- # def initialize(self): @@ -58,6 +54,7 @@ def initialize(self): # self._log.debug('popen initialize start') super().initialize() + self._tasks = dict() self._watch_queue = queue.Queue() # run watcher thread @@ -68,10 +65,59 @@ def initialize(self): # -------------------------------------------------------------------------- # - def cancel_task(self, uid): + def get_task(self, tid): + + return self._tasks.get(tid) + + + # -------------------------------------------------------------------------- + # + def cancel_task(self, task): + + # was the task even started? 
+ tid = task['uid'] + proc = task.get('proc') + if not proc: + # task was not started, nothing to do + self._log.debug('task %s was not started', tid) + return + + # check if the task is, maybe, already done + exit_code = proc.poll() + if exit_code is not None: + # task is done, nothing to do + self._log.debug('task %s is already done', tid) + return + + # task is still running -- cancel it + self._log.debug('cancel %s', tid) + self._prof.prof('task_run_cancel_start', uid=tid) + + launcher = self._rm.get_launcher(task['launcher_name']) + launcher.cancel_task(task, proc.pid) + + proc.wait() # make sure proc is collected + + try: + # might race with task collection + del task['proc'] # proc is not json serializable + except KeyError: + pass + + task['exit_code'] = None + task['target_state'] = rps.CANCELED - self._log.debug('request cancel task %s', uid) - self._watch_queue.put([self.TO_CANCEL, uid]) + self._prof.prof('task_run_cancel_stop', uid=tid) + self._prof.prof('unschedule_start', uid=tid) + self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB, task) + + self.advance([task], rps.AGENT_STAGING_OUTPUT_PENDING, + publish=True, push=True) + + try: + del self._tasks[tid] + except KeyError: + pass # -------------------------------------------------------------------------- @@ -84,6 +130,7 @@ def work(self, tasks): try: self._prof.prof('task_start', uid=task['uid']) + self._tasks.update({task['uid']: task}) self._handle_task(task) except Exception as e: @@ -195,7 +242,12 @@ def _handle_task(self, task): # # now do the very same stuff for the `post_exec` directive # ... - launcher = self._rm.find_launcher(task) + launcher, lname = self._rm.find_launcher(task) + + if not launcher: + raise RuntimeError('no launcher found for %s' % task) + + task['launcher_name'] = lname exec_path , _ = self._create_exec_script(launcher, task) _, launch_path = self._create_launch_script(launcher, task, exec_path) @@ -232,15 +284,14 @@ def _handle_task(self, task): self.handle_timeout(task) # watch task for completion - self._watch_queue.put([self.TO_WATCH, task]) + self._watch_queue.put(task) # -------------------------------------------------------------------------- # def _watch(self): - to_watch = list() # contains task dicts - to_cancel = set() # contains task IDs + to_watch = list() # contains task dicts try: while not self._term.is_set(): @@ -250,35 +301,24 @@ def _watch(self): # also don't want to learn about tasks until all # slots are filled, because then we may not be able # to catch finishing tasks in time -- so there is - # a fine balance here. Balance means 100. + # a fine balance here. Fine balance means 100. MAX_QUEUE_BULKSIZE = 100 count = 0 try: while count < MAX_QUEUE_BULKSIZE: - - flag, thing = self._watch_queue.get_nowait() + to_watch.append(self._watch_queue.get_nowait()) count += 1 - # NOTE: `thing` can be task id or task dict, depending - # on the flag value - if flag == self.TO_WATCH : to_watch.append(thing) - elif flag == self.TO_CANCEL: to_cancel.add(thing) - else: raise RuntimeError('unknown flag %s' % flag) - except queue.Empty: - # nothing found -- no problem, see if any tasks finished pass # check on the known tasks. - action = self._check_running(to_watch, to_cancel) - - # FIXME: remove uids from lists after completion + self._check_running(to_watch) - if not action and not count: - # nothing happened at all! Zzz for a bit. 
- # FIXME: make configurable - time.sleep(0.1) + if not count: + # no new tasks, no new state -- sleep a bit + time.sleep(0.05) except Exception as e: self._log.exception('Error in ExecWorker watch loop (%s)' % e) @@ -288,9 +328,9 @@ def _watch(self): # -------------------------------------------------------------------------- # Iterate over all running tasks, check their status, and decide on the # next step. Also check for a requested cancellation for the tasks. - def _check_running(self, to_watch, to_cancel): + def _check_running(self, to_watch): - action = False + tasks_to_advance = list() # `to_watch.remove()` in the loop requires copy to iterate over the list for task in list(to_watch): @@ -299,52 +339,7 @@ def _check_running(self, to_watch, to_cancel): # poll subprocess object exit_code = task['proc'].poll() - - tasks_to_advance = list() - tasks_to_cancel = list() - - if exit_code is None: - - # process is still running - cancel if needed - if tid in to_cancel: - - self._log.debug('cancel %s', tid) - - action = True - self._prof.prof('task_run_cancel_start', uid=tid) - - # got a request to cancel this task - send SIGTERM to the - # process group (which should include the actual launch - # method) - try: - # kill the whole process group. - # Try SIGINT first to allow signal handlers, then - # SIGTERM to allow clean termination, then SIGKILL to - # enforce termination. - pgrp = os.getpgid(task['proc'].pid) - os.killpg(pgrp, signal.SIGINT) - time.sleep(0.1) - os.killpg(pgrp, signal.SIGTERM) - time.sleep(0.1) - os.killpg(pgrp, signal.SIGKILL) - - except OSError: - # lost race: task is already gone, we ignore this - # FIXME: collect and move to DONE/FAILED - pass - - task['proc'].wait() # make sure proc is collected - - to_cancel.remove(tid) - to_watch.remove(task) - del task['proc'] # proc is not json serializable - - self._prof.prof('task_run_cancel_stop', uid=tid) - - self._prof.prof('unschedule_start', uid=tid) - tasks_to_cancel.append(task) - - else: + if exit_code is not None: action = True self._prof.prof('task_run_stop', uid=tid) @@ -355,41 +350,45 @@ def _check_running(self, to_watch, to_cancel): # we have a valid return code -- task is final self._log.info("Task %s has return code %s.", tid, exit_code) - task['exit_code'] = exit_code - # Free the Slots, Flee the Flots, Ree the Frots! to_watch.remove(task) - if tid in to_cancel: - to_cancel.remove(tid) - del task['proc'] # proc is not json serializable + + try: + # might race with task cancellation + del task['proc'] # proc is not json serializable + except KeyError: + pass + tasks_to_advance.append(task) self._prof.prof('unschedule_start', uid=tid) - if exit_code != 0: - # task failed - fail after staging output - task['exception'] = 'RuntimeError("task failed")' - task['exception_detail'] = 'exit code: %s' % exit_code - task['target_state' ] = rps.FAILED - - else: + if exit_code == 0: # The task finished cleanly, see if we need to deal with # output data. 
We always move to stageout, even if there # are no directives -- at the very least, we'll upload # stdout/stderr + task['exit_code'] = exit_code task['target_state'] = rps.DONE - self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB, - tasks_to_cancel + tasks_to_advance) + else: + # task failed (we still run staging output) + task['exit_code'] = exit_code + task['exception'] = 'RuntimeError("task failed")' + task['exception_detail'] = 'exit code: %s' % exit_code + task['target_state'] = rps.FAILED - if tasks_to_cancel: - self.advance(tasks_to_cancel, rps.CANCELED, - publish=True, push=False) - if tasks_to_advance: - self.advance(tasks_to_advance, rps.AGENT_STAGING_OUTPUT_PENDING, - publish=True, push=True) + self.publish(rpc.AGENT_UNSCHEDULE_PUBSUB, tasks_to_advance) - return action + if tasks_to_advance: + self.advance(tasks_to_advance, rps.AGENT_STAGING_OUTPUT_PENDING, + publish=True, push=True) + + for task in tasks_to_advance: + try: + del self._tasks[task['uid']] + except KeyError: + pass # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/agent/executing/sleep.py b/src/radical/pilot/agent/executing/sleep.py index dd07fb810..6d7705e0f 100644 --- a/src/radical/pilot/agent/executing/sleep.py +++ b/src/radical/pilot/agent/executing/sleep.py @@ -88,7 +88,7 @@ def work(self, tasks): # -------------------------------------------------------------------------- # - def cancel_task(self, uid): + def cancel_task(self, task): raise NotImplementedError('no cancellation support in sleep executor') diff --git a/src/radical/pilot/agent/launch_method/base.py b/src/radical/pilot/agent/launch_method/base.py index ed2034d2f..b889bacb3 100644 --- a/src/radical/pilot/agent/launch_method/base.py +++ b/src/radical/pilot/agent/launch_method/base.py @@ -5,6 +5,8 @@ __license__ = 'MIT' import os +import time +import signal import radical.utils as ru @@ -188,6 +190,32 @@ def get_launcher_env(self): raise NotImplementedError("incomplete LaunchMethod %s" % self.name) + # -------------------------------------------------------------------------- + # + def cancel_task(self, task, pid): + ''' + This method cancels the task, in the default case by killing the task's + launch process identified by its process ID. + ''' + + # kill the process group which should include the actual launch method + try: + self._log.debug('killing task %s (%d)', task['uid'], pid) + os.killpg(pid, signal.SIGTERM) + + # also send a SIGKILL to drive the message home. + # NOTE: the `sleep` will limit the cancel throughput! + try: + time.sleep(0.1) + os.killpg(pid, signal.SIGKILL) + except OSError: + pass + + except OSError: + # lost race: task is already gone, we ignore this + self._log.debug('task already gone: %s', task['uid']) + + # -------------------------------------------------------------------------- # def get_task_named_env(self, env_name): diff --git a/src/radical/pilot/agent/launch_method/srun.py b/src/radical/pilot/agent/launch_method/srun.py index de9cc7519..bf37c3ad1 100644 --- a/src/radical/pilot/agent/launch_method/srun.py +++ b/src/radical/pilot/agent/launch_method/srun.py @@ -86,6 +86,39 @@ def finalize(self): pass + # -------------------------------------------------------------------------- + # + def cancel_task(self, task, pid): + ''' + This method cancels the task, in the default case by killing the task's + launch process identified by its process ID. 
+ ''' + + # according to ORNL, SIGINT should be sent twice for effective rank + # cancellation + try: + self._log.debug('killing task %s (%d)', task['uid'], pid) + os.killpg(pid, signal.SIGINT) + + try: + time.sleep(0.1) + os.killpg(pid, signal.SIGINT) + except OSError: + pass + + # also send a SIGKILL to drive the message home. + # NOTE: the `sleep` will limit the cancel throughput! + try: + time.sleep(0.1) + os.killpg(pid, signal.SIGKILL) + except OSError: + pass + + except OSError: + # lost race: task is already gone, we ignore this + self._log.debug('task already gone: %s', task['uid']) + + # -------------------------------------------------------------------------- # def can_launch(self, task): diff --git a/src/radical/pilot/agent/resource_manager/base.py b/src/radical/pilot/agent/resource_manager/base.py index c06d75277..b8fc47693 100644 --- a/src/radical/pilot/agent/resource_manager/base.py +++ b/src/radical/pilot/agent/resource_manager/base.py @@ -470,7 +470,7 @@ def find_launcher(self, task): launcher = self._launchers[name] lm_can_launch, err_message = launcher.can_launch(task) if lm_can_launch: - return launcher + return launcher, name else: errors.append([name, err_message]) @@ -478,7 +478,17 @@ def find_launcher(self, task): for name, error in errors: self._log.debug(' %s: %s', name, error) - return None + return None, None + + + # -------------------------------------------------------------------------- + # + def get_launcher(self, lname): + + if lname not in self._launchers: + raise ValueError('no such launcher %s' % lname) + + return self._launchers[lname] # -------------------------------------------------------------------------- diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index 1ab6655a8..db7e0c077 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -231,6 +231,14 @@ def close(self, terminate=True): self._closed = True self._rep.ok('>>ok\n') + self.dump() + + super().close() + + + # -------------------------------------------------------------------------- + # + def dump(self): # dump json json = self.as_dict() @@ -242,9 +250,6 @@ def close(self, terminate=True): ru.write_json(json, tgt) - super().close() - - # -------------------------------------------------------------------------- # def as_dict(self): diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index 891fc625f..29db1bceb 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -807,6 +807,19 @@ def __exit__(self, exc_type, exc_value, traceback): self.close() + # -------------------------------------------------------------------------- + # + def dump(self): + + self._reg.dump() + + for tmgr in self._tmgrs.values(): + tmgr.dump() + + for pmgr in self._pmgrs.values(): + pmgr.dump() + + # -------------------------------------------------------------------------- # def close(self, **kwargs): diff --git a/src/radical/pilot/task_manager.py b/src/radical/pilot/task_manager.py index 2f9768db9..e94c2ff40 100644 --- a/src/radical/pilot/task_manager.py +++ b/src/radical/pilot/task_manager.py @@ -238,6 +238,17 @@ def close(self): self._closed = True self._rep.ok('>>ok\n') + self.dump() + + self._ctrl_sub.stop() + + super().close() + + + # -------------------------------------------------------------------------- + # + def dump(self): + # dump json json = self.as_dict() # json['_id'] = self.uid @@ -248,11 +259,6 @@ def close(self): tgt = '%s/%s.json' % (self._session.path, self.uid) 
ru.write_json(json, tgt) - self._ctrl_sub.stop() - - super().close() - - # -------------------------------------------------------------------------- # def as_dict(self): diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index fdc5cb851..f0fd0c456 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -1188,6 +1188,14 @@ def advance(self, things, state=None, publish=True, push=False, qname=None, output.channel) output.put(_things, qname=qname) + # if a file `'/tmp/rp_wait_%s' % _state.lower()` exists, we + # wait for that file to disappear before continuing + waitfile = '/tmp/rp_wait_%s' % _state.lower() + while os.path.isfile(waitfile): + self._log.debug('===== wait for file %s', waitfile) + time.sleep(1) + + # ts = time.time() # for thing in _things: # self._prof.prof('put', uid=thing['uid'], state=_state, diff --git a/tests/unit_tests/test_executing/test_popen.py b/tests/unit_tests/test_executing/test_popen.py index 4cb7e8f3f..48b1a9a9f 100755 --- a/tests/unit_tests/test_executing/test_popen.py +++ b/tests/unit_tests/test_executing/test_popen.py @@ -45,6 +45,7 @@ def test_control_cb(self, mocked_logger, mocked_init): pex = Popen(cfg=None, session=None) pex._log = mocked_logger() + pex._tasks = dict() pex._cancel_lock = mt.RLock() pex._watch_queue = queue.Queue() @@ -196,9 +197,8 @@ def test_check_running(self, mocked_killpg, mocked_init): task['proc'].poll.return_value = None task['proc'].pid = os.getpid() to_watch.append(task) - to_cancel.append(task['uid']) - pex._check_running(to_watch, to_cancel) - self.assertFalse(to_cancel) + pex.cancel_task(task) + self.assertFalse(task['uid'] in self._tasks) # case 2: exit_code == 0 task['proc'] = mock.Mock() diff --git a/tests/unit_tests/test_rm/test_base.py b/tests/unit_tests/test_rm/test_base.py index 5ae4311ad..01f6a0e01 100755 --- a/tests/unit_tests/test_rm/test_base.py +++ b/tests/unit_tests/test_rm/test_base.py @@ -204,12 +204,13 @@ def mocked_can_launch_false(task): return False, 'error' mocked_lm.can_launch = mocked_can_launch_false - self.assertIsNone(rm.find_launcher(task={'uid': 'task0000'})) + self.assertEqual((None, None), rm.find_launcher(task={'uid': 'task0000'})) def mocked_can_launch_true(task): return True, '' mocked_lm.can_launch = mocked_can_launch_true - self.assertIs(rm.find_launcher(task={'uid': 'task0000'}), mocked_lm) + self.assertEqual(rm.find_launcher(task={'uid': 'task0000'}), + (mocked_lm, 'SRUN')) # -------------------------------------------------------------------------- # From 5e7e6efa69d61296cc944d3458c2085f3dc18bac Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 18 Dec 2024 20:47:23 +0100 Subject: [PATCH 3/5] snap --- src/radical/pilot/pilot_manager.py | 6 +++--- tests/unit_tests/test_executing/test_popen.py | 8 ++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index db7e0c077..345c5caf4 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -236,9 +236,9 @@ def close(self, terminate=True): super().close() - # -------------------------------------------------------------------------- - # - def dump(self): + # -------------------------------------------------------------------------- + # + def dump(self): # dump json json = self.as_dict() diff --git a/tests/unit_tests/test_executing/test_popen.py b/tests/unit_tests/test_executing/test_popen.py index 48b1a9a9f..411f24e3d 100755 --- 
a/tests/unit_tests/test_executing/test_popen.py +++ b/tests/unit_tests/test_executing/test_popen.py @@ -179,12 +179,18 @@ def test_extend_pre_exec(self, mocked_init): @mock.patch('os.killpg') def test_check_running(self, mocked_killpg, mocked_init): + class Launcher(object): + def cancel_task(self, task): + pass + task = dict(self._test_case['task']) task['target_state'] = None pex = Popen(cfg=None, session=None) pex._log = pex._prof = mock.Mock() pex.advance = pex.publish = mock.Mock() + pex._rm = mock.Mock() + pex._rm._get_launcher = mock.Mock(return_value=Launcher()) os.getpgid = mock.Mock() os.killpg = mock.Mock() @@ -196,9 +202,7 @@ def test_check_running(self, mocked_killpg, mocked_init): task['proc'] = mock.Mock() task['proc'].poll.return_value = None task['proc'].pid = os.getpid() - to_watch.append(task) pex.cancel_task(task) - self.assertFalse(task['uid'] in self._tasks) # case 2: exit_code == 0 task['proc'] = mock.Mock() From 1ce7c7a6b120814abf96f03a8f6b7f4287a9d9de Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 18 Dec 2024 21:08:51 +0100 Subject: [PATCH 4/5] fix tests --- tests/unit_tests/test_agent_0/test_agent_0.py | 4 ++-- tests/unit_tests/test_executing/test_popen.py | 24 ++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/tests/unit_tests/test_agent_0/test_agent_0.py b/tests/unit_tests/test_agent_0/test_agent_0.py index d2beac980..dc605de40 100755 --- a/tests/unit_tests/test_agent_0/test_agent_0.py +++ b/tests/unit_tests/test_agent_0/test_agent_0.py @@ -148,7 +148,7 @@ def test_start_sub_agents(self, mocked_run_sh_callout, mocked_ru_env_prep, 'threads_per_core': 2}) # no launcher for agent task(s) - agent_0._rm.find_launcher.return_value = None + agent_0._rm.find_launcher.return_value = (None, None) with self.assertRaises(RuntimeError): agent_0._start_sub_agents() @@ -167,7 +167,7 @@ def check_agent_task(agent_task, *args, **kwargs): launcher = mock.Mock() launcher.get_launcher_env.return_value = [] launcher.get_launch_cmds = check_agent_task - agent_0._rm.find_launcher.return_value = launcher + agent_0._rm.find_launcher.return_value = launcher, None agent_files = glob.glob('%s/agent_1.*.sh' % agent_0._pwd) self.assertEqual(0, len(agent_files)) diff --git a/tests/unit_tests/test_executing/test_popen.py b/tests/unit_tests/test_executing/test_popen.py index 411f24e3d..7e058591b 100755 --- a/tests/unit_tests/test_executing/test_popen.py +++ b/tests/unit_tests/test_executing/test_popen.py @@ -43,11 +43,16 @@ def test_control_cb(self, mocked_logger, mocked_init): mocked_logger._debug_level = 1 + check = list() + def cancel_task(task): + check.append(task['uid']) + pex = Popen(cfg=None, session=None) pex._log = mocked_logger() - pex._tasks = dict() pex._cancel_lock = mt.RLock() - pex._watch_queue = queue.Queue() + pex.cancel_task = cancel_task + pex._tasks = {'task.0000': {'uid': 'task.0000'}, + 'task.0001': {'uid': 'task.0001'}} msg = {'cmd': '', 'arg': {'uids': ['task.0000', 'task.0001']}} self.assertIsNone(pex.control_cb(topic=None, msg=msg)) @@ -55,9 +60,7 @@ def test_control_cb(self, mocked_logger, mocked_init): msg['cmd'] = 'cancel_tasks' self.assertIsNone(pex.control_cb(topic=None, msg=msg)) for uid in msg['arg']['uids']: - mode, tid = pex._watch_queue.get() - self.assertEqual(mode, pex.TO_CANCEL) - self.assertEqual(tid, uid) + self.assertIn(uid, check) # -------------------------------------------------------------------------- # @@ -72,7 +75,7 @@ def test_handle_task(self, mocked_sp_popen, mocked_lm_init, launcher.name = 'APRUN' 
launcher._command = '/bin/aprun' launcher._env_sh = 'env/lm_aprun.sh' - mocked_find_launcher.return_value = launcher + mocked_find_launcher.return_value = (launcher, 'APRUN') task = dict(self._test_case['task']) task['slots'] = self._test_case['setup']['slots'] @@ -184,13 +187,15 @@ def cancel_task(self, task): pass task = dict(self._test_case['task']) - task['target_state'] = None + task['target_state'] = None + task['launcher_name'] = None pex = Popen(cfg=None, session=None) pex._log = pex._prof = mock.Mock() pex.advance = pex.publish = mock.Mock() pex._rm = mock.Mock() pex._rm._get_launcher = mock.Mock(return_value=Launcher()) + pex._tasks = {task['uid']: task} os.getpgid = mock.Mock() os.killpg = mock.Mock() @@ -203,19 +208,20 @@ def cancel_task(self, task): task['proc'].poll.return_value = None task['proc'].pid = os.getpid() pex.cancel_task(task) + self.assertNotIn(task['uid'], pex._tasks) # case 2: exit_code == 0 task['proc'] = mock.Mock() task['proc'].poll.return_value = 0 to_watch.append(task) - pex._check_running(to_watch, to_cancel) + pex._check_running(to_watch) self.assertEqual(task['target_state'], rps.DONE) # case 3: exit_code == 1 task['proc'] = mock.Mock() task['proc'].poll.return_value = 1 to_watch.append(task) - pex._check_running(to_watch, to_cancel) + pex._check_running(to_watch) self.assertEqual(task['target_state'], rps.FAILED) # -------------------------------------------------------------------------- From c5ede2741c9451d3b2a8b7d51d25d2c3990e21da Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 18 Dec 2024 23:51:26 +0100 Subject: [PATCH 5/5] snap --- src/radical/pilot/session.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index 29db1bceb..53642ee57 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -359,18 +359,6 @@ def _start_registry(self): self._cfg.reg_addr = self._reg_service.addr - def reg_watcher(): - reg = ru.zmq.RegistryClient(url=self._cfg.reg_addr) - while True: - time.sleep(60 * 10) - self._log.debug('===== registry dump %s', self.uid) - reg.dump(self.uid) - - self._reg_watcher = mt.Thread(target=reg_watcher) - self._reg_watcher.daemon = True - self._reg_watcher.start() - - # -------------------------------------------------------------------------- # def _connect_registry(self):
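
Reviewer note (not part of the patches): the series replaces the old watcher-queue
cancel flag with a direct cancel path -- find_launcher() now returns a
(launcher, name) tuple, the executor records that name on the task, and
cancellation is delegated to the launch method via
get_launcher(name).cancel_task(task, pid).  Below is a minimal, self-contained
sketch of that flow for illustration only; the _Launcher/_Executor class names
and anything outside the patched API are made up, and profiling, state
publication and locking are deliberately omitted.

    import os
    import time
    import signal

    class _Launcher:
        # mirrors LaunchMethod.cancel_task() from launch_method/base.py:
        # escalate SIGTERM -> SIGKILL on the task's process group and
        # ignore the race where the task is already gone
        def cancel_task(self, task, pid):
            try:
                os.killpg(pid, signal.SIGTERM)
                time.sleep(0.1)                  # NOTE: limits cancel throughput
                os.killpg(pid, signal.SIGKILL)
            except OSError:
                pass                             # task already gone

    class _Executor:
        # illustrative stand-in for the Popen executor
        def __init__(self, rm):
            self._rm    = rm                     # exposes find_launcher / get_launcher
            self._tasks = dict()                 # uid -> task dict

        def handle_task(self, task):
            launcher, lname = self._rm.find_launcher(task)
            if not launcher:
                raise RuntimeError('no launcher found for %s' % task['uid'])
            task['launcher_name'] = lname        # remembered for later cancellation
            self._tasks[task['uid']] = task
            # ... create exec/launch scripts, Popen the launch script,
            #     store the process handle in task['proc'] ...

        def cancel_task(self, task):
            proc = task.get('proc')
            if not proc or proc.poll() is not None:
                return                           # never started or already done
            launcher = self._rm.get_launcher(task['launcher_name'])
            launcher.cancel_task(task, proc.pid)
            proc.wait()                          # collect the child process
            self._tasks.pop(task['uid'], None)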