
Add job avoidance #22

Merged Feb 27, 2020 (73 commits; changes shown from 70 commits)

Commits
77203ac
Merge branch 'output_funcs' into develop
julianhess Jan 7, 2020
d8c5997
Allow Slurm Docker to be run as non-root
julianhess Jan 8, 2020
4e8073d
Use containerized workers by default
julianhess Jan 9, 2020
b6a0507
Override root user from superclass
julianhess Jan 13, 2020
b1072af
Don't symlink deloc.py; don't copy to delocalize
julianhess Jan 14, 2020
a32cfb3
Increase default NFS disk size for perf. reasons
julianhess Jan 14, 2020
d12582f
Allow custom image to create NFS disk
julianhess Jan 14, 2020
595c837
Unmount NFS on exit
julianhess Jan 16, 2020
d5799f0
Don't symlink absolute paths to workspace
julianhess Jan 16, 2020
620c97a
Make "symlink" synonymous to "localize"
julianhess Jan 16, 2020
94140d9
Initial commit of working job avoidance
julianhess Jan 21, 2020
47f5c26
Merge branch 'docker' into develop
julianhess Jan 21, 2020
c5527e8
Merge branch 'develop' into job_avoidance
julianhess Jan 21, 2020
0e2cec9
submit_batch_job needs to be aware of jobavoidance
julianhess Jan 21, 2020
79024dd
os.rmdir is not rm -rf
julianhess Jan 21, 2020
ae0149f
More robust job avoidance checks
julianhess Jan 22, 2020
e209f3f
Concatenate avoided DF with previously existing DF
julianhess Jan 22, 2020
28de503
Don't assume job_id corresponds to Slurm task array ID
julianhess Jan 22, 2020
fc2e80c
Sort output DF after concatenating
julianhess Jan 22, 2020
65eb48e
Bump minimum Python version to 3.7
julianhess Jan 22, 2020
3fcc699
Merge branch 'master' into develop
julianhess Jan 22, 2020
08a37da
Merge branch 'docker' into develop
julianhess Jan 22, 2020
0ab09e4
Merge branch 'docker' into develop
julianhess Jan 22, 2020
66e31bd
Add warning about hardcoded path
julianhess Jan 22, 2020
a31768f
Merge branch 'docker' into develop
julianhess Jan 22, 2020
26f1b6f
Merge branch 'docker' into develop
julianhess Jan 24, 2020
f323a9e
Re-create staging dir after nuking it
julianhess Jan 24, 2020
aadefa0
Merge branch 'job_avoidance' into develop
julianhess Jan 24, 2020
b148964
Catch ValueError exceptions in job_avoid()
julianhess Jan 24, 2020
66de503
Print discrepant inputs/outputs
julianhess Jan 25, 2020
3f12853
Catch ValueError exceptions in job_avoid()
julianhess Jan 24, 2020
3ff14fc
Print discrepant inputs/outputs
julianhess Jan 25, 2020
916f241
Merge branch 'job_avoidance' into develop
julianhess Jan 25, 2020
32d5172
Prevent race conditions when saving dataframe
julianhess Jan 27, 2020
e0245d4
Merge branch 'job_avoidance' into develop
julianhess Jan 27, 2020
138c65d
Output dataframe returns CPU time in seconds
julianhess Jan 27, 2020
87b7833
Pass inputs to df through shlex
julianhess Jan 27, 2020
27f81ae
Forgot to replace hours -> seconds in a couple places
julianhess Jan 27, 2020
a8fdc8d
Merge branch 'cpu_time_by_second' into develop
julianhess Jan 27, 2020
40ded7e
Add more conditions for not job avoiding
julianhess Jan 28, 2020
58e5f11
Merge branch 'job_avoidance' into develop
julianhess Jan 28, 2020
8bcfb45
Make exception for canine outputs
julianhess Jan 28, 2020
4c0d1e5
Merge branch 'nfs_overrides' into develop
julianhess Jan 28, 2020
0641679
Kill straggling jobs upon cluster exit
julianhess Jan 30, 2020
cdbfe23
Error checking in invoke method
julianhess Jan 30, 2020
bf30351
Merge branch 'master' into develop
julianhess Jan 30, 2020
252b47f
Merge branch 'master' into stragglers
julianhess Jan 30, 2020
cad6f92
Merge branch 'develop' into stragglers
julianhess Jan 30, 2020
c6fb82c
Update default compute scripts to new path
julianhess Jan 30, 2020
75c3820
Add NFS monitoring thread
julianhess Feb 3, 2020
477d3c0
Pass elastic parameter to wait_for_cluster_ready
julianhess Feb 3, 2020
80da83c
Merge branch 'master' into develop
julianhess Feb 3, 2020
dffbfce
Canine output folders can be nested now
julianhess Feb 3, 2020
1ba9bac
Sometimes the GCE API hangs; catch this exception
julianhess Feb 3, 2020
99c9ab6
Support Pandas dataframes and series for input
julianhess Feb 3, 2020
7ccaecd
Merge branch 'stragglers' into develop
julianhess Feb 3, 2020
f348442
Handle errors from scancel
julianhess Feb 3, 2020
5a47ad7
More robust cancelling of stragglers
julianhess Feb 4, 2020
5eeee5b
Merge branch 'stragglers' into develop
julianhess Feb 4, 2020
0e9582d
Remove stdout/stderr hack
julianhess Feb 4, 2020
fef7a30
Catch exceptions from Google's API's random failures
julianhess Feb 5, 2020
964dc96
Wrap whole cancellation step in try block
julianhess Feb 5, 2020
3dd94de
Catch exception from deleting node config file
julianhess Feb 6, 2020
5bd6b03
Merge branch 'master' into develop
julianhess Feb 10, 2020
d039eed
Retry removing job shard directories
julianhess Feb 11, 2020
e02d6a7
Remove common inputs dir. if job avoidance occurs
julianhess Feb 12, 2020
631b4ab
Retry all rmtree attempts
julianhess Feb 12, 2020
c33a6e5
Change GCP image family/Docker image names
julianhess Feb 7, 2020
ef69850
Bind mount /etc/gcloud
julianhess Feb 7, 2020
e5a87f7
Bind mount ~/.config/gcloud to /etc/gcloud
julianhess Feb 7, 2020
ee64193
Job avoidance now works for all backends
agraubert Feb 19, 2020
99f8e7b
Removed UUID import
agraubert Feb 20, 2020
ab151f6
Make it clearer that cluster_name is a required param
julianhess Feb 27, 2020
102 changes: 79 additions & 23 deletions canine/backends/dockerTransient.py
@@ -11,6 +11,8 @@
import io
import pickle
import math
import threading
import time

from .imageTransient import TransientImageSlurmBackend, list_instances, gce
from ..utils import get_default_gcp_project, gcp_hourly_cost
@@ -19,10 +21,10 @@

class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{
def __init__(
self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh",
compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh",
self, nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh",
compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh",
nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "",
action_on_stop = "delete", image_family = "pydpiper", image = None,
action_on_stop = "delete", image_family = "slurm-gcp-docker", image = None,
cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs
Collaborator: Is there a reason that cluster_name isn't a positional? Enforcing it as a required optional seems weird.

Collaborator (Author): I don't want there to be any positional arguments. I should really make this constructor signature

def __init__(self, *, nfs_compute_script, ...

to clarify this.

Contributor: All arguments are positional in Python. They can be put in arbitrary order using names, regardless of whether you set a default value. Not setting a default value simply makes an argument required.

Unless you're creating a complex object hierarchy where the available arguments change based on selected arguments or mixins, I am generally of the opinion that always-available arguments should be explicit in the constructor signature for the sake of clarity and readability.

Collaborator (Author): You can have functions which do not support positional arguments:

In [1]: def foo(*, bah):
   ...:     pass
   ...:

In [2]: foo("a")
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-1df0092b7087> in <module>
----> 1 foo("a")

TypeError: foo() takes 0 positional arguments but 1 was given

Contributor: Ahh, that's a sneaky new Python 3 trick.

Contributor: Most code I develop has been 2- and 3-compatible, so I haven't come across that pattern.

Collaborator (Author): It's Python 3-only, though I wouldn't exactly call it "new" :-)

Added to the standard in 2006: https://www.python.org/dev/peps/pep-3102/

Collaborator: Sorry, I guess I should clarify. I think that giving an argument a default signifies that it's optional. I have required parameters in the NFSLocalizer that have a default and appear in the middle of the argument list for compliance with the base class API. Is there a reason not to move the required parameters to the front of the list and remove their defaults, so it's more obvious in docs/inspection that they are required?

Collaborator (Author): You're right, I was being real dumb. Let's make this a positional and remove the check in the constructor.

https://github.com/broadinstitute/canine/blob/e02d6a73ffb93442ca5c70d9cd4345f780b7a2c6/canine/backends/dockerTransient.py#L30-L31

Collaborator (Author): (It's not like the exception message gives the user any additional information.)
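To make the outcome of this thread concrete, here is a minimal sketch of the direction agreed upon: cluster_name as a required positional parameter, with the remaining options keyword-only per PEP 3102. The class and parameter list below are abbreviated stand-ins, not the actual Canine constructor:

```python
# Sketch only: a required positional cluster_name plus keyword-only options.
# The parameter list is abbreviated; the real constructor has many more.
class SketchBackend:
    def __init__(self, cluster_name, *, nfs_disk_size=2000,
                 nfs_disk_type="pd-standard", action_on_stop="delete"):
        self.cluster_name = cluster_name
        self.nfs_disk_size = nfs_disk_size
        self.nfs_disk_type = nfs_disk_type
        self.action_on_stop = action_on_stop

backend = SketchBackend("my-cluster", nfs_disk_size=500)
# SketchBackend("my-cluster", 500) raises TypeError: everything after
# cluster_name must be passed by keyword.
```

Written this way, the `if cluster_name is None: raise` check in the constructor body becomes unnecessary, since Python itself enforces the requirement.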

):
if cluster_name is None:
@@ -70,14 +72,16 @@ def __init__(

self.NFS_server_ready = False
self.NFS_ready = False
self.NFS_monitor_thread = None
self.NFS_monitor_lock = None

def init_slurm(self):
self.dkr = docker.from_env()

#
# check if image exists
try:
image = self.dkr.images.get('broadinstitute/pydpiper:latest')
image = self.dkr.images.get('broadinstitute/slurm_gcp_docker:latest')
except docker.errors.ImageNotFound:
raise Exception("You have not yet built or pulled the Slurm Docker image!")

@@ -100,10 +104,14 @@ def init_slurm(self):
#
# create the Slurm container if it's not already present
if self.config["cluster_name"] not in [x.name for x in self.dkr.containers.list()]:
#if image not in [x.image for x in self.dkr.containers.list()]:
# FIXME: gcloud is cloud-provider specific. how can we make this more generic?
gcloud_conf_dir = subprocess.check_output("echo -n ~/.config/gcloud", shell = True).decode()
self.dkr.containers.run(
image = image.tags[0], detach = True, network_mode = "host",
volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" } },
volumes = {
"/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" },
gcloud_conf_dir : { "bind" : "/etc/gcloud", "mode" : "rw" }
},
name = self.config["cluster_name"], command = "/bin/bash",
user = self.config["user"], stdin_open = True, remove = True
)
@@ -131,7 +139,7 @@ def init_nodes(self):
if not self.NFS_ready:
raise Exception("NFS must be mounted before starting nodes!")

self.wait_for_cluster_ready()
self.wait_for_cluster_ready(elastic = True)

# list all the nodes that Slurm is aware of

@@ -167,33 +175,59 @@
])

def stop(self):
# stop the Docker
if self.container is not None:
self.container().stop()

# delete node configuration file
subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True)
try:
subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True)
except subprocess.CalledProcessError as e:
print("Couldn't delete node configuration file:", file = sys.stderr)
print(e)

# get list of nodes that still exist
#
# shutdown nodes that are still running (except NFS)
allnodes = self.nodes
extant_nodes = self.list_instances_all_zones()
self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) &
(allnodes["machine_type"] != "nfs")]

# sometimes the Google API will spectacularly fail; in that case, we
# just try to shutdown everything in the node list, regardless of whether
# it exists.
try:
extant_nodes = self.list_instances_all_zones()
self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) &
(allnodes["machine_type"] != "nfs")]
except:
self.nodes = allnodes.loc[allnodes["machine_type"] != "nfs"]

# superclass method will stop/delete/leave these running, depending on how
# self.config["action_on_stop"] is set
super().stop()

# we handle the NFS separately
self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"]
super().stop(action_on_stop = self.config["nfs_action_on_stop"])
#
# stop the Docker

# this needs to happen after super().stop() is invoked, since that
# calls scancel, which in turn requires a running Slurm controller Docker
if self.container is not None:
self.container().stop()

#
# unmount the NFS

# this needs to be the last step, since Docker will hang if NFS is pulled
# out from under it
if self.config["nfs_action_on_stop"] != "run":
try:
subprocess.check_call("sudo umount -f /mnt/nfs", shell = True)
except subprocess.CalledProcessError:
print("Could not unmount NFS (do you have open files on it?)\nPlease run `lsof | grep /mnt/nfs`, close any open files, and run `sudo umount -f /mnt/nfs` before attempting to run another pipeline.")

# superclass method will stop/delete/leave the NFS running, depending on
# how self.config["nfs_action_on_stop"] is set.

# kill thread that auto-restarts NFS
self.NFS_monitor_lock.set()

self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"]
super().stop(action_on_stop = self.config["nfs_action_on_stop"], kill_straggling_jobs = False)

def _get_container(self, container_name):
def closure():
return self.dkr.containers.get(container_name)
@@ -246,6 +280,14 @@ def start_NFS(self):
)
print("done", flush = True)

# start NFS monitoring thread
self.NFS_monitor_lock = threading.Event()
self.NFS_monitor_thread = threading.Thread(
target = self.autorestart_preempted_node,
args = (nfs_nodename,)
)
self.NFS_monitor_thread.start()

self.NFS_server_ready = True

def mount_NFS(self):
@@ -270,10 +312,24 @@ def get_latest_image(self, image_family = None):
return gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute()

def invoke(self, command, interactive = False):
return_code, (stdout, stderr) = self.container().exec_run(
command, demux = True, tty = interactive, stdin = interactive
)
return (return_code, io.BytesIO(stdout), io.BytesIO(stderr))
if self.container is not None and self.container().status == "running":
return_code, (stdout, stderr) = self.container().exec_run(
command, demux = True, tty = interactive, stdin = interactive
)
return (return_code, io.BytesIO(stdout), io.BytesIO(stderr))
else:
return (1, io.BytesIO(), io.BytesIO(b"Container is not running!"))

def autorestart_preempted_node(self, nodename):
while not self.NFS_monitor_lock.is_set():
try:
inst_details = self._pzw(gce.instances().get)(instance = nodename).execute()
if inst_details["status"] != "RUNNING":
self._pzw(gce.instances().start)(instance = nodename).execute()
except:
print("Error querying NFS server status; retrying in 60s ...", file = sys.stderr)

time.sleep(60)

# }}}

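The NFS monitoring thread added in this file follows a stop-flag pattern: a polling loop guarded by a threading.Event that stop() sets. A self-contained sketch of the same pattern, where check_and_restart is a hypothetical stand-in for the GCE status query and restart call:

```python
import threading

def start_monitor(check_and_restart, interval=60.0):
    """Run check_and_restart() every `interval` seconds until the flag is set."""
    stop_flag = threading.Event()

    def loop():
        while not stop_flag.is_set():
            try:
                check_and_restart()
            except Exception:
                # Transient API errors: ignore and retry on the next tick
                pass
            # Event.wait() doubles as an interruptible sleep: it returns
            # immediately once the flag is set
            stop_flag.wait(interval)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return stop_flag, thread
```

One design note: the PR's loop uses time.sleep(60) after each check, so shutdown can lag by up to a full poll interval; an Event-based wait like the one above returns as soon as the flag is set.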
32 changes: 28 additions & 4 deletions canine/backends/imageTransient.py
@@ -1,5 +1,6 @@
# vim: set expandtab:

import time
import typing
import subprocess
import os
@@ -9,6 +10,7 @@
from ..utils import get_default_gcp_project, gcp_hourly_cost

import googleapiclient.discovery as gd
import googleapiclient.errors
import pandas as pd

gce = gd.build('compute', 'v1')
@@ -257,16 +259,34 @@ def init_nodes(self):
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)

def stop(self, action_on_stop = None):
def stop(self, action_on_stop = None, kill_straggling_jobs = True):
"""
Delete or stop (default) compute instances
"""
if action_on_stop is None:
action_on_stop = self.config["action_on_stop"]

#
# stop, delete, or leave running compute nodes
# kill any still-running jobs
if kill_straggling_jobs:
try:
self.scancel(jobID = "", user = self.config["user"])

# wait for jobs to finish
print("Terminating all jobs ... ", end = "", flush = True)
tot_time = 0
while True:
if self.squeue().empty or tot_time > 60:
break
tot_time += 1
time.sleep(1)
print("done")
except Exception as e:
print("Error terminating all jobs!", file = sys.stderr)
print(e, file = sys.stderr)

#
# stop, delete, or leave running compute nodes
for node in self.nodes.index:
try:
if action_on_stop == "delete":
@@ -277,6 +297,10 @@
else:
# default behavior is to shut down
self._pzw(gce.instances().stop)(instance = node).execute()
except googleapiclient.errors.HttpError as e:
if e.resp != 404:
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)
except Exception as e:
print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr)
print(e)
@@ -292,11 +316,11 @@
for x in zone_dict["items"]
], axis = 0).reset_index(drop = True)

def wait_for_cluster_ready(self):
def wait_for_cluster_ready(self, elastic = False):
"""
Blocks until the main partition is marked as up
"""
super().wait_for_cluster_ready(elastic = False)
super().wait_for_cluster_ready(elastic = elastic)

# a handy wrapper to automatically add this instance's project and zone to
# GCP API calls
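The job-termination wait added to stop() is a bounded polling loop: issue scancel, then poll the queue until it drains or a timeout expires. The same idea, generalized, where condition is a stand-in for a check like self.squeue().empty:

```python
import time

def wait_until(condition, timeout=60, interval=1.0):
    """Poll condition() until it returns True or `timeout` seconds elapse.

    Returns True if the condition was met, False on timeout.
    """
    elapsed = 0.0
    while elapsed <= timeout:
        if condition():
            return True
        time.sleep(interval)
        elapsed += interval
    return False
```

Like the PR's loop, this counts sleep intervals rather than wall-clock time, so time spent inside condition() (e.g. an squeue call over the network) is not charged against the timeout.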
2 changes: 1 addition & 1 deletion canine/localization/base.py
@@ -437,7 +437,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
'stream',
value
)
elif mode == 'localize':
elif mode in ['localize', 'symlink']:
self.inputs[jobId][arg] = Localization(
None,
self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value)))
36 changes: 27 additions & 9 deletions canine/localization/nfs.py
@@ -86,15 +86,7 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs
#
# check if self.mount_path, self.local_dir, and src all exist on the same NFS share
# symlink if yes, copy if no
vols = subprocess.check_output(
"df {} {} {} | awk 'NR > 1 {{ print $1 }}'".format(
self.mount_path,
self.local_dir,
src
),
shell = True
)
if len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1:
if self.same_volume(src):
os.symlink(src, dest.localpath)
else:
if os.path.isfile(src):
@@ -116,6 +108,20 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
"""
if overrides is None:
overrides = {}

# automatically override inputs that are absolute paths residing on the same
# NFS share and are not Canine outputs

# XXX: this can be potentially slow, since it has to iterate over every
# single input. It would make more sense to do this before the adapter
# converts raw inputs.
for input_dict in inputs.values():
for k, v in input_dict.items():
if k not in overrides:
if re.match(r"^/", v) is not None and self.same_volume(v) and \
re.match(r".*/outputs/\d+/.*", v) is None:
overrides[k] = None

overrides = {k:v.lower() if isinstance(v, str) else None for k,v in overrides.items()}
with self.backend.transport() as transport:
if self.common:
@@ -269,3 +275,15 @@ def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Opt
if len(jobs) and not os.path.isdir(controller_env['CANINE_OUTPUT']):
os.mkdir(controller_env['CANINE_OUTPUT'])
return self.staging_dir

def same_volume(self, *args):
"""
Check if *args are stored on the same NFS mount as the output directory.
"""
vols = subprocess.check_output(
"df {} | awk 'NR > 1 {{ print $1 }}'".format(
" ".join([shlex.quote(x) for x in [self.mount_path, self.local_dir, *args]])
),
shell = True
)
return len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1
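same_volume shells out to df and compares the filesystem-source column for the mount path, the local dir, and each argument. For purely local checks, a dependency-free approximation is to compare st_dev from os.stat; this is a rough substitute, not the PR's approach, and df is the more robust choice for NFS since device IDs can misreport across bind mounts of the same export:

```python
import os

def on_same_filesystem(*paths):
    """Rough local-only check: do all paths report the same st_dev device?"""
    devices = {os.stat(p).st_dev for p in paths}
    return len(devices) == 1
```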