Move FluxExecutor ZMQ into thread and explicitly clean it up #3517
Conversation
Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for more information. This PR removes some dangerous behaviour there:
i) Creation of the ZMQ context and socket is moved into the thread which makes use of them; previously the socket was created on the main thread and passed into the submission thread which uses it. This removes a thread-safety issue: a socket cannot be safely moved between threads.
ii) The ZMQ context and socket are explicitly closed (using with-blocks) rather than leaving that to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as being unsafe while sockets belonging to another thread (the submission thread) are still open.
On my laptop I could see a hang on around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs.
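For illustration only, here is a minimal sketch of the pattern described above; the function and variable names are made up for this example and are not Parsl's actual ones. The ZMQ context and socket are created, used, and closed entirely inside the thread that uses them, with with-blocks making the cleanup explicit:

import threading
import zmq


def submission_loop(url: str, stop_event: threading.Event) -> None:
    # Create the context and socket in the thread that uses them, rather
    # than on the main thread, so they are never moved across threads.
    with zmq.Context() as context, context.socket(zmq.REQ) as socket:
        socket.connect(url)
        while not stop_event.is_set():
            # ... send tasks over the socket and receive acknowledgements ...
            stop_event.wait(0.1)
    # On exiting the with-blocks the socket is closed first, then the
    # context is terminated, so nothing is left for the garbage collector
    # to clean up from another thread.


stop = threading.Event()
thread = threading.Thread(target=submission_loop,
                          args=("tcp://127.0.0.1:5555", stop))
thread.start()
stop.set()
thread.join()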
This looks like a big improvement, thank you so much for tracking this down!
There is another instance of ZMQ sockets not being cleaned up in flux-executor-related code. I'm happy to open this as a separate PR if you like.
diff --git a/parsl/executors/flux/flux_instance_manager.py b/parsl/executors/flux/flux_instance_manager.py
index 3d760bb5..e6111796 100644
--- a/parsl/executors/flux/flux_instance_manager.py
+++ b/parsl/executors/flux/flux_instance_manager.py
@@ -27,30 +27,29 @@ def main():
parser.add_argument("hostname", help="hostname of the parent executor's socket")
parser.add_argument("port", help="Port of the parent executor's socket")
args = parser.parse_args()
- context = zmq.Context()
- socket = context.socket(zmq.REQ)
- socket.connect(
- args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
- )
- # send the path to the ``flux.job`` package
- socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
- logging.debug("Flux package path sent.")
- # collect the encapsulating Flux instance's URI
- local_uri = flux.Flux().attr_get("local-uri")
- hostname = gethostname()
- if args.hostname == hostname:
- flux_uri = local_uri
- else:
- flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
- logging.debug("Flux URI is %s", flux_uri)
- response = socket.recv() # get acknowledgment
- logging.debug("Received acknowledgment %s", response)
- socket.send(flux_uri.encode()) # send URI
- logging.debug("URI sent. Blocking for response...")
- response = socket.recv() # wait for shutdown message
- logging.debug("Response %s received, draining flux jobs...", response)
- flux.Flux().rpc("job-manager.drain").get()
- logging.debug("Flux jobs drained, exiting.")
+ with zmq.Context() as context, context.socket(zmq.REQ) as socket:
+ socket.connect(
+ args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
+ )
+ # send the path to the ``flux.job`` package
+ socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
+ logging.debug("Flux package path sent.")
+ # collect the encapsulating Flux instance's URI
+ local_uri = flux.Flux().attr_get("local-uri")
+ hostname = gethostname()
+ if args.hostname == hostname:
+ flux_uri = local_uri
+ else:
+ flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
+ logging.debug("Flux URI is %s", flux_uri)
+ response = socket.recv() # get acknowledgment
+ logging.debug("Received acknowledgment %s", response)
+ socket.send(flux_uri.encode()) # send URI
+ logging.debug("URI sent. Blocking for response...")
+ response = socket.recv() # wait for shutdown message
+ logging.debug("Response %s received, draining flux jobs...", response)
+ flux.Flux().rpc("job-manager.drain").get()
+ logging.debug("Flux jobs drained, exiting.")
if __name__ == "__main__":
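(For reference, and assuming current pyzmq behaviour: exiting the Socket with-block calls close(), and exiting the Context with-block calls term(), which blocks until every socket created from that context has been closed, so closing the socket before terminating the context matters here.)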
@jameshcorbett yeah please make a PR for that. The main problem that I encounter with ZMQ in Parsl is to do with multithreading (and perhaps multiprocessing fork) and I think that isn't an issue in
Good find. Once identified, this change is a no-brainer.
Before this PR, this thread stays running forever. This *requires* the socket to be closed at exit, and this PR introduces code to do that (context: see the recent flux PR for the same problem). It looks like stopping this thread is what now allows garbage collection to happen, or something similar... see PR #3517 for the same problem in Flux.
Counts on parsl/tests/test_monitoring/: before this PR, 451 fds, 32 threads; after this PR, 48 fds, 1 thread.
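For anyone wanting to reproduce the fd/thread counts quoted above, a rough sketch (Linux-specific, and the helper name is made up for this example): run it before and after shutting Parsl down and compare the numbers.

import os
import threading


def resource_snapshot() -> str:
    # Count this process's open file descriptors (via /proc, so Linux-only)
    # and the number of live Python threads.
    n_fds = len(os.listdir("/proc/self/fd"))
    n_threads = threading.active_count()
    return f"{n_fds} fds, {n_threads} threads"


print(resource_snapshot())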
Fixes
Fixes #3484
Type of change