From 29341002a54e61f4ca8c03b0ff7942f9f746d618 Mon Sep 17 00:00:00 2001
From: jchen6727
Date: Tue, 10 Sep 2024 15:24:33 -0500
Subject: [PATCH] file cleanup, slurm scripts and timeout added to batchtools
 submit templates (#831)

see netpyne.batchtools esp. submits.py and search.py

slurm job submission:
search() now supports job_type='slurm' with associated 'sfs' and 'socket'
communication modes

the run_config for these submission scripts should specify the following
arguments:
'allocation' (allocation for the job)
'walltime' (time limit before job termination)
'nodes' (# nodes for job)
'coresPerNode' (number of cores per job--i.e. the mpiexec -n value)
'email' (user email)
'custom' (any custom commands to execute prior to sourcing ~/.bashrc)
'command' (execution of script, command to be run from the project directory
to run the script)

update to sge submission:
run_config for the submission now includes the following argument
'realtime' (time limit before job termination)
---
 .../rosenbrock/basic_rosenbrock/cma_batch.py  |   5 +-
 netpyne/batchtools/search.py                  |  97 ++--------------
 netpyne/batchtools/submits.py                 | 105 +++++++++++++++++-
 3 files changed, 116 insertions(+), 91 deletions(-)

diff --git a/netpyne/batchtools/examples/rosenbrock/basic_rosenbrock/cma_batch.py b/netpyne/batchtools/examples/rosenbrock/basic_rosenbrock/cma_batch.py
index 9bde349c6..7f0be4e1a 100644
--- a/netpyne/batchtools/examples/rosenbrock/basic_rosenbrock/cma_batch.py
+++ b/netpyne/batchtools/examples/rosenbrock/basic_rosenbrock/cma_batch.py
@@ -8,6 +8,7 @@
 
 cwd = os.getcwd()
 
+# evaluation
 def eval_rosenbrock(x0, x1, tid):
     cfg = {
         'x0': x0,
@@ -18,8 +19,10 @@ def eval_rosenbrock(x0, x1, tid):
     label = 'rosenbrock'
     return float(trial(cfg, label, tid, Dispatcher, cwd, '../cma', submit)['fx'])
 
-#data = eval_rosenbrock(1, 1, "x11")
+
+
+# suggestor
 optimizer = CMA(mean=numpy.zeros(2), sigma=1.0)
 for generation in range(3):
     solutions = []
diff --git a/netpyne/batchtools/search.py b/netpyne/batchtools/search.py
index 1b791c1d4..7765e2dda 100644
--- a/netpyne/batchtools/search.py
+++ b/netpyne/batchtools/search.py
@@ -11,8 +11,7 @@ from batchtk.utils import get_path
 import numpy
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
-
-
+import submits
 
 choice = tune.choice
 grid = tune.grid_search
@@ -216,12 +215,14 @@ def run(config): #should be constant?
 
 constructors = namedtuple('constructors', 'dispatcher, submit')
 constructor_tuples = {
-    ('sge', 'socket'): constructors(runtk.dispatchers.INETDispatcher, runtk.submits.SGESubmitSOCK),
+    ('sge', 'socket'): constructors(runtk.dispatchers.INETDispatcher, submits.SGESubmitSOCK),
     #('sge', 'unix'): constructors(runtk.dispatchers.UNIXDispatcher, runtk.submits.SGESubmitSOCK), #can't use AF_UNIX sockets on networked machines
-    ('sge', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , runtk.submits.SGESubmitSFS ),
+    ('sge', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SGESubmitSFS ),
     #('zsh', 'inet'): constructors(runtk.dispatchers.INETDispatcher, runtk.submits.ZSHSubmitSOCK), #TODO preferable to use AF_UNIX sockets on local machines
-    ('sh', 'socket'): constructors(runtk.dispatchers.UNIXDispatcher, runtk.submits.SHSubmitSOCK),
-    ('sh', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , runtk.submits.SHSubmitSFS ),
+    ('slurm', 'socket'): constructors(runtk.dispatchers.INETDispatcher, submits.SlurmSubmitSOCK),
+    ('slurm', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SlurmSubmitSFS ),
+    ('sh', 'socket'): constructors(runtk.dispatchers.UNIXDispatcher, submits.SHSubmitSOCK),
+    ('sh', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SHSubmitSFS ),
 }#TODO, just say "socket"?
 
""" @@ -237,19 +238,14 @@ def generate_constructors(job_type, comm_type = 'socket', **kwargs): def generate_parameters(params, algorithm, **kwargs): """ - - Parameters - ---------- + returns a dictionary of parameters for ray_search based on the input dictionary + from NOTES Salvador: params = {'synMechTau2': [3.0, 5.0, 7.0], # assumes list of values by default if grid search-like algo #'synMechTau2': [3.0, 7.0], # assumes lower/upper bounds by default if evol-like algo 'connWeight' : paramtypes.sample_from(lambda _: numpy.random.uniform(0.005, 0.15))} # can optionally pass any of the paramtypes (= ray.tune data types) - Returns - ------- - a dictionary of parameters for ray_search based on the input dictionary - + #TODO: check coverage of conditional statements (looks okay?) """ - #TODO: bloated function, prone to error ray_params = {} for param, space in params.items(): if isinstance(space, (list, tuple, range, numpy.ndarray)) and algorithm in {'variant_generator'}: @@ -334,79 +330,6 @@ def search(job_type: str, # the submission engine to run a single simulation (e. """ -def ray_search(dispatcher_constructor, submit_constructor, algorithm = "variant_generator", label = 'search', - params = None, concurrency = 1, output_path = '../batch', checkpoint_path = '../ray', - run_config = None, num_samples = 1): - ray.init( - runtime_env={"working_dir": "."}) # needed for python import statements - - #TODO class this object for self calls? cleaner? 
vs nested functions - #TODO clean up working_dir and excludes - if checkpoint_path[0] == '/': - storage_path = os.path.normpath(checkpoint_path) - elif checkpoint_path[0] == '.': - storage_path = os.path.normpath(os.path.join(os.getcwd(), checkpoint_path)) - else: - raise ValueError("checkpoint_dir must be an absolute path (starts with /) or relative to the current working directory (starts with .)") - algo = create_searcher(algorithm, max_concurrent=concurrency, batch=True) - #algo = ConcurrencyLimiter(searcher=algo, max_concurrent=concurrency, batch=True) - submit = submit_constructor() - submit.update_templates( - **run_config - ) - project_path = os.getcwd() - def run(config): - config.update({'saveFolder': output_path, 'simLabel': LABEL_POINTER}) - data = ray_trial(config, label, dispatcher_constructor, project_path, output_path, submit) - session.report({'data': data, 'config': config}) - - tuner = tune.Tuner( - run, - tune_config=tune.TuneConfig( - search_alg=algo, - num_samples=num_samples, # grid search samples 1 for each param - metric="data" - ), - run_config=RunConfig( - storage_path=storage_path, - name=algorithm, - ), - param_space=params, - ) - - results = tuner.fit() - resultsdf = results.get_dataframe() - resultsdf.to_csv("{}.csv".format(label)) -""" - - - - -""" -from netpyne.batchtools import search, paramtypes -import numpy - -https://docs.ray.io/en/latest/tune/api/doc/ray.tune.search.Searcher.html#ray.tune.search.Searcher - - - -params = {'synMechTau2': [3.0, 5.0, 7.0], # assumes list of values by default if grid search-like algo - #'synMechTau2': [3.0, 7.0], # assumes lower/upper bounds by default if evol-like algo - 'connWeight' : paramtypes.sample_from(lambda _: numpy.random.uniform(0.005, 0.15))} # can optionally pass any of the paramtypes (= ray.tune data types) - -run_config = {'sge': 5, 'command': 'python init.py'} - -#TODO rename ray_search to search -search(dispatcher = 'inet', # defaults to 'inet' if no arg is passed? 
-       submit = 'socket', # defaults to 'socket' if no arg is passed?
-       params = params,
-       run_config = run_config, #
-       algorithm = "variant_generator",
-       concurrency = 9,
-       output_path = '../batch_func',
-       checkpoint_path = '../grid_func',
-       label = 'func_search',
-       num_samples = 3)
 
 SEE:
 'variant_generator'
 'random' -> points to variant_generator
diff --git a/netpyne/batchtools/submits.py b/netpyne/batchtools/submits.py
index 1d27a41cc..d907e4051 100644
--- a/netpyne/batchtools/submits.py
+++ b/netpyne/batchtools/submits.py
@@ -78,7 +78,7 @@ class SHSubmitSOCK(SHSubmit):
                       runtk.SOCKET: '{sockname}'}
 
 class SGESubmit(Submit):
-    script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', }
+    script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'env', 'command', }
     script_template = \
         """\
 #!/bin/bash
@@ -118,7 +118,7 @@ def set_handles(self):
 
 
 class SGESubmitSFS(SGESubmit):
-    script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', }
+    script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'env', 'command', }
     script_template = \
         """\
 #!/bin/bash
@@ -143,7 +143,7 @@ class SGESubmitSFS(SGESubmit):
     }
 
 class SGESubmitSOCK(SGESubmit):
