From bf98e5049c34eaf1400663404e29672b85d3ac5e Mon Sep 17 00:00:00 2001
From: Nick Tyler
Date: Tue, 4 Jun 2024 12:56:23 +0200
Subject: [PATCH 1/6] Use sacct to get slurm job information (#3422)

Changes from the Slurm squeue command to the sacct command. The sacct
command is a bit easier on the Slurm scheduler as it connects to the
Slurm database instead of the Slurm controller. Another benefit of
sacct is that it can also return information for jobs that have
finished, so the jobs_missing checks that are currently in the code
may no longer be needed.

Co-authored-by: Ben Clifford
---
 parsl/providers/slurm/slurm.py | 53 ++++++++++++++++++++--------------
 1 file changed, 31 insertions(+), 22 deletions(-)

diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index fead7143d5..ec6abeff56 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -19,25 +19,29 @@
 
 logger = logging.getLogger(__name__)
 
+# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
 translate_table = {
-    'PD': JobState.PENDING,
-    'R': JobState.RUNNING,
-    'CA': JobState.CANCELLED,
-    'CF': JobState.PENDING,  # (configuring),
-    'CG': JobState.RUNNING,  # (completing),
-    'CD': JobState.COMPLETED,
-    'F': JobState.FAILED,  # (failed),
-    'TO': JobState.TIMEOUT,  # (timeout),
-    'NF': JobState.FAILED,  # (node failure),
-    'RV': JobState.FAILED,  # (revoked) and
-    'SE': JobState.FAILED  # (special exit state)
+    'PENDING': JobState.PENDING,
+    'RUNNING': JobState.RUNNING,
+    'CANCELLED': JobState.CANCELLED,
+    'COMPLETED': JobState.COMPLETED,
+    'FAILED': JobState.FAILED,
+    'NODE_FAIL': JobState.FAILED,
+    'BOOT_FAIL': JobState.FAILED,
+    'DEADLINE': JobState.TIMEOUT,
+    'TIMEOUT': JobState.TIMEOUT,
+    'REVOKED': JobState.FAILED,
+    'OUT_OF_MEMORY': JobState.FAILED,
+    'SUSPENDED': JobState.HELD,
+    'PREEMPTED': JobState.TIMEOUT,
+    'REQUEUED': JobState.PENDING
 }
 
 
 class SlurmProvider(ClusterProvider, RepresentationMixin):
     """Slurm Execution Provider
 
-    This provider uses sbatch to submit, squeue for status and scancel to cancel
+    This provider uses sbatch to submit, sacct for status and scancel to cancel
     jobs. The sbatch script to be used is created from a template file in this
     same module.
 
@@ -168,14 +172,16 @@ def _status(self):
             logger.debug('No active jobs, skipping status update')
             return
 
-        cmd = "squeue --noheader --format='%i %t' --job '{0}'".format(job_id_list)
+        # Using state%20 to get enough characters to not truncate output
+        # of the state. Without this, output can look like " CANCELLED+"
+        cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
         logger.debug("Executing %s", cmd)
         retcode, stdout, stderr = self.execute_wait(cmd)
-        logger.debug("squeue returned %s %s", stdout, stderr)
+        logger.debug("sacct returned %s %s", stdout, stderr)
 
         # Execute_wait failed. Do not update
         if retcode != 0:
-            logger.warning("squeue failed with non-zero exit code {}".format(retcode))
+            logger.warning("sacct failed with non-zero exit code {}".format(retcode))
             return
 
         jobs_missing = set(self.resources.keys())
@@ -183,7 +189,10 @@ def _status(self):
             if not line:
                 # Blank line
                 continue
-            job_id, slurm_state = line.split()
+            # Sacct includes extra information in some outputs
+            # For example " CANCELLED by "
+            # This splits and ignores anything past the first two unpacked values
+            job_id, slurm_state, *ignore = line.split()
             if slurm_state not in translate_table:
                 logger.warning(f"Slurm status {slurm_state} is not recognized")
             status = translate_table.get(slurm_state, JobState.UNKNOWN)
@@ -193,13 +202,13 @@ def _status(self):
                                                          stderr_path=self.resources[job_id]['job_stderr_path'])
             jobs_missing.remove(job_id)
 
-        # squeue does not report on jobs that are not running. So we are filling in the
-        # blanks for missing jobs, we might lose some information about why the jobs failed.
+        # sacct can get job info after jobs have completed so this path shouldn't be hit
+        # log a warning if there are missing jobs for some reason
         for missing_job in jobs_missing:
-            logger.debug("Updating missing job {} to completed status".format(missing_job))
-            self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED,
-                                                              stdout_path=self.resources[missing_job]['job_stdout_path'],
-                                                              stderr_path=self.resources[missing_job]['job_stderr_path'])
+            logger.warning("Updating missing job {} to completed status".format(missing_job))
+            self.resources[missing_job]['status'] = JobStatus(
+                JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
+                stderr_path=self.resources[missing_job]['job_stderr_path'])
 
     def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str:
         """Submit the command as a slurm job.

From 204ac963464d1757cf83fb5e5f21b944e51ccf0c Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 4 Jun 2024 20:03:42 +0200
Subject: [PATCH 2/6] isort only parsl/ for consistency with flake8, mypy,
 lint-inits (#3469)

---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 6ce923a9ac..0d368f4c59 100644
--- a/Makefile
+++ b/Makefile
@@ -37,7 +37,7 @@ lint: ## run linter script
 
 .PHONY: isort
 isort: ## run isort on all files
-	isort --check .
+	isort --check parsl/
 
 .PHONY: flake8
 flake8: ## run flake

From f9e2bf5b3a57044bb38e3a1327b5aff672e9aac3 Mon Sep 17 00:00:00 2001
From: Nishchay Karle <45297081+NishchayKarle@users.noreply.github.com>
Date: Thu, 6 Jun 2024 06:23:36 -0500
Subject: [PATCH 3/6] Allow users to select their preferred level of usage
 tracking (#3400)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR introduces a choice of three levels for users to select, based
on their preferred level of usage reporting. It builds on updates from
#3229.

Tracking Levels

Level 1: python version, parsl version, operating system details.
Level 2: configuration details + Level 1
Level 3: total apps run, total failed apps, execution time + Level 2

If usage tracking is currently enabled, it will default to level 1.
A short configuration sketch follows the list below.

Usage Data sent at launch (Levels 1 and 2)

• Capture Parsl version, Python version, and environment details at startup.
• Configuration Reporting: Log details about providers, launchers,
  executors, channels, and storage access methods used.
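For example, opting in at level 2 could look like this (an illustrative
sketch based on the levels above, not part of this patch's diff):

    from parsl.config import Config

    # Level 2: report configuration details (providers, launchers,
    # executors, ...) in addition to the level 1 platform information.
    config = Config(usage_tracking=2)

    # Level 0 (or leaving the field unset) disables usage tracking.
    quiet_config = Config(usage_tracking=0)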
Usage Data sent on closure (Level 3 only)

• Number of apps run
• Number of failed apps
• Total time elapsed
---
 parsl/config.py                         | 20 +++++--
 parsl/tests/unit/test_usage_tracking.py | 45 +++++++++++++++
 parsl/usage_tracking/levels.py          |  6 ++
 parsl/usage_tracking/usage.py           | 77 +++++++++++++++++--------
 4 files changed, 121 insertions(+), 27 deletions(-)
 create mode 100644 parsl/tests/unit/test_usage_tracking.py
 create mode 100644 parsl/usage_tracking/levels.py

diff --git a/parsl/config.py b/parsl/config.py
index 8858ddb042..ecea149114 100644
--- a/parsl/config.py
+++ b/parsl/config.py
@@ -11,6 +11,8 @@
 from parsl.executors.threads import ThreadPoolExecutor
 from parsl.monitoring import MonitoringHub
 from parsl.usage_tracking.api import UsageInformation
+from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
+from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
 from parsl.utils import RepresentationMixin
 
 logger = logging.getLogger(__name__)
@@ -66,9 +68,12 @@ class Config(RepresentationMixin, UsageInformation):
         How often the scaling strategy should be executed. Default is 5 seconds.
     max_idletime : float, optional
         The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds.
-    usage_tracking : bool, optional
-        Set this field to True to opt-in to Parsl's usage tracking system. Parsl only collects minimal, non personally-identifiable,
-        information used for reporting to our funding agencies. Default is False.
+    usage_tracking : int, optional
+        Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system.
+        The value represents the level of usage tracking detail to be collected.
+        Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled.
+        Parsl only collects minimal, non personally-identifiable,
+        information used for reporting to our funding agencies.
     initialize_logging : bool, optional
         Make DFK optionally not initialize any logging.
         Log messages will still be passed into the python logging system under the
@@ -102,7 +107,7 @@ def __init__(self,
                  strategy_period: Union[float, int] = 5,
                  max_idletime: float = 120.0,
                  monitoring: Optional[MonitoringHub] = None,
-                 usage_tracking: bool = False,
+                 usage_tracking: int = 0,
                  initialize_logging: bool = True) -> None:
 
         executors = tuple(executors or [])
@@ -136,6 +141,7 @@ def __init__(self,
         self.strategy = strategy
         self.strategy_period = strategy_period
         self.max_idletime = max_idletime
+        self.validate_usage_tracking(usage_tracking)
         self.usage_tracking = usage_tracking
         self.initialize_logging = initialize_logging
         self.monitoring = monitoring
@@ -156,6 +162,12 @@ def _validate_executors(self) -> None:
             raise ConfigurationError('Executors must have unique labels ({})'.format(
                 ', '.join(['label={}'.format(repr(d)) for d in duplicates])))
 
+    def validate_usage_tracking(self, level: int) -> None:
+        if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3:
+            raise ConfigurationError(
+                f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}"
+            )
+
     def get_usage_information(self):
         return {"executors_len": len(self.executors),
                 "dependency_resolver": self.dependency_resolver is not None}
diff --git a/parsl/tests/unit/test_usage_tracking.py b/parsl/tests/unit/test_usage_tracking.py
new file mode 100644
index 0000000000..351355811c
--- /dev/null
+++ b/parsl/tests/unit/test_usage_tracking.py
@@ -0,0 +1,45 @@
+"""Test usage_tracking values."""
+
+import pytest
+
+import parsl
+from parsl.config import Config
+from parsl.errors import ConfigurationError
+
+
+@pytest.mark.local
+def test_config_load():
+    """Test loading a config with usage tracking."""
+    with parsl.load(Config(usage_tracking=3)):
+        pass
+    parsl.clear()
+
+
+@pytest.mark.local
+@pytest.mark.parametrize("level", (0, 1, 2, 3, False, True))
+def test_valid(level):
+    """Test valid usage_tracking values."""
+    Config(usage_tracking=level)
+    assert Config(usage_tracking=level).usage_tracking == level
+
+
+@pytest.mark.local
+@pytest.mark.parametrize("level", (12, 1000, -1))
+def test_invalid_values(level):
+    """Test invalid usage_tracking values."""
+    with pytest.raises(ConfigurationError):
+        Config(usage_tracking=level)
+
+
+@pytest.mark.local
+@pytest.mark.parametrize("level", ("abcd", None, bytes(1), 1.0, 1j, object()))
+def test_invalid_types(level):
+    """Test invalid usage_tracking types."""
+    with pytest.raises(Exception) as ex:
+        Config(usage_tracking=level)
+
+    # with typeguard 4.x this is TypeCheckError,
+    # with typeguard 2.x this is TypeError
+    # we can't instantiate TypeCheckError if we're in typeguard 2.x environment
+    # because it does not exist... so check name using strings.
+    assert ex.type.__name__ in ["TypeCheckError", "TypeError"]
diff --git a/parsl/usage_tracking/levels.py b/parsl/usage_tracking/levels.py
new file mode 100644
index 0000000000..a772220ca3
--- /dev/null
+++ b/parsl/usage_tracking/levels.py
@@ -0,0 +1,6 @@
+"""Module for defining the usage tracking levels."""
+
+DISABLED = 0  # Tracking is disabled
+LEVEL_1 = 1  # Share info about Parsl version, Python version, platform
+LEVEL_2 = 2  # Share info about config + level 1
+LEVEL_3 = 3  # Share info about app count, app fails, execution time + level 2
diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py
index 10acbd8e89..3730fcc464 100644
--- a/parsl/usage_tracking/usage.py
+++ b/parsl/usage_tracking/usage.py
@@ -7,8 +7,11 @@
 import uuid
 
 from parsl.dataflow.states import States
+from parsl.errors import ConfigurationError
 from parsl.multiprocessing import ForkProcess
 from parsl.usage_tracking.api import get_parsl_usage
+from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED
+from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3
 from parsl.utils import setproctitle
 from parsl.version import VERSION as PARSL_VERSION
 
@@ -110,17 +113,32 @@ def __init__(self, dfk, port=50077,
         self.python_version = "{}.{}.{}".format(sys.version_info.major,
                                                 sys.version_info.minor,
                                                 sys.version_info.micro)
-        self.tracking_enabled = self.check_tracking_enabled()
-        logger.debug("Tracking status: {}".format(self.tracking_enabled))
-
-    def check_tracking_enabled(self):
-        """Check if tracking is enabled.
-
-        Tracking will be enabled unless the following is true:
-
-        1. dfk.config.usage_tracking is set to False
-
+        self.tracking_level = self.check_tracking_level()
+        self.start_time = None
+        logger.debug("Tracking level: {}".format(self.tracking_level))
+
+    def check_tracking_level(self) -> int:
+        """Check if tracking is enabled and return level.
+
+        Checks usage_tracking in Config
+         - Possible values: [True, False, 0, 1, 2, 3]
+
+        True/False values are treated as Level 1/Level 0 respectively.
+
+        Returns: int
+        - 0 : Tracking is disabled
+        - 1 : Tracking is enabled with level 1
+              Share info about Parsl version, Python version, platform
+        - 2 : Tracking is enabled with level 2
+              Share info about config + level 1
+        - 3 : Tracking is enabled with level 3
+              Share info about app count, app fails, execution time + level 2
         """
+        if not USAGE_TRACKING_DISABLED <= self.config.usage_tracking <= USAGE_TRACKING_LEVEL_3:
+            raise ConfigurationError(
+                f"Usage Tracking values must be 0, 1, 2, or 3 and not {self.config.usage_tracking}"
+            )
+
         return self.config.usage_tracking
 
     def construct_start_message(self) -> bytes:
@@ -133,18 +151,28 @@ def construct_start_message(self) -> bytes:
                    'parsl_v': self.parsl_version,
                    'python_v': self.python_version,
                    'platform.system': platform.system(),
-                   'start': int(time.time()),
-                   'components': get_parsl_usage(self.dfk._config)}
+                   'tracking_level': int(self.tracking_level)}
+
+        if self.tracking_level >= 2:
+            message['components'] = get_parsl_usage(self.dfk._config)
+
+        if self.tracking_level == 3:
+            self.start_time = int(time.time())
+            message['start'] = self.start_time
+
         logger.debug(f"Usage tracking start message: {message}")
 
         return self.encode_message(message)
 
     def construct_end_message(self) -> bytes:
         """Collect the final run information at the time of DFK cleanup.
+        This is only called if tracking level is 3.
 
         Returns:
              - Message dict dumped as json string, ready for UDP
         """
+        end_time = int(time.time())
+
         app_count = self.dfk.task_count
 
         app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail]
@@ -157,7 +185,8 @@ def construct_end_message(self) -> bytes:
                          'app_fails': app_fails}
 
         message = {'correlator': self.correlator_uuid,
-                   'end': int(time.time()),
+                   'end': end_time,
+                   'execution_time': end_time - self.start_time,
                    'components': [dfk_component] + get_parsl_usage(self.dfk._config)}
         logger.debug(f"Usage tracking end message (unencoded): {message}")
 
@@ -168,20 +197,22 @@ def encode_message(self, obj):
 
     def send_UDP_message(self, message: bytes) -> None:
         """Send UDP message."""
-        if self.tracking_enabled:
-            try:
-                proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message)
-                self.procs.append(proc)
-            except Exception as e:
-                logger.debug("Usage tracking failed: {}".format(e))
+        try:
+            proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message)
+            self.procs.append(proc)
+        except Exception as e:
+            logger.debug("Usage tracking failed: {}".format(e))
 
     def send_start_message(self) -> None:
-        message = self.construct_start_message()
-        self.send_UDP_message(message)
+        if self.tracking_level:
+            self.start_time = time.time()
+            message = self.construct_start_message()
+            self.send_UDP_message(message)
 
     def send_end_message(self) -> None:
-        message = self.construct_end_message()
-        self.send_UDP_message(message)
+        if self.tracking_level == 3:
+            message = self.construct_end_message()
+            self.send_UDP_message(message)
 
     def close(self, timeout: float = 10.0) -> None:
         """First give each process one timeout period to finish what it is

From 825b285e3ef636065eed6302910baa8bdeecdcac Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Thu, 6 Jun 2024 17:23:36 +0200
Subject: [PATCH 4/6] Add FAQ on surviving end of a batch job (#3475)

---
 docs/faq.rst | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/docs/faq.rst b/docs/faq.rst
index 291016bda3..f427db82f9 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -358,3 +358,22 @@ or
           url = {https://doi.org/10.1145/3307681.3325400}
       }
 
+
+How can my tasks survive ``WorkerLost`` and ``ManagerLost`` at the end of a batch job?
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+When a batch job ends, pilot workers will be terminated by the batch system,
+and any tasks running there will fail. With `HighThroughputExecutor`,
+this failure will be reported as a `parsl.executors.high_throughput.errors.WorkerLost` or
+`parsl.executors.high_throughput.interchange.ManagerLost` in the task future.
+
+To mitigate against this:
+
+* use retries by setting ``retries=`` in `parsl.config.Config`.
+* if you only want to retry on certain errors such as `WorkerLost` and `ManagerLost`,
+  use ``retry_handler`` in `parsl.config.Config` to implement that policy.
+* avoid sending tasks to batch jobs that will expire soon. With `HighThroughputExecutor`,
+  set ``drain_period`` to a little longer than you expect your tasks to take.
+  With `WorkQueueExecutor`, you can configure individual expected task duration using
+  a ``parsl_resource_specification`` and specify a worker ``--wall-time`` using the
+  ``worker_options`` parameter to the `WorkQueueExecutor`.
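As an illustrative sketch of the ``retry_handler`` suggestion above (not
part of this patch's diff; the cost values are arbitrary), a handler can
make ``WorkerLost`` and ``ManagerLost`` cheap to retry while any other
failure exhausts the retry budget immediately:

    from parsl.config import Config
    from parsl.executors.high_throughput.errors import WorkerLost
    from parsl.executors.high_throughput.interchange import ManagerLost

    def retry_handler(exception, task_record):
        # Failures caused by the batch job ending cost 1 retry;
        # any other failure consumes the whole retry budget at once.
        if isinstance(exception, (WorkerLost, ManagerLost)):
            return 1
        return 99

    config = Config(retries=2, retry_handler=retry_handler)

With ``retries=2``, a task interrupted by the end of a batch job can be
retried up to twice, while other exceptions (cost 99 > 2) are not retried.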
From bc086f708a70933af69a8ec9903981a83ee2ffc9 Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Fri, 7 Jun 2024 16:18:32 +0200
Subject: [PATCH 5/6] Re-enable Radical/MPI test that was waiting on issue
 #3029 (#3286)

The fix to this issue comes from a change in Radical Cybertools,
not a change in the Parsl codebase.

See https://github.com/radical-cybertools/radical.saga/issues/885
---
 parsl/tests/test_radical/test_mpi_funcs.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/parsl/tests/test_radical/test_mpi_funcs.py b/parsl/tests/test_radical/test_mpi_funcs.py
index 2c69e26dd8..64f280ac19 100644
--- a/parsl/tests/test_radical/test_mpi_funcs.py
+++ b/parsl/tests/test_radical/test_mpi_funcs.py
@@ -16,7 +16,6 @@ def some_mpi_func(msg, sleep, comm=None, parsl_resource_specification={}):
 
 apps = []
 
-@pytest.mark.skip("hangs in CI - waiting for resolution of issue #3029")
 @pytest.mark.local
 @pytest.mark.radical
 def test_radical_mpi(n=7):

From 2200f62feb99059b50707f90088b3712980637e4 Mon Sep 17 00:00:00 2001
From: shishichen <34603682+shishichen@users.noreply.github.com>
Date: Mon, 10 Jun 2024 07:13:41 -0400
Subject: [PATCH 6/6] Added options to set annotations and a service account
 in the Kubernetes worker pod configuration (#3476)

Added options for pod annotations and a service account to be used by
the pod, which are passed through to the Kubernetes client. These
options are needed when using the provider on Google Kubernetes Engine
for pods to mount Google Cloud Storage persistent volumes.
---
 parsl/providers/kubernetes/kube.py | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/parsl/providers/kubernetes/kube.py b/parsl/providers/kubernetes/kube.py
index 0b53881702..c5256a47f3 100644
--- a/parsl/providers/kubernetes/kube.py
+++ b/parsl/providers/kubernetes/kube.py
@@ -83,6 +83,10 @@ class KubernetesProvider(ExecutionProvider, RepresentationMixin):
     persistent_volumes: list[(str, str)]
         List of tuples describing persistent volumes to be mounted in the pod.
         The tuples consist of (PVC Name, Mount Directory).
+    service_account_name: str
+        Name of the service account to run the pod as.
+    annotations: Dict[str, str]
+        Annotations to set on the pod.
""" @typeguard.typechecked def __init__(self, @@ -103,7 +107,9 @@ def __init__(self, group_id: Optional[str] = None, run_as_non_root: bool = False, secret: Optional[str] = None, - persistent_volumes: List[Tuple[str, str]] = []) -> None: + persistent_volumes: List[Tuple[str, str]] = [], + service_account_name: Optional[str] = None, + annotations: Optional[Dict[str, str]] = None) -> None: if not _kubernetes_enabled: raise OptionalModuleMissing(['kubernetes'], "Kubernetes provider requires kubernetes module and config.") @@ -146,6 +152,8 @@ def __init__(self, self.group_id = group_id self.run_as_non_root = run_as_non_root self.persistent_volumes = persistent_volumes + self.service_account_name = service_account_name + self.annotations = annotations self.kube_client = client.CoreV1Api() @@ -184,7 +192,9 @@ def submit(self, cmd_string, tasks_per_node, job_name="parsl"): pod_name=pod_name, job_name=job_name, cmd_string=formatted_cmd, - volumes=self.persistent_volumes) + volumes=self.persistent_volumes, + service_account_name=self.service_account_name, + annotations=self.annotations) self.resources[pod_name] = {'status': JobStatus(JobState.RUNNING)} return pod_name @@ -253,7 +263,9 @@ def _create_pod(self, job_name, port=80, cmd_string=None, - volumes=[]): + volumes=[], + service_account_name=None, + annotations=None): """ Create a kubernetes pod for the job. Args: - image (string) : Docker image to launch @@ -311,11 +323,12 @@ def _create_pod(self, claim_name=volume[0]))) metadata = client.V1ObjectMeta(name=pod_name, - labels={"app": job_name}) + labels={"app": job_name}, + annotations=annotations) spec = client.V1PodSpec(containers=[container], image_pull_secrets=[secret], - volumes=volume_defs - ) + volumes=volume_defs, + service_account_name=service_account_name) pod = client.V1Pod(spec=spec, metadata=metadata) api_response = self.kube_client.create_namespaced_pod(namespace=self.namespace,