Merge pull request #22 from julianhess/develop
Add job avoidance
agraubert authored Feb 27, 2020
2 parents a1153d6 + ab151f6 commit a053e29
Showing 17 changed files with 486 additions and 153 deletions.
44 changes: 44 additions & 0 deletions canine/backends/base.py
@@ -211,6 +211,50 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str]
for dirname in dirnames:
yield from self.walk(os.path.join(path, dirname))

def rmtree(self, path: str, max_retries: int = 5, timeout: int = 5):
"""
Recursively remove the directory tree rooted at the given path.
Automatically retries failures after a brief timeout
"""
pathstat = self.stat(path)
if not stat.S_ISDIR(pathstat.st_mode):
raise NotADirectoryError(path)
for attempt in range(max_retries):
try:
return self._rmtree(path, pathstat)
except (OSError, FileNotFoundError, IOError, NotADirectoryError):
# Helps to preserve the exception traceback by conditionally re-raising here
if attempt >= (max_retries - 1):
raise
time.sleep(timeout)
# Should not be possible to reach here
raise RuntimeError("AbstractTransport.rmtree exceeded retries without exception")

def _rmtree(self, path: str, pathstat: os.stat_result):
"""
(Internal)
Recursively remove the directory tree rooted at the given path.
Invoked by rmtree(), which handles retries; errors raised here propagate to it
"""
if not stat.S_ISDIR(pathstat.st_mode):
raise NotADirectoryError(path)
for fname in self.listdir(path):
fname = os.path.join(path, fname)
try:
fstat = self.stat(fname)
except FileNotFoundError:
# Broken symlinks fail to stat; just remove them directly
self.remove(fname)
else:
if stat.S_ISDIR(fstat.st_mode):
self._rmtree(
fname,
fstat
)
else:
self.remove(fname)
self.rmdir(path)

def sendtree(self, src: str, dest: str):
"""
Copy the full local file tree src to the remote path dest
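Note: the retry wrapper added to AbstractTransport follows a common pattern: attempt the operation, sleep briefly on failure, and re-raise on the final attempt so the original traceback survives. A minimal, self-contained sketch of the same idea (the helper name and the example callable are illustrative, not part of canine's API):

import time

def retry_call(fn, max_retries=5, timeout=5, exceptions=(OSError, IOError)):
    """Call fn(); on failure sleep and retry, re-raising on the last attempt
    so the original traceback is preserved (mirrors AbstractTransport.rmtree)."""
    for attempt in range(max_retries):
        try:
            return fn()
        except exceptions:
            if attempt >= max_retries - 1:
                raise
            time.sleep(timeout)

# e.g. retry_call(lambda: shutil.rmtree("/tmp/scratch"), max_retries=3, timeout=2)
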
108 changes: 81 additions & 27 deletions canine/backends/dockerTransient.py
@@ -11,6 +11,8 @@
import io
import pickle
import math
import threading
import time

from .imageTransient import TransientImageSlurmBackend, list_instances, gce
from ..utils import get_default_gcp_project, gcp_hourly_cost
@@ -19,15 +21,13 @@

class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{
def __init__(
self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh",
compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh",
self, cluster_name, *,
nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh",
compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh",
nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "",
action_on_stop = "delete", image_family = "pydpiper", image = None,
cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs
action_on_stop = "delete", image_family = "slurm-gcp-docker", image = None,
clust_frac = 0.01, user = os.environ["USER"], **kwargs
):
if cluster_name is None:
raise ValueError("You must specify a name for this Slurm cluster!")

if "image" not in kwargs:
kwargs["image"] = image

@@ -70,14 +70,16 @@ def __init__(

self.NFS_server_ready = False
self.NFS_ready = False
self.NFS_monitor_thread = None
self.NFS_monitor_lock = None

def init_slurm(self):
self.dkr = docker.from_env()

#
# check if image exists
try:
image = self.dkr.images.get('broadinstitute/pydpiper:latest')
image = self.dkr.images.get('broadinstitute/slurm_gcp_docker:latest')
except docker.errors.ImageNotFound:
raise Exception("You have not yet built or pulled the Slurm Docker image!")

@@ -100,10 +102,14 @@ def init_slurm(self):
#
# create the Slurm container if it's not already present
if self.config["cluster_name"] not in [x.name for x in self.dkr.containers.list()]:
#if image not in [x.image for x in self.dkr.containers.list()]:
# FIXME: gcloud is cloud-provider specific. how can we make this more generic?
gcloud_conf_dir = subprocess.check_output("echo -n ~/.config/gcloud", shell = True).decode()
self.dkr.containers.run(
image = image.tags[0], detach = True, network_mode = "host",
volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" } },
volumes = {
"/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" },
gcloud_conf_dir : { "bind" : "/etc/gcloud", "mode" : "rw" }
},
name = self.config["cluster_name"], command = "/bin/bash",
user = self.config["user"], stdin_open = True, remove = True
)
@@ -131,7 +137,7 @@ def init_nodes(self):
if not self.NFS_ready:
raise Exception("NFS must be mounted before starting nodes!")

self.wait_for_cluster_ready()
self.wait_for_cluster_ready(elastic = True)

# list all the nodes that Slurm is aware of

@@ -167,33 +173,59 @@ def init_nodes(self):
])

def stop(self):
# stop the Docker
if self.container is not None:
self.container().stop()

# delete node configuration file
subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True)
try:
subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True)
except subprocess.CalledProcessError as e:
print("Couldn't delete node configuration file:", file = sys.stderr)
print(e)

# get list of nodes that still exist
#
# shutdown nodes that are still running (except NFS)
allnodes = self.nodes
extant_nodes = self.list_instances_all_zones()
self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) &
(allnodes["machine_type"] != "nfs")]

# sometimes the Google API will spectacularly fail; in that case, we
# just try to shutdown everything in the node list, regardless of whether
# it exists.
try:
extant_nodes = self.list_instances_all_zones()
self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) &
(allnodes["machine_type"] != "nfs")]
except:
self.nodes = allnodes.loc[allnodes["machine_type"] != "nfs"]

# superclass method will stop/delete/leave these running, depending on how
# self.config["action_on_stop"] is set
super().stop()

# we handle the NFS separately
self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"]
super().stop(action_on_stop = self.config["nfs_action_on_stop"])
#
# stop the Docker

# this needs to happen after super().stop() is invoked, since that
# calls scancel, which in turn requires a running Slurm controller Docker
if self.container is not None:
self.container().stop()

#
# unmount the NFS

# this needs to be the last step, since Docker will hang if NFS is pulled
# out from under it
if self.config["nfs_action_on_stop"] != "run":
try:
subprocess.check_call("sudo umount -f /mnt/nfs", shell = True)
except subprocess.CalledProcessError:
print("Could not unmount NFS (do you have open files on it?)\nPlease run `lsof | grep /mnt/nfs`, close any open files, and run `sudo umount -f /mnt/nfs` before attempting to run another pipeline.")

# superclass method will stop/delete/leave the NFS running, depending on
# how self.config["nfs_action_on_stop"] is set.

# kill thread that auto-restarts NFS
self.NFS_monitor_lock.set()

self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"]
super().stop(action_on_stop = self.config["nfs_action_on_stop"], kill_straggling_jobs = False)

def _get_container(self, container_name):
def closure():
return self.dkr.containers.get(container_name)
@@ -246,6 +278,14 @@ def start_NFS(self):
)
print("done", flush = True)

# start NFS monitoring thread
self.NFS_monitor_lock = threading.Event()
self.NFS_monitor_thread = threading.Thread(
target = self.autorestart_preempted_node,
args = (nfs_nodename,)
)
self.NFS_monitor_thread.start()

self.NFS_server_ready = True

def mount_NFS(self):
@@ -270,10 +310,24 @@ def get_latest_image(self, image_family = None):
return gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute()

def invoke(self, command, interactive = False):
return_code, (stdout, stderr) = self.container().exec_run(
command, demux = True, tty = interactive, stdin = interactive
)
return (return_code, io.BytesIO(stdout), io.BytesIO(stderr))
if self.container is not None and self.container().status == "running":
return_code, (stdout, stderr) = self.container().exec_run(
command, demux = True, tty = interactive, stdin = interactive
)
return (return_code, io.BytesIO(stdout), io.BytesIO(stderr))
else:
return (1, io.BytesIO(), io.BytesIO(b"Container is not running!"))

def autorestart_preempted_node(self, nodename):
while not self.NFS_monitor_lock.is_set():
try:
inst_details = self._pzw(gce.instances().get)(instance = nodename).execute()
if inst_details["status"] != "RUNNING":
self._pzw(gce.instances().start)(instance = nodename).execute()
except:
print("Error querying NFS server status; retrying in 60s ...", file = sys.stderr)

time.sleep(60)

# }}}

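Note: the NFS watcher added above pairs a polling thread with a threading.Event that stop() later sets to terminate it. A standalone sketch of that pattern (names are illustrative; the backend's version polls the GCE API and calls instances().start() when the server is no longer RUNNING):

import threading

def start_watcher(check_and_heal, interval=60):
    """Spawn a background poller; setting the returned Event stops it
    (mirrors the NFS_monitor_thread / NFS_monitor_lock pairing above)."""
    stop_event = threading.Event()

    def loop():
        while not stop_event.is_set():
            try:
                check_and_heal()       # e.g. restart the NFS node if preempted
            except Exception as e:     # keep polling through transient API errors
                print("watcher error; retrying:", e)
            stop_event.wait(interval)  # returns early once stop_event is set

    thread = threading.Thread(target=loop)
    thread.start()
    return stop_event, thread

# shutdown: stop_event.set(); thread.join()

Using Event.wait() in place of time.sleep() is a small variation on the committed loop; it lets the thread exit as soon as stop() sets the event rather than waiting out the rest of the 60-second interval.
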
32 changes: 28 additions & 4 deletions canine/backends/imageTransient.py
@@ -1,5 +1,6 @@
# vim: set expandtab:

import time
import typing
import subprocess
import os
@@ -9,6 +10,7 @@
from ..utils import get_default_gcp_project, gcp_hourly_cost

import googleapiclient.discovery as gd
import googleapiclient.errors
import pandas as pd

gce = gd.build('compute', 'v1')
@@ -257,16 +259,34 @@ def init_nodes(self):
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)

def stop(self, action_on_stop = None):
def stop(self, action_on_stop = None, kill_straggling_jobs = True):
"""
Delete or stop (default) compute instances
"""
if action_on_stop is None:
action_on_stop = self.config["action_on_stop"]

#
# stop, delete, or leave running compute nodes
# kill any still-running jobs
if kill_straggling_jobs:
try:
self.scancel(jobID = "", user = self.config["user"])

# wait for jobs to finish
print("Terminating all jobs ... ", end = "", flush = True)
tot_time = 0
while True:
if self.squeue().empty or tot_time > 60:
break
tot_time += 1
time.sleep(1)
print("done")
except Exception as e:
print("Error terminating all jobs!", file = sys.stderr)
print(e, file = sys.stderr)

#
# stop, delete, or leave running compute nodes
for node in self.nodes.index:
try:
if action_on_stop == "delete":
@@ -277,6 +297,10 @@ def stop(self, action_on_stop = None):
else:
# default behavior is to shut down
self._pzw(gce.instances().stop)(instance = node).execute()
except googleapiclient.errors.HttpError as e:
if e.resp != 404:
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)
except Exception as e:
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)
@@ -292,11 +316,11 @@ def list_instances_all_zones(self):
for x in zone_dict["items"]
], axis = 0).reset_index(drop = True)

def wait_for_cluster_ready(self):
def wait_for_cluster_ready(self, elastic = False):
"""
Blocks until the main partition is marked as up
"""
super().wait_for_cluster_ready(elastic = False)
super().wait_for_cluster_ready(elastic = elastic)

# a handy wrapper to automatically add this instance's project and zone to
# GCP API calls
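Note: stop() now drains the Slurm queue before touching any instances: it cancels the user's jobs, then polls squeue for up to a minute so nodes are idle when they are stopped or deleted. A condensed sketch of that ordering, with the Slurm calls passed in as plain callables (the backend uses its own bound scancel/squeue methods):

import time

def drain_jobs(scancel, squeue, user, max_wait=60):
    """Cancel all of user's jobs, then wait up to max_wait seconds for the
    queue to empty; squeue() is expected to return a DataFrame-like object
    with an .empty attribute, as in the backend's implementation."""
    scancel(jobID="", user=user)   # empty jobID cancels everything owned by user
    waited = 0
    while not squeue().empty and waited < max_wait:
        time.sleep(1)
        waited += 1
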
9 changes: 9 additions & 0 deletions canine/backends/local.py
@@ -3,6 +3,7 @@
import io
import sys
import subprocess
import shutil
from .base import AbstractSlurmBackend, AbstractTransport
from ..utils import ArgumentHelper, check_call
from agutil import StdOutAdapter
@@ -98,6 +99,14 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str]
"""
yield from os.walk(path)

def _rmtree(self, path: str, pathstat: os.stat_result):
"""
(Internal)
Recursively remove the directory tree rooted at the given path.
Delegates to shutil.rmtree(); retries are handled by the base-class rmtree()
"""
shutil.rmtree(path)

class LocalSlurmBackend(AbstractSlurmBackend):
"""
SLURM backend for interacting with a local slurm node
2 changes: 1 addition & 1 deletion canine/localization/base.py
@@ -437,7 +437,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
'stream',
value
)
elif mode == 'localize':
elif mode in ['localize', 'symlink']:
self.inputs[jobId][arg] = Localization(
None,
self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value)))
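Note: with this one-line change, an input whose localization mode is 'symlink' reserves a per-job inputs path exactly as 'localize' does. A hedged sketch of what the caller-side override might look like (the argument names and the shape of the overrides mapping are assumptions; only the mode dispatch is shown in this diff):

# Hypothetical overrides mapping passed to the localizer: "reference" is
# symlinked into the job's inputs directory instead of being copied.
overrides = {
    "reference": "symlink",
    "reads": "localize",
}
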
(Diffs for the remaining changed files are not shown here.)