-    script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', 'sockname'}
+    script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'sockname', 'env', 'command', }
     script_template = \
         """\
 #!/bin/bash
@@ -165,3 +165,105 @@ class SGESubmitSOCK(SGESubmit):
         runtk.SOCKET: '{sockname}'
     }
 
+class SlurmSubmit(Submit):
+    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'reservation', 'custom', 'project_path', 'command'}
+    script_template = \
+        """\
+#!/bin/bash
+#SBATCH --job-name={label}
+#SBATCH -A {allocation}
+#SBATCH -t {walltime}
+#SBATCH --nodes={nodes}
+#SBATCH --ntasks-per-node={coresPerNode}
+#SBATCH -o {output_path}/{label}.run
+#SBATCH -e {output_path}/{label}.err
+#SBATCH --mail-user={email}
+#SBATCH --mail-type=end
+export JOBID=$SLURM_JOB_ID
+{reservation}
+{custom}
+source ~/.bashrc
+cd {project_path}
+{command}
+wait
+"""
+    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
+                      runtk.STDOUT: '{output_path}/{label}.run'}
+    def __init__(self, **kwargs):
+        super().__init__(
+            submit_template = Template(template="sbatch {output_path}/{label}.sh",
+                                       key_args={'output_path', 'label'}),
+            script_template = Template(template=self.script_template,
+                                       key_args=self.script_args),
+            handles = self.script_handles,
+        )
+
+    def submit_job(self, **kwargs):
+        proc = super().submit_job()
+        self.job_id = 0 #TODO, what is the output of an sbatch command?
+        return self.job_id
+        #validate job id
+        #try:
+        #    self.job_id = proc.stdout.split(' ')[2]
+        #except Exception as e: #not quite sure how this would occur
+        #    raise(Exception("{}\nJob submission failed:\n{}\n{}\n{}\n{}".format(e, self.submit, self.script, proc.stdout, proc.stderr)))
+        #return self.job_id
+
+    def set_handles(self):
+        pass
+
+class SlurmSubmitSFS(SlurmSubmit):
+    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'custom', 'project_path', 'command'}
+    script_template = \
+        """\
+#!/bin/bash
+#SBATCH --job-name={label}
+#SBATCH -A {allocation}
+#SBATCH -t {walltime}
+#SBATCH --nodes={nodes}
+#SBATCH --ntasks-per-node={coresPerNode}
+#SBATCH -o {output_path}/{label}.run
+#SBATCH -e {output_path}/{label}.err
+#SBATCH --mail-user={email}
+#SBATCH --mail-type=end
+export JOBID=$SLURM_JOB_ID
+export OUTFILE="{output_path}/{label}.out"
+export SGLFILE="{output_path}/{label}.sgl"
+{custom}
+source ~/.bashrc
+cd {project_path}
+{command}
+wait
+"""
+    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
+                      runtk.STDOUT: '{output_path}/{label}.run',
+                      runtk.MSGOUT: '{output_path}/{label}.out',
+                      runtk.SGLOUT: '{output_path}/{label}.sgl',
+                      }
+
+class SlurmSubmitSOCK(SlurmSubmit):
+    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'reservation', 'custom', 'project_path', 'sockname', 'command'}
+    script_template = \
+        """\
+#!/bin/bash
+#SBATCH --job-name={label}
+#SBATCH -A {allocation}
+#SBATCH -t {walltime}
+#SBATCH --nodes={nodes}
+#SBATCH --ntasks-per-node={coresPerNode}
+#SBATCH -o {output_path}/{label}.run
+#SBATCH -e {output_path}/{label}.err
+#SBATCH --mail-user={email}
+#SBATCH --mail-type=end
+export JOBID=$SLURM_JOB_ID
+export SOCNAME="{sockname}"
+{custom}
+source ~/.bashrc
+cd {project_path}
+{command}
+wait
+"""
+    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
+                      runtk.STDOUT: '{output_path}/{label}.run',
+                      runtk.SOCKET: '{sockname}'
+                      }
\ No newline at end of file