diff --git a/canine/backends/base.py b/canine/backends/base.py index 1da92a28..a67f2c47 100644 --- a/canine/backends/base.py +++ b/canine/backends/base.py @@ -211,6 +211,50 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str] for dirname in dirnames: yield from self.walk(os.path.join(path, dirname)) + def rmtree(self, path: str, max_retries: int = 5, timeout: int = 5): + """ + Recursively remove the directory tree rooted at the given path. + Automatically retries failures after a brief timeout + """ + pathstat = self.stat(path) + if not stat.S_ISDIR(pathstat.st_mode): + raise NotADirectoryError(path) + for attempt in range(max_retries): + try: + return self._rmtree(path, pathstat) + except (OSError, FileNotFoundError, IOError, NotADirectoryError): + # re-raise on the final attempt so the original traceback is preserved + if attempt >= (max_retries - 1): + raise + time.sleep(timeout) + # Should not be possible to reach here + raise RuntimeError("AbstractTransport.rmtree exceeded retries without exception") + + def _rmtree(self, path: str, pathstat: os.stat_result): + """ + (Internal) + Recursively remove the directory tree rooted at the given path. + Automatically retries failures after a brief timeout + """ + if not stat.S_ISDIR(pathstat.st_mode): + raise NotADirectoryError(path) + for fname in self.listdir(path): + fname = os.path.join(path, fname) + try: + fstat = self.stat(fname) + except FileNotFoundError: + # stat() fails on broken symlinks; just remove them directly + self.remove(fname) + else: + if stat.S_ISDIR(fstat.st_mode): + self._rmtree( + fname, + fstat + ) + else: + self.remove(fname) + self.rmdir(path) + def sendtree(self, src: str, dest: str): """ Copy the full local file tree src to the remote path dest diff --git a/canine/backends/dockerTransient.py b/canine/backends/dockerTransient.py index e6b46735..a754c1f5 100644 --- a/canine/backends/dockerTransient.py +++ b/canine/backends/dockerTransient.py @@ -11,6 +11,8 @@ import io import pickle import math +import threading +import time from .imageTransient import TransientImageSlurmBackend, list_instances, gce from ..utils import get_default_gcp_project, gcp_hourly_cost @@ -19,15 +21,13 @@ class DockerTransientImageSlurmBackend(TransientImageSlurmBackend): # {{{ def __init__( - self, nfs_compute_script = "/usr/local/share/cga_pipeline/src/provision_storage_container_host.sh", - compute_script = "/usr/local/share/cga_pipeline/src/provision_worker_container_host.sh", + self, cluster_name, *, + nfs_compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_storage_container_host.sh", + compute_script = "/usr/local/share/slurm_gcp_docker/src/provision_worker_container_host.sh", + nfs_disk_size = 2000, nfs_disk_type = "pd-standard", nfs_action_on_stop = "stop", nfs_image = "", - action_on_stop = "delete", image_family = "pydpiper", image = None, - cluster_name = None, clust_frac = 0.01, user = os.environ["USER"], **kwargs + action_on_stop = "delete", image_family = "slurm-gcp-docker", image = None, + clust_frac = 0.01, user = os.environ["USER"], **kwargs ): - if cluster_name is None: - raise ValueError("You must specify a name for this Slurm cluster!") - if "image" not in kwargs: kwargs["image"] = image @@ -70,6 +70,8 @@ def __init__( self.NFS_server_ready = False self.NFS_ready = False + self.NFS_monitor_thread = None + self.NFS_monitor_lock = None def init_slurm(self): self.dkr = docker.from_env() # # check if image exists try: - image = 
self.dkr.images.get('broadinstitute/pydpiper:latest') + image = self.dkr.images.get('broadinstitute/slurm_gcp_docker:latest') except docker.errors.ImageNotFound: raise Exception("You have not yet built or pulled the Slurm Docker image!") @@ -100,10 +102,14 @@ def init_slurm(self): # # create the Slurm container if it's not already present if self.config["cluster_name"] not in [x.name for x in self.dkr.containers.list()]: - #if image not in [x.image for x in self.dkr.containers.list()]: + # FIXME: gcloud is cloud-provider specific. how can we make this more generic? + gcloud_conf_dir = subprocess.check_output("echo -n ~/.config/gcloud", shell = True).decode() self.dkr.containers.run( image = image.tags[0], detach = True, network_mode = "host", - volumes = { "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" } }, + volumes = { + "/mnt/nfs" : { "bind" : "/mnt/nfs", "mode" : "rw" }, + gcloud_conf_dir : { "bind" : "/etc/gcloud", "mode" : "rw" } + }, name = self.config["cluster_name"], command = "/bin/bash", user = self.config["user"], stdin_open = True, remove = True ) @@ -131,7 +137,7 @@ def init_nodes(self): if not self.NFS_ready: raise Exception("NFS must be mounted before starting nodes!") - self.wait_for_cluster_ready() + self.wait_for_cluster_ready(elastic = True) # list all the nodes that Slurm is aware of @@ -167,33 +173,59 @@ def init_nodes(self): ]) def stop(self): - # stop the Docker - if self.container is not None: - self.container().stop() - # delete node configuration file - subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) + try: + subprocess.check_call("rm -f /mnt/nfs/clust_conf/canine/backend_conf.pickle", shell = True) + except subprocess.CalledProcessError as e: + print("Couldn't delete node configuration file:", file = sys.stderr) + print(e) - # get list of nodes that still exist + # + # shutdown nodes that are still running (except NFS) allnodes = self.nodes - extant_nodes = self.list_instances_all_zones() - self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) & - (allnodes["machine_type"] != "nfs")] + + # sometimes the Google API will spectacularly fail; in that case, we + # just try to shutdown everything in the node list, regardless of whether + # it exists. 
+ try: + extant_nodes = self.list_instances_all_zones() + self.nodes = allnodes.loc[allnodes.index.isin(extant_nodes["name"]) & + (allnodes["machine_type"] != "nfs")] + except: + self.nodes = allnodes.loc[allnodes["machine_type"] != "nfs"] # superclass method will stop/delete/leave these running, depending on how # self.config["action_on_stop"] is set super().stop() - # we handle the NFS separately - self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] - super().stop(action_on_stop = self.config["nfs_action_on_stop"]) + # + # stop the Docker + + # this needs to happen after super().stop() is invoked, since that + # calls scancel, which in turn requires a running Slurm controller Docker + if self.container is not None: + self.container().stop() + + # + # unmount the NFS + # this needs to be the last step, since Docker will hang if NFS is pulled + # out from under it if self.config["nfs_action_on_stop"] != "run": try: subprocess.check_call("sudo umount -f /mnt/nfs", shell = True) except subprocess.CalledProcessError: print("Could not unmount NFS (do you have open files on it?)\nPlease run `lsof | grep /mnt/nfs`, close any open files, and run `sudo umount -f /mnt/nfs` before attempting to run another pipeline.") + # superclass method will stop/delete/leave the NFS running, depending on + # how self.config["nfs_action_on_stop"] is set. + + # kill thread that auto-restarts NFS + self.NFS_monitor_lock.set() + + self.nodes = allnodes.loc[allnodes["machine_type"] == "nfs"] + super().stop(action_on_stop = self.config["nfs_action_on_stop"], kill_straggling_jobs = False) + def _get_container(self, container_name): def closure(): return self.dkr.containers.get(container_name) @@ -246,6 +278,14 @@ def start_NFS(self): ) print("done", flush = True) + # start NFS monitoring thread + self.NFS_monitor_lock = threading.Event() + self.NFS_monitor_thread = threading.Thread( + target = self.autorestart_preempted_node, + args = (nfs_nodename,) + ) + self.NFS_monitor_thread.start() + self.NFS_server_ready = True def mount_NFS(self): @@ -270,10 +310,24 @@ def get_latest_image(self, image_family = None): return gce.images().getFromFamily(family = image_family, project = self.config["project"]).execute() def invoke(self, command, interactive = False): - return_code, (stdout, stderr) = self.container().exec_run( - command, demux = True, tty = interactive, stdin = interactive - ) - return (return_code, io.BytesIO(stdout), io.BytesIO(stderr)) + if self.container is not None and self.container().status == "running": + return_code, (stdout, stderr) = self.container().exec_run( + command, demux = True, tty = interactive, stdin = interactive + ) + return (return_code, io.BytesIO(stdout), io.BytesIO(stderr)) + else: + return (1, io.BytesIO(), io.BytesIO(b"Container is not running!")) + + def autorestart_preempted_node(self, nodename): + while not self.NFS_monitor_lock.is_set(): + try: + inst_details = self._pzw(gce.instances().get)(instance = nodename).execute() + if inst_details["status"] != "RUNNING": + self._pzw(gce.instances().start)(instance = nodename).execute() + except: + print("Error querying NFS server status; retrying in 60s ...", file = sys.stderr) + + time.sleep(60) # }}} diff --git a/canine/backends/imageTransient.py b/canine/backends/imageTransient.py index 465ff48c..665baf5e 100644 --- a/canine/backends/imageTransient.py +++ b/canine/backends/imageTransient.py @@ -1,5 +1,6 @@ # vim: set expandtab: +import time import typing import subprocess import os @@ -9,6 +10,7 @@ from ..utils import 
get_default_gcp_project, gcp_hourly_cost import googleapiclient.discovery as gd +import googleapiclient.errors import pandas as pd gce = gd.build('compute', 'v1') @@ -257,7 +259,7 @@ def init_nodes(self): print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) print(e) - def stop(self, action_on_stop = None): + def stop(self, action_on_stop = None, kill_straggling_jobs = True): """ Delete or stop (default) compute instances """ @@ -265,8 +267,26 @@ def stop(self, action_on_stop = None): action_on_stop = self.config["action_on_stop"] # - # stop, delete, or leave running compute nodes + # kill any still-running jobs + if kill_straggling_jobs: + try: + self.scancel(jobID = "", user = self.config["user"]) + + # wait for jobs to finish + print("Terminating all jobs ... ", end = "", flush = True) + tot_time = 0 + while True: + if self.squeue().empty or tot_time > 60: + break + tot_time += 1 + time.sleep(1) + print("done") + except Exception as e: + print("Error terminating all jobs!", file = sys.stderr) + print(e, file = sys.stderr) + # + # stop, delete, or leave running compute nodes for node in self.nodes.index: try: if action_on_stop == "delete": @@ -277,6 +297,10 @@ def stop(self, action_on_stop = None): else: # default behavior is to shut down self._pzw(gce.instances().stop)(instance = node).execute() + except googleapiclient.errors.HttpError as e: + if e.resp.status != 404: + print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) + print(e) except Exception as e: print("WARNING: couldn't shutdown instance {}".format(node), file = sys.stderr) print(e) @@ -292,11 +316,11 @@ def list_instances_all_zones(self): for x in zone_dict["items"] ], axis = 0).reset_index(drop = True) - def wait_for_cluster_ready(self): + def wait_for_cluster_ready(self, elastic = False): """ Blocks until the main partition is marked as up """ - super().wait_for_cluster_ready(elastic = False) + super().wait_for_cluster_ready(elastic = elastic) # a handy wrapper to automatically add this instance's project and zone to # GCP API calls diff --git a/canine/backends/local.py b/canine/backends/local.py index 7c09de9a..fecd9e93 100644 --- a/canine/backends/local.py +++ b/canine/backends/local.py @@ -3,6 +3,7 @@ import io import sys import subprocess +import shutil from .base import AbstractSlurmBackend, AbstractTransport from ..utils import ArgumentHelper, check_call from agutil import StdOutAdapter @@ -98,6 +99,14 @@ def walk(self, path: str) -> typing.Generator[typing.Tuple[str, typing.List[str] """ yield from os.walk(path) + def _rmtree(self, path: str, pathstat: os.stat_result): + """ + (Internal) + Recursively remove the directory tree rooted at the given path. 
+ Automatically retries failures after a brief timeout + """ + shutil.rmtree(path) + class LocalSlurmBackend(AbstractSlurmBackend): """ SLURM backend for interacting with a local slurm node diff --git a/canine/localization/base.py b/canine/localization/base.py index 502de07a..3d3d73ec 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -437,7 +437,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm 'stream', value ) - elif mode == 'localize': + elif mode in ['localize', 'symlink']: self.inputs[jobId][arg] = Localization( None, self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value))) diff --git a/canine/localization/local.py b/canine/localization/local.py index 3f986151..2b6de42b 100644 --- a/canine/localization/local.py +++ b/canine/localization/local.py @@ -40,6 +40,7 @@ def __init__( super().__init__(backend, transfer_bucket, common, staging_dir, mount_path, project) self.queued_gs = [] # Queued gs:// -> remote staging transfers self.queued_batch = [] # Queued local -> remote directory transfers + self._has_localized = False def localize_file(self, src: str, dest: PathType, transport: typing.Optional[AbstractTransport] = None): """ @@ -47,20 +48,36 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs gs:// files are queued for later transfer local files are symlinked to the staging directory """ - if src.startswith('gs://'): - self.queued_gs.append(( - src, - dest.controllerpath, - 'remote' - )) - elif os.path.exists(src): - src = os.path.abspath(src) - if not os.path.isdir(os.path.dirname(dest.localpath)): - os.makedirs(os.path.dirname(dest.localpath)) - if os.path.isfile(src): - os.symlink(src, dest.localpath) - else: - self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + if not self._has_localized: + if src.startswith('gs://'): + self.queued_gs.append(( + src, + dest.controllerpath, + 'remote' + )) + elif os.path.exists(src): + src = os.path.abspath(src) + if not os.path.isdir(os.path.dirname(dest.localpath)): + os.makedirs(os.path.dirname(dest.localpath)) + if os.path.isfile(src): + os.symlink(src, dest.localpath) + else: + self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + else: + warnings.warn("BatchedLocalizer.localize_file called after main localization. 
Ignoring normal handling and sending over transport") + with self.transport_context(transport) as transport: + if not transport.isdir(os.path.dirname(dest.controllerpath)): + transport.makedirs(os.path.dirname(dest.controllerpath)) + if src.startswith('gs://'): + self.gs_copy( + src, + dest.controllerpath, + 'remote' + ) + elif os.path.isfile(src): + transport.send(src, dest.controllerpath) + else: + transport.sendtree(src, dest.controllerpath) def __enter__(self): """ @@ -118,13 +135,14 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty self.sendtree( self.local_dir, self.staging_dir, - transport + transport, exist_okay=True ) staging_dir = self.finalize_staging_dir(inputs.keys(), transport=transport) for src, dest, context in self.queued_gs: self.gs_copy(src, dest, context) for src, dest in self.queued_batch: self.sendtree(src, os.path.dirname(dest)) + self._has_localized = True return staging_dir class LocalLocalizer(BatchedLocalizer): """ @@ -147,17 +165,33 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs gs:// files are queued for later transfer local files are symlinked to the staging directory """ - if src.startswith('gs://'): - self.gs_copy( - src, - dest.localpath, - 'local' - ) - elif os.path.exists(src): - src = os.path.abspath(src) - if not os.path.isdir(os.path.dirname(dest.localpath)): - os.makedirs(os.path.dirname(dest.localpath)) - if os.path.isfile(src): - os.symlink(src, dest.localpath) - else: - self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + if not self._has_localized: + if src.startswith('gs://'): + self.gs_copy( + src, + dest.localpath, + 'local' + ) + elif os.path.exists(src): + src = os.path.abspath(src) + if not os.path.isdir(os.path.dirname(dest.localpath)): + os.makedirs(os.path.dirname(dest.localpath)) + if os.path.isfile(src): + os.symlink(src, dest.localpath) + else: + self.queued_batch.append((src, os.path.join(dest.controllerpath, os.path.basename(src)))) + else: + warnings.warn("LocalLocalizer.localize_file called after main localization. 
Ignoring normal handling and sending over transport") + with self.transport_context(transport) as transport: + if not transport.isdir(os.path.dirname(dest.controllerpath)): + transport.makedirs(os.path.dirname(dest.controllerpath)) + if src.startswith('gs://'): + self.gs_copy( + src, + dest.controllerpath, + 'remote' + ) + elif os.path.isfile(src): + transport.send(src, dest.controllerpath) + else: + transport.sendtree(src, dest.controllerpath) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 4cf295ac..967948c8 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -86,15 +86,7 @@ def localize_file(self, src: str, dest: PathType, transport: typing.Optional[Abs # # check if self.mount_path, self.local_dir, and src all exist on the same NFS share # symlink if yes, copy if no - vols = subprocess.check_output( - "df {} {} {} | awk 'NR > 1 {{ print $1 }}'".format( - self.mount_path, - self.local_dir, - src - ), - shell = True - ) - if len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1: + if self.same_volume(src): os.symlink(src, dest.localpath) else: if os.path.isfile(src): @@ -116,6 +108,20 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty """ if overrides is None: overrides = {} + + # automatically override inputs that are absolute paths residing on the same + # NFS share and are not Canine outputs + + # XXX: this can be potentially slow, since it has to iterate over every + # single input. It would make more sense to do this before the adapter + # converts raw inputs. + for input_dict in inputs.values(): + for k, v in input_dict.items(): + if k not in overrides: + if re.match(r"^/", v) is not None and self.same_volume(v) and \ + re.match(r".*/outputs/\d+/.*", v) is None: + overrides[k] = None + overrides = {k:v.lower() if isinstance(v, str) else None for k,v in overrides.items()} with self.backend.transport() as transport: if self.common: @@ -269,3 +275,15 @@ def finalize_staging_dir(self, jobs: typing.Iterable[str], transport: typing.Opt if len(jobs) and not os.path.isdir(controller_env['CANINE_OUTPUT']): os.mkdir(controller_env['CANINE_OUTPUT']) return self.staging_dir + + def same_volume(self, *args): + """ + Check if *args are stored on the same NFS mount as the output directory. + """ + vols = subprocess.check_output( + "df {} | awk 'NR > 1 {{ print $1 }}'".format( + " ".join([shlex.quote(x) for x in [self.mount_path, self.local_dir, *args]]) + ), + shell = True + ) + return len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1 diff --git a/canine/orchestrator.py b/canine/orchestrator.py index e3d84fd4..77bae371 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -10,6 +10,7 @@ from .localization import AbstractLocalizer, BatchedLocalizer, LocalLocalizer, RemoteLocalizer, NFSLocalizer from .utils import check_call import yaml +import numpy as np import pandas as pd from agutil import status_bar version = '0.7.1' @@ -70,6 +71,14 @@ def stringify(obj: typing.Any) -> typing.Any: key:Orchestrator.stringify(val) for key, val in obj.items() } + elif isinstance(obj, pd.core.series.Series): + return [ + Orchestrator.stringify(elem) + for elem in obj.tolist() + ] + elif isinstance(obj, pd.core.frame.DataFrame): + return Orchestrator.stringify(obj.to_dict(orient = "list")) + return str(obj) @staticmethod @@ -101,7 +110,12 @@ def fill_config(cfg: typing.Union[str, typing.Dict[str, typing.Any]]) -> typing. 
return cfg - def __init__(self, config: typing.Union[str, typing.Dict[str, typing.Any]]): + def __init__(self, config: typing.Union[ + str, + typing.Dict[str, typing.Any], + pd.core.frame.DataFrame, + pd.core.series.Series + ]): """ Initializes the Orchestrator from a given config """ @@ -173,6 +187,10 @@ def __init__(self, config: typing.Union[str, typing.Dict[str, typing.Any]]): if 'stderr' not in self.raw_outputs: self.raw_outputs['stderr'] = '../stderr' + # placeholder for dataframe containing previous results that were + # job avoided + self.df_avoided = None + def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) -> pd.DataFrame: """ Runs the configured pipeline @@ -257,7 +275,8 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) localizer.clean_on_exit = False raise finally: - if len(completed_jobs): + # Check if fully job-avoided so we still delocalize + if batch_id == -2 or len(completed_jobs): print("Delocalizing outputs") outputs = localizer.delocalize(self.raw_outputs, output_dir) print("Parsing output data") @@ -271,7 +290,7 @@ def run_pipeline(self, output_dir: str = 'canine_output', dry_run: bool = False) runtime/3600, node_uptime=sum(uptime.values())/120 )[0]) - job_cost = self.backend.estimate_cost(job_cpu_time=df[('job', 'cpu_hours')].to_dict())[1] + job_cost = self.backend.estimate_cost(job_cpu_time=df[('job', 'cpu_seconds')].to_dict())[1] df['est_cost'] = [job_cost[job_id] for job_id in df.index] if job_cost is not None else [0] * len(df) except: traceback.print_exc() @@ -356,49 +375,62 @@ def wait_for_jobs_to_finish(self, batch_id): return completed_jobs, cpu_time, uptime, prev_acct def make_output_DF(self, batch_id, outputs, cpu_time, prev_acct, localizer = None) -> pd.DataFrame: - try: - acct = self.backend.sacct(job=batch_id) - - df = pd.DataFrame.from_dict( - data={ - job_id: { - ('job', 'slurm_state'): acct['State'][batch_id+'_'+job_id], - ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+job_id], - ('job', 'cpu_hours'): (prev_acct['CPUTimeRAW'][batch_id+'_'+job_id] + ( - cpu_time[batch_id+'_'+job_id] if batch_id+'_'+job_id in cpu_time else 0 - ))/3600 if prev_acct is not None else -1, - **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, - **{ - ('outputs', key) : val[0] if isinstance(val, list) and len(val) == 1 else val - for key, val in outputs[job_id].items() + df = pd.DataFrame() + if batch_id != -2: + try: + acct = self.backend.sacct(job=batch_id) + + df = pd.DataFrame.from_dict( + data={ + job_id: { + ('job', 'slurm_state'): acct['State'][batch_id+'_'+str(array_id)], + ('job', 'exit_code'): acct['ExitCode'][batch_id+'_'+str(array_id)], + ('job', 'cpu_seconds'): (prev_acct['CPUTimeRAW'][batch_id+'_'+str(array_id)] + ( + cpu_time[batch_id+'_'+str(array_id)] if batch_id+'_'+str(array_id) in cpu_time else 0 + )) if prev_acct is not None else -1, + **{ ('inputs', key) : val for key, val in self.job_spec[job_id].items() }, + **{ + ('outputs', key) : val[0] if isinstance(val, list) and len(val) == 1 else val + for key, val in outputs[job_id].items() + } } - } - for job_id in self.job_spec - }, - orient = "index" - ).rename_axis(index = "_job_id").astype({('job', 'cpu_hours'): int}) - - # - # apply functions to output columns (if any) - if len(self.output_map) > 0: - # columns that receive no (i.e., identity) transformation - identity_map = { x : lambda y : y for x in set(df.columns.get_loc_level("outputs")[1]) - self.output_map.keys() } - - # we get back all columns from the 
dataframe by aggregating columns - # that don't receive any transformation with transformed columns - df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) - except: - df = pd.DataFrame() + for array_id, job_id in enumerate(self.job_spec) + }, + orient = "index" + ).rename_axis(index = "_job_id").astype({('job', 'cpu_seconds'): int}) - if isinstance(localizer, AbstractLocalizer): - fname = "results.k9df.pickle" - df.to_pickle(fname) - localizer.localize_file(fname, localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle")) - os.remove(fname) + # + # apply functions to output columns (if any) + if len(self.output_map) > 0: + # columns that receive no (i.e., identity) transformation + identity_map = { x : lambda y : y for x in set(df.columns.get_loc_level("outputs")[1]) - self.output_map.keys() } + + # we get back all columns from the dataframe by aggregating columns + # that don't receive any transformation with transformed columns + df["outputs"] = df["outputs"].agg({ **self.output_map, **identity_map }) + except: + traceback.print_exc() + + # concatenate with any previously existing job avoided results + if self.df_avoided is not None: + df = pd.concat([df, self.df_avoided]).sort_index() + + # save DF to disk + if isinstance(localizer, AbstractLocalizer): + with localizer.transport_context() as transport: + dest = localizer.reserve_path("results.k9df.pickle").controllerpath + if not transport.isdir(os.path.dirname(dest)): + transport.makedirs(os.path.dirname(dest)) + with transport.open(dest, 'wb') as w: + df.to_pickle(w, compression=None) return df - def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}): + def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) -> int: + # this job was avoided + if len(self.job_spec) == 0: + return -2 + batch_id = self.backend.sbatch( entrypoint_path, **{ @@ -414,33 +446,99 @@ def submit_batch_job(self, entrypoint_path, compute_env, extra_sbatch_args = {}) return batch_id - def job_avoid(self, localizer, overwrite = False): #TODO: add params for type of avoidance (force, only if failed, etc.) - # is there preexisting output? 
- df_path = localizer.reserve_path(localizer.staging_dir, "results.k9df.pickle") - if os.path.exists(df_path.localpath): - # load in results and job spec dataframes - r_df = pd.read_pickle(df_path.localpath) - js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") - js_df.columns = pd.MultiIndex.from_product([["inputs"], js_df.columns]) - - r_df = r_df.reset_index(col_level = 0, col_fill = "_job_id") - js_df = js_df.reset_index(col_level = 0, col_fill = "_job_id") - - # check if jobs are compatible: they must have identical inputs and index, - # and output columns must be matching - if not r_df["inputs"].columns.equals(js_df["inputs"].columns): - raise ValueError("Cannot job avoid; set of input parameters do not match") - - r_df_inputs = r_df[["inputs", "_job_id"]].droplevel(0, axis = 1) - js_df_inputs = js_df[["inputs", "_job_id"]].droplevel(0, axis = 1) - - merge_df = r_df_inputs.join(js_df_inputs, how = "outer", lsuffix = "__r", rsuffix = "__js") - merge_df.columns = pd.MultiIndex.from_tuples([x.split("__")[::-1] for x in merge_df.columns]) - - if not merge_df["r"].equals(merge_df["js"]): - raise ValueError("Cannot job avoid; values of input parameters do not match") - - # if jobs are indeed compatible, we can figure out which need to be re-run - r_df.loc[r_df[("job", "slurm_state")] == "FAILED"] - - # destroy their output directories + def job_avoid(self, localizer: AbstractLocalizer, overwrite: bool = False) -> int: #TODO: add params for type of avoidance (force, only if failed, etc.) + """ + Detects jobs which have previously been run in this staging directory. + Succeeded jobs are skipped. Failed jobs are reset and rerun + """ + with localizer.transport_context() as transport: + df_path = localizer.reserve_path("results.k9df.pickle").controllerpath + + #remove all output if specified + if overwrite: + if transport.isdir(localizer.staging_dir): + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) + return 0 + + # check for preexisting jobs' output + if transport.exists(df_path): + try: + # load in results and job spec dataframes + with transport.open(df_path) as r: + r_df = pd.read_pickle(r, compression=None) + js_df = pd.DataFrame.from_dict(self.job_spec, orient = "index").rename_axis(index = "_job_id") + + if r_df.empty or \ + "inputs" not in r_df or \ + ("outputs" not in r_df and len(self.raw_outputs) > 0): + raise ValueError("Could not recover previous job results!") + + # check if jobs are compatible: they must have identical inputs and index, + # and output columns must be matching + if not (r_df["inputs"].columns.isin(js_df.columns).all() and \ + js_df.columns.isin(r_df["inputs"].columns).all()): + r_df_set = set(r_df["inputs"].columns) + js_df_set = set(js_df.columns) + raise ValueError( + "Cannot job avoid; sets of input parameters do not match! Parameters unique to:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - js_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(js_df_set - r_df_set) + ) + + output_temp = pd.Series(index = self.raw_outputs.keys()) + if not (r_df["outputs"].columns.isin(output_temp.index).all() and \ + output_temp.index.isin(r_df["outputs"].columns).all()): + r_df_set = set(r_df["outputs"].columns) + o_df_set = set(output_temp.index) + raise ValueError( + "Cannot job avoid; sets of output parameters do not match! 
Parameters unique to:\n" + \ + "\u21B3saved: " + ", ".join(r_df_set - o_df_set) + "\n" + \ + "\u21B3job: " + ", ".join(o_df_set - r_df_set) + ) + + # check that values of inputs are the same + # we have to sort because the order of jobs might differ for the same + # inputs + sort_cols = r_df.columns.to_series()["inputs"] + r_df = r_df.sort_values(sort_cols.tolist()) + js_df = js_df.sort_values(sort_cols.index.tolist()) + + if not r_df["inputs"].equals(js_df): + raise ValueError("Cannot job avoid; values of input parameters do not match!") + + # if all is well, figure out which jobs need to be re-run + fail_idx = r_df[("job", "slurm_state")] == "FAILED" + self.df_avoided = r_df.loc[~fail_idx] + + # remove jobs that don't need to be re-run from job_spec + for k in r_df.index[~fail_idx]: + self.job_spec.pop(k, None) + + # remove output directories of failed jobs + for k in self.job_spec: + transport.rmtree( + localizer.reserve_path('jobs', k).controllerpath + ) + + # we also have to remove the common inputs directory, so that + # the localizer can regenerate it + if len(self.job_spec) > 0: + transport.rmtree( + localizer.reserve_path('common').controllerpath + ) + + return np.count_nonzero(~fail_idx) + except (ValueError, OSError) as e: + print(e) + print("Overwriting output and aborting job avoidance.") + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) + return 0 + + # if the output directory exists but there's no output dataframe, assume + # it's corrupted and remove it + elif transport.isdir(localizer.staging_dir): + transport.rmtree(localizer.staging_dir) + transport.makedirs(localizer.staging_dir) + return 0 diff --git a/canine/utils.py b/canine/utils.py index 3c27352a..00d627c6 100644 --- a/canine/utils.py +++ b/canine/utils.py @@ -10,6 +10,8 @@ from subprocess import CalledProcessError import google.auth import paramiko +import shutil +import time class ArgumentHelper(dict): """ @@ -264,3 +266,5 @@ def gcp_hourly_cost(mtype: str, preemptible: bool = False, ssd_size: int = 0, hd else (gpu_pricing[gpu_type][1 if preemptible else 0] * gpu_count) ) ) + +# rmtree_retry removed in favor of AbstractTransport.rmtree diff --git a/docs/canine/backends.html b/docs/canine/backends.html index dbc1c761..7283a718 100644 --- a/docs/canine/backends.html +++ b/docs/canine/backends.html @@ -239,6 +239,13 @@
[rendered Sphinx HTML diff condensed: the regenerated documentation pages mirror the API changes above — the new AbstractTransport.rmtree(path: str, max_retries: int = 5, timeout: int = 5); the updated stop(action_on_stop=None, kill_straggling_jobs=True) and wait_for_cluster_ready(elastic=False) signatures; the DockerTransientImageSlurmBackend defaults moving from the cga_pipeline/pydpiper paths to slurm_gcp_docker; NFSLocalizer.same_volume(*args) and sendtree(src, dest, transport=None, exist_okay=False); and the Orchestrator updates (config may be a str, dict, DataFrame, or Series; job_avoid(localizer, overwrite=False) -> int).]
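A minimal usage sketch for the new AbstractTransport.rmtree introduced in canine/backends/base.py above; `backend` is assumed to be any already-constructed AbstractSlurmBackend, and the path is purely illustrative:

    # hypothetical scratch path; any transport (SSH, local, ...) behaves the same way
    with backend.transport() as transport:
        # removes the tree rooted at the path; transient I/O errors are retried up to
        # max_retries times with `timeout` seconds between attempts, and the final
        # failure is re-raised with its original traceback
        transport.rmtree("/mnt/nfs/workspace/stale_staging_dir", max_retries = 3, timeout = 2)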
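The NFS auto-restart added to DockerTransientImageSlurmBackend is a stop-flag monitor thread: a threading.Event serves both as the loop condition and as the switch that stop() flips during teardown. A generic sketch of that pattern, with the GCE status/start calls replaced by hypothetical stand-ins:

    import threading
    import time

    def check_status():
        # stand-in for the instances().get() status query used in the patch
        return "RUNNING"

    def restart():
        # stand-in for instances().start()
        pass

    stop_flag = threading.Event()  # teardown sets this to end the loop

    def monitor(interval = 60):
        while not stop_flag.is_set():
            try:
                if check_status() != "RUNNING":
                    restart()
            except Exception:
                print("Error querying status; retrying in {}s ...".format(interval))
            time.sleep(interval)

    monitor_thread = threading.Thread(target = monitor)
    monitor_thread.start()
    # ... later, during teardown:
    stop_flag.set()
    monitor_thread.join()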
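Orchestrator configs can now be pandas objects because stringify recurses into Series and DataFrames before the usual string conversion. A short sketch of that entry point, using a made-up input table:

    import pandas as pd
    from canine.orchestrator import Orchestrator

    # hypothetical two-job input table
    job_inputs = pd.DataFrame({"bam" : ["a.bam", "b.bam"], "shard" : [1, 2]})

    # DataFrames are flattened column-wise (via to_dict(orient = "list")) and Series
    # element-wise, so every leaf value is stringified like the rest of the config
    flattened = Orchestrator.stringify(job_inputs)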