Skip to content

Commit

Permalink
make memory reporting optional
Browse files Browse the repository at this point in the history
  • Loading branch information
landmanbester committed Aug 28, 2024
1 parent a3dd719 commit 91709c4
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 20 deletions.
8 changes: 5 additions & 3 deletions pfb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def set_envs(nthreads, ncpu):
os.environ["NUMEXPR_NUM_THREADS"] = str(ne_threads)


def set_client(nworkers, stack, log,
def set_client(nworkers, log, stack=None,
host_address=None, direct_to_workers=False):
import dask
dask.config.set({'distributed.comm.compression': 'lz4'})
Expand All @@ -54,10 +54,12 @@ def set_client(nworkers, stack, log,
threads_per_worker=1,
memory_limit=0, # str(mem_limit/nworkers)+'GB'
asynchronous=False)
cluster = stack.enter_context(cluster)
if stack is not None:
cluster = stack.enter_context(cluster)
client = Client(cluster,
direct_to_workers=direct_to_workers)
client = stack.enter_context(client)
if stack is not None:
client = stack.enter_context(client)

client.wait_for_workers(nworkers)
dashboard_url = client.dashboard_link
Expand Down
5 changes: 5 additions & 0 deletions pfb/parser/hci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ inputs:
default: false
info:
Compute natural gradient
memory_reporting:
dtype: bool
default: false
info:
Report worker memory as tasks complete

_include:
- (.)gridding.yml
Expand Down
6 changes: 6 additions & 0 deletions pfb/parser/init.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ inputs:
info:
Display progress.
Use --no-progressbar to deactivate.
memory_reporting:
dtype: bool
default: false
info:
Report worker memory as tasks complete

_include:
- (.)out.yml
- (.)dist.yml
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/degrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def degrid(**kw):

with ExitStack() as stack:
from pfb import set_client
client = set_client(opts.nworkers, stack, log)
client = set_client(opts.nworkers, log, stack)

ti = time.time()
_degrid(**opts)
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/fluxmop.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def fluxmop(**kw):
from distributed import wait
from pfb import set_client
if opts.nworkers > 1:
client = set_client(opts.nworkers, stack, log)
client = set_client(opts.nworkers, log, stack)
from distributed import as_completed
else:
print("Faking client", file=log)
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def grid(**kw):
with ExitStack() as stack:
from pfb import set_client
if opts.nworkers > 1:
client = set_client(opts.nworkers, stack, log)
client = set_client(opts.nworkers, log, stack)
from distributed import as_completed
else:
print("Faking client", file=log)
Expand Down
7 changes: 6 additions & 1 deletion pfb/workers/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def hci(**kw):
dask.config.set(**{'array.slicing.split_large_chunks': False})
from pfb import set_client
from distributed import wait, get_client
client = set_client(opts.nworkers, stack, log)
client = set_client(opts.nworkers, log, stack)

ti = time.time()
_hci(**opts)
Expand Down Expand Up @@ -415,6 +415,11 @@ def _hci(**kw):
associated_workers[future] = worker
n_launched += 1

if opts.memory_reporting:
worker_info = client.scheduler_info()['workers']
print(f'Total memory {worker} MB = ',
worker_info[worker]['metrics']['memory']/1e6, file=log)

if opts.progressbar:
print(f"\rProcessing: {n_launched}/{nds}", end='', flush=True)
print("\n") # after progressbar above
Expand Down
26 changes: 15 additions & 11 deletions pfb/workers/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,19 @@ def init(**kw):
resize_thread_pool(opts.nthreads)
set_envs(opts.nthreads, ncpu)

with ExitStack() as stack:
import dask
dask.config.set(**{'array.slicing.split_large_chunks': False})
from pfb import set_client
from distributed import wait, get_client
client = set_client(opts.nworkers, stack, log)
# with ExitStack() as stack:
import dask
dask.config.set(**{'array.slicing.split_large_chunks': False})
from pfb import set_client
from distributed import wait, get_client
client = set_client(opts.nworkers, log)

ti = time.time()
_init(**opts)

ti = time.time()
_init(**opts)
print(f"All done after {time.time() - ti}s", file=log)

print(f"All done after {time.time() - ti}s", file=log)
client.close()

def _init(**kw):
opts = OmegaConf.create(kw)
Expand Down Expand Up @@ -376,8 +378,10 @@ def _init(**kw):
associated_workers[future] = worker
n_launched += 1

worker_info = client.scheduler_info()['workers']
print(f'Total memory {worker} MB = ', worker_info[worker]['metrics']['memory'])
if opts.memory_reporting:
worker_info = client.scheduler_info()['workers']
print(f'Total memory {worker} MB = ',
worker_info[worker]['metrics']['memory']/1e6, file=log)

if opts.progressbar:
print(f"\rProcessing: {n_launched}/{nds}", end='', flush=True)
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def restore(**kw):
with ExitStack() as stack:
if opts.nworkers > 1:
from pfb import set_client
client = set_client(opts.nworkers, stack, log)
client = set_client(opts.nworkers, log, stack)
else:
print("Faking client", file=log)
from pfb.utils.dist import fake_client
Expand Down
2 changes: 1 addition & 1 deletion pfb/workers/spotless.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def spotless(**kw):
with ExitStack() as stack:
from pfb import set_client
if opts.nworkers > 1:
client = set_client(opts.nworkers, stack, log,
client = set_client(opts.nworkers, log, stack,
direct_to_workers=opts.direct_to_workers)
from distributed import as_completed
else:
Expand Down

0 comments on commit 91709c4

Please sign in to comment.