diff --git a/pfb/__init__.py b/pfb/__init__.py
index 874aee26..46039509 100644
--- a/pfb/__init__.py
+++ b/pfb/__init__.py
@@ -30,7 +30,7 @@ def set_envs(nthreads, ncpu):
     os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)


-def set_client(nworkers, stack, log,
+def set_client(nworkers, log, stack=None,
                host_address=None, direct_to_workers=False):
     import dask
     dask.config.set({'distributed.comm.compression': 'lz4'})
@@ -54,10 +54,12 @@ def set_client(nworkers, stack, log,
                                threads_per_worker=1,
                                memory_limit=0,  # str(mem_limit/nworkers)+'GB'
                                asynchronous=False)
-        cluster = stack.enter_context(cluster)
+        if stack is not None:
+            cluster = stack.enter_context(cluster)
         client = Client(cluster,
                         direct_to_workers=direct_to_workers)
-        client = stack.enter_context(client)
+        if stack is not None:
+            client = stack.enter_context(client)
         client.wait_for_workers(nworkers)

         dashboard_url = client.dashboard_link
diff --git a/pfb/parser/hci.yaml b/pfb/parser/hci.yaml
index caa178c1..61c82398 100644
--- a/pfb/parser/hci.yaml
+++ b/pfb/parser/hci.yaml
@@ -163,6 +163,11 @@ inputs:
     default: false
     info:
       Compute naural gradient
+  memory_reporting:
+    dtype: bool
+    default: false
+    info:
+      Report worker memory as tasks complete

 _include:
   - (.)gridding.yml
diff --git a/pfb/parser/init.yaml b/pfb/parser/init.yaml
index e023d1fa..31ae6a37 100644
--- a/pfb/parser/init.yaml
+++ b/pfb/parser/init.yaml
@@ -121,6 +121,12 @@ inputs:
     info:
       Display progress. Use --no-progressbar to deactivate.

+  memory_reporting:
+    dtype: bool
+    default: false
+    info:
+      Report worker memory as tasks complete
+
 _include:
   - (.)out.yml
   - (.)dist.yml
diff --git a/pfb/workers/degrid.py b/pfb/workers/degrid.py
index bae20e1f..4c8fe80b 100644
--- a/pfb/workers/degrid.py
+++ b/pfb/workers/degrid.py
@@ -76,7 +76,7 @@ def degrid(**kw):

     with ExitStack() as stack:
         from pfb import set_client
-        client = set_client(opts.nworkers, stack, log)
+        client = set_client(opts.nworkers, log, stack)

         ti = time.time()
         _degrid(**opts)
diff --git a/pfb/workers/fluxmop.py b/pfb/workers/fluxmop.py
index 469370a7..5b03e161 100644
--- a/pfb/workers/fluxmop.py
+++ b/pfb/workers/fluxmop.py
@@ -69,7 +69,7 @@ def fluxmop(**kw):
         from distributed import wait
         from pfb import set_client
         if opts.nworkers > 1:
-            client = set_client(opts.nworkers, stack, log)
+            client = set_client(opts.nworkers, log, stack)
             from distributed import as_completed
         else:
             print("Faking client", file=log)
diff --git a/pfb/workers/grid.py b/pfb/workers/grid.py
index 3981d5b0..cddbd32b 100644
--- a/pfb/workers/grid.py
+++ b/pfb/workers/grid.py
@@ -81,7 +81,7 @@ def grid(**kw):
     with ExitStack() as stack:
         from pfb import set_client
         if opts.nworkers > 1:
-            client = set_client(opts.nworkers, stack, log)
+            client = set_client(opts.nworkers, log, stack)
             from distributed import as_completed
         else:
             print("Faking client", file=log)
diff --git a/pfb/workers/hci.py b/pfb/workers/hci.py
index b9c8410b..fbc3f97e 100644
--- a/pfb/workers/hci.py
+++ b/pfb/workers/hci.py
@@ -87,7 +87,7 @@ def hci(**kw):
         dask.config.set(**{'array.slicing.split_large_chunks': False})
         from pfb import set_client
         from distributed import wait, get_client
-        client = set_client(opts.nworkers, stack, log)
+        client = set_client(opts.nworkers, log, stack)

         ti = time.time()
         _hci(**opts)
@@ -415,6 +415,11 @@ def _hci(**kw):
                 associated_workers[future] = worker
                 n_launched += 1

+                if opts.memory_reporting:
+                    worker_info = client.scheduler_info()['workers']
+                    print(f'Total memory {worker} MB = ',
+                          worker_info[worker]['metrics']['memory']/1e6, file=log)
+
                 if opts.progressbar:
                     print(f"\rProcessing: {n_launched}/{nds}", end='', flush=True)
     print("\n")  # after progressbar above
diff --git a/pfb/workers/init.py b/pfb/workers/init.py
index c1b5b7eb..3acb8906 100644
--- a/pfb/workers/init.py
+++ b/pfb/workers/init.py
@@ -82,17 +82,19 @@ def init(**kw):
     resize_thread_pool(opts.nthreads)
     set_envs(opts.nthreads, ncpu)

-    with ExitStack() as stack:
-        import dask
-        dask.config.set(**{'array.slicing.split_large_chunks': False})
-        from pfb import set_client
-        from distributed import wait, get_client
-        client = set_client(opts.nworkers, stack, log)
+    # with ExitStack() as stack:
+    import dask
+    dask.config.set(**{'array.slicing.split_large_chunks': False})
+    from pfb import set_client
+    from distributed import wait, get_client
+    client = set_client(opts.nworkers, log)
+
+    ti = time.time()
+    _init(**opts)

-        ti = time.time()
-        _init(**opts)
+    print(f"All done after {time.time() - ti}s", file=log)

-        print(f"All done after {time.time() - ti}s", file=log)
+    client.close()

 def _init(**kw):
     opts = OmegaConf.create(kw)
@@ -376,8 +378,10 @@ def _init(**kw):
                 associated_workers[future] = worker
                 n_launched += 1

-            worker_info = client.scheduler_info()['workers']
-            print(f'Total memory {worker} MB = ', worker_info[worker]['metrics']['memory'])
+            if opts.memory_reporting:
+                worker_info = client.scheduler_info()['workers']
+                print(f'Total memory {worker} MB = ',
+                      worker_info[worker]['metrics']['memory']/1e6, file=log)

             if opts.progressbar:
                 print(f"\rProcessing: {n_launched}/{nds}", end='', flush=True)
diff --git a/pfb/workers/restore.py b/pfb/workers/restore.py
index 40e1d2f8..88c13bd1 100644
--- a/pfb/workers/restore.py
+++ b/pfb/workers/restore.py
@@ -55,7 +55,7 @@ def restore(**kw):
     with ExitStack() as stack:
         if opts.nworkers > 1:
             from pfb import set_client
-            client = set_client(opts.nworkers, stack, log)
+            client = set_client(opts.nworkers, log, stack)
         else:
             print("Faking client", file=log)
             from pfb.utils.dist import fake_client
diff --git a/pfb/workers/spotless.py b/pfb/workers/spotless.py
index 0638929d..ae057168 100644
--- a/pfb/workers/spotless.py
+++ b/pfb/workers/spotless.py
@@ -72,7 +72,7 @@ def spotless(**kw):
     with ExitStack() as stack:
         from pfb import set_client
         if opts.nworkers > 1:
-            client = set_client(opts.nworkers, stack, log,
+            client = set_client(opts.nworkers, log, stack,
                                 direct_to_workers=opts.direct_to_workers)
             from distributed import as_completed
         else:
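
Note on the set_client change: a minimal usage sketch of the reworked signature, assuming only what the diff shows (nworkers and log positional, stack now optional and defaulting to None). The driver code around the calls is hypothetical, not part of the patch.

# Hypothetical driver; only the set_client call pattern comes from this diff.
import sys
from contextlib import ExitStack

from pfb import set_client

log = sys.stderr  # assumption: any object accepted by print(..., file=log)

# Managed lifetime: cluster and client are registered on the ExitStack
# and torn down automatically when the with-block exits (the old behaviour).
with ExitStack() as stack:
    client = set_client(4, log, stack)
    # ... submit work ...

# Unmanaged lifetime (stack=None, the new default): the caller closes
# the client explicitly, as init.py now does.
client = set_client(4, log)
try:
    pass  # ... submit work ...
finally:
    client.close()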
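Note on memory_reporting: the new flag reads per-worker memory through Client.scheduler_info(), which is standard dask.distributed API. A standalone sketch of the same report outside pfb (cluster size and worker names here are illustrative):

# Standalone sketch of the per-worker memory report the new flag enables.
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

# scheduler_info() maps worker address -> info dict; 'metrics'['memory']
# is the worker's resident memory in bytes, hence the /1e6 for MB.
worker_info = client.scheduler_info()['workers']
for worker, info in worker_info.items():
    print(f'Total memory {worker} MB = ', info['metrics']['memory'] / 1e6)

client.close()
cluster.close()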