Skip to content

Commit

Permalink
Merge branch 'master' into benc-context-exit-modes
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Jun 10, 2024
2 parents 7ac8651 + 2200f62 commit 081112e
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ lint: ## run linter script

.PHONY: isort
isort: ## run isort on all files
isort --check .
isort --check parsl/

.PHONY: flake8
flake8: ## run flake
Expand Down
19 changes: 19 additions & 0 deletions docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,22 @@ or
url = {https://doi.org/10.1145/3307681.3325400}
}


How can my tasks survive ``WorkerLost`` and ``ManagerLost`` at the end of a batch job?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

When a batch job ends, pilot workers will be terminated by the batch system,
and any tasks running there will fail. With `HighThroughputExecutor`,
this failure will be reported as a `parsl.executors.high_throughput.errors.WorkerLost` or
`parsl.executors.high_throughput.interchange.ManagerLost` in the task future.

To mitigate this:

* use retries by setting ``retries=`` in `parsl.config.Config`.
* if you only want to retry on certain errors such as `WorkerLost` and `ManagerLost`,
use ``retry_handler`` in `parsl.config.Config` to implement that policy.
* avoid sending tasks to batch jobs that will expire soon. With `HighThroughputExecutor`,
set ``drain_period`` to a little longer than you expect your tasks to take.
With `WorkQueueExecutor`, you can configure individual expected task duration using
a ``parsl_resource_specification`` and specify a worker ``--wall-time`` using the
``worker_options`` parameter to the `WorkQueueExecutor`.
20 changes: 16 additions & 4 deletions parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.usage_tracking.api import UsageInformation
from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,9 +77,12 @@ class Config(RepresentationMixin, UsageInformation):
How often the scaling strategy should be executed. Default is 5 seconds.
max_idletime : float, optional
The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
usage_tracking : bool, optional
Set this field to True to opt-in to Parsl's usage tracking system. Parsl only collects minimal, non personally-identifiable,
information used for reporting to our funding agencies. Default is False.
usage_tracking : int, optional
Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system.
The value represents the level of usage tracking detail to be collected.
Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled.
Parsl only collects minimal, non personally-identifiable,
information used for reporting to our funding agencies.
initialize_logging : bool, optional
Make DFK optionally not initialize any logging. Log messages
will still be passed into the python logging system under the
Expand Down Expand Up @@ -112,7 +117,7 @@ def __init__(self,
strategy_period: Union[float, int] = 5,
max_idletime: float = 120.0,
monitoring: Optional[MonitoringHub] = None,
usage_tracking: bool = False,
usage_tracking: int = 0,
initialize_logging: bool = True) -> None:

executors = tuple(executors or [])
Expand Down Expand Up @@ -147,6 +152,7 @@ def __init__(self,
self.strategy = strategy
self.strategy_period = strategy_period
self.max_idletime = max_idletime
self.validate_usage_tracking(usage_tracking)
self.usage_tracking = usage_tracking
self.initialize_logging = initialize_logging
self.monitoring = monitoring
Expand All @@ -167,6 +173,12 @@ def _validate_executors(self) -> None:
raise ConfigurationError('Executors must have unique labels ({})'.format(
', '.join(['label={}'.format(repr(d)) for d in duplicates])))

def validate_usage_tracking(self, level: int) -> None:
    """Reject any usage-tracking level outside the supported range 0-3.

    :param level: requested usage tracking level
    :raises ConfigurationError: if *level* is below DISABLED (0) or
        above LEVEL_3 (3)
    """
    # Equivalent to: not (DISABLED <= level <= LEVEL_3), spelled out
    # as two explicit bound checks.
    if level < USAGE_TRACKING_DISABLED or level > USAGE_TRACKING_LEVEL_3:
        raise ConfigurationError(
            f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}"
        )

def get_usage_information(self):
    """Return the config facts reported to the usage tracking system:
    the number of configured executors and whether a dependency
    resolver was supplied.
    """
    info = {
        "executors_len": len(self.executors),
        "dependency_resolver": self.dependency_resolver is not None,
    }
    return info
