Fix two bugs and add support for N masters, N workers with load balancing in RadicalPilotExecutor #3060

Merged
merged 138 commits into from
Jul 3, 2024
Changes from 126 commits
Commits
60ca3bf
radical_executor
AymenFJA May 17, 2023
645640a
init
AymenFJA May 17, 2023
34f13a3
Merge branch 'Parsl:master' into master
AymenFJA May 18, 2023
a28da44
Merge branch 'Parsl:master' into master
AymenFJA May 19, 2023
b6f8fb7
Merge branch 'Parsl:master' into master
AymenFJA May 31, 2023
648594c
Merge branch 'Parsl:master' into master
AymenFJA Jun 5, 2023
f3fb0da
Merge branch 'Parsl:master' into master
AymenFJA Jun 8, 2023
f476b64
Merge branch 'Parsl:master' into master
AymenFJA Jun 12, 2023
0d8366e
Merge branch 'Parsl:master' into master
AymenFJA Jun 28, 2023
8e380c5
Merge branch 'Parsl:master' into master
AymenFJA Jul 26, 2023
a3397d0
Merge branch 'Parsl:master' into master
AymenFJA Jul 28, 2023
051db89
adapt the new RAPTOR changes and cleanup
AymenFJA Jul 28, 2023
0427428
Merge branch 'Parsl:master' into master
AymenFJA Jul 31, 2023
94162ba
Merge branch 'Parsl:master' into master
AymenFJA Aug 3, 2023
7ddea2f
addressing comments and refine
AymenFJA Aug 6, 2023
9f1d59f
Merge branch 'Parsl:master' into master
AymenFJA Aug 6, 2023
a8ed4bc
flake8
AymenFJA Aug 6, 2023
e242c34
rpex name
AymenFJA Aug 6, 2023
9d7a7a9
refine and cleanup
AymenFJA Aug 7, 2023
79227c0
Merge branch 'Parsl:master' into master
AymenFJA Aug 10, 2023
0069488
Merge branch 'Parsl:master' into feature/refine
AymenFJA Aug 10, 2023
6197677
Merge branch 'Parsl:master' into master
AymenFJA Aug 11, 2023
1a8d6b7
Merge branch 'Parsl:master' into feature/refine
AymenFJA Aug 11, 2023
ecc0559
Merge pull request #1 from AymenFJA/feature/refine
AymenFJA Aug 11, 2023
bccb46e
Merge branch 'Parsl:master' into master
AymenFJA Aug 11, 2023
f697cfe
refine and adapt Parsl Base Executor
AymenFJA Aug 11, 2023
d65b735
Merge branch 'Parsl:master' into master
AymenFJA Aug 12, 2023
a286b0b
Merge branch 'Parsl:master' into master
AymenFJA Aug 16, 2023
a1bb899
radical tests
AymenFJA Aug 17, 2023
739bb75
Merge branch 'Parsl:master' into master
AymenFJA Aug 21, 2023
dd760d2
fixes and refine
AymenFJA Aug 23, 2023
67d0d6e
refine and docs
AymenFJA Aug 24, 2023
683c657
addressing comments
AymenFJA Aug 25, 2023
199b8bb
Merge branch 'Parsl:master' into master
AymenFJA Sep 7, 2023
362cd63
Merge branch 'Parsl:master' into master
AymenFJA Sep 15, 2023
65b8ac5
Merge branch 'Parsl:master' into master
AymenFJA Sep 25, 2023
e50267b
Merge branch 'Parsl:master' into master
AymenFJA Oct 5, 2023
03c63b2
disable RADICAL report
AymenFJA Oct 5, 2023
c2812b4
Merge branch 'master' of https://github.com/AymenFJA/parsl
AymenFJA Oct 5, 2023
09d4374
Merge branch 'Parsl:master' into master
AymenFJA Oct 12, 2023
65396c9
Merge branch 'Parsl:master' into master
AymenFJA Oct 17, 2023
63a39b2
refine
AymenFJA Oct 17, 2023
2e9dd42
refine
AymenFJA Oct 18, 2023
e72950e
refine
AymenFJA Oct 18, 2023
43db0cd
update mypy
AymenFJA Oct 19, 2023
99954c9
adding _init__ to radical_test folder
AymenFJA Oct 19, 2023
471c11d
add type annotation
AymenFJA Oct 19, 2023
7038748
Merge branch 'Parsl:master' into master
AymenFJA Oct 24, 2023
8ce031d
adding radical installation to test-requirements
AymenFJA Oct 24, 2023
3493828
Merge branch 'Parsl:master' into master
AymenFJA Oct 24, 2023
9565fb5
rename testes for pytest to recognize them
AymenFJA Oct 24, 2023
fbc200b
attempt to exclude radical_tests from other executors
AymenFJA Oct 24, 2023
32c0dde
Merge branch 'Parsl:master' into master
AymenFJA Oct 24, 2023
35b2833
Merge branch 'master' of https://github.com/AymenFJA/parsl
AymenFJA Oct 24, 2023
da934a9
separate radical test from all executors tests
AymenFJA Oct 25, 2023
d87c5ad
fix a typo
AymenFJA Oct 25, 2023
1bc9549
adding random and duration to pytest radical cmd
AymenFJA Oct 25, 2023
881eac7
enable debug logs
AymenFJA Oct 26, 2023
907fbc0
temp: including radical sessions in artificats
AymenFJA Oct 26, 2023
eede1ec
revert ci.yaml artificat path and set radical session path in workdir
AymenFJA Oct 26, 2023
96efa6e
flake8
AymenFJA Oct 26, 2023
666fab9
typo
AymenFJA Oct 26, 2023
96d45cc
Hack for debugging more in CI
benclifford Oct 30, 2023
e257506
Set timeout for everything to be 6 hours
benclifford Oct 30, 2023
6b0bc67
Reduce radical step timeout to 10 minutes, because that's plenty for …
benclifford Oct 30, 2023
7d3eeac
add mpich to the ci setup
AymenFJA Oct 30, 2023
8cc028f
Merge branch 'Parsl:master' into master
AymenFJA Oct 30, 2023
fde214d
optionally set agent sandbox location via env var to reduce tests zip…
AymenFJA Oct 30, 2023
2e2f2ac
Revert timing and test selection hacks that should not be needed any …
benclifford Oct 31, 2023
dfabd23
adding pytest to rpex env
AymenFJA Oct 31, 2023
3f1e9f9
Merge branch 'master' of https://github.com/AymenFJA/parsl
AymenFJA Oct 31, 2023
27aefbe
Making RadicalPilotExecutor imports explicit
AymenFJA Oct 31, 2023
1a607d7
Run all tests, not just test_callables.py, like other executors
benclifford Nov 1, 2023
d3ab39d
change master_per_node to cores_per_master and assert mpi comm size i…
AymenFJA Nov 1, 2023
34f25bf
Merge branch 'Parsl:master' into master
AymenFJA Nov 1, 2023
6c1e31a
flake8
AymenFJA Nov 1, 2023
ae3f43d
Exclude tests that expect stdin/out to be written to the local filesy…
benclifford Nov 1, 2023
110c1b3
1. setting task file path and file name via parsl stdout and stderr
AymenFJA Nov 2, 2023
fdd7917
fix conflict
AymenFJA Nov 2, 2023
373b30c
remove comma
AymenFJA Nov 2, 2023
93977e7
flake8
AymenFJA Nov 2, 2023
0e4c709
Added features:
AymenFJA Nov 3, 2023
9fe38da
raise exception when we fail to unwrap a parsl app
AymenFJA Nov 3, 2023
ae63a57
Merge branch 'master' into master
AymenFJA Nov 3, 2023
99c7d67
Merge branch 'master' into master
AymenFJA Nov 6, 2023
cfff365
This commit fixes:
AymenFJA Nov 6, 2023
3c140d6
This commit adds:
AymenFJA Nov 7, 2023
dcb2c41
Merge branch 'master' into master
AymenFJA Nov 7, 2023
2dc02bc
fix tests and ignore tests
AymenFJA Nov 8, 2023
01afe53
randomize the Radical tests
AymenFJA Nov 8, 2023
f428238
clarify comment
AymenFJA Nov 8, 2023
9e800e3
Adapt Parsl execution approach on the worker level
AymenFJA Nov 9, 2023
c4cd868
This commit adds:
AymenFJA Nov 10, 2023
e870b58
fix MPI functions wrapping
AymenFJA Nov 10, 2023
cd83126
Trying out some venv stuff
benclifford Nov 10, 2023
48935f2
Cache venv /conda/bin:/home/benc/parsl/virtualenv-3.11/bin:/home/benc…
benclifford Nov 10, 2023
2dbf657
-Switch RPEX default mode to use env in localhost
AymenFJA Nov 10, 2023
29d7d4c
Merge branch 'master' into master
AymenFJA Nov 10, 2023
f30596e
Initialize virtualenv at the start of each run step rather than copyi…
benclifford Nov 11, 2023
6bee18f
Remove misplaced virtualenv activation
benclifford Nov 11, 2023
65ff8f4
fix env name and do not include the agent sandbox in the runinfo folder
AymenFJA Nov 11, 2023
5306eea
Remove spurious whitespace change
benclifford Nov 13, 2023
62b55f9
Add RadicalPilotExecutor to reference guide
benclifford Nov 13, 2023
738e316
Install radical.pilot Python dependency the same as other optional de…
benclifford Nov 13, 2023
d606825
Merge branch 'master' into master
benclifford Nov 13, 2023
faaaefa
address Ben's comments
AymenFJA Nov 13, 2023
790efd5
a try to isolate RPEX specific test
AymenFJA Nov 13, 2023
b0c48af
fix radical executor ref.
AymenFJA Nov 14, 2023
931e58b
Fix a couple of documentation build failures - at least, docs now bui…
benclifford Nov 14, 2023
426c2b4
Rearrange import time dependencies to not need radical.pilot installed
benclifford Nov 14, 2023
7e891ba
Fix type annotations: only one type annotation for _setup_paths
benclifford Nov 14, 2023
8b2037e
Merge branch 'Parsl:master' into master
AymenFJA Nov 14, 2023
f5f339e
Merge branch 'Parsl:master' into master
AymenFJA Nov 16, 2023
a35b5ee
Merge branch 'Parsl:master' into master
AymenFJA Nov 17, 2023
468bc12
Merge branch 'Parsl:master' into master
AymenFJA Nov 20, 2023
b4ed5b0
Merge branch 'Parsl:master' into master
AymenFJA Nov 27, 2023
144c441
Merge branch 'Parsl:master' into master
AymenFJA Nov 29, 2023
bff50d6
Merge branch 'Parsl:master' into master
AymenFJA Dec 8, 2023
8a57a17
Merge branch 'Parsl:master' into master
AymenFJA Jan 16, 2024
ee495fc
Merge branch 'Parsl:master' into master
AymenFJA Jan 22, 2024
8085913
Merge branch 'Parsl:master' into master
AymenFJA Feb 1, 2024
2f4f861
Merge branch 'Parsl:master' into master
AymenFJA Feb 6, 2024
d973fb0
Merge branch 'Parsl:master' into master
AymenFJA Feb 7, 2024
babd790
Merge branch 'Parsl:master' into master
AymenFJA Feb 8, 2024
34558fb
Merge branch 'Parsl:master' into master
AymenFJA Feb 8, 2024
8026bf5
Fixes and improvements
AymenFJA Feb 8, 2024
bd807f2
Merge branch 'Parsl:master' into fix_issues_#3029_improve
AymenFJA May 1, 2024
5e570a0
Merge branch 'Parsl:master' into fix_issues_#3029_improve
AymenFJA May 3, 2024
61f9b92
separate task sandbox from task stdout/err location
AymenFJA May 3, 2024
5363dfe
Merge branch 'Parsl:master' into fix_issues_#3029_improve
AymenFJA May 9, 2024
0903330
address corner case of task exception
AymenFJA May 9, 2024
59daf6c
Merge origin/master with manual resolution
benclifford Jul 1, 2024
0380ff9
Merge branch 'master' into fix_issues_#3029_improve
AymenFJA Jul 1, 2024
85bade0
Merge branch 'Parsl:master' into fix_issues_#3029_improve
AymenFJA Jul 2, 2024
e10b95e
remove temp sandbox
AymenFJA Jul 2, 2024
bc24368
isort fix imports order
AymenFJA Jul 2, 2024
46f1653
Merge branch 'Parsl:master' into fix_issues_#3029_improve
AymenFJA Jul 3, 2024
c7b2648
Merge branch 'master' into fix_issues_#3029_improve
benclifford Jul 3, 2024
1 change: 1 addition & 0 deletions Makefile
@@ -10,6 +10,7 @@ export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH)
export CCTOOLS_VERSION=7.7.2
export HYDRA_LAUNCHER=fork
export OMPI_MCA_rmaps_base_oversubscribe=yes
export RADICAL_TEMP_SANDBOX=True
Collaborator

I think this variable is not interpreted as a Python bool, so using True here may be misleading: someone could assume that False would do the opposite.

Contributor Author

I agree with that. Maybe RADICAL_TEMP_SANDBOX=yes or RADICAL_TEMP_SANDBOX=1?

Collaborator

I think everything to do with RADICAL_TEMP_SANDBOX can be removed from this PR now, because I think the problem it was solving was fixed in a different way by moving away from radical SAGA?
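
The concern raised in this thread can be illustrated with a small sketch. It assumes the variable is only checked for presence with `os.environ.get(...)`, as the diff further down does; `env_flag` is a hypothetical helper, not part of Parsl or RADICAL-Pilot.

```python
import os


def env_flag(name, default=False):
    """Interpret an environment variable as a boolean flag (hypothetical helper)."""
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in {"1", "true", "yes", "on"}


os.environ["RADICAL_TEMP_SANDBOX"] = "False"

# A plain truthiness check treats any non-empty string as "set",
# so the string "False" still switches the behaviour on.
plain_check = bool(os.environ.get("RADICAL_TEMP_SANDBOX"))

# Parsing the value explicitly gives the result a reader would expect.
parsed_check = env_flag("RADICAL_TEMP_SANDBOX")
```

With a presence-only check, values such as `yes` or `1` make it clearer that the variable is a switch rather than a Python boolean.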

MPI=$(MPICH)

.PHONY: help
150 changes: 98 additions & 52 deletions parsl/executors/radical/executor.py
@@ -8,6 +8,7 @@
import logging
import inspect
import requests
import tempfile
import typeguard
import threading as mt

@@ -17,7 +18,7 @@
from concurrent.futures import Future

from parsl.app.python import timeout
from .rpex_resources import ResourceConfig
from .rpex_resources import ResourceConfig, MPI, CLIENT
from parsl.data_provider.files import File
from parsl.utils import RepresentationMixin
from parsl.app.errors import BashExitFailure
@@ -59,7 +60,7 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
``rp.PilotManager`` and ``rp.TaskManager``.
2. "translate": Unwrap, identify, and parse Parsl ``apps`` into ``rp.TaskDescription``.
3. "submit": Submit Parsl apps to ``rp.TaskManager``.
4. "shut_down": Shut down the RADICAL-Pilot runtime and all associated components.
4. "shutdown": Shut down the RADICAL-Pilot runtime and all associated components.

Here is a diagram

@@ -138,19 +139,26 @@ def __init__(self,
self.future_tasks: Dict[str, Future] = {}

if rpex_cfg:
self.rpex_cfg = rpex_cfg
self.rpex_cfg = rpex_cfg.get_config()
elif not rpex_cfg and 'local' in resource:
self.rpex_cfg = ResourceConfig()
self.rpex_cfg = ResourceConfig().get_config()
else:
raise ValueError('Resource config file must be '
'specified for a non-local execution')
raise ValueError('Resource config must be '
'specified for a non-local resources')

def task_state_cb(self, task, state):
"""
Update the state of Parsl Future apps
Based on RP task state callbacks.
"""
if not task.uid.startswith('master'):
# check the Master/Worker state
if task.mode in [rp.RAPTOR_MASTER, rp.RAPTOR_WORKER]:
if state == rp.FAILED:
exception = RuntimeError(f'{task.uid} failed with internal error: {task.stderr}')
self._fail_all_tasks(exception)

# check all other tasks state
else:
parsl_task = self.future_tasks[task.uid]

if state == rp.DONE:
@@ -186,6 +194,23 @@ def task_state_cb(self, task, state):
else:
parsl_task.set_exception('Task failed for an unknown reason')

def _fail_all_tasks(self, exception):
"""
Fail all outstanding tasks with the given exception.

This method iterates through all outstanding tasks in the
`_future_tasks` dictionary, which have not yet completed,
and sets the provided exception as their result, indicating
a failure.

Parameters:
- exception: The exception to be set as the result for all
outstanding tasks.
"""
for fut_task in self.future_tasks.values():
if not fut_task.done():
fut_task.set_exception(exception)
benclifford marked this conversation as resolved.
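
The fail-all behaviour added above can be sketched without RADICAL-Pilot. This is a minimal standalone version using only `concurrent.futures.Future`; the function name `fail_all_pending` and the task uids are hypothetical.

```python
from concurrent.futures import Future


def fail_all_pending(futures, exception):
    """Set `exception` on every future that has not completed yet.

    Returns the uids of the futures that were failed.
    """
    failed = []
    for uid, fut in futures.items():
        if not fut.done():
            fut.set_exception(exception)
            failed.append(uid)
    return failed


futures = {"task.000000": Future(), "task.000001": Future()}
futures["task.000000"].set_result(42)  # already completed, must stay untouched

failed = fail_all_pending(futures, RuntimeError("master failed"))
```

The `done()` guard matters: a future that already holds a result or an exception should not be overwritten.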

def start(self):
"""Create the Pilot component and pass it.
"""
@@ -202,63 +227,67 @@ def start(self):
'resource': self.resource}

if not self.resource or 'local' in self.resource:
# move the agent sandbox to the working dir mainly
# for debugging purposes. This will allow parsl
# to include the agent sandbox with the ci artifacts.
if os.environ.get("LOCAL_SANDBOX"):
pd_init['sandbox'] = self.run_dir
# set the agent dir to /temp as a fix for #3029
Collaborator

Is this changed sandbox behaviour the only part that fixes #3029?

If so:

It moves things into a temporary directory only when the CI tests are run (or in other situations where the RADICAL_TEMP_SANDBOX environment variable is set).

In non-test situations the behaviour is otherwise unchanged, right? If so, does this PR stop that situation from occurring for normal users who are running real applications, not our CI?

@AymenFJA

Contributor Author

This is a very rare case that can happen when the filesystem is exhausted for some reason (maybe too much concurrent I/O on the CI test machine); it should not happen on a regular basis.

I do understand your concern that this might also happen on a local non-test setup, since the agent sandbox is located in $HOME. We never observed this behavior on an HPC machine, where we use the shared file system to store the agent sandbox.

To address your concern, we could make the default location of the sandbox on local setups the /tmp folder. I am open to suggestions as well.

Collaborator

I investigated this a bunch more (and have commented on what I see in other issues) to try to understand what was actually happening in the CI environment, because I like to at least have some understanding of how that environment behaves.

I can see why this is happening in the CI (due to entering a strange zone of fd number-space which radical.saga cannot deal with) and therefore why this would happen in a user workflow (when a user's workflow submitting process has opened many - around 1000 - files).

This PR avoids the misbehaving radical.saga code path by having a special non-default option to turn on a local sandbox.

But in radical-cybertools/radical.saga#885 Andrew mentions, effectively, that radical.saga would be better replaced by psij-python - that would have the same effect of removing the misbehaving codepath, in a different way.

So I tried that out in this draft PR: #3079 which doesn't actually avoid the misbehaving codepath, as far as I can tell.

But maybe this is a better approach longer term, if radical.saga is effectively deprecated in favour of psij-python?
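
The sandbox choice discussed in this thread can be sketched as follows. This is only an illustration of selecting the system temporary directory via `tempfile.gettempdir()`, as the diff lines below do; `choose_sandbox` and the example run directory are hypothetical.

```python
import tempfile


def choose_sandbox(run_dir, use_temp):
    """Return the directory to use for the agent sandbox (hypothetical helper).

    When `use_temp` is true, the system temporary directory is used instead
    of the run directory.
    """
    return tempfile.gettempdir() if use_temp else run_dir


sandbox = choose_sandbox("/home/user/runinfo/000", use_temp=True)
```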

if os.environ.get("RADICAL_TEMP_SANDBOX"):
os.environ["RADICAL_LOG_LVL"] = "DEBUG"

logger.info("RPEX will be running in the local mode")
pd_init['sandbox'] = agent_dir = tempfile.gettempdir()
logger.debug(f'RPEX will be running in test mode, agent sandbox will be located in {agent_dir}')
else:
logger.info("RPEX will be running in local mode")

pd = rp.PilotDescription(pd_init)
pd.verify()

self.rpex_cfg = self.rpex_cfg._get_cfg_file(path=self.run_dir)
cfg = ru.Config(cfg=ru.read_json(self.rpex_cfg))
# start RP's main components TMGR, PMGR and Pilot
self.tmgr = rp.TaskManager(session=self.session)
self.pmgr = rp.PilotManager(session=self.session)
self.pilot = self.pmgr.submit_pilots(pd)

if not self.pilot.description.get('cores') or not self.pilot.description.get('nodes'):
logger.warning('no "cores/nodes" per pilot were set, using default resources')

self.master = cfg.master_descr
self.n_masters = cfg.n_masters
self.tmgr.add_pilots(self.pilot)
self.tmgr.register_callback(self.task_state_cb)

tds = list()
master_path = '{0}/rpex_master.py'.format(PWD)
worker_path = '{0}/rpex_worker.py'.format(PWD)

for i in range(self.n_masters):
td = rp.TaskDescription(self.master)
td.mode = rp.RAPTOR_MASTER
td.uid = ru.generate_id('master.%(item_counter)06d', ru.ID_CUSTOM,
self.masters = []

logger.info(f'Starting {self.rpex_cfg.n_masters} masters and {self.rpex_cfg.n_workers} workers for each master')

# create N masters
for _ in range(self.rpex_cfg.n_masters):
md = rp.TaskDescription(self.rpex_cfg.master_descr)
md.uid = ru.generate_id('rpex.master.%(item_counter)06d', ru.ID_CUSTOM,
ns=self.session.uid)
td.ranks = 1
td.cores_per_rank = 1
td.arguments = [self.rpex_cfg, i]
td.input_staging = self._stage_files([File(master_path),
File(worker_path),
File(self.rpex_cfg)], mode='in')
tds.append(td)

self.pmgr = rp.PilotManager(session=self.session)
self.tmgr = rp.TaskManager(session=self.session)
# submit the master to the TMGR
master = self.tmgr.submit_raptors(md)[0]
self.masters.append(master)

# submit pilot(s)
pilot = self.pmgr.submit_pilots(pd)
if not pilot.description.get('cores'):
logger.warning('no "cores" per pilot was set, using default resources {0}'.format(pilot.resources))
workers = []
# create N workers for each master and submit them to the TMGR
for _ in range(self.rpex_cfg.n_workers):
wd = rp.TaskDescription(self.rpex_cfg.worker_descr)
wd.uid = ru.generate_id('rpex.worker.%(item_counter)06d', ru.ID_CUSTOM,
ns=self.session.uid)
wd.raptor_id = master.uid
wd.input_staging = self._stage_files([File(worker_path)], mode='in')
workers.append(wd)

self.tmgr.submit_tasks(tds)
self.tmgr.submit_workers(workers)

self.select_master = self._cyclic_master_selector()

# prepare or use the current env for the agent/pilot side environment
if cfg.pilot_env_mode != 'client':
logger.info("creating {0} environment for the executor".format(cfg.pilot_env.name))
pilot.prepare_env(env_name=cfg.pilot_env.name,
env_spec=cfg.pilot_env.as_dict())
if self.rpex_cfg.pilot_env_mode != CLIENT:
logger.info("creating {0} environment for the executor".format(self.rpex_cfg.pilot_env.name))
self.pilot.prepare_env(env_name=self.rpex_cfg.pilot_env.name,
env_spec=self.rpex_cfg.pilot_env.as_dict())
else:
client_env = sys.prefix
logger.info("reusing ({0}) environment for the executor".format(client_env))

self.tmgr.add_pilots(pilot)
self.tmgr.register_callback(self.task_state_cb)

# create a bulking thread to run the actual task submission
# to RP in bulks
if self.bulk_mode:
@@ -272,8 +301,21 @@ def start(self):
self._bulk_thread.daemon = True
self._bulk_thread.start()

logger.info('bulk mode is on, submitting tasks in bulks')

return True

def _cyclic_master_selector(self):
"""
Balance tasks submission across N masters and N workers
"""
current_master = 0
masters_uids = [m.uid for m in self.masters]

while True:
yield masters_uids[current_master]
current_master = (current_master + 1) % len(self.masters)
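
The round-robin selection above can be tried in isolation. This sketch reproduces the same generator logic on a plain list of uids and compares it with `itertools.cycle`, which yields the same sequence; the master uids are made-up examples.

```python
from itertools import cycle, islice


def cyclic_selector(uids):
    """Yield uids in round-robin order forever."""
    index = 0
    while True:
        yield uids[index]
        index = (index + 1) % len(uids)


masters = ["rpex.master.000000", "rpex.master.000001", "rpex.master.000002"]

selector = cyclic_selector(masters)
assignments = [next(selector) for _ in range(5)]

# itertools.cycle produces the same order for a fixed list.
same_with_cycle = list(islice(cycle(masters), 5))
```

Unlike the earlier `tid % self.n_masters` scheme visible in the diff, a generator like this does not depend on task ids being consecutive.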

def unwrap(self, func, args):
"""
Unwrap a Parsl app and its args for further processing.
@@ -364,22 +406,25 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs):

# This is the default mode where the bash_app will be executed as
# as a single core process by RP. For cores > 1 the user must use
# above or use MPI functions if their code is Python.
# task.mode=rp.TASK_EXECUTABLE (above) or use MPI functions if their
# code is Python.
else:
task.mode = rp.TASK_PROC
task.raptor_id = 'master.%06d' % (tid % self.n_masters)
task.raptor_id = next(self.select_master)
task.executable = self._pack_and_apply_message(func, args, kwargs)

elif PYTHON in task_type or not task_type:
task.mode = rp.TASK_FUNCTION
task.raptor_id = 'master.%06d' % (tid % self.n_masters)
task.raptor_id = next(self.select_master)
if kwargs.get('walltime'):
func = timeout(func, kwargs['walltime'])

# we process MPI function differently
if 'comm' in kwargs:
# Check how to serialize the function object
if MPI in self.rpex_cfg.worker_type.lower():
task.use_mpi = True
task.function = rp.PythonTask(func, *args, **kwargs)
else:
task.use_mpi = False
task.function = self._pack_and_apply_message(func, args, kwargs)

task.input_staging = self._stage_files(kwargs.get("inputs", []),
@@ -394,7 +439,7 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs):
try:
task.verify()
except ru.typeddict.TDKeyError as e:
raise Exception(f'{e}. Please check Radical.Pilot TaskDescription documentation')
raise Exception(f'{e}. Please check: https://radicalpilot.readthedocs.io/en/stable/ documentation')

return task

@@ -545,7 +590,8 @@ def submit(self, func, resource_specification, *args, **kwargs):

def shutdown(self, hub=True, targets='all', block=False):
"""Shutdown the executor, including all RADICAL-Pilot components."""
logger.info("RadicalPilotExecutor shutdown")
logger.info("RadicalPilotExecutor is terminating...")
self.session.close(download=True)
logger.info("RadicalPilotExecutor is terminated.")

return True
42 changes: 0 additions & 42 deletions parsl/executors/radical/rpex_master.py

This file was deleted.

20 changes: 13 additions & 7 deletions parsl/executors/radical/rpex_resources.py
@@ -108,7 +108,7 @@ class ResourceConfig:
python_v: str = f'{sys.version_info[0]}.{sys.version_info[1]}'
worker_type: str = DEFAULT_WORKER

def _get_cfg_file(cls, path=None):
def get_config(cls, path=None):

# Default ENV mode for RP is to reuse
# the client side. If this is not the case,
@@ -126,6 +126,7 @@ def _get_cfg_file(cls, path=None):
cfg = {
'n_masters': cls.masters,
'n_workers': cls.workers,
'worker_type': cls.worker_type,
'gpus_per_node': cls.worker_gpus_per_node,
'cores_per_node': cls.worker_cores_per_node,
'cores_per_master': cls.cores_per_master,
@@ -143,9 +144,10 @@ def _get_cfg_file(cls, path=None):
'pilot_env_mode': cls.pilot_env_mode,

'master_descr': {
"ranks": 1,
"cores_per_rank": 1,
"mode": rp.RAPTOR_MASTER,
"named_env": cls.pilot_env_name,
"executable": "python3 rpex_master.py",
},

'worker_descr': {
@@ -154,12 +156,16 @@ def _get_cfg_file(cls, path=None):
"raptor_file": "./rpex_worker.py",
"raptor_class": cls.worker_type if
cls.worker_type.lower() != MPI else MPI_WORKER,
"ranks": cls.nodes_per_worker * cls.worker_cores_per_node,
"gpus_per_rank": cls.nodes_per_worker * cls.worker_gpus_per_node,
}}

# Convert the class instance to a cfg file.
config_path = 'rpex.cfg'
# Convert the class instance to a Json file or a Config dict.
if path:
config_path = 'rpex.cfg'
config_path = path + '/' + config_path
with open(config_path, 'w') as f:
json.dump(cfg, f, indent=4)
return config_path
with open(config_path, 'w') as f:
json.dump(cfg, f, indent=4)
else:
config_obj = ru.Config(from_dict=cfg)
benclifford marked this conversation as resolved.
return config_obj
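
The two output forms of the configuration (a JSON file when a path is given, an in-memory object otherwise) can be sketched with the standard library only. This is not the PR's code: `export_config` is a hypothetical name, and `types.SimpleNamespace` stands in for `ru.Config`, which is assumed here to offer attribute-style access.

```python
import json
import os
import tempfile
from types import SimpleNamespace


def export_config(cfg, path=None):
    """Write `cfg` as JSON under `path` and return the file path,
    or return an attribute-style object when no path is given."""
    if path:
        config_path = os.path.join(path, "rpex.cfg")
        with open(config_path, "w") as f:
            json.dump(cfg, f, indent=4)
        return config_path
    # Stand-in for ru.Config(from_dict=cfg); top-level keys only.
    return SimpleNamespace(**cfg)


cfg = {"n_masters": 2, "n_workers": 4}

# In-memory form: values are read as attributes, e.g. obj.n_masters.
obj = export_config(cfg)

# File form: the same dictionary round-trips through JSON.
with tempfile.TemporaryDirectory() as tmp:
    file_path = export_config(cfg, path=tmp)
    with open(file_path) as f:
        loaded = json.load(f)
```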