diff --git a/Makefile b/Makefile
index 6ce923a9ac..0d368f4c59 100644
--- a/Makefile
+++ b/Makefile
@@ -37,7 +37,7 @@ lint: ## run linter script
 
 .PHONY: isort
 isort: ## run isort on all files
-	isort --check .
+	isort --check parsl/
 
 .PHONY: flake8
 flake8: ## run flake
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index fead7143d5..ec6abeff56 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -19,25 +19,29 @@
 
 logger = logging.getLogger(__name__)
 
+# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
 translate_table = {
-    'PD': JobState.PENDING,
-    'R': JobState.RUNNING,
-    'CA': JobState.CANCELLED,
-    'CF': JobState.PENDING,  # (configuring),
-    'CG': JobState.RUNNING,  # (completing),
-    'CD': JobState.COMPLETED,
-    'F': JobState.FAILED,  # (failed),
-    'TO': JobState.TIMEOUT,  # (timeout),
-    'NF': JobState.FAILED,  # (node failure),
-    'RV': JobState.FAILED,  # (revoked) and
-    'SE': JobState.FAILED  # (special exit state)
+    'PENDING': JobState.PENDING,
+    'RUNNING': JobState.RUNNING,
+    'CANCELLED': JobState.CANCELLED,
+    'COMPLETED': JobState.COMPLETED,
+    'FAILED': JobState.FAILED,
+    'NODE_FAIL': JobState.FAILED,
+    'BOOT_FAIL': JobState.FAILED,
+    'DEADLINE': JobState.TIMEOUT,
+    'TIMEOUT': JobState.TIMEOUT,
+    'REVOKED': JobState.FAILED,
+    'OUT_OF_MEMORY': JobState.FAILED,
+    'SUSPENDED': JobState.HELD,
+    'PREEMPTED': JobState.TIMEOUT,
+    'REQUEUED': JobState.PENDING
 }
 
 
 class SlurmProvider(ClusterProvider, RepresentationMixin):
     """Slurm Execution Provider
 
-    This provider uses sbatch to submit, squeue for status and scancel to cancel
+    This provider uses sbatch to submit, sacct for status and scancel to cancel
     jobs. The sbatch script to be used is created from a template file in this
     same module.
 
@@ -168,14 +172,16 @@ def _status(self):
             logger.debug('No active jobs, skipping status update')
             return
 
-        cmd = "squeue --noheader --format='%i %t' --job '{0}'".format(job_id_list)
+        # Using state%20 to get enough characters to not truncate output
+        # of the state. Without output can look like " CANCELLED+"
+        cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
         logger.debug("Executing %s", cmd)
         retcode, stdout, stderr = self.execute_wait(cmd)
-        logger.debug("squeue returned %s %s", stdout, stderr)
+        logger.debug("sacct returned %s %s", stdout, stderr)
 
         # Execute_wait failed. Do no update
         if retcode != 0:
-            logger.warning("squeue failed with non-zero exit code {}".format(retcode))
+            logger.warning("sacct failed with non-zero exit code {}".format(retcode))
             return
 
         jobs_missing = set(self.resources.keys())
@@ -183,7 +189,10 @@ def _status(self):
             if not line:
                 # Blank line
                 continue
-            job_id, slurm_state = line.split()
+            # Sacct includes extra information in some outputs
+            # For example " CANCELLED by "
+            # This splits and ignores anything past the first two unpacked values
+            job_id, slurm_state, *ignore = line.split()
             if slurm_state not in translate_table:
                 logger.warning(f"Slurm status {slurm_state} is not recognized")
             status = translate_table.get(slurm_state, JobState.UNKNOWN)
@@ -193,13 +202,13 @@ def _status(self):
                                                          stderr_path=self.resources[job_id]['job_stderr_path'])
             jobs_missing.remove(job_id)
 
-        # squeue does not report on jobs that are not running. So we are filling in the
-        # blanks for missing jobs, we might lose some information about why the jobs failed.
+        # sacct can get job info after jobs have completed so this path shouldn't be hit
+        # log a warning if there are missing jobs for some reason
         for missing_job in jobs_missing:
-            logger.debug("Updating missing job {} to completed status".format(missing_job))
-            self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED,
-                                                              stdout_path=self.resources[missing_job]['job_stdout_path'],
-                                                              stderr_path=self.resources[missing_job]['job_stderr_path'])
+            logger.warning("Updating missing job {} to completed status".format(missing_job))
+            self.resources[missing_job]['status'] = JobStatus(
+                JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
+                stderr_path=self.resources[missing_job]['job_stderr_path'])
 
     def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str:
         """Submit the command as a slurm job.
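
For reference, here is a small standalone sketch (not part of the patch) of how the new sacct-based parsing is expected to behave. The sample sacct output and job IDs are invented, and JobState below is a minimal stand-in enum rather than parsl's own class, but the translate_table and the line.split() handling mirror the change above.

# Standalone sketch: sacct state parsing as done in the patched _status().
# JobState is a stand-in for parsl's enum; sample output is invented.
from enum import Enum


class JobState(Enum):
    PENDING = 0
    RUNNING = 1
    CANCELLED = 2
    COMPLETED = 3
    FAILED = 4
    TIMEOUT = 5
    HELD = 6
    UNKNOWN = 7


translate_table = {
    'PENDING': JobState.PENDING,
    'RUNNING': JobState.RUNNING,
    'CANCELLED': JobState.CANCELLED,
    'COMPLETED': JobState.COMPLETED,
    'FAILED': JobState.FAILED,
    'NODE_FAIL': JobState.FAILED,
    'BOOT_FAIL': JobState.FAILED,
    'DEADLINE': JobState.TIMEOUT,
    'TIMEOUT': JobState.TIMEOUT,
    'REVOKED': JobState.FAILED,
    'OUT_OF_MEMORY': JobState.FAILED,
    'SUSPENDED': JobState.HELD,
    'PREEMPTED': JobState.TIMEOUT,
    'REQUEUED': JobState.PENDING,
}

# Invented example of what "sacct -X --noheader --format=jobid,state%20 ..."
# output might look like: one job per line, with extra tokens after the
# state for cancelled jobs ("CANCELLED by <uid>").
sample_stdout = """\
1000001      COMPLETED
1000002        RUNNING
1000003      CANCELLED by 4294967294
"""

for line in sample_stdout.split('\n'):
    if not line:
        # Blank line
        continue
    # Same parsing as the patched _status(): keep the first two fields,
    # ignore anything after them.
    job_id, slurm_state, *ignore = line.split()
    state = translate_table.get(slurm_state, JobState.UNKNOWN)
    print(job_id, slurm_state, state)

# Expected output:
# 1000001 COMPLETED JobState.COMPLETED
# 1000002 RUNNING JobState.RUNNING
# 1000003 CANCELLED JobState.CANCELLED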