25 changes: 19 additions & 6 deletions parsl/providers/kubernetes/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ class KubernetesProvider(ExecutionProvider, RepresentationMixin):
persistent_volumes: list[(str, str)]
List of tuples describing persistent volumes to be mounted in the pod.
The tuples consist of (PVC Name, Mount Directory).
service_account_name: str
Name of the service account to run the pod as.
annotations: Dict[str, str]
Annotations to set on the pod.
"""
@typeguard.typechecked
def __init__(self,
Expand All @@ -103,7 +107,9 @@ def __init__(self,
group_id: Optional[str] = None,
run_as_non_root: bool = False,
secret: Optional[str] = None,
persistent_volumes: List[Tuple[str, str]] = []) -> None:
persistent_volumes: List[Tuple[str, str]] = [],
service_account_name: Optional[str] = None,
annotations: Optional[Dict[str, str]] = None) -> None:
if not _kubernetes_enabled:
raise OptionalModuleMissing(['kubernetes'],
"Kubernetes provider requires kubernetes module and config.")
Expand Down Expand Up @@ -146,6 +152,8 @@ def __init__(self,
self.group_id = group_id
self.run_as_non_root = run_as_non_root
self.persistent_volumes = persistent_volumes
self.service_account_name = service_account_name
self.annotations = annotations

self.kube_client = client.CoreV1Api()

Expand Down Expand Up @@ -184,7 +192,9 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"):
pod_name=pod_name,
job_name=job_name,
cmd_string=formatted_cmd,
volumes=self.persistent_volumes)
volumes=self.persistent_volumes,
service_account_name=self.service_account_name,
annotations=self.annotations)
self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)}

return pod_name
Expand Down Expand Up @@ -253,7 +263,9 @@ def _create_pod(self,
job_name,
port=80,
cmd_string=None,
volumes=[]):
volumes=[],
service_account_name=None,
annotations=None):
""" Create a kubernetes pod for the job.
Args:
- image (string) : Docker image to launch
Expand Down Expand Up @@ -311,11 +323,12 @@ def _create_pod(self,
claim_name=volume[0])))

metadata = client.V1ObjectMeta(name=pod_name,
labels={"app": job_name})
labels={"app": job_name},
annotations=annotations)
spec = client.V1PodSpec(containers=[container],
image_pull_secrets=[secret],
volumes=volume_defs
)
volumes=volume_defs,
service_account_name=service_account_name)

