Skip to content

Commit

Permalink
file cleanup, slurm scripts and timeout added to batchtools submit te…
Browse files Browse the repository at this point in the history
…mplates (#831)

see netpyne.batchtools esp. submits.py and search.py

slurm job submission:
search() now supports job_type='slurm' with associated 'sfs' and 'socket' communication modes

the run_config for these submission scripts should specify the following arguments:
'allocation' (allocation for the job)
'walltime' (time limit before job termination)
'nodes' (# nodes for job)
'coresPerNode' (number of cores (tasks) per node, passed to sbatch as --ntasks-per-node--i.e. the mpiexec -n value when running on a single node)
'email' (user email)
'custom' (any custom commands to execute prior to sourcing ~/.bashrc)
'command' (execution of script, command to be run from the project directory to run the script)

update to SGE submission:
run_config for the SGE submission now includes the following argument
'realtime' (time limit before job termination)
  • Loading branch information
jchen6727 authored Sep 10, 2024
1 parent 846f0bb commit 2934100
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

cwd = os.getcwd()

# evaluation
def eval_rosenbrock(x0, x1, tid):
cfg = {
'x0': x0,
Expand All @@ -18,8 +19,10 @@ def eval_rosenbrock(x0, x1, tid):
label = 'rosenbrock'
return float(trial(cfg, label, tid, Dispatcher, cwd, '../cma', submit)['fx'])

#data = eval_rosenbrock(1, 1, "x11")



# suggestor
optimizer = CMA(mean=numpy.zeros(2), sigma=1.0)
for generation in range(3):
solutions = []
Expand Down
97 changes: 10 additions & 87 deletions netpyne/batchtools/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from batchtk.utils import get_path
import numpy
from typing import Any, Callable, Dict, List, Optional, Tuple, Union


import submits

choice = tune.choice
grid = tune.grid_search
Expand Down Expand Up @@ -216,12 +215,14 @@ def run(config):
#should be constant?
constructors = namedtuple('constructors', 'dispatcher, submit')
constructor_tuples = {
('sge', 'socket'): constructors(runtk.dispatchers.INETDispatcher, runtk.submits.SGESubmitSOCK),
('sge', 'socket'): constructors(runtk.dispatchers.INETDispatcher, submits.SGESubmitSOCK),
#('sge', 'unix'): constructors(runtk.dispatchers.UNIXDispatcher, runtk.submits.SGESubmitSOCK), #can't use AF_UNIX sockets on networked machines
('sge', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , runtk.submits.SGESubmitSFS ),
('sge', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SGESubmitSFS ),
#('zsh', 'inet'): constructors(runtk.dispatchers.INETDispatcher, runtk.submits.ZSHSubmitSOCK), #TODO preferable to use AF_UNIX sockets on local machines
('sh', 'socket'): constructors(runtk.dispatchers.UNIXDispatcher, runtk.submits.SHSubmitSOCK),
('sh', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , runtk.submits.SHSubmitSFS ),
('slurm', 'socket'): constructors(runtk.dispatchers.INETDispatcher, submits.SLURMSubmitSOCK),
('slurm', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SLURMSubmitSFS ),
('sh', 'socket'): constructors(runtk.dispatchers.UNIXDispatcher, submits.SHSubmitSOCK),
('sh', 'sfs' ): constructors(runtk.dispatchers.SFSDispatcher , submits.SHSubmitSFS ),
}#TODO, just say "socket"?

"""
Expand All @@ -237,19 +238,14 @@ def generate_constructors(job_type, comm_type = 'socket', **kwargs):

def generate_parameters(params, algorithm, **kwargs):
"""
Parameters
----------
returns a dictionary of parameters for ray_search based on the input dictionary
from NOTES Salvador:
params = {'synMechTau2': [3.0, 5.0, 7.0], # assumes list of values by default if grid search-like algo
#'synMechTau2': [3.0, 7.0], # assumes lower/upper bounds by default if evol-like algo
'connWeight' : paramtypes.sample_from(lambda _: numpy.random.uniform(0.005, 0.15))} # can optionally pass any of the paramtypes (= ray.tune data types)
Returns
-------
a dictionary of parameters for ray_search based on the input dictionary
#TODO: check coverage of conditional statements (looks okay?)
"""
#TODO: bloated function, prone to error
ray_params = {}
for param, space in params.items():
if isinstance(space, (list, tuple, range, numpy.ndarray)) and algorithm in {'variant_generator'}:
Expand Down Expand Up @@ -334,79 +330,6 @@ def search(job_type: str, # the submission engine to run a single simulation (e.


"""
def ray_search(dispatcher_constructor, submit_constructor, algorithm = "variant_generator", label = 'search',
params = None, concurrency = 1, output_path = '../batch', checkpoint_path = '../ray',
run_config = None, num_samples = 1):
ray.init(
runtime_env={"working_dir": "."}) # needed for python import statements
#TODO class this object for self calls? cleaner? vs nested functions
#TODO clean up working_dir and excludes
if checkpoint_path[0] == '/':
storage_path = os.path.normpath(checkpoint_path)
elif checkpoint_path[0] == '.':
storage_path = os.path.normpath(os.path.join(os.getcwd(), checkpoint_path))
else:
raise ValueError("checkpoint_dir must be an absolute path (starts with /) or relative to the current working directory (starts with .)")
algo = create_searcher(algorithm, max_concurrent=concurrency, batch=True)
#algo = ConcurrencyLimiter(searcher=algo, max_concurrent=concurrency, batch=True)
submit = submit_constructor()
submit.update_templates(
**run_config
)
project_path = os.getcwd()
def run(config):
config.update({'saveFolder': output_path, 'simLabel': LABEL_POINTER})
data = ray_trial(config, label, dispatcher_constructor, project_path, output_path, submit)
session.report({'data': data, 'config': config})
tuner = tune.Tuner(
run,
tune_config=tune.TuneConfig(
search_alg=algo,
num_samples=num_samples, # grid search samples 1 for each param
metric="data"
),
run_config=RunConfig(
storage_path=storage_path,
name=algorithm,
),
param_space=params,
)
results = tuner.fit()
resultsdf = results.get_dataframe()
resultsdf.to_csv("{}.csv".format(label))
"""




"""
from netpyne.batchtools import search, paramtypes
import numpy
https://docs.ray.io/en/latest/tune/api/doc/ray.tune.search.Searcher.html#ray.tune.search.Searcher
params = {'synMechTau2': [3.0, 5.0, 7.0], # assumes list of values by default if grid search-like algo
#'synMechTau2': [3.0, 7.0], # assumes lower/upper bounds by default if evol-like algo
'connWeight' : paramtypes.sample_from(lambda _: numpy.random.uniform(0.005, 0.15))} # can optionally pass any of the paramtypes (= ray.tune data types)
run_config = {'sge': 5, 'command': 'python init.py'}
#TODO rename ray_search to search
search(dispatcher = 'inet', # defaults to 'inet' if no arg is passed?
submit = 'socket', # defaults to 'socket' if no arg is passed?
params = params,
run_config = run_config, #
algorithm = "variant_generator",
concurrency = 9,
output_path = '../batch_func',
checkpoint_path = '../grid_func',
label = 'func_search',
num_samples = 3)
SEE:
'variant_generator'
'random' -> points to variant_generator
Expand Down
105 changes: 102 additions & 3 deletions netpyne/batchtools/submits.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class SHSubmitSOCK(SHSubmit):
runtk.SOCKET: '{sockname}'}

class SGESubmit(Submit):
script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', }
# Placeholder names accepted by the SGE script template. 'vmem' and 'realtime'
# must be separate keys: without the comma Python concatenates the adjacent
# literals into the single (unintended) key 'vmemrealtime'.
script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'env', 'command', }
script_template = \
"""\
#!/bin/bash
Expand Down Expand Up @@ -118,7 +118,7 @@ def set_handles(self):


class SGESubmitSFS(SGESubmit):
script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', }
# Placeholder names accepted by the SGE (shared-file-system) script template.
# 'vmem' and 'realtime' must be separate keys: without the comma Python
# concatenates the adjacent literals into the single key 'vmemrealtime'.
script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'env', 'command', }
script_template = \
"""\
#!/bin/bash
Expand All @@ -143,7 +143,7 @@ class SGESubmitSFS(SGESubmit):
}

