From 2981a287bbffc4aead9f70e2a427c1def3fa9f36 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Wed, 31 Jul 2024 13:17:07 +0200
Subject: [PATCH 1/6] Move monitoring router parameters into object attributes
 (#3521)

This is to support rearrangement of the structure of the router code
into multiple threads and methods, without having to manually wire all
of the multiprocessing objects between the new methods and threads.
These objects are part of the context of the router object, rather than
parameters to individual methods which might change, and they are all
multiprocessing objects which are thread-safe.
---
 parsl/monitoring/router.py | 49 +++++++++++++++++++++++++-------------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py
index 70b4862295..9a422027c1 100644
--- a/parsl/monitoring/router.py
+++ b/parsl/monitoring/router.py
@@ -32,7 +32,12 @@ def __init__(self,
                  logdir: str = ".",
                  run_id: str,
                  logging_level: int = logging.INFO,
-                 atexit_timeout: int = 3  # in seconds
+                 atexit_timeout: int = 3,  # in seconds
+                 priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
+                 node_msgs: "queue.Queue[AddressedMonitoringMessage]",
+                 block_msgs: "queue.Queue[AddressedMonitoringMessage]",
+                 resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
+                 exit_event: Event,
                  ):
         """ Initializes a monitoring configuration class.
@@ -51,7 +56,11 @@ def __init__(self,
             Logging level as defined in the logging module. Default: logging.INFO
         atexit_timeout : float, optional
             The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
+        *_msgs : Queue
+            Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
+        exit_event : Event
+            An event that the main Parsl process will set to signal that the monitoring router should shut down.
         """
         os.makedirs(logdir, exist_ok=True)
         self.logger = set_file_logger("{}/monitoring_router.log".format(logdir),
@@ -93,19 +102,20 @@ def __init__(self,
                                                min_port=zmq_port_range[0],
                                                max_port=zmq_port_range[1])
 
-    def start(self,
-              priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
-              node_msgs: "queue.Queue[AddressedMonitoringMessage]",
-              block_msgs: "queue.Queue[AddressedMonitoringMessage]",
-              resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
-              exit_event: Event) -> None:
+        self.priority_msgs = priority_msgs
+        self.node_msgs = node_msgs
+        self.block_msgs = block_msgs
+        self.resource_msgs = resource_msgs
+        self.exit_event = exit_event
+
+    def start(self) -> None:
         try:
-            while not exit_event.is_set():
+            while not self.exit_event.is_set():
                 try:
                     data, addr = self.udp_sock.recvfrom(2048)
                     resource_msg = pickle.loads(data)
                     self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
-                    resource_msgs.put((resource_msg, addr))
+                    self.resource_msgs.put((resource_msg, addr))
                 except socket.timeout:
                     pass
@@ -125,15 +135,15 @@ def start(self,
 
                         if msg[0] == MessageType.NODE_INFO:
                             msg[1]['run_id'] = self.run_id
-                            node_msgs.put(msg_0)
+                            self.node_msgs.put(msg_0)
                         elif msg[0] == MessageType.RESOURCE_INFO:
-                            resource_msgs.put(msg_0)
+                            self.resource_msgs.put(msg_0)
                         elif msg[0] == MessageType.BLOCK_INFO:
-                            block_msgs.put(msg_0)
+                            self.block_msgs.put(msg_0)
                         elif msg[0] == MessageType.TASK_INFO:
-                            priority_msgs.put(msg_0)
+                            self.priority_msgs.put(msg_0)
                         elif msg[0] == MessageType.WORKFLOW_INFO:
-                            priority_msgs.put(msg_0)
+                            self.priority_msgs.put(msg_0)
                         else:
                             # There is a type: ignore here because if msg[0]
                             # is of the correct type, this code is unreachable,
@@ -158,7 +168,7 @@ def start(self,
                 data, addr = self.udp_sock.recvfrom(2048)
                 msg = pickle.loads(data)
                 self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
-                resource_msgs.put((msg, addr))
+                self.resource_msgs.put((msg, addr))
                 last_msg_received_time = time.time()
             except socket.timeout:
                 pass
@@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
                                   zmq_port_range=zmq_port_range,
                                   logdir=logdir,
                                   logging_level=logging_level,
-                                  run_id=run_id)
+                                  run_id=run_id,
+                                  priority_msgs=priority_msgs,
+                                  node_msgs=node_msgs,
+                                  block_msgs=block_msgs,
+                                  resource_msgs=resource_msgs,
+                                  exit_event=exit_event)
     except Exception as e:
         logger.error("MonitoringRouter construction failed.", exc_info=True)
         comm_q.put(f"Monitoring router construction failed: {e}")
@@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
     router.logger.info("Starting MonitoringRouter in router_starter")
 
     try:
-        router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event)
+        router.start()
     except Exception as e:
         router.logger.exception("router.start exception")
         exception_q.put(('Hub', str(e)))
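
The refactoring in this patch follows a general pattern: shared
multiprocessing primitives are passed once at construction time and
stored as attributes, instead of being threaded through every method
call. The following standalone sketch (illustrative names only, not
Parsl code) shows why this makes it straightforward to split a receive
loop across methods and threads:

    import multiprocessing
    import queue
    import threading
    import time

    class Router:
        """Stand-in router: shared primitives become object attributes,
        so new methods and threads can reach them through self."""

        def __init__(self, *, resource_msgs, exit_event):
            self.resource_msgs = resource_msgs
            self.exit_event = exit_event

        def start(self):
            # No per-call wiring needed: everything lives on self, which
            # is what makes splitting this loop into several methods or
            # threads straightforward.
            while not self.exit_event.is_set():
                try:
                    print("routing:", self.resource_msgs.get(timeout=0.1))
                except queue.Empty:
                    pass

    if __name__ == "__main__":
        msgs = multiprocessing.Queue()
        ev = multiprocessing.Event()
        t = threading.Thread(target=Router(resource_msgs=msgs, exit_event=ev).start)
        t.start()
        msgs.put(("RESOURCE_INFO", {"pid": 1234}))
        time.sleep(0.5)   # give the router time to drain the queue
        ev.set()
        t.join()
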
""" os.makedirs(logdir, exist_ok=True) self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), @@ -93,19 +102,20 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - def start(self, - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", - exit_event: Event) -> None: + self.priority_msgs = priority_msgs + self.node_msgs = node_msgs + self.block_msgs = block_msgs + self.resource_msgs = resource_msgs + self.exit_event = exit_event + + def start(self) -> None: try: - while not exit_event.is_set(): + while not self.exit_event.is_set(): try: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put((resource_msg, addr)) except socket.timeout: pass @@ -125,15 +135,15 @@ def start(self, if msg[0] == MessageType.NODE_INFO: msg[1]['run_id'] = self.run_id - node_msgs.put(msg_0) + self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: - resource_msgs.put(msg_0) + self.resource_msgs.put(msg_0) elif msg[0] == MessageType.BLOCK_INFO: - block_msgs.put(msg_0) + self.block_msgs.put(msg_0) elif msg[0] == MessageType.TASK_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) elif msg[0] == MessageType.WORKFLOW_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) else: # There is a type: ignore here because if msg[0] # is of the correct type, this code is unreachable, @@ -158,7 +168,7 @@ def start(self, data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - resource_msgs.put((msg, addr)) + self.resource_msgs.put((msg, addr)) last_msg_received_time = time.time() except socket.timeout: pass @@ -191,7 +201,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id) + run_id=run_id, + priority_msgs=priority_msgs, + node_msgs=node_msgs, + block_msgs=block_msgs, + resource_msgs=resource_msgs, + exit_event=exit_event) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") @@ -200,7 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", router.logger.info("Starting MonitoringRouter in router_starter") try: - router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event) + router.start() except Exception as e: router.logger.exception("router.start exception") exception_q.put(('Hub', str(e))) From 9c982c5c3bca64205ac57fed93fae1a8ad365d3c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 31 Jul 2024 13:47:25 +0200 Subject: [PATCH 2/6] Add type annotation and error log to _filter_scale_in_ids (#3549) Especially this log message is intended to help user understanding when Parsl is not scaling in as they expected - before this PR, any blocks marked as not-scaled-in were not reported to the user (perhaps on the assumption that it might work next time round?) 
From 7867b576365ff46ec2613ebe47bc4ad2778493b7 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 09:45:26 +0200
Subject: [PATCH 3/6] Use caplog fixture rather than mock logging when testing
 HTEX shutdown (#3559)

This replaces mock-captured call sequences, which over-tested the exact
position of the checked log calls relative to any other log calls and
made this test fragile when working on logging in the affected area of
the code.

This is consistent with other parts of the test suite which test log
messages using caplog.
---
 parsl/tests/test_htex/test_htex.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py
index fca68c3c2f..80a4e91bd5 100644
--- a/parsl/tests/test_htex/test_htex.py
+++ b/parsl/tests/test_htex/test_htex.py
@@ -71,12 +71,11 @@ def test_htex_start_encrypted(
 @pytest.mark.local
 @pytest.mark.parametrize("started", (True, False))
 @pytest.mark.parametrize("timeout_expires", (True, False))
-@mock.patch(f"{_MOCK_BASE}.logger")
 def test_htex_shutdown(
-    mock_logger: mock.MagicMock,
     started: bool,
     timeout_expires: bool,
     htex: HighThroughputExecutor,
+    caplog
 ):
     mock_ix_proc = mock.Mock(spec=Popen)
@@ -110,20 +109,19 @@ def kill_interchange(*args, **kwargs):
 
     htex.shutdown()
 
-    mock_logs = mock_logger.info.call_args_list
     if started:
         assert mock_ix_proc.terminate.called
         assert mock_ix_proc.wait.called
         assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
         if timeout_expires:
-            assert "Unable to terminate Interchange" in mock_logs[1][0][0]
+            assert "Unable to terminate Interchange" in caplog.text
             assert mock_ix_proc.kill.called
-        assert "Attempting" in mock_logs[0][0][0]
-        assert "Finished" in mock_logs[-1][0][0]
+        assert "Attempting HighThroughputExecutor shutdown" in caplog.text
+        assert "Finished HighThroughputExecutor shutdown" in caplog.text
     else:
         assert not mock_ix_proc.terminate.called
         assert not mock_ix_proc.wait.called
-        assert "has not started" in mock_logs[0][0][0]
+        assert "HighThroughputExecutor has not started" in caplog.text
 
 
 @pytest.mark.local
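
pytest's built-in caplog fixture captures log records wherever they
occur, so substring assertions against caplog.text are insensitive to
how many other messages are logged in between - unlike indexing into a
mock's call_args_list. A minimal self-contained example of the style
this patch adopts (not taken from the Parsl test suite):

    import logging

    logger = logging.getLogger(__name__)

    def shutdown():
        logger.info("Attempting shutdown")
        logger.info("Finished shutdown")

    def test_shutdown_logs(caplog):
        with caplog.at_level(logging.INFO):
            shutdown()
        # Order-independent assertions: extra log lines elsewhere in
        # the code under test cannot break these checks.
        assert "Attempting shutdown" in caplog.text
        assert "Finished shutdown" in caplog.text
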
From 8d606c93af6a7de4df8c60b99752aa4176e57096 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 10:49:31 +0200
Subject: [PATCH 4/6] Rename submit-side monitoring radio for clarification
 (#3557)

A Parsl executor is configured with a submit-side monitoring radio
sender, which is used by the BlockProviderExecutor to send block status
messages to the monitoring subsystem.

Parsl executors also have a notion of a remote monitoring radio, used
by remote workers to send monitoring messages. This can be confusing
when both of these radio senders are referred to in the same piece of
code, as happened during ongoing monitoring plugin development in
draft PR #3315.

This PR is intended to make the situation much less ambiguous by never
mentioning a monitoring radio in executor code without qualifying
whether it is a submit-side or remote-worker-side radio definition.

A future PR from the #3315 stack will introduce other monitoring radio
references with the remote prefix, replacing the current radio_mode and
related attributes.
---
 parsl/dataflow/dflow.py            |  2 +-
 parsl/executors/base.py            | 14 +++++++-------
 parsl/executors/status_handling.py |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index a62a2261d0..88ef063230 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
             if self.monitoring:
                 executor.hub_address = self.monitoring.hub_address
                 executor.hub_zmq_port = self.monitoring.hub_zmq_port
-                executor.monitoring_radio = self.monitoring.radio
+                executor.submit_monitoring_radio = self.monitoring.radio
             if hasattr(executor, 'provider'):
                 if hasattr(executor.provider, 'script_dir'):
                     executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index 941f392e9f..a112b9eb00 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -52,13 +52,13 @@ def __init__(
         *,
         hub_address: Optional[str] = None,
         hub_zmq_port: Optional[int] = None,
-        monitoring_radio: Optional[MonitoringRadioSender] = None,
+        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
         run_dir: str = ".",
         run_id: Optional[str] = None,
     ):
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
-        self.monitoring_radio = monitoring_radio
+        self.submit_monitoring_radio = submit_monitoring_radio
         self.run_dir = os.path.abspath(run_dir)
         self.run_id = run_id
@@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
         self._hub_zmq_port = value
 
     @property
-    def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
+    def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
         """Local radio for sending monitoring messages
         """
-        return self._monitoring_radio
+        return self._submit_monitoring_radio
 
-    @monitoring_radio.setter
-    def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
-        self._monitoring_radio = value
+    @submit_monitoring_radio.setter
+    def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
+        self._submit_monitoring_radio = value
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 1e4ea3c0b4..0f7ed90592 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -269,10 +269,10 @@ def workers_per_node(self) -> Union[int, float]:
 
     def send_monitoring_info(self, status: Dict) -> None:
         # Send monitoring info for HTEX when monitoring enabled
-        if self.monitoring_radio:
+        if self.submit_monitoring_radio:
             msg = self.create_monitoring_info(status)
             logger.debug("Sending block monitoring message: %r", msg)
-            self.monitoring_radio.send((MessageType.BLOCK_INFO, msg))
+            self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg))
 
     def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]:
         """Create a monitoring message for each block based on the poll status.
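
The rename touches a property-backed attribute, so the getter, the
setter, and the private backing field all change together. In
miniature, the pattern used in parsl/executors/base.py looks like this
(simplified sketch, with the radio type reduced to object):

    from typing import Optional

    class Executor:
        def __init__(self, submit_monitoring_radio: Optional[object] = None):
            # The setter below runs here, storing into the private attribute.
            self.submit_monitoring_radio = submit_monitoring_radio

        @property
        def submit_monitoring_radio(self) -> Optional[object]:
            """Submit-side radio for sending monitoring messages."""
            return self._submit_monitoring_radio

        @submit_monitoring_radio.setter
        def submit_monitoring_radio(self, value: Optional[object]) -> None:
            self._submit_monitoring_radio = value
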
From 5ee584d26b1dcb0d22ab07de37df3cdd9be1248d Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 11:17:40 +0200
Subject: [PATCH 5/6] Only scale in blocks that are in _status and
 non-terminal (#3548)

In the BlockProviderExecutor, the block ID to job ID mapping structures
contain the full historical list of blocks. Prior to this PR, that
mapping was used as the source of current jobs that should/could be
scaled in. This was incorrect, and resulted in the scaling-in code
attempting to:

* scale in blocks that had already finished, because it continued to
  see those blocks as eligible for scale-in

* not scale in blocks that were active - because rather than choosing
  to scale in an alive block, the code would choose to attempt to
  scale in a non-alive block

After this PR, the _status structure, which should contain reasonably
up-to-date status information, is used instead of the block/job ID
mapping structures. (As a more general principle, those block/job ID
mapping structures should never be examined as a whole but only used
for mapping.)

Changed Behaviour:

Scaling in should work better in executors using the default scaling-in
code that was refactored in PR #3526, which right now is Work Queue and
Task Vine.

Fixes #3471
---
 parsl/executors/status_handling.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 0f7ed90592..200b43cc41 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
-from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
 from parsl.monitoring.message_type import MessageType
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
@@ -222,16 +222,20 @@ def scale_in(self, blocks: int) -> List[str]:
 
         :return: A list of block ids corresponding to the blocks that were removed.
         """
-        # Obtain list of blocks to kill
-        to_kill = list(self.blocks_to_job_id.keys())[:blocks]
-        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
+
+        active_blocks = [block_id for block_id, status in self._status.items()
+                         if status.state not in TERMINAL_STATES]
+
+        block_ids_to_kill = active_blocks[:blocks]
+
+        job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]
 
         # Cancel the blocks provisioned
         if self.provider:
-            logger.info(f"Scaling in jobs: {kill_ids}")
-            r = self.provider.cancel(kill_ids)
-            job_ids = self._filter_scale_in_ids(kill_ids, r)
-            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
+            logger.info(f"Scaling in jobs: {job_ids_to_kill}")
+            r = self.provider.cancel(job_ids_to_kill)
+            job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
+            block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
             return block_ids_killed
         else:
             logger.error("No execution provider available to scale in")
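
The heart of the fix is a membership test against a set of terminal
states. A toy version, with an invented JobState enum and made-up
block IDs standing in for Parsl's real jobs.states module:

    from enum import Enum, auto

    class JobState(Enum):
        PENDING = auto()
        RUNNING = auto()
        COMPLETED = auto()
        FAILED = auto()

    TERMINAL_STATES = {JobState.COMPLETED, JobState.FAILED}

    # _status-style mapping: the current state of each block (made-up data)
    status = {
        "block-0": JobState.COMPLETED,  # finished: must not be scaled in again
        "block-1": JobState.RUNNING,
        "block-2": JobState.PENDING,
    }

    # Only live blocks are candidates for scale-in, mirroring the new code.
    active_blocks = [b for b, s in status.items() if s not in TERMINAL_STATES]
    print(active_blocks[:1])  # scale in one block -> ['block-1']
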
""" - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:blocks] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + active_blocks = [block_id for block_id, status in self._status.items() + if status.state not in TERMINAL_STATES] + + block_ids_to_kill = active_blocks[:blocks] + + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] # Cancel the blocks provisioned if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + logger.info(f"Scaling in jobs: {job_ids_to_kill}") + r = self.provider.cancel(job_ids_to_kill) + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] return block_ids_killed else: logger.error("No execution provider available to scale in") From a24bc932117c1ba50701992b41efdda79f3e3eed Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 11:48:57 +0200 Subject: [PATCH 6/6] Test UDP monitoring radio (#3555) Before this PR, this radio was omitted because all of executors in the basic monitoring test had over time been switched to different defaults. --- parsl/tests/test_monitoring/test_basic.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index c900670ec8..1c792a9d82 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -25,10 +25,23 @@ def this_app(): # a configuration that is suitably configured for monitoring. def htex_config(): + """This config will use htex's default htex-specific monitoring radio mode""" from parsl.tests.configs.htex_local_alternate import fresh_config return fresh_config() +def htex_udp_config(): + """This config will force UDP""" + from parsl.tests.configs.htex_local_alternate import fresh_config + c = fresh_config() + assert len(c.executors) == 1 + + assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" + c.executors[0].radio_mode = "udp" + + return c + + def workqueue_config(): from parsl.tests.configs.workqueue_ex import fresh_config c = fresh_config() @@ -48,7 +61,7 @@ def taskvine_config(): @pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, workqueue_config, taskvine_config]) +@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) def test_row_counts(tmpd_cwd, fresh_config): # this is imported here rather than at module level because # it isn't available in a plain parsl install, so this module