From d8c5997212b44a1abadbe990d2dfa126955e835f Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 8 Jan 2020 19:39:43 +0000 Subject: [PATCH 01/52] Allow Slurm Docker to be run as non-root --- canine/backends/dockerTransient.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 522fb51a..26eb5781 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -22,7 +22,7 @@ def __init__( self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage.sh", nfs_disk_size = 100, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", action_on_stop = "delete", image_family = "pydpiper", image = None, - cluster_name = None, clust_frac = 0.01, **kwargs + cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs ): if cluster_name is None: raise ValueError("You must specify a name for this Slurm cluster!") @@ -50,6 +50,7 @@ def __init__( "image_family" : image_family, "image" : self.get_latest_image(image_family)["name"] if image is None else image, "clust_frac" : max(min(clust_frac, 1.0), 1e-6), + "user" : user, **{ k : v for k, v in self.config.items() if k not in { "worker_prefix", "image" } } } @@ -99,7 +100,7 @@ def init_slurm(self): image = image.tags[0], detach = True, network_mode = "host", volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" } }, name = self.config["cluster_name"], command = "/bin/bash", - stdin_open = True, remove = True + user = self.config["user"], stdin_open = True, remove = True ) self.container = self._get_container(self.config["cluster_name"]) @@ -148,7 +149,7 @@ def init_nodes(self): "".join(g.iloc[[0, -1]].index.tolist()) ) (ret, _, _) = self.invoke( - """scontrol update nodename={} + """sudo -E scontrol update nodename={} state=drain reason=unused""".format(node_expr) ) if ret != 0: From 4e8073d5690ea745209f766abd16aa8db58bbbc7 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 9 Jan 2020 23:50:04 +0000 Subject: [PATCH 02/52] Use containerized workers by default --- canine/backends/dockerTransient.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 26eb5781..33251840 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -19,7 +19,8 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( - self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage.sh", + self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh", + compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh", nfs_disk_size = 100, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", action_on_stop = "delete", image_family = "pydpiper", image = None, cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs @@ -32,7 +33,10 @@ def __init__( # superclass constructor does something special with compute_script so # we need to pass it in - kwargs["compute_script"] = "/usr/local/share/cga_pipeline/src/provision_worker.sh {worker_prefix}".format(worker_prefix = socket.gethostname()) + kwargs["compute_script"] = "{script} {worker_prefix}".format( + script = compute_script, + worker_prefix = socket.gethostname() + ) super().__init__(**kwargs) self.config = { From b6a05076ced88d54c41cc1bfa40f2b9dbc17b044 Mon Sep 17 00:00:00 2001 From: Julian Hess 
Date: Mon, 13 Jan 2020 23:26:37 +0000 Subject: [PATCH 03/52] Override root user from superclass --- canine/backends/dockerTransient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 33251840..4303d527 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -55,7 +55,7 @@ def __init__( "image" : self.get_latest_image(image_family)["name"] if image is None else image, "clust_frac" : max(min(clust_frac, 1.0), 1e-6), "user" : user, - **{ k : v for k, v in self.config.items() if k not in { "worker_prefix", "image" } } + **{ k : v for k, v in self.config.items() if k not in { "worker_prefix", "image", "user" } } } # placeholder for Docker API From b1072afe0ec63e4e4f21cd1bccf414e1ca791bc4 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 14 Jan 2020 03:28:57 +0000 Subject: [PATCH 04/52] Don't symlink deloc.py; don't copy to delocalize --- canine/localization/nfs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index b06875be..0332aea2 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -140,7 +140,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty with open(script_path.localpath, 'w') as w: w.write(teardown_script) os.chmod(script_path.localpath, 0o775) - os.symlink( + shutil.copyfile( os.path.join( os.path.dirname(__file__), 'delocalization.py' @@ -218,7 +218,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ '#!/bin/bash', 'if [[ -d {0} ]]; then cd {0}; fi'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), # 'mv ../stderr ../stdout .', - 'if which python3 2>/dev/null >/dev/null; then python3 {0} {1} {2} -c {3}; else python {0} {1} {2} -c {3}; fi'.format( + 'if which python3 2>/dev/null >/dev/null; then python3 {0} {1} {2} {3}; else python {0} {1} {2} {3}; fi'.format( os.path.join(compute_env['CANINE_ROOT'], 'delocalization.py'), compute_env['CANINE_OUTPUT'], jobId, From a32cfb3b16da60def52015331643a0df1b71a8a9 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 14 Jan 2020 20:06:08 +0000 Subject: [PATCH 05/52] Increase default NFS disk size for perf. 
reasons Google artificially scales disk performance by size; a 2 TB disk seems like a sweet spot --- canine/backends/dockerTransient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 4303d527..14fedcdd 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -21,7 +21,7 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh", compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh", - nfs_disk_size = 100, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", + nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", action_on_stop = "delete", image_family = "pydpiper", image = None, cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs ): From d12582f36e45d1ececf2e798f41757c71fc47ae3 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 14 Jan 2020 22:25:49 +0000 Subject: [PATCH 06/52] Allow custom image to create NFS disk --- canine/backends/dockerTransient.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 14fedcdd..bd86c158 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -21,7 +21,7 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh", compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh", - nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", + nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "", action_on_stop = "delete", image_family = "pydpiper", image = None, cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs ): @@ -43,10 +43,11 @@ def __init__( "cluster_name" : cluster_name, "worker_prefix" : socket.gethostname(), "nfs_compute_script" : - "--metadata startup-script=\"{script} {nfsds:d} {nfsdt}\"".format( + "--metadata startup-script=\"{script} {nfsds:d} {nfsdt} {nfsimg}\"".format( script = nfs_compute_script, nfsds = nfs_disk_size, - nfsdt = nfs_disk_type + nfsdt = nfs_disk_type, + nfsimg = nfs_image ), "action_on_stop" : action_on_stop, "nfs_action_on_stop" : nfs_action_on_stop if nfs_action_on_stop is not None From 595c837e24c405f4338db38ae4dd314b50ca9991 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 16 Jan 2020 19:38:37 +0000 Subject: [PATCH 07/52] Unmount NFS on exit --- canine/backends/dockerTransient.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index bd86c158..5d3e1929 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -187,6 +187,12 @@ def stop(self): self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] super().stop(action_on_stop = self.config["nfs_action_on_stop"]) + if self.config["nfs_action_on_stop"] != "run": + try: + subprocess.check_call("sudo umount -f /mnt/nfs", shell = True) + except subprocess.CalledProcessError: + print("Could not unmount NFS (do you have open files on it?)\nPlease run `lsof | grep /mnt/nfs`, close any open files, and run `sudo umount -f /mnt/nfs` 
before attempting to run another pipeline.")

From d5799f0982d6664197ab657079905e599ab04bc3 Mon Sep 17 00:00:00 2001
From: Julian Hess
Date: Thu, 16 Jan 2020 20:37:13 +0000
Subject: [PATCH 08/52] Don't symlink absolute paths to workspace

If a file specified as an absolute path resides on the same NFS as the
workspace, we shouldn't symlink it into the workspace -- chances are the user
meant to supply it as an absolute path. If the user truly wants to symlink,
they can specify this manually with an override.
---
 canine/localization/nfs.py | 35 ++++++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py
index 0332aea2..692b7890 100644
--- a/canine/localization/nfs.py
+++ b/canine/localization/nfs.py
@@ -86,15 +86,7 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs
 #
 # check if self.mount_path, self.local_dir, and src all exist on the same NFS share
 # symlink if yes, copy if no
- vols = subprocess.check_output(
- "df {} {} {} | awk 'NR > 1 {{ print $1 }}'".format(
- self.mount_path,
- self.local_dir,
- src
- ),
- shell = True
- )
- if len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1:
+ if self.same_volume(src):
 os.symlink(src, dest.localpath)
 else:
 if os.path.isfile(src):
@@ -116,6 +108,19 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
 """
 if overrides is None:
 overrides = {}
+
+ # automatically override inputs that are absolute paths residing on the same
+ # NFS share
+
+ # XXX: this can be potentially slow, since it has to iterate over every
+ # single input. It would make more sense to do this before the adapter
+ # converts raw inputs.
+ for input_dict in inputs.values():
+ for k, v in input_dict.items():
+ if k not in overrides:
+ if re.match(r"^/", v) is not None and self.same_volume(v):
+ overrides[k] = None
 overrides = {k:v.lower() if isinstance(v, str) else None for k,v in overrides.items()}
 with self.backend.transport() as transport:
 if self.common:
@@ -273,3 +278,15 @@ def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Opt
 if len(jobs) and not os.path.isdir(controller_env['CANINE_OUTPUT']):
 os.mkdir(controller_env['CANINE_OUTPUT'])
 return self.staging_dir
+
+ def same_volume(self, *args):
+ """
+ Check if *args are stored on the same NFS mount as the output directory.
+ """
+ vols = subprocess.check_output(
+ "df {} | awk 'NR > 1 {{ print $1 }}'".format(
+ " ".join([self.mount_path, self.local_dir, *args])
+ ),
+ shell = True
+ )
+ return len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1

From 620c97a60431dfc2066321135d665d2d13d2f7e9 Mon Sep 17 00:00:00 2001
From: Julian Hess
Date: Thu, 16 Jan 2020 21:21:45 +0000
Subject: [PATCH 09/52] Make "symlink" synonymous to "localize"

This makes it more intuitive for a user to override the NFS backend's new
default behavior.
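For example, a minimal usage sketch (the input name here is hypothetical; it only
assumes the localize(inputs, patterns, overrides) call shown in the preceding
patch): a user who still wants a given absolute-path input symlinked into the
workspace can request it per input.

    # hypothetical input name -- "symlink" now routes through the same branch as "localize"
    overrides = {"ref_fasta": "symlink"}
    localizer.localize(inputs, patterns, overrides = overrides)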
--- canine/localization/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index d41f8bb9..21c6416c 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -441,7 +441,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm 'stream', value ) - elif mode == 'localize': + elif mode in ['localize', 'symlink']: self.inputs[jobId][arg] = Localization( None, self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) From 94140d98de5ea96593da3af3c904624211f47bf8 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 21 Jan 2020 20:32:05 +0000 Subject: [PATCH 10/52] Initial commit of working job avoidance --- canine/orchestrator.py | 47 ++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 63d54387..5e731032 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -421,26 +421,37 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # load in results and job spec dataframes r_df = pd.read_pickle(df_path.localpath) js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - js_df.columns = pd.MultiIndex.from_product([["inputs"], js_df.columns]) - - r_df = r_df.reset_index(col_level = 0, col_fill = "_job_id") - js_df = js_df.reset_index(col_level = 0, col_fill = "_job_id") # check if jobs are compatible: they must have identical inputs and index, # and output columns must be matching - if not r_df["inputs"].columns.equals(js_df["inputs"].columns): + if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ + js_df.columns.isin(r_df["inputs"].columns).all()): raise ValueError("Cannot job avoid; set of input parameters do not match") - r_df_inputs = r_df[["inputs", "_job_id"]].droplevel(0, axis = 1) - js_df_inputs = js_df[["inputs", "_job_id"]].droplevel(0, axis = 1) - - merge_df = r_df_inputs.join(js_df_inputs, how = "outer", lsuffix = "__r", rsuffix = "__js") - merge_df.columns = pd.MultiIndex.from_tuples([x.split("__")[::-1] for x in merge_df.columns]) - - if not merge_df["r"].equals(merge_df["js"]): - raise ValueError("Cannot job avoid; values of input parameters do not match") - - # if jobs are indeed compatible, we can figure out which need to be re-run - r_df.loc[r_df[("job", "slurm_state")] == "FAILED"] - - # destroy their output directories + # FIXME: removing stdout/stderr from output keys is a bug fix -- + # for some reason these aren't getting propagated to the output DF + output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) + if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ + output_temp.index.isin(r_df["outputs"].columns).all()): + raise ValueError("Cannot job avoid; sets of output parameters do not match") + + # check that values of inputs are the same + # we have to sort because the order of jobs might differ for the same + # inputs + sort_cols = r_df.columns.to_series()["inputs"] + r_df = r_df.sort_values(sort_cols.tolist()) + js_df = js_df.sort_values(sort_cols.index.tolist()) + + if not r_df["inputs"].equals(js_df): + raise ValueError("Cannot job avoid; values of input parameters do not match!") + + # if all is well, figure out which jobs need to be re-run + fail_idx = r_df[("job", "slurm_state")] == "FAILED" + + # remove jobs that don't need to be re-run from job_spec + for k in 
r_df.index[~fail_idx]: + self.job_spec.pop(k, None) + + # remove output directories of failed jobs + for k in self.job_spec: + os.rmdir(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) From 0e2cec9f54e6fa1459ef995d393936dc0990317f Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 21 Jan 2020 21:15:36 +0000 Subject: [PATCH 11/52] submit_batch_job needs to be aware of jobavoidance If a batch job of length 0 makes it to here, it was definitely completely avoided -- prior steps ensure that batch jobs of length 0 cannot proceed unless avoided. --- canine/orchestrator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 5e731032..8776208f 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -399,6 +399,10 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non return df def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}): + # this job was avoided + if len(self.job_spec) == 0: + return -2 + batch_id = self.backend.sbatch( entrypoint_path, **{ From 79024ddc2ebee059b62aec1d3ce8f9552ef0d23c Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 21 Jan 2020 23:06:09 +0000 Subject: [PATCH 12/52] os.rmdir is not rm -rf --- canine/orchestrator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 8776208f..ff077865 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -1,6 +1,7 @@ import typing import os import time +import shutil import sys import warnings import traceback @@ -458,4 +459,4 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # remove output directories of failed jobs for k in self.job_spec: - os.rmdir(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) From ae0149f623a4873e2df9e54d84ca2c714c5ae9eb Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 03:52:29 +0000 Subject: [PATCH 13/52] More robust job avoidance checks - Remove entire output directory if: - dataframe is empty (run likely crashed) - dataframe doesn't exist (run never started?) - Allow for all output to be removed, regardless of status (overwrite = True) --- canine/orchestrator.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index ff077865..ca73a36a 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -11,6 +11,7 @@ from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer from .utils import check_call import yaml +import numpy as np import pandas as pd from agutil import status_bar version = '0.7.0' @@ -420,13 +421,25 @@ def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) return batch_id def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of avoidance (force, only if failed, etc.) - # is there preexisting output? 
df_path = localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle") - if os.path.exists(df_path.localpath): + + # remove all output if specified + if overwrite: + if os.path.isdir(localizer.staging_dir): + shutil.rmtree(localizer.staging_dir) + return 0 + + # check for preexisting jobs' output + if os.path.exists(df_path.localpath): # load in results and job spec dataframes r_df = pd.read_pickle(df_path.localpath) js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") + if r_df.empty: + print("Could not recover previous job results; overwriting output and aborting job avoidance.") + shutil.rmtree(localizer.staging_dir) + return 0 + # check if jobs are compatible: they must have identical inputs and index, # and output columns must be matching if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ @@ -452,6 +465,7 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # if all is well, figure out which jobs need to be re-run fail_idx = r_df[("job", "slurm_state")] == "FAILED" + self.df_avoided = r_df.loc[~fail_idx] # remove jobs that don't need to be re-run from job_spec for k in r_df.index[~fail_idx]: @@ -460,3 +474,12 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # remove output directories of failed jobs for k in self.job_spec: shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + + return np.count_nonzero(~fail_idx) + + # if the output directory exists but there's no output dataframe, assume + # it's corrupted and remove it + elif os.path.isdir(localizer.staging_dir): + shutil.rmtree(localizer.staging_dir) + + return 0 From e209f3f39cb1cdf0532ae2015f8fbfc36cd16d06 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 15:06:24 +0000 Subject: [PATCH 14/52] Concatenate avoided DF with previously existing DF --- canine/orchestrator.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index ca73a36a..1facc70f 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -175,6 +175,10 @@ def __init__(self, config: typing.Union[str, typing.Dict[str, typing.Any]]): if 'stderr' not in self.raw_outputs: self.raw_outputs['stderr'] = '../stderr' + # placeholder for dataframe containing previous results that were + # job avoided + self.df_avoided = None + def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) -> pd.DataFrame: """ Runs the configured pipeline @@ -390,8 +394,14 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non # that don't receive any transformation with transformed columns df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) except: + traceback.print_exc() df = pd.DataFrame() + # concatenate with any previously existing job avoided results + if self.df_avoided is not None: + df = pd.concat([df, self.df_avoided]) + + # save DF to disk if isinstance(localizer, AbstractLocalizer): fname = "results.k9df.pickle" df.to_pickle(fname) From 28de503ad755e091731a367b1f9b57b417a728be Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 15:07:02 +0000 Subject: [PATCH 15/52] Don't assume job_id corresponds to Slurm task array ID This is in the case of job avoidance -- if job_id 0 gets avoided, but job_id 1 gets re-run, then Slurm task array ID 0 -> job_id 1 We do, however, assume that ordering is conserved between the two. 
This is due to the fact that Python dictionaries are guaranteed to be insertion ordered. --- canine/orchestrator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 1facc70f..7c7b9133 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -368,10 +368,10 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non df = pd.DataFrame.from_dict( data={ job_id: { - ('job', 'slurm_state'): acct['State'][batch_id+'_'+job_id], - ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+job_id], - ('job', 'cpu_hours'): (prev_acct['CPUTimeRAW'][batch_id+'_'+job_id] + ( - cpu_time[batch_id+'_'+job_id] if batch_id+'_'+job_id in cpu_time else 0 + ('job', 'slurm_state'): acct['State'][batch_id+'_'+str(array_id)], + ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+str(array_id)], + ('job', 'cpu_hours'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( + cpu_time[batch_id+'_'+str(array_id)] if batch_id+'_'+str(array_id) in cpu_time else 0 ))/3600 if prev_acct is not None else -1, **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, **{ @@ -379,7 +379,7 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non for key, val in outputs[job_id].items() } } - for job_id in self.job_spec + for array_id, job_id in enumerate(self.job_spec) }, orient = "index" ).rename_axis(index = "_job_id").astype({('job', 'cpu_hours'): int}) From fc2e80ca1d9f462318a2424eeed3481cc419496b Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 15:19:04 +0000 Subject: [PATCH 16/52] Sort output DF after concatenating --- canine/orchestrator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 7c7b9133..0fe33ec7 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -394,12 +394,11 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non # that don't receive any transformation with transformed columns df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) except: - traceback.print_exc() df = pd.DataFrame() # concatenate with any previously existing job avoided results if self.df_avoided is not None: - df = pd.concat([df, self.df_avoided]) + df = pd.concat([df, self.df_avoided]).sort_index() # save DF to disk if isinstance(localizer, AbstractLocalizer): From 65eb48e331751ea0ae3eb5af1692ea3b11d2603b Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 15:29:33 +0000 Subject: [PATCH 17/52] Bump minimum Python version to 3.7 In 28de503, we assume that Orchestrator.job_spec's elements will be insertion ordered. This is only guaranteed in Python >= 3.7. In the future, we might want to be less lazy (and explicitly store the insert order ourselves), to relax this version requirement. 
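To make the ordering assumption concrete, a minimal sketch (hypothetical job IDs,
plain Python, not part of this patch):

    # job_spec is a plain dict; on Python >= 3.7 it preserves insertion order
    job_spec = {"0": {"n": "1"}, "1": {"n": "2"}, "2": {"n": "3"}}
    job_spec.pop("1", None)           # job "1" was avoided and removed
    list(enumerate(job_spec))         # -> [(0, '0'), (1, '2')]
    # i.e. Slurm array ID 0 maps to job "0", array ID 1 maps to job "2"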
--- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 29b9241a..866eb043 100644 --- a/setup.py +++ b/setup.py @@ -4,8 +4,8 @@ import sys ver_info = sys.version_info -if ver_info < (3,5,4): - raise RuntimeError("canine requires at least python 3.5.4") +if ver_info < (3,7,0): + raise RuntimeError("canine requires at least python 3.7") with open(os.path.join(os.path.dirname(__file__), 'canine', 'orchestrator.py')) as r: version = re.search(r'version = \'(\d+\.\d+\.\d+[-_a-zA-Z0-9]*)\'', r.read()).group(1) From 66e31bdeef109cc2d0d075fef0392f82874f9256 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 22 Jan 2020 18:11:18 +0000 Subject: [PATCH 18/52] Add warning about hardcoded path --- canine/backends/dockerTransient.py | 1 + 1 file changed, 1 insertion(+) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 9d9020d4..1544176a 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -252,6 +252,7 @@ def mount_NFS(self): if not self.NFS_server_ready: raise Exception("Need to start NFS server before attempting to mount!") + # TODO: don't hardcode this script path; make it a parameter instead nfs_prov_script = os.path.join( os.path.dirname(__file__), 'slurm-docker/src/nfs_provision_worker.sh' From f323a9ee16c9b685653ebb346ac659e077677760 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 24 Jan 2020 18:11:23 +0000 Subject: [PATCH 19/52] Re-create staging dir after nuking it Because this is created by the localizer constructor, many localizer methods assume that it already exists. --- canine/orchestrator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 0fe33ec7..4f6cb2b6 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -436,6 +436,7 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of if overwrite: if os.path.isdir(localizer.staging_dir): shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) return 0 # check for preexisting jobs' output @@ -447,6 +448,7 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of if r_df.empty: print("Could not recover previous job results; overwriting output and aborting job avoidance.") shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) return 0 # check if jobs are compatible: they must have identical inputs and index, @@ -490,5 +492,6 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # it's corrupted and remove it elif os.path.isdir(localizer.staging_dir): shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) return 0 From b148964f2c31069576b047193bbe4247df3c35af Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 24 Jan 2020 22:12:06 +0000 Subject: [PATCH 20/52] Catch ValueError exceptions in job_avoid() Otherwise, this will just fail --- canine/orchestrator.py | 94 ++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 4f6cb2b6..eeef5abf 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -441,53 +441,57 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # check for preexisting jobs' output if os.path.exists(df_path.localpath): - # load in results and job spec dataframes - r_df = pd.read_pickle(df_path.localpath) - js_df = 
pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - - if r_df.empty: - print("Could not recover previous job results; overwriting output and aborting job avoidance.") - shutil.rmtree(localizer.staging_dir) - os.makedirs(localizer.staging_dir) + try: + # load in results and job spec dataframes + r_df = pd.read_pickle(df_path.localpath) + js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") + + if r_df.empty: + print("Could not recover previous job results; overwriting output and aborting job avoidance.") + shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) + return 0 + + # check if jobs are compatible: they must have identical inputs and index, + # and output columns must be matching + if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ + js_df.columns.isin(r_df["inputs"].columns).all()): + raise ValueError("Cannot job avoid; set of input parameters do not match") + + # FIXME: removing stdout/stderr from output keys is a bug fix -- + # for some reason these aren't getting propagated to the output DF + output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) + if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ + output_temp.index.isin(r_df["outputs"].columns).all()): + raise ValueError("Cannot job avoid; sets of output parameters do not match") + + # check that values of inputs are the same + # we have to sort because the order of jobs might differ for the same + # inputs + sort_cols = r_df.columns.to_series()["inputs"] + r_df = r_df.sort_values(sort_cols.tolist()) + js_df = js_df.sort_values(sort_cols.index.tolist()) + + if not r_df["inputs"].equals(js_df): + raise ValueError("Cannot job avoid; values of input parameters do not match!") + + # if all is well, figure out which jobs need to be re-run + fail_idx = r_df[("job", "slurm_state")] == "FAILED" + self.df_avoided = r_df.loc[~fail_idx] + + # remove jobs that don't need to be re-run from job_spec + for k in r_df.index[~fail_idx]: + self.job_spec.pop(k, None) + + # remove output directories of failed jobs + for k in self.job_spec: + shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + + return np.count_nonzero(~fail_idx) + except ValueError as e: + print(e) return 0 - # check if jobs are compatible: they must have identical inputs and index, - # and output columns must be matching - if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ - js_df.columns.isin(r_df["inputs"].columns).all()): - raise ValueError("Cannot job avoid; set of input parameters do not match") - - # FIXME: removing stdout/stderr from output keys is a bug fix -- - # for some reason these aren't getting propagated to the output DF - output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) - if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ - output_temp.index.isin(r_df["outputs"].columns).all()): - raise ValueError("Cannot job avoid; sets of output parameters do not match") - - # check that values of inputs are the same - # we have to sort because the order of jobs might differ for the same - # inputs - sort_cols = r_df.columns.to_series()["inputs"] - r_df = r_df.sort_values(sort_cols.tolist()) - js_df = js_df.sort_values(sort_cols.index.tolist()) - - if not r_df["inputs"].equals(js_df): - raise ValueError("Cannot job avoid; values of input parameters do not match!") - - # if all is well, figure out which jobs need to be re-run - fail_idx = 
r_df[("job", "slurm_state")] == "FAILED" - self.df_avoided = r_df.loc[~fail_idx] - - # remove jobs that don't need to be re-run from job_spec - for k in r_df.index[~fail_idx]: - self.job_spec.pop(k, None) - - # remove output directories of failed jobs - for k in self.job_spec: - shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) - - return np.count_nonzero(~fail_idx) - # if the output directory exists but there's no output dataframe, assume # it's corrupted and remove it elif os.path.isdir(localizer.staging_dir): From 66de503dc0c6d63976389cee6ec06dc7164ae978 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Sat, 25 Jan 2020 20:17:55 +0000 Subject: [PATCH 21/52] Print discrepant inputs/outputs --- canine/orchestrator.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index eeef5abf..fc0c394b 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -447,23 +447,32 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") if r_df.empty: - print("Could not recover previous job results; overwriting output and aborting job avoidance.") - shutil.rmtree(localizer.staging_dir) - os.makedirs(localizer.staging_dir) - return 0 + raise ValueError("Could not recover previous job results!") # check if jobs are compatible: they must have identical inputs and index, # and output columns must be matching if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ js_df.columns.isin(r_df["inputs"].columns).all()): - raise ValueError("Cannot job avoid; set of input parameters do not match") + r_df_set = set(r_df["inputs"].columns) + js_df_set = set(js_df.columns) + raise ValueError( + "Cannot job avoid; sets of input parameters do not match:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(js_df_set - r_df_set) + ) # FIXME: removing stdout/stderr from output keys is a bug fix -- # for some reason these aren't getting propagated to the output DF output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ output_temp.index.isin(r_df["outputs"].columns).all()): - raise ValueError("Cannot job avoid; sets of output parameters do not match") + r_df_set = set(r_df["outputs"].columns) + o_df_set = set(output_temp.index) + raise ValueError( + "Cannot job avoid; sets of output parameters do not match:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(o_df_set - r_df_set) + ) # check that values of inputs are the same # we have to sort because the order of jobs might differ for the same @@ -490,6 +499,9 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of return np.count_nonzero(~fail_idx) except ValueError as e: print(e) + print("Overwriting output and aborting job avoidance.") + shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) return 0 # if the output directory exists but there's no output dataframe, assume From 3f12853dcec0a2e907c93d68507f72ee8ae76fbf Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 24 Jan 2020 22:12:06 +0000 Subject: [PATCH 22/52] Catch ValueError exceptions in job_avoid() Otherwise, this will just fail --- canine/orchestrator.py | 94 ++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 
45 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 4f6cb2b6..eeef5abf 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -441,53 +441,57 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # check for preexisting jobs' output if os.path.exists(df_path.localpath): - # load in results and job spec dataframes - r_df = pd.read_pickle(df_path.localpath) - js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - - if r_df.empty: - print("Could not recover previous job results; overwriting output and aborting job avoidance.") - shutil.rmtree(localizer.staging_dir) - os.makedirs(localizer.staging_dir) + try: + # load in results and job spec dataframes + r_df = pd.read_pickle(df_path.localpath) + js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") + + if r_df.empty: + print("Could not recover previous job results; overwriting output and aborting job avoidance.") + shutil.rmtree(localizer.staging_dir) + os.makedirs(localizer.staging_dir) + return 0 + + # check if jobs are compatible: they must have identical inputs and index, + # and output columns must be matching + if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ + js_df.columns.isin(r_df["inputs"].columns).all()): + raise ValueError("Cannot job avoid; set of input parameters do not match") + + # FIXME: removing stdout/stderr from output keys is a bug fix -- + # for some reason these aren't getting propagated to the output DF + output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) + if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ + output_temp.index.isin(r_df["outputs"].columns).all()): + raise ValueError("Cannot job avoid; sets of output parameters do not match") + + # check that values of inputs are the same + # we have to sort because the order of jobs might differ for the same + # inputs + sort_cols = r_df.columns.to_series()["inputs"] + r_df = r_df.sort_values(sort_cols.tolist()) + js_df = js_df.sort_values(sort_cols.index.tolist()) + + if not r_df["inputs"].equals(js_df): + raise ValueError("Cannot job avoid; values of input parameters do not match!") + + # if all is well, figure out which jobs need to be re-run + fail_idx = r_df[("job", "slurm_state")] == "FAILED" + self.df_avoided = r_df.loc[~fail_idx] + + # remove jobs that don't need to be re-run from job_spec + for k in r_df.index[~fail_idx]: + self.job_spec.pop(k, None) + + # remove output directories of failed jobs + for k in self.job_spec: + shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + + return np.count_nonzero(~fail_idx) + except ValueError as e: + print(e) return 0 - # check if jobs are compatible: they must have identical inputs and index, - # and output columns must be matching - if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ - js_df.columns.isin(r_df["inputs"].columns).all()): - raise ValueError("Cannot job avoid; set of input parameters do not match") - - # FIXME: removing stdout/stderr from output keys is a bug fix -- - # for some reason these aren't getting propagated to the output DF - output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) - if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ - output_temp.index.isin(r_df["outputs"].columns).all()): - raise ValueError("Cannot job avoid; sets of output parameters do not match") - - # check that values of inputs are 
the same - # we have to sort because the order of jobs might differ for the same - # inputs - sort_cols = r_df.columns.to_series()["inputs"] - r_df = r_df.sort_values(sort_cols.tolist()) - js_df = js_df.sort_values(sort_cols.index.tolist()) - - if not r_df["inputs"].equals(js_df): - raise ValueError("Cannot job avoid; values of input parameters do not match!") - - # if all is well, figure out which jobs need to be re-run - fail_idx = r_df[("job", "slurm_state")] == "FAILED" - self.df_avoided = r_df.loc[~fail_idx] - - # remove jobs that don't need to be re-run from job_spec - for k in r_df.index[~fail_idx]: - self.job_spec.pop(k, None) - - # remove output directories of failed jobs - for k in self.job_spec: - shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) - - return np.count_nonzero(~fail_idx) - # if the output directory exists but there's no output dataframe, assume # it's corrupted and remove it elif os.path.isdir(localizer.staging_dir): From 3ff14fcb5a8179ae2bca062e6edaa41d1fb5ba0e Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Sat, 25 Jan 2020 20:17:55 +0000 Subject: [PATCH 23/52] Print discrepant inputs/outputs --- canine/orchestrator.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index eeef5abf..fc0c394b 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -447,23 +447,32 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") if r_df.empty: - print("Could not recover previous job results; overwriting output and aborting job avoidance.") - shutil.rmtree(localizer.staging_dir) - os.makedirs(localizer.staging_dir) - return 0 + raise ValueError("Could not recover previous job results!") # check if jobs are compatible: they must have identical inputs and index, # and output columns must be matching if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ js_df.columns.isin(r_df["inputs"].columns).all()): - raise ValueError("Cannot job avoid; set of input parameters do not match") + r_df_set = set(r_df["inputs"].columns) + js_df_set = set(js_df.columns) + raise ValueError( + "Cannot job avoid; sets of input parameters do not match:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(js_df_set - r_df_set) + ) # FIXME: removing stdout/stderr from output keys is a bug fix -- # for some reason these aren't getting propagated to the output DF output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ output_temp.index.isin(r_df["outputs"].columns).all()): - raise ValueError("Cannot job avoid; sets of output parameters do not match") + r_df_set = set(r_df["outputs"].columns) + o_df_set = set(output_temp.index) + raise ValueError( + "Cannot job avoid; sets of output parameters do not match:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(o_df_set - r_df_set) + ) # check that values of inputs are the same # we have to sort because the order of jobs might differ for the same @@ -490,6 +499,9 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of return np.count_nonzero(~fail_idx) except ValueError as e: print(e) + print("Overwriting output and aborting job avoidance.") + shutil.rmtree(localizer.staging_dir) + 
os.makedirs(localizer.staging_dir) return 0 # if the output directory exists but there's no output dataframe, assume From 32d51729561a1ca7ab00ced79407c7e580c79244 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 27 Jan 2020 02:34:25 +0000 Subject: [PATCH 24/52] Prevent race conditions when saving dataframe Prepend UUID to saved dataframe filename before it gets localized -- otherwise, if we have multiple orchestrators active simultaneously, they will step on each others' outputs. --- canine/orchestrator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index fc0c394b..d5a64366 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -3,6 +3,7 @@ import time import shutil import sys +import uuid import warnings import traceback from subprocess import CalledProcessError @@ -402,7 +403,7 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non # save DF to disk if isinstance(localizer, AbstractLocalizer): - fname = "results.k9df.pickle" + fname = str(uuid.uuid4()) + "-results.k9df.pickle" df.to_pickle(fname) localizer.localize_file(fname, localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle")) os.remove(fname) From 138c65d16d02d69848ff7b0f16e8a00295de1ce9 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 27 Jan 2020 03:10:17 +0000 Subject: [PATCH 25/52] Output dataframe returns CPU time in seconds --- canine/orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index d5a64366..6cc18eb3 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -371,9 +371,9 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non job_id: { ('job', 'slurm_state'): acct['State'][batch_id+'_'+str(array_id)], ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+str(array_id)], - ('job', 'cpu_hours'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( + ('job', 'cpu_seconds'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( cpu_time[batch_id+'_'+str(array_id)] if batch_id+'_'+str(array_id) in cpu_time else 0 - ))/3600 if prev_acct is not None else -1, + )) if prev_acct is not None else -1, **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, **{ ('outputs', key) : val[0] if isinstance(val, list) and len(val) == 1 else val From 87b78337550b82f7b81e07b342e1ff0936b1627c Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 27 Jan 2020 15:02:53 +0000 Subject: [PATCH 26/52] Pass inputs to df through shlex --- canine/localization/nfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 692b7890..5d027a30 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -285,7 +285,7 @@ def same_volume(self, *args): """ vols = subprocess.check_output( "df {} | awk 'NR > 1 {{ print $1 }}'".format( - " ".join([self.mount_path, self.local_dir, *args]) + " ".join([shlex.quote(x) for x in [self.mount_path, self.local_dir, *args]]) ), shell = True ) From 27f81aed7e7677ff776568f19b8483218768bed3 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 27 Jan 2020 16:45:12 +0000 Subject: [PATCH 27/52] Forgot to replace hours -> seconds in a couple places --- canine/orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 6cc18eb3..50253e30 100644 --- a/canine/orchestrator.py +++ 
b/canine/orchestrator.py @@ -278,7 +278,7 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) runtime/3600, node_uptime=sum(uptime.values())/120 )[0]) - job_cost = self.backend.estimate_cost(job_cpu_time=df['cpu_hours'].to_dict())[1] + job_cost = self.backend.estimate_cost(job_cpu_time=df['cpu_seconds'].to_dict())[1] df['est_cost'] = [job_cost[job_id] for job_id in df.index] if job_cost is not None else [0] * len(df) except: traceback.print_exc() @@ -383,7 +383,7 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = Non for array_id, job_id in enumerate(self.job_spec) }, orient = "index" - ).rename_axis(index = "_job_id").astype({('job', 'cpu_hours'): int}) + ).rename_axis(index = "_job_id").astype({('job', 'cpu_seconds'): int}) # # apply functions to output columns (if any) From 40ded7e5e09c0d5e758e50e162bb61e6a67b5cac Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 28 Jan 2020 16:20:38 +0000 Subject: [PATCH 28/52] Add more conditions for not job avoiding --- canine/orchestrator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index d5a64366..8cc73bf0 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -447,7 +447,9 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of r_df = pd.read_pickle(df_path.localpath) js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - if r_df.empty: + if r_df.empty or \ + "inputs" not in r_df or \ + ("outputs" not in r_df and len(self.raw_outputs) > 0): raise ValueError("Could not recover previous job results!") # check if jobs are compatible: they must have identical inputs and index, From 8bcfb45cfcd1d58b7189bccc03b97ca831b3bcc3 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 28 Jan 2020 17:24:44 +0000 Subject: [PATCH 29/52] Make exception for canine outputs This is intended for files that the user intentionally specified as absolute paths. Although Canine outputs are given as absolute paths, these are not intentional. --- canine/localization/nfs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 692b7890..468c3cc1 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -110,7 +110,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty overrides = {} # automatically override inputs that are absolute paths residing on the same - # NFS share + # NFS share and are not Canine outputs # XXX: this can be potentially slow, since it has to iterate over every # single input. 
It would make more sense to do this before the adapter @@ -118,7 +118,8 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty for input_dict in inputs.values(): for k, v in input_dict.items(): if k not in overrides: - if re.match(r"^/", v) is not None and self.same_volume(v): + if re.match(r"^/", v) is not None and self.same_volume(v) and \ + re.match(r".*/outputs/\d+/.*?/[^/]+$", v) is None: overrides[k] = None overrides = {k:v.lower() if isinstance(v, str) else None for k,v in overrides.items()} From 0641679bc33b99a20111eb93aefe3ad52fb1e86f Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 30 Jan 2020 20:39:34 +0000 Subject: [PATCH 30/52] Kill straggling jobs upon cluster exit We also re-order the Docker backend's stop() method to take advantage of this -- we cannot stop any nodes before the container has halted. --- canine/backends/dockerTransient.py | 17 ++++++++++------- canine/backends/imageTransient.py | 10 ++++++++-- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 2c189e4b..87246b21 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -167,10 +167,6 @@ def init_nodes(self): ]) def stop(self): - # stop the Docker - if self.container is not None: - self.container().stop() - # delete node configuration file subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) @@ -184,16 +180,23 @@ def stop(self): # self.config["action_on_stop"] is set super().stop() - # we handle the NFS separately - self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] - super().stop(action_on_stop = self.config["nfs_action_on_stop"]) + # stop the Docker + if self.container is not None: + self.container().stop() + # unmount the NFS (this needs to be the last step, since Docker will + # hang if NFS is pulled out from under it) if self.config["nfs_action_on_stop"] != "run": try: subprocess.check_call("sudo umount -f /mnt/nfs", shell = True) except subprocess.CalledProcessError: print("Could not unmount NFS (do you have open files on it?)\nPlease run `lsof | grep /mnt/nfs`, close any open files, and run `sudo umount -f /mnt/nfs` before attempting to run another pipeline.") + # superclass method will stop/delete/leave the NFS running, depending on + # how self.config["nfs_action_on_stop"] is set. 
+ self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] + super().stop(action_on_stop = self.config["nfs_action_on_stop"], kill_straggling_jobs = False) + def _get_container(self, container_name): def closure(): return self.dkr.containers.get(container_name) diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 89bede37..3fdabd34 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -257,7 +257,7 @@ def init_nodes(self): print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) print(e) - def stop(self, action_on_stop = None): + def stop(self, action_on_stop = None, kill_straggling_jobs = True): """ Delete or stop (default) compute instances """ @@ -265,8 +265,14 @@ def stop(self, action_on_stop = None): action_on_stop = self.config["action_on_stop"] # - # stop, delete, or leave running compute nodes + # kill any still-running jobs + if kill_straggling_jobs: + self.scancel(jobID = "", state = "RUNNING") + self.scancel(jobID = "", state = "PENDING") + self.scancel(jobID = "", state = "SUSPENDED") + # + # stop, delete, or leave running compute nodes for node in self.nodes.index: try: if action_on_stop == "delete": From cdbfe23eb8c17e4880665c1ac63b4a4a66e48f77 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 30 Jan 2020 20:40:50 +0000 Subject: [PATCH 31/52] Error checking in invoke method This should be cherry-picked to the docker branch --- canine/backends/dockerTransient.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 87246b21..be5b7d17 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -273,10 +273,13 @@ def get_latest_image(self, image_family = None): return gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute() def invoke(self, command, interactive = False): - return_code, (stdout, stderr) = self.container().exec_run( - command, demux = True, tty = interactive, stdin = interactive - ) - return (return_code, io.BytesIO(stdout), io.BytesIO(stderr)) + if self.container is not None and self.container().status == "running": + return_code, (stdout, stderr) = self.container().exec_run( + command, demux = True, tty = interactive, stdin = interactive + ) + return (return_code, io.BytesIO(stdout), io.BytesIO(stderr)) + else: + return (1, io.BytesIO(), io.BytesIO(b"Container is not running!")) # }}} From c6fb82ce4667e2a42a9cc77285e82b5c17d34e09 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 30 Jan 2020 23:10:25 +0000 Subject: [PATCH 32/52] Update default compute scripts to new path --- canine/backends/dockerTransient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index e6b46735..3e3cc6eb 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -19,8 +19,8 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( - self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh", - compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh", + self, nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh", + compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh", nfs_disk_size = 2000, nfs_disk_type = 
"pd-standard", nfs_action_on_stop = "stop", nfs_image = "", action_on_stop = "delete", image_family = "pydpiper", image = None, cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs From 75c3820ef36751992ae93f3e849545f60bfcf473 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 16:35:18 +0000 Subject: [PATCH 33/52] Add NFS monitoring thread --- canine/backends/dockerTransient.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 3e3cc6eb..d1e6ed81 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -11,6 +11,8 @@ import io import pickle import math +import threading +import time from .imageTransient import TransientImageSlurmBackend, list_instances, gce from ..utils import get_default_gcp_project, gcp_hourly_cost @@ -70,6 +72,8 @@ def __init__( self.NFS_server_ready = False self.NFS_ready = False + self.NFS_monitor_thread = None + self.NFS_monitor_lock = None def init_slurm(self): self.dkr = docker.from_env() @@ -184,7 +188,12 @@ def stop(self): # self.config["action_on_stop"] is set super().stop() + # # we handle the NFS separately + + # kill thread monitoring NFS to auto-restart it + self.NFS_monitor_lock.set() + self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] super().stop(action_on_stop = self.config["nfs_action_on_stop"]) @@ -246,6 +255,14 @@ def start_NFS(self): ) print("done", flush = True) + # start NFS monitoring thread + self.NFS_monitor_lock = threading.Event() + self.NFS_monitor_thread = threading.Thread( + target = self.autorestart_preempted_node, + args = (nfs_nodename,) + ) + self.NFS_monitor_thread.start() + self.NFS_server_ready = True def mount_NFS(self): @@ -275,6 +292,14 @@ def invoke(self, command, interactive = False): ) return (return_code, io.BytesIO(stdout), io.BytesIO(stderr)) + def autorestart_preempted_node(self, nodename): + while not self.NFS_monitor_lock.is_set(): + inst_details = self._pzw(gce.instances().get)(instance = nodename).execute() + if inst_details["status"] != "RUNNING": + self._pzw(gce.instances().start)(instance = nodename).execute() + + time.sleep(60) + # }}} # Python version of checks in docker_run.sh From 477d3c0767ce9b339e47ff38ec837b4bee9e39d6 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 16:37:04 +0000 Subject: [PATCH 34/52] Pass elastic parameter to wait_for_cluster_ready --- canine/backends/dockerTransient.py | 2 +- canine/backends/imageTransient.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index d1e6ed81..8b5fd9d8 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -135,7 +135,7 @@ def init_nodes(self): if not self.NFS_ready: raise Exception("NFS must be mounted before starting nodes!") - self.wait_for_cluster_ready() + self.wait_for_cluster_ready(elastic = True) # list all the nodes that Slurm is aware of diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 465ff48c..6001e947 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -292,11 +292,11 @@ def list_instances_all_zones(self): for x in zone_dict["items"] ], axis = 0).reset_index(drop = True) - def wait_for_cluster_ready(self): + def wait_for_cluster_ready(self, elastic = False): """ Blocks until the main partition is marked as up """ - 
super().wait_for_cluster_ready(elastic = False) + super().wait_for_cluster_ready(elastic = elastic) # a handy wrapper to automatically add this instance's project and zone to # GCP API calls From dffbfce533751b2aa7818dc1831341afdaa3a389 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 17:08:53 +0000 Subject: [PATCH 35/52] Canine output folders can be nested now --- canine/localization/nfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 2b8ee533..967948c8 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -119,7 +119,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty for k, v in input_dict.items(): if k not in overrides: if re.match(r"^/", v) is not None and self.same_volume(v) and \ - re.match(r".*/outputs/\d+/.*?/[^/]+$", v) is None: + re.match(r".*/outputs/\d+/.*", v) is None: overrides[k] = None overrides = {k:v.lower() if isinstance(v, str) else None for k,v in overrides.items()} From 1ba9bacf7b156a8eb760af2f745aa64ae5337eca Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 17:51:11 +0000 Subject: [PATCH 36/52] Sometimes the GCE API hangs; catch this exception --- canine/backends/dockerTransient.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 8b5fd9d8..5b44cdff 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -294,9 +294,12 @@ def invoke(self, command, interactive = False): def autorestart_preempted_node(self, nodename): while not self.NFS_monitor_lock.is_set(): - inst_details = self._pzw(gce.instances().get)(instance = nodename).execute() - if inst_details["status"] != "RUNNING": - self._pzw(gce.instances().start)(instance = nodename).execute() + try: + inst_details = self._pzw(gce.instances().get)(instance = nodename).execute() + if inst_details["status"] != "RUNNING": + self._pzw(gce.instances().start)(instance = nodename).execute() + except: + print("Error querying NFS server status; retrying in 60s ...", file = sys.stderr) time.sleep(60) From 99c9ab6751c1c5ac243f7936e7c1b4246e830659 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 21:40:20 +0000 Subject: [PATCH 37/52] Support Pandas dataframes and series for input --- canine/orchestrator.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index c9634db0..0cc5e12f 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -73,6 +73,14 @@ def stringify(obj: typing.Any) -> typing.Any: key:Orchestrator.stringify(val) for key, val in obj.items() } + elif isinstance(obj, pd.core.series.Series): + return [ + Orchestrator.stringify(elem) + for elem in obj.tolist() + ] + elif isinstance(obj, pd.core.frame.DataFrame): + return Orchestrator.stringify(obj.to_dict(orient = "list")) + return str(obj) @staticmethod @@ -104,7 +112,12 @@ def fill_config(cfg: typing.Union[str, typing.Dict[str, typing.Any]]) -> typing. 
return cfg - def __init__(self, config: typing.Union[str, typing.Dict[str, typing.Any]]): + def __init__(self, config: typing.Union[ + str, + typing.Dict[str, typing.Any], + pd.core.frame.DataFrame, + pd.core.series.Series + ]): """ Initializes the Orchestrator from a given config """ From f34844274df5da2eab21ee4c2197255a247c8c83 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 3 Feb 2020 23:13:34 +0000 Subject: [PATCH 38/52] Handle errors from scancel --- canine/backends/imageTransient.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 19a3a7c6..206bfba4 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -267,9 +267,13 @@ def stop(self, action_on_stop = None, kill_straggling_jobs = True): # # kill any still-running jobs if kill_straggling_jobs: - self.scancel(jobID = "", state = "RUNNING") - self.scancel(jobID = "", state = "PENDING") - self.scancel(jobID = "", state = "SUSPENDED") + for state in ["RUNNING", "PENDING", "SUSPENDED"]: + try: + self.scancel(jobID = "", state = state) + except: + # TODO: in verbose logging mode, print that no jobs in this + # state could be killed + pass # # stop, delete, or leave running compute nodes From 5a47ad75501f6814ffd2a64bc92b081d5ff1f481 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 4 Feb 2020 00:44:33 +0000 Subject: [PATCH 39/52] More robust cancelling of stragglers Rather than cancelling by state, cancel based on user - For some reason, cancelling by state was occasionally failing Wait up to 60 seconds for all jobs to have stopped -- scancel does not block --- canine/backends/imageTransient.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index e785400e..0e8a8fc3 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -1,5 +1,6 @@ # vim: set expandtab: +import time import typing import subprocess import os @@ -267,9 +268,21 @@ def stop(self, action_on_stop = None, kill_straggling_jobs = True): # # kill any still-running jobs if kill_straggling_jobs: - self.scancel(jobID = "", state = "RUNNING") - self.scancel(jobID = "", state = "PENDING") - self.scancel(jobID = "", state = "SUSPENDED") + try: + self.scancel(jobID = "", user = self.config["user"]) + except: + # how do we handle this error? + pass + + # wait for jobs to finish + print("Terminating all jobs ... ", end = "", flush = True) + tot_time = 0 + while True: + if self.squeue().empty or tot_time > 60: + break + tot_time += 1 + time.sleep(1) + print("done") # # stop, delete, or leave running compute nodes From 0e9582d5cc377c4a2b11907f2f7342fb4a161738 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 4 Feb 2020 12:51:25 +0000 Subject: [PATCH 40/52] Remove stdout/stderr hack These are now included in the output DF, so there's no reason to exclude them from field comparison. 
--- canine/orchestrator.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 0cc5e12f..1c547127 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -472,20 +472,18 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of r_df_set = set(r_df["inputs"].columns) js_df_set = set(js_df.columns) raise ValueError( - "Cannot job avoid; sets of input parameters do not match:\n" + \ + "Cannot job avoid; sets of input parameters do not match! Parameters unique to:\n" + \ "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ "\u21B3job: " + ", ".join(js_df_set - r_df_set) ) - # FIXME: removing stdout/stderr from output keys is a bug fix -- - # for some reason these aren't getting propagated to the output DF - output_temp = pd.Series(index = self.raw_outputs.keys() - {'stdout', 'stderr'}) + output_temp = pd.Series(index = self.raw_outputs.keys()) if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ output_temp.index.isin(r_df["outputs"].columns).all()): r_df_set = set(r_df["outputs"].columns) o_df_set = set(output_temp.index) raise ValueError( - "Cannot job avoid; sets of output parameters do not match:\n" + \ + "Cannot job avoid; sets of output parameters do not match! Parameters unique to:\n" + \ "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ "\u21B3job: " + ", ".join(o_df_set - r_df_set) ) From fef7a30ba4f7656e2ee76ba1315e9c91d8fd0b8e Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 5 Feb 2020 00:54:47 +0000 Subject: [PATCH 41/52] Catch exceptions from Google's API's random failures --- canine/backends/dockerTransient.py | 30 ++++++++++++++++++++++-------- canine/backends/imageTransient.py | 5 +++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 713b3d52..6c169fc7 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -174,23 +174,37 @@ def stop(self): # delete node configuration file subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) - # get list of nodes that still exist + # + # shutdown nodes that are still running (except NFS) allnodes = self.nodes - extant_nodes = self.list_instances_all_zones() - self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) & - (allnodes["machine_type"] != "nfs")] + + # sometimes the Google API will spectacularly fail; in that case, we + # just try to shutdown everything in the node list, regardless of whether + # it exists. 
+ try: + extant_nodes = self.list_instances_all_zones() + self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) & + (allnodes["machine_type"] != "nfs")] + except: + self.nodes = allnodes.loc[allnodes["machine_type"] != "nfs"] # superclass method will stop/delete/leave these running, depending on how # self.config["action_on_stop"] is set super().stop() - # stop the Docker (needs to happen after super().stop() is invoked, - # which calls scancel, which in turn requires a running Slurm controller Docker + # + # stop the Docker + + # this needs to happen after super().stop() is invoked, since that + # calls scancel, which in turn requires a running Slurm controller Docker if self.container is not None: self.container().stop() - # unmount the NFS (this needs to be the last step, since Docker will - # hang if NFS is pulled out from under it) + # + # unmount the NFS + + # this needs to be the last step, since Docker will hang if NFS is pulled + # out from under it if self.config["nfs_action_on_stop"] != "run": try: subprocess.check_call("sudo umount -f /mnt/nfs", shell = True) diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 97ee66de..668c6ff0 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -10,6 +10,7 @@ from ..utils import get_default_gcp_project, gcp_hourly_cost import googleapiclient.discovery as gd +import googleapiclient.errors import pandas as pd gce = gd.build('compute', 'v1') @@ -296,6 +297,10 @@ def stop(self, action_on_stop = None, kill_straggling_jobs = True): else: # default behavior is to shut down self._pzw(gce.instances().stop)(instance = node).execute() + except googleapiclient.errors.HttpError as e: + if e.resp != 404: + print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) + print(e) except Exception as e: print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) print(e) From 964dc96ad8f864a4dafa718aa71c583db26dd271 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 5 Feb 2020 00:55:34 +0000 Subject: [PATCH 42/52] Wrap whole cancellation step in try block --- canine/backends/imageTransient.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 668c6ff0..665baf5e 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -271,19 +271,19 @@ def stop(self, action_on_stop = None, kill_straggling_jobs = True): if kill_straggling_jobs: try: self.scancel(jobID = "", user = self.config["user"]) - except: - # how do we handle this error? - pass - - # wait for jobs to finish - print("Terminating all jobs ... ", end = "", flush = True) - tot_time = 0 - while True: - if self.squeue().empty or tot_time > 60: - break - tot_time += 1 - time.sleep(1) - print("done") + + # wait for jobs to finish + print("Terminating all jobs ... 
", end = "", flush = True) + tot_time = 0 + while True: + if self.squeue().empty or tot_time > 60: + break + tot_time += 1 + time.sleep(1) + print("done") + except Exception as e: + print("Error terminating all jobs!", file = sys.stderr) + print(e, file = sys.stderr) # # stop, delete, or leave running compute nodes From 3dd94de2363f007550ff4f547348292f0ed9c2c0 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 6 Feb 2020 01:40:58 +0000 Subject: [PATCH 43/52] Catch exception from deleting node config file If NFS got preempted before workflow exits, this could raise an error. --- canine/backends/dockerTransient.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 6c169fc7..63797d5d 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -172,7 +172,11 @@ def init_nodes(self): def stop(self): # delete node configuration file - subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) + try: + subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) + except subprocess.CalledProcessError as e: + print("Couldn't delete node configuration file:", file = sys.stderr) + print(e) # # shutdown nodes that are still running (except NFS) From d039eed574b3f6cca353fa08cec898816b0745f8 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 11 Feb 2020 18:58:52 +0000 Subject: [PATCH 44/52] Retry removing job shard directories Sometimes, shutil.rmtree will hang due to NFS lag. Retry five times, then abort, which causes the entire task directory to be removed. --- canine/orchestrator.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 9910ba25..d53b6c26 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -508,7 +508,19 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # remove output directories of failed jobs for k in self.job_spec: - shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + # sometimes, rmtree will hang due to NFS lag. retry five times, + # then abort + tries = 0 + while True: + try: + shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) + break + except FileNotFoundError: + time.sleep(5) + + if tries >= 4: + raise ValueError("Cannot partially job avoid; error removing job directory!") + tries += 1 return np.count_nonzero(~fail_idx) except ValueError as e: From e02d6a73ffb93442ca5c70d9cd4345f780b7a2c6 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 12 Feb 2020 00:38:58 +0000 Subject: [PATCH 45/52] Remove common inputs dir. if job avoidance occurs Otherwise, the localizer will try (and fail) to save common inputs to the common directory. If job avoidance occurs, this directory will already be present and thus crash the localizer. Removing it wholesale is somewhat hacky, but it's a lot more straightforward than modifying the orchestrator to coordinate this with the localizer. 
--- canine/orchestrator.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index d53b6c26..82f50474 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -522,6 +522,11 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of raise ValueError("Cannot partially job avoid; error removing job directory!") tries += 1 + # we also have to remove the common inputs directory, so that + # the localizer can regenerate it + if len(self.job_spec) > 0: + shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], "common")) + return np.count_nonzero(~fail_idx) except ValueError as e: print(e) From 631b4abff0f896f84a4aec89f1489280c45fdade Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 12 Feb 2020 20:47:58 +0000 Subject: [PATCH 46/52] Retry all rmtree attempts --- canine/orchestrator.py | 27 +++++++-------------------- canine/utils.py | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 82f50474..08bc6687 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -1,7 +1,6 @@ import typing import os import time -import shutil import sys import uuid import warnings @@ -10,7 +9,7 @@ from .adapters import AbstractAdapter, ManualAdapter, FirecloudAdapter from .backends import AbstractSlurmBackend, LocalSlurmBackend, RemoteSlurmBackend, TransientGCPSlurmBackend, TransientImageSlurmBackend from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer -from .utils import check_call +from .utils import check_call, rmtree_retry import yaml import numpy as np import pandas as pd @@ -449,7 +448,7 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # remove all output if specified if overwrite: if os.path.isdir(localizer.staging_dir): - shutil.rmtree(localizer.staging_dir) + rmtree_retry(localizer.staging_dir) os.makedirs(localizer.staging_dir) return 0 @@ -508,37 +507,25 @@ def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of # remove output directories of failed jobs for k in self.job_spec: - # sometimes, rmtree will hang due to NFS lag. 
retry five times, - # then abort - tries = 0 - while True: - try: - shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) - break - except FileNotFoundError: - time.sleep(5) - - if tries >= 4: - raise ValueError("Cannot partially job avoid; error removing job directory!") - tries += 1 + rmtree_retry(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) # we also have to remove the common inputs directory, so that # the localizer can regenerate it if len(self.job_spec) > 0: - shutil.rmtree(os.path.join(localizer.environment('local')["CANINE_JOBS"], "common")) + rmtree_retry(os.path.join(localizer.environment('local')["CANINE_JOBS"], "common")) return np.count_nonzero(~fail_idx) - except ValueError as e: + except (ValueError, OSError) as e: print(e) print("Overwriting output and aborting job avoidance.") - shutil.rmtree(localizer.staging_dir) + rmtree_retry(localizer.staging_dir) os.makedirs(localizer.staging_dir) return 0 # if the output directory exists but there's no output dataframe, assume # it's corrupted and remove it elif os.path.isdir(localizer.staging_dir): - shutil.rmtree(localizer.staging_dir) + rmtree_retry(localizer.staging_dir) os.makedirs(localizer.staging_dir) return 0 diff --git a/canine/utils.py b/canine/utils.py index 3c27352a..5fa18cca 100644 --- a/canine/utils.py +++ b/canine/utils.py @@ -10,6 +10,8 @@ from subprocess import CalledProcessError import google.auth import paramiko +import shutil +import time class ArgumentHelper(dict): """ @@ -264,3 +266,21 @@ def gcp_hourly_cost(mtype: str, preemptible: bool = False, ssd_size: int = 0, hd else (gpu_pricing[gpu_type][1 if preemptible else 0] * gpu_count) ) ) + +def rmtree_retry(path, max_tries = 5, timeout = 5): + """ + Repeatedly invoke shutil.rmtree to remove stubborn directories. Sometimes, + rmtree will hang due to NFS lag. Retry up to a default of five times with a + default of five seconds between attempts, then raise an OSError. 
+ """ + n_tries = 0 + while True: + try: + shutil.rmtree(path) + break + except FileNotFoundError: + time.sleep(timeout) + + if n_tries >= max_tries - 1: + raise OSError("Cannot remove " + path) + n_tries += 1 From c33a6e510bc02c8ada299c45fbf21caf31a26302 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 7 Feb 2020 18:05:09 +0000 Subject: [PATCH 47/52] Change GCP image family/Docker image names --- canine/backends/dockerTransient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 63797d5d..6f061332 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -24,7 +24,7 @@ def __init__( self, nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh", compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh", nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "", - action_on_stop = "delete", image_family = "pydpiper", image = None, + action_on_stop = "delete", image_family = "slurm-gcp-docker", image = None, cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs ): if cluster_name is None: @@ -81,7 +81,7 @@ def init_slurm(self): # # check if image exists try: - image = self.dkr.images.get('broadinstitute/pydpiper:latest') + image = self.dkr.images.get('broadinstitute/slurm_gcp_docker:latest') except docker.errors.ImageNotFound: raise Exception("You have not yet built or pulled the Slurm Docker image!") From ef698500ac879469d3d0328ff0adc1084052ea2e Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 7 Feb 2020 18:05:18 +0000 Subject: [PATCH 48/52] Bind mount /etc/gcloud In the future, we need to figure out how to make the Docker backend less gcloud-specific. --- canine/backends/dockerTransient.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 6f061332..0fb51212 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -104,10 +104,13 @@ def init_slurm(self): # # create the Slurm container if it's not already present if self.config["cluster_name"] not in [x.name for x in self.dkr.containers.list()]: - #if image not in [x.image for x in self.dkr.containers.list()]: self.dkr.containers.run( image = image.tags[0], detach = True, network_mode = "host", - volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" } }, + # FIXME: /etc/gcloud is cloud-provider specific. how can we make this more generic? 
+ volumes = { + "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" }, + "/etc/gcloud" : { "bind" : "/etc/gcloud", "mode" : "rw" } + }, name = self.config["cluster_name"], command = "/bin/bash", user = self.config["user"], stdin_open = True, remove = True ) From e5a87f7a887951520338513398f47d4d2a90a286 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 7 Feb 2020 23:22:40 +0000 Subject: [PATCH 49/52] Bind mount ~/.config/gcloud to /etc/gcloud --- canine/backends/dockerTransient.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 0fb51212..3beddcaa 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -104,12 +104,13 @@ def init_slurm(self): # # create the Slurm container if it's not already present if self.config["cluster_name"] not in [x.name for x in self.dkr.containers.list()]: + # FIXME: gcloud is cloud-provider specific. how can we make this more generic? + gcloud_conf_dir = subprocess.check_output("echo -n ~/.config/gcloud", shell = True).decode() self.dkr.containers.run( image = image.tags[0], detach = True, network_mode = "host", - # FIXME: /etc/gcloud is cloud-provider specific. how can we make this more generic? volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" }, - "/etc/gcloud" : { "bind" : "/etc/gcloud", "mode" : "rw" } + gcloud_conf_dir : { "bind" : "/etc/gcloud", "mode" : "rw" } }, name = self.config["cluster_name"], command = "/bin/bash", user = self.config["user"], stdin_open = True, remove = True From ee6419372fd5780956321edb1cd318a1424ce77f Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Wed, 19 Feb 2020 17:52:02 -0500 Subject: [PATCH 50/52] Job avoidance now works for all backends Minor fixes and improvements to job avoidance --- canine/backends/base.py | 44 ++++++ canine/backends/local.py | 9 ++ canine/localization/local.py | 92 ++++++++---- canine/orchestrator.py | 266 ++++++++++++++++++---------------- canine/utils.py | 18 +-- docs/canine/backends.html | 22 ++- docs/canine/index.html | 9 +- docs/canine/localization.html | 6 + docs/canine/xargs.html | 7 + docs/genindex.html | 22 ++- docs/objects.inv | Bin 1788 -> 1818 bytes docs/searchindex.js | 2 +- 12 files changed, 315 insertions(+), 182 deletions(-) diff --git a/canine/backends/base.py b/canine/backends/base.py index 1da92a28..a67f2c47 100644 --- a/canine/backends/base.py +++ b/canine/backends/base.py @@ -211,6 +211,50 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str] for dirname in dirnames: yield from self.walk(os.path.join(path, dirname)) + def rmtree(self, path: str, max_retries: int = 5, timeout: int = 5): + """ + Recursively remove the directory tree rooted at the given path. + Automatically retries failures after a brief timeout + """ + pathstat = self.stat(path) + if not stat.S_ISDIR(pathstat.st_mode): + raise NotADirectoryError(path) + for attempt in range(max_retries): + try: + return self._rmtree(path, pathstat) + except (OSError, FileNotFoundError, IOError, NotADirectoryError): + # Helps to preserve the exception traceback by conditionally re-raising here + if attempt >= (max_retries - 1): + raise + time.sleep(timeout) + # Should not be possible to reach here + raise RuntimeError("AbstractTransport.rmtree exceeded retries without exception") + + def _rmtree(self, path: str, pathstat: os.stat_result): + """ + (Internal) + Recursively remove the directory tree rooted at the given path. 
+ Automatically retries failures after a brief timeout + """ + if not stat.S_ISDIR(pathstat.st_mode): + raise NotADirectoryError(path) + for fname in self.listdir(path): + fname = os.path.join(path, fname) + try: + fstat = self.stat(fname) + except FileNotFoundError: + # Handling for broken symlinks is bad + self.remove(fname) + else: + if stat.S_ISDIR(fstat.st_mode): + self._rmtree( + fname, + fstat + ) + else: + self.remove(fname) + self.rmdir(path) + def sendtree(self, src: str, dest: str): """ Copy the full local file tree src to the remote path dest diff --git a/canine/backends/local.py b/canine/backends/local.py index 7c09de9a..fecd9e93 100644 --- a/canine/backends/local.py +++ b/canine/backends/local.py @@ -3,6 +3,7 @@ import io import sys import subprocess +import shutil from .base import AbstractSlurmBackend, AbstractTransport from ..utils import ArgumentHelper, check_call from agutil import StdOutAdapter @@ -98,6 +99,14 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str] """ yield from os.walk(path) + def _rmtree(self, path: str, pathstat: os.stat_result): + """ + (Internal) + Recursively remove the directory tree rooted at the given path. + Automatically retries failures after a brief timeout + """ + shutil.rmtree(path) + class LocalSlurmBackend(AbstractSlurmBackend): """ SLURM backend for interacting with a local slurm node diff --git a/canine/localization/local.py b/canine/localization/local.py index 3f986151..2b6de42b 100644 --- a/canine/localization/local.py +++ b/canine/localization/local.py @@ -40,6 +40,7 @@ def __init__( super().__init__(backend, transfer_bucket, common, staging_dir, mount_path, project) self.queued_gs = [] # Queued gs:// -> remote staging transfers self.queued_batch = [] # Queued local -> remote directory transfers + self._has_localized = False def localize_file(self, src: str, dest: PathType, transport: typing.Optional[AbstractTransport] = None): """ @@ -47,20 +48,36 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs gs:// files are queued for later transfer local files are symlinked to the staging directory """ - if src.startswith('gs://'): - self.queued_gs.append(( - src, - dest.controllerpath, - 'remote' - )) - elif os.path.exists(src): - src = os.path.abspath(src) - if not os.path.isdir(os.path.dirname(dest.localpath)): - os.makedirs(os.path.dirname(dest.localpath)) - if os.path.isfile(src): - os.symlink(src, dest.localpath) - else: - self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + if not self._has_localized: + if src.startswith('gs://'): + self.queued_gs.append(( + src, + dest.controllerpath, + 'remote' + )) + elif os.path.exists(src): + src = os.path.abspath(src) + if not os.path.isdir(os.path.dirname(dest.localpath)): + os.makedirs(os.path.dirname(dest.localpath)) + if os.path.isfile(src): + os.symlink(src, dest.localpath) + else: + self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + else: + warnings.warn("BatchedLocalizer.localize_file called after main localization. 
Ignoring normal handling and sending over transport") + with self.transport_context(transport) as transport: + if not transport.isdir(os.path.dirname(dest.controllerpath)): + transport.makedirs(os.path.dirname(dest.controllerpath)) + if src.startswith('gs://'): + self.gs_copy( + src, + dest.controllerpath, + 'remote' + ) + elif os.path.isfile(src): + transport.send(src, dest.controllerpath) + else: + transport.sendtree(src, dest.controllerpath) def __enter__(self): """ @@ -118,13 +135,14 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty self.sendtree( self.local_dir, self.staging_dir, - transport + transport,exist_okay=True ) staging_dir = self.finalize_staging_dir(inputs.keys(), transport=transport) for src, dest, context in self.queued_gs: self.gs_copy(src, dest, context) for src, dest in self.queued_batch: self.sendtree(src, os.path.dirname(dest)) + self._has_localized = True return staging_dir class LocalLocalizer(BatchedLocalizer): @@ -147,17 +165,33 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs gs:// files are queued for later transfer local files are symlinked to the staging directory """ - if src.startswith('gs://'): - self.gs_copy( - src, - dest.localpath, - 'local' - ) - elif os.path.exists(src): - src = os.path.abspath(src) - if not os.path.isdir(os.path.dirname(dest.localpath)): - os.makedirs(os.path.dirname(dest.localpath)) - if os.path.isfile(src): - os.symlink(src, dest.localpath) - else: - self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + if self._has_localized: + if src.startswith('gs://'): + self.gs_copy( + src, + dest.localpath, + 'local' + ) + elif os.path.exists(src): + src = os.path.abspath(src) + if not os.path.isdir(os.path.dirname(dest.localpath)): + os.makedirs(os.path.dirname(dest.localpath)) + if os.path.isfile(src): + os.symlink(src, dest.localpath) + else: + self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + else: + warnings.warn("LocalLocalizer.localize_file called after main localization. Ignoring normal handling and sending over transport") + with self.transport_context(transport) as transport: + if not transport.isdir(os.path.dirname(dest.controllerpath)): + transport.makedirs(os.path.dirname(dest.controllerpath)) + if src.startswith('gs://'): + self.gs_copy( + src, + dest.controllerpath, + 'remote' + ) + elif os.path.isfile(src): + transport.send(src, dest.controllerpath) + else: + transport.sendtree(src, dest.controllerpath) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 08bc6687..c9434e5e 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -9,7 +9,7 @@ from .adapters import AbstractAdapter, ManualAdapter, FirecloudAdapter from .backends import AbstractSlurmBackend, LocalSlurmBackend, RemoteSlurmBackend, TransientGCPSlurmBackend, TransientImageSlurmBackend from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer -from .utils import check_call, rmtree_retry +from .utils import check_call import yaml import numpy as np import pandas as pd @@ -215,7 +215,7 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) with self._localizer_type(self.backend, **self.localizer_args) as localizer: # # localize inputs - self.job_avoid(localizer) + self.job_avoid(localizer) # AG: This raises an exception if job avoidance fails due to mismatched inputs. Should we instead clear the staging dir? 
entrypoint_path = self.localize_inputs_and_script(localizer) if dry_run: @@ -276,7 +276,8 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) localizer.clean_on_exit = False raise finally: - if len(completed_jobs): + # Check if fully job-avoided so we still delocalize + if batch_id == -2 or len(completed_jobs): print("Delocalizing outputs") outputs = localizer.delocalize(self.raw_outputs, output_dir) print("Parsing output data") @@ -375,39 +376,42 @@ def wait_for_jobs_to_finish(self, batch_id): return completed_jobs, cpu_time, uptime, prev_acct def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = None) -> pd.DataFrame: - try: - acct = self.backend.sacct(job=batch_id) - - df = pd.DataFrame.from_dict( - data={ - job_id: { - ('job', 'slurm_state'): acct['State'][batch_id+'_'+str(array_id)], - ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+str(array_id)], - ('job', 'cpu_seconds'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( - cpu_time[batch_id+'_'+str(array_id)] if batch_id+'_'+str(array_id) in cpu_time else 0 - )) if prev_acct is not None else -1, - **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, - **{ - ('outputs', key) : val[0] if isinstance(val, list) and len(val) == 1 else val - for key, val in outputs[job_id].items() + df = pd.DataFrame() + if batch_id != -2: + try: + acct = self.backend.sacct(job=batch_id) + + df = pd.DataFrame.from_dict( + data={ + job_id: { + ('job', 'slurm_state'): acct['State'][batch_id+'_'+str(array_id)], + ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+str(array_id)], + ('job', 'cpu_seconds'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( + cpu_time[batch_id+'_'+str(array_id)] if batch_id+'_'+str(array_id) in cpu_time else 0 + )) if prev_acct is not None else -1, + **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, + **{ + ('outputs', key) : val[0] if isinstance(val, list) and len(val) == 1 else val + for key, val in outputs[job_id].items() + } } - } - for array_id, job_id in enumerate(self.job_spec) - }, - orient = "index" - ).rename_axis(index = "_job_id").astype({('job', 'cpu_seconds'): int}) - - # - # apply functions to output columns (if any) - if len(self.output_map) > 0: - # columns that receive no (i.e., identity) transformation - identity_map = { x : lambda y : y for x in set(df.columns.get_loc_level("outputs")[1]) - self.output_map.keys() } - - # we get back all columns from the dataframe by aggregating columns - # that don't receive any transformation with transformed columns - df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) - except: - df = pd.DataFrame() + for array_id, job_id in enumerate(self.job_spec) + }, + orient = "index" + ).rename_axis(index = "_job_id").astype({('job', 'cpu_seconds'): int}) + + # + # apply functions to output columns (if any) + if len(self.output_map) > 0: + # columns that receive no (i.e., identity) transformation + identity_map = { x : lambda y : y for x in set(df.columns.get_loc_level("outputs")[1]) - self.output_map.keys() } + + # we get back all columns from the dataframe by aggregating columns + # that don't receive any transformation with transformed columns + df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) + except: + traceback.print_exc() + # concatenate with any previously existing job avoided results if self.df_avoided is not None: @@ -415,18 +419,19 @@ def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer 
= Non # save DF to disk if isinstance(localizer, AbstractLocalizer): - fname = str(uuid.uuid4()) + "-results.k9df.pickle" - df.to_pickle(fname) - localizer.localize_file(fname, localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle")) - os.remove(fname) - + with localizer.transport_context() as transport: + dest = localizer.reserve_path("results.k9df.pickle").controllerpath + if not transport.isdir(os.path.dirname(dest)): + transport.makedirs(os.path.dirname(dest)) + with transport.open(dest, 'wb') as w: + df.to_pickle(w, compression=None) return df - def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}): + def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) -> int: # this job was avoided if len(self.job_spec) == 0: return -2 - + batch_id = self.backend.sbatch( entrypoint_path, **{ @@ -442,90 +447,99 @@ def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) return batch_id - def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of avoidance (force, only if failed, etc.) - df_path = localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle") - - # remove all output if specified - if overwrite: - if os.path.isdir(localizer.staging_dir): - rmtree_retry(localizer.staging_dir) - os.makedirs(localizer.staging_dir) - return 0 - - # check for preexisting jobs' output - if os.path.exists(df_path.localpath): - try: - # load in results and job spec dataframes - r_df = pd.read_pickle(df_path.localpath) - js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - - if r_df.empty or \ - "inputs" not in r_df or \ - ("outputs" not in r_df and len(self.raw_outputs) > 0): - raise ValueError("Could not recover previous job results!") - - # check if jobs are compatible: they must have identical inputs and index, - # and output columns must be matching - if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ - js_df.columns.isin(r_df["inputs"].columns).all()): - r_df_set = set(r_df["inputs"].columns) - js_df_set = set(js_df.columns) - raise ValueError( - "Cannot job avoid; sets of input parameters do not match! Parameters unique to:\n" + \ - "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ - "\u21B3job: " + ", ".join(js_df_set - r_df_set) - ) - - output_temp = pd.Series(index = self.raw_outputs.keys()) - if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ - output_temp.index.isin(r_df["outputs"].columns).all()): - r_df_set = set(r_df["outputs"].columns) - o_df_set = set(output_temp.index) - raise ValueError( - "Cannot job avoid; sets of output parameters do not match! 
Parameters unique to:\n" + \ - "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ - "\u21B3job: " + ", ".join(o_df_set - r_df_set) - ) - - # check that values of inputs are the same - # we have to sort because the order of jobs might differ for the same - # inputs - sort_cols = r_df.columns.to_series()["inputs"] - r_df = r_df.sort_values(sort_cols.tolist()) - js_df = js_df.sort_values(sort_cols.index.tolist()) - - if not r_df["inputs"].equals(js_df): - raise ValueError("Cannot job avoid; values of input parameters do not match!") - - # if all is well, figure out which jobs need to be re-run - fail_idx = r_df[("job", "slurm_state")] == "FAILED" - self.df_avoided = r_df.loc[~fail_idx] - - # remove jobs that don't need to be re-run from job_spec - for k in r_df.index[~fail_idx]: - self.job_spec.pop(k, None) - - # remove output directories of failed jobs - for k in self.job_spec: - rmtree_retry(os.path.join(localizer.environment('local')["CANINE_JOBS"], k)) - - # we also have to remove the common inputs directory, so that - # the localizer can regenerate it - if len(self.job_spec) > 0: - rmtree_retry(os.path.join(localizer.environment('local')["CANINE_JOBS"], "common")) - - return np.count_nonzero(~fail_idx) - except (ValueError, OSError) as e: - print(e) - print("Overwriting output and aborting job avoidance.") - rmtree_retry(localizer.staging_dir) - os.makedirs(localizer.staging_dir) + def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> int: #TODO: add params for type of avoidance (force, only if failed, etc.) + """ + Detects jobs which have previously been run in this staging directory. + Succeeded jobs are skipped. Failed jobs are reset and rerun + """ + with localizer.transport_context() as transport: + df_path = localizer.reserve_path("results.k9df.pickle").controllerpath + + #remove all output if specified + if overwrite: + if transport.isdir(localizer.staging_dir): + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) return 0 - # if the output directory exists but there's no output dataframe, assume - # it's corrupted and remove it - elif os.path.isdir(localizer.staging_dir): - rmtree_retry(localizer.staging_dir) - os.makedirs(localizer.staging_dir) - + # check for preexisting jobs' output + if transport.exists(df_path): + try: + # load in results and job spec dataframes + with transport.open(df_path) as r: + r_df = pd.read_pickle(r, compression=None) + js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") + + if r_df.empty or \ + "inputs" not in r_df or \ + ("outputs" not in r_df and len(self.raw_outputs) > 0): + raise ValueError("Could not recover previous job results!") + + # check if jobs are compatible: they must have identical inputs and index, + # and output columns must be matching + if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ + js_df.columns.isin(r_df["inputs"].columns).all()): + r_df_set = set(r_df["inputs"].columns) + js_df_set = set(js_df.columns) + raise ValueError( + "Cannot job avoid; sets of input parameters do not match! 
Parameters unique to:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(js_df_set - r_df_set) + ) + + output_temp = pd.Series(index = self.raw_outputs.keys()) + if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ + output_temp.index.isin(r_df["outputs"].columns).all()): + r_df_set = set(r_df["outputs"].columns) + o_df_set = set(output_temp.index) + raise ValueError( + "Cannot job avoid; sets of output parameters do not match! Parameters unique to:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(o_df_set - r_df_set) + ) + + # check that values of inputs are the same + # we have to sort because the order of jobs might differ for the same + # inputs + sort_cols = r_df.columns.to_series()["inputs"] + r_df = r_df.sort_values(sort_cols.tolist()) + js_df = js_df.sort_values(sort_cols.index.tolist()) + + if not r_df["inputs"].equals(js_df): + raise ValueError("Cannot job avoid; values of input parameters do not match!") + + # if all is well, figure out which jobs need to be re-run + fail_idx = r_df[("job", "slurm_state")] == "FAILED" + self.df_avoided = r_df.loc[~fail_idx] + + # remove jobs that don't need to be re-run from job_spec + for k in r_df.index[~fail_idx]: + self.job_spec.pop(k, None) + + # remove output directories of failed jobs + for k in self.job_spec: + transport.rmtree( + localizer.reserve_path('jobs', k).controllerpath + ) + + # we also have to remove the common inputs directory, so that + # the localizer can regenerate it + if len(self.job_spec) > 0: + transport.rmtree( + localizer.reserve_path('common').controllerpath + ) + + return np.count_nonzero(~fail_idx) + except (ValueError, OSError) as e: + print(e) + print("Overwriting output and aborting job avoidance.") + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) + return 0 + + # if the output directory exists but there's no output dataframe, assume + # it's corrupted and remove it + elif transport.isdir(localizer.staging_dir): + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) return 0 diff --git a/canine/utils.py b/canine/utils.py index 5fa18cca..00d627c6 100644 --- a/canine/utils.py +++ b/canine/utils.py @@ -267,20 +267,4 @@ def gcp_hourly_cost(mtype: str, preemptible: bool = False, ssd_size: int = 0, hd ) ) -def rmtree_retry(path, max_tries = 5, timeout = 5): - """ - Repeatedly invoke shutil.rmtree to remove stubborn directories. Sometimes, - rmtree will hang due to NFS lag. Retry up to a default of five times with a - default of five seconds between attempts, then raise an OSError. - """ - n_tries = 0 - while True: - try: - shutil.rmtree(path) - break - except FileNotFoundError: - time.sleep(timeout) - - if n_tries >= max_tries - 1: - raise OSError("Cannot remove " + path) - n_tries += 1 +# rmtree_retry removed in favor of AbstractTransport.rmtree diff --git a/docs/canine/backends.html b/docs/canine/backends.html index dbc1c761..7283a718 100644 --- a/docs/canine/backends.html +++ b/docs/canine/backends.html @@ -239,6 +239,13 @@

[Regenerated Sphinx HTML documentation for this commit: docs/canine/backends.html,
docs/canine/index.html, docs/canine/localization.html, docs/canine/xargs.html, and
docs/genindex.html. The rebuilt pages add entries for
rmtree(path: str, max_retries: int = 5, timeout: int = 5) on the transport classes
("Recursively remove the directory tree rooted at the given path. Automatically
retries failures after a brief timeout"), for
job_avoid(localizer: canine.localization.base.AbstractLocalizer, overwrite: bool = False) → int
on the Orchestrator and xargs pages ("Detects jobs which have previously been run in
this staging directory. Succeeded jobs are skipped. Failed jobs are reset and rerun"),
and for same_volume(*args) ("Check if *args are stored on the same NFS mount as the
output directory"). They also update the documented signatures of
stop(action_on_stop=None, kill_straggling_jobs=True) and
wait_for_cluster_ready(elastic=False), the defaults of
class canine.backends.DockerTransientImageSlurmBackend (slurm_gcp_docker
provisioning scripts and image_family='slurm-gcp-docker'), and
class canine.Orchestrator(config: Union[str, Dict[str, Any],
pandas.core.frame.DataFrame, pandas.core.series.Series]), along with the matching
entries in the general index.]
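
As a brief usage sketch of the transport-level interface summarized above (the staging
path is hypothetical, and localizer is assumed to be an AbstractLocalizer bound to a
live backend), a stale shard directory can now be cleared through any backend's
transport, retrying on NFS lag before giving up:

    # hypothetical path; transport_context() and rmtree() are the interfaces used by this patch
    with localizer.transport_context() as transport:
        # retries up to 5 times, sleeping 5 s between attempts, then re-raises the last error
        transport.rmtree("/mnt/nfs/workspace/my_pipeline/jobs/3", max_retries = 5, timeout = 5)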
      diff --git a/docs/objects.inv b/docs/objects.inv index d7fcf8aa4301892f8ba39eadda6aba88d3707da3..cba218be125166ddd883de15571f028fff1b334b 100644 GIT binary patch delta 1668 zcmV-~27CGZ4Vn&+c7M^5+a?r-@BI|&w72WFx4BEUlQuI+o9Skzz0o>eUc(+?X?iBS8(XndW85u2!5SkItwL&38>WZRV$m0L-j*NheO{p(ZIucXIuX5 zSakDUb+Z}^*S~4H17kI@Qumc9C&pAi2I@vCtv8JGno6-@+kc^Tushi-B7BzjH91Ju zB-YoRs1;MdPXn+V;r=qQWt-#6>yw5k3)+N+{8Sbj^>g`Krh#(VHLu4bfm<+Ty#%s` z1K}O9!NEwtDWVP3OsLKpRHW=IBGV|KX{ZlGbcE;Jf(Zd(z(TyN(V7Am57j-Pdk{@< zvY*Ogll@%&lYewS^e0V(VIb_gmTcjUce;Huo)H-NDLVzac59rqZZSBGGB$M42yHNN zkb5Wu=ge3)nT#cZf~kp<+pki9Db$>Df9rc<>8>_ZGgWHL4573bd+2{DIPGr)FGXi! zBTKRW>Yyt%O^tgDmqJ^q3ZZRZq@*r26Tc8<>y6Yk<$s-V(pqaE%`vU9_mUExEtoS3 zhC((A^w1m{H zB6f!YlYa^X3dos9UaDN`6Wklye93_bJC@b0z@zTTn5G^C# z58;vn{t(RUkcBAILj;JJCSt_rxd@MuVJb6~osn6uJN#f;UWO|l$Y;j_|xMTmshuViQ8 zlYi2!#rbvj&$l154Mq%xWHX|8OtTSf1i^Ad*+|CY8GgF`h(d8pNEDA`MWXx+LlS3Z z*pg?U>EbMf66Yp=FpVNJJyDd4>5Pz5OUfpd6iO%g z-x5j6r;wCM9x0SIQYvAjOsYtsB#|QNA%8^@L&~LuluZWV{r3U(l~^W4C>igvJeH6* zIpITJ{d=!eQLf$ zcb4(s|DF#8vG?3#;cLC^dTIOxxPJ-?`UT@k@b{!;g#KDGDeghgz=m`@cU*_H$0K`zEZ}si$%pjY zlEC6yblVv#+mD)27lJHYzmvLd!F0lAHNU2+i+=NeImKxp0%h2jCJ)gZ zpb4NpF}%LW7+gYlI4A0P58t66_V67G|2)<8*6c;qaGh)0N?iv$yQ|do>en`vx=wg@ zPpM$_^Zq-PvCXSS(&{F>HGfa#ZSS-4(gp(cCm98mm$b0D-03P%`Dq8N$0Msi)E1~W z3+k#sytulYeF4bRtgJ5Q^y+dX1J#~JxVpSKt1XD9%jD!9cwgysbgb~M|GAot#>E+b z0C#~LR905E1O4B>|1p(2Z+PAf*Y(ZHt%^WX>UeW17+D|2bu~6>Tor^#chef)v*q+GMMC6IZ-d$xVe(&kQ>V2q4?D zr@Tg9FHaIAdB!6I9t)bBJSD`3)M|dZr8blZCZJM>H?3?s4%H789S(iZL<5iOoo)HI zW7E}l)zxZjT>qx&4vf{rO5Jy+oETI680Z`6wB9hzYbwQtZGVT+!R}rK-O?ict>o|7zsE< zw1Jul)mejzl$}Ln8U-{B^@)g%@SI05As`qm#LKeQ6u8(_cZBXiG(l%Sw#A(N)c%un zKlEo!gkd1;hkurA;g)y0eK(#Fu>6#r0$sZ~&RRDaoW>X%x@m+ln7EgFC*QqWivx4Eyf5nE)u}oeQ`yE@lmn zF3vD0C}tX_kw`8*1SFKp5BDgjJ|e@q!L zBd~kGWPe!=SiDxEU78>*5~{^=F7p(-pn%Yn#(N{h~)IJH-O`k zG*2%_YofOX4U7~}2gn%_b%>S`QU`IAAo@{Oh<_g9rN`(YT1Jo_!X-!PA(+`A3sGj^ zA0T2z{v$pw^!FG!vA>VX3I2UnM)W^NrHB7>R(eG1v$7HZAC{K^%xHVO3)|yaEH4T0 zY3W&jPs&OIBCzy`bPh~U1m>*tOkl=p(VQ$j7YLE?OrM?&_@wl7;PhVk=j)s7q#y=E zN`DQac+3PLItW6V5M?7J3or1~Q-&xMCvk}4k2VGc!_&7oh10MI4%wR>Z04 z$wi!*kz&N5$%#fBnU`+FxyfH9qsYuWCd$RkW{_(*%2sa_s@wSADmKd3YLuzcC{&+O zsxqTYO-7+=j3RXyMJh1L)n1gXy5RjgeSh|qSSCfN>g~KeR{1tL=Uwme(0m+^1?vpw z%-6!!4Lr369V!D1;+|%HA+})Mqj&RcZ~{dR273VG z7AAGTr#f3ug^c|o?`i|KQ?&)68~GrR2Pou%M{FH?8JNLV%&nkQwyhK|6zt{<+Tc~|JhD^_y2qNu4u_fBW7jcuv1Re$c7DtfJlj;N?e}J1I%I}1WExuDYvh+^i zXu|7Jv1E6G#Sz^J5?OM`@0mEN>#@*;*XN>XuMftTUw;H_1)lLSL^uFN&|#m7EyX^+ zjPD@|vX92rX$H_M-$P+_HS533M<Cx@Kcef-SlQlaMqLQ9aQ;sEx(U+> zn>GBJsxJC<{`C~6fe4ggtCl=O3oXKhxRn%Ig`Pj**VE)H|g+=)(}ghcE4$ zPSD{a>8tu~sj7Ff)BJE6dLh7t$4@#?_b{d97oZ8CzU;a_tLMFe5OyzA^v+};s2l`Jw z-dA3If57T|_ohJS=Uttij;sP Date: Thu, 20 Feb 2020 09:40:17 -0500 Subject: [PATCH 51/52] Removed UUID import --- canine/orchestrator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/canine/orchestrator.py b/canine/orchestrator.py index c9434e5e..77bae371 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -2,7 +2,6 @@ import os import time import sys -import uuid import warnings import traceback from subprocess import CalledProcessError @@ -215,7 +214,7 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) with self._localizer_type(self.backend, **self.localizer_args) as localizer: # # localize inputs - self.job_avoid(localizer) # AG: This raises an exception if job avoidance fails due to mismatched inputs. Should we instead clear the staging dir? 
+ self.job_avoid(localizer) entrypoint_path = self.localize_inputs_and_script(localizer) if dry_run: @@ -476,7 +475,7 @@ def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> in raise ValueError("Could not recover previous job results!") # check if jobs are compatible: they must have identical inputs and index, - # and output columns must be matching + # and output columns must be matching if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ js_df.columns.isin(r_df["inputs"].columns).all()): r_df_set = set(r_df["inputs"].columns) From ab151f6dcd33aa2b94b0e19322c485a77a3cb201 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 27 Feb 2020 15:00:48 -0500 Subject: [PATCH 52/52] Make it clearer that cluster_name is a required param Rather than defaulting it to None and raising an exception if it's None in the constructor, just make it a required positional argument. --- canine/backends/dockerTransient.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index 3beddcaa..a754c1f5 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -21,15 +21,13 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( - self, nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh", + self, cluster_name, *, + nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh", compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh", nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "", action_on_stop = "delete", image_family = "slurm-gcp-docker", image = None, - cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs + clust_frac = 0.01, user = os.environ["USER"], **kwargs ): - if cluster_name is None: - raise ValueError("You must specify a name for this Slurm cluster!") - if "image" not in kwargs: kwargs["image"] = image
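
A minimal construction sketch under the final signature (the cluster name and tuning
values below are hypothetical): cluster_name is now the first positional argument, and
the remaining provisioning options stay keyword-only:

    import os
    from canine.backends import DockerTransientImageSlurmBackend

    # assumes the slurm_gcp_docker image family and provisioning scripts from this
    # patch series are already present on the controller host
    backend = DockerTransientImageSlurmBackend(
        "demo-cluster",                  # required positional cluster name
        nfs_action_on_stop = "delete",
        clust_frac = 0.05,
        user = os.environ["USER"],
    )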