From 7867b576365ff46ec2613ebe47bc4ad2778493b7 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 09:45:26 +0200
Subject: [PATCH 01/11] Use caplog fixture rather than mock logging when
 testing HTEX shutdown (#3559)

This replaces mock-captured call sequences, which over-tested the exact
position of the tested log calls in relation to any other log calls and
so made this test fragile when working on logs in the affected area of
the code.

This is consistent with other parts of the test suite, which test log
messages using caplog.
---
 parsl/tests/test_htex/test_htex.py | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py
index fca68c3c2f..80a4e91bd5 100644
--- a/parsl/tests/test_htex/test_htex.py
+++ b/parsl/tests/test_htex/test_htex.py
@@ -71,12 +71,11 @@ def test_htex_start_encrypted(
 @pytest.mark.local
 @pytest.mark.parametrize("started", (True, False))
 @pytest.mark.parametrize("timeout_expires", (True, False))
-@mock.patch(f"{_MOCK_BASE}.logger")
 def test_htex_shutdown(
-    mock_logger: mock.MagicMock,
     started: bool,
     timeout_expires: bool,
     htex: HighThroughputExecutor,
+    caplog
 ):
     mock_ix_proc = mock.Mock(spec=Popen)
 
@@ -110,20 +109,19 @@ def kill_interchange(*args, **kwargs):
 
     htex.shutdown()
 
-    mock_logs = mock_logger.info.call_args_list
     if started:
         assert mock_ix_proc.terminate.called
         assert mock_ix_proc.wait.called
         assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
         if timeout_expires:
-            assert "Unable to terminate Interchange" in mock_logs[1][0][0]
+            assert "Unable to terminate Interchange" in caplog.text
            assert mock_ix_proc.kill.called
-        assert "Attempting" in mock_logs[0][0][0]
-        assert "Finished" in mock_logs[-1][0][0]
+        assert "Attempting HighThroughputExecutor shutdown" in caplog.text
+        assert "Finished HighThroughputExecutor shutdown" in caplog.text
     else:
         assert not mock_ix_proc.terminate.called
         assert not mock_ix_proc.wait.called
-        assert "has not started" in mock_logs[0][0][0]
+        assert "HighThroughputExecutor has not started" in caplog.text
 
 
 @pytest.mark.local

From 8d606c93af6a7de4df8c60b99752aa4176e57096 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 10:49:31 +0200
Subject: [PATCH 02/11] Rename submit-side monitoring radio for clarification
 (#3557)

A Parsl executor is configured with a submit-side monitoring radio
sender, which is used by the BlockProviderExecutor to send block status
messages to the monitoring subsystem.

Parsl executors also have a notion of a remote monitoring radio, used
by remote workers to send monitoring messages.

This can be confusing when both of these radio senders are referred to
in the same piece of code, as happened in ongoing monitoring plugin
development in draft PR #3315.

This PR is intended to make this situation much less ambiguous by
avoiding the mention of a monitoring radio in executor code without
qualifying whether it is a submit-side or remote-worker-side radio
definition.

A future PR from the #3315 stack will introduce other monitoring radio
references with the remote prefix, replacing the current radio_mode and
related attributes.
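To illustrate the naming distinction this patch establishes, here is a
minimal hedged sketch - ToyExecutor and PrintSender are invented for
illustration, and only the attribute names submit_monitoring_radio and
radio_mode mirror the real code:

```python
from typing import Optional


class PrintSender:
    """Stand-in for a MonitoringRadioSender implementation."""

    def send(self, message: object) -> None:
        print(f"monitoring message: {message!r}")


class ToyExecutor:
    def __init__(self, submit_monitoring_radio: Optional[PrintSender] = None):
        # Submit-side radio: used in the submitting process, for example by
        # the BlockProviderExecutor to report block status.
        self.submit_monitoring_radio = submit_monitoring_radio
        # Remote radio configuration: tells *workers* how to send their own
        # monitoring messages; a later PR renames this with a remote prefix.
        self.radio_mode = "htex"

    def send_block_status(self, status: dict) -> None:
        if self.submit_monitoring_radio:
            self.submit_monitoring_radio.send(("BLOCK_INFO", status))


ToyExecutor(PrintSender()).send_block_status({"0": "PENDING"})
```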
---
 parsl/dataflow/dflow.py            |  2 +-
 parsl/executors/base.py            | 14 +++++++-------
 parsl/executors/status_handling.py |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index a62a2261d0..88ef063230 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
             if self.monitoring:
                 executor.hub_address = self.monitoring.hub_address
                 executor.hub_zmq_port = self.monitoring.hub_zmq_port
-                executor.monitoring_radio = self.monitoring.radio
+                executor.submit_monitoring_radio = self.monitoring.radio
             if hasattr(executor, 'provider'):
                 if hasattr(executor.provider, 'script_dir'):
                     executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts')
diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index 941f392e9f..a112b9eb00 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -52,13 +52,13 @@ def __init__(
         *,
         hub_address: Optional[str] = None,
         hub_zmq_port: Optional[int] = None,
-        monitoring_radio: Optional[MonitoringRadioSender] = None,
+        submit_monitoring_radio: Optional[MonitoringRadioSender] = None,
         run_dir: str = ".",
         run_id: Optional[str] = None,
     ):
         self.hub_address = hub_address
         self.hub_zmq_port = hub_zmq_port
-        self.monitoring_radio = monitoring_radio
+        self.submit_monitoring_radio = submit_monitoring_radio
         self.run_dir = os.path.abspath(run_dir)
         self.run_id = run_id
 
@@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
         self._hub_zmq_port = value
 
     @property
-    def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
+    def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]:
         """Local radio for sending monitoring messages
         """
-        return self._monitoring_radio
+        return self._submit_monitoring_radio
 
-    @monitoring_radio.setter
-    def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
-        self._monitoring_radio = value
+    @submit_monitoring_radio.setter
+    def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
+        self._submit_monitoring_radio = value
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 1e4ea3c0b4..0f7ed90592 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -269,10 +269,10 @@ def workers_per_node(self) -> Union[int, float]:
 
     def send_monitoring_info(self, status: Dict) -> None:
         # Send monitoring info for HTEX when monitoring enabled
-        if self.monitoring_radio:
+        if self.submit_monitoring_radio:
             msg = self.create_monitoring_info(status)
             logger.debug("Sending block monitoring message: %r", msg)
-            self.monitoring_radio.send((MessageType.BLOCK_INFO, msg))
+            self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg))
 
     def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]:
         """Create a monitoring message for each block based on the poll status.

From 5ee584d26b1dcb0d22ab07de37df3cdd9be1248d Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 11:17:40 +0200
Subject: [PATCH 03/11] Only scale in blocks that are in _status and
 non-terminal (#3548)

In the BlockProviderExecutor, the block ID to job ID mapping structures
contain the full historical list of blocks. Prior to this PR, the
mapping was used as the source of current jobs that should/could be
scaled in. This was incorrect,
and resulted in scaling-in code attempting to:

* scale in blocks that had already finished, because it continued to
  see those blocks as eligible for scale-in

* not scale in blocks that were active - because rather than choosing
  to scale in an alive block, the code would choose to attempt to
  scale in a non-alive block

After this PR, the _status structure, which should contain reasonably
up-to-date status information, is used instead of the block/job ID
mapping structures. (As a more general principle, those block/job ID
mapping structures should never be examined as a whole, but only used
for mapping.)

Changed Behaviour: Scaling in should work better in executors using the
default scale-in code that was refactored in PR #3526 - right now, Work
Queue and Task Vine.

Fixes #3471
---
 parsl/executors/status_handling.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 0f7ed90592..200b43cc41 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -12,7 +12,7 @@
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler
-from parsl.jobs.states import JobState, JobStatus
+from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
 from parsl.monitoring.message_type import MessageType
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
@@ -222,16 +222,20 @@ def scale_in(self, blocks: int) -> List[str]:
 
         :return: A list of block ids corresponding to the blocks that were removed.
         """
-        # Obtain list of blocks to kill
-        to_kill = list(self.blocks_to_job_id.keys())[:blocks]
-        kill_ids = [self.blocks_to_job_id[block] for block in to_kill]
+
+        active_blocks = [block_id for block_id, status in self._status.items()
+                         if status.state not in TERMINAL_STATES]
+
+        block_ids_to_kill = active_blocks[:blocks]
+
+        job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill]
 
         # Cancel the blocks provisioned
         if self.provider:
-            logger.info(f"Scaling in jobs: {kill_ids}")
-            r = self.provider.cancel(kill_ids)
-            job_ids = self._filter_scale_in_ids(kill_ids, r)
-            block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]
+            logger.info(f"Scaling in jobs: {job_ids_to_kill}")
+            r = self.provider.cancel(job_ids_to_kill)
+            job_ids = self._filter_scale_in_ids(job_ids_to_kill, r)
+            block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids]
             return block_ids_killed
         else:
             logger.error("No execution provider available to scale in")

From a24bc932117c1ba50701992b41efdda79f3e3eed Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 11:48:57 +0200
Subject: [PATCH 04/11] Test UDP monitoring radio (#3555)

Before this PR, this radio was omitted because all of the executors in
the basic monitoring test had over time been switched to different
defaults.
---
 parsl/tests/test_monitoring/test_basic.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py
index c900670ec8..1c792a9d82 100644
--- a/parsl/tests/test_monitoring/test_basic.py
+++ b/parsl/tests/test_monitoring/test_basic.py
@@ -25,10 +25,23 @@ def this_app():
 # a configuration that is suitably configured for monitoring.
 def htex_config():
+    """This config will use htex's default htex-specific monitoring radio mode"""
     from parsl.tests.configs.htex_local_alternate import fresh_config
     return fresh_config()
 
 
+def htex_udp_config():
+    """This config will force UDP"""
+    from parsl.tests.configs.htex_local_alternate import fresh_config
+    c = fresh_config()
+    assert len(c.executors) == 1
+
+    assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio"
+    c.executors[0].radio_mode = "udp"
+
+    return c
+
+
 def workqueue_config():
     from parsl.tests.configs.workqueue_ex import fresh_config
     c = fresh_config()
@@ -48,7 +61,7 @@ def taskvine_config():
 
 
 @pytest.mark.local
-@pytest.mark.parametrize("fresh_config", [htex_config, workqueue_config, taskvine_config])
+@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config])
 def test_row_counts(tmpd_cwd, fresh_config):
     # this is imported here rather than at module level because
     # it isn't available in a plain parsl install, so this module

From 11d88db8aa63d7228681b3a08f384a8a0088f63c Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 13:32:15 +0200
Subject: [PATCH 05/11] Inline _scale_out in BlockProviderExecutor (#3554)

This brings two related pieces of code together into a single method,
removing the possibility of the _scale_out code being called in any way
other than via scale_out_facade.

Future PRs will rearrange the now-unified code and apply a bugfix that
this unification makes easier.

This PR should not change behaviour, as it is only a code movement.
---
 parsl/executors/status_handling.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 200b43cc41..5425094bca 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -183,22 +183,13 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -
         return list(compress(to_kill, killed))
 
     def scale_out_facade(self, n: int) -> List[str]:
-        block_ids = self._scale_out(n)
-        new_status = {}
-        for block_id in block_ids:
-            new_status[block_id] = JobStatus(JobState.PENDING)
-        self.send_monitoring_info(new_status)
-        self._status.update(new_status)
-        return block_ids
-
-    def _scale_out(self, blocks: int = 1) -> List[str]:
         """Scales out the number of blocks by "blocks"
         """
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
-        logger.info(f"Scaling out by {blocks} blocks")
-        for _ in range(blocks):
+        logger.info(f"Scaling out by {n} blocks")
+        for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
@@ -208,6 +199,12 @@ def _scale_out(self, blocks: int = 1) -> List[str]:
                 block_ids.append(block_id)
             except Exception as ex:
                 self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+
+        new_status = {}
+        for block_id in block_ids:
+            new_status[block_id] = JobStatus(JobState.PENDING)
+        self.send_monitoring_info(new_status)
+        self._status.update(new_status)
         return block_ids
 
     def scale_in(self, blocks: int) -> List[str]:

From 1fca73c92403255e1d6b58808e021836fcb2b2e0 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 15:43:56 +0200
Subject: [PATCH 06/11] Bring block table updates together in scale out
 (#3562)

There are multiple structures that contain information about blocks and the
status of those blocks. This PR is part of ongoing work to make the
information contained in those structures more consistent.

This PR brings updates to executor._status (before PR #3352, living in
parsl.jobs.job_status_poller) together with updates to the block/job ID
mapping structures (which have existed in the executor layer under
different names since commit a1963bf36fa6bac5bb5b28757a2c5d4a1fbe0462
introduced self.engines in 2018).

This PR should not change behaviour: it moves code around in a way that
should not affect how the various structures are left populated at the
end of the method.

This PR makes the cause of issue #3235 clearer, without attempting to
fix it: in the successful code path touched by this PR,
executor._status is updated immediately, but in the exception path, the
status update only goes into _simulated_status and does not appear in
executor._status until much later (when self.status() merges
provider-provided status and simulated status, driven by the job status
poller).

A subsequent PR will address issue #3235.
---
 parsl/executors/status_handling.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 5425094bca..5ca70c5877 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -188,23 +188,27 @@ def scale_out_facade(self, n: int) -> List[str]:
         if not self.provider:
             raise ScalingFailed(self, "No execution provider available")
         block_ids = []
+        monitoring_status_changes = {}
         logger.info(f"Scaling out by {n} blocks")
         for _ in range(n):
             block_id = str(self._block_id_counter.get_id())
             logger.info(f"Allocated block ID {block_id}")
             try:
                 job_id = self._launch_block(block_id)
+
+                pending_status = JobStatus(JobState.PENDING)
+
                 self.blocks_to_job_id[block_id] = job_id
                 self.job_ids_to_block[job_id] = block_id
+                self._status[block_id] = pending_status
+
+                monitoring_status_changes[block_id] = pending_status
                 block_ids.append(block_id)
+
             except Exception as ex:
                 self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
 
-        new_status = {}
-        for block_id in block_ids:
-            new_status[block_id] = JobStatus(JobState.PENDING)
-        self.send_monitoring_info(new_status)
-        self._status.update(new_status)
+        self.send_monitoring_info(monitoring_status_changes)
         return block_ids
 
     def scale_in(self, blocks: int) -> List[str]:

From 21362b5702316370aa1f68479e431b30ba1e8de5 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 1 Aug 2024 16:40:38 +0200
Subject: [PATCH 07/11] Update _status cache on failed blocks (#3563)

_status stores the status of blocks. This structure is periodically
updated by executor.status(), but any change to block status that
happens before such an update needs to be applied explicitly, to make
it appear in _status immediately.

Before this PR, this was done for launched blocks, 6 lines before this
change, but not for failed blocks. This is the cause of issue #3235 -
scaling code does not become aware of failed blocks until
executor.status() updates _status on a slow poll.

This PR makes _status be updated with that failed block information
immediately (in addition to the existing code which places it in
_simulated_status, for when executor.status() rebuilds the entire
_status structure).

This PR has a test case which fails before this PR's change to
status_handling.py but passes afterwards.
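As a hedged illustration of why eager updates to _status matter, here
is a simplified version of the kind of check the scaling code performs
on each strategy poll - the function name is invented, and only the
_status attribute and JobState mirror the real code:

```python
from parsl.jobs.states import JobState


def blocks_have_failed(executor) -> bool:
    # The scaling code reacts only to what executor._status contains right
    # now; a failure recorded solely in _simulated_status stays invisible
    # until a later executor.status() poll rebuilds _status.
    return any(s.state == JobState.FAILED for s in executor._status.values())
```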
Changed Behaviour: Earlier recognition of failed blocks by the scaling
code, which should lead to earlier overall failure when the scaling
code decides to abort.

Fixes #3235
---
 parsl/executors/status_handling.py            |  4 +-
 ...st_disconnected_blocks_failing_provider.py | 71 +++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
 create mode 100644 parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py

diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 5ca70c5877..34db2300f6 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -206,7 +206,9 @@ def scale_out_facade(self, n: int) -> List[str]:
                 block_ids.append(block_id)
 
             except Exception as ex:
-                self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex))
+                self._simulated_status[block_id] = failed_status
+                self._status[block_id] = failed_status
 
         self.send_monitoring_info(monitoring_status_changes)
         return block_ids
diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
new file mode 100644
index 0000000000..b2fa507aca
--- /dev/null
+++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py
@@ -0,0 +1,71 @@
+import logging
+
+import pytest
+
+import parsl
+from parsl import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.executors.errors import BadStateException
+from parsl.jobs.states import JobState, JobStatus
+from parsl.providers import LocalProvider
+
+
+class FailingProvider(LocalProvider):
+    def submit(*args, **kwargs):
+        raise RuntimeError("Deliberate failure of provider.submit")
+
+
+def local_config():
+    """Config to simulate failing blocks without connecting"""
+    return Config(
+        executors=[
+            HighThroughputExecutor(
+                label="HTEX",
+                heartbeat_period=1,
+                heartbeat_threshold=2,
+                poll_period=100,
+                max_workers_per_node=1,
+                provider=FailingProvider(
+                    init_blocks=0,
+                    max_blocks=2,
+                    min_blocks=0,
+                ),
+            )
+        ],
+        max_idletime=0.5,
+        strategy='htex_auto_scale',
+        strategy_period=0.1
+        # this strategy period needs to be a few times smaller than the
+        # status_polling_interval of FailingProvider, which is 5s at
+        # time of writing
+    )
+
+
+@parsl.python_app
+def double(x):
+    return x * 2
+
+
+@pytest.mark.local
+def test_disconnected_blocks():
+    """Test reporting of blocks that fail to connect from HTEX"""
+    dfk = parsl.dfk()
+    executor = dfk.executors["HTEX"]
+
+    connected_blocks = executor.connected_blocks()
+    assert not connected_blocks, "Expected 0 blocks"
+
+    future = double(5)
+    with pytest.raises(BadStateException):
+        future.result()
+
+    assert isinstance(future.exception(), BadStateException)
+
+    status_dict = executor.status()
+    assert len(status_dict) == 1, "Expected exactly 1 block"
+    for status in status_dict.values():
+        assert isinstance(status, JobStatus)
+        assert status.state == JobState.MISSING
+
+    connected_blocks = executor.connected_blocks()
+    assert connected_blocks == [], "Expected exactly 0 connected blocks"

From 2b01411bb24f2d05547f36033b83fe7790d431be Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Fri, 2 Aug 2024 12:45:48 +0200
Subject: [PATCH 08/11] Make monitoring hub start failure into its own
 exception (#3561)

This is in line with the principle that Parsl exceptions should all be
subclasses of ParslError.
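The same principle, sketched generically (hedged - these toy classes
are invented for illustration and only mimic the shape of
parsl.errors.ParslError):

```python
class ToyParslError(Exception):
    """Stand-in for parsl.errors.ParslError, the root of the hierarchy."""


class ToyHubStartError(ToyParslError):
    def __str__(self) -> str:
        return "Hub failed to start"


# With a common root class, callers can catch every parsl-style failure
# in a single except clause:
try:
    raise ToyHubStartError()
except ToyParslError as e:
    print(f"caught a parsl-style error: {e}")
```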
---
 parsl/monitoring/errors.py     | 6 ++++++
 parsl/monitoring/monitoring.py | 3 ++-
 2 files changed, 8 insertions(+), 1 deletion(-)
 create mode 100644 parsl/monitoring/errors.py

diff --git a/parsl/monitoring/errors.py b/parsl/monitoring/errors.py
new file mode 100644
index 0000000000..f41225ff44
--- /dev/null
+++ b/parsl/monitoring/errors.py
@@ -0,0 +1,6 @@
+from parsl.errors import ParslError
+
+
+class MonitoringHubStartError(ParslError):
+    def __str__(self) -> str:
+        return "Hub failed to start"
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index f86bf81e87..c9a2dc9ed7 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -12,6 +12,7 @@
 import typeguard
 
 from parsl.log_utils import set_file_logger
+from parsl.monitoring.errors import MonitoringHubStartError
 from parsl.monitoring.message_type import MessageType
 from parsl.monitoring.radios import MultiprocessingQueueRadioSender
 from parsl.monitoring.router import router_starter
@@ -195,7 +196,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
             comm_q.join_thread()
         except queue.Empty:
             logger.error("Hub has not completed initialization in 120s. Aborting")
-            raise Exception("Hub failed to start")
+            raise MonitoringHubStartError()
 
         if isinstance(comm_q_result, str):
             logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")

From 4f139c28118d625c893f67417b0bc391f535ac8e Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Fri, 2 Aug 2024 18:55:24 +0200
Subject: [PATCH 09/11] Factor interchange monitoring code into a
 ZMQRadioSender (#3556)

From an interchange perspective: this is a refactoring intended to
clarify that the interchange isn't doing anything special wrt.
monitoring messages and that it can send monitoring messages in the
same way that remote workers can.

From a monitoring perspective: this pulls ZMQ sender code out of the
interchange and puts it in a place that is more natural for ongoing
development. For example, a potential future use with Work Queue and
Task Vine is that workers would also benefit from using ZMQ to send
monitoring messages.

In some potential use cases, it might be desirable to configure the
radio used by the interchange instead of the hard-coded ZMQRadio.
Ongoing work in draft PR #3315 addresses configuration of different
types of radio, and that work should be relevant here too.
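A hedged usage sketch of the sender this patch introduces (its
definition appears in the radios.py hunk below); the address, port and
payload here are placeholders - real interchange code sends
MessageType-tagged tuples:

```python
from parsl.monitoring.radios import ZMQRadioSender

# hub_address and hub_zmq_port are placeholders for a running MonitoringHub
radio = ZMQRadioSender("127.0.0.1", 55055)
radio.send(("NODE_INFO", {"manager": b"m1", "active": True}))
```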
---
 .../executors/high_throughput/interchange.py | 65 +++++++++----------
 parsl/monitoring/radios.py                   | 16 +++++
 2 files changed, 47 insertions(+), 34 deletions(-)

diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 9ebe6b95b9..5da83ae3ca 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -20,6 +20,7 @@
 from parsl.executors.high_throughput.manager_record import ManagerRecord
 from parsl.executors.high_throughput.manager_selector import ManagerSelector
 from parsl.monitoring.message_type import MessageType
+from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender
 from parsl.process_loggers import wrap_with_logs
 from parsl.serialize import serialize as serialize_object
 from parsl.utils import setproctitle
@@ -219,27 +220,15 @@ def task_puller(self) -> NoReturn:
             task_counter += 1
             logger.debug(f"Fetched {task_counter} tasks so far")
 
-    def _create_monitoring_channel(self) -> Optional[zmq.Socket]:
-        if self.hub_address and self.hub_zmq_port:
-            logger.info("Connecting to MonitoringHub")
-            # This is a one-off because monitoring is unencrypted
-            hub_channel = zmq.Context().socket(zmq.DEALER)
-            hub_channel.set_hwm(0)
-            hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port))
-            logger.info("Connected to MonitoringHub")
-            return hub_channel
-        else:
-            return None
-
-    def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None:
-        if hub_channel:
+    def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
+        if monitoring_radio:
             logger.info("Sending message {} to MonitoringHub".format(manager))
 
             d: Dict = cast(Dict, manager.copy())
             d['timestamp'] = datetime.datetime.now()
             d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
 
-            hub_channel.send_pyobj((MessageType.NODE_INFO, d))
+            monitoring_radio.send((MessageType.NODE_INFO, d))
 
     @wrap_with_logs(target="interchange")
     def _command_server(self) -> NoReturn:
@@ -247,8 +236,11 @@ def _command_server(self) -> NoReturn:
         """
         logger.debug("Command Server Starting")
 
-        # Need to create a new ZMQ socket for command server thread
-        hub_channel = self._create_monitoring_channel()
+        if self.hub_address is not None and self.hub_zmq_port is not None:
+            logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
+            monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
+        else:
+            monitoring_radio = None
 
         reply: Any  # the type of reply depends on the command_req received (aka this needs dependent types...)
 
@@ -298,7 +290,7 @@ def _command_server(self) -> NoReturn:
                     if manager_id in self._ready_managers:
                         m = self._ready_managers[manager_id]
                         m['active'] = False
-                        self._send_monitoring_info(hub_channel, m)
+                        self._send_monitoring_info(monitoring_radio, m)
                     else:
                         logger.warning("Worker to hold was not in ready managers list")
 
@@ -333,9 +325,14 @@ def start(self) -> None:
         # parent-process-inheritance problems.
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
-        logger.info("Incoming ports bound")
+        logger.info("Starting main interchange method")
 
-        hub_channel = self._create_monitoring_channel()
+        if self.hub_address is not None and self.hub_zmq_port is not None:
+            logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
+            monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
+            logger.debug("Created monitoring radio")
+        else:
+            monitoring_radio = None
 
         poll_period = self.poll_period
@@ -366,10 +363,10 @@ def start(self) -> None:
         while not kill_event.is_set():
             self.socks = dict(poller.poll(timeout=poll_period))
 
-            self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event)
-            self.process_results_incoming(interesting_managers, hub_channel)
-            self.expire_bad_managers(interesting_managers, hub_channel)
-            self.expire_drained_managers(interesting_managers, hub_channel)
+            self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
+            self.process_results_incoming(interesting_managers, monitoring_radio)
+            self.expire_bad_managers(interesting_managers, monitoring_radio)
+            self.expire_drained_managers(interesting_managers, monitoring_radio)
             self.process_tasks_to_send(interesting_managers)
 
         self.zmq_context.destroy()
@@ -380,7 +377,7 @@ def start(self) -> None:
     def process_task_outgoing_incoming(
             self,
             interesting_managers: Set[bytes],
-            hub_channel: Optional[zmq.Socket],
+            monitoring_radio: Optional[MonitoringRadioSender],
             kill_event: threading.Event
     ) -> None:
         """Process one message from manager on the task_outgoing channel.
@@ -434,7 +431,7 @@ def process_task_outgoing_incoming(
                 m.update(msg)  # type: ignore[typeddict-item]
 
                 logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
-                self._send_monitoring_info(hub_channel, m)
+                self._send_monitoring_info(monitoring_radio, m)
 
                 if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
                     msg['parsl_v'] != self.current_platform['parsl_v']):
@@ -465,7 +462,7 @@ def process_task_outgoing_incoming(
                 logger.error(f"Unexpected message type received from manager: {msg['type']}")
         logger.debug("leaving task_outgoing section")
 
-    def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
+    def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
 
         for manager_id in list(interesting_managers):
             # is it always true that a draining manager will be in interesting managers?
@@ -478,7 +475,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel:
                 self._ready_managers.pop(manager_id)
 
                 m['active'] = False
-                self._send_monitoring_info(hub_channel, m)
+                self._send_monitoring_info(monitoring_radio, m)
 
     def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
         # Check if there are tasks that could be sent to managers
@@ -521,7 +518,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
         else:
             logger.debug("either no interesting managers or no tasks, so skipping manager pass")
 
-    def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
+    def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
         # Receive any results and forward to client
         if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN:
             logger.debug("entering results_incoming section")
@@ -541,11 +538,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
                     elif r['type'] == 'monitoring':
                         # the monitoring code makes the assumption that no
                         # monitoring messages will be received if monitoring
-                        # is not configured, and that hub_channel will only
+                        # is not configured, and that monitoring_radio will only
                         # be None when monitoring is not configurated.
-                        assert hub_channel is not None
+                        assert monitoring_radio is not None
 
-                        hub_channel.send_pyobj(r['payload'])
+                        monitoring_radio.send(r['payload'])
                     elif r['type'] == 'heartbeat':
                         logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
                         b_messages.append((p_message, r))
@@ -589,7 +586,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel
                     interesting_managers.add(manager_id)
         logger.debug("leaving results_incoming section")
 
-    def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None:
+    def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
         bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
                         time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
         for (manager_id, m) in bad_managers:
             logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
             if m['active']:
                 m['active'] = False
-                self._send_monitoring_info(hub_channel, m)
+                self._send_monitoring_info(monitoring_radio, m)
 
             logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
             for tid in m['tasks']:
diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py
index 6c77fd37b1..37bef0b06a 100644
--- a/parsl/monitoring/radios.py
+++ b/parsl/monitoring/radios.py
@@ -7,6 +7,8 @@
 from multiprocessing.queues import Queue
 from typing import Optional
 
+import zmq
+
 from parsl.serialize import serialize
 
 _db_manager_excepts: Optional[Exception]
@@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None:
 
     def send(self, message: object) -> None:
         self.queue.put((message, 0))
+
+
+class ZMQRadioSender(MonitoringRadioSender):
+    """A monitoring radio which connects over ZMQ. This radio is not
+    thread-safe, because its use of ZMQ is not thread-safe.
+    """
+
+    def __init__(self, hub_address: str, hub_zmq_port: int) -> None:
+        self._hub_channel = zmq.Context().socket(zmq.DEALER)
+        self._hub_channel.set_hwm(0)
+        self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}")
+
+    def send(self, message: object) -> None:
+        self._hub_channel.send_pyobj(message)

From 2f6a185c82f15800f335bd4c371ff93e2df1def9 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 3 Aug 2024 16:13:19 +0200
Subject: [PATCH 10/11] Fix pytest caplog heisenbug introduced in PR #3559
 (#3565)

Prior to this PR, this usage of caplog depended on the level of the
root logger, which is not set by this test and so ends up depending on
which tests have run before: sometimes the INFO logs output by
htex.shutdown were not captured by caplog.

This PR explicitly tells caplog to capture at least INFO level logs, so
that the expected messages are reliably captured.
---
 parsl/tests/test_htex/test_htex.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py
index 80a4e91bd5..810236c1b4 100644
--- a/parsl/tests/test_htex/test_htex.py
+++ b/parsl/tests/test_htex/test_htex.py
@@ -1,3 +1,4 @@
+import logging
 import pathlib
 from subprocess import Popen, TimeoutExpired
 from typing import Optional, Sequence
@@ -107,7 +108,8 @@ def kill_interchange(*args, **kwargs):
 
     mock_ix_proc.terminate.side_effect = kill_interchange
 
-    htex.shutdown()
+    with caplog.at_level(logging.INFO):
+        htex.shutdown()
 
     if started:
         assert mock_ix_proc.terminate.called

From d8e8d4b3d99b2f034b1d5e80f89f58e1278486c2 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 5 Aug 2024 16:29:13 +0200
Subject: [PATCH 11/11] Split monitoring router into two radio-specific
 receiver threads (#3558)

This is part of two ongoing strands of development:

* Preparation for arbitrary monitoring radio receivers, where UDP is
  not special compared to other methods. This code separates out the
  UDP-specific code, making it easier to move around later (see
  development in PR #3315).

* Avoiding poll-one-thing, poll-another-thing tight polling loops in
  favour of multiple blocking loops. The two router receiver sections
  used to run in a single tight non-blocking loop, each component
  waiting loop_freq (= 10 ms) for something to happen. After this PR,
  the separated loops are more amenable to longer blocking times - they
  only need to discover when the exit event is set, which can probably
  be more on the order of 1 second.
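A hedged sketch of the structure this patch gives the router - one
blocking listener thread per radio, with start() reduced to starting
and joining them; the listener bodies here are placeholders for the
real socket loops:

```python
import threading

exit_event = threading.Event()


def udp_listener() -> None:
    while not exit_event.is_set():
        exit_event.wait(1.0)  # the real code blocks on udp_sock.recvfrom


def zmq_listener() -> None:
    while not exit_event.is_set():
        exit_event.wait(1.0)  # the real code blocks on a ZMQ receive


threads = [threading.Thread(target=udp_listener, daemon=True),
           threading.Thread(target=zmq_listener, daemon=True)]
for t in threads:
    t.start()
exit_event.set()  # in the router, this is set at shutdown time
for t in threads:
    t.join()
```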
---
 parsl/monitoring/router.py | 54 ++++++++++++++++++++++++++++----------
 1 file changed, 40 insertions(+), 14 deletions(-)

diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py
index 9a422027c1..bf395e3662 100644
--- a/parsl/monitoring/router.py
+++ b/parsl/monitoring/router.py
@@ -5,6 +5,7 @@
 import pickle
 import queue
 import socket
+import threading
 import time
 from multiprocessing.synchronize import Event
 from typing import Optional, Tuple, Union
@@ -108,7 +109,24 @@ def __init__(self,
         self.resource_msgs = resource_msgs
         self.exit_event = exit_event
 
+    @wrap_with_logs(target="monitoring_router")
     def start(self) -> None:
+        self.logger.info("Starting UDP listener thread")
+        udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
+        udp_radio_receiver_thread.start()
+
+        self.logger.info("Starting ZMQ listener thread")
+        zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True)
+        zmq_radio_receiver_thread.start()
+
+        self.logger.info("Joining on ZMQ listener thread")
+        zmq_radio_receiver_thread.join()
+        self.logger.info("Joining on UDP listener thread")
+        udp_radio_receiver_thread.join()
+        self.logger.info("Joined on both ZMQ and UDP listener threads")
+
+    @wrap_with_logs(target="monitoring_router")
+    def start_udp_listener(self) -> None:
         try:
             while not self.exit_event.is_set():
                 try:
@@ -119,6 +137,26 @@ def start(self) -> None:
                 except socket.timeout:
                     pass
 
+            self.logger.info("UDP listener draining")
+            last_msg_received_time = time.time()
+            while time.time() - last_msg_received_time < self.atexit_timeout:
+                try:
+                    data, addr = self.udp_sock.recvfrom(2048)
+                    msg = pickle.loads(data)
+                    self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
+                    self.resource_msgs.put((msg, addr))
+                    last_msg_received_time = time.time()
+                except socket.timeout:
+                    pass
+
+            self.logger.info("UDP listener finishing normally")
+        finally:
+            self.logger.info("UDP listener finished")
+
+    @wrap_with_logs(target="monitoring_router")
+    def start_zmq_listener(self) -> None:
+        try:
+            while not self.exit_event.is_set():
                 try:
                     dfk_loop_start = time.time()
                     while time.time() - dfk_loop_start < 1.0:  # TODO make configurable
@@ -161,21 +199,9 @@ def start(self) -> None:
                     # thing to do.
                     self.logger.warning("Failure processing a ZMQ message", exc_info=True)
 
-            self.logger.info("Monitoring router draining")
-            last_msg_received_time = time.time()
-            while time.time() - last_msg_received_time < self.atexit_timeout:
-                try:
-                    data, addr = self.udp_sock.recvfrom(2048)
-                    msg = pickle.loads(data)
-                    self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
-                    self.resource_msgs.put((msg, addr))
-                    last_msg_received_time = time.time()
-                except socket.timeout:
-                    pass
-
-            self.logger.info("Monitoring router finishing normally")
+            self.logger.info("ZMQ listener finishing normally")
         finally:
-            self.logger.info("Monitoring router finished")
+            self.logger.info("ZMQ listener finished")
 
 
 @wrap_with_logs
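A hedged sketch of the drain idiom the UDP listener retains after this
split: once the exit event fires, keep receiving until the socket has
been quiet for atexit_timeout seconds, so that in-flight messages are
not dropped (drain_udp is an invented name; only the recvfrom/timeout
pattern mirrors the code above):

```python
import socket
import time


def drain_udp(sock: socket.socket, atexit_timeout: float = 3.0) -> None:
    sock.settimeout(1.0)  # bounded blocking per recvfrom call
    last_msg_received_time = time.time()
    while time.time() - last_msg_received_time < atexit_timeout:
        try:
            data, addr = sock.recvfrom(2048)  # forward/process the message
            last_msg_received_time = time.time()
        except socket.timeout:
            pass
```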