Don't forward worker heartbeats to the submit side (#3634)
Prior to this PR, heartbeats sent from worker pools to the interchange on
the results channel were forwarded to the submit side over the
interchange-to-submit-side results channel.

These heartbeat messages would then be discarded on the submit side.

I think these forwarded messages don't do anything, so this PR removes the
forwarding.

The network path between the interchange and the submit side is on localhost
and is unlikely to need any TCP-level keepalives. There is no handling for
missing heartbeats, and indeed no expectation of heartbeats arriving: an
interchange might exist for many hours with no connected workers, and so no
forwarded heartbeats.

This PR should reduce message load on the submit side results channel by
one message per worker pool per heartbeat period.
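
As a rough illustration of that saving, the sketch below multiplies the number of worker pools by the number of heartbeat periods in a run. The function name and the example values (100 pools, a one hour run, a 30 second heartbeat period) are assumptions chosen for illustration, not measurements from this PR.

```python
def forwarded_heartbeats_avoided(worker_pools: int,
                                 run_seconds: float,
                                 heartbeat_period_seconds: float) -> int:
    """Estimate how many forwarded heartbeat messages are no longer sent.

    Hypothetical helper: assumes each worker pool sends exactly one
    heartbeat per heartbeat period for the whole run.
    """
    # Whole heartbeat periods that fit in the run.
    periods = int(run_seconds // heartbeat_period_seconds)
    # One message per worker pool per period.
    return worker_pools * periods


# Illustrative values only: 100 pools, 1 hour, 30 second period.
avoided = forwarded_heartbeats_avoided(100, 3600, 30)
print(avoided)
```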

See issue #3464 for further context.
benclifford authored Oct 14, 2024
1 parent 658619d commit 2eb05cd
Showing 2 changed files with 1 addition and 4 deletions.
4 changes: 1 addition & 3 deletions parsl/executors/high_throughput/executor.py
@@ -460,9 +460,7 @@ def _result_queue_worker(self):
 except pickle.UnpicklingError:
     raise BadMessage("Message received could not be unpickled")

-if msg['type'] == 'heartbeat':
-    continue
-elif msg['type'] == 'result':
+if msg['type'] == 'result':
     try:
         tid = msg['task_id']
     except Exception:
1 change: 0 additions & 1 deletion parsl/executors/high_throughput/interchange.py
@@ -543,7 +543,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
     monitoring_radio.send(r['payload'])
 elif r['type'] == 'heartbeat':
     logger.debug("Manager %r sent heartbeat via results connection", manager_id)
-    b_messages.append((p_message, r))
 else:
     logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])
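
The behaviour after this change can be modelled with a small standalone sketch: heartbeats arriving on the results connection are counted (the real code logs them) but are no longer queued for the submit side. This is a simplified model, not the interchange's actual code. The function name `select_messages_to_forward` is hypothetical, and the assumption that `result` messages are the ones forwarded is inferred from the surrounding diff rather than shown in it.

```python
import pickle
from typing import List, Tuple


def select_messages_to_forward(raw_messages: List[bytes]) -> Tuple[List[bytes], int]:
    """Hypothetical model of interchange-side filtering.

    Returns the pickled messages that would still be forwarded to the
    submit side, plus the number of heartbeats that were dropped.
    """
    to_forward: List[bytes] = []
    heartbeats_seen = 0
    for p_message in raw_messages:
        r = pickle.loads(p_message)
        if r['type'] == 'result':
            # Assumed: results are still passed on to the submit side.
            to_forward.append(p_message)
        elif r['type'] == 'heartbeat':
            # After this change: noted, but not forwarded.
            heartbeats_seen += 1
        # Any other type is ignored in this sketch.
    return to_forward, heartbeats_seen


incoming = [
    pickle.dumps({'type': 'result', 'task_id': 1}),
    pickle.dumps({'type': 'heartbeat'}),
]
forwarded, dropped = select_messages_to_forward(incoming)
print(len(forwarded), dropped)
```

Because heartbeats never reach the submit side in this model, the submit-side loop needs no branch to skip them, which matches the `executor.py` hunk above.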
