From 658619dfff45d041f5ac2b87f1b49c78205c63b3 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 14 Oct 2024 14:14:35 +0000
Subject: [PATCH 1/3] Count outstanding tasks on submit side rather than in interchange (#3637)

The count of outstanding tasks may now be higher: a task will be counted
when it enters the task submission ZMQ channel to the interchange, rather
than when it reaches the end of that channel (which can behave as a queue
under load), and a task will be counted as complete when its result
arrives back on the submit side, rather than when the interchange places
it into the results channel.

For scaling purposes, it is probably more desirable to count tasks
earlier like this, and less desirable to count results later: scaling
load will be observed before it reaches the interchange, but a reduction
in load due to completion won't be observed until the result has made its
way all the way through the result queue. I don't think this will affect
users in real life, but I don't have any numbers for what delivery times
look like across the two queues under heavy load.

This removes an RPC whose result can be computed locally, and removes a
use of the command channel, which is generally thread-unsafe.

This also removes one source of shutdown hangs: when pressing Ctrl-C at
the right point, shutdown will wait for the strategy thread to shut down,
but the strategy thread can get blocked on OUTSTANDING_C, and
OUTSTANDING_C will never return because the Ctrl-C also made the
interchange go away. This shutdown block is hard to demonstrate in a CI
test because there are other blocking actions (such as a MANAGERS RPC)
involved in shutdown, and it seems to need the strategy thread to be in
just the right place.

Implements #3365
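
As a minimal sketch of the accounting this change relies on (the class
and method names below are invented for illustration; this is not the
real HighThroughputExecutor code):

    from concurrent.futures import Future
    from typing import Any, Callable, Dict

    class SketchExecutor:
        """Toy executor demonstrating submit-side outstanding-task counting."""

        def __init__(self) -> None:
            self.tasks: Dict[int, Future] = {}  # task_id -> Future
            self._next_id = 0

        def submit(self, fn: Callable, *args: Any) -> Future:
            # Counted as outstanding as soon as it is queued towards the
            # interchange, before the interchange has even seen it.
            task_id = self._next_id
            self._next_id += 1
            fut: Future = Future()
            self.tasks[task_id] = fut
            # ... serialize and send (task_id, fn, args) on the ZMQ task channel ...
            return fut

        def _on_result(self, task_id: int, value: Any) -> None:
            # Counted as complete only once the result has travelled all
            # the way back through the result queue to the submit side.
            self.tasks.pop(task_id).set_result(value)

        @property
        def outstanding(self) -> int:
            # Computed locally, replacing the OUTSTANDING_C command RPC.
            return len(self.tasks)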
---
 parsl/executors/high_throughput/executor.py    | 2 +-
 parsl/executors/high_throughput/interchange.py | 8 +-------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 338bc57a4e..f2bd26bf5b 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -582,7 +582,7 @@ def hold_worker(self, worker_id: str) -> None:
     def outstanding(self) -> int:
         """Returns the count of tasks outstanding across the interchange
         and managers"""
-        return self.command_client.run("OUTSTANDING_C")
+        return len(self.tasks)
 
     @property
     def connected_workers(self) -> int:

diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index d61c76fed2..17e5f1da9e 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -252,13 +252,7 @@ def _command_server(self) -> NoReturn:
             try:
                 command_req = self.command_channel.recv_pyobj()
                 logger.debug("Received command request: {}".format(command_req))
-                if command_req == "OUTSTANDING_C":
-                    outstanding = self.pending_task_queue.qsize()
-                    for manager in self._ready_managers.values():
-                        outstanding += len(manager['tasks'])
-                    reply = outstanding
-
-                elif command_req == "CONNECTED_BLOCKS":
+                if command_req == "CONNECTED_BLOCKS":
                     reply = self.connected_block_history
 
                 elif command_req == "WORKERS":

From 2eb05cdfd39a13dac3e0ae3a650f9d69145d8eec Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 14 Oct 2024 14:52:56 +0000
Subject: [PATCH 2/3] Don't forward worker heartbeats to the submit side (#3634)

Prior to this PR, heartbeats sent from worker pools to the interchange
on the results channel would be forwarded on to the submit side along
the interchange-to-submit-side results channel. These heartbeat
messages would then be discarded on the submit side.

I think these forwarded messages don't do anything, so this PR removes
the forwarding. The network path between the interchange and the submit
side is on localhost and unlikely to need any TCP-level keepalives.
There is no handling for missing heartbeats, and indeed no expectation
that heartbeats will arrive at all: an interchange might exist for many
hours with no connected workers, and so no forwarded heartbeats.

This PR should reduce message load on the submit side results channel
by one message per worker pool per heartbeat period.

See issue #3464 for further context.
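
As a rough sketch of the resulting dispatch (the function shape and
names here are invented for illustration, and the real
process_results_incoming also handles monitoring messages), heartbeats
are now consumed with a debug log rather than appended to the list of
messages forwarded to the submit side:

    import logging
    from typing import Any, Dict, List, Tuple

    logger = logging.getLogger("interchange-sketch")

    def messages_to_forward(
        manager_id: bytes,
        messages: List[Tuple[bytes, Dict[str, Any]]],
    ) -> List[Tuple[bytes, Dict[str, Any]]]:
        """Keep only the messages that should travel on to the submit side."""
        forward: List[Tuple[bytes, Dict[str, Any]]] = []
        for pickled, msg in messages:
            if msg["type"] == "result":
                forward.append((pickled, msg))
            elif msg["type"] == "heartbeat":
                # Logged and dropped: one message per worker pool per
                # heartbeat period no longer reaches the submit side.
                logger.debug("Manager %r sent heartbeat", manager_id)
            else:
                logger.error("Discarding message of unknown type: %s", msg["type"])
        return forward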
---
 parsl/executors/high_throughput/executor.py    | 4 +---
 parsl/executors/high_throughput/interchange.py | 1 -
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index f2bd26bf5b..06ced1d1f4 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -460,9 +460,7 @@ def _result_queue_worker(self):
                     except pickle.UnpicklingError:
                         raise BadMessage("Message received could not be unpickled")
 
-                    if msg['type'] == 'heartbeat':
-                        continue
-                    elif msg['type'] == 'result':
+                    if msg['type'] == 'result':
                         try:
                             tid = msg['task_id']
                         except Exception:

diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 17e5f1da9e..036aefa2cd 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -543,7 +543,6 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
                         monitoring_radio.send(r['payload'])
                     elif r['type'] == 'heartbeat':
                         logger.debug("Manager %r sent heartbeat via results connection", manager_id)
-                        b_messages.append((p_message, r))
                     else:
                         logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

From ae5e5f49fe74c89d365fec34f75a76ec9c1c1810 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 14 Oct 2024 15:20:14 +0000
Subject: [PATCH 3/3] Remove unneeded interchange SIGTERM handler (#3635)

This handler was introduced in PR #2629 to guard against the submit
process installing a SIGTERM handler and then that handler being
unexpectedly inherited by the interchange via multiprocessing fork.

PR #3463 changed the interchange to run as a fresh Python process,
which will not inherit SIGTERM handlers, so since then this line has
been vestigial.

Fixes issue #3588
---
 parsl/executors/high_throughput/interchange.py | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 036aefa2cd..b0228b52f0 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -6,7 +6,6 @@
 import pickle
 import platform
 import queue
-import signal
 import sys
 import threading
 import time
@@ -313,16 +312,6 @@ def start(self) -> None:
         """ Start the interchange
         """
 
-        # If a user workflow has set its own signal handler for sigterm, that
-        # handler will be inherited by the interchange process because it is
-        # launched as a multiprocessing fork process.
-        # That can interfere with the interchange shutdown mechanism, which is
-        # to receive a SIGTERM and exit immediately.
-        # See Parsl issue #2343 (Threads and multiprocessing cannot be
-        # intermingled without deadlocks) which talks about other fork-related
-        # parent-process-inheritance problems.
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-
         logger.info("Starting main interchange method")
 
         if self.hub_address is not None and self.hub_zmq_port is not None:
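
As a footnote to PATCH 3/3, a standalone sketch (not Parsl code; POSIX
only, since it uses the "fork" start method) of the inheritance
behaviour described above: a forked child inherits the parent's SIGTERM
handler, while a fresh spawned process starts with the default handler,
so the SIG_DFL reset was only ever needed in the fork case:

    import multiprocessing
    import signal

    def report_handler() -> None:
        # A forked child prints the parent's lambda; a spawned child
        # prints Handlers.SIG_DFL.
        print(signal.getsignal(signal.SIGTERM))

    if __name__ == "__main__":
        # Simulate a user workflow installing its own SIGTERM handler.
        signal.signal(signal.SIGTERM, lambda signum, frame: None)

        for method in ("fork", "spawn"):
            ctx = multiprocessing.get_context(method)
            proc = ctx.Process(target=report_handler)
            proc.start()
            proc.join()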