Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔧 patch for v5.5.0 latest for python 3.6 #2

Open
wants to merge 5 commits into
base: releases/5.5.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions src/toil/batchSystems/abstractBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,42 @@
('cleanWorkDir', bool)])


class BatchJobExitReason(enum.Enum):
FINISHED: int = 1 # Successfully finished.
FAILED: int = 2 # Job finished, but failed.
LOST: int = 3 # Preemptable failure (job's executing host went away).
KILLED: int = 4 # Job killed before finishing.
ERROR: int = 5 # Internal error.
MEMLIMIT: int = 6 # Job hit batch system imposed memory limit
EXIT_STATUS_UNAVAILABLE_VALUE = 255

class BatchJobExitReason(enum.IntEnum):
    """
    Reasons a batch system can report for why a job stopped running.

    This is an IntEnum, so members compare equal to (and can be used as)
    their plain integer values.
    """

    # Members are deliberately left unannotated: an explicit ``: int`` on an
    # enum member is misleading (the member's type is the enum itself) and is
    # rejected by type checkers that follow the typing spec.
    FINISHED = 1
    """Successfully finished."""
    FAILED = 2
    """Job finished, but failed."""
    LOST = 3
    """Preemptable failure (job's executing host went away)."""
    KILLED = 4
    """Job killed before finishing."""
    ERROR = 5
    """Internal error."""
    MEMLIMIT = 6
    """Job hit batch system imposed memory limit."""
    MISSING = 7
    """Job disappeared from the scheduler without actually stopping, so Toil killed it."""
    MAXJOBDURATION = 8
    """Job ran longer than --maxJobDuration, so Toil killed it."""
    PARTITION = 9
    """Job was not able to talk to the leader via the job store, so Toil declared it failed."""

    @classmethod
    def to_string(cls, value: int) -> str:
        """
        Convert an exit reason to a human-readable string.

        :param value: Either a member of this enum or a plain int that may or
            may not equal one of its values.
        :returns: The name of the matching enum member, or the stringified
            value if no member matches.
        """
        try:
            return cls(value).name
        except ValueError:
            # Not a known exit reason; fall back to showing the raw value.
            return str(value)


class AbstractBatchSystem(ABC):
Expand Down
1 change: 1 addition & 0 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
import time
from abc import ABCMeta, abstractmethod
from datetime import datetime
Expand Down
522 changes: 418 additions & 104 deletions src/toil/batchSystems/slurm.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def parse_int_list(s: str):
if not os.path.exists(self.workDir):
raise RuntimeError(f"The path provided to --workDir ({self.workDir}) does not exist.")

if len(self.workDir) > 80:
if len(self.workDir) > 120:
logger.warning(f'Length of workDir path "{self.workDir}" is {len(self.workDir)} characters. '
f'Consider setting a shorter path with --workPath or setting TMPDIR to something '
f'like "/tmp" to avoid overly long paths.')
Expand Down
15 changes: 12 additions & 3 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ def update(self, job):
with open(self._getJobFileName(job.jobStoreID) + ".new", 'xb') as f:
pickle.dump(job, f)
# This should be atomic for the file system
os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID))
try:
os.rename(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID))
except OSError:
shutil.move(self._getJobFileName(job.jobStoreID) + ".new", self._getJobFileName(job.jobStoreID))

def delete(self, jobStoreID):
# The jobStoreID is the relative path to the directory containing the job,
Expand Down Expand Up @@ -599,7 +602,10 @@ def writeStatsAndLogging(self, statsAndLoggingString):
writeFormat = 'w' if isinstance(statsAndLoggingString, str) else 'wb'
with open(tempStatsFile, writeFormat) as f:
f.write(statsAndLoggingString)
os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic
try:
os.rename(tempStatsFile, tempStatsFile[:-4]) # This operation is atomic
except OSError:
shutil.move(tempStatsFile, tempStatsFile[:-4])

def readStatsAndLogging(self, callback, readAll=False):
numberOfFilesProcessed = 0
Expand All @@ -615,7 +621,10 @@ def readStatsAndLogging(self, callback, readAll=False):
newName = tempFile.rsplit('.', 1)[0] + '.new'
newAbsTempFile = os.path.join(tempDir, newName)
# Mark this item as read
os.rename(absTempFile, newAbsTempFile)
try:
os.rename(absTempFile, newAbsTempFile)
except OSError:
shutil.move(absTempFile, newAbsTempFile)
return numberOfFilesProcessed

