Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into benc-tmp-sander
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jun 7, 2024
2 parents 026b18d + 825b285 commit 14dc325
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 27 deletions.
19 changes: 19 additions & 0 deletions docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,22 @@ or
url = {https://doi.org/10.1145/3307681.3325400}
}


How can my tasks survive ``WorkerLost`` and ``ManagerLost`` at the end of a batch job?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When a batch job ends, pilot workers will be terminated by the batch system,
and any tasks running there will fail. With `HighThroughputExecutor`,
this failure will be reported as a `parsl.executors.high_throughput.errors.WorkerLost` or
`parsl.executors.high_throughput.interchange.ManagerLost` in the task future.

To mitigate against this:

* use retries by setting ``retries=`` in `parsl.config.Config`.
* if you only want to retry on certain errors such as `WorkerLost` and `ManagerLost`,
use ``retry_handler`` in `parsl.config.Config` to implement that policy.
* avoid sending tasks to batch jobs that will expire soon. With `HighThroughputExecutor`,
set drain_period to a little longer than you expect your tasks to take.
With `WorkQueueExecutor`, you can configure individual expected task duration using
a ``parsl_resource_specification`` and specify a worker ``--wall-time`` using the
``worker_options`` parameter to the `WorkQueueExecutor`.
20 changes: 16 additions & 4 deletions parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation
from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,9 +68,12 @@ class Config(RepresentationMixin, UsageInformation):
How often the scaling strategy should be executed. Default is 5 seconds.
max_idletime : float, optional
The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
usage_tracking : bool, optional
Set this field to True to opt-in to Parsl's usage tracking system. Parsl only collects minimal, non personally-identifiable,
information used for reporting to our funding agencies. Default is False.
usage_tracking : int, optional
Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system.
The value represents the level of usage tracking detail to be collected.
Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled.
Parsl only collects minimal, non personally-identifiable,
information used for reporting to our funding agencies.
initialize_logging : bool, optional
Make DFK optionally not initialize any logging. Log messages
will still be passed into the python logging system under the
Expand Down Expand Up @@ -102,7 +107,7 @@ def __init__(self,
strategy_period: Union[float, int] = 5,
max_idletime: float = 120.0,
monitoring: Optional[MonitoringHub] = None,
usage_tracking: bool = False,
usage_tracking: int = 0,
initialize_logging: bool = True) -> None:

executors = tuple(executors or [])
Expand Down Expand Up @@ -136,6 +141,7 @@ def __init__(self,
self.strategy = strategy
self.strategy_period = strategy_period
self.max_idletime = max_idletime
self.validate_usage_tracking(usage_tracking)
self.usage_tracking = usage_tracking
self.initialize_logging = initialize_logging
self.monitoring = monitoring
Expand All @@ -156,6 +162,12 @@ def _validate_executors(self) -> None:
raise ConfigurationError('Executors must have unique labels ({})'.format(
', '.join(['label={}'.format(repr(d)) for d in duplicates])))

def validate_usage_tracking(self, level: int) -> None:
if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3:
raise ConfigurationError(
f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}"
)

def get_usage_information(self):
return {"executors_len": len(self.executors),
"dependency_resolver": self.dependency_resolver is not None}
45 changes: 45 additions & 0 deletions parsl/tests/unit/test_usage_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Test usage_tracking values."""

import pytest

import parsl
from parsl.config import Config
from parsl.errors import ConfigurationError


@pytest.mark.local
def test_config_load():
"""Test loading a config with usage tracking."""
with parsl.load(Config(usage_tracking=3)):
pass
parsl.clear()


@pytest.mark.local
@pytest.mark.parametrize("level", (0, 1, 2, 3, False, True))
def test_valid(level):
"""Test valid usage_tracking values."""
Config(usage_tracking=level)
assert Config(usage_tracking=level).usage_tracking == level


@pytest.mark.local
@pytest.mark.parametrize("level", (12, 1000, -1))
def test_invalid_values(level):
"""Test invalid usage_tracking values."""
with pytest.raises(ConfigurationError):
Config(usage_tracking=level)


@pytest.mark.local
@pytest.mark.parametrize("level", ("abcd", None, bytes(1), 1.0, 1j, object()))
def test_invalid_types(level):
"""Test invalid usage_tracking types."""
with pytest.raises(Exception) as ex:
Config(usage_tracking=level)

# with typeguard 4.x this is TypeCheckError,
# with typeguard 2.x this is TypeError
# we can't instantiate TypeCheckError if we're in typeguard 2.x environment
# because it does not exist... so check name using strings.
assert ex.type.__name__ in ["TypeCheckError", "TypeError"]
6 changes: 6 additions & 0 deletions parsl/usage_tracking/levels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Module for defining the usage tracking levels."""

