-
Notifications
You must be signed in to change notification settings - Fork 199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix two bugs and add support for N masters, N workers with load balancing in RadicalPilotExecutor #3060
Fix two bugs and add support for N masters, N workers with load balancing in RadicalPilotExecutor #3060
Changes from 126 commits
60ca3bf
645640a
34f13a3
a28da44
b6f8fb7
648594c
f3fb0da
f476b64
0d8366e
8e380c5
a3397d0
051db89
0427428
94162ba
7ddea2f
9f1d59f
a8ed4bc
e242c34
9d7a7a9
79227c0
0069488
6197677
1a8d6b7
ecc0559
bccb46e
f697cfe
d65b735
a286b0b
a1bb899
739bb75
dd760d2
67d0d6e
683c657
199b8bb
362cd63
65b8ac5
e50267b
03c63b2
c2812b4
09d4374
65396c9
63a39b2
2e9dd42
e72950e
43db0cd
99954c9
471c11d
7038748
8ce031d
3493828
9565fb5
fbc200b
32c0dde
35b2833
da934a9
d87c5ad
1bc9549
881eac7
907fbc0
eede1ec
96efa6e
666fab9
96d45cc
e257506
6b0bc67
7d3eeac
8cc028f
fde214d
2e2f2ac
dfabd23
3f1e9f9
27aefbe
1a607d7
d3ab39d
34f25bf
6c1e31a
ae3f43d
110c1b3
fdd7917
373b30c
93977e7
0e4c709
9fe38da
ae63a57
99c7d67
cfff365
3c140d6
dcb2c41
2dc02bc
01afe53
f428238
9e800e3
c4cd868
e870b58
cd83126
48935f2
2dbf657
29d7d4c
f30596e
6bee18f
65ff8f4
5306eea
62b55f9
738e316
d606825
faaaefa
790efd5
b0c48af
931e58b
426c2b4
7e891ba
8b2037e
f5f339e
a35b5ee
468bc12
b4ed5b0
144c441
bff50d6
8a57a17
ee495fc
8085913
2f4f861
d973fb0
babd790
34558fb
8026bf5
bd807f2
5e570a0
61f9b92
5363dfe
0903330
59daf6c
0380ff9
85bade0
e10b95e
bc24368
46f1653
c7b2648
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
import logging | ||
import inspect | ||
import requests | ||
import tempfile | ||
import typeguard | ||
import threading as mt | ||
|
||
|
@@ -17,7 +18,7 @@ | |
from concurrent.futures import Future | ||
|
||
from parsl.app.python import timeout | ||
from .rpex_resources import ResourceConfig | ||
from .rpex_resources import ResourceConfig, MPI, CLIENT | ||
from parsl.data_provider.files import File | ||
from parsl.utils import RepresentationMixin | ||
from parsl.app.errors import BashExitFailure | ||
|
@@ -59,7 +60,7 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin): | |
``rp.PilotManager`` and ``rp.TaskManager``. | ||
2. "translate": Unwrap, identify, and parse Parsl ``apps`` into ``rp.TaskDescription``. | ||
3. "submit": Submit Parsl apps to ``rp.TaskManager``. | ||
4. "shut_down": Shut down the RADICAL-Pilot runtime and all associated components. | ||
4. "shutdown": Shut down the RADICAL-Pilot runtime and all associated components. | ||
|
||
Here is a diagram | ||
|
||
|
@@ -138,19 +139,26 @@ def __init__(self, | |
self.future_tasks: Dict[str, Future] = {} | ||
|
||
if rpex_cfg: | ||
self.rpex_cfg = rpex_cfg | ||
self.rpex_cfg = rpex_cfg.get_config() | ||
elif not rpex_cfg and 'local' in resource: | ||
self.rpex_cfg = ResourceConfig() | ||
self.rpex_cfg = ResourceConfig().get_config() | ||
else: | ||
raise ValueError('Resource config file must be ' | ||
'specified for a non-local execution') | ||
raise ValueError('Resource config must be ' | ||
'specified for a non-local resources') | ||
|
||
def task_state_cb(self, task, state): | ||
""" | ||
Update the state of Parsl Future apps | ||
Based on RP task state callbacks. | ||
""" | ||
if not task.uid.startswith('master'): | ||
# check the Master/Worker state | ||
if task.mode in [rp.RAPTOR_MASTER, rp.RAPTOR_WORKER]: | ||
if state == rp.FAILED: | ||
exception = RuntimeError(f'{task.uid} failed with internal error: {task.stderr}') | ||
self._fail_all_tasks(exception) | ||
|
||
# check all other tasks state | ||
else: | ||
parsl_task = self.future_tasks[task.uid] | ||
|
||
if state == rp.DONE: | ||
|
@@ -186,6 +194,23 @@ def task_state_cb(self, task, state): | |
else: | ||
parsl_task.set_exception('Task failed for an unknown reason') | ||
|
||
def _fail_all_tasks(self, exception): | ||
""" | ||
Fail all outstanding tasks with the given exception. | ||
|
||
This method iterates through all outstanding tasks in the | ||
`_future_tasks` dictionary, which have not yet completed, | ||
and sets the provided exception as their result, indicating | ||
a failure. | ||
|
||
Parameters: | ||
- exception: The exception to be set as the result for all | ||
outstanding tasks. | ||
""" | ||
for fut_task in self.future_tasks.values(): | ||
if not fut_task.done(): | ||
fut_task.set_exception(exception) | ||
benclifford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def start(self): | ||
"""Create the Pilot component and pass it. | ||
""" | ||
|
@@ -202,63 +227,67 @@ def start(self): | |
'resource': self.resource} | ||
|
||
if not self.resource or 'local' in self.resource: | ||
# move the agent sandbox to the working dir mainly | ||
# for debugging purposes. This will allow parsl | ||
# to include the agent sandbox with the ci artifacts. | ||
if os.environ.get("LOCAL_SANDBOX"): | ||
pd_init['sandbox'] = self.run_dir | ||
# set the agent dir to /temp as a fix for #3029 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this changed sandbox behaviour the only part that fixes #3029? If so: it's moving things into a temporary directory only when the CI tests are run (or in other situations where the local sandbox option is enabled); in non-test situations, this behaviour is otherwise unchanged, right? If so, does this PR stop that situation occurring for normal users who are running real applications, not our CI? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While this is a very rare case that can happen when the filesystem is exhausted for some reason (maybe too many concurrent I/O or so in the CI test machine), this behavior should not happen on a regular basis. I do understand your concerns here that this might also happen specifically on a user's machine. I think to address your concern we can make the default location for the sandbox a temporary directory. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I investigated this a bunch more (and have commented on what I see in other issues) to try to understand what was actually happening in the CI environment, because I like to at least have some understanding of how that environment behaves. I can see why this is happening in the CI (due to entering a strange zone of fd number-space which radical.saga cannot deal with) and therefore why this would happen in a user workflow (when a user's workflow submitting process has opened many - around 1000 - files). This PR avoids the misbehaving radical.saga code path by having a special non-default option to turn on a local sandbox. 
But in radical-cybertools/radical.saga#885 Andrew mentions, effectively, that radical.saga would be better replaced by psij-python - that would have the same effect of removing the misbehaving codepath, in a different way. So I tried that out in this draft PR: #3079 which doesn't actually avoid the misbehaving codepath, as far as I can tell. But maybe this is a better approach longer term, if radical.saga is effectively deprecated in favour of psij-python? |
||
if os.environ.get("RADICAL_TEMP_SANDBOX"): | ||
os.environ["RADICAL_LOG_LVL"] = "DEBUG" | ||
|
||
logger.info("RPEX will be running in the local mode") | ||
pd_init['sandbox'] = agent_dir = tempfile.gettempdir() | ||
logger.debug(f'RPEX will be running in test mode, agent sandbox will be located in {agent_dir}') | ||
else: | ||
logger.info("RPEX will be running in local mode") | ||
|
||
pd = rp.PilotDescription(pd_init) | ||
pd.verify() | ||
|
||
self.rpex_cfg = self.rpex_cfg._get_cfg_file(path=self.run_dir) | ||
cfg = ru.Config(cfg=ru.read_json(self.rpex_cfg)) | ||
# start RP's main components TMGR, PMGR and Pilot | ||
self.tmgr = rp.TaskManager(session=self.session) | ||
self.pmgr = rp.PilotManager(session=self.session) | ||
self.pilot = self.pmgr.submit_pilots(pd) | ||
|
||
if not self.pilot.description.get('cores') or not self.pilot.description.get('nodes'): | ||
logger.warning('no "cores/nodes" per pilot were set, using default resources') | ||
|
||
self.master = cfg.master_descr | ||
self.n_masters = cfg.n_masters | ||
self.tmgr.add_pilots(self.pilot) | ||
self.tmgr.register_callback(self.task_state_cb) | ||
|
||
tds = list() | ||
master_path = '{0}/rpex_master.py'.format(PWD) | ||
worker_path = '{0}/rpex_worker.py'.format(PWD) | ||
|
||
for i in range(self.n_masters): | ||
td = rp.TaskDescription(self.master) | ||
td.mode = rp.RAPTOR_MASTER | ||
td.uid = ru.generate_id('master.%(item_counter)06d', ru.ID_CUSTOM, | ||
self.masters = [] | ||
|
||
logger.info(f'Starting {self.rpex_cfg.n_masters} masters and {self.rpex_cfg.n_workers} workers for each master') | ||
|
||
# create N masters | ||
for _ in range(self.rpex_cfg.n_masters): | ||
md = rp.TaskDescription(self.rpex_cfg.master_descr) | ||
md.uid = ru.generate_id('rpex.master.%(item_counter)06d', ru.ID_CUSTOM, | ||
ns=self.session.uid) | ||
td.ranks = 1 | ||
td.cores_per_rank = 1 | ||
td.arguments = [self.rpex_cfg, i] | ||
td.input_staging = self._stage_files([File(master_path), | ||
File(worker_path), | ||
File(self.rpex_cfg)], mode='in') | ||
tds.append(td) | ||
|
||
self.pmgr = rp.PilotManager(session=self.session) | ||
self.tmgr = rp.TaskManager(session=self.session) | ||
# submit the master to the TMGR | ||
master = self.tmgr.submit_raptors(md)[0] | ||
self.masters.append(master) | ||
|
||
# submit pilot(s) | ||
pilot = self.pmgr.submit_pilots(pd) | ||
if not pilot.description.get('cores'): | ||
logger.warning('no "cores" per pilot was set, using default resources {0}'.format(pilot.resources)) | ||
workers = [] | ||
# create N workers for each master and submit them to the TMGR | ||
for _ in range(self.rpex_cfg.n_workers): | ||
wd = rp.TaskDescription(self.rpex_cfg.worker_descr) | ||
wd.uid = ru.generate_id('rpex.worker.%(item_counter)06d', ru.ID_CUSTOM, | ||
ns=self.session.uid) | ||
wd.raptor_id = master.uid | ||
wd.input_staging = self._stage_files([File(worker_path)], mode='in') | ||
workers.append(wd) | ||
|
||
self.tmgr.submit_tasks(tds) | ||
self.tmgr.submit_workers(workers) | ||
|
||
self.select_master = self._cyclic_master_selector() | ||
|
||
# prepare or use the current env for the agent/pilot side environment | ||
if cfg.pilot_env_mode != 'client': | ||
logger.info("creating {0} environment for the executor".format(cfg.pilot_env.name)) | ||
pilot.prepare_env(env_name=cfg.pilot_env.name, | ||
env_spec=cfg.pilot_env.as_dict()) | ||
if self.rpex_cfg.pilot_env_mode != CLIENT: | ||
logger.info("creating {0} environment for the executor".format(self.rpex_cfg.pilot_env.name)) | ||
self.pilot.prepare_env(env_name=self.rpex_cfg.pilot_env.name, | ||
env_spec=self.rpex_cfg.pilot_env.as_dict()) | ||
else: | ||
client_env = sys.prefix | ||
logger.info("reusing ({0}) environment for the executor".format(client_env)) | ||
|
||
self.tmgr.add_pilots(pilot) | ||
self.tmgr.register_callback(self.task_state_cb) | ||
|
||
# create a bulking thread to run the actual task submission | ||
# to RP in bulks | ||
if self.bulk_mode: | ||
|
@@ -272,8 +301,21 @@ def start(self): | |
self._bulk_thread.daemon = True | ||
self._bulk_thread.start() | ||
|
||
logger.info('bulk mode is on, submitting tasks in bulks') | ||
|
||
return True | ||
|
||
def _cyclic_master_selector(self): | ||
""" | ||
Balance tasks submission across N masters and N workers | ||
""" | ||
current_master = 0 | ||
masters_uids = [m.uid for m in self.masters] | ||
|
||
while True: | ||
yield masters_uids[current_master] | ||
current_master = (current_master + 1) % len(self.masters) | ||
|
||
def unwrap(self, func, args): | ||
""" | ||
Unwrap a Parsl app and its args for further processing. | ||
|
@@ -364,22 +406,25 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs): | |
|
||
# This is the default mode where the bash_app will be executed as | ||
# as a single core process by RP. For cores > 1 the user must use | ||
# above or use MPI functions if their code is Python. | ||
# task.mode=rp.TASK_EXECUTABLE (above) or use MPI functions if their | ||
# code is Python. | ||
else: | ||
task.mode = rp.TASK_PROC | ||
task.raptor_id = 'master.%06d' % (tid % self.n_masters) | ||
task.raptor_id = next(self.select_master) | ||
task.executable = self._pack_and_apply_message(func, args, kwargs) | ||
|
||
elif PYTHON in task_type or not task_type: | ||
task.mode = rp.TASK_FUNCTION | ||
task.raptor_id = 'master.%06d' % (tid % self.n_masters) | ||
task.raptor_id = next(self.select_master) | ||
if kwargs.get('walltime'): | ||
func = timeout(func, kwargs['walltime']) | ||
|
||
# we process MPI function differently | ||
if 'comm' in kwargs: | ||
# Check how to serialize the function object | ||
if MPI in self.rpex_cfg.worker_type.lower(): | ||
task.use_mpi = True | ||
task.function = rp.PythonTask(func, *args, **kwargs) | ||
else: | ||
task.use_mpi = False | ||
task.function = self._pack_and_apply_message(func, args, kwargs) | ||
|
||
task.input_staging = self._stage_files(kwargs.get("inputs", []), | ||
|
@@ -394,7 +439,7 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs): | |
try: | ||
task.verify() | ||
except ru.typeddict.TDKeyError as e: | ||
raise Exception(f'{e}. Please check Radical.Pilot TaskDescription documentation') | ||
raise Exception(f'{e}. Please check: https://radicalpilot.readthedocs.io/en/stable/ documentation') | ||
|
||
return task | ||
|
||
|
@@ -545,7 +590,8 @@ def submit(self, func, resource_specification, *args, **kwargs): | |
|
||
def shutdown(self, hub=True, targets='all', block=False): | ||
"""Shutdown the executor, including all RADICAL-Pilot components.""" | ||
logger.info("RadicalPilotExecutor shutdown") | ||
logger.info("RadicalPilotExecutor is terminating...") | ||
self.session.close(download=True) | ||
logger.info("RadicalPilotExecutor is terminated.") | ||
|
||
return True |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this variable is not interpreted as a Python bool, so it's maybe misleading to use
True
here in case someone thinks False
would do something opposite. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with that. maybe
RADICAL_TEMP_SANDBOX=yes
or RADICAL_TEMP_SANDBOX=1
? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think everything to do with RADICAL_TEMP_SANDBOX can be removed from this PR now, because I think the problem it was solving was fixed in a different way by moving away from radical SAGA?