pod = client.V1Pod(spec=spec, metadata=metadata)
api_response = self.kube_client.create_namespaced_pod(namespace=self.namespace,
Expand Down
53 changes: 31 additions & 22 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,29 @@

logger = logging.getLogger(__name__)

# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
translate_table = {
'PD': JobState.PENDING,
'R': JobState.RUNNING,
'CA': JobState.CANCELLED,
'CF': JobState.PENDING, # (configuring),
'CG': JobState.RUNNING, # (completing),
'CD': JobState.COMPLETED,
'F': JobState.FAILED, # (failed),
'TO': JobState.TIMEOUT, # (timeout),
'NF': JobState.FAILED, # (node failure),
'RV': JobState.FAILED, # (revoked) and
'SE': JobState.FAILED # (special exit state)
'PENDING': JobState.PENDING,
'RUNNING': JobState.RUNNING,
'CANCELLED': JobState.CANCELLED,
'COMPLETED': JobState.COMPLETED,
'FAILED': JobState.FAILED,
'NODE_FAIL': JobState.FAILED,
'BOOT_FAIL': JobState.FAILED,
'DEADLINE': JobState.TIMEOUT,
'TIMEOUT': JobState.TIMEOUT,
'REVOKED': JobState.FAILED,
'OUT_OF_MEMORY': JobState.FAILED,
'SUSPENDED': JobState.HELD,
'PREEMPTED': JobState.TIMEOUT,
'REQUEUED': JobState.PENDING
}


class SlurmProvider(ClusterProvider, RepresentationMixin):
"""Slurm Execution Provider
This provider uses sbatch to submit, squeue for status and scancel to cancel
This provider uses sbatch to submit, sacct for status and scancel to cancel
jobs. The sbatch script to be used is created from a template file in this
same module.
Expand Down Expand Up @@ -168,22 +172,27 @@ def _status(self):
logger.debug('No active jobs, skipping status update')
return

cmd = "squeue --noheader --format='%i %t' --job '{0}'".format(job_id_list)
# Using state%20 to get enough characters to not truncate output
# of the state. Without output can look like "<job_id> CANCELLED+"
cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
logger.debug("squeue returned %s %s", stdout, stderr)
logger.debug("sacct returned %s %s", stdout, stderr)

# Execute_wait failed. Do no update
if retcode != 0:
logger.warning("squeue failed with non-zero exit code {}".format(retcode))
logger.warning("sacct failed with non-zero exit code {}".format(retcode))
return

jobs_missing = set(self.resources.keys())
for line in stdout.split('\n'):
if not line:
# Blank line
continue
job_id, slurm_state = line.split()
# Sacct includes extra information in some outputs
# For example "<job_id> CANCELLED by <user_id>"
# This splits and ignores anything past the first two unpacked values
job_id, slurm_state, *ignore = line.split()
if slurm_state not in translate_table:
logger.warning(f"Slurm status {slurm_state} is not recognized")
status = translate_table.get(slurm_state, JobState.UNKNOWN)
Expand All @@ -193,13 +202,13 @@ def _status(self):
stderr_path=self.resources[job_id]['job_stderr_path'])
jobs_missing.remove(job_id)

# squeue does not report on jobs that are not running. So we are filling in the
# blanks for missing jobs, we might lose some information about why the jobs failed.
# sacct can get job info after jobs have completed so this path shouldn't be hit
# log a warning if there are missing jobs for some reason
for missing_job in jobs_missing:
logger.debug("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED,
stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])
logger.warning("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(
JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])

def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str:
"""Submit the command as a slurm job.
Expand Down
1 change: 0 additions & 1 deletion parsl/tests/test_radical/test_mpi_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def some_mpi_func(msg, sleep, comm=None, parsl_resource_specification={}):
apps = []


@pytest.mark.skip("hangs in CI - waiting for resolution of issue #3029")
@pytest.mark.local
@pytest.mark.radical
def test_radical_mpi(n=7):
Expand Down
45 changes: 45 additions & 0 deletions parsl/tests/unit/test_usage_tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Test usage_tracking values."""

import pytest

import parsl
from parsl.config import Config
from parsl.errors import ConfigurationError


@pytest.mark.local
def test_config_load():
    """Loading a config with usage tracking enabled should succeed."""
    dfk = parsl.load(Config(usage_tracking=3))
    with dfk:
        pass
    parsl.clear()


@pytest.mark.local
@pytest.mark.parametrize("level", (0, 1, 2, 3, False, True))
def test_valid(level):
    """Test that every valid usage_tracking value (ints 0-3, and bools,
    which are ints in Python) is accepted and stored unchanged.

    The original built a throwaway ``Config`` before the asserted one;
    a single construction exercises the same validation path.
    """
    assert Config(usage_tracking=level).usage_tracking == level


@pytest.mark.local
@pytest.mark.parametrize("level", (12, 1000, -1))
def test_invalid_values(level):
    """Usage tracking levels outside 0-3 must raise ConfigurationError."""
    with pytest.raises(ConfigurationError):
        Config(usage_tracking=level)


@pytest.mark.local
@pytest.mark.parametrize("level", ("abcd", None, bytes(1), 1.0, 1j, object()))
def test_invalid_types(level):
    """Non-int usage_tracking values must be rejected by type checking."""
    with pytest.raises(Exception) as exc_info:
        Config(usage_tracking=level)

    # typeguard 4.x raises TypeCheckError while typeguard 2.x raises
    # TypeError; TypeCheckError cannot even be imported under 2.x, so
    # the raised type is compared by name rather than by class.
    assert exc_info.type.__name__ in ("TypeCheckError", "TypeError")
6 changes: 6 additions & 0 deletions parsl/usage_tracking/levels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Constants naming Parsl's opt-in usage tracking levels.

Per the per-level comments below, each level shares everything the
level beneath it shares, plus additional detail.
"""

DISABLED = 0  # Tracking is disabled
LEVEL_1 = 1  # Share info about Parsl version, Python version, platform
LEVEL_2 = 2  # Share info about config + level 1
LEVEL_3 = 3  # Share info about app count, app fails, execution time + level 2
Loading

0 comments on commit 081112e

Please sign in to comment.