From 7867b576365ff46ec2613ebe47bc4ad2778493b7 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 09:45:26 +0200 Subject: [PATCH 01/26] Use caplog fixure rather than mock logging when testing HTEX shutdown (#3559) This replaces mock-captured call sequences which are over-testing the exact position of tested log calls in relation to any other log calls, which makes this test fragile when working on logs in the affected area of the code. This is consistent with other parts of the test suite which test log messages using caplog. --- parsl/tests/test_htex/test_htex.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index fca68c3c2f..80a4e91bd5 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -71,12 +71,11 @@ def test_htex_start_encrypted( @pytest.mark.local @pytest.mark.parametrize("started", (True, False)) @pytest.mark.parametrize("timeout_expires", (True, False)) -@mock.patch(f"{_MOCK_BASE}.logger") def test_htex_shutdown( - mock_logger: mock.MagicMock, started: bool, timeout_expires: bool, htex: HighThroughputExecutor, + caplog ): mock_ix_proc = mock.Mock(spec=Popen) @@ -110,20 +109,19 @@ def kill_interchange(*args, **kwargs): htex.shutdown() - mock_logs = mock_logger.info.call_args_list if started: assert mock_ix_proc.terminate.called assert mock_ix_proc.wait.called assert {"timeout": 10} == mock_ix_proc.wait.call_args[1] if timeout_expires: - assert "Unable to terminate Interchange" in mock_logs[1][0][0] + assert "Unable to terminate Interchange" in caplog.text assert mock_ix_proc.kill.called - assert "Attempting" in mock_logs[0][0][0] - assert "Finished" in mock_logs[-1][0][0] + assert "Attempting HighThroughputExecutor shutdown" in caplog.text + assert "Finished HighThroughputExecutor shutdown" in caplog.text else: assert not mock_ix_proc.terminate.called assert not mock_ix_proc.wait.called - assert "has not started" in mock_logs[0][0][0] + assert "HighThroughputExecutor has not started" in caplog.text @pytest.mark.local From 8d606c93af6a7de4df8c60b99752aa4176e57096 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 10:49:31 +0200 Subject: [PATCH 02/26] Rename submit-side monitoring radio for clarification (#3557) A Parsl executor is configured with a submit-side monitoring radio sender, which is used by the BlockProviderExecutor to send block status messages to the monitoring subsystem. Parsl executors also have a notion of a remote monitoring radio, used by remote workers to sending monitoring messages. This can be confusing when both of these radio senders are referred to in the same piece of code, as happened in ongoing monitoring plugin development in draft PR #3315. This PR is intended to make this sitution much less ambiguous by avoiding the mention of a monitoring radio in executor code without qualifying whether it is a submit-side or remote-worker-side radio definition. A future PR from the #3315 stack will introduce other monitoring radio references with the remote prefix, replacing the current radio_mode and related attributes. --- parsl/dataflow/dflow.py | 2 +- parsl/executors/base.py | 14 +++++++------- parsl/executors/status_handling.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index a62a2261d0..88ef063230 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: if self.monitoring: executor.hub_address = self.monitoring.hub_address executor.hub_zmq_port = self.monitoring.hub_zmq_port - executor.monitoring_radio = self.monitoring.radio + executor.submit_monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 941f392e9f..a112b9eb00 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -52,13 +52,13 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadioSender] = None, + submit_monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port - self.monitoring_radio = monitoring_radio + self.submit_monitoring_radio = submit_monitoring_radio self.run_dir = os.path.abspath(run_dir) self.run_id = run_id @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadioSender]: + def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ - return self._monitoring_radio + return self._submit_monitoring_radio - @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: - self._monitoring_radio = value + @submit_monitoring_radio.setter + def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: + self._submit_monitoring_radio = value diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 1e4ea3c0b4..0f7ed90592 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -269,10 +269,10 @@ def workers_per_node(self) -> Union[int, float]: def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled - if self.monitoring_radio: + if self.submit_monitoring_radio: msg = self.create_monitoring_info(status) logger.debug("Sending block monitoring message: %r", msg) - self.monitoring_radio.send((MessageType.BLOCK_INFO, msg)) + self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg)) def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]: """Create a monitoring message for each block based on the poll status. From 5ee584d26b1dcb0d22ab07de37df3cdd9be1248d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 11:17:40 +0200 Subject: [PATCH 03/26] Only scale in blocks that are in _status and non-terminal (#3548) In the BlockProviderExecutor, the block ID to job ID mapping structures contain the full historical list of blocks. Prior to this PR, the mapping was used as source of current jobs that should/could be scaled in. This was incorrect. and resulted in scaling in code attempting to: scale in blocks that had already finished, because it continues to see those blocks as eligible for scale-in not scale in blocks that were active - because rather than choosing to scale in an alive block, the code would choose to attempt to scale in a non-alive block After this PR, the _status structure which should contain reasonably up to date status information is used instead of the block/job ID mapping structures. (as a more general principle, those block/job ID mapping structures should never be examined as a whole but only used for mapping) Changed Behaviour: Scaling in should work better in executors using the default scaling in that was refactored in PR #3526, which right now is Work Queue and Task Vine. Fixes #3471 --- parsl/executors/status_handling.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 0f7ed90592..200b43cc41 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -12,7 +12,7 @@ from parsl.executors.base import ParslExecutor from parsl.executors.errors import BadStateException, ScalingFailed from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler -from parsl.jobs.states import JobState, JobStatus +from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus from parsl.monitoring.message_type import MessageType from parsl.providers.base import ExecutionProvider from parsl.utils import AtomicIDCounter @@ -222,16 +222,20 @@ def scale_in(self, blocks: int) -> List[str]: :return: A list of block ids corresponding to the blocks that were removed. """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:blocks] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] + + active_blocks = [block_id for block_id, status in self._status.items() + if status.state not in TERMINAL_STATES] + + block_ids_to_kill = active_blocks[:blocks] + + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] # Cancel the blocks provisioned if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] + logger.info(f"Scaling in jobs: {job_ids_to_kill}") + r = self.provider.cancel(job_ids_to_kill) + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] return block_ids_killed else: logger.error("No execution provider available to scale in") From a24bc932117c1ba50701992b41efdda79f3e3eed Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 11:48:57 +0200 Subject: [PATCH 04/26] Test UDP monitoring radio (#3555) Before this PR, this radio was omitted because all of executors in the basic monitoring test had over time been switched to different defaults. --- parsl/tests/test_monitoring/test_basic.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index c900670ec8..1c792a9d82 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -25,10 +25,23 @@ def this_app(): # a configuration that is suitably configured for monitoring. def htex_config(): + """This config will use htex's default htex-specific monitoring radio mode""" from parsl.tests.configs.htex_local_alternate import fresh_config return fresh_config() +def htex_udp_config(): + """This config will force UDP""" + from parsl.tests.configs.htex_local_alternate import fresh_config + c = fresh_config() + assert len(c.executors) == 1 + + assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" + c.executors[0].radio_mode = "udp" + + return c + + def workqueue_config(): from parsl.tests.configs.workqueue_ex import fresh_config c = fresh_config() @@ -48,7 +61,7 @@ def taskvine_config(): @pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, workqueue_config, taskvine_config]) +@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) def test_row_counts(tmpd_cwd, fresh_config): # this is imported here rather than at module level because # it isn't available in a plain parsl install, so this module From 11d88db8aa63d7228681b3a08f384a8a0088f63c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 13:32:15 +0200 Subject: [PATCH 05/26] Inline _scale_out in BlockProviderExecutor (#3554) This brings two related pieces of code together into a single method, removing the possibility of the _scale_out code being called in any other way than via scale_out_facade. Future PRs will rearrange the now unified code and make a bugfix that will be more easily fixable now. This PR should not change behaviour as it is only a code movement. --- parsl/executors/status_handling.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 200b43cc41..5425094bca 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -183,22 +183,13 @@ def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) - return list(compress(to_kill, killed)) def scale_out_facade(self, n: int) -> List[str]: - block_ids = self._scale_out(n) - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids - - def _scale_out(self, blocks: int = 1) -> List[str]: """Scales out the number of blocks by "blocks" """ if not self.provider: raise ScalingFailed(self, "No execution provider available") block_ids = [] - logger.info(f"Scaling out by {blocks} blocks") - for _ in range(blocks): + logger.info(f"Scaling out by {n} blocks") + for _ in range(n): block_id = str(self._block_id_counter.get_id()) logger.info(f"Allocated block ID {block_id}") try: @@ -208,6 +199,12 @@ def _scale_out(self, blocks: int = 1) -> List[str]: block_ids.append(block_id) except Exception as ex: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + + new_status = {} + for block_id in block_ids: + new_status[block_id] = JobStatus(JobState.PENDING) + self.send_monitoring_info(new_status) + self._status.update(new_status) return block_ids def scale_in(self, blocks: int) -> List[str]: From 1fca73c92403255e1d6b58808e021836fcb2b2e0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 15:43:56 +0200 Subject: [PATCH 06/26] Bring block table updates together in scale out (#3562) There are multiple structures that contain information about blocks and the status of those blocks. This PR is part of ongoing work to make the information contained in those blocks more consistent. This PR brings updates to executor._status (before PR #3352, living in parsl.jobs.job_status_poller) together with updates to the block/job id mapping structures (which has existed in the executor layer under different names since commit a1963bf36fa6bac5bb5b28757a2c5d4a1fbe0462 introduced self.engines in 2018). This PR should not change behaviour: it moves code around in a way that should not affect how the various structures are left populated at the end of the method. This PR makes the cause of issue #3235 clearer, without attempting to fix it: in the successful code path touched by this PR, executor._status is updated immediately, but in the exception path, the status update only goes into _simulated_status and does not appear in executor._status until much later (when self.status() merges provider-provided status and simulated status driven by the job status poller). A subsequent PR will address issue #3235 --- parsl/executors/status_handling.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 5425094bca..5ca70c5877 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -188,23 +188,27 @@ def scale_out_facade(self, n: int) -> List[str]: if not self.provider: raise ScalingFailed(self, "No execution provider available") block_ids = [] + monitoring_status_changes = {} logger.info(f"Scaling out by {n} blocks") for _ in range(n): block_id = str(self._block_id_counter.get_id()) logger.info(f"Allocated block ID {block_id}") try: job_id = self._launch_block(block_id) + + pending_status = JobStatus(JobState.PENDING) + self.blocks_to_job_id[block_id] = job_id self.job_ids_to_block[job_id] = block_id + self._status[block_id] = pending_status + + monitoring_status_changes[block_id] = pending_status block_ids.append(block_id) + except Exception as ex: self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) + self.send_monitoring_info(monitoring_status_changes) return block_ids def scale_in(self, blocks: int) -> List[str]: From 21362b5702316370aa1f68479e431b30ba1e8de5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 1 Aug 2024 16:40:38 +0200 Subject: [PATCH 07/26] Update _status cache on failed blocks (#3563) _status stores the status of blocks. This structure is periodically updated by executor.status(), but any changes to block status that happen before such an update happens need to be done explicitly, to make them appear in _status immediately. Before this PR, this was done for launched blocks, 6 lines before this change, but not for failed blocks. This is the cause of issue #3235 - scaling code does not become aware of failed blocks until executor.status() updates _status on a slow poll. This PR makes _status be updated with that failed block information immediately (in addition to the existing code which places it in _simulated_status for when executor.status() rebuilds the entire _status structure). This PR has a test case which fails before this PR's change to status_handling.py but passes afterwards. Changed Behaviour Earlier recognition of failed blocks by the scaling code, which should lead to earlier overall failure when the scaling code decides to abort. Fixes #3235 --- parsl/executors/status_handling.py | 4 +- ...st_disconnected_blocks_failing_provider.py | 71 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 5ca70c5877..34db2300f6 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -206,7 +206,9 @@ def scale_out_facade(self, n: int) -> List[str]: block_ids.append(block_id) except Exception as ex: - self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + self._simulated_status[block_id] = failed_status + self._status[block_id] = failed_status self.send_monitoring_info(monitoring_status_changes) return block_ids diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py new file mode 100644 index 0000000000..b2fa507aca --- /dev/null +++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py @@ -0,0 +1,71 @@ +import logging + +import pytest + +import parsl +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.errors import BadStateException +from parsl.jobs.states import JobState, JobStatus +from parsl.providers import LocalProvider + + +class FailingProvider(LocalProvider): + def submit(*args, **kwargs): + raise RuntimeError("Deliberate failure of provider.submit") + + +def local_config(): + """Config to simulate failing blocks without connecting""" + return Config( + executors=[ + HighThroughputExecutor( + label="HTEX", + heartbeat_period=1, + heartbeat_threshold=2, + poll_period=100, + max_workers_per_node=1, + provider=FailingProvider( + init_blocks=0, + max_blocks=2, + min_blocks=0, + ), + ) + ], + max_idletime=0.5, + strategy='htex_auto_scale', + strategy_period=0.1 + # this strategy period needs to be a few times smaller than the + # status_polling_interval of FailingProvider, which is 5s at + # time of writing + ) + + +@parsl.python_app +def double(x): + return x * 2 + + +@pytest.mark.local +def test_disconnected_blocks(): + """Test reporting of blocks that fail to connect from HTEX""" + dfk = parsl.dfk() + executor = dfk.executors["HTEX"] + + connected_blocks = executor.connected_blocks() + assert not connected_blocks, "Expected 0 blocks" + + future = double(5) + with pytest.raises(BadStateException): + future.result() + + assert isinstance(future.exception(), BadStateException) + + status_dict = executor.status() + assert len(status_dict) == 1, "Expected exactly 1 block" + for status in status_dict.values(): + assert isinstance(status, JobStatus) + assert status.state == JobState.MISSING + + connected_blocks = executor.connected_blocks() + assert connected_blocks == [], "Expected exactly 0 connected blocks" From 2b01411bb24f2d05547f36033b83fe7790d431be Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 2 Aug 2024 12:45:48 +0200 Subject: [PATCH 08/26] Make monitoring hub start into its own Exception (#3561) This is in line with the principle that Parsl exceptions should all be subclasses of ParslError. --- parsl/monitoring/errors.py | 6 ++++++ parsl/monitoring/monitoring.py | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 parsl/monitoring/errors.py diff --git a/parsl/monitoring/errors.py b/parsl/monitoring/errors.py new file mode 100644 index 0000000000..f41225ff44 --- /dev/null +++ b/parsl/monitoring/errors.py @@ -0,0 +1,6 @@ +from parsl.errors import ParslError + + +class MonitoringHubStartError(ParslError): + def __str__(self) -> str: + return "Hub failed to start" diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index f86bf81e87..c9a2dc9ed7 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -12,6 +12,7 @@ import typeguard from parsl.log_utils import set_file_logger +from parsl.monitoring.errors import MonitoringHubStartError from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter @@ -195,7 +196,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat comm_q.join_thread() except queue.Empty: logger.error("Hub has not completed initialization in 120s. Aborting") - raise Exception("Hub failed to start") + raise MonitoringHubStartError() if isinstance(comm_q_result, str): logger.error(f"MonitoringRouter sent an error message: {comm_q_result}") From 4f139c28118d625c893f67417b0bc391f535ac8e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 2 Aug 2024 18:55:24 +0200 Subject: [PATCH 09/26] Factor interchange monitoring code into a ZMQRadioSender (#3556) From an interchange perspective: this is a refactoring intended to clarify that the interchange isn't doing anything special wrt. monitoring messages and that it can send monitoring messages in the same way that remote workers can. From a monitoring perspective: this pulls ZMQ sender code out of the interchange and puts it in a place that is more natural for ongoing development. For example, a potential future use with Work Queue and Task Vine is that workers would also benefit from using ZMQ to send monitoring messages. In some potential use cases, it might be desirable to configure the radio used by the interchange instead of the hard-coded ZMQRadio. On-going work in draft PR #3315 addresses configuration of different types of radio and that work should be relevant here too. --- .../executors/high_throughput/interchange.py | 65 +++++++++---------- parsl/monitoring/radios.py | 16 +++++ 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 9ebe6b95b9..5da83ae3ca 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -20,6 +20,7 @@ from parsl.executors.high_throughput.manager_record import ManagerRecord from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType +from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle @@ -219,27 +220,15 @@ def task_puller(self) -> NoReturn: task_counter += 1 logger.debug(f"Fetched {task_counter} tasks so far") - def _create_monitoring_channel(self) -> Optional[zmq.Socket]: - if self.hub_address and self.hub_zmq_port: - logger.info("Connecting to MonitoringHub") - # This is a one-off because monitoring is unencrypted - hub_channel = zmq.Context().socket(zmq.DEALER) - hub_channel.set_hwm(0) - hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port)) - logger.info("Connected to MonitoringHub") - return hub_channel - else: - return None - - def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None: - if hub_channel: + def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None: + if monitoring_radio: logger.info("Sending message {} to MonitoringHub".format(manager)) d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) - hub_channel.send_pyobj((MessageType.NODE_INFO, d)) + monitoring_radio.send((MessageType.NODE_INFO, d)) @wrap_with_logs(target="interchange") def _command_server(self) -> NoReturn: @@ -247,8 +236,11 @@ def _command_server(self) -> NoReturn: """ logger.debug("Command Server Starting") - # Need to create a new ZMQ socket for command server thread - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + else: + monitoring_radio = None reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...) @@ -298,7 +290,7 @@ def _command_server(self) -> NoReturn: if manager_id in self._ready_managers: m = self._ready_managers[manager_id] m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) else: logger.warning("Worker to hold was not in ready managers list") @@ -333,9 +325,14 @@ def start(self) -> None: # parent-process-inheritance problems. signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Incoming ports bound") + logger.info("Starting main interchange method") - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + logger.debug("Created monitoring radio") + else: + monitoring_radio = None poll_period = self.poll_period @@ -366,10 +363,10 @@ def start(self) -> None: while not kill_event.is_set(): self.socks = dict(poller.poll(timeout=poll_period)) - self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event) - self.process_results_incoming(interesting_managers, hub_channel) - self.expire_bad_managers(interesting_managers, hub_channel) - self.expire_drained_managers(interesting_managers, hub_channel) + self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event) + self.process_results_incoming(interesting_managers, monitoring_radio) + self.expire_bad_managers(interesting_managers, monitoring_radio) + self.expire_drained_managers(interesting_managers, monitoring_radio) self.process_tasks_to_send(interesting_managers) self.zmq_context.destroy() @@ -380,7 +377,7 @@ def start(self) -> None: def process_task_outgoing_incoming( self, interesting_managers: Set[bytes], - hub_channel: Optional[zmq.Socket], + monitoring_radio: Optional[MonitoringRadioSender], kill_event: threading.Event ) -> None: """Process one message from manager on the task_outgoing channel. @@ -434,7 +431,7 @@ def process_task_outgoing_incoming( m.update(msg) # type: ignore[typeddict-item] logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): @@ -465,7 +462,7 @@ def process_task_outgoing_incoming( logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") - def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: for manager_id in list(interesting_managers): # is it always true that a draining manager will be in interesting managers? @@ -478,7 +475,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: self._ready_managers.pop(manager_id) m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers @@ -521,7 +518,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") - def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: # Receive any results and forward to client if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN: logger.debug("entering results_incoming section") @@ -541,11 +538,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel elif r['type'] == 'monitoring': # the monitoring code makes the assumption that no # monitoring messages will be received if monitoring - # is not configured, and that hub_channel will only + # is not configured, and that monitoring_radio will only # be None when monitoring is not configurated. - assert hub_channel is not None + assert monitoring_radio is not None - hub_channel.send_pyobj(r['payload']) + monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") b_messages.append((p_message, r)) @@ -589,7 +586,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel interesting_managers.add(manager_id) logger.debug("leaving results_incoming section") - def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: @@ -597,7 +594,7 @@ def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Opt logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager") if m['active']: m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager") for tid in m['tasks']: diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 6c77fd37b1..37bef0b06a 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -7,6 +7,8 @@ from multiprocessing.queues import Queue from typing import Optional +import zmq + from parsl.serialize import serialize _db_manager_excepts: Optional[Exception] @@ -186,3 +188,17 @@ def __init__(self, queue: Queue) -> None: def send(self, message: object) -> None: self.queue.put((message, 0)) + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. + """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) From 2f6a185c82f15800f335bd4c371ff93e2df1def9 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 3 Aug 2024 16:13:19 +0200 Subject: [PATCH 10/26] Fix pytest caplog heisenbug introduced in PR #3559 (#3565) Prior to this PR, this usage of caplog is dependent on the level of the root logger, which is not set by this test and so ends up being dependent on which tests have run before: sometimes the INFO logs output by htex.shutdown are not captured by caplog. This PR explicitly tells caplog to get at least INFO level logs, to capture the expected messages. --- parsl/tests/test_htex/test_htex.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 80a4e91bd5..810236c1b4 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,3 +1,4 @@ +import logging import pathlib from subprocess import Popen, TimeoutExpired from typing import Optional, Sequence @@ -107,7 +108,8 @@ def kill_interchange(*args, **kwargs): mock_ix_proc.terminate.side_effect = kill_interchange - htex.shutdown() + with caplog.at_level(logging.INFO): + htex.shutdown() if started: assert mock_ix_proc.terminate.called From d8e8d4b3d99b2f034b1d5e80f89f58e1278486c2 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 5 Aug 2024 16:29:13 +0200 Subject: [PATCH 11/26] Split monitoring router into two radio-specific receiver threads (#3558) This is part of two ongoing strands of development: * Preparation for arbitrary monitoring radio receivers, where UDP is not special compared to other methods. This code separates out the UDP specific code making it easier to move around later (see development in PR #3315) * Avoiding poll-one-thing, poll-another-thing tight polling loops in favour of multiple blocking loops. The two router receiver sections used to run in a single tight non-blocking loop, each component waiting loop_freq (= 10 ms) for something to happen. After this PR, the separated loops are more amenable to longer blocking times - they only need to discover when exit event is set which probably can be more on the order of 1 second. --- parsl/monitoring/router.py | 54 ++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 9a422027c1..bf395e3662 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -5,6 +5,7 @@ import pickle import queue import socket +import threading import time from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -108,7 +109,24 @@ def __init__(self, self.resource_msgs = resource_msgs self.exit_event = exit_event + @wrap_with_logs(target="monitoring_router") def start(self) -> None: + self.logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True) + udp_radio_receiver_thread.start() + + self.logger.info("Starting ZMQ listener thread") + zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True) + zmq_radio_receiver_thread.start() + + self.logger.info("Joining on ZMQ listener thread") + zmq_radio_receiver_thread.join() + self.logger.info("Joining on UDP listener thread") + udp_radio_receiver_thread.join() + self.logger.info("Joined on both ZMQ and UDP listener threads") + + @wrap_with_logs(target="monitoring_router") + def start_udp_listener(self) -> None: try: while not self.exit_event.is_set(): try: @@ -119,6 +137,26 @@ def start(self) -> None: except socket.timeout: pass + self.logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + self.logger.info("UDP listener finishing normally") + finally: + self.logger.info("UDP listener finished") + + @wrap_with_logs(target="monitoring_router") + def start_zmq_listener(self) -> None: + try: + while not self.exit_event.is_set(): try: dfk_loop_start = time.time() while time.time() - dfk_loop_start < 1.0: # TODO make configurable @@ -161,21 +199,9 @@ def start(self) -> None: # thing to do. self.logger.warning("Failure processing a ZMQ message", exc_info=True) - self.logger.info("Monitoring router draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - self.resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("Monitoring router finishing normally") + self.logger.info("ZMQ listener finishing normally") finally: - self.logger.info("Monitoring router finished") + self.logger.info("ZMQ listener finished") @wrap_with_logs From 10a6a00144bbbcf12923e95b8f940370fcf76e9a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 6 Aug 2024 23:39:05 +0200 Subject: [PATCH 12/26] Remove monitoring router modification of node message (#3567) Prior to this PR, the monitoring router would add a run_id field to every NODE_INFO message that it received. These are messages from the interchange describing worker pools. The monitoring router does not modify any other messages. This PR sets the run_id at the point of message origination inside the interchange (in _send_monitoring_info), and makes the router leave NODE_INFO messages unmodified (like the other message types). This is part of work to make the router less aware of message types by removing a bunch of message-type specific handling. This PR brings in a bunch of rewiring to get the run id into the interchange rather than into the monitoring router. * Changed Behaviour This should not change any workflow-user-facing behaviour. Globus Compute (or anyone else building a fake Parsl environment) will maybe have to change how they fake their Parsl implementation to pass in a run id (the executor.run_id part of dfk.add_executors). --- parsl/dataflow/dflow.py | 2 +- parsl/executors/high_throughput/executor.py | 1 + parsl/executors/high_throughput/interchange.py | 4 ++++ parsl/monitoring/monitoring.py | 3 +-- parsl/monitoring/router.py | 7 +------ parsl/tests/test_htex/test_zmq_binding.py | 3 ++- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 88ef063230..344173c4b1 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None: if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 6c181cdee7..1a56195c07 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -551,6 +551,7 @@ def _start_local_interchange_process(self) -> None: "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 5da83ae3ca..fa0969d398 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -55,6 +55,7 @@ def __init__(self, poll_period: int, cert_dir: Optional[str], manager_selector: ManagerSelector, + run_id: str, ) -> None: """ Parameters @@ -125,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -227,6 +230,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id monitoring_radio.send((MessageType.NODE_INFO, d)) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index c9a2dc9ed7..9dccbecd35 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -106,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -161,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, - "run_id": run_id }, name="Monitoring-Router-Process", daemon=True, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index bf395e3662..4be454b797 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -31,7 +31,6 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", - run_id: str, logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds priority_msgs: "queue.Queue[AddressedMonitoringMessage]", @@ -71,7 +70,6 @@ def __init__(self, self.hub_address = hub_address self.atexit_timeout = atexit_timeout - self.run_id = run_id self.loop_freq = 10.0 # milliseconds @@ -172,7 +170,6 @@ def start_zmq_listener(self) -> None: msg_0 = (msg, 0) if msg[0] == MessageType.NODE_INFO: - msg[1]['run_id'] = self.run_id self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: self.resource_msgs.put(msg_0) @@ -218,8 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range: Tuple[int, int], logdir: str, - logging_level: int, - run_id: str) -> None: + logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, @@ -227,7 +223,6 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id, priority_msgs=priority_msgs, node_msgs=node_msgs, block_msgs=block_msgs, diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index 2273443b99..e21c065d0d 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ b/parsl/tests/test_htex/test_zmq_binding.py @@ -25,7 +25,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s logdir=".", logging_level=logging.INFO, manager_selector=RandomManagerSelector(), - poll_period=10) + poll_period=10, + run_id="test_run_id") @pytest.fixture From 1c7a0e40ed37b4ffe6c31633d4de4e1d9360e9f9 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 10:40:22 +0200 Subject: [PATCH 13/26] Aggressively deprecate Channels and AdHocProvider (#3569) This is described in issue #3515. Issue #3515 has not received any comments arguing in favour of retaining channels and the AdHocprovider, and several in support of removing them, and so this PR takes a heavy handed approach that is well on the way to the end goal of #3515 of deleting channels and the AdHocProvider entirely: Channels except LocalChannel are renamed, so that any users of other channels will have to make a change to their code and actively observe the word "Deprecated" in the name. The AdHocProvider is renamed in the same way. Most documentation (but not docstrings) about channels and the ad-hoc provider is removed or replaced with a link to issue Tests which much be manually run, and so in effect are never run and shouldn't be expected to work now - parsl/tests/manual_tests and parsl/tests/integration/ - are deleted rather than fixed to follow the above naming change. The tests for SSH channels and the AdHocProvider that run in CI are modified to continue passing. Exposure of the deprecated components via top level parsl.providers and parsl.channels re-export is removed. To use this components, the deprecated modules must be imported directly. --- docs/historical/changelog.rst | 8 +-- docs/reference.rst | 13 ++--- docs/userguide/configuring.rst | 38 +++----------- docs/userguide/examples/config.py | 5 +- docs/userguide/execution.rst | 3 +- docs/userguide/plugins.rst | 11 ++--- parsl/channels/__init__.py | 5 +- parsl/channels/oauth_ssh/oauth_ssh.py | 4 +- parsl/channels/ssh/ssh.py | 2 +- parsl/channels/ssh_il/ssh_il.py | 4 +- parsl/configs/ad_hoc.py | 38 -------------- parsl/providers/__init__.py | 4 -- parsl/providers/ad_hoc/ad_hoc.py | 8 ++- parsl/tests/configs/ad_hoc_cluster_htex.py | 35 ------------- parsl/tests/configs/htex_ad_hoc_cluster.py | 26 ---------- parsl/tests/configs/local_adhoc.py | 4 +- parsl/tests/configs/swan_htex.py | 43 ---------------- .../integration/test_channels/test_scp_1.py | 45 ----------------- .../integration/test_channels/test_ssh_1.py | 40 --------------- .../test_channels/test_ssh_errors.py | 46 ----------------- .../test_channels/test_ssh_file_transport.py | 41 ---------------- .../test_channels/test_ssh_interactive.py | 24 --------- parsl/tests/manual_tests/test_ad_hoc_htex.py | 49 ------------------- parsl/tests/manual_tests/test_oauth_ssh.py | 13 ----- .../test_providers/test_local_provider.py | 11 +++-- 25 files changed, 40 insertions(+), 480 deletions(-) delete mode 100644 parsl/configs/ad_hoc.py delete mode 100644 parsl/tests/configs/ad_hoc_cluster_htex.py delete mode 100644 parsl/tests/configs/htex_ad_hoc_cluster.py delete mode 100644 parsl/tests/configs/swan_htex.py delete mode 100644 parsl/tests/integration/test_channels/test_scp_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_1.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_errors.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_file_transport.py delete mode 100644 parsl/tests/integration/test_channels/test_ssh_interactive.py delete mode 100644 parsl/tests/manual_tests/test_ad_hoc_htex.py delete mode 100644 parsl/tests/manual_tests/test_oauth_ssh.py diff --git a/docs/historical/changelog.rst b/docs/historical/changelog.rst index 18fe6ca5b1..931998f93d 100644 --- a/docs/historical/changelog.rst +++ b/docs/historical/changelog.rst @@ -334,7 +334,7 @@ New Functionality * New launcher: `parsl.launchers.WrappedLauncher` for launching tasks inside containers. -* `parsl.channels.SSHChannel` now supports a ``key_filename`` kwarg `issue#1639 `_ +* ``parsl.channels.SSHChannel`` now supports a ``key_filename`` kwarg `issue#1639 `_ * Newly added Makefile wraps several frequent developer operations such as: @@ -442,7 +442,7 @@ New Functionality module, parsl.data_provider.globus * `parsl.executors.WorkQueueExecutor`: a new executor that integrates functionality from `Work Queue `_ is now available. -* New provider to support for Ad-Hoc clusters `parsl.providers.AdHocProvider` +* New provider to support for Ad-Hoc clusters ``parsl.providers.AdHocProvider`` * New provider added to support LSF on Summit `parsl.providers.LSFProvider` * Support for CPU and Memory resource hints to providers `(github) `_. * The ``logging_level=logging.INFO`` in `parsl.monitoring.MonitoringHub` is replaced with ``monitoring_debug=False``: @@ -468,7 +468,7 @@ New Functionality * Several test-suite improvements that have dramatically reduced test duration. * Several improvements to the Monitoring interface. -* Configurable port on `parsl.channels.SSHChannel`. +* Configurable port on ``parsl.channels.SSHChannel``. * ``suppress_failure`` now defaults to True. * `parsl.executors.HighThroughputExecutor` is the recommended executor, and ``IPyParallelExecutor`` is deprecated. * `parsl.executors.HighThroughputExecutor` will expose worker information via environment variables: ``PARSL_WORKER_RANK`` and ``PARSL_WORKER_COUNT`` @@ -532,7 +532,7 @@ New Functionality * Cleaner user app file log management. * Updated configurations using `parsl.executors.HighThroughputExecutor` in the configuration section of the userguide. -* Support for OAuth based SSH with `parsl.channels.OAuthSSHChannel`. +* Support for OAuth based SSH with ``parsl.channels.OAuthSSHChannel``. Bug Fixes ^^^^^^^^^ diff --git a/docs/reference.rst b/docs/reference.rst index 1af850792c..d8e18bd244 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -38,15 +38,9 @@ Configuration Channels ======== -.. autosummary:: - :toctree: stubs - :nosignatures: - - parsl.channels.base.Channel - parsl.channels.LocalChannel - parsl.channels.SSHChannel - parsl.channels.OAuthSSHChannel - parsl.channels.SSHInteractiveLoginChannel +Channels are deprecated in Parsl. See +`issue 3515 `_ +for further discussion. Data management =============== @@ -109,7 +103,6 @@ Providers :toctree: stubs :nosignatures: - parsl.providers.AdHocProvider parsl.providers.AWSProvider parsl.providers.CobaltProvider parsl.providers.CondorProvider diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index 24ce0ca938..bb3a3949e3 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -15,7 +15,7 @@ queues, durations, and data management options. The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera supercomputer at TACC. This config uses the `parsl.executors.HighThroughputExecutor` to submit -tasks from a login node (`parsl.channels.LocalChannel`). It requests an allocation of +tasks from a login node. It requests an allocation of 128 nodes, deploying 1 worker for each of the 56 cores per node, from the normal partition. To limit network connections to just the internal network the config specifies the address used by the infiniband interface with ``address_by_interface('ib0')`` @@ -23,7 +23,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` .. code-block:: python from parsl.config import Config - from parsl.channels import LocalChannel from parsl.providers import SlurmProvider from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -36,7 +35,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` address=address_by_interface('ib0'), max_workers_per_node=56, provider=SlurmProvider( - channel=LocalChannel(), nodes_per_block=128, init_blocks=1, partition='normal', @@ -197,22 +195,6 @@ Stepping through the following question should help formulate a suitable configu are on a **native Slurm** system like :ref:`configuring_nersc_cori` -4) Where will the main Parsl program run and how will it communicate with the apps? - -+------------------------+--------------------------+---------------------------------------------------+ -| Parsl program location | App execution target | Suitable channel | -+========================+==========================+===================================================+ -| Laptop/Workstation | Laptop/Workstation | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Cloud Resources | No channel is needed | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with no 2FA | `parsl.channels.SSHChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with 2FA | `parsl.channels.SSHInteractiveLoginChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Login node | Cluster/Supercomputer | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ - Heterogeneous Resources ----------------------- @@ -337,7 +319,6 @@ Provide either the number of executors (Parsl will assume they are named in inte worker_debug=True, available_accelerators=2, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -372,7 +353,6 @@ Select the best blocking strategy for processor's cache hierarchy (choose ``alte worker_debug=True, cpu_affinity='alternating', provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -412,18 +392,12 @@ These include ``OMP_NUM_THREADS``, ``GOMP_COMP_AFFINITY``, and ``KMP_THREAD_AFFI Ad-Hoc Clusters --------------- -Any collection of compute nodes without a scheduler can be considered an -ad-hoc cluster. Often these machines have a shared file system such as NFS or Lustre. -In order to use these resources with Parsl, they need to set-up for password-less SSH access. - -To use these ssh-accessible collection of nodes as an ad-hoc cluster, we use -the `parsl.providers.AdHocProvider` with an `parsl.channels.SSHChannel` to each node. An example -configuration follows. +Parsl's support of ad-hoc clusters of compute nodes without a scheduler +is deprecated. -.. literalinclude:: ../../parsl/configs/ad_hoc.py - -.. note:: - Multiple blocks should not be assigned to each node when using the `parsl.executors.HighThroughputExecutor` +See +`issue #3515 `_ +for further discussion. Amazon Web Services ------------------- diff --git a/docs/userguide/examples/config.py b/docs/userguide/examples/config.py index 166faaf4ac..68057d2b01 100644 --- a/docs/userguide/examples/config.py +++ b/docs/userguide/examples/config.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -8,9 +7,7 @@ HighThroughputExecutor( label="htex_local", cores_per_worker=1, - provider=LocalProvider( - channel=LocalChannel(), - ), + provider=LocalProvider(), ) ], ) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 4168367f9d..df17dc458f 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -47,8 +47,7 @@ Parsl currently supports the following providers: 7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. -10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster. -11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. +10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 4ecff86cfe..c3c38dea63 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -16,8 +16,8 @@ executor to run code on the local submitting host, while another executor can run the same code on a large supercomputer. -Providers, Launchers and Channels ---------------------------------- +Providers and Launchers +----------------------- Some executors are based on blocks of workers (for example the `parsl.executors.HighThroughputExecutor`: the submit side requires a batch system (eg slurm, kubernetes) to start worker processes, which then @@ -34,10 +34,9 @@ add on any wrappers that are needed to launch the command (eg srun inside slurm). Providers and launchers are usually paired together for a particular system type. -A `Channel` allows the commands used to interact with an `ExecutionProvider` to be -executed on a remote system. The default channel executes commands on the -local system, but a few variants of an `parsl.channels.SSHChannel` are provided. - +Parsl also has a deprecated ``Channel`` abstraction. See +`issue 3515 `_ +for further discussion. File staging ------------ diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py index 5a45d15278..c81f6a8bf1 100644 --- a/parsl/channels/__init__.py +++ b/parsl/channels/__init__.py @@ -1,7 +1,4 @@ from parsl.channels.base import Channel from parsl.channels.local.local import LocalChannel -from parsl.channels.oauth_ssh.oauth_ssh import OAuthSSHChannel -from parsl.channels.ssh.ssh import SSHChannel -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel -__all__ = ['Channel', 'SSHChannel', 'LocalChannel', 'SSHInteractiveLoginChannel', 'OAuthSSHChannel'] +__all__ = ['Channel', 'LocalChannel'] diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index c9efa27767..3173b163a8 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -3,7 +3,7 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing try: @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -class OAuthSSHChannel(SSHChannel): +class DeprecatedOAuthSSHChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel uses Globus based OAuth tokens for authentication. """ diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index bf33727e63..38b8afe47b 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -24,7 +24,7 @@ def _auth(self, username, *args): return -class SSHChannel(Channel, RepresentationMixin): +class DeprecatedSSHChannel(Channel, RepresentationMixin): ''' SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has setup host keys so as to ssh to the remote host. Which goes to say that the following diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 02e7a58cd4..3a5e0c5096 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -3,12 +3,12 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel logger = logging.getLogger(__name__) -class SSHInteractiveLoginChannel(SSHChannel): +class DeprecatedSSHInteractiveLoginChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up. diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py deleted file mode 100644 index 05b0e8190d..0000000000 --- a/parsl/configs/ad_hoc.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.usage_tracking.levels import LEVEL_1 - -user_opts: Dict[str, Dict[str, Any]] -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } - - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', - usage_tracking=LEVEL_1, -) diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 475737f1f9..150f425f3d 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,6 +1,3 @@ -# Workstation Provider -from parsl.providers.ad_hoc.ad_hoc import AdHocProvider - # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider @@ -24,7 +21,6 @@ 'SlurmProvider', 'TorqueProvider', 'LSFProvider', - 'AdHocProvider', 'PBSProProvider', 'AWSProvider', 'GoogleCloudProvider', diff --git a/parsl/providers/ad_hoc/ad_hoc.py b/parsl/providers/ad_hoc/ad_hoc.py index 207dd55738..9059648101 100644 --- a/parsl/providers/ad_hoc/ad_hoc.py +++ b/parsl/providers/ad_hoc/ad_hoc.py @@ -12,8 +12,12 @@ logger = logging.getLogger(__name__) -class AdHocProvider(ExecutionProvider, RepresentationMixin): - """ Ad-hoc execution provider +class DeprecatedAdHocProvider(ExecutionProvider, RepresentationMixin): + """ Deprecated ad-hoc execution provider + + The (former) AdHocProvider is deprecated. See + `issue #3515 `_ + for further discussion. This provider is used to provision execution resources over one or more ad hoc nodes that are each accessible over a Channel (say, ssh) but otherwise lack a cluster scheduler. diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py deleted file mode 100644 index 0949b82392..0000000000 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } # type: Dict[str, Dict[str, Any]] - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - encrypted=True, - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', -) diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py deleted file mode 100644 index db24b42ab2..0000000000 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ /dev/null @@ -1,26 +0,0 @@ -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.tests.configs.user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - cores_per_worker=1, - worker_debug=False, - address=user_opts['public_ip'], - encrypted=True, - provider=AdHocProvider( - move_files=False, - parallelism=1, - worker_init=user_opts['adhoc']['worker_init'], - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], -) diff --git a/parsl/tests/configs/local_adhoc.py b/parsl/tests/configs/local_adhoc.py index 25b1f38d61..9b1f951842 100644 --- a/parsl/tests/configs/local_adhoc.py +++ b/parsl/tests/configs/local_adhoc.py @@ -1,7 +1,7 @@ from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider +from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider def fresh_config(): @@ -10,7 +10,7 @@ def fresh_config(): HighThroughputExecutor( label='AdHoc', encrypted=True, - provider=AdHocProvider( + provider=DeprecatedAdHocProvider( channels=[LocalChannel(), LocalChannel()] ) ) diff --git a/parsl/tests/configs/swan_htex.py b/parsl/tests/configs/swan_htex.py deleted file mode 100644 index 3b1b6785ab..0000000000 --- a/parsl/tests/configs/swan_htex.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -================== Block -| ++++++++++++++ | Node -| | | | -| | Task | | . . . -| | | | -| ++++++++++++++ | -================== -""" -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import TorqueProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='swan_htex', - encrypted=True, - provider=TorqueProvider( - channel=SSHChannel( - hostname='swan.cray.com', - username=user_opts['swan']['username'], - script_dir=user_opts['swan']['script_dir'], - ), - nodes_per_block=1, - init_blocks=1, - max_blocks=1, - launcher=AprunLauncher(), - scheduler_options=user_opts['swan']['scheduler_options'], - worker_init=user_opts['swan']['worker_init'], - ), - ) - ] -) diff --git a/parsl/tests/integration/test_channels/test_scp_1.py b/parsl/tests/integration/test_channels/test_scp_1.py deleted file mode 100644 index c11df3c663..0000000000 --- a/parsl/tests/integration/test_channels/test_scp_1.py +++ /dev/null @@ -1,45 +0,0 @@ -import os - -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - out = '' - conn = SSH(hostname, username=username) - conn.push_file(os.path.abspath('remote_run.sh'), '/home/davidk/') - # ec, out, err = conn.execute_wait("ls /tmp/remote_run.sh; bash /tmp/remote_run.sh") - conn.close() - return out - - -script = '''#!/bin/bash -echo "Hostname: $HOSTNAME" -echo "Cpu info -----" -cat /proc/cpuinfo -echo "Done----------" -''' - - -def test_connect_1(): - with open('remote_run.sh', 'w') as f: - f.write(script) - - sites = { - 'midway': { - 'url': 'midway.rcc.uchicago.edu', - 'uname': 'yadunand' - }, - 'swift': { - 'url': 'swift.rcc.uchicago.edu', - 'uname': 'yadunand' - } - } - - for site in sites.values(): - out = connect_and_list(site['url'], site['uname']) - print("Sitename :{0} hostname:{1}".format(site['url'], out)) - - -if __name__ == "__main__": - - test_connect_1() diff --git a/parsl/tests/integration/test_channels/test_ssh_1.py b/parsl/tests/integration/test_channels/test_ssh_1.py deleted file mode 100644 index 61ab3f2705..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_1.py +++ /dev/null @@ -1,40 +0,0 @@ -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_midway(): - ''' Test ssh channels to midway - ''' - url = 'midway.rcc.uchicago.edu' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_beagle(): - ''' Test ssh channels to beagle - ''' - url = 'login04.beagle.ci.uchicago.edu' - uname = 'yadunandb' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_osg(): - ''' Test ssh connectivity to osg - ''' - url = 'login.osgconnect.net' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -if __name__ == "__main__": - - pass diff --git a/parsl/tests/integration/test_channels/test_ssh_errors.py b/parsl/tests/integration/test_channels/test_ssh_errors.py deleted file mode 100644 index 7483e30a5c..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_errors.py +++ /dev/null @@ -1,46 +0,0 @@ -from parsl.channels.errors import BadHostKeyException, SSHException -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_error_1(): - try: - connect_and_list("bad.url.gov", "ubuntu") - except Exception as e: - assert type(e) is SSHException, "Expected SSException, got: {0}".format(e) - - -def test_error_2(): - try: - connect_and_list("swift.rcc.uchicago.edu", "mango") - except SSHException: - print("Caught the right exception") - else: - raise Exception("Expected SSException, got: {0}".format(e)) - - -def test_error_3(): - ''' This should work - ''' - try: - connect_and_list("edison.nersc.gov", "yadunand") - except BadHostKeyException as e: - print("Caught exception BadHostKeyException: ", e) - else: - assert False, "Expected SSException, got: {0}".format(e) - - -if __name__ == "__main__": - - tests = [test_error_1, test_error_2, test_error_3] - - for test in tests: - print("---------Running : {0}---------------".format(test)) - test() - print("----------------------DONE--------------------------") diff --git a/parsl/tests/integration/test_channels/test_ssh_file_transport.py b/parsl/tests/integration/test_channels/test_ssh_file_transport.py deleted file mode 100644 index 61672c3ff5..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_file_transport.py +++ /dev/null @@ -1,41 +0,0 @@ -import parsl -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_push(conn, fname="test001.txt"): - - with open(fname, 'w') as f: - f.write("Hello from parsl.ssh testing\n") - - conn.push_file(fname, "/tmp") - ec, out, err = conn.execute_wait("ls /tmp/{0}".format(fname)) - print(ec, out, err) - - -def test_pull(conn, fname="test001.txt"): - - local = "foo" - conn.pull_file("/tmp/{0}".format(fname), local) - - with open("{0}/{1}".format(local, fname), 'r') as f: - print(f.readlines()) - - -if __name__ == "__main__": - - parsl.set_stream_logger() - - # This is for testing - conn = SSH("midway.rcc.uchicago.edu", username="yadunand") - - test_push(conn) - test_pull(conn) - - conn.close() diff --git a/parsl/tests/integration/test_channels/test_ssh_interactive.py b/parsl/tests/integration/test_channels/test_ssh_interactive.py deleted file mode 100644 index c6f9b9dea9..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_interactive.py +++ /dev/null @@ -1,24 +0,0 @@ -import parsl -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_cooley(): - ''' Test ssh channels to midway - ''' - url = 'cooley.alcf.anl.gov' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - return - - -if __name__ == "__main__": - parsl.set_stream_logger() - test_cooley() diff --git a/parsl/tests/manual_tests/test_ad_hoc_htex.py b/parsl/tests/manual_tests/test_ad_hoc_htex.py deleted file mode 100644 index dfa34ec0d1..0000000000 --- a/parsl/tests/manual_tests/test_ad_hoc_htex.py +++ /dev/null @@ -1,49 +0,0 @@ -import parsl -from parsl import python_app - -parsl.set_stream_logger() - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -remotes = ['midway2-login2.rcc.uchicago.edu', 'midway2-login1.rcc.uchicago.edu'] - -config = Config( - executors=[ - HighThroughputExecutor( - label='AdHoc', - max_workers_per_node=2, - worker_logdir_root="/scratch/midway2/yadunand/parsl_scripts", - encrypted=True, - provider=AdHocProvider( - worker_init="source /scratch/midway2/yadunand/parsl_env_setup.sh", - channels=[SSHChannel(hostname=m, - username="yadunand", - script_dir="/scratch/midway2/yadunand/parsl_cluster") - for m in remotes] - ) - ) - ] -) - - -@python_app -def platform(sleep=2, stdout=None): - import platform - import time - time.sleep(sleep) - return platform.uname() - - -def test_raw_provider(): - - parsl.load(config) - - x = [platform() for i in range(10)] - print([i.result() for i in x]) - - -if __name__ == "__main__": - test_raw_provider() diff --git a/parsl/tests/manual_tests/test_oauth_ssh.py b/parsl/tests/manual_tests/test_oauth_ssh.py deleted file mode 100644 index 3d464bcc0e..0000000000 --- a/parsl/tests/manual_tests/test_oauth_ssh.py +++ /dev/null @@ -1,13 +0,0 @@ -from parsl.channels import OAuthSSHChannel - - -def test_channel(): - channel = OAuthSSHChannel(hostname='ssh.demo.globus.org', username='yadunand') - x, stdout, stderr = channel.execute_wait('ls') - print(x, stdout, stderr) - assert x == 0, "Expected exit code 0, got {}".format(x) - - -if __name__ == '__main__': - - test_channel() diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py index c6844b00c0..497c13370d 100644 --- a/parsl/tests/test_providers/test_local_provider.py +++ b/parsl/tests/test_providers/test_local_provider.py @@ -11,7 +11,8 @@ import pytest -from parsl.channels import LocalChannel, SSHChannel +from parsl.channels import LocalChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.jobs.states import JobState from parsl.launchers import SingleNodeLauncher from parsl.providers import LocalProvider @@ -92,10 +93,10 @@ def test_ssh_channel(): # already exist, so create it here. pathlib.Path('{}/known.hosts'.format(config_dir)).touch(mode=0o600) script_dir = tempfile.mkdtemp() - channel = SSHChannel('127.0.0.1', port=server_port, - script_dir=remote_script_dir, - host_keys_filename='{}/known.hosts'.format(config_dir), - key_filename=priv_key) + channel = DeprecatedSSHChannel('127.0.0.1', port=server_port, + script_dir=remote_script_dir, + host_keys_filename='{}/known.hosts'.format(config_dir), + key_filename=priv_key) try: p = LocalProvider(channel=channel, launcher=SingleNodeLauncher(debug=False)) From 114e701b81f1abbc71a4fd438896fece16784f4d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 16:08:47 +0200 Subject: [PATCH 14/26] Close processes in Work Queue and Task Vine shutdown (#3576) This releases 2 file descriptors with work queue (from 21 to 19 at the end of CI Work Queue test) and 4 file descriptors with Task Vine (from 19 to 15 at the end of CI Task Vine test) This is part of work being merged from draft PR #3397 to shut down components more cleanly, rather than relying on process exit. --- parsl/executors/taskvine/executor.py | 2 ++ parsl/executors/workqueue/executor.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index bebed1a51b..2e1efb211f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -589,11 +589,13 @@ def shutdown(self, *args, **kwargs): # Join all processes before exiting logger.debug("Joining on submit process") self._submit_process.join() + self._submit_process.close() logger.debug("Joining on collector thread") self._collector_thread.join() if self.worker_launch_method == 'factory': logger.debug("Joining on factory process") self._factory_process.join() + self._factory_process.close() # Shutdown multiprocessing queues self._ready_task_queue.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index a1ad49bca9..ae39f8c118 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -704,6 +704,8 @@ def shutdown(self, *args, **kwargs): logger.debug("Joining on submit process") self.submit_process.join() + self.submit_process.close() + logger.debug("Joining on collector thread") self.collector_thread.join() From ec9bbf63807c2d55fea6a8fccbdfb9bec7077950 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 8 Aug 2024 17:00:10 +0200 Subject: [PATCH 15/26] Promote unable to terminate warning to logger.WARNING (#3574) Even if the subsequent SIGKILL works, this is an exceptional circumstance that should be logged. --- parsl/executors/high_throughput/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 1a56195c07..301052c4c5 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -832,7 +832,7 @@ def shutdown(self, timeout: float = 10.0): try: self.interchange_proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - logger.info("Unable to terminate Interchange process; sending SIGKILL") + logger.warning("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() logger.info("Closing ZMQ pipes") From 03e94c3619943db468feb25051f7b7e2c9933f09 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 9 Aug 2024 15:22:30 -0500 Subject: [PATCH 16/26] Adding notes on `available_accelerators` (#3577) * Adding notes on how to specify list of strings to available_accelerators * Clarify how to bind multiple GPUs to workers --- docs/userguide/configuring.rst | 43 ++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index bb3a3949e3..f3fe5cc407 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -306,9 +306,13 @@ and Work Queue does not require Python to run. Accelerators ------------ -Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a single accelerator per task. -Parsl supports pinning each worker to difference accelerators using ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. -Provide either the number of executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators available on the node. +Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a +single accelerator per task. Parsl supports pinning each worker to different accelerators using +``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. Provide either the number of +executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators +available on the node. Parsl will limit the number of workers it launches to the number of accelerators specified, +in other words, you cannot have more workers per node than there are accelerators. By default, Parsl will launch +as many workers as the accelerators specified via ``available_accelerators``. .. code-block:: python @@ -327,7 +331,38 @@ Provide either the number of executors (Parsl will assume they are named in inte strategy='none', ) -For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the ``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3. +It is possible to bind multiple/specific accelerators to each worker by specifying a list of comma separated strings +each specifying accelerators. In the context of binding to NVIDIA GPUs, this works by setting ``CUDA_VISIBLE_DEVICES`` +on each worker to a specific string in the list supplied to ``available_accelerators``. + +Here's an example: + +.. code-block:: python + + # The following config is trimmed for clarity + local_config = Config( + executors=[ + HighThroughputExecutor( + # Starts 2 workers per node, each bound to 2 GPUs + available_accelerators=["0,1", "2,3"], + + # Start a single worker bound to all 4 GPUs + # available_accelerators=["0,1,2,3"] + ) + ], + ) + +GPU Oversubscription +"""""""""""""""""""" + +For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to +make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their +GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the +``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The +``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the +block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. +GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed +on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3. Multi-Threaded Applications --------------------------- From 2067b407bbf6e0d9d9ab66ab5b2393642907a1ae Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 10 Aug 2024 09:34:44 +0200 Subject: [PATCH 17/26] Convert monitoring type annotations to PEP-526 from comments (#3573) This is in preparation for future type work in the monitoring codebase (for example, see PR #3572). This PR does not claim that the types it is moving around are correct (and PR #3572 contains some instances where the types are incorrect). It is a purely syntactic PR. After this PR, $ git grep '# type:' parsl/monitoring/ returns two remaining comment style annotations, which are 'type: ignore' exclusions not specific types. --- parsl/monitoring/db_manager.py | 20 ++++++++++---------- parsl/monitoring/remote.py | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 9f19cd9f4d..8f9f302640 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -283,7 +283,7 @@ def __init__(self, ): self.workflow_end = False - self.workflow_start_message = None # type: Optional[MonitoringMessage] + self.workflow_start_message: Optional[MonitoringMessage] = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -299,10 +299,10 @@ def __init__(self, self.batching_interval = batching_interval self.batching_threshold = batching_threshold - self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage] - self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] + self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue() + self.pending_node_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_block_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, priority_queue: "queue.Queue[TaggedMonitoringMessage]", @@ -351,18 +351,18 @@ def start(self, If that happens, the message will be added to deferred_resource_messages and processed later. """ - inserted_tasks = set() # type: Set[object] + inserted_tasks: Set[object] = set() """ like inserted_tasks but for task,try tuples """ - inserted_tries = set() # type: Set[Any] + inserted_tries: Set[Any] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in # the case of multiple messages to defer. - deferred_resource_messages = {} # type: MonitoringMessage + deferred_resource_messages: MonitoringMessage = {} exception_happened = False @@ -505,7 +505,7 @@ def start(self, "Got {} messages from block queue".format(len(block_info_messages))) # block_info_messages is possibly a nested list of dict (at different polling times) # Each dict refers to the info of a job/block at one polling time - block_messages_to_insert = [] # type: List[Any] + block_messages_to_insert: List[Any] = [] for block_msg in block_info_messages: block_messages_to_insert.extend(block_msg) self._insert(table=BLOCK, messages=block_messages_to_insert) @@ -686,7 +686,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None: logger.exception("Rollback failed") def _get_messages_in_batch(self, msg_queue: "queue.Queue[X]") -> List[X]: - messages = [] # type: List[X] + messages: List[X] = [] start = time.time() while True: if time.time() - start >= self.batching_interval or len(messages) >= self.batching_threshold: diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 055a013627..d374338dee 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -199,10 +199,10 @@ def monitor(pid: int, pm = psutil.Process(pid) - children_user_time = {} # type: Dict[int, float] - children_system_time = {} # type: Dict[int, float] - children_num_ctx_switches_voluntary = {} # type: Dict[int, float] - children_num_ctx_switches_involuntary = {} # type: Dict[int, float] + children_user_time: Dict[int, float] = {} + children_system_time: Dict[int, float] = {} + children_num_ctx_switches_voluntary: Dict[int, float] = {} + children_num_ctx_switches_involuntary: Dict[int, float] = {} def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple} From ffb364450c943f827fdc815d05ade40ebaf2724f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 14 Aug 2024 22:57:12 +0200 Subject: [PATCH 18/26] Correct and check types on monitoring router and database processes (#3572) Prior to this PR, the startup code for the monitoring router and database processes had type annotations on queues; but these types were not checked, and were incorrect - they were labelled process-local Queue instead of multiprocessing queues. This did not cause much trouble execution- and mypy-wise, as the interfaces of those two classes are similar enough, but it is confusing to read in a part of the codebase that is already confusing (that confusion is probably what lead to the incorrect annotations in the first place...) They were not checked because the informal policy of "internal stuff is checked with mypy, external interfaces are checked with typeguard" works badly here: The startup methods are launched using multiprocessing.Process, and function invocations are not type-checked by mypy across a multiprocessing Process constructor. Changed Behaviour This PR introduces typeguard decorators onto the router and database start methods so that this internal checking happens at runtime. This consequently reveals that the type annotations of these methods are incorrect, and so this PR makes those consequential changes. Further, generic types (Queue[MessageType]) are not supported on multiprocessing.Queues before Python 3.12 - so those generic indices are removed from the type annotations. That is unfortunate and weakens in-process static verification - but they could be re-introduced after Parsl drops Python 3.11 support (around 2027 in the present informal support policy) --- parsl/monitoring/db_manager.py | 22 +++++++++++++--------- parsl/monitoring/router.py | 26 ++++++++++++++------------ 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 8f9f302640..853bc4c3c7 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -1,11 +1,14 @@ import datetime import logging +import multiprocessing.queues as mpq import os import queue import threading import time from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +import typeguard + from parsl.dataflow.states import States from parsl.errors import OptionalModuleMissing from parsl.log_utils import set_file_logger @@ -305,10 +308,10 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: "queue.Queue[TaggedMonitoringMessage]", - node_queue: "queue.Queue[MonitoringMessage]", - block_queue: "queue.Queue[MonitoringMessage]", - resource_queue: "queue.Queue[MonitoringMessage]") -> None: + priority_queue: mpq.Queue, + node_queue: mpq.Queue, + block_queue: mpq.Queue, + resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, @@ -719,11 +722,12 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") -def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[TaggedMonitoringMessage]", - node_msgs: "queue.Queue[MonitoringMessage]", - block_msgs: "queue.Queue[MonitoringMessage]", - resource_msgs: "queue.Queue[MonitoringMessage]", +@typeguard.typechecked +def dbm_starter(exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, db_url: str, logdir: str, logging_level: int) -> None: diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 4be454b797..7cce223048 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -1,15 +1,16 @@ from __future__ import annotations import logging +import multiprocessing.queues as mpq import os import pickle -import queue import socket import threading import time from multiprocessing.synchronize import Event -from typing import Optional, Tuple, Union +from typing import Optional, Tuple +import typeguard import zmq from parsl.log_utils import set_file_logger @@ -33,10 +34,10 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -202,12 +203,13 @@ def start_zmq_listener(self) -> None: @wrap_with_logs -def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", - exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", +@typeguard.typechecked +def router_starter(comm_q: mpq.Queue, + exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, hub_address: str, From e34a70a23090e6f97b6090b3e3d567651e81e3d5 Mon Sep 17 00:00:00 2001 From: arhag23 <35051569+arhag23@users.noreply.github.com> Date: Fri, 16 Aug 2024 07:18:08 -0400 Subject: [PATCH 19/26] Make paramiko an optional dependency (#3584) Removed paramiko from requirements.txt and added it as an optional module in setup.py. Added OptionalModuleMissing errors for the ssh channel files for when usage is attempted without the required paramiko module being installed. Changed Behaviour: If users have code that depends on the ssh channels, they may need to opt in to that module. Prepares for #3515 --- parsl/channels/oauth_ssh/oauth_ssh.py | 12 ++++++++++-- parsl/channels/ssh/ssh.py | 22 ++++++++++++++++------ parsl/channels/ssh_il/ssh_il.py | 14 ++++++++++++-- requirements.txt | 1 - setup.py | 1 + test-requirements.txt | 1 + 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index 3173b163a8..1b690a4e3c 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -1,11 +1,15 @@ import logging import socket -import paramiko - from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + try: from oauth_ssh.oauth_ssh_token import find_access_token from oauth_ssh.ssh_service import SSHService @@ -38,6 +42,10 @@ def __init__(self, hostname, username=None, script_dir=None, envs=None, port=22) Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "OauthSSHChannel requires the ssh module and config.") + if not _oauth_ssh_enabled: raise OptionalModuleMissing(['oauth_ssh'], "OauthSSHChannel requires oauth_ssh module and config.") diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index 38b8afe47b..c53a26b831 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -2,8 +2,6 @@ import logging import os -import paramiko - from parsl.channels.base import Channel from parsl.channels.errors import ( AuthException, @@ -13,15 +11,24 @@ FileCopyException, SSHException, ) +from parsl.errors import OptionalModuleMissing from parsl.utils import RepresentationMixin +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + + logger = logging.getLogger(__name__) -class NoAuthSSHClient(paramiko.SSHClient): - def _auth(self, username, *args): - self._transport.auth_none(username) - return +if _ssh_enabled: + class NoAuthSSHClient(paramiko.SSHClient): + def _auth(self, username, *args): + self._transport.auth_none(username) + return class DeprecatedSSHChannel(Channel, RepresentationMixin): @@ -53,6 +60,9 @@ def __init__(self, hostname, username=None, password=None, script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHChannel requires the ssh module and config.") self.hostname = hostname self.username = username diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 3a5e0c5096..67e5501a43 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -1,9 +1,15 @@ import getpass import logging -import paramiko - from parsl.channels.ssh.ssh import DeprecatedSSHChannel +from parsl.errors import OptionalModuleMissing + +try: + import paramiko + _ssh_enabled = True +except (ImportError, NameError, FileNotFoundError): + _ssh_enabled = False + logger = logging.getLogger(__name__) @@ -30,6 +36,10 @@ def __init__(self, hostname, username=None, password=None, script_dir=None, envs Raises: ''' + if not _ssh_enabled: + raise OptionalModuleMissing(['ssh'], + "SSHInteractiveLoginChannel requires the ssh module and config.") + self.hostname = hostname self.username = username self.password = password diff --git a/requirements.txt b/requirements.txt index e89202942e..c60517655f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,6 @@ globus-sdk dill tblib requests -paramiko psutil>=5.5.1 setproctitle filelock>=3.13,<4 diff --git a/setup.py b/setup.py index 85e014dc18..4934d01e5d 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ 'flux': ['pyyaml', 'cffi', 'jsonschema'], 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], + 'ssh': ['paramiko'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } diff --git a/test-requirements.txt b/test-requirements.txt index c735de8d5c..415e995c1b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,7 @@ flake8==6.1.0 ipyparallel pandas +paramiko pytest>=7.4.0,<8 pytest-cov pytest-random-order From 357547ff2b67a60d8935ae5b63d2ee029ca0cada Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 16 Aug 2024 14:06:12 +0200 Subject: [PATCH 20/26] Make router_starter parameters mandatory kwargs (#3583) See PR #2973 for justification of mandatory keyword args. --- parsl/monitoring/monitoring.py | 11 ++++++++--- parsl/monitoring/router.py | 3 ++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 9dccbecd35..a76e2cf487 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -154,9 +154,14 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, - args=(comm_q, self.exception_q, self.priority_msgs, self.node_msgs, - self.block_msgs, self.resource_msgs, self.router_exit_event), - kwargs={"hub_address": self.hub_address, + kwargs={"comm_q": comm_q, + "exception_q": self.exception_q, + "priority_msgs": self.priority_msgs, + "node_msgs": self.node_msgs, + "block_msgs": self.block_msgs, + "resource_msgs": self.resource_msgs, + "exit_event": self.router_exit_event, + "hub_address": self.hub_address, "udp_port": self.hub_port, "zmq_port_range": self.hub_port_range, "logdir": self.logdir, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 7cce223048..343410e3a4 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -204,7 +204,8 @@ def start_zmq_listener(self) -> None: @wrap_with_logs @typeguard.typechecked -def router_starter(comm_q: mpq.Queue, +def router_starter(*, + comm_q: mpq.Queue, exception_q: mpq.Queue, priority_msgs: mpq.Queue, node_msgs: mpq.Queue, From f1359199e4f9e16f3ad15c3b5e9d53f8471820d0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 16 Aug 2024 19:12:10 +0200 Subject: [PATCH 21/26] Remove monitoring queue tag switch monitoring db pre-router (#3587) The main goals of this PR is to make _migrate_logs_to_internal much more clearly a message forwarder, rather than a message interpreter. This follows on from PR #2168 which introduces _dispatch_to_internal to dispatches messages based on their tag rather than on the queue the message was received on, and is part of an ongoing series to simplify the queue and routing structure inside the monitoring router and database code. Further PRs in preparation (in draft PR #3315) contain further simplifications building on this PR. After this PR: * the database manager will respond to a STOP message on any incoming queue, vs previously only on the priority queue. This is a consequence of treating the queues all the same now. * the database manager will not perform such strong validation of message structure based on message tag at this point. That's part of expecting the code to forward messages, not inspect them, with later inspecting code being the place to care abou structure. This only affects behaviour when invalid messages are sent. Related PRs and context: #3567 changes the monitoring router to be more of a router and to not inspect and modify certain in-transit messages. There is a long slow project to regularise queues: PR #2117 makes resource info messages look like other message so they can be dispatched alongside other message types. The priority queue was initially (as I understand it) introduced to attempt to address a race condition of message order arrival vs SQL database key constraints. The priority queue is an attempt to force certain messages to be processed before others (not in the priority queue). However a subsequent commit in 2019, 0a4b68555ce1946e46b96a13f9003e0733252ec6, introduces a more robust approach because this priority queue approach does not work and so is not needed. --- parsl/monitoring/db_manager.py | 43 ++++++++++------------------------ 1 file changed, 13 insertions(+), 30 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 853bc4c3c7..053c98d598 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -316,7 +316,7 @@ def start(self, self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - priority_queue, 'priority', self._kill_event,), + priority_queue, self._kill_event,), name="Monitoring-migrate-priority", daemon=True, ) @@ -324,7 +324,7 @@ def start(self, self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - node_queue, 'node', self._kill_event,), + node_queue, self._kill_event,), name="Monitoring-migrate-node", daemon=True, ) @@ -332,7 +332,7 @@ def start(self, self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - block_queue, 'block', self._kill_event,), + block_queue, self._kill_event,), name="Monitoring-migrate-block", daemon=True, ) @@ -340,7 +340,7 @@ def start(self, self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - resource_queue, 'resource', self._kill_event,), + resource_queue, self._kill_event,), name="Monitoring-migrate-resource", daemon=True, ) @@ -577,43 +577,26 @@ def start(self, raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log") @wrap_with_logs(target="database_manager") - def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None: - logger.info("Starting processing for queue {}".format(queue_tag)) + def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None: + logger.info("Starting _migrate_logs_to_internal") while not kill_event.is_set() or logs_queue.qsize() != 0: - logger.debug("""Checking STOP conditions for {} threads: {}, {}""" - .format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0)) + logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", + kill_event.is_set(), logs_queue.qsize() != 0) try: x, addr = logs_queue.get(timeout=0.1) except queue.Empty: continue else: - if queue_tag == 'priority' and x == 'STOP': + if x == 'STOP': self.close() - elif queue_tag == 'priority': # implicitly not 'STOP' - assert isinstance(x, tuple) - assert len(x) == 2 - assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \ - "_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0]) - self._dispatch_to_internal(x) - elif queue_tag == 'resource': - assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x) - assert x[0] == MessageType.RESOURCE_INFO, ( - "_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, " - "got tag {}, message {}".format(x[0], x) - ) - self._dispatch_to_internal(x) - elif queue_tag == 'node': - assert len(x) == 2, "expected message tuple to have exactly two elements" - assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue" - - self._dispatch_to_internal(x) - elif queue_tag == "block": - self._dispatch_to_internal(x) else: - logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}") + self._dispatch_to_internal(x) def _dispatch_to_internal(self, x: Tuple) -> None: + assert isinstance(x, tuple) + assert len(x) == 2, "expected message tuple to have exactly two elements" + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: From 123df5151f71c3f1be76e97f06c0ccf5e8be79d3 Mon Sep 17 00:00:00 2001 From: Yadu Nand Babuji Date: Fri, 16 Aug 2024 14:04:54 -0500 Subject: [PATCH 22/26] Move MPI behavior from HTEX to MPIExecutor (#3582) This PR moves the following MPI related functionality and options from HTEX to MPIExecutor: Kwarg options enable_mpi_mode and mpi_launcher is now removed from HTEX Checks for launcher being set to SimpleLauncher Checks for a valid mpi_launcher in now in MPIExecutor A new validate_resource_specification method is added to HTEX that currently asserts that no resource_specification is passed to it, since HTEX does not support any such options MPIExecutor overrides validate_resource_specification to check for a valid MPI resource specification These changes should make it easier to have executor specific resource validation. Changed Behaviour HTEX kwarg enable_mpi_mode and mpi_launcher are no longer supported. Expect to use MPI functionality only through the MPIExecutor --- parsl/executors/high_throughput/executor.py | 42 ++++------ .../executors/high_throughput/mpi_executor.py | 25 +++++- .../high_throughput/mpi_prefix_composer.py | 9 ++- .../test_resource_spec_validation.py | 40 ++++++++++ .../test_mpi_apps/test_bad_mpi_config.py | 43 ++++++---- .../test_mpi_apps/test_mpi_mode_disabled.py | 47 ----------- .../test_mpi_apps/test_mpi_mode_enabled.py | 24 ++++-- parsl/tests/test_mpi_apps/test_mpiex.py | 5 +- .../tests/test_mpi_apps/test_resource_spec.py | 80 +++++++++---------- test-requirements.txt | 1 + 10 files changed, 171 insertions(+), 145 deletions(-) create mode 100644 parsl/tests/test_htex/test_resource_spec_validation.py delete mode 100644 parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 301052c4c5..c4097500f1 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -12,7 +12,6 @@ import typeguard -import parsl.launchers from parsl import curvezmq from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper @@ -25,8 +24,7 @@ RandomManagerSelector, ) from parsl.executors.high_throughput.mpi_prefix_composer import ( - VALID_LAUNCHERS, - validate_resource_spec, + InvalidResourceSpecification, ) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus @@ -224,17 +222,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn Parsl will create names as integers starting with 0. default: empty list - - enable_mpi_mode: bool - If enabled, MPI launch prefixes will be composed for the batch scheduler based on - the nodes available in each batch job and the resource_specification dict passed - from the app. This is an experimental feature, please refer to the following doc section - before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html - - mpi_launcher: str - This field is only used if enable_mpi_mode is set. Select one from the - list of supported MPI launchers = ("srun", "aprun", "mpiexec"). - default: "mpiexec" """ @typeguard.typechecked @@ -263,8 +250,6 @@ def __init__(self, poll_period: int = 10, address_probe_timeout: Optional[int] = None, worker_logdir_root: Optional[str] = None, - enable_mpi_mode: bool = False, - mpi_launcher: str = "mpiexec", manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, encrypted: bool = False): @@ -330,15 +315,6 @@ def __init__(self, self.encrypted = encrypted self.cert_dir = None - self.enable_mpi_mode = enable_mpi_mode - assert mpi_launcher in VALID_LAUNCHERS, \ - f"mpi_launcher must be set to one of {VALID_LAUNCHERS}" - if self.enable_mpi_mode: - assert isinstance(self.provider.launcher, parsl.launchers.SimpleLauncher), \ - "mpi_mode requires the provider to be configured to use a SimpleLauncher" - - self.mpi_launcher = mpi_launcher - if not launch_cmd: launch_cmd = DEFAULT_LAUNCH_CMD self.launch_cmd = launch_cmd @@ -348,6 +324,8 @@ def __init__(self, self.interchange_launch_cmd = interchange_launch_cmd radio_mode = "htex" + enable_mpi_mode: bool = False + mpi_launcher: str = "mpiexec" def _warn_deprecated(self, old: str, new: str): warnings.warn( @@ -377,6 +355,18 @@ def worker_logdir(self): return "{}/{}".format(self.worker_logdir_root, self.label) return self.logdir + def validate_resource_spec(self, resource_specification: dict): + """HTEX does not support *any* resource_specification options and + will raise InvalidResourceSpecification is any are passed to it""" + if resource_specification: + raise InvalidResourceSpecification( + set(resource_specification.keys()), + ("HTEX does not support the supplied resource_specifications." + "For MPI applications consider using the MPIExecutor. " + "For specifications for core count/memory/walltime, consider using WorkQueueExecutor. ") + ) + return + def initialize_scaling(self): """Compose the launch command and scale out the initial blocks. """ @@ -660,7 +650,7 @@ def submit(self, func, resource_specification, *args, **kwargs): Future """ - validate_resource_spec(resource_specification, self.enable_mpi_mode) + self.validate_resource_spec(resource_specification) if self.bad_state_is_set: raise self.executor_exception diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py index b8045d38b3..04b8cf5197 100644 --- a/parsl/executors/high_throughput/mpi_executor.py +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -8,8 +8,13 @@ GENERAL_HTEX_PARAM_DOCS, HighThroughputExecutor, ) +from parsl.executors.high_throughput.mpi_prefix_composer import ( + VALID_LAUNCHERS, + validate_resource_spec, +) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import JobStatus +from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider @@ -30,6 +35,11 @@ class MPIExecutor(HighThroughputExecutor): max_workers_per_block: int Maximum number of MPI applications to run at once per block + mpi_launcher: str + Select one from the list of supported MPI launchers: + ("srun", "aprun", "mpiexec"). + default: "mpiexec" + {GENERAL_HTEX_PARAM_DOCS} """ @@ -60,7 +70,6 @@ def __init__(self, super().__init__( # Hard-coded settings cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers - enable_mpi_mode=True, max_workers_per_node=max_workers_per_block, # Everything else @@ -82,9 +91,21 @@ def __init__(self, poll_period=poll_period, address_probe_timeout=address_probe_timeout, worker_logdir_root=worker_logdir_root, - mpi_launcher=mpi_launcher, block_error_handler=block_error_handler, encrypted=encrypted ) + self.enable_mpi_mode = True + self.mpi_launcher = mpi_launcher self.max_workers_per_block = max_workers_per_block + + if not isinstance(self.provider.launcher, SimpleLauncher): + raise TypeError("mpi_mode requires the provider to be configured to use a SimpleLauncher") + + if mpi_launcher not in VALID_LAUNCHERS: + raise ValueError(f"mpi_launcher set to:{mpi_launcher} must be set to one of {VALID_LAUNCHERS}") + + self.mpi_launcher = mpi_launcher + + def validate_resource_spec(self, resource_specification: dict): + return validate_resource_spec(resource_specification) diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py index 78c5d8b867..0125d9a532 100644 --- a/parsl/executors/high_throughput/mpi_prefix_composer.py +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -21,14 +21,15 @@ def __str__(self): class InvalidResourceSpecification(Exception): """Exception raised when Invalid input is supplied via resource specification""" - def __init__(self, invalid_keys: Set[str]): + def __init__(self, invalid_keys: Set[str], message: str = ''): self.invalid_keys = invalid_keys + self.message = message def __str__(self): - return f"Invalid resource specification options supplied: {self.invalid_keys}" + return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}" -def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): +def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec Raises: InvalidResourceSpecification if the resource_spec @@ -38,7 +39,7 @@ def validate_resource_spec(resource_spec: Dict[str, str], is_mpi_enabled: bool): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 - if is_mpi_enabled and len(user_keys) == 0: + if len(user_keys) == 0: raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py new file mode 100644 index 0000000000..ac0c580c20 --- /dev/null +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -0,0 +1,40 @@ +import queue +from unittest import mock + +import pytest + +from parsl.executors import HighThroughputExecutor +from parsl.executors.high_throughput.mpi_prefix_composer import ( + InvalidResourceSpecification, +) + + +def double(x): + return x * 2 + + +@pytest.mark.local +def test_submit_calls_validate(): + + htex = HighThroughputExecutor() + htex.outgoing_q = mock.Mock(spec=queue.Queue) + htex.validate_resource_spec = mock.Mock(spec=htex.validate_resource_spec) + + res_spec = {} + htex.submit(double, res_spec, (5,), {}) + htex.validate_resource_spec.assert_called() + + +@pytest.mark.local +def test_resource_spec_validation(): + htex = HighThroughputExecutor() + ret_val = htex.validate_resource_spec({}) + assert ret_val is None + + +@pytest.mark.local +def test_resource_spec_validation_bad_keys(): + htex = HighThroughputExecutor() + + with pytest.raises(InvalidResourceSpecification): + htex.validate_resource_spec({"num_nodes": 2}) diff --git a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py index 336bf87703..ebeb64622d 100644 --- a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py +++ b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py @@ -1,33 +1,48 @@ import pytest from parsl import Config -from parsl.executors import HighThroughputExecutor +from parsl.executors import MPIExecutor from parsl.launchers import AprunLauncher, SimpleLauncher, SrunLauncher from parsl.providers import SlurmProvider @pytest.mark.local -def test_bad_launcher_with_mpi_mode(): - """AssertionError if a launcher other than SimpleLauncher is supplied""" +def test_bad_launcher(): + """TypeError if a launcher other than SimpleLauncher is supplied""" for launcher in [SrunLauncher(), AprunLauncher()]: - with pytest.raises(AssertionError): + with pytest.raises(TypeError): Config(executors=[ - HighThroughputExecutor( - enable_mpi_mode=True, + MPIExecutor( provider=SlurmProvider(launcher=launcher), ) ]) @pytest.mark.local -def test_correct_launcher_with_mpi_mode(): +def test_bad_mpi_launcher(): + """ValueError if an unsupported mpi_launcher is specified""" + + with pytest.raises(ValueError): + Config(executors=[ + MPIExecutor( + mpi_launcher="bad_launcher", + provider=SlurmProvider(launcher=SimpleLauncher()), + ) + ]) + + +@pytest.mark.local +@pytest.mark.parametrize( + "mpi_launcher", + ["srun", "aprun", "mpiexec"] +) +def test_correct_launcher_with_mpi_mode(mpi_launcher: str): """Confirm that SimpleLauncher works with mpi_mode""" - config = Config(executors=[ - HighThroughputExecutor( - enable_mpi_mode=True, - provider=SlurmProvider(launcher=SimpleLauncher()), - ) - ]) - assert isinstance(config.executors[0].provider.launcher, SimpleLauncher) + executor = MPIExecutor( + mpi_launcher=mpi_launcher, + provider=SlurmProvider(launcher=SimpleLauncher()), + ) + + assert isinstance(executor.provider.launcher, SimpleLauncher) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py deleted file mode 100644 index e1e5c70883..0000000000 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py +++ /dev/null @@ -1,47 +0,0 @@ -from typing import Dict - -import pytest - -import parsl -from parsl import python_app -from parsl.tests.configs.htex_local import fresh_config - -EXECUTOR_LABEL = "MPI_TEST" - - -def local_config(): - config = fresh_config() - config.executors[0].label = EXECUTOR_LABEL - config.executors[0].max_workers_per_node = 1 - config.executors[0].enable_mpi_mode = False - return config - - -@python_app -def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: - import os - - parsl_vars = {} - for key in os.environ: - if key.startswith("PARSL_"): - parsl_vars[key] = os.environ[key] - return parsl_vars - - -@pytest.mark.local -def test_only_resource_specs_set(): - """Confirm that resource_spec env vars are set while launch prefixes are not - when enable_mpi_mode = False""" - resource_spec = { - "num_nodes": 4, - "ranks_per_node": 2, - } - - future = get_env_vars(parsl_resource_specification=resource_spec) - - result = future.result() - assert isinstance(result, Dict) - assert "PARSL_DEFAULT_PREFIX" not in result - assert "PARSL_SRUN_PREFIX" not in result - assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) - assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py index 6743d40eba..aff2501674 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -6,26 +6,34 @@ import pytest import parsl -from parsl import bash_app, python_app +from parsl import Config, bash_app, python_app +from parsl.executors import MPIExecutor from parsl.executors.high_throughput.mpi_prefix_composer import ( MissingResourceSpecification, ) -from parsl.tests.configs.htex_local import fresh_config +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider EXECUTOR_LABEL = "MPI_TEST" def local_setup(): - config = fresh_config() - config.executors[0].label = EXECUTOR_LABEL - config.executors[0].max_workers_per_node = 2 - config.executors[0].enable_mpi_mode = True - config.executors[0].mpi_launcher = "mpiexec" cwd = os.path.abspath(os.path.dirname(__file__)) pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile") - config.executors[0].provider.worker_init = f"export PBS_NODEFILE={pbs_nodefile}" + config = Config( + executors=[ + MPIExecutor( + label=EXECUTOR_LABEL, + max_workers_per_block=2, + mpi_launcher="mpiexec", + provider=LocalProvider( + worker_init=f"export PBS_NODEFILE={pbs_nodefile}", + launcher=SimpleLauncher() + ) + ) + ]) parsl.load(config) diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py index a85547abea..2e8a38bc68 100644 --- a/parsl/tests/test_mpi_apps/test_mpiex.py +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -4,7 +4,6 @@ import pytest -import parsl from parsl import Config, HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.launchers import SimpleLauncher @@ -42,8 +41,8 @@ def test_docstring(): def test_init(): """Ensure all relevant kwargs are copied over from HTEx""" - new_kwargs = {'max_workers_per_block'} - excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node', + new_kwargs = {'max_workers_per_block', 'mpi_launcher'} + excluded_kwargs = {'available_accelerators', 'cores_per_worker', 'max_workers_per_node', 'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'} # Get the kwargs from both HTEx and MPIEx diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py index 99d0187ccd..f180c67d52 100644 --- a/parsl/tests/test_mpi_apps/test_resource_spec.py +++ b/parsl/tests/test_mpi_apps/test_resource_spec.py @@ -1,18 +1,20 @@ import contextlib import logging import os +import queue import typing import unittest from typing import Dict +from unittest import mock import pytest -import parsl from parsl.app.app import python_app +from parsl.executors.high_throughput.executor import HighThroughputExecutor +from parsl.executors.high_throughput.mpi_executor import MPIExecutor from parsl.executors.high_throughput.mpi_prefix_composer import ( InvalidResourceSpecification, MissingResourceSpecification, - validate_resource_spec, ) from parsl.executors.high_throughput.mpi_resource_management import ( get_nodes_in_batchjob, @@ -20,6 +22,8 @@ get_slurm_hosts_list, identify_scheduler, ) +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider from parsl.tests.configs.htex_local import fresh_config EXECUTOR_LABEL = "MPI_TEST" @@ -48,23 +52,6 @@ def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: return parsl_vars -@pytest.mark.local -def test_resource_spec_env_vars(): - resource_spec = { - "num_nodes": 4, - "ranks_per_node": 2, - } - - assert double(5).result() == 10 - - future = get_env_vars(parsl_resource_specification=resource_spec) - - result = future.result() - assert isinstance(result, Dict) - assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) - assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) - - @pytest.mark.local @unittest.mock.patch("subprocess.check_output", return_value=b"c203-031\nc203-032\n") def test_slurm_mocked_mpi_fetch(subprocess_check): @@ -83,16 +70,6 @@ def add_to_path(path: os.PathLike) -> typing.Generator[None, None, None]: os.environ["PATH"] = old_path -@pytest.mark.local -@pytest.mark.skip -def test_slurm_mpi_fetch(): - logging.warning(f"Current pwd : {os.path.dirname(__file__)}") - with add_to_path(os.path.dirname(__file__)): - logging.warning(f"PATH: {os.environ['PATH']}") - nodeinfo = get_slurm_hosts_list() - logging.warning(f"Got : {nodeinfo}") - - @contextlib.contextmanager def mock_pbs_nodefile(nodefile: str = "pbs_nodefile") -> typing.Generator[None, None, None]: cwd = os.path.abspath(os.path.dirname(__file__)) @@ -122,22 +99,43 @@ def test_top_level(): @pytest.mark.local @pytest.mark.parametrize( - "resource_spec, is_mpi_enabled, exception", + "resource_spec, exception", ( - ({"num_nodes": 2, "ranks_per_node": 1}, False, None), - ({"launcher_options": "--debug_foo"}, False, None), - ({"num_nodes": 2, "BAD_OPT": 1}, False, InvalidResourceSpecification), - ({}, False, None), - ({"num_nodes": 2, "ranks_per_node": 1}, True, None), - ({"launcher_options": "--debug_foo"}, True, None), - ({"num_nodes": 2, "BAD_OPT": 1}, True, InvalidResourceSpecification), - ({}, True, MissingResourceSpecification), + + ({"num_nodes": 2, "ranks_per_node": 1}, None), + ({"launcher_options": "--debug_foo"}, None), + ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification), + ({}, MissingResourceSpecification), ) ) -def test_resource_spec(resource_spec: Dict, is_mpi_enabled: bool, exception): +def test_mpi_resource_spec(resource_spec: Dict, exception): + """Test validation of resource_specification in MPIExecutor""" + + mpi_ex = MPIExecutor(provider=LocalProvider(launcher=SimpleLauncher())) + mpi_ex.outgoing_q = mock.Mock(spec=queue.Queue) + if exception: with pytest.raises(exception): - validate_resource_spec(resource_spec, is_mpi_enabled) + mpi_ex.validate_resource_spec(resource_spec) else: - result = validate_resource_spec(resource_spec, is_mpi_enabled) + result = mpi_ex.validate_resource_spec(resource_spec) assert result is None + + +@pytest.mark.local +@pytest.mark.parametrize( + "resource_spec", + ( + {"num_nodes": 2, "ranks_per_node": 1}, + {"launcher_options": "--debug_foo"}, + {"BAD_OPT": 1}, + ) +) +def test_mpi_resource_spec_passed_to_htex(resource_spec: dict): + """HTEX should reject every resource_spec""" + + htex = HighThroughputExecutor() + htex.outgoing_q = mock.Mock(spec=queue.Queue) + + with pytest.raises(InvalidResourceSpecification): + htex.validate_resource_spec(resource_spec) diff --git a/test-requirements.txt b/test-requirements.txt index 415e995c1b..acd670b5e9 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,6 +8,7 @@ pytest-random-order nbsphinx sphinx_rtd_theme mypy==1.5.1 +types-mock types-python-dateutil types-requests types-paramiko From 73f6f657233aa393f829f0361ba89c819218ff79 Mon Sep 17 00:00:00 2001 From: "Daniel S. Katz" Date: Tue, 20 Aug 2024 00:37:37 -0500 Subject: [PATCH 23/26] Add CZI badge to README.rst (#3596) --- README.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 72048d39f4..da7f8245a5 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| +|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -64,6 +64,9 @@ then explore the `parallel computing patterns Date: Tue, 20 Aug 2024 04:46:13 -0700 Subject: [PATCH 24/26] Fallback to squeue when sacct is missing in SlurmProvider (#3591) Adds internal check to test whether the slurm provider should use the sacct or squeue command. Some slurm clusters might not use the accounting database sacct uses. This allows slurm clusters that use the database to use the sacct command which can be easier on the slurm scheduler, or if the database is not present switch to the squeue command which will should work on all clusters. Fixes #3590 --- parsl/providers/slurm/slurm.py | 50 +++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index ec6abeff56..54b4053fed 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) # From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES -translate_table = { +sacct_translate_table = { 'PENDING': JobState.PENDING, 'RUNNING': JobState.RUNNING, 'CANCELLED': JobState.CANCELLED, @@ -37,6 +37,20 @@ 'REQUEUED': JobState.PENDING } +squeue_translate_table = { + 'PD': JobState.PENDING, + 'R': JobState.RUNNING, + 'CA': JobState.CANCELLED, + 'CF': JobState.PENDING, # (configuring), + 'CG': JobState.RUNNING, # (completing), + 'CD': JobState.COMPLETED, + 'F': JobState.FAILED, # (failed), + 'TO': JobState.TIMEOUT, # (timeout), + 'NF': JobState.FAILED, # (node failure), + 'RV': JobState.FAILED, # (revoked) and + 'SE': JobState.FAILED # (special exit state) +} + class SlurmProvider(ClusterProvider, RepresentationMixin): """Slurm Execution Provider @@ -155,6 +169,23 @@ def __init__(self, self.regex_job_id = regex_job_id self.worker_init = worker_init + '\n' + # Check if sacct works and if not fall back to squeue + cmd = "sacct -X" + logger.debug("Executing %s", cmd) + retcode, stdout, stderr = self.execute_wait(cmd) + # If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled" + logger.debug(f"sacct returned retcode={retcode} stderr={stderr}") + if retcode == 0: + logger.debug("using sacct to get job status") + # Using state%20 to get enough characters to not truncate output + # of the state. Without output can look like " CANCELLED+" + self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'" + self._translate_table = sacct_translate_table + else: + logger.debug(f"sacct failed with retcode={retcode}") + logger.debug("falling back to using squeue to get job status") + self._cmd = "squeue --noheader --format='%i %t' --job '{0}'" + self._translate_table = squeue_translate_table def _status(self): '''Returns the status list for a list of job_ids @@ -172,16 +203,14 @@ def _status(self): logger.debug('No active jobs, skipping status update') return - # Using state%20 to get enough characters to not truncate output - # of the state. Without output can look like " CANCELLED+" - cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list) + cmd = self._cmd.format(job_id_list) logger.debug("Executing %s", cmd) retcode, stdout, stderr = self.execute_wait(cmd) - logger.debug("sacct returned %s %s", stdout, stderr) + logger.debug("sacct/squeue returned %s %s", stdout, stderr) # Execute_wait failed. Do no update if retcode != 0: - logger.warning("sacct failed with non-zero exit code {}".format(retcode)) + logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode)) return jobs_missing = set(self.resources.keys()) @@ -193,9 +222,9 @@ def _status(self): # For example " CANCELLED by " # This splits and ignores anything past the first two unpacked values job_id, slurm_state, *ignore = line.split() - if slurm_state not in translate_table: + if slurm_state not in self._translate_table: logger.warning(f"Slurm status {slurm_state} is not recognized") - status = translate_table.get(slurm_state, JobState.UNKNOWN) + status = self._translate_table.get(slurm_state, JobState.UNKNOWN) logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status)) self.resources[job_id]['status'] = JobStatus(status, stdout_path=self.resources[job_id]['job_stdout_path'], @@ -203,9 +232,10 @@ def _status(self): jobs_missing.remove(job_id) # sacct can get job info after jobs have completed so this path shouldn't be hit - # log a warning if there are missing jobs for some reason + # squeue does not report on jobs that are not running. So we are filling in the + # blanks for missing jobs, we might lose some information about why the jobs failed. for missing_job in jobs_missing: - logger.warning("Updating missing job {} to completed status".format(missing_job)) + logger.debug("Updating missing job {} to completed status".format(missing_job)) self.resources[missing_job]['status'] = JobStatus( JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'], stderr_path=self.resources[missing_job]['job_stderr_path']) From 0fc966f2a284839df6c6662fd369d3530ae31a20 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 20 Aug 2024 14:16:18 +0200 Subject: [PATCH 25/26] Collapse 4 monitoring router to db queues into 1 queue (#3593) Prior to this PR, there are four multiprocessing queues from the monitoring router process to the database manager process. (also used by the submit process via MultiprocessingQueueRadioSender but that is not so relevant for this PR) Each message arriving at the router goes into MonitoringRouter.start_zmq_listener where it is dispatched based on tag type into one of these four queues towards the monitoring database. In the monitoring database code, no matter which queue the messages arrive on, they are all passed into DatabaseManager._dispatch_to_internal. The four queues then don't provide much functionality - their effect is maybe some non-deterministic message order shuffling. This PR collapses those four queues into a single queue. # Changed Behaviour Messages will arrive at the database manager in possibly different orders. This might flush out more race conditions. The monitoring router would previous validate that a message tag was one of 5 known message tags (as part of choosing which queue to dispatch to). This PR removes that validation. That validation now happens at the receiving end of the (now single) queue, in DatabaseManager._dispatch_to_internal. Error messages related to invalid tags (which should only be coming from development of new message types) will now appear in the database manager process, rather than the router process. --- parsl/monitoring/db_manager.py | 39 ++++------------------------------ parsl/monitoring/monitoring.py | 28 +++++------------------- parsl/monitoring/router.py | 36 +++---------------------------- 3 files changed, 12 insertions(+), 91 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 053c98d598..4fcf5ec2e2 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -308,35 +308,9 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: mpq.Queue, - node_queue: mpq.Queue, - block_queue: mpq.Queue, resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() - self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - priority_queue, self._kill_event,), - name="Monitoring-migrate-priority", - daemon=True, - ) - self._priority_queue_pull_thread.start() - - self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - node_queue, self._kill_event,), - name="Monitoring-migrate-node", - daemon=True, - ) - self._node_queue_pull_thread.start() - - self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - block_queue, self._kill_event,), - name="Monitoring-migrate-block", - daemon=True, - ) - self._block_queue_pull_thread.start() self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( @@ -372,20 +346,18 @@ def start(self, while (not self._kill_event.is_set() or self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or - priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or - node_queue.qsize() != 0 or block_queue.qsize() != 0): + resource_queue.qsize() != 0): """ WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages) """ try: - logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format( + logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format( self._kill_event.is_set(), self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0, self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0, - priority_queue.qsize() != 0, resource_queue.qsize() != 0, - node_queue.qsize() != 0, block_queue.qsize() != 0)) + resource_queue.qsize() != 0)) # This is the list of resource messages which can be reprocessed as if they # had just arrived because the corresponding first task message has been @@ -707,9 +679,6 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") @typeguard.typechecked def dbm_starter(exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, db_url: str, logdir: str, @@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue, logdir=logdir, logging_level=logging_level) logger.info("Starting dbm in dbm starter") - dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs) + dbm.start(resource_msgs) except KeyboardInterrupt: logger.exception("KeyboardInterrupt signal caught") dbm.close() diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a76e2cf487..e1de80116c 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -7,7 +7,7 @@ import time from multiprocessing import Event, Process from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast import typeguard @@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.exception_q: Queue[Tuple[str, str]] self.exception_q = SizedQueue(maxsize=10) - self.priority_msgs: Queue[Tuple[Any, int]] - self.priority_msgs = SizedQueue() - - self.resource_msgs: Queue[AddressedMonitoringMessage] + self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]] self.resource_msgs = SizedQueue() - self.node_msgs: Queue[AddressedMonitoringMessage] - self.node_msgs = SizedQueue() - - self.block_msgs: Queue[AddressedMonitoringMessage] - self.block_msgs = SizedQueue() - self.router_exit_event: ms.Event self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, kwargs={"comm_q": comm_q, "exception_q": self.exception_q, - "priority_msgs": self.priority_msgs, - "node_msgs": self.node_msgs, - "block_msgs": self.block_msgs, "resource_msgs": self.resource_msgs, "exit_event": self.router_exit_event, "hub_address": self.hub_address, @@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, - args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,), + args=(self.exception_q, self.resource_msgs,), kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, @@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadioSender(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.resource_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) @@ -249,7 +237,7 @@ def close(self) -> None: logger.debug("Finished waiting for router termination") if len(exception_msgs) == 0: logger.debug("Sending STOP to DBM") - self.priority_msgs.put(("STOP", 0)) + self.resource_msgs.put(("STOP", 0)) else: logger.debug("Not sending STOP to DBM, because there were DBM exceptions") logger.debug("Waiting for DB termination") @@ -267,14 +255,8 @@ def close(self) -> None: logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() self.exception_q.join_thread() - self.priority_msgs.close() - self.priority_msgs.join_thread() self.resource_msgs.close() self.resource_msgs.join_thread() - self.node_msgs.close() - self.node_msgs.join_thread() - self.block_msgs.close() - self.block_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 343410e3a4..e92386c407 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,6 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -34,9 +33,6 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, ): @@ -57,8 +53,8 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. - *_msgs : Queue - Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + resource_msgs : multiprocessing.Queue + A multiprocessing queue to receive messages to be routed onwards to the database process exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. @@ -102,9 +98,6 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.priority_msgs = priority_msgs - self.node_msgs = node_msgs - self.block_msgs = block_msgs self.resource_msgs = resource_msgs self.exit_event = exit_event @@ -170,24 +163,7 @@ def start_zmq_listener(self) -> None: msg_0: AddressedMonitoringMessage msg_0 = (msg, 0) - if msg[0] == MessageType.NODE_INFO: - self.node_msgs.put(msg_0) - elif msg[0] == MessageType.RESOURCE_INFO: - self.resource_msgs.put(msg_0) - elif msg[0] == MessageType.BLOCK_INFO: - self.block_msgs.put(msg_0) - elif msg[0] == MessageType.TASK_INFO: - self.priority_msgs.put(msg_0) - elif msg[0] == MessageType.WORKFLOW_INFO: - self.priority_msgs.put(msg_0) - else: - # There is a type: ignore here because if msg[0] - # is of the correct type, this code is unreachable, - # but there is no verification that the message - # received from zmq_receiver_channel.recv_pyobj() is actually - # of that type. - self.logger.error("Discarding message " # type: ignore[unreachable] - f"from interchange with unknown type {msg[0].value}") + self.resource_msgs.put(msg_0) except zmq.Again: pass except Exception: @@ -207,9 +183,6 @@ def start_zmq_listener(self) -> None: def router_starter(*, comm_q: mpq.Queue, exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, @@ -226,9 +199,6 @@ def router_starter(*, zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - priority_msgs=priority_msgs, - node_msgs=node_msgs, - block_msgs=block_msgs, resource_msgs=resource_msgs, exit_event=exit_event) except Exception as e: From b284dc1b068e397ad87fe4154e640af60d364c6d Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 21 Aug 2024 12:32:32 -0400 Subject: [PATCH 26/26] Non-functional change: minor log call-site updates (#3597) Massage log statements to use argument style consistent with recent practice --- .../executors/high_throughput/interchange.py | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fa0969d398..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -375,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -396,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -406,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -425,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -434,12 +433,12 @@ def process_task_outgoing_incoming( # later. m.update(msg) # type: ignore[typeddict-item] - logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) + logger.info(f"Registration info for manager {manager_id!r}: {msg}") self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) + logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange") logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -452,16 +451,15 @@ def process_task_outgoing_incoming( self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {!r} has compatible Python version {}".format(manager_id, - msg['python_v'].rsplit(".", 1)[0])) + logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") + logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True - logger.debug(f"Manager {manager_id!r} requested drain") + logger.debug("Manager %r requested drain", manager_id) else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers - logger.debug("Managers count (interesting/total): {interesting}/{total}".format( - total=len(self._ready_managers), - interesting=len(interesting_managers))) + logger.debug( + "Managers count (interesting/total): {}/{}", + len(interesting_managers), + len(self._ready_managers) + ) if interesting_managers and not self.pending_task_queue.empty(): shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active'] and not m['draining']): + if real_capacity and m["active"] and not m["draining"]: tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) + logger.debug("Sent tasks: %s to manager %r", tids, manager_id) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager %r has free capacity %s", manager_id, real_capacity) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {!r} is now saturated".format(manager_id)) + logger.debug("Manager %r is now saturated", manager_id) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) # logger.debug("Nothing to send to manager {}".format(manager_id)) - logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers))) + logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers)) else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) + logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") + logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) b_messages = [] @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") + logger.debug("Manager %r sent heartbeat via results connection", manager_id) b_messages.append((p_message, r)) else: - logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) + logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) got_result = False m = self._ready_managers[manager_id] @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") + logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id) m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. - logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( + logger.exception( + "Ignoring exception removing task_id %s for manager %r with task list %s", r['task_id'], manager_id, - m['tasks'])) + m["tasks"] + ) b_messages_to_send = [] for (b_message, _) in b_messages: @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") + logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"]) if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time()