class SGESubmitSOCK(SGESubmit):
script_args = {'label', 'project_path', 'output_path', 'env', 'command', 'cores', 'vmem', 'sockname'}
# Placeholder names accepted by the SGE (socket) script template.
# 'vmem' and 'realtime' must be separate keys: without the comma Python
# concatenates the adjacent literals into the single key 'vmemrealtime'.
script_args = {'label', 'queue', 'cores', 'vmem', 'realtime', 'output_path', 'project_path', 'sockname', 'env', 'command', }
script_template = \
"""\
#!/bin/bash
Expand All @@ -165,3 +165,102 @@ class SGESubmitSOCK(SGESubmit):
runtk.SOCKET: '{sockname}'
}

class SlurmSubmit(Submit):
    """Submit variant that writes a Slurm batch script and queues it with ``sbatch``.

    Subclasses override ``script_args``, ``script_template`` and
    ``script_handles``; ``__init__`` reads all three through ``self`` so the
    overrides are picked up without redefining the constructor.
    """
    # Names of the placeholders that may be filled into script_template.
    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'reservation', 'custom', 'project_path', 'command'}
    # The script must start with an interpreter line: sbatch refuses batch
    # scripts whose first line does not begin with "#!" (the SGE templates in
    # this module start the same way).
    # The optional reservation line uses the '{reservation}' placeholder so it
    # matches the key declared in script_args (the template previously used
    # '{res}', which no declared argument could fill).
    # NOTE(review): if 'reservation' is meant to hold an "#SBATCH ..." directive
    # it must appear before the first executable line to be honoured by sbatch;
    # its position is left unchanged here -- confirm the intended content.
    script_template = \
        """\
