From 58b5225f0fd8cc73a2f920596e841c8b94c2459a Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 09:31:25 +0100 Subject: [PATCH 01/21] snap --- .github/workflows/ci-release.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index 163060f63..d43f0e062 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -25,6 +25,8 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | + sudo apt update + sudo apt install -y openmpi-bin python -m venv testenv . testenv/bin/activate python -m pip install --upgrade pip setuptools wheel @@ -32,8 +34,6 @@ jobs: - name: run examples timeout-minutes: 15 run: | - sudo apt update - sudo apt install -y openmpi-bin export RADICAL_LOG_LVL=DEBUG_9 export RADICAL_PROFILE=True export RADICAL_DEBUG=TRUE @@ -44,6 +44,10 @@ jobs: . testenv/bin/activate mkdir example_artifacts/ cd example_artifacts/ + radical-stack + pwd + ls -l $HOME + ls -l /home/runner || echo 'no runner home' ../examples/00_getting_started.py ../examples/09_mpi_tasks.py ../examples/01_task_details.py From 70ffb47b3fc375cccf73a2de95a9ed8ddb0b057a Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 09:43:42 +0100 Subject: [PATCH 02/21] snap --- .github/workflows/ci-release.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index d43f0e062..25b3e3779 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -45,9 +45,6 @@ jobs: mkdir example_artifacts/ cd example_artifacts/ radical-stack - pwd - ls -l $HOME - ls -l /home/runner || echo 'no runner home' ../examples/00_getting_started.py ../examples/09_mpi_tasks.py ../examples/01_task_details.py From a6944cbcde657655b06f79b9335c105c44f52711 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 10:55:36 +0100 Subject: [PATCH 03/21] snap --- .github/workflows/ci-release.yml | 34 +++++++++++++++++--------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index 25b3e3779..c9e451b4e 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -42,26 +42,28 @@ jobs: export RADICAL_UTILS_ZMQ_LOG_LVL=INFO export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO . testenv/bin/activate - mkdir example_artifacts/ - cd example_artifacts/ + export RP_ROOT=$(pwd) + export BASE=/home/runner/radical.pilot.sandbox/ + mkdir -p $BASE/client_sessions + cd $BASE/client_sessions radical-stack - ../examples/00_getting_started.py - ../examples/09_mpi_tasks.py - ../examples/01_task_details.py - ../examples/02_failing_tasks.py - ../examples/03_multiple_pilots.py - ../examples/04_scheduler_selection.py - ../examples/05_task_input_data.py - ../examples/06_task_output_data.py - ../examples/07_shared_task_data.py - ../examples/08_task_environment.py - ../examples/10_pre_and_post_exec.py - ../examples/11_task_input_data_tar.py - ../examples/11_task_input_folder.py + $RP_ROOT/examples/00_getting_started.py + $RP_ROOT/examples/09_mpi_tasks.py + $RP_ROOT/examples/01_task_details.py + $RP_ROOT/examples/02_failing_tasks.py + $RP_ROOT/examples/03_multiple_pilots.py + $RP_ROOT/examples/04_scheduler_selection.py + $RP_ROOT/examples/05_task_input_data.py + $RP_ROOT/examples/06_task_output_data.py + $RP_ROOT/examples/07_shared_task_data.py + $RP_ROOT/examples/08_task_environment.py + $RP_ROOT/examples/10_pre_and_post_exec.py + $RP_ROOT/examples/11_task_input_data_tar.py + $RP_ROOT/examples/11_task_input_folder.py - name: upload example_artifacts if: always() uses: actions/upload-artifact@v3 with: name: example_artifacts - path: example_artifacts + path: /home/runner/radical.pilot.sandbox/ From ffd94ca297ef5002d20a119fa03fa42db91411d2 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 11:22:56 +0100 Subject: [PATCH 04/21] snap --- .github/workflows/ci-release.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index c9e451b4e..a8dcf003e 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -32,7 +32,7 @@ jobs: python -m pip install --upgrade pip setuptools wheel python -m pip install . - name: run examples - timeout-minutes: 15 + timeout-minutes: 10 run: | export RADICAL_LOG_LVL=DEBUG_9 export RADICAL_PROFILE=True @@ -60,10 +60,16 @@ jobs: $RP_ROOT/examples/10_pre_and_post_exec.py $RP_ROOT/examples/11_task_input_data_tar.py $RP_ROOT/examples/11_task_input_folder.py - - name: upload example_artifacts + + - name: prepare example_artifacts + if: always() + run: | + tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/ + + - name: prepare example_artifacts if: always() uses: actions/upload-artifact@v3 with: name: example_artifacts - path: /home/runner/radical.pilot.sandbox/ + path: /home/runner/example_artifacts.tgz From 52fffbb4054165e8eaab9f6d204de28a4fc417b8 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 12:25:54 +0100 Subject: [PATCH 05/21] hotfix: break circular termination lock --- bin/radical-pilot-component | 3 +-- src/radical/pilot/agent/executing/base.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/bin/radical-pilot-component b/bin/radical-pilot-component index 304ab6e85..36c291335 100755 --- a/bin/radical-pilot-component +++ b/bin/radical-pilot-component @@ -130,8 +130,7 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): log=log, prof=prof) # all is set up - we can sit idle 'til end of time. - while not term.is_set(): - time.sleep(1) + comp.wait() # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/agent/executing/base.py b/src/radical/pilot/agent/executing/base.py index 33fbd40c5..0d9c1ebdd 100644 --- a/src/radical/pilot/agent/executing/base.py +++ b/src/radical/pilot/agent/executing/base.py @@ -156,7 +156,7 @@ def _to_watcher(self): `self._cancel_task(task)`. That has to be implemented by al executors. ''' - while True: + while not self._term.is_set(): # check once per second at most time.sleep(1) From 2aff9e070fcd709f4072a4e358d602c5c6372e4e Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 12:50:53 +0100 Subject: [PATCH 06/21] snap --- src/radical/pilot/utils/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index 11a137440..e14c562af 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -236,7 +236,7 @@ def start(self): def wait(self): while not self._term.is_set(): - time.sleep(1) + time.sleep(0.1) # -------------------------------------------------------------------------- From 718afa5baa1be62ba74855b3deefeb82ce4c46f7 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 22:50:09 +0100 Subject: [PATCH 07/21] run examples in PR CI --- .github/workflows/ci-release.yml | 2 +- .github/workflows/ci.yml | 67 ++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml index a8dcf003e..7ab907e33 100644 --- a/.github/workflows/ci-release.yml +++ b/.github/workflows/ci-release.yml @@ -66,7 +66,7 @@ jobs: run: | tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/ - - name: prepare example_artifacts + - name: upload example_artifacts if: always() uses: actions/upload-artifact@v3 with: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1a7e067d2..741b397bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,3 +157,70 @@ jobs: path: sessions_analytics retention-days: 5 + examples + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ '3.12' ] + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 2 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + sudo apt update + sudo apt install -y openmpi + sudo apt install -y texlive cm-super + sudo apt install -y texlive-fonts-extra texlive-extra-utils dvipng + sudo apt install -y texlive-fonts-recommended texlive-latex-extra + python -m venv testenv + . testenv/bin/activate + python -m pip install --upgrade pip setuptools wheel + python -m pip install -r requirements-ci.txt + radical-stack + - name: run examples + timeout-minutes: 10 + run: | + export RADICAL_LOG_LVL=DEBUG_9 + export RADICAL_PROFILE=True + export RADICAL_DEBUG=TRUE + export RADICAL_DEBUG_HELPER=TRUE + export RADICAL_REPORT=TRUE + export RADICAL_UTILS_ZMQ_LOG_LVL=INFO + export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO + . testenv/bin/activate + export RP_ROOT=$(pwd) + export BASE=/home/runner/radical.pilot.sandbox/ + mkdir -p $BASE/client_sessions + cd $BASE/client_sessions + radical-stack + $RP_ROOT/examples/00_getting_started.py + $RP_ROOT/examples/09_mpi_tasks.py + $RP_ROOT/examples/01_task_details.py + $RP_ROOT/examples/02_failing_tasks.py + $RP_ROOT/examples/03_multiple_pilots.py + $RP_ROOT/examples/04_scheduler_selection.py + $RP_ROOT/examples/05_task_input_data.py + $RP_ROOT/examples/06_task_output_data.py + $RP_ROOT/examples/07_shared_task_data.py + $RP_ROOT/examples/08_task_environment.py + $RP_ROOT/examples/10_pre_and_post_exec.py + $RP_ROOT/examples/11_task_input_data_tar.py + $RP_ROOT/examples/11_task_input_folder.py + + - name: prepare example_artifacts + if: always() + run: | + tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/ + + - name: upload example_artifacts + if: always() + uses: actions/upload-artifact@v3 + with: + name: example_artifacts + path: /home/runner/example_artifacts.tgz + + From ac2137d7b14615dad0a595e8287b10066a6c3557 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 23:04:35 +0100 Subject: [PATCH 08/21] snap --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 741b397bd..683d8c9eb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,7 +157,7 @@ jobs: path: sessions_analytics retention-days: 5 - examples + examples: runs-on: ubuntu-latest strategy: matrix: From 6ff1b22831ce8237976ec5878b6da2ec1e5fbbbf Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 27 Nov 2024 23:10:51 +0100 Subject: [PATCH 09/21] snap --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 683d8c9eb..ffb19230d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -172,7 +172,7 @@ jobs: - name: Install dependencies run: | sudo apt update - sudo apt install -y openmpi + sudo apt install -y openmpi-bin libopenmpi-dev sudo apt install -y texlive cm-super sudo apt install -y texlive-fonts-extra texlive-extra-utils dvipng sudo apt install -y texlive-fonts-recommended texlive-latex-extra From 646b6e0883f564286b81018bd65fae2f40f1be35 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 28 Nov 2024 12:52:36 +0100 Subject: [PATCH 10/21] snap --- .github/workflows/ci.yml | 14 +-- bin/radical-pilot-bridge | 75 ++---------- bin/radical-pilot-component | 68 +---------- examples/00_getting_started.py | 2 + src/radical/pilot/messages.py | 39 +++--- src/radical/pilot/pilot_manager.py | 3 + src/radical/pilot/session.py | 119 +------------------ src/radical/pilot/utils/__init__.py | 1 + src/radical/pilot/utils/component.py | 15 ++- src/radical/pilot/utils/component_manager.py | 50 ++++---- 10 files changed, 80 insertions(+), 306 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ffb19230d..5371ae215 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -197,19 +197,7 @@ jobs: mkdir -p $BASE/client_sessions cd $BASE/client_sessions radical-stack - $RP_ROOT/examples/00_getting_started.py - $RP_ROOT/examples/09_mpi_tasks.py - $RP_ROOT/examples/01_task_details.py - $RP_ROOT/examples/02_failing_tasks.py - $RP_ROOT/examples/03_multiple_pilots.py - $RP_ROOT/examples/04_scheduler_selection.py - $RP_ROOT/examples/05_task_input_data.py - $RP_ROOT/examples/06_task_output_data.py - $RP_ROOT/examples/07_shared_task_data.py - $RP_ROOT/examples/08_task_environment.py - $RP_ROOT/examples/10_pre_and_post_exec.py - $RP_ROOT/examples/11_task_input_data_tar.py - $RP_ROOT/examples/11_task_input_folder.py + for f in $RP_ROOT/examples/[01]*.py; do echo "=== $f"; $f; done - name: prepare example_artifacts if: always() diff --git a/bin/radical-pilot-bridge b/bin/radical-pilot-bridge index 8a5ac5149..bb6bc20e0 100755 --- a/bin/radical-pilot-bridge +++ b/bin/radical-pilot-bridge @@ -6,15 +6,13 @@ __license__ = "MIT" import os import sys -import time -import threading as mt import setproctitle as spt import multiprocessing as mp import radical.utils as ru -from radical.pilot.messages import HeartbeatMessage +from radical.pilot.messages import ComponentStartedMessage # ------------------------------------------------------------------------------ @@ -28,27 +26,8 @@ def main(sid, reg_addr, uid, ppid, evt): - name: name of the bridge - kind: type of bridge (`pubsub` or `queue`) - If the config contains a `heartbeat` section, that section must be formatted - as follows: - - { - 'from' : 'uid', - 'addr_pub': 'addr_pub', - 'addr_sub': 'addr_sub', - 'interval': , - 'timeout' : - } - - If that section exists, the process will daemonize and heartbeats are used - to manage the bridge lifetime: the lifetime of this bridge is then dependent - on receiving heartbeats from the given `uid`: after `timeout` seconds of no - heartbeats arriving, the bridge will terminate. The bridge itself will - publish heartbeats every `interval` seconds on the heartbeat channel under - its own uid. - - If the heartbeat section is not present in the config file, the components - lifetime is expected to be explicitly managed, i.e., that this wrapper - process hosting the bridge is terminated externally. + The config will also contain a `cmgr_url` entry which points to the cmgr + this component should register with. The config file may contain other entries which are passed to the bridge and are interpreted by the bridge implementation. @@ -105,11 +84,8 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): pwatcher.watch(int(ppid)) pwatcher.watch(os.getpid()) - term = mt.Event() - reg = ru.zmq.RegistryClient(url=reg_addr) - - hb_cfg = ru.TypedDict(reg['heartbeat']) - b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid]) + reg = ru.zmq.RegistryClient(url=reg_addr) + b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid]) # create the instance and begin to work bridge = ru.zmq.Bridge.create(uid, cfg=b_cfg) @@ -119,48 +95,13 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): reg.close() bridge.start() - evt.set() - # re-enable the test below if timing issues crop up - # if 'pubsub' in uid: - # d = ru.zmq.test_pubsub(bridge.channel, bridge.addr_pub, bridge.addr_sub) - - # bridge runs - send heartbeats so that cmgr knows about it - hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof) - - def hb_beat_cb(): - hb_pub.put('heartbeat', HeartbeatMessage(uid=uid)) - - def hb_term_cb(hb_uid): - bridge.stop() - term.set() - return False - - hb = ru.Heartbeat(uid=uid, - timeout=hb_cfg.timeout, - interval=hb_cfg.interval, - beat_cb=hb_beat_cb, - term_cb=hb_term_cb, - log=log) - hb.start() - - # always watch out for session heartbeat - hb.watch(uid=sid) - - # react on session heartbeats - def hb_sub_cb(topic, msg): - hb_msg = HeartbeatMessage(from_dict=msg) - if hb_msg.uid == sid: - hb.beat(uid=sid) - - ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub, - topic='heartbeat', cb=hb_sub_cb, - log=log, prof=prof) + pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url) + pipe.put(ComponentStartedMessage(uid=uid)) # all is set up - we can sit idle 'til end of time. - while not term.is_set(): - time.sleep(1) + bridge.wait() # ------------------------------------------------------------------------------ diff --git a/bin/radical-pilot-component b/bin/radical-pilot-component index 36c291335..0ddd00bc9 100755 --- a/bin/radical-pilot-component +++ b/bin/radical-pilot-component @@ -6,16 +6,13 @@ __license__ = "MIT" import os import sys -import time -import threading as mt import setproctitle as spt import multiprocessing as mp import radical.utils as ru import radical.pilot as rp -from radical.pilot.messages import HeartbeatMessage # ------------------------------------------------------------------------------ # @@ -28,27 +25,8 @@ def main(sid, reg_addr, uid, ppid, evt): - name: name of the component - kind: type of component - If the config contains a `heartbeat` section, that section must be formatted - as follows: - - { - 'from' : 'uid', - 'addr_pub': 'addr_pub', - 'addr_sub': 'addr_sub', - 'interval': , - 'timeout' : - } - - If that section exists, the process will daemonize and heartbeats are used - to manage the bridge lifetime: the lifetime of this bridge is then dependent - on receiving heartbeats from the given `uid`: after `timeout` seconds of no - heartbeats arriving, the bridge will terminate. The bridge itself will - publish heartbeats every `interval` seconds on the heartbeat channel under - its own uid. - - If the heartbeat section is not present in the config file, the components - lifetime is expected to be explicitly managed, i.e., that this wrapper - process hosting the bridge is terminated externally. + The config will also contain a `cmgr_url` entry which points to the cmgr + this component should register with. The config file may contain other entries which are passed to the component and are interpreted by the component implementation. @@ -79,16 +57,11 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): pwatcher.watch(int(ppid)) pwatcher.watch(os.getpid()) - term = mt.Event() - reg = ru.zmq.RegistryClient(url=reg_addr) - - hb_cfg = ru.TypedDict(reg['heartbeat']) - c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid]) + reg = ru.zmq.RegistryClient(url=reg_addr) + c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid]) reg.close() - evt.set() - # start a non-primary session session = rp.Session(uid=sid, cfg=c_cfg, _role=rp.Session._DEFAULT, _reg_addr=reg_addr) @@ -96,38 +69,7 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): # create the instance and begin to work comp = rp.utils.BaseComponent.create(c_cfg, session) comp.start() - - # component runs - send heartbeats so that session knows about it - hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof) - - def hb_beat_cb(): - hb_pub.put('heartbeat', HeartbeatMessage(uid=uid)) - - def hb_term_cb(hb_uid): - comp.stop() - term.set() - return False - - hb = ru.Heartbeat(uid=uid, - timeout=hb_cfg.timeout, - interval=hb_cfg.interval, - beat_cb=hb_beat_cb, - term_cb=hb_term_cb, - log=log) - hb.start() - - # always watch out for session heartbeat - hb.watch(uid=sid) - - # react on session heartbeats - def hb_sub_cb(topic, msg): - hb_msg = HeartbeatMessage(from_dict=msg) - if hb_msg.uid == sid: - hb.beat(uid=sid) - - ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub, - topic='heartbeat', cb=hb_sub_cb, - log=log, prof=prof) + evt.set() # all is set up - we can sit idle 'til end of time. comp.wait() diff --git a/examples/00_getting_started.py b/examples/00_getting_started.py index f49e02355..d26c93404 100755 --- a/examples/00_getting_started.py +++ b/examples/00_getting_started.py @@ -45,7 +45,9 @@ # read the config used for resource details config = ru.read_json('%s/config.json' % os.path.dirname(__file__)).get(resource, {}) + print('=== create pmgr') pmgr = rp.PilotManager(session=session) + print('=== got pmgr') tmgr = rp.TaskManager(session=session) report.header('submit pilots') diff --git a/src/radical/pilot/messages.py b/src/radical/pilot/messages.py index 2cd141c55..b97ba6040 100644 --- a/src/radical/pilot/messages.py +++ b/src/radical/pilot/messages.py @@ -4,6 +4,7 @@ import radical.utils as ru + # ------------------------------------------------------------------------------ # class RPBaseMessage(ru.Message): @@ -11,9 +12,14 @@ class RPBaseMessage(ru.Message): # rpc distinguishes messages which are forwarded to the proxy bridge and # those which are not and thus remain local to the module they originate in. - _schema = {'fwd' : bool} - _defaults = {'_msg_type': 'rp_msg', - 'fwd' : False} + _msg_type = 'rp_msg' + _schema = {'fwd': bool} + _defaults = {'fwd': False} + + @classmethod + def register(cls): + cls._defaults['_msg_type'] = cls._msg_type + ru.Message.register_msg_type(cls._msg_type, cls) # we do not register this message type - it is not supposed to be used @@ -22,31 +28,30 @@ class RPBaseMessage(ru.Message): # ------------------------------------------------------------------------------ # -class HeartbeatMessage(RPBaseMessage): +class ComponentStartedMessage(RPBaseMessage): - # heartbeat messages are never forwarded + # startup messages are never forwarded - _schema = {'uid' : str} - _defaults = {'_msg_type': 'heartbeat', - 'fwd' : False, - 'uid' : None} + _msg_type = 'component_start' + _schema = {'uid': str} + _defaults = {'fwd': False, + 'uid': None} -ru.Message.register_msg_type('heartbeat', HeartbeatMessage) +ComponentStartedMessage.register() # ------------------------------------------------------------------------------ # class RPCRequestMessage(RPBaseMessage): + _msg_type = 'rpc_req' _schema = {'uid' : str, # uid of message 'addr' : str, # who is expected to act on the request 'cmd' : str, # rpc command 'args' : list, # rpc command arguments 'kwargs' : dict} # rpc command named arguments - _defaults = { - '_msg_type': 'rpc_req', - 'fwd' : True, + _defaults = {'fwd' : True, 'uid' : None, 'addr' : None, 'cmd' : None, @@ -54,20 +59,20 @@ class RPCRequestMessage(RPBaseMessage): 'kwargs' : {}} - -ru.Message.register_msg_type('rpc_req', RPCRequestMessage) +RPCRequestMessage.register() # ------------------------------------------------------------------------------ # class RPCResultMessage(RPBaseMessage): + _msg_type = 'rpc_res' _schema = {'uid' : str, # uid of rpc call 'val' : Any, # return value (`None` by default) 'out' : str, # stdout 'err' : str, # stderr 'exc' : str} # raised exception representation - _defaults = {'_msg_type': 'rpc_res', + _defaults = {'_msg_type': _msg_type, 'fwd' : True, 'uid' : None, 'val' : None, @@ -90,7 +95,7 @@ def __init__(self, rpc_req=None, from_dict=None, **kwargs): super().__init__(from_dict, **kwargs) -ru.Message.register_msg_type('rpc_res', RPCResultMessage) +RPCResultMessage.register() # ------------------------------------------------------------------------------ diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index e5f354c27..307a972a0 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -123,7 +123,10 @@ def __init__(self, session, cfg='default'): cfg.heartbeat = session.cfg.heartbeat cfg.client_sandbox = session._get_client_sandbox() + print('pmgr init') + super().__init__(cfg, session=session) + print('pmgr start') self.start() self._log.info('started pmgr %s', self._uid) diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index cd6c0246e..50def5dfa 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -15,7 +15,6 @@ from . import constants as rpc from . import utils as rpu -from .messages import HeartbeatMessage from .proxy import Proxy from .resource_config import ResourceConfig, ENDPOINTS_DEFAULT @@ -65,16 +64,6 @@ class Session(object): entities. """ - # In that role, the session will create a special pubsub channel `heartbeat` - # which is used by all components in its hierarchy to exchange heartbeat - # messages. Those messages are used to watch component health - if - # a (parent or child) component fails to send heartbeats for a certain - # amount of time, it is considered dead and the process tree will terminate. - # That heartbeat management is implemented in the `ru.Heartbeat` class. - # Only primary sessions instantiate a heartbeat channel (i.e., only the root - # sessions of RP client or agent modules), but all components need to call - # the sessions `heartbeat()` method at regular intervals. - # the reporter is an application-level singleton _reporter = None @@ -177,7 +166,6 @@ def __init__(self, proxy_url: Optional[str ] = None, self._tmgrs = dict() # map IDs to tmgr instances self._cmgr = None # only primary sessions have a cmgr self._rm = None # resource manager (agent_0 sessions) - self._hb = None # heartbeat monitor if _reg_addr: @@ -252,17 +240,12 @@ def _init_primary(self): # only primary sessions start and initialize the proxy service self._start_proxy() - # start heartbeat channel - self._start_heartbeat() - # push the session config into the registry self._publish_cfg() # start bridges and components self._start_components() - time.sleep(1) - # primary session hooks into the control pubsub bcfg = self._reg['bridges.%s' % rpc.CONTROL_PUBSUB] self._ctrl_pub = ru.zmq.Publisher(channel=rpc.CONTROL_PUBSUB, @@ -299,7 +282,6 @@ def _init_agent_0(self): self._start_registry() self._connect_registry() self._connect_proxy() - self._start_heartbeat() self._publish_cfg() self._init_rm() self._start_components() @@ -548,93 +530,6 @@ def _init_cfg_from_registry(self): self._prof.prof('session_start', uid=self._uid) - # -------------------------------------------------------------------------- - # - def _start_heartbeat(self): - - # only primary and agent_0 sessions manage heartbeats - assert self._role in [self._PRIMARY, self._AGENT_0] - - # start the embedded heartbeat pubsub bridge - self._hb_pubsub = ru.zmq.PubSub('heartbeat_pubsub', - cfg={'uid' : 'heartbeat_pubsub', - 'type' : 'pubsub', - 'log_lvl': 'debug', - 'path' : self._cfg.path}) - self._hb_pubsub.start() - time.sleep(1) - - # re-enable the test below if timing issues crop up - # ru.zmq.test_pubsub(self._hb_pubsub.channel, - # self._hb_pubsub.addr_pub, - # self._hb_pubsub.addr_sub), - - # fill 'cfg.heartbeat' section - self._cfg.heartbeat.addr_pub = str(self._hb_pubsub.addr_pub) - self._cfg.heartbeat.addr_sub = str(self._hb_pubsub.addr_sub) - - # create a publisher for that channel to publish own heartbeat - self._hb_pub = ru.zmq.Publisher(channel='heartbeat_pubsub', - url=self._cfg.heartbeat.addr_pub, - log=self._log, - prof=self._prof) - - - # -------------------------------------- - # start the heartbeat monitor, but first - # define its callbacks - def _hb_beat_cb(): - # called on every heartbeat: cfg.heartbeat.interval` - # publish own heartbeat - self._hb_pub.put('heartbeat', HeartbeatMessage(uid=self._uid)) - - # also update proxy heartbeat - if self._proxy: - try: - self._proxy.request('heartbeat', {'sid': self._uid}) - except: - # ignore errors in case proxy went away already - pass - # -------------------------------------- - - # -------------------------------------- - # called when some entity misses - # heartbeats: `cfg.heartbeat.timeout` - def _hb_term_cb(hb_uid): - if self._cmgr: - self._cmgr.close() - return False - # -------------------------------------- - - # create heartbeat manager which monitors all components in this session - # self._log.debug('hb %s from session', self._uid) - self._hb = ru.Heartbeat(uid=self._uid, - timeout=self._cfg.heartbeat.timeout, - interval=self._cfg.heartbeat.interval, - beat_cb=_hb_beat_cb, - term_cb=_hb_term_cb, - log=self._log) - self._hb.start() - - # -------------------------------------- - # subscribe to heartbeat msgs and inform - # self._hb about every heartbeat - def _hb_msg_cb(topic, msg): - - hb_msg = HeartbeatMessage(from_dict=msg) - - if hb_msg.uid != self._uid: - self._hb.beat(uid=hb_msg.uid) - # -------------------------------------- - - ru.zmq.Subscriber(channel='heartbeat_pubsub', - topic='heartbeat', - url=self._cfg.heartbeat.addr_sub, - cb=_hb_msg_cb, - log=self._log, - prof=self._prof) - - # -------------------------------------------------------------------------- # def _publish_cfg(self): @@ -644,15 +539,13 @@ def _publish_cfg(self): assert self._role in [self._PRIMARY, self._AGENT_0] - # push proxy, bridges, components and heartbeat subsections separately + # push proxy, bridges, and components subsections separately flat_cfg = copy.deepcopy(self._cfg) - del flat_cfg['heartbeat'] del flat_cfg['bridges'] del flat_cfg['components'] self._reg['cfg'] = flat_cfg - self._reg['heartbeat'] = self._cfg.heartbeat self._reg['bridges'] = self._cfg.bridges # proxy bridges self._reg['components'] = {} @@ -880,9 +773,8 @@ def _start_components(self): assert self._role in [self._PRIMARY, self._AGENT_0, self._AGENT_N] - # primary sessions and agents have a component manager which also - # manages heartbeat. 'self._cmgr.close()` should be called during - # termination + # primary sessions and agents have a component manager + # 'self._cmgr.close()` should be called during termination self._cmgr = rpu.ComponentManager(self.uid, self.reg_addr, self._uid) self._cmgr.start_bridges(self._cfg.bridges) self._cmgr.start_components(self._cfg.components) @@ -953,11 +845,6 @@ def close(self, **kwargs): if self._cmgr: self._cmgr.close() - # stop heartbeats - if self._hb: - self._hb.stop() - self._hb_pubsub.stop() - if self._proxy: if self._role == self._PRIMARY: diff --git a/src/radical/pilot/utils/__init__.py b/src/radical/pilot/utils/__init__.py index 147da205b..4eda231c4 100644 --- a/src/radical/pilot/utils/__init__.py +++ b/src/radical/pilot/utils/__init__.py @@ -35,6 +35,7 @@ from .prof_utils import * from .misc import * from .session import * +from .bridge import * from .component import * from .component_manager import * from .serializer import * diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index e14c562af..3d8c18552 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -16,6 +16,7 @@ from .. import constants as rpc from .. import states as rps +from ..messages import ComponentStartedMessage from ..messages import RPCRequestMessage, RPCResultMessage @@ -103,9 +104,8 @@ def __init__(self, cfg, session): the session under which to run this component, and a uid for the component itself which MUST be unique within the scope of the given session. - All components and the component managers will continuously sent heartbeat - messages on the control pubsub - missing heartbeats will by default lead to - component termination. + Components will send a startup message to the component manager upon + successful initialization. Further, the class must implement the registered work methods, with a signature of:: @@ -230,6 +230,15 @@ def start(self): assert self._thread.is_alive() + # send startup message + if self._cfg.cmgr_url: + self._log.debug('send startup message to %s', self._cfg.cmgr_url) + pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=self._cfg.cmgr_url) + pipe.put(ComponentStartedMessage(uid=self.uid)) + + # give the message some time to get out + time.sleep(0.1) + # -------------------------------------------------------------------------- # diff --git a/src/radical/pilot/utils/component_manager.py b/src/radical/pilot/utils/component_manager.py index d27da077d..4768b1268 100644 --- a/src/radical/pilot/utils/component_manager.py +++ b/src/radical/pilot/utils/component_manager.py @@ -9,7 +9,7 @@ import radical.utils as ru -from ..messages import HeartbeatMessage +from ..messages import ComponentStartedMessage # ------------------------------------------------------------------------------ @@ -22,8 +22,8 @@ class ComponentManager(object): etc. This ComponentManager centralises the code needed to spawn, manage and terminate such components. Any code which needs to create component should create a ComponentManager instance and pass the required component and - bridge layout and configuration. Callng `stop()` on the cmgr will terminate - the components and brisged. + bridge layout and configuration. Calling `stop()` on the cmgr will + terminate the components and bridges. ''' # -------------------------------------------------------------------------- @@ -39,7 +39,6 @@ def __init__(self, sid, reg_addr, owner): self._reg = ru.zmq.RegistryClient(url=self._reg_addr) self._cfg = ru.Config(from_dict=self._reg['cfg']) - self._hb_cfg = ru.Config(from_dict=self._reg['heartbeat']) self._uid = ru.generate_id('cmgr.%(item_counter)04d', ru.ID_CUSTOM, ns=self._sid) @@ -55,30 +54,28 @@ def __init__(self, sid, reg_addr, owner): self._log.debug('cmgr %s (%s)', self._uid, self._owner) - # component managers listen on the heartbeat pubsub to see if spawned - # components come alive - self._heartbeats = dict() # heartbeats we have seen - ru.zmq.Subscriber(channel='heartbeat_pubsub', - topic='heartbeat', - url=self._hb_cfg.addr_sub, - cb=self._hb_msg_cb, - log=self._log, - prof=self._prof) + # component managers open a zmq pipe so that components and bridges can + # send registration messages. + self._startups = dict() # startup messages we have seen + def register_cb(msg): + self._log.debug('=== got message: %s', msg) + msg = ru.zmq.Message.deserialize(msg) + if isinstance(msg, ComponentStartedMessage): + self._startups[msg.uid] = msg + else: + self._log.error('unknown message type: %s', type(msg)) - # -------------------------------------------------------------------------- - # - def _hb_msg_cb(self, topic, msg): - - hb_msg = HeartbeatMessage(from_dict=msg) - self._heartbeats[hb_msg.uid] = time.time() + self._pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PULL) + self._pipe.register_cb(register_cb) + self._cfg.cmgr_url = str(self._pipe.url) # -------------------------------------------------------------------------- # def _wait_startup(self, uids, timeout): ''' - Wait for the first heartbeat of the given component UIDs to appear. If + Wait for the startup message of the given component UIDs to appear. If that does not happen before timeout, an exception is raised. ''' @@ -89,7 +86,7 @@ def _wait_startup(self, uids, timeout): self._log.debug('wait for : %s', nok) - ok = [uid for uid in uids if uid in self._heartbeats] + ok = [uid for uid in uids if uid in self._startups] nok = [uid for uid in uids if uid not in ok] if len(ok) == len(uids): @@ -131,13 +128,13 @@ def start_bridges(self, bridges): bcfg.uid = uid bcfg.channel = bname bcfg.cmgr = self.uid + bcfg.cmgr_url = self._cfg.cmgr_url bcfg.owner = self._owner bcfg.sid = self._cfg.sid bcfg.path = self._cfg.path bcfg.reg_addr = self._cfg.reg_addr bcfg.log_lvl = self._cfg.log_lvl bcfg.debug_lvl = self._cfg.debug_lvl - bcfg.heartbeat = self._hb_cfg self._reg['bridges.%s.cfg' % bname] = bcfg @@ -151,12 +148,11 @@ def start_bridges(self, bridges): self._log.error(msg) raise RuntimeError(msg) - self._heartbeats[bname] = None self._log.info('created bridge %s [%s]', bname, bname) - # all bridges are started, wait for their heartbeats + # all bridges are started, wait for their startup messages self._log.debug('wait for %s', buids) - self._wait_startup(buids, timeout=self._hb_cfg.timeout) + self._wait_startup(buids, timeout=10.0) self._prof.prof('start_bridges_stop', uid=self._uid) @@ -184,13 +180,13 @@ def start_components(self, components, cfg = None): ccfg.owner = self._owner ccfg.sid = self._cfg.sid ccfg.cmgr = self._cfg.uid + ccfg.cmgr_url = self._cfg.cmgr_url ccfg.base = self._cfg.base ccfg.path = self._cfg.path ccfg.reg_addr = self._cfg.reg_addr ccfg.proxy_url = self._cfg.proxy_url ccfg.log_lvl = self._cfg.log_lvl ccfg.debug_lvl = self._cfg.debug_lvl - ccfg.heartbeat = self._hb_cfg if cfg: ru.dict_merge(ccfg, cfg, ru.OVERWRITE) @@ -213,7 +209,7 @@ def start_components(self, components, cfg = None): # all components should start now, wait for heartbeats to appear. self._log.debug('wait for %s', cuids) - self._wait_startup(cuids, timeout=self._hb_cfg.timeout) + self._wait_startup(cuids, timeout=10.0) self._prof.prof('start_components_stop', uid=self._uid) From 222418a4d554a1b60241eb247a93c0da23a0ac24 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Fri, 29 Nov 2024 12:12:11 +0100 Subject: [PATCH 11/21] snap --- tests/unit_tests/test_executing/test_base.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/unit_tests/test_executing/test_base.py b/tests/unit_tests/test_executing/test_base.py index 0f8df63e7..a9bf6556d 100755 --- a/tests/unit_tests/test_executing/test_base.py +++ b/tests/unit_tests/test_executing/test_base.py @@ -75,10 +75,15 @@ def test_initialize(self, mocked_rm, mocked_init): 'resource_manager': 'FORK', 'agent_spawner' : 'POPEN'}) - ec._log = ec._prof = mock.Mock() - ec.work = ec.control_cb = mock.Mock() - ec.register_input = ec.register_output = mock.Mock() - ec.register_publisher = ec.register_subscriber = mock.Mock() + ec._term = mock.Mock() + ec._log = mock.Mock() + ec._prof = mock.Mock() + ec.work = mock.Mock() + ec.control_cb = mock.Mock() + ec.register_input = mock.Mock() + ec.register_output = mock.Mock() + ec.register_publisher = mock.Mock() + ec.register_subscriber = mock.Mock() mocked_rm.create.return_value = mocked_rm ec.initialize() From 49a76305ab4a940576a97c0ececbf52fe1dcefd2 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Fri, 29 Nov 2024 12:21:36 +0100 Subject: [PATCH 12/21] snap --- src/radical/pilot/utils/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/radical/pilot/utils/__init__.py b/src/radical/pilot/utils/__init__.py index 4eda231c4..147da205b 100644 --- a/src/radical/pilot/utils/__init__.py +++ b/src/radical/pilot/utils/__init__.py @@ -35,7 +35,6 @@ from .prof_utils import * from .misc import * from .session import * -from .bridge import * from .component import * from .component_manager import * from .serializer import * From c8b1bf83fd3cb1d8c2243a1841cedf76a7a84c8f Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 3 Dec 2024 16:27:19 +0100 Subject: [PATCH 13/21] snap --- examples/00_getting_started.py | 2 -- src/radical/pilot/messages.py | 1 + src/radical/pilot/utils/staging_helper.py | 5 ++++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/00_getting_started.py b/examples/00_getting_started.py index d26c93404..f49e02355 100755 --- a/examples/00_getting_started.py +++ b/examples/00_getting_started.py @@ -45,9 +45,7 @@ # read the config used for resource details config = ru.read_json('%s/config.json' % os.path.dirname(__file__)).get(resource, {}) - print('=== create pmgr') pmgr = rp.PilotManager(session=session) - print('=== got pmgr') tmgr = rp.TaskManager(session=session) report.header('submit pilots') diff --git a/src/radical/pilot/messages.py b/src/radical/pilot/messages.py index b97ba6040..01bc621e7 100644 --- a/src/radical/pilot/messages.py +++ b/src/radical/pilot/messages.py @@ -18,6 +18,7 @@ class RPBaseMessage(ru.Message): @classmethod def register(cls): + # TODO: this should be moved to the RU base class cls._defaults['_msg_type'] = cls._msg_type ru.Message.register_msg_type(cls._msg_type, cls) diff --git a/src/radical/pilot/utils/staging_helper.py b/src/radical/pilot/utils/staging_helper.py index 1311c1913..92052dfd0 100644 --- a/src/radical/pilot/utils/staging_helper.py +++ b/src/radical/pilot/utils/staging_helper.py @@ -146,6 +146,9 @@ def copy(self, src, tgt, flags): # # FIXME: why?? # flags = 0 + src = ru.Url(src) + tgt = ru.Url(tgt) + assert self._has_saga tmp = ru.Url(tgt) @@ -154,7 +157,7 @@ def copy(self, src, tgt, flags): fs = self._rsfs.Directory(str(tmp)) flags |= self._rsfs.CREATE_PARENTS - if os.path.isdir(src) or src.endswith('/'): + if os.path.isdir(src.path) or src.path.endswith('/'): flags |= self._rsfs.RECURSIVE # self._log.debug("copy %s 1 -> %s [%s]" % (src, tgt, flags)) From caca6956604f3c19ddca8854a5119a7393e656bc Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 3 Dec 2024 17:23:01 +0100 Subject: [PATCH 14/21] thread cleanup --- bin/radical-pilot-bridge | 2 +- src/radical/pilot/messages.py | 6 +- src/radical/pilot/pilot_manager.py | 2 + src/radical/pilot/proxy.py | 10 ++-- src/radical/pilot/session.py | 59 +++++++++++++------- src/radical/pilot/task_manager.py | 12 ++-- src/radical/pilot/utils/component.py | 35 +++++++++--- src/radical/pilot/utils/component_manager.py | 15 +++++ 8 files changed, 100 insertions(+), 41 deletions(-) diff --git a/bin/radical-pilot-bridge b/bin/radical-pilot-bridge index bb6bc20e0..34c94b37b 100755 --- a/bin/radical-pilot-bridge +++ b/bin/radical-pilot-bridge @@ -98,7 +98,7 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt): evt.set() pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url) - pipe.put(ComponentStartedMessage(uid=uid)) + pipe.put(ComponentStartedMessage(uid=uid, pid=os.getpid())) # all is set up - we can sit idle 'til end of time. bridge.wait() diff --git a/src/radical/pilot/messages.py b/src/radical/pilot/messages.py index 01bc621e7..23ac314cc 100644 --- a/src/radical/pilot/messages.py +++ b/src/radical/pilot/messages.py @@ -34,9 +34,11 @@ class ComponentStartedMessage(RPBaseMessage): # startup messages are never forwarded _msg_type = 'component_start' - _schema = {'uid': str} + _schema = {'uid': str, + 'pid': int} _defaults = {'fwd': False, - 'uid': None} + 'uid': None, + 'pid': None} ComponentStartedMessage.register() diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index 307a972a0..a7c67a17d 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -245,6 +245,8 @@ def close(self, terminate=True): ru.write_json(json, tgt) + super().close() + # -------------------------------------------------------------------------- # diff --git a/src/radical/pilot/proxy.py b/src/radical/pilot/proxy.py index b48502233..157ee23a7 100644 --- a/src/radical/pilot/proxy.py +++ b/src/radical/pilot/proxy.py @@ -123,8 +123,7 @@ # To any request other than the above, the ZMQ bridge will respond: # 'err': 'invalid request' # -# ------------------------------------------------------------------------------ - +# # ------------------------------------------------------------------------------ # class Proxy(ru.zmq.Server): @@ -132,6 +131,7 @@ class Proxy(ru.zmq.Server): def __init__(self, path=None): self._lock = mt.Lock() + self._term = mt.Event() self._clients = dict() ru.zmq.Server.__init__(self, uid='radical.pilot.proxy', @@ -153,9 +153,9 @@ def __init__(self, path=None): def _monitor(self): # this is a daemon thread - it never exits until process termination - while True: + while not self._term.is_set(): - time.sleep(10) + time.sleep(0.5) now = time.time() # iterate w/o lock, and thus get a snapshot of the known sids @@ -192,6 +192,8 @@ def _monitor(self): # def stop(self): + self._term.set() + for sid in self._clients: self._log.info('stop client %s' % sid) self._clients[sid]['term'].set() diff --git a/src/radical/pilot/session.py b/src/radical/pilot/session.py index 50def5dfa..d0eec6b64 100644 --- a/src/radical/pilot/session.py +++ b/src/radical/pilot/session.py @@ -157,15 +157,17 @@ def __init__(self, proxy_url: Optional[str ] = None, self._proxy_cfg = None self._closed = False self._created = time.time() + self._to_stop = list() self._close_options = _CloseOptions(close_options) self._close_options.verify() - self._proxy = None # proxy client instance - self._reg = None # registry client instance - self._pmgrs = dict() # map IDs to pmgr instances - self._tmgrs = dict() # map IDs to tmgr instances - self._cmgr = None # only primary sessions have a cmgr - self._rm = None # resource manager (agent_0 sessions) + self._proxy = None # proxy server instance + self._proxy_client = None # proxy client instance + self._reg = None # registry client instance + self._pmgrs = dict() # map IDs to pmgr instances + self._tmgrs = dict() # map IDs to tmgr instances + self._cmgr = None # only primary sessions have a cmgr + self._rm = None # resource manager (agent_0 sessions) if _reg_addr: @@ -606,8 +608,8 @@ def _start_proxy(self): # configure proxy channels try: - self._proxy = ru.zmq.Client(url=self._cfg.proxy_url, log=self._log) - self._proxy_cfg = self._proxy.request('register', {'sid':self._uid}) + self._proxy_client = ru.zmq.Client(url=self._cfg.proxy_url, log=self._log) + self._proxy_cfg = self._proxy_client.request('register', {'sid':self._uid}) except: self._log.exception('%s: failed to start proxy', self._role) @@ -624,8 +626,8 @@ def _connect_proxy(self): assert self._cfg.proxy_url # query the proxy service to fetch proxy cfg created by primary session - self._proxy = ru.zmq.Client(url=self._cfg.proxy_url) - self._proxy_cfg = self._proxy.request('lookup', {'sid': self._uid}) + self._proxy_client = ru.zmq.Client(url=self._cfg.proxy_url) + self._proxy_cfg = self._proxy_client.request('lookup', {'sid': self._uid}) self._log.debug('proxy response: %s', self._proxy_cfg) @@ -699,8 +701,11 @@ def pubsub_fwd(topic, msg): publisher.put(tgt, msg) - ru.zmq.Subscriber(channel=src, topic=src, path=path, cb=pubsub_fwd, - url=url_sub, log=self._log, prof=self._prof) + sub = ru.zmq.Subscriber(channel=src, topic=src, path=path, + cb=pubsub_fwd, url=url_sub, + log=self._log, prof=self._prof) + + self._to_stop.append(sub) # -------------------------------------------------------------------------- @@ -826,6 +831,7 @@ def close(self, **kwargs): options = self._close_options if options.terminate: + # terminate all components if self._role == self._PRIMARY: self._ctrl_pub.put(rpc.CONTROL_PUBSUB, {'cmd': 'terminate', @@ -845,17 +851,25 @@ def close(self, **kwargs): if self._cmgr: self._cmgr.close() - if self._proxy: + if self._proxy_client: if self._role == self._PRIMARY: try: self._log.debug('session %s closes service', self._uid) - self._proxy.request('unregister', {'sid': self._uid}) + self._proxy_client.request('unregister', {'sid': self._uid}) except: pass if self._role in [self._PRIMARY, self._AGENT_0]: - self._proxy.close() + self._proxy_client.close() + self._proxy_client = None + + if self._proxy: + + if self._role in [self._PRIMARY, self._AGENT_0]: + + self._proxy.stop() + self._proxy.wait() self._proxy = None self._log.debug("session %s closed", self._uid) @@ -889,24 +903,27 @@ def close(self, **kwargs): self._rep.ok('>>ok\n') + for thing in self._to_stop: + thing.stop() + + # -------------------------------------------------------------------------- # def _run_proxy(self): - proxy = Proxy(path=self._cfg.path) + self._proxy = Proxy(path=self._cfg.path) try: - proxy.start() + self._proxy.start() - self._proxy_url = proxy.addr + self._proxy_url = self._proxy.addr self._proxy_event.set() # run forever until process is interrupted or killed - proxy.wait() + self._proxy.wait() finally: - proxy.stop() - proxy.wait() + self._proxy.stop() # -------------------------------------------------------------------------- diff --git a/src/radical/pilot/task_manager.py b/src/radical/pilot/task_manager.py index 30f9d0fc3..6307df3e5 100644 --- a/src/radical/pilot/task_manager.py +++ b/src/radical/pilot/task_manager.py @@ -175,10 +175,10 @@ def __init__(self, session, cfg='default', scheduler=None): self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub, log=self._log, prof=self._prof) - ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, - log=self._log, prof=self._prof, - cb=self._control_cb, - topic=rpc.CONTROL_PUBSUB) + self._ctrl_sub = ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, + log=self._log, prof=self._prof, + cb=self._control_cb, + topic=rpc.CONTROL_PUBSUB) self._prof.prof('setup_done', uid=self._uid) self._rep.ok('>>ok\n') @@ -248,6 +248,10 @@ def close(self): tgt = '%s/%s.json' % (self._session.path, self.uid) ru.write_json(json, tgt) + self._ctrl_sub.stop() + + super().close() + # -------------------------------------------------------------------------- # diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index 3d8c18552..3e432cc52 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -234,7 +234,7 @@ def start(self): if self._cfg.cmgr_url: self._log.debug('send startup message to %s', self._cfg.cmgr_url) pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=self._cfg.cmgr_url) - pipe.put(ComponentStartedMessage(uid=self.uid)) + pipe.put(ComponentStartedMessage(uid=self.uid, pid=os.getpid())) # give the message some time to get out time.sleep(0.1) @@ -829,28 +829,28 @@ def register_timed_cb(self, cb, cb_data=None, timer=None): class Idler(mt.Thread): # -------------------------------------------------------------- - def __init__(self, name, log, timer, cb, cb_data, cb_lock): + def __init__(self, name, log, timer, cb, cb_data, cb_lock, term): self._name = name self._log = log self._timeout = timer self._cb = cb self._cb_data = cb_data self._cb_lock = cb_lock + self._term = term self._last = 0.0 - self._term = mt.Event() - super(Idler, self).__init__() + super().__init__() self.daemon = True self.start() - def stop(self): - self._term.set() - def run(self): try: self._log.debug('start idle thread: %s', self._cb) ret = True - while ret and not self._term.is_set(): + while ret: + if self._term.is_set(): + break + if self._timeout and \ self._timeout > (time.time() - self._last): # not yet @@ -869,7 +869,8 @@ def run(self): # ------------------------------------------------------------------ idler = Idler(name=name, timer=timer, log=self._log, - cb=cb, cb_data=cb_data, cb_lock=self._cb_lock) + cb=cb, cb_data=cb_data, cb_lock=self._cb_lock, + term=self._term) self._threads[name] = idler self._log.debug('%s registered idler %s', self.uid, name) @@ -1211,6 +1212,22 @@ def publish(self, pubsub, msg, topic=None): self._publishers[pubsub].put(topic, msg) + # -------------------------------------------------------------------------- + # + def close(self): + + self._term.set() + + for inp in self._inputs: + self._inputs[inp]['queue'].stop() + + for sub in self._subscribers: + self._subscribers[sub].stop() + + self._prof.close() + self._log.close() + + # ------------------------------------------------------------------------------ # class ClientComponent(BaseComponent): diff --git a/src/radical/pilot/utils/component_manager.py b/src/radical/pilot/utils/component_manager.py index 4768b1268..27d6a83ac 100644 --- a/src/radical/pilot/utils/component_manager.py +++ b/src/radical/pilot/utils/component_manager.py @@ -6,6 +6,7 @@ import os import time +import signal import radical.utils as ru @@ -36,6 +37,7 @@ def __init__(self, sid, reg_addr, owner): self._sid = sid self._reg_addr = reg_addr self._owner = owner + self._to_kill = list() self._reg = ru.zmq.RegistryClient(url=self._reg_addr) self._cfg = ru.Config(from_dict=self._reg['cfg']) @@ -63,6 +65,7 @@ def register_cb(msg): msg = ru.zmq.Message.deserialize(msg) if isinstance(msg, ComponentStartedMessage): self._startups[msg.uid] = msg + self._to_kill.append(msg.pid) else: self._log.error('unknown message type: %s', type(msg)) @@ -220,6 +223,18 @@ def close(self): self._prof.prof('close', uid=self._uid) + for pid in self._to_kill: + + self._log.debug('kill %s', pid) + + try: + os.kill(pid, signal.SIGKILL) + + except ProcessLookupError: + pass + + self._pipe.stop() + # ------------------------------------------------------------------------------ From 869f44f08b2486a24d755a17d95d69ce6878dff7 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 3 Dec 2024 18:56:34 +0100 Subject: [PATCH 15/21] snap --- src/radical/pilot/utils/component.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/radical/pilot/utils/component.py b/src/radical/pilot/utils/component.py index 3e432cc52..76e713c06 100644 --- a/src/radical/pilot/utils/component.py +++ b/src/radical/pilot/utils/component.py @@ -561,9 +561,6 @@ def _finalize(self): # call component level finalize, before we tear down channels self.finalize() - for thread in self._threads.values(): - thread.stop() - self._log.debug('%s close prof', self.uid) try: self._prof.prof('component_final') From 8368576593017c9400c4cfa989cffc41091bbf24 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 4 Dec 2024 00:04:02 +0100 Subject: [PATCH 16/21] snap --- src/radical/pilot/pilot.py | 13 ++++++++++--- src/radical/pilot/pilot_manager.py | 3 --- src/radical/pilot/states.py | 2 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/radical/pilot/pilot.py b/src/radical/pilot/pilot.py index 62711b9ae..486a0779e 100644 --- a/src/radical/pilot/pilot.py +++ b/src/radical/pilot/pilot.py @@ -77,6 +77,7 @@ def __init__(self, pmgr: PilotManager, descr): self._uid = self._descr.get('uid') self._state = rps.NEW self._log = pmgr._log + self._sub = None self._pilot_dict = dict() self._callbacks = dict() self._cb_lock = ru.RLock() @@ -167,9 +168,10 @@ def __init__(self, pmgr: PilotManager, descr): self._ctrl_pub = ru.zmq.Publisher(rpc.CONTROL_PUBSUB, url=ctrl_addr_pub, log=self._log, prof=self._prof) - ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, - log=self._log, prof=self._prof, cb=self._control_cb, - topic=rpc.CONTROL_PUBSUB) + self._sub = ru.zmq.Subscriber(rpc.CONTROL_PUBSUB, url=ctrl_addr_sub, + log=self._log, prof=self._prof, + cb=self._control_cb, + topic=rpc.CONTROL_PUBSUB) # -------------------------------------------------------------------------- @@ -234,6 +236,10 @@ def _update(self, pilot_dict): self._state = target + if self._state in rps.FINAL: + if self._sub: + self._sub.stop() + # FIXME: this is a hack to get the resource details into the pilot resources = pilot_dict.get('resources') or {} rm_info = resources.get('rm_info') @@ -610,6 +616,7 @@ def wait(self, state=None, timeout=None): if self.state in rps.FINAL: + # we will never see another state progression. Raise an error # (unless we waited for this) if self.state in states: diff --git a/src/radical/pilot/pilot_manager.py b/src/radical/pilot/pilot_manager.py index a7c67a17d..1ab6655a8 100644 --- a/src/radical/pilot/pilot_manager.py +++ b/src/radical/pilot/pilot_manager.py @@ -123,10 +123,7 @@ def __init__(self, session, cfg='default'): cfg.heartbeat = session.cfg.heartbeat cfg.client_sandbox = session._get_client_sandbox() - print('pmgr init') - super().__init__(cfg, session=session) - print('pmgr start') self.start() self._log.info('started pmgr %s', self._uid) diff --git a/src/radical/pilot/states.py b/src/radical/pilot/states.py index 784197c46..f1c0ac274 100644 --- a/src/radical/pilot/states.py +++ b/src/radical/pilot/states.py @@ -71,7 +71,7 @@ def _pilot_state_progress(pid, current, target): # allow to transition from FAILED to DONE (done gets picked up from DB, # sometimes after pilot watcher detects demise) if current == FAILED: - if target in [DONE, FAILED]: + if target in FINAL: return [target, []] if current in FINAL and target != current: From cbaa0d506c9cbba26a851bb33013aa12c9feef28 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 4 Dec 2024 10:44:42 +0100 Subject: [PATCH 17/21] snap --- src/radical/pilot/agent/agent_0.py | 18 +++++++++++++++--- src/radical/pilot/pilot.py | 13 ++++++++++--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/radical/pilot/agent/agent_0.py b/src/radical/pilot/agent/agent_0.py index 2694642f7..e0ad305f1 100644 --- a/src/radical/pilot/agent/agent_0.py +++ b/src/radical/pilot/agent/agent_0.py @@ -298,6 +298,8 @@ def finalize(self): try : log = ru.ru_open('./agent_0.log', 'r').read(1024) except: pass + self._log.debug('final cause: %s', self._final_cause) + if self._final_cause == 'timeout' : state = rps.DONE elif self._final_cause == 'cancel' : state = rps.CANCELED elif self._final_cause == 'sys.exit' : state = rps.CANCELED @@ -620,13 +622,23 @@ def control_cb(self, topic, msg): return self._ctrl_cancel_pilots(msg) elif cmd == 'service_info': - self._log.debug('=== PILOT COMMAND: %s: %s', cmd, arg) + self._log.debug('PILOT COMMAND: %s: %s', cmd, arg) return self._ctrl_service_info(msg, arg) else: self._log.error('invalid command: [%s]', cmd) + # -------------------------------------------------------------------------- + # + def stop(self): + + self._log.info('stop agent') + self._final_cause = 'cancel' + super().stop() + self._session.close() + + # -------------------------------------------------------------------------- # def _ctrl_cancel_pilots(self, msg): @@ -655,7 +667,7 @@ def _ctrl_service_info(self, msg, arg): error = arg['error'] info = arg['info'] - self._log.debug('=== service info: %s: %s', uid, info) + self._log.debug('service info: %s: %s', uid, info) # This message signals that an agent service instance is up and running. # We expect to find the service UID in args and can then unblock the @@ -683,7 +695,7 @@ def _ctrl_service_info(self, msg, arg): self._reg['services.%s' % uid] = info # signal main thread when that the service is up - self._log.debug('=== set service start event for %s', uid) + self._log.debug('set service start event for %s', uid) self._service_start_evt.set() return True diff --git a/src/radical/pilot/pilot.py b/src/radical/pilot/pilot.py index 486a0779e..95733f9cc 100644 --- a/src/radical/pilot/pilot.py +++ b/src/radical/pilot/pilot.py @@ -3,8 +3,11 @@ __copyright__ = "Copyright 2013-2016, http://radical.rutgers.edu" __license__ = "MIT" +import os +import sys import copy import time +import signal import threading as mt @@ -191,13 +194,17 @@ def _default_state_cb(self, pilot, state=None): self._log.info("[Callback]: pilot %s state: %s.", uid, state) if state == rps.FAILED and self._exit_on_error: + self._log.error("[Callback]: pilot '%s' failed (exit)", uid) + self._sub.stop() + self._sub = None # There are different ways to tell main... - ru.cancel_main_thread('int') + # ru.print_stacktrace() + sys.stderr.write('=== pilot failed, exit_on_error ===\n') + ru.cancel_main_thread('term') # raise RuntimeError('pilot %s failed - fatal!' % self.uid) - # os.kill(os.getpid()) - # sys.exit() + # os.kill(os.getpid(), signal.SIGTERM) # -------------------------------------------------------------------------- From b40f61d0067b1b34d1a183660ea0f1471f02a99d Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 10 Dec 2024 23:27:24 +0100 Subject: [PATCH 18/21] respond to comments --- src/radical/pilot/pilot.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/radical/pilot/pilot.py b/src/radical/pilot/pilot.py index 95733f9cc..43494b0c4 100644 --- a/src/radical/pilot/pilot.py +++ b/src/radical/pilot/pilot.py @@ -246,6 +246,7 @@ def _update(self, pilot_dict): if self._state in rps.FINAL: if self._sub: self._sub.stop() + self._sub = None # FIXME: this is a hack to get the resource details into the pilot resources = pilot_dict.get('resources') or {} From ce332a693ae15757a446a43958d969bdb2e7eab5 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 10 Dec 2024 23:46:45 +0100 Subject: [PATCH 19/21] respond to comments --- src/radical/pilot/pilot.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/radical/pilot/pilot.py b/src/radical/pilot/pilot.py index 43494b0c4..b68073e92 100644 --- a/src/radical/pilot/pilot.py +++ b/src/radical/pilot/pilot.py @@ -197,7 +197,6 @@ def _default_state_cb(self, pilot, state=None): self._log.error("[Callback]: pilot '%s' failed (exit)", uid) self._sub.stop() - self._sub = None # There are different ways to tell main... # ru.print_stacktrace() @@ -244,9 +243,7 @@ def _update(self, pilot_dict): self._state = target if self._state in rps.FINAL: - if self._sub: - self._sub.stop() - self._sub = None + self._sub.stop() # FIXME: this is a hack to get the resource details into the pilot resources = pilot_dict.get('resources') or {} From c15af7578d5a470817856a33dfc34859cfe88fc5 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 10 Dec 2024 23:47:28 +0100 Subject: [PATCH 20/21] respond to comments --- src/radical/pilot/states.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/radical/pilot/states.py b/src/radical/pilot/states.py index f1c0ac274..a59a939f3 100644 --- a/src/radical/pilot/states.py +++ b/src/radical/pilot/states.py @@ -68,7 +68,7 @@ def _pilot_state_progress(pid, current, target): if target in [DONE, FAILED, CANCELED]: return [target, []] - # allow to transition from FAILED to DONE (done gets picked up from DB, + # allow to transition from FAILED to DONE (done gets picked up from proxy, # sometimes after pilot watcher detects demise) if current == FAILED: if target in FINAL: From 0230d1e2fe196686dfe555e055106c89f6a8ce0e Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 11 Dec 2024 11:55:29 +0100 Subject: [PATCH 21/21] cleanup --- .github/workflows/ci.yml | 55 ---------------------------------------- 1 file changed, 55 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5371ae215..1a7e067d2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,58 +157,3 @@ jobs: path: sessions_analytics retention-days: 5 - examples: - runs-on: ubuntu-latest - strategy: - matrix: - python-version: [ '3.12' ] - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 2 - - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - name: Install dependencies - run: | - sudo apt update - sudo apt install -y openmpi-bin libopenmpi-dev - sudo apt install -y texlive cm-super - sudo apt install -y texlive-fonts-extra texlive-extra-utils dvipng - sudo apt install -y texlive-fonts-recommended texlive-latex-extra - python -m venv testenv - . testenv/bin/activate - python -m pip install --upgrade pip setuptools wheel - python -m pip install -r requirements-ci.txt - radical-stack - - name: run examples - timeout-minutes: 10 - run: | - export RADICAL_LOG_LVL=DEBUG_9 - export RADICAL_PROFILE=True - export RADICAL_DEBUG=TRUE - export RADICAL_DEBUG_HELPER=TRUE - export RADICAL_REPORT=TRUE - export RADICAL_UTILS_ZMQ_LOG_LVL=INFO - export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO - . testenv/bin/activate - export RP_ROOT=$(pwd) - export BASE=/home/runner/radical.pilot.sandbox/ - mkdir -p $BASE/client_sessions - cd $BASE/client_sessions - radical-stack - for f in $RP_ROOT/examples/[01]*.py; do echo "=== $f"; $f; done - - - name: prepare example_artifacts - if: always() - run: | - tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/ - - - name: upload example_artifacts - if: always() - uses: actions/upload-artifact@v3 - with: - name: example_artifacts - path: /home/runner/example_artifacts.tgz - -