From f4b3b7068ce4f885609983a87f7c658bf4a655d8 Mon Sep 17 00:00:00 2001 From: landmanbester Date: Wed, 18 Sep 2024 13:31:51 +0200 Subject: [PATCH] more consistent use of xds_from_url --- pfb/workers/fluxmop.py | 4 ++-- pfb/workers/klean.py | 10 +++------- pfb/workers/model2comps.py | 3 +-- pfb/workers/restore.py | 3 +-- pfb/workers/sara.py | 16 ++++------------ pfb/workers/spotless.py | 4 ++-- 6 files changed, 13 insertions(+), 27 deletions(-) diff --git a/pfb/workers/fluxmop.py b/pfb/workers/fluxmop.py index e8afa592..3087db96 100644 --- a/pfb/workers/fluxmop.py +++ b/pfb/workers/fluxmop.py @@ -58,7 +58,7 @@ def fluxmop(**kw): basename = opts.output_filename fits_oname = f'{opts.fits_output_folder}/{oname}' - dds_store = DaskMSStore(f'{basename}_{opts.suffix}.dds') + dds_name =f'{basename}_{opts.suffix}.dds' with ExitStack() as stack: from distributed import wait @@ -76,7 +76,7 @@ def fluxmop(**kw): ti = time.time() _fluxmop(**opts) - dds_list = dds_store.fs.glob(f'{dds_store.url}/*.zarr') + _, dds_list = xds_from_url(dds_name) # convert to fits files if opts.fits_mfs or opts.fits_cubes: diff --git a/pfb/workers/klean.py b/pfb/workers/klean.py index 61d8ed5f..1cb8bbec 100644 --- a/pfb/workers/klean.py +++ b/pfb/workers/klean.py @@ -55,13 +55,13 @@ def klean(**kw): basename = f'{basedir}/{oname}' fits_oname = f'{opts.fits_output_folder}/{oname}' - dds_store = DaskMSStore(f'{basename}_{opts.suffix}.dds') + dds_name = f'{basename}_{opts.suffix}.dds' with ExitStack() as stack: ti = time.time() _klean(**opts) - dds, dds_list = xds_from_url(dds_store.url) + dds, dds_list = xds_from_url(dds_name) from pfb.utils.fits import dds2fits @@ -108,11 +108,7 @@ def _klean(**kw): fits_oname = basename dds_name = f'{basename}_{opts.suffix}.dds' - dds_store = DaskMSStore(dds_name) - dds_list = dds_store.fs.glob(f'{dds_store.url}/*.zarr') - drop_vars = ['UVW','WEIGHT','MASK'] - dds = xds_from_list(dds_list, nthreads=opts.nthreads, - drop_vars=drop_vars) + dds, dds_list = xds_from_url(dds_name) nx, ny = dds[0].x.size, dds[0].y.size nx_psf, ny_psf = dds[0].x_psf.size, dds[0].y_psf.size diff --git a/pfb/workers/model2comps.py b/pfb/workers/model2comps.py index e6cb4906..0afd9378 100644 --- a/pfb/workers/model2comps.py +++ b/pfb/workers/model2comps.py @@ -80,8 +80,7 @@ def _model2comps(**kw): fits_oname = basename dds_name = f'{basename}_{opts.suffix}.dds' - dds_store = DaskMSStore(dds_name) - dds, dds_list = xds_from_url(dds_store.url) + dds, dds_list = xds_from_url(dds_name) if opts.model_out is not None: coeff_name = opts.model_out diff --git a/pfb/workers/restore.py b/pfb/workers/restore.py index d2b2d34b..dcefb589 100644 --- a/pfb/workers/restore.py +++ b/pfb/workers/restore.py @@ -93,8 +93,7 @@ def _restore(**kw): fits_oname = basename dds_name = f'{basename}_{opts.suffix}.dds' - dds_store = DaskMSStore(dds_name) - dds, dds_list = xds_from_url(dds_store.url) + dds, dds_list = xds_from_url(dds_name) if opts.drop_bands is not None: ddso = [] diff --git a/pfb/workers/sara.py b/pfb/workers/sara.py index 10fe99be..b64cc220 100644 --- a/pfb/workers/sara.py +++ b/pfb/workers/sara.py @@ -54,13 +54,13 @@ def sara(**kw): basename = opts.output_filename fits_oname = f'{opts.fits_output_folder}/{oname}' - dds_store = DaskMSStore(f'{basename}_{opts.suffix}.dds') + dds_name = f'{basename}_{opts.suffix}.dds' with ExitStack() as stack: ti = time.time() _sara(**opts) - dds, dds_list = xds_from_url(dds_store.url) + dds, dds_list = xds_from_url(dds_name) if opts.fits_mfs or opts.fits: from pfb.utils.fits import dds2fits @@ -138,11 +138,7 @@ def _sara(**kw): fits_oname = basename dds_name = f'{basename}_{opts.suffix}.dds' - dds_store = DaskMSStore(dds_name) - dds_list = dds_store.fs.glob(f'{dds_store.url}/*.zarr') - dds = xds_from_list(dds_list, - drop_vars=['UVW', 'WEIGHT', 'MASK'], - nthreads=opts.nthreads) + dds, dds_list = xds_from_url(dds_name) nx, ny = dds[0].x.size, dds[0].y.size nx_psf, ny_psf = dds[0].x_psf.size, dds[0].y_psf.size @@ -159,11 +155,7 @@ def _sara(**kw): nband = freq_out.size - # only need this to get ny_psf - dds = [ds.drop_vars('PSF') for ds in dds] - - # stitch dirty/psf in apparent scale - # drop_vars to avoid duplicates in memory + # drop_vars after access to avoid duplicates in memory # and avoid unintentional side effects? output_type = dds[0].DIRTY.dtype if 'RESIDUAL' in dds[0]: diff --git a/pfb/workers/spotless.py b/pfb/workers/spotless.py index a5b290dc..8df5e747 100644 --- a/pfb/workers/spotless.py +++ b/pfb/workers/spotless.py @@ -49,7 +49,7 @@ def spotless(**kw): xds_store = DaskMSStore(xds_name) opts.xds = xds_name fits_oname = f'{opts.fits_output_folder}/{oname}' - dds_store = DaskMSStore(f'{basename}_{opts.suffix}.dds') + dds_name = f'{basename}_{opts.suffix}.dds' OmegaConf.set_struct(opts, True) @@ -81,7 +81,7 @@ def spotless(**kw): ti = time.time() _spotless(**opts) - dds_list = dds_store.fs.glob(f'{dds_store.url}/*.zarr') + dds, dds_list = xds_from_url(dds_name) # convert to fits files futures = []