#!/bin/bash
#SBATCH --job-name={label}
#SBATCH -A {allocation}
#SBATCH -t {walltime}
#SBATCH --nodes={nodes}
#SBATCH --ntasks-per-node={coresPerNode}
#SBATCH -o {output_path}/{label}.run
#SBATCH -e {output_path}/{label}.err
#SBATCH --mail-user={email}
#SBATCH --mail-type=end
export JOBID=$SLURM_JOB_ID
{reservation}
{custom}
source ~/.bashrc
cd {project_path}
{command}
wait
"""
    # Files associated with a submission: the generated script and its stdout.
    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
                      runtk.STDOUT: '{output_path}/{label}.run'}

    def __init__(self, **kwargs):
        """Build the ``sbatch`` submit template and the script template.

        ``kwargs`` is accepted for signature compatibility and is not used.
        """
        super().__init__(
            submit_template = Template(template="sbatch {output_path}/{label}.sh",
                                       key_args={'output_path', 'label'}),
            script_template = Template(template=self.script_template,
                                       key_args=self.script_args),
            handles = self.script_handles,
        )

    def submit_job(self, **kwargs):
        """Submit the job and record its Slurm job id in ``self.job_id``.

        On success ``sbatch`` prints ``Submitted batch job <id>``; the integer
        id is parsed from that line. When the line cannot be found (or the
        parent call exposes no text output) the id falls back to 0, which is
        what this method always returned before.

        Returns
        -------
        int
            The Slurm job id, or 0 when it could not be determined.
        """
        import re  # local import: only needed for parsing the sbatch reply

        proc = super().submit_job()
        stdout = getattr(proc, 'stdout', None)
        if isinstance(stdout, bytes):
            stdout = stdout.decode(errors='replace')
        match = None
        if isinstance(stdout, str):
            match = re.search(r'Submitted batch job (\d+)', stdout)
        self.job_id = int(match.group(1)) if match else 0
        return self.job_id

    def set_handles(self):
        """No-op: the base Slurm submit sets no handles here."""
        pass

class SlurmSubmitSFS(SlurmSubmit):
    """Slurm submit for the 'sfs' communication mode.

    In addition to the base Slurm script, the job exports ``OUTFILE`` and
    ``SGLFILE`` paths that match the ``MSGOUT`` and ``SGLOUT`` handles below.
    """
    # Names of the placeholders that may be filled into script_template.
    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'custom', 'project_path', 'command'}
    # The script must start with an interpreter line: sbatch refuses batch
    # scripts whose first line does not begin with "#!".
    script_template = \
        """\
