Merge branch 'master' into update-usage-tracking
benclifford authored Jun 5, 2024
2 parents 8591614 + 204ac96 commit 3b3ced0
Showing 2 changed files with 32 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -37,7 +37,7 @@ lint: ## run linter script

.PHONY: isort
isort: ## run isort on all files
-isort --check .
+isort --check parsl/

.PHONY: flake8
flake8: ## run flake
53 changes: 31 additions & 22 deletions parsl/providers/slurm/slurm.py
@@ -19,25 +19,29 @@

logger = logging.getLogger(__name__)

+# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
translate_table = {
-'PD': JobState.PENDING,
-'R': JobState.RUNNING,
-'CA': JobState.CANCELLED,
-'CF': JobState.PENDING, # (configuring),
-'CG': JobState.RUNNING, # (completing),
-'CD': JobState.COMPLETED,
-'F': JobState.FAILED, # (failed),
-'TO': JobState.TIMEOUT, # (timeout),
-'NF': JobState.FAILED, # (node failure),
-'RV': JobState.FAILED, # (revoked) and
-'SE': JobState.FAILED # (special exit state)
+'PENDING': JobState.PENDING,
+'RUNNING': JobState.RUNNING,
+'CANCELLED': JobState.CANCELLED,
+'COMPLETED': JobState.COMPLETED,
+'FAILED': JobState.FAILED,
+'NODE_FAIL': JobState.FAILED,
+'BOOT_FAIL': JobState.FAILED,
+'DEADLINE': JobState.TIMEOUT,
+'TIMEOUT': JobState.TIMEOUT,
+'REVOKED': JobState.FAILED,
+'OUT_OF_MEMORY': JobState.FAILED,
+'SUSPENDED': JobState.HELD,
+'PREEMPTED': JobState.TIMEOUT,
+'REQUEUED': JobState.PENDING
}


class SlurmProvider(ClusterProvider, RepresentationMixin):
"""Slurm Execution Provider
-This provider uses sbatch to submit, squeue for status and scancel to cancel
+This provider uses sbatch to submit, sacct for status and scancel to cancel
jobs. The sbatch script to be used is created from a template file in this
same module.
@@ -168,22 +172,27 @@ def _status(self):
logger.debug('No active jobs, skipping status update')
return

cmd = "squeue --noheader --format='%i %t' --job '{0}'".format(job_id_list)
+# Using state%20 to get enough characters to not truncate output
+# of the state. Without output can look like "<job_id> CANCELLED+"
+cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
logger.debug("squeue returned %s %s", stdout, stderr)
logger.debug("sacct returned %s %s", stdout, stderr)

# Execute_wait failed. Do no update
if retcode != 0:
logger.warning("squeue failed with non-zero exit code {}".format(retcode))
logger.warning("sacct failed with non-zero exit code {}".format(retcode))
return

jobs_missing = set(self.resources.keys())
for line in stdout.split('\n'):
if not line:
# Blank line
continue
-job_id, slurm_state = line.split()
+# Sacct includes extra information in some outputs
+# For example "<job_id> CANCELLED by <user_id>"
+# This splits and ignores anything past the first two unpacked values
+job_id, slurm_state, *ignore = line.split()
if slurm_state not in translate_table:
logger.warning(f"Slurm status {slurm_state} is not recognized")
status = translate_table.get(slurm_state, JobState.UNKNOWN)
@@ -193,13 +202,13 @@ def _status(self):
stderr_path=self.resources[job_id]['job_stderr_path'])
jobs_missing.remove(job_id)

-# squeue does not report on jobs that are not running. So we are filling in the
-# blanks for missing jobs, we might lose some information about why the jobs failed.
+# sacct can get job info after jobs have completed so this path shouldn't be hit
+# log a warning if there are missing jobs for some reason
for missing_job in jobs_missing:
logger.debug("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED,
stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])
logger.warning("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(
JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])

def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> str:
"""Submit the command as a slurm job.
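For context, here is a minimal standalone sketch (not part of this commit) of how the new sacct-based parsing above behaves on sample output. The job IDs, the sample states, and the stand-in JobState enum are illustrative only; the real JobState and JobStatus classes come from Parsl.

from enum import Enum

# Stand-in for Parsl's JobState, reduced to the states used in this sketch.
class JobState(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    CANCELLED = 'cancelled'
    COMPLETED = 'completed'
    FAILED = 'failed'
    UNKNOWN = 'unknown'

translate_table = {
    'PENDING': JobState.PENDING,
    'RUNNING': JobState.RUNNING,
    'CANCELLED': JobState.CANCELLED,
    'COMPLETED': JobState.COMPLETED,
    'FAILED': JobState.FAILED,
}

# Example sacct output in the jobid,state format requested above: two columns,
# except that cancelled jobs carry a trailing "by <uid>" that the starred
# unpacking discards, mirroring the change in _status().
sample_stdout = "1000001 COMPLETED\n1000002 RUNNING\n1000003 CANCELLED by 501\n"

for line in sample_stdout.split('\n'):
    if not line:
        # Blank line
        continue
    job_id, slurm_state, *ignore = line.split()
    status = translate_table.get(slurm_state, JobState.UNKNOWN)
    print(job_id, status)
# prints:
# 1000001 JobState.COMPLETED
# 1000002 JobState.RUNNING
# 1000003 JobState.CANCELLED

The starred unpacking plus the .get(..., JobState.UNKNOWN) fallback keeps the parser tolerant of trailing detail and of state strings the table does not know.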