##########################################
Expand Down
2 changes: 1 addition & 1 deletion src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ def processFinishedJob(self, batchSystemID, result_status, wallTime=None, exitRe
if self.config.writeLogs or self.config.writeLogsGzip:
batchSystemFileRoot, _ = os.path.splitext(os.path.basename(batchSystemFile))
jobNames = replacementJob.chainedJobs
if jobNames is None: # For jobs that fail this way, replacementJob.chainedJobs is not guaranteed to be set
if not jobNames: # For jobs that fail this way, replacementJob.chainedJobs is not guaranteed to be set
jobNames = [str(replacementJob)]
jobNames = [jobName + '_' + batchSystemFileRoot for jobName in jobNames]
batchSystemFileStream.seek(0)
Expand Down
30 changes: 20 additions & 10 deletions src/toil/lib/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import sys
import typing
import datetime

from typing import Iterator, Union, List, Optional

Expand Down Expand Up @@ -41,15 +42,21 @@ def __str__(self) -> str:


def call_command(cmd: List[str], *args: str, input: Optional[str] = None, timeout: Optional[float] = None,
useCLocale: bool = True, env: Optional[typing.Dict[str, str]] = None) -> Union[str, bytes]:
"""Simplified calling of external commands. This always returns
stdout and uses utf- encode8. If process fails, CalledProcessErrorStderr
is raised. The captured stderr is always printed, regardless of
if an expect occurs, so it can be logged. At the debug log level, the
command and captured out are always used. With useCLocale, C locale
is force to prevent failures that occurred in some batch systems
with UTF-8 locale.
useCLocale: bool = True, env: Optional[typing.Dict[str, str]] = None, quiet: Optional[bool] = False) -> Union[str, bytes]:
"""
Simplified calling of external commands.
If the process fails, CalledProcessErrorStderr is raised.
The captured stderr is always printed, regardless of
if an exception occurs, so it can be logged.
Always logs the command at debug log level.
:param quiet: If True, do not log the command output. If False (the
default), do log the command output at debug log level.
:param useCLocale: If True, C locale is forced, to prevent failures that
can occur in some batch systems when using UTF-8 locale.
:returns: Command standard output, decoded as utf-8.
"""
# NOTE: Interface MUST be kept in sync with call_sacct and call_scontrol in
# test_slurm.py, which monkey-patch this!

# Using non-C locales can cause GridEngine commands, and maybe others, to
# generate errors.
Expand All @@ -58,12 +65,15 @@ def call_command(cmd: List[str], *args: str, input: Optional[str] = None, timeou
env["LANGUAGE"] = env["LC_ALL"] = "C"

logger.debug("run command: {}".format(" ".join(cmd)))
start_time = datetime.datetime.now()
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding='utf-8', errors="replace", env=env)
stdout, stderr = proc.communicate(input=input, timeout=timeout)
end_time = datetime.datetime.now()
runtime = (end_time - start_time).total_seconds()
sys.stderr.write(stderr)
if proc.returncode != 0:
logger.debug("command failed: {}: {}".format(" ".join(cmd), stderr.rstrip()))
logger.debug("command failed in {}s: {}: {}".format(runtime, " ".join(cmd), stderr.rstrip()))
raise CalledProcessErrorStderr(proc.returncode, cmd, output=stdout, stderr=stderr)
logger.debug("command succeeded: {}: {}".format(" ".join(cmd), stdout.rstrip()))
logger.debug("command succeeded in {}s: {}: {}".format(runtime, " ".join(cmd), (': ' + stdout.rstrip()) if not quiet else ''))
return stdout
5 changes: 4 additions & 1 deletion src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ def global_mutex(workDir: str, mutex: str) -> Iterator[None]:
# Delete it while we still own it, so we can't delete it out from
# under someone else who thinks they are holding it.
logger.debug('PID %d releasing mutex %s', os.getpid(), lock_filename)
os.unlink(lock_filename)
try:
os.unlink(lock_filename)
except OSError as e:
print(f"Error: {e.strerror}. File {lock_filename} could not be deleted.")
fcntl.lockf(fd, fcntl.LOCK_UN)
# Note that we are unlinking it and then unlocking it; a lot of people
# might have opened it before we unlinked it and will wake up when they
Expand Down
3 changes: 3 additions & 0 deletions src/toil/statsAndLogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
DEFAULT_LOGLEVEL = logging.INFO
__loggingFiles = []

# We have some logging that belongs at a TRACE level, below DEBUG
TRACE = logging.DEBUG - 5
logging.addLevelName(TRACE, "TRACE")

class StatsAndLogging:
"""A thread to aggregate statistics and logging."""
Expand Down