#!/bin/bash
#SBATCH --job-name={label}
#SBATCH -A {allocation}
#SBATCH -t {walltime}
#SBATCH --nodes={nodes}
#SBATCH --ntasks-per-node={coresPerNode}
#SBATCH -o {output_path}/{label}.run
#SBATCH -e {output_path}/{label}.err
#SBATCH --mail-user={email}
#SBATCH --mail-type=end
export JOBID=$SLURM_JOB_ID
export OUTFILE="{output_path}/{label}.out"
export SGLFILE="{output_path}/{label}.sgl"
{custom}
source ~/.bashrc
cd {project_path}
{command}
wait
"""
    # Files associated with a submission: script, stdout, message and signal files.
    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
                      runtk.STDOUT: '{output_path}/{label}.run',
                      runtk.MSGOUT: '{output_path}/{label}.out',
                      runtk.SGLOUT: '{output_path}/{label}.sgl',
                      }


# search.py looks this class up as ``submits.SLURMSubmitSFS``; keep that
# spelling available as an alias of the class defined above.
SLURMSubmitSFS = SlurmSubmitSFS

class SlurmSubmitSOCK(SlurmSubmit):
    """Slurm submit for the 'socket' communication mode.

    The job exports ``SOCNAME`` with the value of the ``sockname`` placeholder,
    which is also registered as the ``SOCKET`` handle.

    This class was previously declared under the name ``SlurmSubmitSFS``, which
    silently replaced the shared-file-system class of the same name.
    """
    # Names of the placeholders that may be filled into script_template.
    # 'sockname' is required because the template and the SOCKET handle use it.
    script_args = {'label', 'allocation', 'walltime', 'nodes', 'coresPerNode', 'output_path', 'email', 'reservation', 'custom', 'project_path', 'sockname', 'command'}
    # The script must start with an interpreter line: sbatch refuses batch
    # scripts whose first line does not begin with "#!".
    script_template = \
        """\
#!/bin/bash
#SBATCH --job-name={label}
#SBATCH -A {allocation}
#SBATCH -t {walltime}
#SBATCH --nodes={nodes}
#SBATCH --ntasks-per-node={coresPerNode}
#SBATCH -o {output_path}/{label}.run
#SBATCH -e {output_path}/{label}.err
#SBATCH --mail-user={email}
#SBATCH --mail-type=end
export JOBID=$SLURM_JOB_ID
export SOCNAME="{sockname}"
{custom}
source ~/.bashrc
cd {project_path}
{command}
wait
"""
    # Files/endpoints associated with a submission: script, stdout and socket name.
    script_handles = {runtk.SUBMIT: '{output_path}/{label}.sh',
                      runtk.STDOUT: '{output_path}/{label}.run',
                      runtk.SOCKET: '{sockname}'
                      }


# search.py looks this class up as ``submits.SLURMSubmitSOCK``; keep that
# spelling available as an alias of the class defined above.
SLURMSubmitSOCK = SlurmSubmitSOCK

0 comments on commit 2934100

Please sign in to comment.