RP thread cleanup #3272

Open
AymenFJA opened this issue Dec 2, 2024 · 3 comments
@AymenFJA
Contributor

AymenFJA commented Dec 2, 2024

From Parsl+RP: termination issue with RAPTOR: Parsl/parsl#3708

@andre-merzky
Member

This actually has nothing to do with RAPTOR. It is caused by the ZMQ listeners we create throughout the session and the managers. The RP PR #3269 and the RU PR radical-cybertools/radical.utils#428 resolve that issue.

In Parsl's RADICAL executor, the following patch should also be applied to ensure that the bulk submission thread is cleaned up (not tested):

$ git diff | cat
diff --git i/parsl/executors/radical/executor.py w/parsl/executors/radical/executor.py
index 93b4b38b..5e670d67 100644
--- i/parsl/executors/radical/executor.py
+++ w/parsl/executors/radical/executor.py
@@ -136,6 +136,7 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
         self.working_dir = working_dir
         self.pilot_kwargs = rpex_pilot_kwargs
         self.future_tasks: Dict[str, Future] = {}
+        self._term = mt.Event()

         if rpex_cfg:
             self.rpex_cfg = rpex_cfg.get_config()
@@ -532,7 +533,7 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):

         bulk = list()

-        while True:
+        while not self._term.is_set():

             now = time.time()  # time of last submission

@@ -552,6 +553,9 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
                 if len(bulk) >= self._max_bulk_size:
                     break

+                if self._term.is_set():
+                    break
+
             if bulk:
                 logger.debug('submit bulk: %d', len(bulk))
                 self.tmgr.submit_tasks(bulk)
@@ -588,6 +592,8 @@ class RadicalPilotExecutor(ParslExecutor, RepresentationMixin):
     def shutdown(self, hub=True, targets='all', block=False):
         """Shutdown the executor, including all RADICAL-Pilot components."""
         logger.info("RadicalPilotExecutor is terminating...")
+        self._term.set()
+        self._bulk_thread.join()
         self.session.close(download=True)
         logger.info("RadicalPilotExecutor is terminated.")
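
The pattern the patch relies on can be tried in isolation. The following is a minimal sketch, not the Parsl executor itself: `BulkSubmitter`, its queue, and the `submitted` list are hypothetical stand-ins for the executor, its task queue, and `self.tmgr.submit_tasks(bulk)`. It also assumes that `mt` in the patch refers to the standard `threading` module imported under that alias.

```python
import queue
import threading as mt  # assumption: `mt` in the patch is an alias for threading
import time


class BulkSubmitter:
    """Hypothetical stand-in for the executor's bulk submission thread."""

    def __init__(self, max_bulk_size=3, bulk_timeout=0.1):
        self._queue = queue.Queue()
        self._term = mt.Event()          # termination flag, as in the patch
        self._max_bulk_size = max_bulk_size
        self._bulk_timeout = bulk_timeout
        self.submitted = []              # stands in for tmgr.submit_tasks()
        self._bulk_thread = mt.Thread(target=self._bulk_loop, daemon=True)
        self._bulk_thread.start()

    def put(self, task):
        self._queue.put(task)

    def _bulk_loop(self):
        # Outer loop ends once shutdown() has set the termination flag.
        while not self._term.is_set():
            bulk = []
            start = time.time()

            # Collect tasks until the bulk is full, the timeout expires,
            # or termination is requested.
            while time.time() - start < self._bulk_timeout:
                try:
                    bulk.append(self._queue.get(timeout=0.05))
                except queue.Empty:
                    pass

                if len(bulk) >= self._max_bulk_size:
                    break

                if self._term.is_set():
                    break

            # Tasks already collected are still handed over before exiting.
            if bulk:
                self.submitted.append(bulk)

    def shutdown(self):
        # Signal the worker first, then wait for it to finish.
        self._term.set()
        self._bulk_thread.join()
```

Because both loops poll the flag and the queue read uses a short timeout, `join()` returns promptly instead of blocking on a thread stuck in `while True`. Note that in this sketch any tasks still sitting in the queue at shutdown are dropped; whether that is acceptable for the real executor is a separate design decision.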

@andre-merzky changed the title from "RP Raptor termination" to "RP thread cleanup" on Dec 4, 2024
@AymenFJA
Contributor Author

AymenFJA commented Dec 9, 2024

@andre-merzky this is addressed now with a PR here: Parsl/parsl#3718

@AymenFJA
Contributor Author

This is addressed by the PR here: #3269. I will keep this open until Ben confirms that his tests are passing as well.
