Skip to content

Commit

Permalink
Merge branch 'master' into benc-flake8-bugbear
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Sep 14, 2023
2 parents 8b0797e + 465eb43 commit 1cd646e
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 28 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/python-publish-to-testpypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ jobs:

steps:
- uses: actions/checkout@v3

- name: Check if this commit is already released
id: already_released
run: |
if git tag --contains HEAD | grep -e '^[0-9]\{4\}\.[0-9]\{2\}\.[0-9]\{2\}$' ; then exit 1 ; fi
- name: Set up Python
uses: actions/setup-python@v3
with:
Expand Down
1 change: 1 addition & 0 deletions .wci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ execution_environment:
- LSF
- PBS
- Cobalt
- Flux
- GridEngine
- HTCondor
- AWS
Expand Down
17 changes: 10 additions & 7 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DataFlowKernel:
"""

@typechecked
def __init__(self, config: Config = Config()) -> None:
def __init__(self, config: Config) -> None:
"""Initialize the DataFlowKernel.
Parameters
Expand Down Expand Up @@ -898,10 +898,10 @@ def _unwrap_futures(self, args, kwargs):
def submit(self,
func: Callable,
app_args: Sequence[Any],
executors: Union[str, Sequence[str]] = 'all',
cache: bool = False,
ignore_for_cache: Optional[Sequence[str]] = None,
app_kwargs: Dict[str, Any] = {},
executors: Union[str, Sequence[str]],
cache: bool,
ignore_for_cache: Optional[Sequence[str]],
app_kwargs: Dict[str, Any],
join: bool = False) -> AppFuture:
"""Add task to the dataflow system.
Expand Down Expand Up @@ -1146,8 +1146,11 @@ def wait_for_current_tasks(self) -> None:

logger.info("Waiting for all remaining tasks to complete")

items = list(self.tasks.items())
for task_id, task_record in items:
# .values is made into a list immediately to reduce (although not
# eliminate) a race condition where self.tasks can be modified
# elsewhere by a completing task being removed from the dictionary.
task_records = list(self.tasks.values())
for task_record in task_records:
# .exception() is a less exception throwing way of
# waiting for completion than .result()
fut = task_record['app_fu']
Expand Down
5 changes: 3 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import warnings
from multiprocessing import Queue
from typing import Dict, Sequence # noqa F401 (used in type annotation)
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union, Callable
import math

from parsl.serialize import pack_apply_message, deserialize
from parsl.serialize.errors import SerializationError, DeserializationError
from parsl.app.errors import RemoteExceptionWrapper
from parsl.jobs.states import JobStatus
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput import interchange
from parsl.executors.errors import (
Expand Down Expand Up @@ -212,7 +213,7 @@ def __init__(self,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
block_error_handler: bool = True):
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True):

logger.debug("Initializing HighThroughputExecutor")

Expand Down
27 changes: 14 additions & 13 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from __future__ import annotations
import logging
import threading
from itertools import compress
from abc import abstractmethod, abstractproperty
from concurrent.futures import Future
from typing import List, Any, Dict, Optional, Tuple, Union
from typing import List, Any, Dict, Optional, Tuple, Union, Callable

import parsl # noqa F401
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import BadStateException, ScalingFailed
from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.error_handlers import simple_error_handler, noop_error_handler
from parsl.providers.base import ExecutionProvider
from parsl.utils import AtomicIDCounter

import parsl.jobs.simple_error_handler as error_handler

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -47,10 +47,18 @@ class BlockProviderExecutor(ParslExecutor):
"""
def __init__(self, *,
provider: Optional[ExecutionProvider],
block_error_handler: bool):
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]]):
super().__init__()
self._provider = provider
self.block_error_handler = block_error_handler
self.block_error_handler: Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]
if isinstance(block_error_handler, bool):
if block_error_handler:
self.block_error_handler = simple_error_handler
else:
self.block_error_handler = noop_error_handler
else:
self.block_error_handler = block_error_handler

