Move FluxExecutor ZMQ into thread and explicitly clean it up #3517

Merged
merged 3 commits into from
Jul 23, 2024

Conversation

benclifford
Collaborator

@benclifford benclifford commented Jul 10, 2024

Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for some more information.

This PR attempts to remove some dangerous behaviour there:

i) Creation of the ZMQ context and socket is moved into the thread that uses them. Before this PR, the socket was created on the main thread and passed into the submission thread, which then used it. This removes some thread-safety issues that arise because a socket cannot be safely moved between threads.

ii) The ZMQ context and socket are now closed explicitly (using with-blocks) rather than being left to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as unsafe while sockets belonging to another thread (here, the submission thread) are still open.
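
A minimal sketch of the pattern described in (i) and (ii), not the actual FluxExecutor code: the worker thread creates its own ZMQ context and socket inside with-blocks, so both are closed in that same thread when it finishes. The names `submission_loop` and `run_demo`, the REP/REQ socket types, the loopback endpoint, and the stop event are all assumptions made for illustration.

```python
import threading

import zmq


def submission_loop(endpoint_holder, ready, stop):
    """Hypothetical worker: creates, uses and closes its own ZMQ objects."""
    # The context and socket are created in the thread that uses them.
    # Leaving the with-block closes the socket first, then terminates the context.
    with zmq.Context() as context, context.socket(zmq.REP) as socket:
        socket.setsockopt(zmq.LINGER, 0)  # do not block on unsent messages at close
        port = socket.bind_to_random_port("tcp://127.0.0.1")
        endpoint_holder.append(f"tcp://127.0.0.1:{port}")
        ready.set()
        while not stop.is_set():
            # Poll with a timeout so the loop can notice the stop request.
            if socket.poll(timeout=100):
                message = socket.recv()
                socket.send(b"ack:" + message)


def run_demo():
    endpoint_holder = []
    ready = threading.Event()
    stop = threading.Event()
    worker = threading.Thread(
        target=submission_loop, args=(endpoint_holder, ready, stop)
    )
    worker.start()
    ready.wait(timeout=5)

    # The main thread uses its own, separate context and socket.
    with zmq.Context() as context, context.socket(zmq.REQ) as client:
        client.setsockopt(zmq.LINGER, 0)
        client.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
        client.connect(endpoint_holder[0])
        client.send(b"hello")
        reply = client.recv()

    # Ask the worker to stop; it closes its own socket and context on the way out.
    stop.set()
    worker.join(timeout=5)
    return reply, worker.is_alive()


if __name__ == "__main__":
    print(run_demo())
```

Only plain Python objects (a list and two events) cross the thread boundary here; no ZMQ object is shared, and nothing is left for the garbage collector to tear down from another thread.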

On my laptop I could see a hang in around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs.

Fixes

Fixes #3484

Type of change

  • Bug fix

@benclifford
Collaborator Author

cc @jameshcorbett

Contributor

@jameshcorbett jameshcorbett left a comment

This looks like a big improvement, thank you so much for tracking this down!

@jameshcorbett
Contributor

There is another instance of ZMQ sockets not being cleaned up in flux-executor-related code. I'm happy to open this as a separate PR if you like.

diff --git a/parsl/executors/flux/flux_instance_manager.py b/parsl/executors/flux/flux_instance_manager.py
index 3d760bb5..e6111796 100644
--- a/parsl/executors/flux/flux_instance_manager.py
+++ b/parsl/executors/flux/flux_instance_manager.py
@@ -27,30 +27,29 @@ def main():
     parser.add_argument("hostname", help="hostname of the parent executor's socket")
     parser.add_argument("port", help="Port of the parent executor's socket")
     args = parser.parse_args()
-    context = zmq.Context()
-    socket = context.socket(zmq.REQ)
-    socket.connect(
-        args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
-    )
-    # send the path to the ``flux.job`` package
-    socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
-    logging.debug("Flux package path sent.")
-    # collect the encapsulating Flux instance's URI
-    local_uri = flux.Flux().attr_get("local-uri")
-    hostname = gethostname()
-    if args.hostname == hostname:
-        flux_uri = local_uri
-    else:
-        flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
-    logging.debug("Flux URI is %s", flux_uri)
-    response = socket.recv()  # get acknowledgment
-    logging.debug("Received acknowledgment %s", response)
-    socket.send(flux_uri.encode())  # send URI
-    logging.debug("URI sent. Blocking for response...")
-    response = socket.recv()  # wait for shutdown message
-    logging.debug("Response %s received, draining flux jobs...", response)
-    flux.Flux().rpc("job-manager.drain").get()
-    logging.debug("Flux jobs drained, exiting.")
+    with zmq.Context() as context, context.socket(zmq.REQ) as socket:
+        socket.connect(
+            args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
+        )
+        # send the path to the ``flux.job`` package
+        socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
+        logging.debug("Flux package path sent.")
+        # collect the encapsulating Flux instance's URI
+        local_uri = flux.Flux().attr_get("local-uri")
+        hostname = gethostname()
+        if args.hostname == hostname:
+            flux_uri = local_uri
+        else:
+            flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
+        logging.debug("Flux URI is %s", flux_uri)
+        response = socket.recv()  # get acknowledgment
+        logging.debug("Received acknowledgment %s", response)
+        socket.send(flux_uri.encode())  # send URI
+        logging.debug("URI sent. Blocking for response...")
+        response = socket.recv()  # wait for shutdown message
+        logging.debug("Response %s received, draining flux jobs...", response)
+        flux.Flux().rpc("job-manager.drain").get()
+        logging.debug("Flux jobs drained, exiting.")
 
 
 if __name__ == "__main__":

@benclifford
Collaborator Author

@jameshcorbett yeah please make a PR for that.

The main problems I encounter with ZMQ in Parsl are to do with multithreading (and perhaps multiprocessing fork), and I don't think those are an issue in flux_instance_manager.py.

@benclifford benclifford requested a review from yadudoc July 12, 2024 15:23
Collaborator

@khk-globus khk-globus left a comment

Good find. Once identified, this change is a no-brainer.

@benclifford benclifford merged commit 9798260 into master Jul 23, 2024
7 checks passed
@benclifford benclifford deleted the benc-flux-hang-2 branch July 23, 2024 17:28
benclifford added a commit that referenced this pull request Jul 31, 2024
Before this PR, this thread stays running forever.

This *requires* the socket to be closed at exit, and this PR introduces code to do that.

context:
It looks like stopping this thread now allows garbage collection to happen, or something similar. See PR #3517 for the same problem in Flux.


counts:
on parsl/tests/test_monitoring/

before this PR: 451 fds, 32 threads

after this PR: 48 fds, 1 thread
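
The commit message does not say how these counts were taken. One way to get comparable numbers from inside a Python process is sketched below, assuming a Linux-style `/proc/self/fd` (with `/dev/fd` as a fallback); the helper names `count_open_fds`, `resource_snapshot` and `demo` are hypothetical.

```python
import os
import threading


def count_open_fds():
    """Count this process's open file descriptors.

    Assumes /proc/self/fd exists (Linux) and falls back to /dev/fd otherwise.
    Listing the directory briefly opens one extra descriptor, so the absolute
    number is slightly inflated; differences between snapshots stay comparable.
    """
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    return len(os.listdir(fd_dir))


def resource_snapshot():
    """Return (open file descriptors, live threads) for the current process."""
    return count_open_fds(), threading.active_count()


def demo():
    before = resource_snapshot()

    # Hold two descriptors (a pipe) and one extra thread open.
    stop = threading.Event()
    read_fd, write_fd = os.pipe()
    worker = threading.Thread(target=stop.wait)
    worker.start()
    during = resource_snapshot()

    # Explicit cleanup: stop and join the thread, close the descriptors.
    stop.set()
    worker.join()
    os.close(read_fd)
    os.close(write_fd)
    after = resource_snapshot()
    return before, during, after


if __name__ == "__main__":
    print(demo())
```

Taking a snapshot at the end of a test session, before and after a cleanup change, gives a rough measure of how many descriptors and threads were being leaked.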
benclifford added a commit that referenced this pull request Aug 8, 2024
benclifford added a commit that referenced this pull request Aug 16, 2024
Development

Successfully merging this pull request may close these issues.

flux-in-parsl-CI testing is very hangy