From d166411e93197e8df49d2961299f1b62a8b6da0b Mon Sep 17 00:00:00 2001 From: rht Date: Tue, 11 Dec 2018 18:05:09 +0000 Subject: [PATCH] Implement a parallelized version of starter_one --- start.py | 10 ++++++++-- starter_one.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 starter_one.py diff --git a/start.py b/start.py index 26f43a0..b734bbd 100644 --- a/start.py +++ b/start.py @@ -169,7 +169,7 @@ def save_simulation(t, sim, sim_param, exit_now=False): exit(0) # main entry point -if __name__ == "__main__": +def entry_point(passed_args=None): """ use argparse to handle command line arguments""" parser = argparse.ArgumentParser(description='Model the Insurance sector') @@ -189,7 +189,8 @@ def save_simulation(t, sim, sim_param, exit_now=False): parser.add_argument("-p", "--showprogress", action="store_true", help="show timesteps") parser.add_argument("-v", "--verbose", action="store_true", help="more detailed output") parser.add_argument("--save_iterations", type=int, help="number of iterations to iterate before saving world state") - args = parser.parse_args() + parser.add_argument("--multiprocess", action="store_true", help="run this script as part of a parallel computation") + args = parser.parse_args(passed_args) if args.abce: isleconfig.use_abce = True @@ -238,6 +239,9 @@ def save_simulation(t, sim, sim_param, exit_now=False): [general_rc_event_schedule, general_rc_event_damage, np_seeds, random_seeds] = setup.obtain_ensemble(1) #Only one ensemble. This part will only be run locally (laptop). log = main(simulation_parameters, general_rc_event_schedule[0], general_rc_event_damage[0], np_seeds[0], random_seeds[0], save_iter) + + if args.multiprocess: + return log """ Restore the log at the end of the single simulation run for saving and for potential further study """ L = logger.Logger() @@ -248,3 +252,5 @@ def save_simulation(t, sim, sim_param, exit_now=False): CS = calibrationscore.CalibrationScore(L) score = CS.test_all() +if __name__ == "__main__": + entry_point() diff --git a/starter_one.py b/starter_one.py new file mode 100644 index 0000000..d016862 --- /dev/null +++ b/starter_one.py @@ -0,0 +1,52 @@ +import os +import datetime +import logger +import multiprocessing as mp + +import numpy as np + +from start import entry_point +import calibrationscore + +# Rename files +LOG_TYPES = 'operational contracts cash reinoperational reincontracts reincash premium'.split(' ') +timestamp = datetime.datetime.now().strftime('%Y_%h_%d_%H_%M') +for log_type in LOG_TYPES: + fname = 'data/one_%s.dat' % log_type + try: + os.rename(fname, fname + timestamp) + except FileNotFoundError: + print('file not found:', fname) + +# Now run the ensemble + +N = 300 +N = 10 +PROCS = 4 +# separate into PROCS procceses +epochs = np.array_split(np.arange(N), PROCS) +print([list(e) for e in epochs]) +manager = mp.Manager() +par_logs = manager.list() + +def _fn(_epoch): + logs = [entry_point(('--replicid %d --replicating --oneriskmodel --multiprocess' % i).split(' ')) for i in _epoch] + par_logs.append(logs) + +procs = [] +for epoch in epochs: + proc = mp.Process(target=_fn, args=(epoch,)) + proc.start() + procs.append(proc) + +for p in procs: + p.join() + +for logs in par_logs: + for log in logs: + L = logger.Logger() + L.restore_logger_object(log) + L.save_log(True) + + CS = calibrationscore.CalibrationScore(L) + score = CS.test_all()