DISABLED = 0 # Tracking is disabled
LEVEL_1 = 1 # Share info about Parsl version, Python version, platform
LEVEL_2 = 2 # Share info about config + level 1
LEVEL_3 = 3 # Share info about app count, app fails, execution time + level 2
77 changes: 54 additions & 23 deletions parsl/usage_tracking/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import uuid

from parsl.dataflow.states import States
from parsl.errors import ConfigurationError
from parsl.multiprocessing import ForkProcess
from parsl.usage_tracking.api import get_parsl_usage
from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
from parsl.utils import setproctitle
from parsl.version import VERSION as PARSL_VERSION

Expand Down Expand Up @@ -110,17 +113,32 @@ def __init__(self, dfk, port=50077,
self.python_version = "{}.{}.{}".format(sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro)
self.tracking_enabled = self.check_tracking_enabled()
logger.debug("Tracking status: {}".format(self.tracking_enabled))

def check_tracking_enabled(self):
"""Check if tracking is enabled.
Tracking will be enabled unless the following is true:
1. dfk.config.usage_tracking is set to False
self.tracking_level = self.check_tracking_level()
self.start_time = None
logger.debug("Tracking level: {}".format(self.tracking_level))

def check_tracking_level(self) -> int:
"""Check if tracking is enabled and return level.
Checks usage_tracking in Config
- Possible values: [True, False, 0, 1, 2, 3]
True/False values are treated as Level 1/Level 0 respectively.
Returns: int
- 0 : Tracking is disabled
- 1 : Tracking is enabled with level 1
Share info about Parsl version, Python version, platform
- 2 : Tracking is enabled with level 2
Share info about config + level 1
- 3 : Tracking is enabled with level 3
Share info about app count, app fails, execution time + level 2
"""
if not USAGE_TRACKING_DISABLED <= self.config.usage_tracking <= USAGE_TRACKING_LEVEL_3:
raise ConfigurationError(
f"Usage Tracking values must be 0, 1, 2, or 3 and not {self.config.usage_tracking}"
)

return self.config.usage_tracking

def construct_start_message(self) -> bytes:
Expand All @@ -133,18 +151,28 @@ def construct_start_message(self) -> bytes:
'parsl_v': self.parsl_version,
'python_v': self.python_version,
'platform.system': platform.system(),
'start': int(time.time()),
'components': get_parsl_usage(self.dfk._config)}
'tracking_level': int(self.tracking_level)}

if self.tracking_level >= 2:
message['components'] = get_parsl_usage(self.dfk._config)

if self.tracking_level == 3:
self.start_time = int(time.time())
message['start'] = self.start_time

logger.debug(f"Usage tracking start message: {message}")

return self.encode_message(message)

def construct_end_message(self) -> bytes:
"""Collect the final run information at the time of DFK cleanup.
This is only called if tracking level is 3.
Returns:
- Message dict dumped as json string, ready for UDP
"""
end_time = int(time.time())

app_count = self.dfk.task_count

app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail]
Expand All @@ -157,7 +185,8 @@ def construct_end_message(self) -> bytes:
'app_fails': app_fails}

message = {'correlator': self.correlator_uuid,
'end': int(time.time()),
'end': end_time,
'execution_time': end_time - self.start_time,
'components': [dfk_component] + get_parsl_usage(self.dfk._config)}
logger.debug(f"Usage tracking end message (unencoded): {message}")

Expand All @@ -168,20 +197,22 @@ def encode_message(self, obj):

def send_UDP_message(self, message: bytes) -> None:
"""Send UDP message."""
if self.tracking_enabled:
try:
proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message)
self.procs.append(proc)
except Exception as e:
logger.debug("Usage tracking failed: {}".format(e))
try:
proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message)
self.procs.append(proc)
except Exception as e:
logger.debug("Usage tracking failed: {}".format(e))

def send_start_message(self) -> None:
message = self.construct_start_message()
self.send_UDP_message(message)
if self.tracking_level:
self.start_time = time.time()
message = self.construct_start_message()
self.send_UDP_message(message)

def send_end_message(self) -> None:
message = self.construct_end_message()
self.send_UDP_message(message)
if self.tracking_level == 3:
message = self.construct_end_message()
self.send_UDP_message(message)

def close(self, timeout: float = 10.0) -> None:
"""First give each process one timeout period to finish what it is
Expand Down

0 comments on commit 14dc325

Please sign in to comment.