Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/component sync #3269

Merged
merged 26 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions .github/workflows/ci-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y openmpi-bin
python -m venv testenv
. testenv/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install .
- name: run examples
timeout-minutes: 15
timeout-minutes: 10
run: |
sudo apt update
sudo apt install -y openmpi-bin
export RADICAL_LOG_LVL=DEBUG_9
export RADICAL_PROFILE=True
export RADICAL_DEBUG=TRUE
Expand All @@ -42,25 +42,34 @@ jobs:
export RADICAL_UTILS_ZMQ_LOG_LVL=INFO
export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO
. testenv/bin/activate
mkdir example_artifacts/
cd example_artifacts/
../examples/00_getting_started.py
../examples/09_mpi_tasks.py
../examples/01_task_details.py
../examples/02_failing_tasks.py
../examples/03_multiple_pilots.py
../examples/04_scheduler_selection.py
../examples/05_task_input_data.py
../examples/06_task_output_data.py
../examples/07_shared_task_data.py
../examples/08_task_environment.py
../examples/10_pre_and_post_exec.py
../examples/11_task_input_data_tar.py
../examples/11_task_input_folder.py
export RP_ROOT=$(pwd)
export BASE=/home/runner/radical.pilot.sandbox/
mkdir -p $BASE/client_sessions
cd $BASE/client_sessions
radical-stack
$RP_ROOT/examples/00_getting_started.py
$RP_ROOT/examples/09_mpi_tasks.py
$RP_ROOT/examples/01_task_details.py
$RP_ROOT/examples/02_failing_tasks.py
$RP_ROOT/examples/03_multiple_pilots.py
$RP_ROOT/examples/04_scheduler_selection.py
$RP_ROOT/examples/05_task_input_data.py
$RP_ROOT/examples/06_task_output_data.py
$RP_ROOT/examples/07_shared_task_data.py
$RP_ROOT/examples/08_task_environment.py
$RP_ROOT/examples/10_pre_and_post_exec.py
$RP_ROOT/examples/11_task_input_data_tar.py
$RP_ROOT/examples/11_task_input_folder.py

- name: prepare example_artifacts
if: always()
run: |
tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/

- name: upload example_artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: example_artifacts
path: example_artifacts
path: /home/runner/example_artifacts.tgz

75 changes: 8 additions & 67 deletions bin/radical-pilot-bridge
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru

from radical.pilot.messages import HeartbeatMessage
from radical.pilot.messages import ComponentStartedMessage


# ------------------------------------------------------------------------------
Expand All @@ -28,27 +26,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the bridge
- kind: type of bridge (`pubsub` or `queue`)

If the config contains a `heartbeat` section, that section must be formatted
as follows:

{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}

If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.

If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.

The config file may contain other entries which are passed to the bridge
and are interpreted by the bridge implementation.
Expand Down Expand Up @@ -105,11 +84,8 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])

# create the instance and begin to work
bridge = ru.zmq.Bridge.create(uid, cfg=b_cfg)
Expand All @@ -119,48 +95,13 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):

reg.close()
bridge.start()

evt.set()

# re-enable the test below if timing issues crop up
# if 'pubsub' in uid:
# d = ru.zmq.test_pubsub(bridge.channel, bridge.addr_pub, bridge.addr_sub)

# bridge runs - send heartbeats so that cmgr knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
bridge.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url)
pipe.put(ComponentStartedMessage(uid=uid, pid=os.getpid()))

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
bridge.wait()


# ------------------------------------------------------------------------------
Expand Down
71 changes: 6 additions & 65 deletions bin/radical-pilot-component
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru
import radical.pilot as rp

from radical.pilot.messages import HeartbeatMessage

# ------------------------------------------------------------------------------
#
Expand All @@ -28,27 +25,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the component
- kind: type of component

If the config contains a `heartbeat` section, that section must be formatted
as follows:

{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}

If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.

If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.

The config file may contain other entries which are passed to the component
and are interpreted by the component implementation.
Expand Down Expand Up @@ -79,59 +57,22 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])

reg.close()

evt.set()

# start a non-primary session
session = rp.Session(uid=sid, cfg=c_cfg,
_role=rp.Session._DEFAULT, _reg_addr=reg_addr)

# create the instance and begin to work
comp = rp.utils.BaseComponent.create(c_cfg, session)
comp.start()

# component runs - send heartbeats so that session knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
comp.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
evt.set()

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
comp.wait()


# ------------------------------------------------------------------------------
Expand Down
18 changes: 15 additions & 3 deletions src/radical/pilot/agent/agent_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ def finalize(self):
try : log = ru.ru_open('./agent_0.log', 'r').read(1024)
except: pass

self._log.debug('final cause: %s', self._final_cause)

if self._final_cause == 'timeout' : state = rps.DONE
elif self._final_cause == 'cancel' : state = rps.CANCELED
elif self._final_cause == 'sys.exit' : state = rps.CANCELED
Expand Down Expand Up @@ -620,13 +622,23 @@ def control_cb(self, topic, msg):
return self._ctrl_cancel_pilots(msg)

elif cmd == 'service_info':
self._log.debug('=== PILOT COMMAND: %s: %s', cmd, arg)
self._log.debug('PILOT COMMAND: %s: %s', cmd, arg)
return self._ctrl_service_info(msg, arg)

else:
self._log.error('invalid command: [%s]', cmd)


# --------------------------------------------------------------------------
#
def stop(self):

self._log.info('stop agent')
self._final_cause = 'cancel'
super().stop()
self._session.close()


# --------------------------------------------------------------------------
#
def _ctrl_cancel_pilots(self, msg):
Expand Down Expand Up @@ -655,7 +667,7 @@ def _ctrl_service_info(self, msg, arg):
error = arg['error']
info = arg['info']

self._log.debug('=== service info: %s: %s', uid, info)
self._log.debug('service info: %s: %s', uid, info)

# This message signals that an agent service instance is up and running.
# We expect to find the service UID in args and can then unblock the
Expand Down Expand Up @@ -683,7 +695,7 @@ def _ctrl_service_info(self, msg, arg):
self._reg['services.%s' % uid] = info

# signal main thread when that the service is up
self._log.debug('=== set service start event for %s', uid)
self._log.debug('set service start event for %s', uid)
self._service_start_evt.set()

return True
Expand Down
2 changes: 1 addition & 1 deletion src/radical/pilot/agent/executing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def _to_watcher(self):
`self._cancel_task(task)`. That has to be implemented by al executors.
'''

while True:
while not self._term.is_set():

# check once per second at most
time.sleep(1)
mtitov marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading
Loading