Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resilience for heartbeats from unknown managers #3643

Merged
merged 6 commits into from
Oct 25, 2024
Merged

Conversation

benclifford
Copy link
Collaborator

Description

Make interchange survive a heartbeat message from an unregistered manager.

#3262 and #3632 report situations where a heartbeat is received by the interchange after the heartbeat period has expired, and so the relevant manager has been unregistered. Before this PR, the interchange crashed when this happened. After this PR, it will log a warning.

This PR adds a test for such a heartbeat message as well as a couple of other bad protocol messages.

Fixes

Fixes #3262, fixes #3632

Type of change

  • Bug fix

make my own test: start a block, suspend it until it disappears by heartbeat, then continue it so it sends a heartbeat. assert that interchange doesn't crash / can still run stuff?
Copy link
Collaborator

@khk-globus khk-globus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks on target. I recognize that at least the 3.12 tests aren't passing now, but this basic thrust looks good, with just a couple of non-obligatory inline suggestions.

parsl/executors/high_throughput/interchange.py Outdated Show resolved Hide resolved
Comment on lines 15 to 16
T_s = 1

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever; remove some of the magic from the hard-coded values.

parsl/tests/test_scaling/test_missing_heartbeat_3262.py Outdated Show resolved Hide resolved
Comment on lines 69 to 81
(task_port, result_port) = htex.command_client.run("WORKER_PORTS")

context = zmq.Context()
channel_timeout = 10000 # in milliseconds
task_channel = context.socket(zmq.DEALER)
task_channel.setsockopt(zmq.LINGER, 0)
task_channel.setsockopt(zmq.IDENTITY, b'testid')

task_channel.set_hwm(0)
task_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
task_channel.connect(f"tcp://localhost:{task_port}")

task_channel.send(msg)
Copy link
Collaborator

@khk-globus khk-globus Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels super boiler-plate-y, and I wonder if other tests might (or could be refactored to) make use of it. Good for this test, but I'm wondering if this might be wrapped up into a fixture.

(Maybe not, but that's where my mind is going at the moment.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a repeat of code in the process worker pool to initialize this same "tasks flowing from interchange to worker pool" zmq socket.

If refactoring, I think it would make sense to put that into its own module that is then usable by both the real process worker pool and tests that are mocking parts of the worker pool (what this test does) - along the lines of the zmq_pipes module at https://github.com/Parsl/parsl/blob/master/parsl/executors/high_throughput/zmq_pipes.py for the submit side.

@benclifford benclifford merged commit 6af844f into master Oct 25, 2024
7 checks passed
@benclifford benclifford deleted the benc-heartbeat branch October 25, 2024 12:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants