Skip to content

Commit

Permalink
Merge branch 'master' into benc-monitoring-router-one-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Aug 20, 2024
2 parents c5868af + bdfbb26 commit 412bdbb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Parsl - Parallel Scripting Library
==================================
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528|
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|

Parsl extends parallelism in Python beyond a single computer.

Expand Down Expand Up @@ -64,6 +64,9 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |NSF-1550475| image:: https://img.shields.io/badge/NSF-1550475-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550475
:alt: NSF award info
.. |CZI-EOSS| image:: https://chanzuckerberg.github.io/open-science/badges/CZI-EOSS.svg
:target: https://czi.co/EOSS
:alt: CZI's Essential Open Source Software for Science


Quickstart
Expand Down
50 changes: 40 additions & 10 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
logger = logging.getLogger(__name__)

# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
translate_table = {
sacct_translate_table = {
'PENDING': JobState.PENDING,
'RUNNING': JobState.RUNNING,
'CANCELLED': JobState.CANCELLED,
Expand All @@ -37,6 +37,20 @@
'REQUEUED': JobState.PENDING
}

# Maps the compact job state codes printed by squeue's `%t` format field
# to parsl JobState values. This table is selected as the provider's
# translation table when the sacct probe fails and status polling falls
# back to squeue.
squeue_translate_table = {
'PD': JobState.PENDING,
'R': JobState.RUNNING,
'CA': JobState.CANCELLED,
'CF': JobState.PENDING, # configuring: reported as pending
'CG': JobState.RUNNING, # completing: reported as still running
'CD': JobState.COMPLETED,
'F': JobState.FAILED, # failed
'TO': JobState.TIMEOUT, # timeout
'NF': JobState.FAILED, # node failure
'RV': JobState.FAILED, # revoked
'SE': JobState.FAILED # special exit state
}


class SlurmProvider(ClusterProvider, RepresentationMixin):
"""Slurm Execution Provider
Expand Down Expand Up @@ -155,6 +169,23 @@ def __init__(self,

self.regex_job_id = regex_job_id
self.worker_init = worker_init + '\n'
# Check if sacct works and if not fall back to squeue
cmd = "sacct -X"
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
# If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled"
logger.debug(f"sacct returned retcode={retcode} stderr={stderr}")
if retcode == 0:
logger.debug("using sacct to get job status")
# Using state%20 to get enough characters to not truncate output
# of the state. Without it, output can look like "<job_id> CANCELLED+"
self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'"
self._translate_table = sacct_translate_table
else:
logger.debug(f"sacct failed with retcode={retcode}")
logger.debug("falling back to using squeue to get job status")
self._cmd = "squeue --noheader --format='%i %t' --job '{0}'"
self._translate_table = squeue_translate_table

def _status(self):
'''Returns the status list for a list of job_ids
Expand All @@ -172,16 +203,14 @@ def _status(self):
logger.debug('No active jobs, skipping status update')
return

# Using state%20 to get enough characters to not truncate output
# of the state. Without output can look like "<job_id> CANCELLED+"
cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
cmd = self._cmd.format(job_id_list)
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
logger.debug("sacct returned %s %s", stdout, stderr)
logger.debug("sacct/squeue returned %s %s", stdout, stderr)

# Execute_wait failed. Do not update
if retcode != 0:
logger.warning("sacct failed with non-zero exit code {}".format(retcode))
logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode))
return

jobs_missing = set(self.resources.keys())
Expand All @@ -193,19 +222,20 @@ def _status(self):
# For example "<job_id> CANCELLED by <user_id>"
# This splits and ignores anything past the first two unpacked values
job_id, slurm_state, *ignore = line.split()
if slurm_state not in translate_table:
if slurm_state not in self._translate_table:
logger.warning(f"Slurm status {slurm_state} is not recognized")
status = translate_table.get(slurm_state, JobState.UNKNOWN)
status = self._translate_table.get(slurm_state, JobState.UNKNOWN)
logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status))
self.resources[job_id]['status'] = JobStatus(status,
stdout_path=self.resources[job_id]['job_stdout_path'],
stderr_path=self.resources[job_id]['job_stderr_path'])
jobs_missing.remove(job_id)

# sacct can get job info after jobs have completed so this path shouldn't be hit
# log a warning if there are missing jobs for some reason
# squeue does not report on jobs that are not running, so we fill in the
# blanks for missing jobs here; we might lose some information about why the jobs failed.
for missing_job in jobs_missing:
logger.warning("Updating missing job {} to completed status".format(missing_job))
logger.debug("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(
JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])
Expand Down

0 comments on commit 412bdbb

Please sign in to comment.