From 4845cbd8b23289ecd863229efa5989cc58cabcea Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 1 Dec 2023 16:34:42 -0500 Subject: [PATCH 01/10] fixed worker description in registry --- src/radical/pilot/raptor/master.py | 5 +++-- src/radical/pilot/raptor/worker_default.py | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/radical/pilot/raptor/master.py b/src/radical/pilot/raptor/master.py index acfbbbc316..a120876af4 100644 --- a/src/radical/pilot/raptor/master.py +++ b/src/radical/pilot/raptor/master.py @@ -399,8 +399,7 @@ def submit_workers(self, descriptions: List[TaskDescription] # the default worker needs it's own task description to derive the # amount of available resources - self._reg['raptor.%s.cfg' % self._uid] = td.as_dict() - self._reg.dump('raptor_master') + self._reg['raptor.%s.cfg' % td.uid] = td.as_dict() # all workers run in the same sandbox as the master task = dict() @@ -437,6 +436,7 @@ def submit_workers(self, descriptions: List[TaskDescription] self.advance(tasks, publish=True, push=True) + self._reg.dump('raptor_master') return [task['uid'] for task in tasks] @@ -888,6 +888,7 @@ def terminate(self): # self.wait() self._log.debug('all workers terminated') + self._reg.close() # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/raptor/worker_default.py b/src/radical/pilot/raptor/worker_default.py index b7e1bf85f1..438fe88c6d 100644 --- a/src/radical/pilot/raptor/worker_default.py +++ b/src/radical/pilot/raptor/worker_default.py @@ -47,12 +47,11 @@ def __init__(self, raptor_id : str): cb=self._request_cb) # the master should have stored our own task description in the registry - self._reg.dump('raptor_worker') self._descr = self._reg['raptor.%s.cfg' % self._uid] # keep worker ID and rank - self._n_cores = int(os.environ.get('cores_per_rank', 1)) - self._n_gpus = int(os.environ.get('gpus_per_rank', 0)) + self._n_cores = int(self._descr.get('cores_per_rank', 1)) + self._n_gpus = int(self._descr.get('gpus_per_rank', 0)) # We need to make sure to run only up to `gpn` tasks using a gpu # within that pool, so need a separate counter for that. From 2833c3aec3126cd14c319cc5099f56a745c10d8e Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 1 Dec 2023 16:36:20 -0500 Subject: [PATCH 02/10] worker registration proceeds for rank 0 --- src/radical/pilot/raptor/worker.py | 37 +++++++++++++++--------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/radical/pilot/raptor/worker.py b/src/radical/pilot/raptor/worker.py index 657caa79f7..83453c1cd3 100644 --- a/src/radical/pilot/raptor/worker.py +++ b/src/radical/pilot/raptor/worker.py @@ -40,17 +40,17 @@ def __init__(self, manager, rank, raptor_id): self._ranks = int(os.environ['RP_RANKS']) self._reg = ru.zmq.RegistryClient(url=self._reg_addr) + self._cfg = ru.Config(cfg=self._reg['cfg']) - self._cfg = ru.Config(cfg=self._reg['cfg']) - - self._log = ru.Logger(name=self._uid, ns='radical.pilot.worker', + self._log = ru.Logger(name=self._uid, + ns='radical.pilot.worker', level=self._cfg.log_lvl, debug=self._cfg.debug_lvl, targets=self._cfg.log_tgt, path=self._cfg.path) self._prof = ru.Profiler(name='%s.%04d' % (self._uid, self._rank), ns='radical.pilot.worker', - path=self._cfg.path) + path=self._sbox) # register for lifetime management messages on the control pubsub psbox = os.environ['RP_PILOT_SANDBOX'] @@ -106,6 +106,7 @@ def __init__(self, manager, rank, raptor_id): # the manager (rank 0) registers the worker with the master if self._manager: + self._log.debug('register: %s / %s', self._uid, self._raptor_id) self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg) @@ -113,21 +114,21 @@ def __init__(self, manager, rank, raptor_id): # self._ctrl_pub.put(rpc.CONTROL_PUBSUB, {'cmd': 'worker_unregister', # 'arg': {'uid' : self._uid}}) - # wait for raptor response - self._log.debug('wait for registration to complete') - count = 0 - while not self._reg_event.wait(timeout=1): - if count < 60: - count += 1 - self._log.debug('re-register: %s / %s', self._uid, self._raptor_id) - self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg) - else: - self.stop() - self.join() - self._log.error('registration with master timed out') - raise RuntimeError('registration with master timed out') + # wait for raptor response + self._log.debug('wait for registration to complete') + count = 0 + while not self._reg_event.wait(timeout=1): + if count < 60: + count += 1 + self._log.debug('re-register: %s / %s', self._uid, self._raptor_id) + self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg) + else: + self.stop() + self.join() + self._log.error('registration with master timed out') + raise RuntimeError('registration with master timed out') - self._log.debug('registration with master ok') + self._log.debug('registration with master ok') # -------------------------------------------------------------------------- From cd66ab1b550cf4c438b1f3465b7da9b920e37b48 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Mon, 4 Dec 2023 18:42:08 -0500 Subject: [PATCH 03/10] added fix for getting worker description from Registry --- src/radical/pilot/raptor/worker_default.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/radical/pilot/raptor/worker_default.py b/src/radical/pilot/raptor/worker_default.py index 438fe88c6d..f724ef56ea 100644 --- a/src/radical/pilot/raptor/worker_default.py +++ b/src/radical/pilot/raptor/worker_default.py @@ -47,7 +47,7 @@ def __init__(self, raptor_id : str): cb=self._request_cb) # the master should have stored our own task description in the registry - self._descr = self._reg['raptor.%s.cfg' % self._uid] + self._descr = self._reg['raptor.%s.cfg' % self._uid] or {} # keep worker ID and rank self._n_cores = int(self._descr.get('cores_per_rank', 1)) From ff5bef1b203439e045cf3bef06015fc4a2e800e3 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Mon, 4 Dec 2023 19:15:16 -0500 Subject: [PATCH 04/10] fixed worker resources (get it either from registry or from env variables) --- src/radical/pilot/raptor/worker_default.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/radical/pilot/raptor/worker_default.py b/src/radical/pilot/raptor/worker_default.py index f724ef56ea..7c78ef75b9 100644 --- a/src/radical/pilot/raptor/worker_default.py +++ b/src/radical/pilot/raptor/worker_default.py @@ -50,8 +50,10 @@ def __init__(self, raptor_id : str): self._descr = self._reg['raptor.%s.cfg' % self._uid] or {} # keep worker ID and rank - self._n_cores = int(self._descr.get('cores_per_rank', 1)) - self._n_gpus = int(self._descr.get('gpus_per_rank', 0)) + self._n_cores = int(self._descr.get('cores_per_rank') or + os.getenv('RP_CORES_PER_RANK', 1)) + self._n_gpus = int(self._descr.get('gpus_per_rank') or + os.getenv('RP_GPUS_PER_RANK', 0)) # We need to make sure to run only up to `gpn` tasks using a gpu # within that pool, so need a separate counter for that. From 13d12365a0e645adac3b2b68606050335ab19243 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 5 Dec 2023 17:24:29 -0500 Subject: [PATCH 05/10] update HB config parameters and use AgentComponent's registry client --- src/radical/pilot/raptor/master.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/radical/pilot/raptor/master.py b/src/radical/pilot/raptor/master.py index a120876af4..44abb26a10 100644 --- a/src/radical/pilot/raptor/master.py +++ b/src/radical/pilot/raptor/master.py @@ -70,16 +70,14 @@ def __init__(self, cfg: ru.Config = None): self._rsbox = os.environ['RP_RESOURCE_SANDBOX'] self._reg_addr = os.environ['RP_REGISTRY_ADDRESS'] - self._reg = ru.zmq.RegistryClient(url=self._reg_addr) - self._workers = dict() # wid: worker self._tasks = dict() # bookkeeping of submitted requests self._exec_tasks = list() # keep track of executable tasks self._term = mt.Event() # termination signal self._thread = None # run loop - self._hb_freq = 500 # check worker heartbetas every n seconds - self._hb_timeout = 1000 # consider worker dead after 150 seconds + self._hb_freq = 500 # check worker heartbeats every N seconds + self._hb_timeout = 100000 # consider worker dead after M seconds self._session = Session(uid=self._sid, _reg_addr=self._reg_addr, _role=Session._DEFAULT) @@ -397,7 +395,7 @@ def submit_workers(self, descriptions: List[TaskDescription] # ensure that defaults and backward compatibility kick in td.verify() - # the default worker needs it's own task description to derive the + # the default worker needs its own task description to derive the # amount of available resources self._reg['raptor.%s.cfg' % td.uid] = td.as_dict() @@ -888,7 +886,6 @@ def terminate(self): # self.wait() self._log.debug('all workers terminated') - self._reg.close() # ------------------------------------------------------------------------------ From 9d5c1cf7cef607826210fbe3c0e5d1b6451fdfae Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Tue, 5 Dec 2023 17:24:53 -0500 Subject: [PATCH 06/10] update HB config parameters for Worker --- src/radical/pilot/raptor/worker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/radical/pilot/raptor/worker.py b/src/radical/pilot/raptor/worker.py index 83453c1cd3..f8040e8137 100644 --- a/src/radical/pilot/raptor/worker.py +++ b/src/radical/pilot/raptor/worker.py @@ -72,8 +72,9 @@ def __init__(self, manager, rank, raptor_id): # let ZMQ settle time.sleep(1) + self._hb_register_count = 60 # run heartbeat thread in all ranks (one hb msg every `n` seconds) - self._hb_delay = 5 + self._hb_delay = 300 self._hb_thread = mt.Thread(target=self._hb_worker) self._hb_thread.daemon = True self._hb_thread.start() @@ -117,8 +118,8 @@ def __init__(self, manager, rank, raptor_id): # wait for raptor response self._log.debug('wait for registration to complete') count = 0 - while not self._reg_event.wait(timeout=1): - if count < 60: + while not self._reg_event.wait(timeout=5): + if count < self._hb_register_count: count += 1 self._log.debug('re-register: %s / %s', self._uid, self._raptor_id) self._ctrl_pub.put(rpc.CONTROL_PUBSUB, reg_msg) From 84c5edf636415deeae5ad5bc13dcd33c9068200c Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Thu, 7 Dec 2023 11:57:26 -0500 Subject: [PATCH 07/10] added tracing events into `MPIWorkerRank.run` --- src/radical/pilot/raptor/worker_mpi.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/radical/pilot/raptor/worker_mpi.py b/src/radical/pilot/raptor/worker_mpi.py index fe839f7aad..974251340d 100644 --- a/src/radical/pilot/raptor/worker_mpi.py +++ b/src/radical/pilot/raptor/worker_mpi.py @@ -397,12 +397,15 @@ def __init__(self, rank_task_q_get, rank_result_q_put, # def run(self): + self._prof.prof('import_mpi') from mpi4py import MPI + self._prof.prof('read_mpi_info_start') self._world = MPI.COMM_WORLD self._group = self._world.Get_group() self._rank = self._world.rank self._ranks = self._world.size + self._prof.prof('read_mpi_info_stop') try: self._log.debug('init worker [%d] [%d] rtq_get:%s rrq_put:%s', From c772ecc6c1b030aa5fd0535ef24b87bc8b72c4bb Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 19 Jan 2024 01:03:32 -0500 Subject: [PATCH 08/10] get hb configs from the session (use registry client from AgentComponent) --- src/radical/pilot/raptor/master.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/radical/pilot/raptor/master.py b/src/radical/pilot/raptor/master.py index a9dab133ca..8f424509c4 100644 --- a/src/radical/pilot/raptor/master.py +++ b/src/radical/pilot/raptor/master.py @@ -69,13 +69,6 @@ def __init__(self, cfg: ru.Config = None): self._rsbox = os.environ['RP_RESOURCE_SANDBOX'] self._reg_addr = os.environ['RP_REGISTRY_ADDRESS'] - self._reg = ru.zmq.RegistryClient(url=self._reg_addr) - self._reg.dump(self._uid) - - # get hb configs - self._hb_freq = self._reg['rcfg.raptor.hb_frequency'] - self._hb_tout = self._reg['rcfg.raptor.hb_timeout'] - self._workers = dict() # wid: worker self._tasks = dict() # bookkeeping of submitted requests self._exec_tasks = list() # keep track of executable tasks @@ -92,6 +85,10 @@ def __init__(self, cfg: ru.Config = None): super().__init__(ccfg, self._session) + # get hb configs (RegistryClient instance is initiated in Session) + self._hb_freq = self._session.rcfg.raptor.hb_frequency + self._hb_tout = self._session.rcfg.raptor.hb_timeout + self._log.debug('hb freq: %s', self._hb_freq) self._log.debug('hb tout: %s', self._hb_tout) @@ -404,7 +401,6 @@ def submit_workers(self, descriptions: List[TaskDescription] # the default worker needs its own task description to derive the # amount of available resources self._reg['raptor.%s.cfg' % td.uid] = td.as_dict() - # self._reg.dump('raptor_master') # all workers run in the same sandbox as the master task = dict() @@ -441,6 +437,7 @@ def submit_workers(self, descriptions: List[TaskDescription] self.advance(tasks, publish=True, push=True) + # dump registry with all worker descriptions ("raptor..cfg") self._reg.dump('raptor_master') return [task['uid'] for task in tasks] From dfe1af3b4195e3743449c92af71b67be4511a678 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Fri, 19 Jan 2024 01:11:10 -0500 Subject: [PATCH 09/10] removed duplication (MPI setup is done in `MPIWorker`) --- src/radical/pilot/raptor/worker_mpi.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/radical/pilot/raptor/worker_mpi.py b/src/radical/pilot/raptor/worker_mpi.py index b7db804fec..5061a9ef33 100644 --- a/src/radical/pilot/raptor/worker_mpi.py +++ b/src/radical/pilot/raptor/worker_mpi.py @@ -403,16 +403,6 @@ def __init__(self, rank_task_q_get, rank_result_q_put, # def run(self): - self._prof.prof('import_mpi') - from mpi4py import MPI - - self._prof.prof('read_mpi_info_start') - self._world = MPI.COMM_WORLD - self._group = self._world.Get_group() - self._rank = self._world.rank - self._ranks = self._world.size - self._prof.prof('read_mpi_info_stop') - try: self._log.debug('init worker [%d] [%d] rtq_get:%s rrq_put:%s', self._rank, self._ranks, From d6aacd9ce4354856a677ca43de8c0ea9a50cb8b8 Mon Sep 17 00:00:00 2001 From: Mikhail Titov Date: Mon, 22 Jan 2024 11:18:56 -0500 Subject: [PATCH 10/10] fixed registry dump for the raptor master task --- src/radical/pilot/raptor/master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/radical/pilot/raptor/master.py b/src/radical/pilot/raptor/master.py index 8f424509c4..d424fb5b1d 100644 --- a/src/radical/pilot/raptor/master.py +++ b/src/radical/pilot/raptor/master.py @@ -438,7 +438,7 @@ def submit_workers(self, descriptions: List[TaskDescription] self.advance(tasks, publish=True, push=True) # dump registry with all worker descriptions ("raptor..cfg") - self._reg.dump('raptor_master') + self._reg.dump(self._uid) return [task['uid'] for task in tasks]