# errors can happen during the submit call to the provider; this is used
# to keep track of such errors so that they can be handled in one place
# together with errors reported by status()
Expand Down Expand Up @@ -161,14 +169,7 @@ def handle_errors(self, status: Dict[str, JobStatus]) -> None:
scheme will be used.
:param status: status of all jobs launched by this executor
"""
if not self.block_error_handler:
return
init_blocks = 3
if hasattr(self.provider, 'init_blocks'):
init_blocks = self.provider.init_blocks
if init_blocks < 1:
init_blocks = 1
error_handler.simple_error_handler(self, status, init_blocks)
self.block_error_handler(self, status)

@property
def tasks(self) -> Dict[object, Future]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,30 @@

import parsl.executors.status_handling as status_handling
from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.errors import TooManyJobFailuresError


def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int):
def noop_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
    """Deliberately ignore all block failures.

    Installed when an executor is configured with
    ``block_error_handler=False``: the executor keeps running no matter
    how many blocks fail.

    :param executor: executor being monitored (unused)
    :param status: mapping of block id to job status (unused)
    :param threshold: accepted for signature compatibility with the
        other handlers (unused)
    """


def simple_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
    """Put the executor into a bad state once every observed block has failed.

    When the provider exposes ``init_blocks``, that value (clamped to a
    minimum of 1) replaces the ``threshold`` argument. The executor is
    failed only when at least ``threshold`` blocks have been seen and
    every one of them failed.

    :param executor: executor to shut down on too many failures
    :param status: mapping of block id to job status
    :param threshold: minimum number of observed blocks before the
        all-failed check can trigger
    """
    total_jobs, failed_jobs = _count_jobs(status)

    provider = executor.provider
    if hasattr(provider, "init_blocks"):
        # Providers that launch init_blocks blocks define the threshold;
        # never let it drop below one block.
        threshold = max(1, provider.init_blocks)

    everything_failed = failed_jobs == total_jobs
    if total_jobs >= threshold and everything_failed:
        executor.set_bad_state_and_fail_all(_get_error(status))


def windowed_error_handler(executor: status_handling.BlockProviderExecutor, status: Dict[str, JobStatus], threshold: int = 3):
    """Fail the executor when the most recent ``threshold`` blocks all failed.

    Block ids (numeric strings) are sorted numerically and only the
    trailing window of ``threshold`` entries is examined, so failures of
    older blocks do not count against a currently-healthy executor.

    :param executor: executor to shut down on too many failures
    :param status: mapping of numeric-string block id to job status
    :param threshold: size of the trailing window; every job in the
        window must have failed to trigger shutdown
    """
    # Sort by numeric value so that e.g. '10' sorts after '9', then keep
    # only the newest `threshold` blocks.
    sorted_status = [(key, status[key]) for key in sorted(status, key=lambda x: int(x))]
    current_window = dict(sorted_status[-threshold:])
    # Only the failure count matters here; the total is discarded
    # (previously bound to an unused local, flagged by flake8).
    _, failed = _count_jobs(current_window)
    if failed == threshold:
        executor.set_bad_state_and_fail_all(_get_error(status))


def _count_jobs(status: Dict[str, JobStatus]):
total = 0
failed = 0
Expand All @@ -27,18 +43,25 @@ def _get_error(status: Dict[str, JobStatus]) -> Exception:
err = ""
count = 1
for js in status.values():
err = err + f"Error {count}:\n"
count += 1

if js.message is not None:
err = err + "{}. {}\n".format(count, js.message)
count += 1
err = err + f"\t{js.message}\n"

if js.exit_code is not None:
err = err + f"\tEXIT CODE: {js.exit_code}\n"

stdout = js.stdout_summary
if stdout:
err = err + "\tSTDOUT: {}\n".format(stdout)

stderr = js.stderr_summary
if stderr:
err = err + "\tSTDERR: {}\n".format(stderr)

if len(err) == 0:
err = "[No error message received]"
err = "No error messages received"
# wrapping things in an exception here doesn't really help in providing more information
# than the string itself
return Exception(err)
return TooManyJobFailuresError(err)
7 changes: 7 additions & 0 deletions parsl/jobs/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from parsl.errors import ParslError


class TooManyJobFailuresError(ParslError):
    """Raised (via the executor bad-state mechanism) when a block error
    handler decides that too many blocks have failed and the executor
    must be shut down.
    """
2 changes: 1 addition & 1 deletion parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def accumulate_and_prepare() -> Dict[str, Any]:
next_send = time.time()
accumulate_dur = 5.0 # TODO: make configurable?

while not terminate_event.is_set():
while not terminate_event.is_set() and pm.is_running():
logging.debug("start of monitoring loop")
try:
d = accumulate_and_prepare()
Expand Down
168 changes: 168 additions & 0 deletions parsl/tests/test_scaling/test_block_error_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import pytest

from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider
from unittest.mock import Mock
from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.error_handlers import simple_error_handler, windowed_error_handler, noop_error_handler
from functools import partial


@pytest.mark.local
def test_block_error_handler_false():
    """block_error_handler=False must install the no-op handler and never
    put the executor into a bad state, even when every block has failed."""
    failure_spy = Mock()
    htex = HighThroughputExecutor(block_error_handler=False)
    assert htex.block_error_handler is noop_error_handler
    htex.set_bad_state_and_fail_all = failure_spy

    all_failed = {str(block_id): JobStatus(JobState.FAILED)
                  for block_id in range(1, 5)}

    htex.handle_errors(all_failed)
    failure_spy.assert_not_called()


@pytest.mark.local
def test_block_error_handler_mock():
    """A callable passed as block_error_handler is stored as-is and invoked
    with the executor and the status dict."""
    custom_handler = Mock()
    htex = HighThroughputExecutor(block_error_handler=custom_handler)
    assert htex.block_error_handler is custom_handler

    all_failed = {str(block_id): JobStatus(JobState.FAILED)
                  for block_id in range(1, 5)}

    htex.handle_errors(all_failed)
    custom_handler.assert_called()
    custom_handler.assert_called_with(htex, all_failed)


@pytest.mark.local
def test_simple_error_handler():
    """simple_error_handler shuts the executor down only once at least
    init_blocks jobs have been seen and every one of them has failed."""
    htex = HighThroughputExecutor(block_error_handler=simple_error_handler,
                                  provider=LocalProvider(init_blocks=3))

    assert htex.block_error_handler is simple_error_handler

    failure_spy = Mock()
    htex.set_bad_state_and_fail_all = failure_spy

    # Fewer failures than init_blocks: no shutdown.
    htex.handle_errors({'1': JobStatus(JobState.FAILED),
                        '2': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # Documents the (questionable) behaviour that a single non-failed job
    # prevents the bad state from being set, however many jobs failed.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.FAILED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # All jobs failed and the threshold is met: shutdown happens.
    htex.handle_errors({'1': JobStatus(JobState.FAILED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.FAILED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_called()


@pytest.mark.local
def test_windowed_error_handler():
    """windowed_error_handler triggers only when the newest `threshold`
    (default 3) blocks have all failed."""
    htex = HighThroughputExecutor(block_error_handler=windowed_error_handler)
    assert htex.block_error_handler is windowed_error_handler

    failure_spy = Mock()
    htex.set_bad_state_and_fail_all = failure_spy

    # Only two blocks seen: window of 3 cannot be all-failed.
    htex.handle_errors({'1': JobStatus(JobState.FAILED),
                        '2': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # A completed block inside the window blocks the trigger.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # The completed block '3' sits inside the trailing window of 3.
    htex.handle_errors({'1': JobStatus(JobState.FAILED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.COMPLETED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # Newest three blocks ('2', '3', '4') all failed: shutdown happens,
    # even though the oldest block completed.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.FAILED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_called()


@pytest.mark.local
def test_windowed_error_handler_sorting():
    """Block ids must be compared numerically, not lexicographically, when
    selecting the trailing window."""
    htex = HighThroughputExecutor(block_error_handler=windowed_error_handler)
    assert htex.block_error_handler is windowed_error_handler

    failure_spy = Mock()
    htex.set_bad_state_and_fail_all = failure_spy

    # Numeric order 8..12: the newest two ('11', '12') completed, so the
    # window of 3 is not all-failed.
    htex.handle_errors({'8': JobStatus(JobState.FAILED),
                        '9': JobStatus(JobState.FAILED),
                        '10': JobStatus(JobState.FAILED),
                        '11': JobStatus(JobState.COMPLETED),
                        '12': JobStatus(JobState.COMPLETED)})
    failure_spy.assert_not_called()

    # Numeric order is 8, 9, 10, 21, 22 — lexicographic order would put
    # '21'/'22' before '8'. The newest three ('10', '21', '22') all
    # failed, so the handler must trigger.
    htex.handle_errors({'8': JobStatus(JobState.COMPLETED),
                        '9': JobStatus(JobState.FAILED),
                        '21': JobStatus(JobState.FAILED),
                        '22': JobStatus(JobState.FAILED),
                        '10': JobStatus(JobState.FAILED)})
    failure_spy.assert_called()


@pytest.mark.local
def test_windowed_error_handler_with_threshold():
    """A partial application of windowed_error_handler carries a custom
    window size (threshold=2) through handle_errors."""
    error_handler = partial(windowed_error_handler, threshold=2)
    htex = HighThroughputExecutor(block_error_handler=error_handler)
    assert htex.block_error_handler is error_handler

    failure_spy = Mock()
    htex.set_bad_state_and_fail_all = failure_spy

    # Window of 2 contains one completed job: no trigger.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # Newest two are '2' (failed) and '3' (completed): no trigger.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.FAILED),
                        '3': JobStatus(JobState.COMPLETED)})
    failure_spy.assert_not_called()

    # Only one failure in the newest two ('3', '4'): no trigger.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.COMPLETED),
                        '3': JobStatus(JobState.COMPLETED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_not_called()

    # Newest two ('3', '4') both failed: shutdown happens.
    htex.handle_errors({'1': JobStatus(JobState.COMPLETED),
                        '2': JobStatus(JobState.COMPLETED),
                        '3': JobStatus(JobState.FAILED),
                        '4': JobStatus(JobState.FAILED)})
    failure_spy.assert_called()

0 comments on commit 1cd646e

Please sign in to comment.