diff --git a/.github/workflows/python-publish-to-testpypi.yml b/.github/workflows/python-publish-to-testpypi.yml index e280d61c57..de2afa7e3b 100644 --- a/.github/workflows/python-publish-to-testpypi.yml +++ b/.github/workflows/python-publish-to-testpypi.yml @@ -32,6 +32,12 @@ jobs: steps: - uses: actions/checkout@v3 + + - name: Check if this commit is already released + id: already_released + run: | + if git tag --contains HEAD | grep -e '^[0-9]\{4\}\.[0-9]\{2\}\.[0-9]\{2\}$' ; then exit 1 ; fi + - name: Set up Python uses: actions/setup-python@v3 with: diff --git a/.wci.yml b/.wci.yml index 9ad60dacfb..c11a2e82b4 100644 --- a/.wci.yml +++ b/.wci.yml @@ -34,6 +34,7 @@ execution_environment: - LSF - PBS - Cobalt + - Flux - GridEngine - HTCondor - AWS diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 936a32acf4..e9f9247cc2 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -69,7 +69,7 @@ class DataFlowKernel: """ @typechecked - def __init__(self, config: Config = Config()) -> None: + def __init__(self, config: Config) -> None: """Initialize the DataFlowKernel. Parameters @@ -898,10 +898,10 @@ def _unwrap_futures(self, args, kwargs): def submit(self, func: Callable, app_args: Sequence[Any], - executors: Union[str, Sequence[str]] = 'all', - cache: bool = False, - ignore_for_cache: Optional[Sequence[str]] = None, - app_kwargs: Dict[str, Any] = {}, + executors: Union[str, Sequence[str]], + cache: bool, + ignore_for_cache: Optional[Sequence[str]], + app_kwargs: Dict[str, Any], join: bool = False) -> AppFuture: """Add task to the dataflow system. @@ -1146,8 +1146,11 @@ def wait_for_current_tasks(self) -> None: logger.info("Waiting for all remaining tasks to complete") - items = list(self.tasks.items()) - for task_id, task_record in items: + # .values is made into a list immediately to reduce (although not + # eliminate) a race condition where self.tasks can be modified + # elsewhere by a completing task being removed from the dictionary. + task_records = list(self.tasks.values()) + for task_record in task_records: # .exception() is a less exception throwing way of # waiting for completion than .result() fut = task_record['app_fu'] diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index cc499ef1c3..69af77a86b 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -9,12 +9,13 @@ import warnings from multiprocessing import Queue from typing import Dict, Sequence # noqa F401 (used in type annotation) -from typing import List, Optional, Tuple, Union +from typing import List, Optional, Tuple, Union, Callable import math from parsl.serialize import pack_apply_message, deserialize from parsl.serialize.errors import SerializationError, DeserializationError from parsl.app.errors import RemoteExceptionWrapper +from parsl.jobs.states import JobStatus from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput import interchange from parsl.executors.errors import ( @@ -212,7 +213,7 @@ def __init__(self, poll_period: int = 10, address_probe_timeout: Optional[int] = None, worker_logdir_root: Optional[str] = None, - block_error_handler: bool = True): + block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True): logger.debug("Initializing HighThroughputExecutor") diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 6a06ea45d0..29070ae4d5 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -1,19 +1,19 @@ +from __future__ import annotations import logging import threading from itertools import compress from abc import abstractmethod, abstractproperty from concurrent.futures import Future -from typing import List, Any, Dict, Optional, Tuple, Union +from typing import List, Any, Dict, Optional, Tuple, Union, Callable import parsl # noqa F401 from parsl.executors.base import ParslExecutor from parsl.executors.errors import BadStateException, ScalingFailed from parsl.jobs.states import JobStatus, JobState +from parsl.jobs.error_handlers import simple_error_handler, noop_error_handler from parsl.providers.base import ExecutionProvider from parsl.utils import AtomicIDCounter -import parsl.jobs.simple_error_handler as error_handler - logger = logging.getLogger(__name__) @@ -47,10 +47,18 @@ class BlockProviderExecutor(ParslExecutor): """ def __init__(self, *, provider: Optional[ExecutionProvider], - block_error_handler: bool): + block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]]): super().__init__() self._provider = provider - self.block_error_handler = block_error_handler + self.block_error_handler: Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None] + if isinstance(block_error_handler, bool): + if block_error_handler: + self.block_error_handler = simple_error_handler + else: + self.block_error_handler = noop_error_handler + else: + self.block_error_handler = block_error_handler + # errors can happen during the submit call to the provider; this is used # to keep track of such errors so that they can be handled in one place # together with errors reported by status() @@ -161,14 +169,7 @@ def handle_errors(self, status: Dict[str, JobStatus]) -> None: scheme will be used. :param status: status of all jobs launched by this executor """ - if not self.block_error_handler: - return - init_blocks = 3 - if hasattr(self.provider, 'init_blocks'): - init_blocks = self.provider.init_blocks - if init_blocks < 1: - init_blocks = 1 - error_handler.simple_error_handler(self, status, init_blocks) + self.block_error_handler(self, status) @property def tasks(self) -> Dict[object, Future]: diff --git a/parsl/jobs/simple_error_handler.py b/parsl/jobs/error_handlers.py similarity index 52% rename from parsl/jobs/simple_error_handler.py rename to parsl/jobs/error_handlers.py index 72481c6932..c35a87447d 100644 --- a/parsl/jobs/simple_error_handler.py +++ b/parsl/jobs/error_handlers.py @@ -4,14 +4,30 @@ import parsl.executors.status_handling as status_handling from parsl.jobs.states import JobStatus, JobState +from parsl.jobs.errors import TooManyJobFailuresError -def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int): +def noop_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3): + pass + + +def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3): (total_jobs, failed_jobs) = _count_jobs(status) + if hasattr(executor.provider, "init_blocks"): + threshold = max(1, executor.provider.init_blocks) + if total_jobs >= threshold and failed_jobs == total_jobs: executor.set_bad_state_and_fail_all(_get_error(status)) +def windowed_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3): + sorted_status = [(key, status[key]) for key in sorted(status, key=lambda x: int(x))] + current_window = dict(sorted_status[-threshold:]) + total, failed = _count_jobs(current_window) + if failed == threshold: + executor.set_bad_state_and_fail_all(_get_error(status)) + + def _count_jobs(status: Dict[str, JobStatus]): total = 0 failed = 0 @@ -27,18 +43,25 @@ def _get_error(status: Dict[str, JobStatus]) -> Exception: err = "" count = 1 for js in status.values(): + err = err + f"Error {count}:\n" + count += 1 + if js.message is not None: - err = err + "{}. {}\n".format(count, js.message) - count += 1 + err = err + f"\t{js.message}\n" + + if js.exit_code is not None: + err = err + f"\tEXIT CODE: {js.exit_code}\n" + stdout = js.stdout_summary if stdout: err = err + "\tSTDOUT: {}\n".format(stdout) + stderr = js.stderr_summary if stderr: err = err + "\tSTDERR: {}\n".format(stderr) if len(err) == 0: - err = "[No error message received]" + err = "No error messages received" # wrapping things in an exception here doesn't really help in providing more information # than the string itself - return Exception(err) + return TooManyJobFailuresError(err) diff --git a/parsl/jobs/errors.py b/parsl/jobs/errors.py new file mode 100644 index 0000000000..6d42f429be --- /dev/null +++ b/parsl/jobs/errors.py @@ -0,0 +1,7 @@ +from parsl.errors import ParslError + + +class TooManyJobFailuresError(ParslError): + """Indicates that executor is shut down because of too many block failures. + """ + pass diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 479a7ccfa4..d42e0079b4 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -262,7 +262,7 @@ def accumulate_and_prepare() -> Dict[str, Any]: next_send = time.time() accumulate_dur = 5.0 # TODO: make configurable? - while not terminate_event.is_set(): + while not terminate_event.is_set() and pm.is_running(): logging.debug("start of monitoring loop") try: d = accumulate_and_prepare() diff --git a/parsl/tests/test_scaling/test_block_error_handler.py b/parsl/tests/test_scaling/test_block_error_handler.py new file mode 100644 index 0000000000..9d680212e3 --- /dev/null +++ b/parsl/tests/test_scaling/test_block_error_handler.py @@ -0,0 +1,168 @@ +import pytest + +from parsl.executors import HighThroughputExecutor +from parsl.providers import LocalProvider +from unittest.mock import Mock +from parsl.jobs.states import JobStatus, JobState +from parsl.jobs.error_handlers import simple_error_handler, windowed_error_handler, noop_error_handler +from functools import partial + + +@pytest.mark.local +def test_block_error_handler_false(): + mock = Mock() + htex = HighThroughputExecutor(block_error_handler=False) + assert htex.block_error_handler is noop_error_handler + htex.set_bad_state_and_fail_all = mock + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + + htex.handle_errors(bad_jobs) + mock.assert_not_called() + + +@pytest.mark.local +def test_block_error_handler_mock(): + handler_mock = Mock() + htex = HighThroughputExecutor(block_error_handler=handler_mock) + assert htex.block_error_handler is handler_mock + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + + htex.handle_errors(bad_jobs) + handler_mock.assert_called() + handler_mock.assert_called_with(htex, bad_jobs) + + +@pytest.mark.local +def test_simple_error_handler(): + htex = HighThroughputExecutor(block_error_handler=simple_error_handler, + provider=LocalProvider(init_blocks=3)) + + assert htex.block_error_handler is simple_error_handler + + bad_state_mock = Mock() + htex.set_bad_state_and_fail_all = bad_state_mock + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + # Check the bad behavior where if any job is not failed + # bad state won't be set + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + + htex.handle_errors(bad_jobs) + bad_state_mock.assert_called() + + +@pytest.mark.local +def test_windowed_error_handler(): + htex = HighThroughputExecutor(block_error_handler=windowed_error_handler) + assert htex.block_error_handler is windowed_error_handler + + bad_state_mock = Mock() + htex.set_bad_state_and_fail_all = bad_state_mock + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.FAILED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.COMPLETED), + '4': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_called() + + +@pytest.mark.local +def test_windowed_error_handler_sorting(): + htex = HighThroughputExecutor(block_error_handler=windowed_error_handler) + assert htex.block_error_handler is windowed_error_handler + + bad_state_mock = Mock() + htex.set_bad_state_and_fail_all = bad_state_mock + + bad_jobs = {'8': JobStatus(JobState.FAILED), + '9': JobStatus(JobState.FAILED), + '10': JobStatus(JobState.FAILED), + '11': JobStatus(JobState.COMPLETED), + '12': JobStatus(JobState.COMPLETED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'8': JobStatus(JobState.COMPLETED), + '9': JobStatus(JobState.FAILED), + '21': JobStatus(JobState.FAILED), + '22': JobStatus(JobState.FAILED), + '10': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_called() + + +@pytest.mark.local +def test_windowed_error_handler_with_threshold(): + error_handler = partial(windowed_error_handler, threshold=2) + htex = HighThroughputExecutor(block_error_handler=error_handler) + assert htex.block_error_handler is error_handler + + bad_state_mock = Mock() + htex.set_bad_state_and_fail_all = bad_state_mock + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.FAILED), + '3': JobStatus(JobState.COMPLETED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.COMPLETED), + '3': JobStatus(JobState.COMPLETED), + '4': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_not_called() + + bad_jobs = {'1': JobStatus(JobState.COMPLETED), + '2': JobStatus(JobState.COMPLETED), + '3': JobStatus(JobState.FAILED), + '4': JobStatus(JobState.FAILED)} + htex.handle_errors(bad_jobs) + bad_state_mock.assert_called()