From 5bb56b6fcd5c92686fa311ca36eafa2639aae139 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Mon, 22 Jul 2024 17:40:57 -0400 Subject: [PATCH 1/5] =?UTF-8?q?=F0=9F=94=A7=20add=20slurm=20exit=20codes?= =?UTF-8?q?=20improvements=20and=20try=20unlinks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/abstractBatchSystem.py | 43 +- .../abstractGridEngineBatchSystem.py | 16 + src/toil/batchSystems/slurm.py | 517 ++++++++++++++---- src/toil/jobStores/fileJobStore.py | 15 +- src/toil/lib/threading.py | 5 +- src/toil/statsAndLogging.py | 3 + 6 files changed, 485 insertions(+), 114 deletions(-) diff --git a/src/toil/batchSystems/abstractBatchSystem.py b/src/toil/batchSystems/abstractBatchSystem.py index 478adbede9..ee5d83bd99 100644 --- a/src/toil/batchSystems/abstractBatchSystem.py +++ b/src/toil/batchSystems/abstractBatchSystem.py @@ -53,13 +53,42 @@ ('cleanWorkDir', bool)]) -class BatchJobExitReason(enum.Enum): - FINISHED: int = 1 # Successfully finished. - FAILED: int = 2 # Job finished, but failed. - LOST: int = 3 # Preemptable failure (job's executing host went away). - KILLED: int = 4 # Job killed before finishing. - ERROR: int = 5 # Internal error. - MEMLIMIT: int = 6 # Job hit batch system imposed memory limit +EXIT_STATUS_UNAVAILABLE_VALUE = 255 + +class BatchJobExitReason(enum.IntEnum): + FINISHED: int = 1 + """Successfully finished.""" + FAILED: int = 2 + """Job finished, but failed.""" + LOST: int = 3 + """Preemptable failure (job's executing host went away).""" + KILLED: int = 4 + """Job killed before finishing.""" + ERROR: int = 5 + """Internal error.""" + MEMLIMIT: int = 6 + """Job hit batch system imposed memory limit.""" + MISSING: int = 7 + """Job disappeared from the scheduler without actually stopping, so Toil killed it.""" + MAXJOBDURATION: int = 8 + """Job ran longer than --maxJobDuration, so Toil killed it.""" + PARTITION: int = 9 + """Job was not able to talk to the leader via the job store, so Toil declared it failed.""" + + + @classmethod + def to_string(cls, value: int) -> str: + """ + Convert to human-readable string. + + Given an int that may be or may be equal to a value from the enum, + produce the string value of its matching enum entry, or a stringified + int. + """ + try: + return cls(value).name + except ValueError: + return str(value) class AbstractBatchSystem(ABC): diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 99d27de55e..9b5f1aa7b5 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import subprocess import time from abc import ABCMeta, abstractmethod from datetime import datetime @@ -27,6 +28,21 @@ logger = logging.getLogger(__name__) +def with_retries(operation, *args, **kwargs): + retries = 3 + latest_err = None + while retries: + retries -= 1 + try: + return operation(*args, **kwargs) + except subprocess.CalledProcessError as err: + latest_err = err + logger.error( + "Operation %s failed with code %d: %s", + operation, err.returncode, err.output) + raise latest_err + + class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport): """ A partial implementation of BatchSystemSupport for batch systems run on a diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index a0e4bfaa62..34fb78e2b0 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -14,30 +14,95 @@ import logging import math import os +from collections import defaultdict from pipes import quote -from typing import List, Dict, Optional +from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple +from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem +from toil.lib.humanize import bytes2human from toil.lib.misc import CalledProcessErrorStderr, call_command +from toil.statsAndLogging import TRACE logger = logging.getLogger(__name__) +MAX_MEMORY = 60 * 1e9 +OUT_OF_MEM_RETRIES = 2 + +# We have a complete list of Slurm states. States not in one of these aren't +# allowed. See + +# If a job is in one of these states, Slurm can't run it anymore. +# We don't include states where the job is held or paused here; +# those mean it could run and needs to wait for someone to un-hold +# it, so Toil should wait for it. +# +# We map from each terminal state to the Toil-ontology exit reason. +TERMINAL_STATES: Dict[str, BatchJobExitReason] = { + "BOOT_FAIL": BatchJobExitReason.LOST, + "CANCELLED": BatchJobExitReason.KILLED, + "COMPLETED": BatchJobExitReason.FINISHED, + "DEADLINE": BatchJobExitReason.KILLED, + "FAILED": BatchJobExitReason.FAILED, + "NODE_FAIL": BatchJobExitReason.LOST, + "OUT_OF_MEMORY": BatchJobExitReason.MEMLIMIT, + "PREEMPTED": BatchJobExitReason.KILLED, + "REVOKED": BatchJobExitReason.KILLED, + "SPECIAL_EXIT": BatchJobExitReason.FAILED, + "TIMEOUT": BatchJobExitReason.KILLED +} + +# If a job is in one of these states, it might eventually move to a different +# state. +NONTERMINAL_STATES: Set[str] = { + "CONFIGURING", + "COMPLETING", + "PENDING", + "RUNNING", + "RESV_DEL_HOLD", + "REQUEUE_FED", + "REQUEUE_HOLD", + "REQUEUED", + "RESIZING", + "SIGNALING", + "STAGE_OUT", + "STOPPED", + "SUSPENDED" +} class SlurmBatchSystem(AbstractGridEngineBatchSystem): + + def __init__(self, *args, **kwargs): + """Create a mapping table for JobIDs to JobNodes.""" + super(SlurmBatchSystem, self).__init__(*args, **kwargs) + self.Id2Node: Dict[str, Dict] = {} + self.resourceRetryCount = defaultdict(int) + + def issueBatchJob(self, jobDesc): + """Load the jobDesc into the JobID mapping table.""" + jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc) + self.Id2Node[jobID] = jobDesc + return jobID class Worker(AbstractGridEngineBatchSystem.Worker): + + def forgetJob(self, jobID): + """Remove jobNode from the mapping table when forgetting.""" + self.boss.Id2Node.pop(jobID, None) + self.boss.resourceRetryCount.pop(jobID, None) + return super(SlurmBatchSystem.Worker, self).forgetJob(jobID) def getRunningJobIDs(self): # Should return a dictionary of Job IDs and number of seconds times = {} with self.runningJobsLock: - currentjobs = dict((str(self.batchJobIDs[x][0]), x) for x in self.runningJobs) + currentjobs: Dict[str, int] = {str(self.batchJobIDs[x][0]): x for x in self.runningJobs} # currentjobs is a dictionary that maps a slurm job id (string) to our own internal job id # squeue arguments: # -h for no header # --format to get jobid i, state %t and time days-hours:minutes:seconds - lines = call_command(['squeue', '-h', '--format', '%i %t %M']).split('\n') + lines = call_command(['squeue', '-h', '--format', '%i %t %M'], quiet=True).split('\n') for line in lines: values = line.split() if len(values) < 3: @@ -49,7 +114,7 @@ def getRunningJobIDs(self): return times - def killJob(self, jobID): + def killJob(self, jobID: int) -> None: call_command(['scancel', self.getBatchSystemID(jobID)]) def prepareSubmission(self, @@ -61,110 +126,324 @@ def prepareSubmission(self, job_environment: Optional[Dict[str, str]] = None) -> List[str]: return self.prepareSbatch(cpu, memory, jobID, jobName, job_environment) + ['--wrap={}'.format(command)] - def submitJob(self, subLine): + def submitJob(self, subLine: List[str]) -> int: try: - output = call_command(subLine) + # Slurm is not quite clever enough to follow the XDG spec on + # its own. If the submission command sees e.g. XDG_RUNTIME_DIR + # in our environment, it will send it along (especially with + # --export=ALL), even though it makes a promise to the job that + # Slurm isn't going to keep. It also has a tendency to create + # /run/user/ *at the start* of a job, but *not* keep it + # around for the duration of the job. + # + # So we hide the whole XDG universe from Slurm before we make + # the submission. + # Might as well hide DBUS also. + # This doesn't get us a trustworthy XDG session in Slurm, but + # it does let us see the one Slurm tries to give us. + no_session_environment = os.environ.copy() + session_names = [n for n in no_session_environment.keys() if n.startswith('XDG_') or n.startswith('DBUS_')] + for name in session_names: + del no_session_environment[name] + + output = call_command(subLine, env=no_session_environment) # sbatch prints a line like 'Submitted batch job 2954103' result = int(output.strip().split()[-1]) logger.debug("sbatch submitted job %d", result) return result except OSError as e: - logger.error("sbatch command failed") + logger.error(f"sbatch command failed with error: {e}") raise e - def getJobExitCode(self, slurmJobID): - logger.debug(f"Getting exit code for slurm job: {slurmJobID}") + def coalesce_job_exit_codes(self, batch_job_id_list: List[str]) -> List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]]: + """ + Collect all job exit codes in a single call. + :param batch_job_id_list: list of Job ID strings, where each string has the form + "[.]". + :return: list of job exit codes or exit code, exit reason pairs associated with the list of job IDs. + """ + logger.log(TRACE, "Getting exit codes for slurm jobs: %s", batch_job_id_list) + # Convert batch_job_id_list to list of integer job IDs. + job_id_list = [int(id.split('.')[0]) for id in batch_job_id_list] + status_dict = self._get_job_details(job_id_list) + exit_codes: List[Union[int, Tuple[int, Optional[BatchJobExitReason]], None]] = [] + for _, status in status_dict.items(): + exit_codes.append(self._get_job_return_code(status)) + return exit_codes + + def getJobExitCode(self, slurm_job_id: str) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]: + """ + Get job exit code for given batch job ID. + :param slurm_job_id: string of the form "[.]". + :return: integer job exit code. + """ + logger.log(TRACE, "Getting exit code for slurm job: %s", slurm_job_id) + # Convert slurm_job_id to an integer job ID. + job_id = int(slurm_job_id.split('.')[0]) + status_dict = self._get_job_details([job_id]) + status = status_dict[job_id] + exit_status = self._get_job_return_code(status) + if exit_status is None: + return None + exit_code, exit_reason = exit_status + if exit_reason == BatchJobExitReason.MEMLIMIT: + # Retry job with 2x memory if it was killed because of memory + jobID = self._getJobID(slurm_job_id) + exit_code = self._customRetry(jobID, slurm_job_id) + return exit_code + + def _getJobID(self, slurm_job_id): + """Get toil job ID from the slurm job ID.""" + job_ids_dict = {slurm_job[0]: toil_job for toil_job, slurm_job in self.batchJobIDs.items()} + if slurm_job_id not in job_ids_dict: + raise RuntimeError("Unknown slurmJobID, could not be converted") + return job_ids_dict[slurm_job_id] + + def _customRetry(self, jobID, slurm_job_id): + """Increase the job memory 2x and retry, when it's killed by memlimit problems.""" + try: + jobNode = self.boss.Id2Node[jobID] + except KeyError: + logger.error("Can't resource retry %s, jobNode not found", jobID) + return 1 + + job_retries = self.boss.resourceRetryCount[jobID] + if job_retries < OUT_OF_MEM_RETRIES: + jobNode.jobName = (jobNode.jobName or "") + " OOM resource retry " + str(job_retries) + memory = jobNode.memory * (job_retries + 1) * 2 if jobNode.memory < MAX_MEMORY else MAX_MEMORY + + sbatch_line = self.prepareSubmission( + jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName + ) + logger.debug("Running %r", sbatch_line) + new_slurm_job_id = with_retries(self.submitJob, sbatch_line) + self.batchJobIDs[jobID] = (new_slurm_job_id, None) + self.boss.resourceRetryCount[jobID] += 1 + logger.info( + "Detected job %s killed by SLURM, attempting retry with 2x memory: %s", + slurm_job_id, new_slurm_job_id + ) + logger.info( + "Issued job %s with job batch system ID: " + "%s and cores: %s, disk: %s, and memory: %s", + jobNode, str(new_slurm_job_id), int(jobNode.cores), + bytes2human(jobNode.disk), bytes2human(memory) + ) + with self.runningJobsLock: + self.runningJobs.add(jobID) + else: + logger.error("Can't retry job %s for memlimit more than twice") + return 1 + return None + + def _get_job_details(self, job_id_list: List[int]) -> Dict[int, Tuple[Optional[str], Optional[int]]]: + """ + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + Fetch job details from Slurm's accounting system or job control system. + :param job_id_list: list of integer Job IDs. + :return: dict of job statuses, where key is the integer job ID, and value is a tuple + containing the job's state and exit code. + """ try: - state, rc = self._getJobDetailsFromSacct(slurmJobID) + status_dict = self._getJobDetailsFromSacct(job_id_list) except CalledProcessErrorStderr: # no accounting system or some other error - state, rc = self._getJobDetailsFromScontrol(slurmJobID) - - logger.debug("s job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - if state in ('PENDING', 'RUNNING', 'CONFIGURING', 'COMPLETING', 'RESIZING', 'SUSPENDED'): + status_dict = self._getJobDetailsFromScontrol(job_id_list) + return status_dict + + + def _get_job_return_code(self, status: Tuple[Optional[str], Optional[int]]) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]: + """ + Given a Slurm return code, status pair, summarize them into a Toil return code, exit reason pair. + The return code may have already been OR'd with the 128-offset + Slurm-reported signal. + Slurm will report return codes of 0 even if jobs time out instead + of succeeding: + + 2093597|TIMEOUT|0:0 + 2093597.batch|CANCELLED|0:15 + + So we guarantee here that, if the Slurm status string is not a + successful one as defined in + , we + will not return a successful return code. + Helper function for `getJobExitCode` and `coalesce_job_exit_codes`. + :param status: tuple containing the job's state and it's return code from Slurm. + :return: the job's return code for Toil if it's completed, otherwise None. + """ + state, rc = status + + if state not in TERMINAL_STATES: + # Don't treat the job as exited yet return None - return rc + exit_reason = TERMINAL_STATES[state] - def _getJobDetailsFromSacct(self, slurmJobID): - # SLURM job exit codes are obtained by running sacct. - args = ['sacct', - '-n', # no header - '-j', str(slurmJobID), # job - '--format', 'State,ExitCode', # specify output columns - '-P', # separate columns with pipes - '-S', '1970-01-01'] # override start time limit + if exit_reason == BatchJobExitReason.FINISHED: + # The only state that should produce a 0 ever is COMPLETED. So + # if the job is COMPLETED and the exit reason is thus FINISHED, + # pass along the code it has. + return (rc, exit_reason) # type: ignore[return-value] # mypy doesn't understand enums well + + if rc == 0: + # The job claims to be in a state other than COMPLETED, but + # also to have not encountered a problem. Say the exit status + # is unavailable. + return (EXIT_STATUS_UNAVAILABLE_VALUE, exit_reason) + + # If the code is nonzero, pass it along. + return (rc, exit_reason) # type: ignore[return-value] # mypy doesn't understand enums well + + def _canonicalize_state(self, state: str) -> str: + """ + Turn a state string form SLURM into just the state token like "CANCELED". + """ - stdout = call_command(args) + # Slurm will sometimes send something like "CANCELED by 30065" in + # the state column for some reason. + + state_token = state + + if " " in state_token: + state_token = state.split(" ", 1)[0] + + if state_token not in TERMINAL_STATES and state_token not in NONTERMINAL_STATES: + raise RuntimeError("Toil job in unimplemented Slurm state " + state) + + return state_token + + + def _getJobDetailsFromSacct(self, job_id_list: List[int]) -> Dict[int, Tuple[Optional[str], Optional[int]]]: + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `sacct`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ + job_ids = ",".join(str(id) for id in job_id_list) + args = ['sacct', + '-n', # no header + '-j', job_ids, # job + '--format', 'JobIDRaw,State,ExitCode', # specify output columns + '-P', # separate columns with pipes + '-S', '1970-01-01'] # override start time limit + stdout = call_command(args, quiet=True) + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `sacct`. + job_statuses: Dict[int, Tuple[Optional[str], Optional[int]]] = {} + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + for line in stdout.split('\n'): - logger.debug("%s output %s", args[0], line) values = line.strip().split('|') - if len(values) < 2: + if len(values) < 3: + continue + state: str + job_id_raw, state, exitcode = values + state = self._canonicalize_state(state) + logger.log(TRACE, "%s state of job %s is %s", args[0], job_id_raw, state) + # JobIDRaw is in the form JobID[.JobStep]; we're not interested in job steps. + job_id_parts = job_id_raw.split(".") + if len(job_id_parts) > 1: continue - state, exitcode = values - logger.debug("sacct job state is %s", state) - # If Job is in a running state, return None to indicate we don't have an update - status, signal = [int(n) for n in exitcode.split(':')] + job_id = int(job_id_parts[0]) + status: int + signal: int + status, signal = (int(n) for n in exitcode.split(':')) if signal > 0: # A non-zero signal may indicate e.g. an out-of-memory killed job status = 128 + signal - logger.debug("sacct exit code is %s, returning status %d", exitcode, status) - return state, status - logger.debug("Did not find exit code for job in sacct output") - return None, None - - def _getJobDetailsFromScontrol(self, slurmJobID): + logger.log(TRACE, "%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + job_statuses[job_id] = state, status + logger.log(TRACE, "%s returning job statuses: %s", args[0], job_statuses) + return job_statuses + + def _getJobDetailsFromScontrol(self, job_id_list: List[int]) -> Dict[int, Tuple[Optional[str], Optional[int]]]: + """ + Get SLURM job exit codes for the jobs in `job_id_list` by running `scontrol`. + :param job_id_list: list of integer batch job IDs. + :return: dict of job statuses, where key is the job-id, and value is a tuple + containing the job's state and exit code. + """ args = ['scontrol', 'show', - 'job', - str(slurmJobID)] - - stdout = call_command(args) + 'job'] + + # `scontrol` can only return information about a single job, + # or all the jobs it knows about. + if len(job_id_list) == 1: + args.append(str(job_id_list[0])) + + stdout = call_command(args, quiet=True) + + job_records = None if isinstance(stdout, str): - lines = stdout.splitlines() + job_records = stdout.strip().split('\n\n') elif isinstance(stdout, bytes): - lines = stdout.decode('utf-8').splitlines() - - job = dict() - for line in lines: - for item in line.split(): - logger.debug(f"{args[0]} output {item}") - - # Output is in the form of many key=value pairs, multiple pairs on each line - # and multiple lines in the output. Each pair is pulled out of each line and - # added to a dictionary. - # Note: In some cases, the value itself may contain white-space. So, if we find - # a key without a value, we consider that key part of the previous value. - bits = item.split('=', 1) - if len(bits) == 1: - job[key] += ' ' + bits[0] + job_records = stdout.decode('utf-8').strip().split('\n\n') + + # Collect the job statuses in a dict; key is the job-id, value is a tuple containing + # job state and exit status. Initialize dict before processing output of `scontrol`. + job_statuses: Dict[int, Tuple[Optional[str], Optional[int]]] = {} + job_id: Optional[int] + for job_id in job_id_list: + job_statuses[job_id] = (None, None) + + # `scontrol` will report "No jobs in the system", if there are no jobs in the system, + # and if no job-id was passed as argument to `scontrol`. + if len(job_records) > 0 and job_records[0] == "No jobs in the system": + return job_statuses + + for record in job_records: + job: Dict[str, str] = {} + job_id = None + for line in record.splitlines(): + for item in line.split(): + # Output is in the form of many key=value pairs, multiple pairs on each line + # and multiple lines in the output. Each pair is pulled out of each line and + # added to a dictionary. + # Note: In some cases, the value itself may contain white-space. So, if we find + # a key without a value, we consider that key part of the previous value. + bits = item.split('=', 1) + if len(bits) == 1: + job[key] += ' ' + bits[0] # type: ignore[has-type] # we depend on the previous iteration to populate key + else: + key = bits[0] + job[key] = bits[1] + # The first line of the record contains the JobId. Stop processing the remainder + # of this record, if we're not interested in this job. + job_id = int(job['JobId']) + if job_id not in job_id_list: + logger.log(TRACE, "%s job %d is not in the list", args[0], job_id) + break + if job_id is None or job_id not in job_id_list: + continue + state = job['JobState'] + state = self._canonicalize_state(state) + logger.log(TRACE, "%s state of job %s is %s", args[0], job_id, state) + try: + exitcode = job['ExitCode'] + if exitcode is not None: + status, signal = (int(n) for n in exitcode.split(':')) + if signal > 0: + # A non-zero signal may indicate e.g. an out-of-memory killed job + status = 128 + signal + logger.log(TRACE, "%s exit code of job %d is %s, return status %d", + args[0], job_id, exitcode, status) + rc = status else: - key = bits[0] - job[key] = bits[1] - - state = job['JobState'] - try: - exitcode = job['ExitCode'] - if exitcode is not None: - status, signal = [int(n) for n in exitcode.split(':')] - if signal > 0: - # A non-zero signal may indicate e.g. an out-of-memory killed job - status = 128 + signal - logger.debug("scontrol exit code is %s, returning status %d", exitcode, status) - rc = status - else: + rc = None + except KeyError: rc = None - except KeyError: - rc = None - - return state, rc - - """ - Implementation-specific helper methods - """ + job_statuses[job_id] = (state, rc) + logger.log(TRACE, "%s returning job statuses: %s", args[0], job_statuses) + return job_statuses + ### + ### Implementation-specific helper methods + ### def prepareSbatch(self, cpu: int, mem: int, @@ -172,57 +451,89 @@ def prepareSbatch(self, jobName: str, job_environment: Optional[Dict[str, str]]) -> List[str]: - # Returns the sbatch command line before the script to run - sbatch_line = ['sbatch', '-J', 'toil_job_{}_{}'.format(jobID, jobName)] + """ + Returns the sbatch command line to run to queue the job. + """ + + # Start by naming the job + sbatch_line = ['sbatch', '-J', f'toil_job_{jobID}_{jobName}'] + + # Make sure the job gets a signal before it disappears so that e.g. + # container cleanup finally blocks can run. Ask for SIGINT so we + # can get the default Python KeyboardInterrupt which third-party + # code is likely to plan for. Make sure to send it to the batch + # shell process with "B:", not to all the srun steps it launches + # (because there shouldn't be any). We cunningly replaced the batch + # shell process with the Toil worker process, so Toil should be + # able to get the signal. + # + # TODO: Add a way to detect when the job failed because it + # responded to this signal and use the right exit reason for it. + sbatch_line.append("--signal=B:INT@30") environment = {} environment.update(self.boss.environment) if job_environment: environment.update(job_environment) + + # "Native extensions" for SLURM (see DRMAA or SAGA) + nativeConfig = os.getenv('TOIL_SLURM_ARGS') + + # --export=[ALL,] + set_exports = "--export=ALL" + + if nativeConfig is not None: + logger.debug("Native SLURM options appended to sbatch from TOIL_SLURM_ARGS env. variable: %s", nativeConfig) + + for arg in nativeConfig.split(): + if arg.startswith("--mem") or arg.startswith("--cpus-per-task"): + raise ValueError(f"Some resource arguments are incompatible: {nativeConfig}") + # repleace default behaviour by the one stated at TOIL_SLURM_ARGS + if arg.startswith("--export"): + set_exports = arg + sbatch_line.extend(nativeConfig.split()) if environment: argList = [] for k, v in environment.items(): quoted_value = quote(os.environ[k] if v is None else v) - argList.append('{}={}'.format(k, quoted_value)) - - sbatch_line.append('--export=' + ','.join(argList)) - - if mem is not None and self.boss.config.allocate_mem: + argList.append(f'{k}={quoted_value}') + + set_exports += ',' + ','.join(argList) + + # add --export to the sbatch + sbatch_line.append(set_exports) + + parallel_env = os.getenv('TOIL_SLURM_PE') + if cpu and cpu > 1 and parallel_env: + sbatch_line.append(f'--partition={parallel_env}') + + if mem is not None and self.boss.config.allocate_mem: # type: ignore[attr-defined] # memory passed in is in bytes, but slurm expects megabytes sbatch_line.append(f'--mem={math.ceil(mem / 2 ** 20)}') if cpu is not None: sbatch_line.append(f'--cpus-per-task={math.ceil(cpu)}') - stdoutfile: str = self.boss.formatStdOutErrPath(jobID, '%j', 'out') - stderrfile: str = self.boss.formatStdOutErrPath(jobID, '%j', 'err') + stdoutfile: str = self.boss.format_std_out_err_path(jobID, '%j', 'out') + stderrfile: str = self.boss.format_std_out_err_path(jobID, '%j', 'err') sbatch_line.extend(['-o', stdoutfile, '-e', stderrfile]) - - # "Native extensions" for SLURM (see DRMAA or SAGA) - nativeConfig = os.getenv('TOIL_SLURM_ARGS') - if nativeConfig is not None: - logger.debug("Native SLURM options appended to sbatch from TOIL_SLURM_ARGS env. variable: {}".format(nativeConfig)) - if ("--mem" in nativeConfig) or ("--cpus-per-task" in nativeConfig): - raise ValueError("Some resource arguments are incompatible: {}".format(nativeConfig)) - - sbatch_line.extend(nativeConfig.split()) - + return sbatch_line - def parse_elapsed(self, elapsed): + def parse_elapsed(self, elapsed: str) -> int: # slurm returns elapsed time in days-hours:minutes:seconds format # Sometimes it will only return minutes:seconds, so days may be omitted # For ease of calculating, we'll make sure all the delimeters are ':' # Then reverse the list so that we're always counting up from seconds -> minutes -> hours -> days total_seconds = 0 try: - elapsed = elapsed.replace('-', ':').split(':') - elapsed.reverse() + elapsed_split: List[str] = elapsed.replace('-', ':').split(':') + elapsed_split.reverse() seconds_per_unit = [1, 60, 3600, 86400] for index, multiplier in enumerate(seconds_per_unit): - if index < len(elapsed): - total_seconds += multiplier * int(elapsed[index]) + if index < len(elapsed_split): + total_seconds += multiplier * int(elapsed_split[index]) except ValueError: pass # slurm may return INVALID instead of a time return total_seconds diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 9a64d87213..12ba095604 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -236,7 +236,10 @@ def update(self, job): with open(self._getJobFileName(job.jobStoreID) + ".new", 'xb') as f: pickle.dump(job, f) # This should be atomic for the file system - os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + try: + os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) + except OSError: + shutil.move(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID)) def delete(self, jobStoreID): # The jobStoreID is the relative path to the directory containing the job, @@ -599,7 +602,10 @@ def writeStatsAndLogging(self, statsAndLoggingString): writeFormat = 'w' if isinstance(statsAndLoggingString, str) else 'wb' with open(tempStatsFile, writeFormat) as f: f.write(statsAndLoggingString) - os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic + try: + os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic + except OSError: + shutil.move(tempStatsFile, tempStatsFile[:-4]) def readStatsAndLogging(self, callback, readAll=False): numberOfFilesProcessed = 0 @@ -615,7 +621,10 @@ def readStatsAndLogging(self, callback, readAll=False): newName = tempFile.rsplit('.', 1)[0] + '.new' newAbsTempFile = os.path.join(tempDir, newName) # Mark this item as read - os.rename(absTempFile, newAbsTempFile) + try: + os.rename(absTempFile, newAbsTempFile) + except OSError: + shutil.move(absTempFile, newAbsTempFile) return numberOfFilesProcessed ########################################## diff --git a/src/toil/lib/threading.py b/src/toil/lib/threading.py index e53a6aa632..135b879f6b 100644 --- a/src/toil/lib/threading.py +++ b/src/toil/lib/threading.py @@ -363,7 +363,10 @@ def global_mutex(workDir: str, mutex: str) -> Iterator[None]: # Delete it while we still own it, so we can't delete it from out from # under someone else who thinks they are holding it. logger.debug('PID %d releasing mutex %s', os.getpid(), lock_filename) - os.unlink(lock_filename) + try: + os.unlink(lock_filename) + except OSError as e: + print(f"Error: {e.strerror}. File {lock_filename} could not be deleted.") fcntl.lockf(fd, fcntl.LOCK_UN) # Note that we are unlinking it and then unlocking it; a lot of people # might have opened it before we unlinked it and will wake up when they diff --git a/src/toil/statsAndLogging.py b/src/toil/statsAndLogging.py index 4584ad1398..5632fe10bb 100644 --- a/src/toil/statsAndLogging.py +++ b/src/toil/statsAndLogging.py @@ -36,6 +36,9 @@ DEFAULT_LOGLEVEL = logging.INFO __loggingFiles = [] +# We have some logging that belongs at a TRACE level, below DEBUG +TRACE = logging.DEBUG - 5 +logging.addLevelName(TRACE, "TRACE") class StatsAndLogging: """A thread to aggregate statistics and logging.""" From 2a9b33d31c62a51849db70bf500ef413f26ab202 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 2 Aug 2024 14:23:45 -0400 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=94=A7=20fix=20job=20var=20types=20an?= =?UTF-8?q?d=20update=20call=5Fcmd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 27 ++++++++++++++------------- src/toil/common.py | 2 +- src/toil/leader.py | 2 +- src/toil/lib/misc.py | 30 ++++++++++++++++++++---------- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 34fb78e2b0..1cd79e67bb 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -16,11 +16,11 @@ import os from collections import defaultdict from pipes import quote -from typing import Dict, List, Optional, Set, Tuple, TypeVar, Union, NamedTuple +from typing import Dict, List, Optional, Set, Tuple, Union from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem -from toil.lib.humanize import bytes2human +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, with_retries +from toil.lib.conversions import bytes2human from toil.lib.misc import CalledProcessErrorStderr, call_command from toil.statsAndLogging import TRACE @@ -75,12 +75,12 @@ class SlurmBatchSystem(AbstractGridEngineBatchSystem): def __init__(self, *args, **kwargs): """Create a mapping table for JobIDs to JobNodes.""" super(SlurmBatchSystem, self).__init__(*args, **kwargs) - self.Id2Node: Dict[str, Dict] = {} + self.Id2Node: Dict[int, Dict] = {} self.resourceRetryCount = defaultdict(int) - def issueBatchJob(self, jobDesc): + def issueBatchJob(self, jobDesc, job_environment=None): """Load the jobDesc into the JobID mapping table.""" - jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc) + jobID = super(SlurmBatchSystem, self).issueBatchJob(jobDesc, job_environment) self.Id2Node[jobID] = jobDesc return jobID @@ -187,20 +187,21 @@ def getJobExitCode(self, slurm_job_id: str) -> Union[int, Tuple[int, Optional[Ba return None exit_code, exit_reason = exit_status + logger.info("Exit code for %s job is: %s, %s", slurm_job_id, str(exit_code), exit_reason) if exit_reason == BatchJobExitReason.MEMLIMIT: # Retry job with 2x memory if it was killed because of memory jobID = self._getJobID(slurm_job_id) exit_code = self._customRetry(jobID, slurm_job_id) return exit_code - def _getJobID(self, slurm_job_id): + def _getJobID(self, slurm_job_id: str) -> int: """Get toil job ID from the slurm job ID.""" - job_ids_dict = {slurm_job[0]: toil_job for toil_job, slurm_job in self.batchJobIDs.items()} + job_ids_dict = {str(slurm_job[0]): int(toil_job) for toil_job, slurm_job in self.batchJobIDs.items()} if slurm_job_id not in job_ids_dict: - raise RuntimeError("Unknown slurmJobID, could not be converted") + raise RuntimeError(f"Unknown slurmJobID: {slurm_job_id}.\nTracked jobs: {job_ids_dict}") return job_ids_dict[slurm_job_id] - def _customRetry(self, jobID, slurm_job_id): + def _customRetry(self, jobID: int, slurm_job_id: str) -> int: """Increase the job memory 2x and retry, when it's killed by memlimit problems.""" try: jobNode = self.boss.Id2Node[jobID] @@ -449,7 +450,7 @@ def prepareSbatch(self, mem: int, jobID: int, jobName: str, - job_environment: Optional[Dict[str, str]]) -> List[str]: + job_environment: Optional[Dict[str, str]] = None) -> List[str]: """ Returns the sbatch command line to run to queue the job. @@ -515,8 +516,8 @@ def prepareSbatch(self, if cpu is not None: sbatch_line.append(f'--cpus-per-task={math.ceil(cpu)}') - stdoutfile: str = self.boss.format_std_out_err_path(jobID, '%j', 'out') - stderrfile: str = self.boss.format_std_out_err_path(jobID, '%j', 'err') + stdoutfile: str = self.boss.formatStdOutErrPath(jobID, '%j', 'out') + stderrfile: str = self.boss.formatStdOutErrPath(jobID, '%j', 'err') sbatch_line.extend(['-o', stdoutfile, '-e', stderrfile]) return sbatch_line diff --git a/src/toil/common.py b/src/toil/common.py index 7f64c65fa3..310519005f 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -185,7 +185,7 @@ def parse_int_list(s: str): if not os.path.exists(self.workDir): raise RuntimeError(f"The path provided to --workDir ({self.workDir}) does not exist.") - if len(self.workDir) > 80: + if len(self.workDir) > 120: logger.warning(f'Length of workDir path "{self.workDir}" is {len(self.workDir)} characters. ' f'Consider setting a shorter path with --workPath or setting TMPDIR to something ' f'like "/tmp" to avoid overly long paths.') diff --git a/src/toil/leader.py b/src/toil/leader.py index 1f7ac7251e..9f37701686 100644 --- a/src/toil/leader.py +++ b/src/toil/leader.py @@ -1130,7 +1130,7 @@ def processFinishedJob(self, batchSystemID, result_status, wallTime=None, exitRe if self.config.writeLogs or self.config.writeLogsGzip: batchSystemFileRoot, _ = os.path.splitext(os.path.basename(batchSystemFile)) jobNames = replacementJob.chainedJobs - if jobNames is None: # For jobs that fail this way, replacementJob.chainedJobs is not guaranteed to be set + if not jobNames: # For jobs that fail this way, replacementJob.chainedJobs is not guaranteed to be set jobNames = [str(replacementJob)] jobNames = [jobName + '_' + batchSystemFileRoot for jobName in jobNames] batchSystemFileStream.seek(0) diff --git a/src/toil/lib/misc.py b/src/toil/lib/misc.py index 9fe1e2f3cf..683cb37cc2 100644 --- a/src/toil/lib/misc.py +++ b/src/toil/lib/misc.py @@ -5,6 +5,7 @@ import subprocess import sys import typing +import datetime from typing import Iterator, Union, List, Optional @@ -41,15 +42,21 @@ def __str__(self) -> str: def call_command(cmd: List[str], *args: str, input: Optional[str] = None, timeout: Optional[float] = None, - useCLocale: bool = True, env: Optional[typing.Dict[str, str]] = None) -> Union[str, bytes]: - """Simplified calling of external commands. This always returns - stdout and uses utf- encode8. If process fails, CalledProcessErrorStderr - is raised. The captured stderr is always printed, regardless of - if an expect occurs, so it can be logged. At the debug log level, the - command and captured out are always used. With useCLocale, C locale - is force to prevent failures that occurred in some batch systems - with UTF-8 locale. + useCLocale: bool = True, env: Optional[typing.Dict[str, str]] = None, quiet: Optional[bool] = False) -> Union[str, bytes]: """ + Simplified calling of external commands. + If the process fails, CalledProcessErrorStderr is raised. + The captured stderr is always printed, regardless of + if an exception occurs, so it can be logged. + Always logs the command at debug log level. + :param quiet: If True, do not log the command output. If False (the + default), do log the command output at debug log level. + :param useCLocale: If True, C locale is forced, to prevent failures that + can occur in some batch systems when using UTF-8 locale. + :returns: Command standard output, decoded as utf-8. + """ + # NOTE: Interface MUST be kept in sync with call_sacct and call_scontrol in + # test_slurm.py, which monkey-patch this! # using non-C locales can cause GridEngine commands, maybe other to # generate errors @@ -58,12 +65,15 @@ def call_command(cmd: List[str], *args: str, input: Optional[str] = None, timeou env["LANGUAGE"] = env["LC_ALL"] = "C" logger.debug("run command: {}".format(" ".join(cmd))) + start_time = datetime.datetime.now() proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', errors="replace", env=env) stdout, stderr = proc.communicate(input=input, timeout=timeout) + end_time = datetime.datetime.now() + runtime = (end_time - start_time).total_seconds() sys.stderr.write(stderr) if proc.returncode != 0: - logger.debug("command failed: {}: {}".format(" ".join(cmd), stderr.rstrip())) + logger.debug("command failed in {}s: {}: {}".format(runtime, " ".join(cmd), stderr.rstrip())) raise CalledProcessErrorStderr(proc.returncode, cmd, output=stdout, stderr=stderr) - logger.debug("command succeeded: {}: {}".format(" ".join(cmd), stdout.rstrip())) + logger.debug("command succeeded in {}s: {}: {}".format(runtime, " ".join(cmd), (': ' + stdout.rstrip()) if not quiet else '')) return stdout From c838244ab81b8d9292675cc4142f6054c9d85ee4 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 15 Nov 2024 10:16:40 -0500 Subject: [PATCH 3/5] =?UTF-8?q?=F0=9F=94=A7=20export=20job=20ids=20to=20fi?= =?UTF-8?q?le?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 1cd79e67bb..9331bbef86 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -14,6 +14,7 @@ import logging import math import os +import subprocess from collections import defaultdict from pipes import quote from typing import Dict, List, Optional, Set, Tuple, Union @@ -126,7 +127,7 @@ def prepareSubmission(self, job_environment: Optional[Dict[str, str]] = None) -> List[str]: return self.prepareSbatch(cpu, memory, jobID, jobName, job_environment) + ['--wrap={}'.format(command)] - def submitJob(self, subLine: List[str]) -> int: + def submitJob(self, subLine: List[str]) -> int: try: # Slurm is not quite clever enough to follow the XDG spec on # its own. If the submission command sees e.g. XDG_RUNTIME_DIR @@ -149,7 +150,8 @@ def submitJob(self, subLine: List[str]) -> int: output = call_command(subLine, env=no_session_environment) # sbatch prints a line like 'Submitted batch job 2954103' result = int(output.strip().split()[-1]) - logger.debug("sbatch submitted job %d", result) + logger.info("sbatch submitted job %d", result) + subprocess.check_call([f"echo {result} >> $PWD/job_ids.txt"], shell=True) return result except OSError as e: logger.error(f"sbatch command failed with error: {e}") From b36e5ca661267a1cdfc2309e75743039a2b15371 Mon Sep 17 00:00:00 2001 From: "Juan E. Arango" Date: Fri, 13 Dec 2024 13:53:45 -0500 Subject: [PATCH 4/5] =?UTF-8?q?=F0=9F=94=A7=20remove=20duplicated=20with?= =?UTF-8?q?=5Fretries=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batchSystems/abstractGridEngineBatchSystem.py | 15 --------------- src/toil/batchSystems/slurm.py | 4 ++-- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 9b5f1aa7b5..c066b31a64 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -28,21 +28,6 @@ logger = logging.getLogger(__name__) -def with_retries(operation, *args, **kwargs): - retries = 3 - latest_err = None - while retries: - retries -= 1 - try: - return operation(*args, **kwargs) - except subprocess.CalledProcessError as err: - latest_err = err - logger.error( - "Operation %s failed with code %d: %s", - operation, err.returncode, err.output) - raise latest_err - - class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport): """ A partial implementation of BatchSystemSupport for batch systems run on a diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 9331bbef86..08a520a8d0 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -20,7 +20,7 @@ from typing import Dict, List, Optional, Set, Tuple, Union from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE -from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem, with_retries +from toil.batchSystems.abstractGridEngineBatchSystem import AbstractGridEngineBatchSystem from toil.lib.conversions import bytes2human from toil.lib.misc import CalledProcessErrorStderr, call_command from toil.statsAndLogging import TRACE @@ -220,7 +220,7 @@ def _customRetry(self, jobID: int, slurm_job_id: str) -> int: jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName ) logger.debug("Running %r", sbatch_line) - new_slurm_job_id = with_retries(self.submitJob, sbatch_line) + new_slurm_job_id = self.with_retries(self.submitJob, sbatch_line) self.batchJobIDs[jobID] = (new_slurm_job_id, None) self.boss.resourceRetryCount[jobID] += 1 logger.info( From bc164705a488e9d0b1081ce6f058c4dfcad3a59f Mon Sep 17 00:00:00 2001 From: Dylan Domenico Date: Tue, 7 Jan 2025 16:09:22 -0500 Subject: [PATCH 5/5] =?UTF-8?q?=F0=9F=90=9B=20fix=20with=5Fretries=20call?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/toil/batchSystems/slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/batchSystems/slurm.py b/src/toil/batchSystems/slurm.py index 08a520a8d0..38ac1e7bed 100644 --- a/src/toil/batchSystems/slurm.py +++ b/src/toil/batchSystems/slurm.py @@ -220,7 +220,7 @@ def _customRetry(self, jobID: int, slurm_job_id: str) -> int: jobNode.cores, memory, jobID, jobNode.command, jobNode.jobName ) logger.debug("Running %r", sbatch_line) - new_slurm_job_id = self.with_retries(self.submitJob, sbatch_line) + new_slurm_job_id = self.boss.with_retries(self.submitJob, sbatch_line) self.batchJobIDs[jobID] = (new_slurm_job_id, None) self.boss.resourceRetryCount[jobID] += 1 logger.info(