Fix/component sync #3269

Merged
merged 26 commits into from
Dec 12, 2024
Changes from 20 commits
47 changes: 28 additions & 19 deletions .github/workflows/ci-release.yml
@@ -25,15 +25,15 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y openmpi-bin
python -m venv testenv
. testenv/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install .
- name: run examples
timeout-minutes: 15
timeout-minutes: 10
run: |
sudo apt update
sudo apt install -y openmpi-bin
export RADICAL_LOG_LVL=DEBUG_9
export RADICAL_PROFILE=True
export RADICAL_DEBUG=TRUE
@@ -42,25 +42,34 @@ jobs:
export RADICAL_UTILS_ZMQ_LOG_LVL=INFO
export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO
. testenv/bin/activate
mkdir example_artifacts/
cd example_artifacts/
../examples/00_getting_started.py
../examples/09_mpi_tasks.py
../examples/01_task_details.py
../examples/02_failing_tasks.py
../examples/03_multiple_pilots.py
../examples/04_scheduler_selection.py
../examples/05_task_input_data.py
../examples/06_task_output_data.py
../examples/07_shared_task_data.py
../examples/08_task_environment.py
../examples/10_pre_and_post_exec.py
../examples/11_task_input_data_tar.py
../examples/11_task_input_folder.py
export RP_ROOT=$(pwd)
export BASE=/home/runner/radical.pilot.sandbox/
mkdir -p $BASE/client_sessions
cd $BASE/client_sessions
radical-stack
$RP_ROOT/examples/00_getting_started.py
$RP_ROOT/examples/09_mpi_tasks.py
$RP_ROOT/examples/01_task_details.py
$RP_ROOT/examples/02_failing_tasks.py
$RP_ROOT/examples/03_multiple_pilots.py
$RP_ROOT/examples/04_scheduler_selection.py
$RP_ROOT/examples/05_task_input_data.py
$RP_ROOT/examples/06_task_output_data.py
$RP_ROOT/examples/07_shared_task_data.py
$RP_ROOT/examples/08_task_environment.py
$RP_ROOT/examples/10_pre_and_post_exec.py
$RP_ROOT/examples/11_task_input_data_tar.py
$RP_ROOT/examples/11_task_input_folder.py

- name: prepare example_artifacts
if: always()
run: |
tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/

- name: upload example_artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: example_artifacts
path: example_artifacts
path: /home/runner/example_artifacts.tgz

55 changes: 55 additions & 0 deletions .github/workflows/ci.yml
@@ -157,3 +157,58 @@ jobs:
path: sessions_analytics
retention-days: 5

examples:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ '3.12' ]
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 2
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt update
sudo apt install -y openmpi-bin libopenmpi-dev
sudo apt install -y texlive cm-super
sudo apt install -y texlive-fonts-extra texlive-extra-utils dvipng
sudo apt install -y texlive-fonts-recommended texlive-latex-extra
python -m venv testenv
. testenv/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install -r requirements-ci.txt
radical-stack
- name: run examples
timeout-minutes: 10
run: |
export RADICAL_LOG_LVL=DEBUG_9
export RADICAL_PROFILE=True
export RADICAL_DEBUG=TRUE
export RADICAL_DEBUG_HELPER=TRUE
export RADICAL_REPORT=TRUE
export RADICAL_UTILS_ZMQ_LOG_LVL=INFO
export RADICAL_UTILS_HEARTBEAT_LOG_LVL=INFO
. testenv/bin/activate
export RP_ROOT=$(pwd)
export BASE=/home/runner/radical.pilot.sandbox/
mkdir -p $BASE/client_sessions
cd $BASE/client_sessions
radical-stack
for f in $RP_ROOT/examples/[01]*.py; do echo "=== $f"; $f; done

- name: prepare example_artifacts
if: always()
run: |
tar zcf /home/runner/example_artifacts.tgz /home/runner/radical.pilot.sandbox/

- name: upload example_artifacts
if: always()
uses: actions/upload-artifact@v3
with:
name: example_artifacts
path: /home/runner/example_artifacts.tgz
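
Review note: the new `examples` job selects scripts with the glob `[01]*.py` instead of listing them one by one. The sketch below is only an illustration of which names that pattern matches, using Python's `fnmatch` as a stand-in for the shell glob. The file names are taken from the explicit list in `ci-release.yml`; `misc_helper.py` is a hypothetical name added to show a non-match. A shell glob typically expands in sorted order, so `09_mpi_tasks.py` would run after `08_task_environment.py` here rather than second as in the release workflow.

```python
from fnmatch import fnmatch

# Example scripts as listed explicitly in ci-release.yml, plus one
# hypothetical file name that the pattern should not pick up.
examples = [
    "00_getting_started.py",
    "09_mpi_tasks.py",
    "01_task_details.py",
    "02_failing_tasks.py",
    "03_multiple_pilots.py",
    "04_scheduler_selection.py",
    "05_task_input_data.py",
    "06_task_output_data.py",
    "07_shared_task_data.py",
    "08_task_environment.py",
    "10_pre_and_post_exec.py",
    "11_task_input_data_tar.py",
    "11_task_input_folder.py",
    "misc_helper.py",  # hypothetical, not part of the PR
]

# "[01]*.py" matches names whose first character is 0 or 1.
selected = sorted(name for name in examples if fnmatch(name, "[01]*.py"))

for name in selected:
    print("===", name)
```

Any future example whose name starts with `0` or `1` would be picked up automatically, while a script numbered `20_...` or higher would need a wider pattern.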


75 changes: 8 additions & 67 deletions bin/radical-pilot-bridge
@@ -6,15 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru

from radical.pilot.messages import HeartbeatMessage
from radical.pilot.messages import ComponentStartedMessage


# ------------------------------------------------------------------------------
@@ -28,27 +26,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the bridge
- kind: type of bridge (`pubsub` or `queue`)

If the config contains a `heartbeat` section, that section must be formatted
as follows:

{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}

If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.

If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.

The config file may contain other entries which are passed to the bridge
and are interpreted by the bridge implementation.
@@ -105,11 +84,8 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
b_cfg = ru.TypedDict(reg['bridges.%s.cfg' % uid])

# create the instance and begin to work
bridge = ru.zmq.Bridge.create(uid, cfg=b_cfg)
@@ -119,48 +95,13 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):

reg.close()
bridge.start()

evt.set()

# re-enable the test below if timing issues crop up
# if 'pubsub' in uid:
# d = ru.zmq.test_pubsub(bridge.channel, bridge.addr_pub, bridge.addr_sub)

# bridge runs - send heartbeats so that cmgr knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
bridge.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
pipe = ru.zmq.Pipe(mode=ru.zmq.MODE_PUSH, url=b_cfg.cmgr_url)
pipe.put(ComponentStartedMessage(uid=uid, pid=os.getpid()))

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
bridge.wait()


# ------------------------------------------------------------------------------
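
Review note: the bridge now announces itself once by pushing a `ComponentStartedMessage` to `cmgr_url` and then blocks in `bridge.wait()`, instead of publishing periodic heartbeats. The sketch below shows that start-up handshake in isolation. It is an assumption-laden illustration, not the radical.utils implementation: a `queue.Queue` stands in for the ZMQ push pipe, and `StartedMessage`, `component` and `wait_for_startup` are hypothetical names.

```python
import os
import queue
import threading
from dataclasses import dataclass


@dataclass
class StartedMessage:
    # Hypothetical stand-in for ComponentStartedMessage (uid and pid,
    # mirroring the two fields used in the diff).
    uid: str
    pid: int


def component(uid, pipe):
    # Set-up work would happen here; once it is done, report exactly once.
    pipe.put(StartedMessage(uid=uid, pid=os.getpid()))


def wait_for_startup(pipe, expected, timeout=5.0):
    """Collect start-up messages until every expected uid has reported.

    The timeout applies to each individual message, not to the whole wait.
    """
    pending = set(expected)
    started = {}
    while pending:
        try:
            msg = pipe.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError(
                f"no start-up message from: {sorted(pending)}") from None
        pending.discard(msg.uid)
        started[msg.uid] = msg.pid
    return started


pipe = queue.Queue()  # stand-in for the push pipe to the manager
uids = ["bridge.0000", "component.0000"]  # hypothetical uids
threads = [threading.Thread(target=component, args=(uid, pipe))
           for uid in uids]
for t in threads:
    t.start()

started = wait_for_startup(pipe, uids)
for t in threads:
    t.join()

print(started)
```

With a one-shot message the manager learns that a component came up, but not whether it is still alive later; in the diff, process lifetime appears to be covered by the existing `pwatcher.watch(...)` calls instead.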
71 changes: 6 additions & 65 deletions bin/radical-pilot-component
@@ -6,16 +6,13 @@ __license__ = "MIT"

import os
import sys
import time

import threading as mt
import setproctitle as spt
import multiprocessing as mp

import radical.utils as ru
import radical.pilot as rp

from radical.pilot.messages import HeartbeatMessage

# ------------------------------------------------------------------------------
#
@@ -28,27 +25,8 @@ def main(sid, reg_addr, uid, ppid, evt):
- name: name of the component
- kind: type of component

If the config contains a `heartbeat` section, that section must be formatted
as follows:

{
'from' : 'uid',
'addr_pub': 'addr_pub',
'addr_sub': 'addr_sub',
'interval': <float>,
'timeout' : <float>
}

If that section exists, the process will daemonize and heartbeats are used
to manage the bridge lifetime: the lifetime of this bridge is then dependent
on receiving heartbeats from the given `uid`: after `timeout` seconds of no
heartbeats arriving, the bridge will terminate. The bridge itself will
publish heartbeats every `interval` seconds on the heartbeat channel under
its own uid.

If the heartbeat section is not present in the config file, the components
lifetime is expected to be explicitly managed, i.e., that this wrapper
process hosting the bridge is terminated externally.
The config will also contain a `cmgr_url` entry which points to the cmgr
this component should register with.

The config file may contain other entries which are passed to the component
and are interpreted by the component implementation.
@@ -79,59 +57,22 @@ def wrapped_main(sid, reg_addr, uid, log, prof, ppid, evt):
pwatcher.watch(int(ppid))
pwatcher.watch(os.getpid())

term = mt.Event()
reg = ru.zmq.RegistryClient(url=reg_addr)

hb_cfg = ru.TypedDict(reg['heartbeat'])
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])
reg = ru.zmq.RegistryClient(url=reg_addr)
c_cfg = ru.TypedDict(reg['components.%s.cfg' % uid])

reg.close()

evt.set()

# start a non-primary session
session = rp.Session(uid=sid, cfg=c_cfg,
_role=rp.Session._DEFAULT, _reg_addr=reg_addr)

# create the instance and begin to work
comp = rp.utils.BaseComponent.create(c_cfg, session)
comp.start()

# component runs - send heartbeats so that session knows about it
hb_pub = ru.zmq.Publisher('heartbeat', hb_cfg.addr_pub, log=log, prof=prof)

def hb_beat_cb():
hb_pub.put('heartbeat', HeartbeatMessage(uid=uid))

def hb_term_cb(hb_uid):
comp.stop()
term.set()
return False

hb = ru.Heartbeat(uid=uid,
timeout=hb_cfg.timeout,
interval=hb_cfg.interval,
beat_cb=hb_beat_cb,
term_cb=hb_term_cb,
log=log)
hb.start()

# always watch out for session heartbeat
hb.watch(uid=sid)

# react on session heartbeats
def hb_sub_cb(topic, msg):
hb_msg = HeartbeatMessage(from_dict=msg)
if hb_msg.uid == sid:
hb.beat(uid=sid)

ru.zmq.Subscriber('heartbeat', hb_cfg.addr_sub,
topic='heartbeat', cb=hb_sub_cb,
log=log, prof=prof)
evt.set()

# all is set up - we can sit idle 'til end of time.
while not term.is_set():
time.sleep(1)
comp.wait()


# ------------------------------------------------------------------------------
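
Review note: for readers unfamiliar with the scheme removed here, the deleted docstring describes a lifetime rule: terminate once `timeout` seconds pass without a heartbeat from a watched `uid`. The sketch below shows only that rule. `HeartbeatMonitor` is a hypothetical class, not `ru.Heartbeat`, and the clock is injected so the behaviour can be checked without sleeping.

```python
import time


class HeartbeatMonitor:
    """Hypothetical sketch: track the last heartbeat per watched uid."""

    def __init__(self, timeout, clock=time.monotonic):
        self._timeout = timeout
        self._clock = clock
        self._last = {}

    def watch(self, uid):
        # Start watching a uid; the watch time counts as its first beat.
        self._last[uid] = self._clock()

    def beat(self, uid):
        # Record a heartbeat, ignoring uids that are not being watched.
        if uid in self._last:
            self._last[uid] = self._clock()

    def expired(self):
        # Return the uids whose last beat is older than the timeout.
        now = self._clock()
        return sorted(uid for uid, last in self._last.items()
                      if now - last > self._timeout)


# Drive the monitor with a fake clock instead of real time.
now = [0.0]
mon = HeartbeatMonitor(timeout=10.0, clock=lambda: now[0])
mon.watch("session.0000")  # hypothetical session uid

now[0] = 8.0
mon.beat("session.0000")

now[0] = 15.0
alive = mon.expired()  # 7 seconds since the last beat

now[0] = 30.0
dead = mon.expired()   # 22 seconds since the last beat

print(alive, dead)
```

In the removed code, the equivalent of a non-empty `expired()` result triggered `comp.stop()` through the termination callback; after this PR the wrapper simply blocks in `comp.wait()`.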