From 063033a8c6680da49c4f8af381296b3730872ea7 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Sat, 12 Aug 2023 14:51:38 +0200
Subject: [PATCH 1/8] Remove unused key in iteration of task dictionary
 (#2863)

This comes from a flake8-bugbear warning.
---
 parsl/dataflow/dflow.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index 936a32acf4..dd13b6adbd 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -1146,8 +1146,11 @@ def wait_for_current_tasks(self) -> None:
 
         logger.info("Waiting for all remaining tasks to complete")
 
-        items = list(self.tasks.items())
-        for task_id, task_record in items:
+        # .values is made into a list immediately to reduce (although not
+        # eliminate) a race condition where self.tasks can be modified
+        # elsewhere by a completing task being removed from the dictionary.
+        task_records = list(self.tasks.values())
+        for task_record in task_records:
             # .exception() is a less exception throwing way of
             # waiting for completion than .result()
             fut = task_record['app_fu']

From 322f01b7cf1611c3cb4af44239e2ec87f057c9ce Mon Sep 17 00:00:00 2001
From: Yadu Nand Babuji
Date: Sat, 12 Aug 2023 09:20:04 -0500
Subject: [PATCH 2/8] Add user-pluggable block error handling and a new
 sliding-window error handler (#2858)

This PR addresses the limited capability of the current
simple_error_handler to stop the Parsl runtime when there are repeated
failures. The current system fails only when all jobs fail, which is
indicative only of configuration errors or problems with the batch
scheduler.

This PR updates the existing block_error_handler option, previously a
bool, so that it can also take a custom error handler, and adds a new
windowed_error_handler to better support long-running workflows. The
windowed handler shuts the executor down if and only if the most
recent N jobs all failed.
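As an illustrative sketch (not part of the diff below, but mirroring
the pattern used in the new tests), a workflow can opt in to the
sliding-window behaviour by passing a callable, currying the window
size with functools.partial:

```
from functools import partial

from parsl.executors import HighThroughputExecutor
from parsl.jobs.error_handlers import windowed_error_handler

# Shut the executor down only if the five most recent blocks all
# failed, rather than requiring every block launched so far to fail.
htex = HighThroughputExecutor(
    block_error_handler=partial(windowed_error_handler, threshold=5))
```

Passing block_error_handler=False now selects noop_error_handler, and
the default of True keeps the existing simple_error_handler behaviour.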
Co-authored-by: Ben Clifford
---
 parsl/executors/high_throughput/executor.py   |   5 +-
 parsl/executors/status_handling.py            |  27 +--
 ...ple_error_handler.py => error_handlers.py} |  17 +-
 .../test_scaling/test_block_error_handler.py  | 168 ++++++++++++++++++
 4 files changed, 201 insertions(+), 16 deletions(-)
 rename parsl/jobs/{simple_error_handler.py => error_handlers.py} (65%)
 create mode 100644 parsl/tests/test_scaling/test_block_error_handler.py

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index cc499ef1c3..69af77a86b 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -9,12 +9,13 @@
 import warnings
 from multiprocessing import Queue
 from typing import Dict, Sequence  # noqa F401 (used in type annotation)
-from typing import List, Optional, Tuple, Union
+from typing import List, Optional, Tuple, Union, Callable
 import math
 
 from parsl.serialize import pack_apply_message, deserialize
 from parsl.serialize.errors import SerializationError, DeserializationError
 from parsl.app.errors import RemoteExceptionWrapper
+from parsl.jobs.states import JobStatus
 from parsl.executors.high_throughput import zmq_pipes
 from parsl.executors.high_throughput import interchange
 from parsl.executors.errors import (
@@ -212,7 +213,7 @@ def __init__(self,
                  poll_period: int = 10,
                  address_probe_timeout: Optional[int] = None,
                  worker_logdir_root: Optional[str] = None,
-                 block_error_handler: bool = True):
+                 block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True):
 
         logger.debug("Initializing HighThroughputExecutor")
 
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 6a06ea45d0..29070ae4d5 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -1,19 +1,19 @@
+from __future__ import annotations
 import logging
 import threading
 from itertools import compress
 from abc import abstractmethod, abstractproperty
 from concurrent.futures import Future
-from typing import List, Any, Dict, Optional, Tuple, Union
+from typing import List, Any, Dict, Optional, Tuple, Union, Callable
 
 import parsl  # noqa F401
 from parsl.executors.base import ParslExecutor
 from parsl.executors.errors import BadStateException, ScalingFailed
 from parsl.jobs.states import JobStatus, JobState
+from parsl.jobs.error_handlers import simple_error_handler, noop_error_handler
 from parsl.providers.base import ExecutionProvider
 from parsl.utils import AtomicIDCounter
 
-import parsl.jobs.simple_error_handler as error_handler
-
 logger = logging.getLogger(__name__)
@@ -47,10 +47,18 @@ class BlockProviderExecutor(ParslExecutor):
     """
     def __init__(self, *,
                  provider: Optional[ExecutionProvider],
-                 block_error_handler: bool):
+                 block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]]):
         super().__init__()
         self._provider = provider
-        self.block_error_handler = block_error_handler
+        self.block_error_handler: Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]
+        if isinstance(block_error_handler, bool):
+            if block_error_handler:
+                self.block_error_handler = simple_error_handler
+            else:
+                self.block_error_handler = noop_error_handler
+        else:
+            self.block_error_handler = block_error_handler
+
         # errors can happen during the submit call to the provider; this is used
         # to keep track of such errors so that they can be handled in one place
         # together with errors reported by status()
@@ -161,14 +169,7 @@ def handle_errors(self, status: Dict[str, JobStatus]) -> None:
         scheme will be used.
 
         :param status: status of all jobs launched by this executor
         """
-        if not self.block_error_handler:
-            return
-        init_blocks = 3
-        if hasattr(self.provider, 'init_blocks'):
-            init_blocks = self.provider.init_blocks
-        if init_blocks < 1:
-            init_blocks = 1
-        error_handler.simple_error_handler(self, status, init_blocks)
+        self.block_error_handler(self, status)
 
     @property
     def tasks(self) -> Dict[object, Future]:

diff --git a/parsl/jobs/simple_error_handler.py b/parsl/jobs/error_handlers.py
similarity index 65%
rename from parsl/jobs/simple_error_handler.py
rename to parsl/jobs/error_handlers.py
index 72481c6932..2bd91a8c4b 100644
--- a/parsl/jobs/simple_error_handler.py
+++ b/parsl/jobs/error_handlers.py
@@ -6,12 +6,27 @@
 from parsl.jobs.states import JobStatus, JobState
 
 
-def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int):
+def noop_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
+    pass
+
+
+def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
     (total_jobs, failed_jobs) = _count_jobs(status)
+    if hasattr(executor.provider, "init_blocks"):
+        threshold = max(1, executor.provider.init_blocks)
+
     if total_jobs >= threshold and failed_jobs == total_jobs:
         executor.set_bad_state_and_fail_all(_get_error(status))
 
 
+def windowed_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
+    sorted_status = [(key, status[key]) for key in sorted(status, key=lambda x: int(x))]
+    current_window = dict(sorted_status[-threshold:])
+    total, failed = _count_jobs(current_window)
+    if failed == threshold:
+        executor.set_bad_state_and_fail_all(_get_error(status))
+
+
 def _count_jobs(status: Dict[str, JobStatus]):
     total = 0
     failed = 0

diff --git a/parsl/tests/test_scaling/test_block_error_handler.py b/parsl/tests/test_scaling/test_block_error_handler.py
new file mode 100644
index 0000000000..9d680212e3
--- /dev/null
+++ b/parsl/tests/test_scaling/test_block_error_handler.py
@@ -0,0 +1,168 @@
+import pytest
+
+from parsl.executors import HighThroughputExecutor
+from parsl.providers import LocalProvider
+from unittest.mock import Mock
+from parsl.jobs.states import JobStatus, JobState
+from parsl.jobs.error_handlers import simple_error_handler, windowed_error_handler, noop_error_handler
+from functools import partial
+
+
+@pytest.mark.local
+def test_block_error_handler_false():
+    mock = Mock()
+    htex = HighThroughputExecutor(block_error_handler=False)
+    assert htex.block_error_handler is noop_error_handler
+    htex.set_bad_state_and_fail_all = mock
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+
+    htex.handle_errors(bad_jobs)
+    mock.assert_not_called()
+
+
+@pytest.mark.local
+def test_block_error_handler_mock():
+    handler_mock = Mock()
+    htex = HighThroughputExecutor(block_error_handler=handler_mock)
+    assert htex.block_error_handler is handler_mock
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+
+    htex.handle_errors(bad_jobs)
+    handler_mock.assert_called()
+    handler_mock.assert_called_with(htex, bad_jobs)
+
+
+@pytest.mark.local
+def test_simple_error_handler():
+    htex = HighThroughputExecutor(block_error_handler=simple_error_handler,
+                                  provider=LocalProvider(init_blocks=3))
+    assert htex.block_error_handler is simple_error_handler
+
+    bad_state_mock = Mock()
+    htex.set_bad_state_and_fail_all = bad_state_mock
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    # Check the bad behavior where if any job is not failed
+    # bad state won't be set
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_called()
+
+
+@pytest.mark.local
+def test_windowed_error_handler():
+    htex = HighThroughputExecutor(block_error_handler=windowed_error_handler)
+    assert htex.block_error_handler is windowed_error_handler
+
+    bad_state_mock = Mock()
+    htex.set_bad_state_and_fail_all = bad_state_mock
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.FAILED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.COMPLETED),
+                '4': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_called()
+
+
+@pytest.mark.local
+def test_windowed_error_handler_sorting():
+    htex = HighThroughputExecutor(block_error_handler=windowed_error_handler)
+    assert htex.block_error_handler is windowed_error_handler
+
+    bad_state_mock = Mock()
+    htex.set_bad_state_and_fail_all = bad_state_mock
+
+    bad_jobs = {'8': JobStatus(JobState.FAILED),
+                '9': JobStatus(JobState.FAILED),
+                '10': JobStatus(JobState.FAILED),
+                '11': JobStatus(JobState.COMPLETED),
+                '12': JobStatus(JobState.COMPLETED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'8': JobStatus(JobState.COMPLETED),
+                '9': JobStatus(JobState.FAILED),
+                '21': JobStatus(JobState.FAILED),
+                '22': JobStatus(JobState.FAILED),
+                '10': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_called()
+
+
+@pytest.mark.local
+def test_windowed_error_handler_with_threshold():
+    error_handler = partial(windowed_error_handler, threshold=2)
+    htex = HighThroughputExecutor(block_error_handler=error_handler)
+    assert htex.block_error_handler is error_handler
+
+    bad_state_mock = Mock()
+    htex.set_bad_state_and_fail_all = bad_state_mock
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.FAILED),
+                '3': JobStatus(JobState.COMPLETED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.COMPLETED),
+                '3': JobStatus(JobState.COMPLETED),
+                '4': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_not_called()
+
+    bad_jobs = {'1': JobStatus(JobState.COMPLETED),
+                '2': JobStatus(JobState.COMPLETED),
+                '3': JobStatus(JobState.FAILED),
+                '4': JobStatus(JobState.FAILED)}
+    htex.handle_errors(bad_jobs)
+    bad_state_mock.assert_called()

From cb1b365bae39c92c7378d9d9ba5cbc46261b3b20 Mon Sep 17 00:00:00 2001
From: "Andrew S. Rosen"
Date: Sat, 12 Aug 2023 22:12:22 -0700
Subject: [PATCH 3/8] Update .wci.yml (#2868)

---
 .wci.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.wci.yml b/.wci.yml
index 9ad60dacfb..c11a2e82b4 100644
--- a/.wci.yml
+++ b/.wci.yml
@@ -34,6 +34,7 @@ execution_environment:
       - LSF
       - PBS
       - Cobalt
+      - Flux
       - GridEngine
      - HTCondor
      - AWS

From 1ffa6c3fce4ebf7c38be2c44526111f39502fc10 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 21 Aug 2023 17:43:56 +0200
Subject: [PATCH 4/8] Make resource monitor exit when monitored process has
 gone away (#2866)

Issue #2840 reports that the process monitor does not always exit if
the workflow process exits unexpectedly. Looking at the CPython
implementation, it appears these processes will only be terminated at
worker shutdown if the worker shuts down normally; if the worker is
terminated abruptly or killed, it will not shut down its dependent
daemons, such as any resource monitors it has launched.

This PR adds a liveness check for the process being monitored, which
is the parent worker process, so that if that process goes away
without telling the resource monitor to exit, the resource monitor
will exit anyway.
---
 parsl/monitoring/remote.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py
index 479a7ccfa4..d42e0079b4 100644
--- a/parsl/monitoring/remote.py
+++ b/parsl/monitoring/remote.py
@@ -262,7 +262,7 @@ def accumulate_and_prepare() -> Dict[str, Any]:
 
     next_send = time.time()
     accumulate_dur = 5.0  # TODO: make configurable?
-    while not terminate_event.is_set():
+    while not terminate_event.is_set() and pm.is_running():
         logging.debug("start of monitoring loop")
         try:
             d = accumulate_and_prepare()

From 2e158b04eeb3e0ddcae3231eec789a59d2b9a10c Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 21 Aug 2023 18:35:07 +0200
Subject: [PATCH 5/8] Rework _get_error in simple_error_handler to give more
 information (#2867)

Before (in pytest output, with htex launch_cmd="/bin/false"):

```
Exception: STDOUT: Found cores : 8
Launching worker: 1
```

After:

```
parsl.jobs.errors.TooManyJobFailuresError: Error 1:
        EXIT CODE: 1
        STDOUT: Found cores : 8
Launching worker: 1
```

* Fix counting so it counts even when js.message is None
* Add exit code
* Use a new error subclass rather than Exception
* Pluralise 'messages' in the no-messages string and remove the
  brackets: '[No error message received]' vs 'No error messages
  received'
---
 parsl/jobs/error_handlers.py | 16 ++++++++++++----
 parsl/jobs/errors.py         |  7 +++++++
 2 files changed, 19 insertions(+), 4 deletions(-)
 create mode 100644 parsl/jobs/errors.py

diff --git a/parsl/jobs/error_handlers.py b/parsl/jobs/error_handlers.py
index 2bd91a8c4b..c35a87447d 100644
--- a/parsl/jobs/error_handlers.py
+++ b/parsl/jobs/error_handlers.py
@@ -4,6 +4,7 @@
 
 import parsl.executors.status_handling as status_handling
 from parsl.jobs.states import JobStatus, JobState
+from parsl.jobs.errors import TooManyJobFailuresError
 
 
 def noop_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
@@ -42,18 +43,25 @@ def _get_error(status: Dict[str, JobStatus]) -> Exception:
     err = ""
     count = 1
     for js in status.values():
+        err = err + f"Error {count}:\n"
+        count += 1
+
         if js.message is not None:
-            err = err + "{}. {}\n".format(count, js.message)
-            count += 1
+            err = err + f"\t{js.message}\n"
+
+        if js.exit_code is not None:
+            err = err + f"\tEXIT CODE: {js.exit_code}\n"
+
         stdout = js.stdout_summary
         if stdout:
             err = err + "\tSTDOUT: {}\n".format(stdout)
+
         stderr = js.stderr_summary
         if stderr:
             err = err + "\tSTDERR: {}\n".format(stderr)
 
     if len(err) == 0:
-        err = "[No error message received]"
+        err = "No error messages received"
 
     # wrapping things in an exception here doesn't really help in providing more information
     # than the string itself
-    return Exception(err)
+    return TooManyJobFailuresError(err)

diff --git a/parsl/jobs/errors.py b/parsl/jobs/errors.py
new file mode 100644
index 0000000000..6d42f429be
--- /dev/null
+++ b/parsl/jobs/errors.py
@@ -0,0 +1,7 @@
+from parsl.errors import ParslError
+
+
+class TooManyJobFailuresError(ParslError):
+    """Indicates that executor is shut down because of too many block failures.
+    """
+    pass

From 93a141373870dd4de1121931d02b969c72a83461 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 21 Aug 2023 19:08:25 +0200
Subject: [PATCH 6/8] Remove unused re-used DFK default configuration object
 (#2864)

The default configuration actually comes from
DataFlowKernelLoader.load(), and the Config() object specified in the
DataFlowKernel.__init__ default parameters is never used, as all
constructions of DataFlowKernel in the codebase specify a config.
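As an illustrative sketch of the effect (not part of the diff below),
any direct construction of a DataFlowKernel must now pass a
configuration explicitly, as DataFlowKernelLoader.load() already does:

```
from parsl.config import Config
from parsl.dataflow.dflow import DataFlowKernel

# A config argument is now required here; previously the signature
# carried an unused Config() default.
dfk = DataFlowKernel(config=Config())
```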
This comes from flake8-bugbear.
---
 parsl/dataflow/dflow.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index dd13b6adbd..b87ff0ded8 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -69,7 +69,7 @@ class DataFlowKernel:
     """
 
     @typechecked
-    def __init__(self, config: Config = Config()) -> None:
+    def __init__(self, config: Config) -> None:
         """Initialize the DataFlowKernel.
 
         Parameters

From 40b969f7a9e16b3fcab2db127753bf85cbcb5375 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Mon, 21 Aug 2023 19:52:53 +0200
Subject: [PATCH 7/8] Remove unused defaults from dfk.submit (#2865)

All of these parameters are specified by invocations in the relevant
app decorators, so these defaults are never used. Removing the default
app_kwargs = {} is especially worthwhile for mutable-default safety.

This comes from flake8-bugbear.
---
 parsl/dataflow/dflow.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index b87ff0ded8..e9f9247cc2 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -898,10 +898,10 @@ def _unwrap_futures(self, args, kwargs):
     def submit(self,
                func: Callable,
                app_args: Sequence[Any],
-               executors: Union[str, Sequence[str]] = 'all',
-               cache: bool = False,
-               ignore_for_cache: Optional[Sequence[str]] = None,
-               app_kwargs: Dict[str, Any] = {},
+               executors: Union[str, Sequence[str]],
+               cache: bool,
+               ignore_for_cache: Optional[Sequence[str]],
+               app_kwargs: Dict[str, Any],
                join: bool = False) -> AppFuture:
         """Add task to the dataflow system.

From 465eb43be7d276a73af19d13c47da3c4f60eaaf8 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 5 Sep 2023 14:55:46 +0200
Subject: [PATCH 8/8] Inhibit release if current master is already a release
 (#2617)

---
 .github/workflows/python-publish-to-testpypi.yml | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/.github/workflows/python-publish-to-testpypi.yml b/.github/workflows/python-publish-to-testpypi.yml
index e280d61c57..de2afa7e3b 100644
--- a/.github/workflows/python-publish-to-testpypi.yml
+++ b/.github/workflows/python-publish-to-testpypi.yml
@@ -32,6 +32,12 @@ jobs:
     steps:
     - uses: actions/checkout@v3
+
+    - name: Check if this commit is already released
+      id: already_released
+      run: |
+        if git tag --contains HEAD | grep -e '^[0-9]\{4\}\.[0-9]\{2\}\.[0-9]\{2\}$' ; then exit 1 ; fi
+
     - name: Set up Python
       uses: actions/setup-python@v3
       with: