Skip to content

Commit

Permalink
terminate bulk thread
Browse files Browse the repository at this point in the history
  • Loading branch information
AymenFJA committed Dec 9, 2024
1 parent 649c8b4 commit 2ea2b24
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion parsl/executors/radical/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def __init__(self,
self.resource = resource
self._uid = RPEX.lower()
self.bulk_mode = bulk_mode
self._terminate = mt.Event()
self.working_dir = working_dir
self.pilot_kwargs = rpex_pilot_kwargs
self.future_tasks: Dict[str, Future] = {}
Expand Down Expand Up @@ -532,7 +533,7 @@ def _bulk_collector(self):

bulk = list()

while True:
while not self._terminate.is_set():

now = time.time() # time of last submission

Expand All @@ -552,6 +553,9 @@ def _bulk_collector(self):
if len(bulk) >= self._max_bulk_size:
break

if self._terminate.is_set():
break

if bulk:
logger.debug('submit bulk: %d', len(bulk))
self.tmgr.submit_tasks(bulk)
Expand Down Expand Up @@ -588,6 +592,13 @@ def submit(self, func, resource_specification, *args, **kwargs):
def shutdown(self, hub=True, targets='all', block=False):
"""Shutdown the executor, including all RADICAL-Pilot components."""
logger.info("RadicalPilotExecutor is terminating...")

self._terminate.set()

# ensure we are in the bulk submssion mode
if self.bulk_mode:
self._bulk_thread.join()

self.session.close(download=True)
logger.info("RadicalPilotExecutor is terminated.")

Expand Down

0 comments on commit 2ea2b24

Please sign in to comment.