Add resilience for heartbeats from unknown managers #3643
Make my own test: start a block, suspend it until the interchange drops it for missing heartbeats, then resume it so it sends a heartbeat. Assert that the interchange doesn't crash and can still run tasks?
Looks on target. I recognize that at least the 3.12 tests aren't passing now, but this basic thrust looks good, with just a couple of non-obligatory inline suggestions.
T_s = 1
Clever; remove some of the magic from the hard-coded values.
(task_port, result_port) = htex.command_client.run("WORKER_PORTS")

context = zmq.Context()
channel_timeout = 10000  # in milliseconds
task_channel = context.socket(zmq.DEALER)
task_channel.setsockopt(zmq.LINGER, 0)
task_channel.setsockopt(zmq.IDENTITY, b'testid')

task_channel.set_hwm(0)
task_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
task_channel.connect(f"tcp://localhost:{task_port}")

task_channel.send(msg)
This feels super boiler-plate-y, and I wonder if other tests might (or could be refactored to) make use of it. Good for this test, but I'm wondering if this might be wrapped up into a fixture.
(Maybe not, but that's where my mind is going at the moment.)
It's a repeat of the code in the process worker pool that initializes this same "tasks flowing from interchange to worker pool" ZMQ socket.
If refactoring, I think it would make sense to put that into its own module that is then usable by both the real process worker pool and tests that are mocking parts of the worker pool (what this test does) - along the lines of the zmq_pipes module at https://github.com/Parsl/parsl/blob/master/parsl/executors/high_throughput/zmq_pipes.py for the submit side.
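For illustration only, the refactoring discussed above might look something like the sketch below. The function name `connect_task_channel` and its parameters are hypothetical and do not exist in Parsl. The body just wraps the socket options from the test snippet in this thread, using standard pyzmq calls.

```python
import zmq


def connect_task_channel(context, task_port, identity,
                         host="localhost", timeout_ms=10000):
    """Hypothetical helper (not part of Parsl): build the DEALER socket
    that a worker pool, or a test mocking one, uses to talk to the
    interchange task port."""
    channel = context.socket(zmq.DEALER)
    # Do not block process exit on unsent messages.
    channel.setsockopt(zmq.LINGER, 0)
    # The identity must be set before connecting.
    channel.setsockopt(zmq.IDENTITY, identity)
    # 0 means no high-water-mark limit on queued messages.
    channel.set_hwm(0)
    # Give up on a blocked send after timeout_ms milliseconds.
    channel.setsockopt(zmq.SNDTIMEO, timeout_ms)
    channel.connect(f"tcp://{host}:{task_port}")
    return channel
```

A test could then replace the inline setup with `task_channel = connect_task_channel(zmq.Context(), task_port, b'testid')` before sending its message. Whether this belongs in a shared module or a pytest fixture is left open, as in the discussion above.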
Description
Make interchange survive a heartbeat message from an unregistered manager.
#3262 and #3632 report situations where a heartbeat is received by the interchange after the heartbeat period has expired, and so the relevant manager has been unregistered. Before this PR, the interchange crashed when this happened. After this PR, it will log a warning.
This PR adds a test for such a heartbeat message as well as a couple of other bad protocol messages.
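As a rough sketch of the behaviour described here (not the actual interchange code), tolerating a heartbeat from an unregistered manager amounts to looking the manager up defensively and logging a warning instead of raising. The names `handle_heartbeat`, `ready_managers`, and the `last_heartbeat` key are assumptions made for this example.

```python
import logging
import time

logger = logging.getLogger(__name__)


def handle_heartbeat(ready_managers, manager_id, now=None):
    """Hypothetical handler: record a heartbeat for a known manager.

    Returns True if the manager was registered, False otherwise.
    An unknown manager is logged and ignored rather than raising KeyError.
    """
    manager = ready_managers.get(manager_id)
    if manager is None:
        logger.warning(
            "Received heartbeat from unknown manager %r; ignoring", manager_id)
        return False
    manager["last_heartbeat"] = time.time() if now is None else now
    return True
```

With a plain `ready_managers[manager_id]` lookup, a late heartbeat from a manager that had already been expired would raise `KeyError`. The `.get` lookup plus the warning keeps the message loop running.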
Fixes
Fixes #3262, fixes #3632
Type of change