From faa15e2e46f4ef8f00e99688328c76bf446bda22 Mon Sep 17 00:00:00 2001 From: Alec Thomson Date: Wed, 31 Jul 2024 12:07:56 +0800 Subject: [PATCH] Refactor of beamcon (#59) * Refactor 2d * Start on 3d * 3D tests * 3D fixes * Update deps * No deps * Sanity * Absolute paths * Add cores limiter * Fix logic * Add some rand to test * Ruff * Formatting * Cleanup * Simplify smoother * Docs * Docs * Docs * Docs * Ruff * Version bump * Fix types * Add dels * All the executors * Update yaml * Add CLI arg * Docs * data types and free unused matrices in robust convolution * pass executor type to smooth_fits_cube * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * readme performance suggestion * Get logging working * ignore * Fixes * Bump python --------- Co-authored-by: Austin Shen Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .gitignore | 2 + Dockerfile | 2 +- Dockerfile.mpich | 2 +- README.md | 163 +++--- environment.yml | 4 +- pyproject.toml | 3 +- racs_tools/au2.py | 2 +- racs_tools/beamcon_2D.py | 747 ++++++++++++--------------- racs_tools/beamcon_3D.py | 993 +++++++++++++++++------------------- racs_tools/convolve_uv.py | 490 +++++++++++++++--- racs_tools/gaussft.py | 10 +- racs_tools/getnoise_list.py | 13 +- racs_tools/logging.py | 86 +++- racs_tools/parallel.py | 43 ++ tests/test_2d.py | 61 +-- tests/test_3d.py | 29 +- 16 files changed, 1469 insertions(+), 1181 deletions(-) create mode 100644 racs_tools/parallel.py diff --git a/.gitignore b/.gitignore index ce6a25d..b55e77e 100644 --- a/.gitignore +++ b/.gitignore @@ -141,3 +141,5 @@ mk_convolve beamlog.3d.astropy.txt beamlog.3d.robust.txt beamlog.3d.scipy.txt +*.fits +beamlog* diff --git a/Dockerfile b/Dockerfile index e586d05..4f83d86 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,7 +9,7 @@ RUN mkdir /tmp/numba_cache & chmod 777 /tmp/numba_cache & NUMBA_CACHE_DIR=/tmp/n ENV NUMBA_CACHE_DIR=/tmp/numba_cache COPY --chown=$MAMBA_USER:$MAMBA_USER . ./src RUN echo "Installing python and uv" -RUN micromamba install python=3.8 uv -y -c conda-forge && \ +RUN micromamba install python=3.12 uv -y -c conda-forge && \ micromamba clean --all --yes RUN echo "Installing RACS-tools" RUN micromamba run uv pip install ./src diff --git a/Dockerfile.mpich b/Dockerfile.mpich index 8ce0bf2..6e987b1 100644 --- a/Dockerfile.mpich +++ b/Dockerfile.mpich @@ -9,7 +9,7 @@ RUN mkdir /tmp/numba_cache & chmod 777 /tmp/numba_cache & NUMBA_CACHE_DIR=/tmp/n ENV NUMBA_CACHE_DIR=/tmp/numba_cache COPY --chown=$MAMBA_USER:$MAMBA_USER . ./src RUN echo "Installing python and uv" -RUN micromamba install python=3.10 uv -y -c conda-forge && \ +RUN micromamba install python=3.12 uv -y -c conda-forge && \ micromamba clean --all --yes RUN echo "Installing RACS-tools" RUN micromamba run uv pip install ./src[mpi] diff --git a/README.md b/README.md index 54ac9bd..29e465b 100644 --- a/README.md +++ b/README.md @@ -43,124 +43,102 @@ pip install git+https://github.com/AlecThomson/RACS-tools ``` $ beamcon_2D -h -usage: beamcon_2D [-h] [-p PREFIX] [-s SUFFIX] [-o OUTDIR] [--conv_mode {robust,scipy,astropy,astropy_fft}] [-v] [-d] [--bmaj BMAJ] [--bmin BMIN] [--bpa BPA] [--log LOG] [--logfile LOGFILE] [-c CUTOFF] [--circularise] [-t TOLERANCE] [-e EPSILON] [-n NSAMPS] [--ncores N_CORES | --mpi] infile [infile ...] - - Smooth a field of 2D images to a common resolution. - - - Parallelisation can run using multiprocessing or MPI. - - - Default names of output files are /path/to/beamlog{infile//.fits/.{SUFFIX}.fits} - - - By default, the smallest common beam will be automatically computed. - - Optionally, you can specify a target beam to use. - +usage: beamcon_2D [-h] [-p PREFIX] [-s SUFFIX] [-o OUTDIR] [--conv_mode {robust,scipy,astropy,astropy_fft}] [-v] [-d] [--bmaj BMAJ] [--bmin BMIN] + [--bpa BPA] [--log LOG] [--logfile LOGFILE] [-c CUTOFF] [--circularise] [-t TOLERANCE] [-e EPSILON] [-n NSAMPS] [--ncores NCORES] + [--executor {thread,process,mpi}] + infile [infile ...] +Smooth a field of 2D images to a common resolution. - Parallelisation can run using multiprocessing or MPI. - Default names of output files are +/path/to/beamlog{infile//.fits/.{SUFFIX}.fits} - By default, the smallest common beam will be automatically computed. - Optionally, you can specify a +target beam to use. positional arguments: infile Input FITS image(s) to smooth (can be a wildcard) - beam info must be in header. -optional arguments: +options: -h, --help show this help message and exit -p PREFIX, --prefix PREFIX - Add prefix to output filenames. + Add prefix to output filenames. (default: None) -s SUFFIX, --suffix SUFFIX - Add suffix to output filenames [sm]. + Add suffix to output filenames [sm]. (default: sm) -o OUTDIR, --outdir OUTDIR - Output directory of smoothed FITS image(s) [same as input file]. + Output directory of smoothed FITS image(s) [same as input file]. (default: None) --conv_mode {robust,scipy,astropy,astropy_fft} - Which method to use for convolution [robust]. - 'robust' computes the analytic FT of the convolving Gaussian. - Note this mode can now handle NaNs in the data. - Can also be 'scipy', 'astropy', or 'astropy_fft'. - Note these other methods cannot cope well with small convolving beams. - - -v, --verbosity Increase output verbosity - -d, --dryrun Compute common beam and stop [False]. - --bmaj BMAJ Target BMAJ (arcsec) to convolve to [None]. - --bmin BMIN Target BMIN (arcsec) to convolve to [None]. - --bpa BPA Target BPA (deg) to convolve to [None]. - --log LOG Name of beamlog file. If provided, save beamlog data to a file [None - not saved]. - --logfile LOGFILE Save logging output to file + Which method to use for convolution [robust]. 'robust' computes the analytic FT of the convolving Gaussian. Note this mode can + now handle NaNs in the data. Can also be 'scipy', 'astropy', or 'astropy_fft'. Note these other methods cannot cope well with + small convolving beams. (default: robust) + -v, --verbosity Increase output verbosity (default: 0) + -d, --dryrun Compute common beam and stop [False]. (default: False) + --bmaj BMAJ Target BMAJ (arcsec) to convolve to [None]. (default: None) + --bmin BMIN Target BMIN (arcsec) to convolve to [None]. (default: None) + --bpa BPA Target BPA (deg) to convolve to [None]. (default: None) + --log LOG Name of beamlog file. If provided, save beamlog data to a file [None - not saved]. (default: None) + --logfile LOGFILE Save logging output to file (default: None) -c CUTOFF, --cutoff CUTOFF - Cutoff BMAJ value (arcsec) -- Blank channels with BMAJ larger than this [None -- no limit] - --circularise Circularise the final PSF -- Sets the BMIN = BMAJ, and BPA=0. + Cutoff BMAJ value (arcsec) -- Blank channels with BMAJ larger than this [None -- no limit] (default: None) + --circularise Circularise the final PSF -- Sets the BMIN = BMAJ, and BPA=0. (default: False) -t TOLERANCE, --tolerance TOLERANCE - tolerance for radio_beam.commonbeam. + tolerance for radio_beam.commonbeam. (default: 0.0001) -e EPSILON, --epsilon EPSILON - epsilon for radio_beam.commonbeam. + epsilon for radio_beam.commonbeam. (default: 0.0005) -n NSAMPS, --nsamps NSAMPS - nsamps for radio_beam.commonbeam. - --ncores N_CORES Number of processes (uses multiprocessing). - --mpi Run with MPI. + nsamps for radio_beam.commonbeam. (default: 200) + --ncores NCORES Number of cores to use for parallelisation. If None, use all available cores. (default: None) + --executor {thread,process,mpi} + Executor to use for parallelisation (default: thread) ``` ``` $ beamcon_3D -h -usage: beamcon_3D [-h] [--uselogs] [--mode MODE] [--conv_mode {robust,scipy,astropy,astropy_fft}] [-v] [--logfile LOGFILE] [-d] [-p PREFIX] [-s SUFFIX] [-o OUTDIR] [--bmaj BMAJ] [--bmin BMIN] [--bpa BPA] [-c CUTOFF] [--circularise] [--ref_chan {first,last,mid}] [-t TOLERANCE] [-e EPSILON] [-n NSAMPS] infile [infile ...] - - Smooth a field of 3D cubes to a common resolution. - - - Parallelisation is done using MPI. - - - Default names of output files are /path/to/beamlog{infile//.fits/.{SUFFIX}.fits} - - - By default, the smallest common beam will be automatically computed. - - Optionally, you can specify a target beam to use. - - - It is currently assumed that cubes will be 4D with a dummy Stokes axis. - - Iterating over Stokes axis is not yet supported. - +usage: beamcon_3D [-h] [--uselogs] [--mode MODE] [--conv_mode {robust,scipy,astropy,astropy_fft}] [-v] [--logfile LOGFILE] [-d] [-p PREFIX] [-s SUFFIX] + [-o OUTDIR] [--bmaj BMAJ] [--bmin BMIN] [--bpa BPA] [-c CUTOFF] [--circularise] [--ref_chan {first,last,mid}] [-t TOLERANCE] + [-e EPSILON] [-n NSAMPS] [--ncores NCORES] [--executor_type {thread,process,mpi}] + infile [infile ...] +Smooth a field of 3D cubes to a common resolution. - Default names of output files are /path/to/beamlog{infile//.fits/.{SUFFIX}.fits} - By default, the +smallest common beam will be automatically computed. - Optionally, you can specify a target beam to use. - It is currently assumed that cubes will be +4D with a dummy Stokes axis. - Iterating over Stokes axis is not yet supported. positional arguments: - infile Input FITS image(s) to smooth (can be a wildcard) - - CASA beamtable will be used if present i.e. if CASAMBM = T - - Otherwise beam info must be in co-located beamlog files. - - beamlog must have the name /path/to/beamlog{infile//.fits/.txt} - + infile Input FITS image(s) to smooth (can be a wildcard) - CASA beamtable will be used if present i.e. if CASAMBM = T - Otherwise beam + info must be in co-located beamlog files. - beamlog must have the name /path/to/beamlog{infile//.fits/.txt} -optional arguments: +options: -h, --help show this help message and exit - --uselogs Get convolving information from previous run [False]. - --mode MODE Common resolution mode [natural]. - natural -- allow frequency variation. - total -- smooth all plans to a common resolution. - + --uselogs Get convolving information from previous run [False]. (default: False) + --mode MODE Common resolution mode [natural]. natural -- allow frequency variation. total -- smooth all plans to a common resolution. + (default: natural) --conv_mode {robust,scipy,astropy,astropy_fft} - Which method to use for convolution [robust]. - 'robust' computes the analytic FT of the convolving Gaussian. - Note this mode can now handle NaNs in the data. - Can also be 'scipy', 'astropy', or 'astropy_fft'. - Note these other methods cannot cope well with small convolving beams. - - -v, --verbosity Increase output verbosity - --logfile LOGFILE Save logging output to file - -d, --dryrun Compute common beam and stop [False]. + Which method to use for convolution [robust]. 'robust' computes the analytic FT of the convolving Gaussian. Note this mode can + now handle NaNs in the data. Can also be 'scipy', 'astropy', or 'astropy_fft'. Note these other methods cannot cope well with + small convolving beams. (default: robust) + -v, --verbosity Increase output verbosity (default: 0) + --logfile LOGFILE Save logging output to file (default: None) + -d, --dryrun Compute common beam and stop. (default: False) -p PREFIX, --prefix PREFIX - Add prefix to output filenames. + Add prefix to output filenames. (default: None) -s SUFFIX, --suffix SUFFIX - Add suffix to output filenames [{MODE}]. + Add suffix to output filenames [{MODE}]. (default: None) -o OUTDIR, --outdir OUTDIR - Output directory of smoothed FITS image(s) [None - same as input]. - --bmaj BMAJ BMAJ to convolve to [max BMAJ from given image(s)]. - --bmin BMIN BMIN to convolve to [max BMAJ from given image(s)]. - --bpa BPA BPA to convolve to [0]. + Output directory of smoothed FITS image(s) [None - same as input]. (default: None) + --bmaj BMAJ BMAJ to convolve to [max BMAJ from given image(s)]. (default: None) + --bmin BMIN BMIN to convolve to [max BMAJ from given image(s)]. (default: None) + --bpa BPA BPA to convolve to [0]. (default: None) -c CUTOFF, --cutoff CUTOFF - Cutoff BMAJ value (arcsec) -- Blank channels with BMAJ larger than this [None -- no limit] - --circularise Circularise the final PSF -- Sets the BMIN = BMAJ, and BPA=0. + Cutoff BMAJ value (arcsec) -- Blank channels with BMAJ larger than this [None -- no limit] (default: None) + --circularise Circularise the final PSF -- Sets the BMIN = BMAJ, and BPA=0. (default: False) --ref_chan {first,last,mid} - Reference psf for header [None]. - first -- use psf for first frequency channel. - last -- use psf for the last frequency channel. - mid -- use psf for the centre frequency channel. - Will use the CRPIX channel if not set. - + Reference psf for header [None]. first -- use psf for first frequency channel. last -- use psf for the last frequency channel. + mid -- use psf for the centre frequency channel. Will use the CRPIX channel if not set. (default: None) -t TOLERANCE, --tolerance TOLERANCE - tolerance for radio_beam.commonbeam. + tolerance for radio_beam.commonbeam. (default: 0.0001) -e EPSILON, --epsilon EPSILON - epsilon for radio_beam.commonbeam. + epsilon for radio_beam.commonbeam. (default: 0.0005) -n NSAMPS, --nsamps NSAMPS - nsamps for radio_beam.commonbeam. + nsamps for radio_beam.commonbeam. (default: 200) + --ncores NCORES Number of cores to use for parallelisation. If None, use all available cores. (default: None) + --executor_type {thread,process,mpi} + Executor type for parallelisation. (default: thread) ``` ``` @@ -186,6 +164,19 @@ options: If finding a common beam fails, try tweaking the `tolerance`, `epsilon`, and `nsamps` parameters. See [radio-beam](https://radio-beam.readthedocs.io/en/latest/) for more details. +## Performance + +Profiling for `beamcon_3D` suggests this program requires a minimum of ~15X the memory of a data cube slice per process to perform convolution to a common beam. So for a 800 MB slice (e.g. typical POSSUM cube) you would want to allow 15 GB memory per worker (I use 20 GB). Choose `ncores` appropriately given your machine memory availability and this limit to ensure optimal performance with multiprocessing. + +An example slurm header for `beamcon_3D`: + +``` +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task= +#SBATCH --mem-per-cpu=20G +``` + ## Contributing Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change. diff --git a/environment.yml b/environment.yml index a093613..c595c7c 100644 --- a/environment.yml +++ b/environment.yml @@ -5,16 +5,14 @@ channels: - defaults - pkgw-forge dependencies: -- python=3.8 +- python=3.10 - pip - numpy - scipy - astropy -- gfortran - mpi4py - pip: - spectral_cube - radio_beam - - schwimmbad - tqdm - ./ diff --git a/pyproject.toml b/pyproject.toml index ba9fbf2..603536b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,12 +12,11 @@ python = ">=3.8" numpy = "<2" astropy = ">=5" radio_beam = "*" -schwimmbad = "*" scipy = "*" spectral_cube = ">=0.6.3" tqdm = "*" numba = "*" -mpi4py = {version = "*", optional = true} +mpi4py = {version = ">=3", optional = true} [tool.poetry.dev-dependencies] black = "*" diff --git a/racs_tools/au2.py b/racs_tools/au2.py index d695920..49a76b0 100644 --- a/racs_tools/au2.py +++ b/racs_tools/au2.py @@ -162,6 +162,6 @@ def gauss_factor( * bmin2 / math.sqrt(alpha * beta - 0.25 * gamma * gamma) ) - fac = ((math.sqrt(dx1**2) * math.sqrt(dy1**2))) / amp + fac = (math.sqrt(dx1**2) * math.sqrt(dy1**2)) / amp return fac, amp, bmaj, bmin, np.degrees(bpa) diff --git a/racs_tools/beamcon_2D.py b/racs_tools/beamcon_2D.py index 2a7ad6a..3ac60c4 100644 --- a/racs_tools/beamcon_2D.py +++ b/racs_tools/beamcon_2D.py @@ -3,15 +3,11 @@ __author__ = "Alec Thomson" import logging -import os import sys -from functools import partial -from hashlib import new -from typing import Dict, List, Optional, Tuple +from pathlib import Path +from typing import List, Literal, NamedTuple, Optional, Tuple -import astropy.wcs import numpy as np -import schwimmbad from astropy import units as u from astropy.io import ascii, fits from astropy.table import Table @@ -21,129 +17,132 @@ from radio_beam.utils import BeamError from tqdm import tqdm -from racs_tools import au2 -from racs_tools.convolve_uv import smooth -from racs_tools.logging import logger, setup_logger +from racs_tools.convolve_uv import ( + get_convolving_beam, + get_nyquist_beam, + my_ceil, + parse_conv_mode, + round_up, + smooth, +) +from racs_tools.logging import ( + init_worker, + log_listener, + log_queue, + logger, + set_verbosity, +) +from racs_tools.parallel import get_executor + +# logger = setup_logger() + ############################################# #### ADAPTED FROM SCRIPT BY T. VERNSTROM #### ############################################# - - -def round_up(n: float, decimals: int = 0) -> float: - """Round to number of decimals - - Args: - n (float): Number to round. - decimals (int, optional): Number of decimals. Defaults to 0. - - Returns: - float: Rounded number. - """ - multiplier = 10**decimals - return np.ceil(n * multiplier) / multiplier - - -def my_ceil(a: float, precision: float = 0.0) -> float: - """Modified ceil function to round up to precision - - Args: - a (float): Number to round. - precision (float, optional): Precision of rounding. Defaults to 0. - - Returns: - float: Rounded number. - """ - return np.round(a + 0.5 * 10 ** (-precision), precision) - - -def getbeam( - old_beam: Beam, - new_beam: Beam, - dx: u.Quantity, - dy: u.Quantity, +class ImageData(NamedTuple): + """Image data and metadata""" + + filename: Path + """Filename of FITS file""" + image: np.ndarray + """Image data""" + four_d: bool + """Does the image have spectral and polarization axes?""" + header: fits.Header + """FITS header""" + old_beam: Beam + """Original beam""" + nx: int + """Number of pixels in x""" + ny: int + """Number of pixels in y""" + dx: u.Quantity + """Pixel size in x""" + dy: u.Quantity + """Pixel size in y""" + + +class BeamLogInfo(NamedTuple): + """Beam log info""" + + filename: Path + """Filename of FITS file""" + old_beam: Beam + """Original beam""" + new_beam: Beam + """Target beam""" + conv_beam: Beam + """Convolving beam""" + + +def check_target_beam( + target_beam: Beam, + all_beams: Beams, + files: List[Path], cutoff: Optional[float] = None, -) -> Tuple[Beam, float]: - """Get the beam to use for smoothing +) -> bool: + """Check that target beam will deconvolve Args: - old_beam (Beam): Current beam. - new_beam (Beam): Target beam. - dx (u.Quantity): Pixel size in x. - dy (u.Quantity): Pixel size in y. - cutoff (float, optional): Cutoff for beamsize in arcsec. Defaults to None. + target_beam (Beam): Target beam. + all_beams (Beams): All the beams to check. + files (List[Path]): All the FITS files to check. + cutoff (Optional[float], optional): Cutoff of beam in arcsec. Defaults to None. Raises: - err: If beam deconvolution fails. + BeamError: If beam deconvolution fails. Returns: - Tuple[Beam, float]: Convolving beam and scaling factor. + bool: If the target beam will deconvolve. """ - logger.info(f"Current beam is {old_beam!r}") - - if cutoff is not None and old_beam.major.to(u.arcsec) > cutoff * u.arcsec: - return ( - Beam( - major=np.nan * u.deg, - minor=np.nan * u.deg, - pa=np.nan * u.deg, - ), - np.nan, - ) - - if new_beam == old_beam: - conbm = Beam( - major=0 * u.deg, - minor=0 * u.deg, - pa=0 * u.deg, - ) - fac = 1.0 - logger.warning( - f"New beam {new_beam!r} and old beam {old_beam!r} are the same. Won't attempt convolution." + logger.info("Checking that target beam will deconvolve...") + mask_count = 0 + failed = [] + for i, (beam, file) in enumerate( + tqdm( + zip(all_beams, files), + total=len(all_beams), + desc="Deconvolving", + disable=(logger.level > logging.INFO), ) - return conbm, fac - try: - conbm = new_beam.deconvolve(old_beam) - except BeamError as err: - logger.error(err) - logger.warning( - f"Could not deconvolve. New: {new_beam!r}, Old: {old_beam!r} - will set convolving beam to 0.0" - ) - conbm = new_beam.deconvolve(old_beam, failure_returns_pointlike=True) - fac, amp, outbmaj, outbmin, outbpa = au2.gauss_factor( - beamConv=[ - conbm.major.to(u.arcsec).value, - conbm.minor.to(u.arcsec).value, - conbm.pa.to(u.deg).value, - ], - beamOrig=[ - old_beam.major.to(u.arcsec).value, - old_beam.minor.to(u.arcsec).value, - old_beam.pa.to(u.deg).value, - ], - dx1=dx.to(u.arcsec).value, - dy1=dy.to(u.arcsec).value, - ) - - return conbm, fac - - -def getimdata(cubenm: str) -> dict: + ): + if cutoff is not None and beam.major.to(u.arcsec) > cutoff * u.arcsec: + continue + try: + target_beam.deconvolve(beam) + except ValueError: + mask_count += 1 + failed.append(file) + except BeamError as be: + # BeamError should not be raised if beams are equal + if target_beam != beam: + raise BeamError(be) + is_good = mask_count == 0 + if not is_good: + logger.error("The following images could not reach target resolution:") + logger.error(failed) + + return is_good + + +def getimdata(cubenm: Path) -> ImageData: """Get image data from FITS file Args: - cubenm (str): File name of image. + cubenm (Path): File name of image. Returns: - dict: Data and metadata. + ImageData: Data and metadata. """ logger.info(f"Getting image data from {cubenm}") with fits.open(cubenm, memmap=True, mode="denywrite") as hdu: - w = astropy.wcs.WCS(hdu[0]) - pixelscales = proj_plane_pixel_scales(w) + header = hdu[0].header + wcs = WCS(hdu[0]) + pixelscales = proj_plane_pixel_scales(wcs) - dxas = pixelscales[0] * u.deg - dyas = pixelscales[1] * u.deg + dx = pixelscales[0] * u.deg + dy = pixelscales[1] * u.deg if len(hdu[0].data.shape) == 4: # has spectral, polarization axes @@ -153,135 +152,138 @@ def getimdata(cubenm: str) -> dict: nx, ny = data.shape[-1], data.shape[-2] old_beam = Beam.from_fits_header(hdu[0].header) - - datadict = { - "filename": os.path.basename(cubenm), - "image": data, - "4d": (len(hdu[0].data.shape) == 4), - "header": hdu[0].header, - "old_beam": old_beam, - "nx": nx, - "ny": ny, - "dx": dxas, - "dy": dyas, - } - return datadict + is_4d = len(hdu[0].data.shape) == 4 + return ImageData( + filename=Path(cubenm), + image=data, + four_d=is_4d, + header=header, + old_beam=old_beam, + nx=nx, + ny=ny, + dx=dx, + dy=dy, + ) def savefile( newimage: np.ndarray, - filename: str, + outfile: Path, header: fits.Header, - final_beam: Beam, - outdir: str = ".", + new_beam: Beam, ) -> None: """Save smoothed image to FITS file Args: newimage (np.ndarray): Smoothed image. - filename (str): File name. + outfile (Path): File name. header (fits.Header): FITS header. - final_beam (Beam): New beam. - outdir (str, optional): Output directory. Defaults to ".". + new_beam (Beam): New beam. + + Raises: + FileNotFoundError: If file is not saved. """ - outfile = f"{outdir}/{filename}" - logger.info(f"Saving to {outfile}") - beam = final_beam + logger.info(f"Saving to {outfile.absolute()}") + beam = new_beam header = beam.attach_to_header(header) - fits.writeto(outfile, newimage.astype(np.float32), header=header, overwrite=True) + fits.writeto( + outfile.absolute(), newimage.astype(np.float32), header=header, overwrite=True + ) + + if not outfile.exists(): + raise FileNotFoundError(f"File {outfile} not saved!") -def worker( - file: str, - outdir: str, +def beamcon_2d_on_fits( + file: Path, new_beam: Beam, - conv_mode: str, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"], suffix: str = "", prefix: str = "", + outdir: Optional[Path] = None, cutoff: Optional[float] = None, dryrun: bool = False, -) -> dict: - """Parallel worker function +) -> BeamLogInfo: + """Run beamcon_2d on a FITS file Args: - file (str): FITS file to smooth. - outdir (str): Output directory. - new_beam (Beam): Target PSF. - conv_mode (str): Convolving mode. + file (Path): FITS file to smooth. + new_beam (Beam): Target beam. + conv_mode (str): Convolution mode. suffix (str, optional): Filename suffix. Defaults to "". prefix (str, optional): Filename prefix. Defaults to "". - cutoff (float, optional): PSF cutoff. Defaults to None. - dryrun (bool, optional): Do a dryrun. Defaults to False. + outdir (Optional[Path], optional): Ouput directory. Defaults to None (will be same as input). + cutoff (Optional[float], optional): Cutoff for beamsize in arcsec. Defaults to None. + dryrun (bool, optional): Don't save any images. Defaults to False. Returns: - dict: Output data. + BeamLogInfo: Beamlog information. """ logger.info(f"Working on {file}") - if outdir is None: - outdir = os.path.dirname(file) + outfile = Path(file.name) + if suffix: + outfile = outfile.with_suffix(f".{suffix}.fits") + if prefix: + outfile = Path(prefix + outfile.name) - if outdir == "": - outdir = "." + if outdir is not None: + outdir = outdir.absolute() + else: + outdir = file.parent.absolute() - outfile = os.path.basename(file) - outfile = outfile.replace(".fits", "") + f".{suffix}.fits" - if prefix is not None: - outfile = prefix + outfile - datadict = getimdata(file) + image_data = getimdata(file) - conbeam, sfactor = getbeam( - old_beam=datadict["old_beam"], + conv_beam, _ = get_convolving_beam( + old_beam=image_data.old_beam, new_beam=new_beam, - dx=datadict["dx"], - dy=datadict["dy"], + dx=image_data.dx, + dy=image_data.dy, cutoff=cutoff, ) - datadict.update({"conbeam": conbeam, "final_beam": new_beam, "sfactor": sfactor}) - if not dryrun: - if np.isnan(sfactor) or np.isnan(conbeam): - logger.warning(f"Setting {outfile} to NaN") - newim = datadict["image"] * np.nan - elif ( - conbeam == Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg) - and sfactor == 1 - ): - newim = datadict["image"] - else: - newim = smooth( - image=datadict["image"], - old_beam=datadict["old_beam"], - final_beam=datadict["final_beam"], - dx=datadict["dx"], - dy=datadict["dy"], - sfactor=datadict["sfactor"], - conbeam=datadict["conbeam"], - conv_mode=conv_mode, - ) - if datadict["4d"]: - # make it back into a 4D image - newim = np.expand_dims(np.expand_dims(newim, axis=0), axis=0) - datadict.update( - { - "newimage": newim, - } - ) - savefile( - newimage=datadict["newimage"], + if dryrun: + return BeamLogInfo( filename=outfile, - header=datadict["header"], - final_beam=datadict["final_beam"], - outdir=outdir, + old_beam=image_data.old_beam, + new_beam=new_beam, + conv_beam=conv_beam, ) - return datadict + new_image = smooth( + image=image_data.image, + old_beam=image_data.old_beam, + new_beam=new_beam, + dx=image_data.dx, + dy=image_data.dy, + conv_mode=conv_mode, + cutoff=cutoff, + ) + + if image_data.four_d: + # make it back into a 4D image + new_image = np.expand_dims(np.expand_dims(new_image, axis=0), axis=0) + + savefile( + newimage=new_image, + outfile=outdir / outfile, + header=image_data.header, + new_beam=new_beam, + ) + del new_image + + return BeamLogInfo( + filename=outfile, + old_beam=image_data.old_beam, + new_beam=new_beam, + conv_beam=conv_beam, + ) -def getmaxbeam( - files: List[str], - conv_mode: str = "robust", - target_beam: Beam = None, +def get_common_beam( + files: List[Path], + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", + target_beam: Optional[Beam] = None, cutoff: Optional[float] = None, tolerance: float = 0.0001, nsamps: float = 200, @@ -290,17 +292,13 @@ def getmaxbeam( """Get the smallest common beam. Args: - files (List[str]): List of FITS files. - conv_mode (str, optional): Convolution mode. Defaults to "robust". - target_beam (Beam, optional): Target PSF. Defaults to None. - cutoff (float, optional): Cutoff PSF BMAJ in arcsec. Defaults to None. - tolerance (float, optional): Common beam tolerance. Defaults to 0.0001. - nsamps (float, optional): Common beam samples. Defaults to 200. - epsilon (float, optional): Commonn beam epsilon. Defaults to 0.0005. - - Raises: - Exception: X and Y pixel sizes are not the same. - Exception: Convolving beam will be undersampled on pixel grid. + files (List[Path]): FITS files to convolve. + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): _description_. Defaults to "robust". + target_beam (Optional[Beam], optional): Target beam. Defaults to None. + cutoff (Optional[float], optional): Cutoff for beamsize in arcse. Defaults to None. + tolerance (float, optional): Radio beam tolerance. Defaults to 0.0001. + nsamps (float, optional): Radio beam nsamps. Defaults to 200. + epsilon (float, optional): Radio beam epsilon. Defaults to 0.0005. Returns: Tuple[Beam, Beams]: Common beam and all beams. @@ -335,87 +333,38 @@ def getmaxbeam( if target_beam is None: # Find the common beam try: - cmn_beam = beams[~flags].common_beam( + target_beam = beams[~flags].common_beam( tolerance=tolerance, epsilon=epsilon, nsamps=nsamps ) except BeamError: logger.warning( "Couldn't find common beam with defaults\nTrying again with smaller tolerance" ) - cmn_beam = beams[~flags].common_beam( + target_beam = beams[~flags].common_beam( tolerance=tolerance * 0.1, epsilon=epsilon, nsamps=nsamps ) # Round up values - cmn_beam = Beam( - major=my_ceil(cmn_beam.major.to(u.arcsec).value, precision=1) * u.arcsec, - minor=my_ceil(cmn_beam.minor.to(u.arcsec).value, precision=1) * u.arcsec, - pa=round_up(cmn_beam.pa.to(u.deg), decimals=2), + target_beam = Beam( + major=my_ceil(target_beam.major.to(u.arcsec).value, precision=1) * u.arcsec, + minor=my_ceil(target_beam.minor.to(u.arcsec).value, precision=1) * u.arcsec, + pa=round_up(target_beam.pa.to(u.deg), decimals=2), ) - else: - cmn_beam = target_beam - - target_header = header - w = WCS(target_header) - pixelscales = proj_plane_pixel_scales(w) - - dx = pixelscales[0] * u.deg - dy = pixelscales[1] * u.deg - if not np.isclose(dx, dy): - raise Exception("GRID MUST BE SAME IN X AND Y") - grid = dy if conv_mode != "robust": - # Get the minor axis of the convolving beams - minorcons = [] - for beam in beams: - try: - minorcons += [cmn_beam.deconvolve(beam).minor.to(u.arcsec).value] - except BeamError as err: - logger.error(err) - logger.warning( - f"Could not deconvolve. New: {cmn_beam!r}, Old: {beam!r} - will set convolving beam to 0.0" - ) - minorcons += [ - cmn_beam.deconvolve(beam, failure_returns_pointlike=True) - .minor.to(u.arcsec) - .value - ] - minorcons = np.array(minorcons) * u.arcsec - samps = minorcons / grid.to(u.arcsec) - # Check that convolving beam will be Nyquist sampled - if any(samps.value < 2): - # Set the convolving beam to be Nyquist sampled - nyq_con_beam = Beam(major=grid * 2, minor=grid * 2, pa=0 * u.deg) - # Find new target based on common beam * Nyquist beam - # Not sure if this is best - but it works - nyq_beam = cmn_beam.convolve(nyq_con_beam) - nyq_beam = Beam( - major=my_ceil(nyq_beam.major.to(u.arcsec).value, precision=1) - * u.arcsec, - minor=my_ceil(nyq_beam.minor.to(u.arcsec).value, precision=1) - * u.arcsec, - pa=round_up(nyq_beam.pa.to(u.deg), decimals=2), - ) - logger.info(f"Smallest common Nyquist sampled beam is: {nyq_beam!r}") - if target_beam is not None: - if target_beam < nyq_beam: - logger.warning("TARGET BEAM WILL BE UNDERSAMPLED!") - raise Exception("CAN'T UNDERSAMPLE BEAM - EXITING") - else: - logger.warning("COMMON BEAM WILL BE UNDERSAMPLED!") - logger.warning("SETTING COMMON BEAM TO NYQUIST BEAM") - cmn_beam = nyq_beam + target_beam = get_nyquist_beam( + target_beam=target_beam, target_header=header, beams=beams + ) - return cmn_beam, beams + return target_beam, beams -def writelog(output: List[Dict], commonbeam_log: str): +def writelog(output: List[BeamLogInfo], commonbeam_log: Path): """Write beamlog file. Args: - output (List[Dict]): List of dictionaries containing output. - commonbeam_log (str): Name of log file. + output (List[BeamLogInfo]): List of beamlog information. + commonbeam_log (Path): Name of log file. """ commonbeam_tab = Table() commonbeam_tab.add_column([out["filename"] for out in output], name="FileName") @@ -434,28 +383,28 @@ def writelog(output: List[Dict], commonbeam_log: str): ) # Target commonbeam_tab.add_column( - [out["final_beam"].major.to(u.deg).value for out in output] * u.deg, + [out["new_beam"].major.to(u.deg).value for out in output] * u.deg, name="Target BMAJ", ) commonbeam_tab.add_column( - [out["final_beam"].minor.to(u.deg).value for out in output] * u.deg, + [out["new_beam"].minor.to(u.deg).value for out in output] * u.deg, name="Target BMIN", ) commonbeam_tab.add_column( - [out["final_beam"].pa.to(u.deg).value for out in output] * u.deg, + [out["new_beam"].pa.to(u.deg).value for out in output] * u.deg, name="Target BPA", ) # Convolving commonbeam_tab.add_column( - [out["conbeam"].major.to(u.deg).value for out in output] * u.deg, + [out["conv_beam"].major.to(u.deg).value for out in output] * u.deg, name="Convolving BMAJ", ) commonbeam_tab.add_column( - [out["conbeam"].minor.to(u.deg).value for out in output] * u.deg, + [out["conv_beam"].minor.to(u.deg).value for out in output] * u.deg, name="Convolving BMIN", ) commonbeam_tab.add_column( - [out["conbeam"].pa.to(u.deg).value for out in output] * u.deg, + [out["conv_beam"].pa.to(u.deg).value for out in output] * u.deg, name="Convolving BPA", ) @@ -472,13 +421,12 @@ def writelog(output: List[Dict], commonbeam_log: str): logger.info(f"Convolving log written to {commonbeam_log}") -def main( - pool, - infile: list = [], +def smooth_fits_files( + infile_list: List[Path] = [], prefix: Optional[str] = None, suffix: Optional[str] = None, - outdir: Optional[str] = None, - conv_mode: str = "robust", + outdir: Optional[Path] = None, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", dryrun: bool = False, bmaj: Optional[float] = None, bmin: Optional[float] = None, @@ -486,80 +434,73 @@ def main( log: Optional[str] = None, circularise: bool = False, cutoff: Optional[float] = None, + listfile: bool = False, tolerance: float = 0.0001, nsamps: int = 200, epsilon: float = 0.0005, - listfile: bool = False, -): - """Main script. + ncores: Optional[int] = None, + executor_type: Literal["thread", "process", "mpi"] = "thread", + verbosity: int = 0, +) -> Beam: + """Smooth a field of 2D images to a common resolution. Args: - pool (mp.Pool): Multiprocessing pool. - infile (list, optional): List of images to convolve. Defaults to []. - prefix (str, optional): Output prefix. Defaults to None. - suffix (str, optional): Output suffix. Defaults to None. - outdir (str, optional): Output directory. Defaults to None. - conv_mode (str, optional): Colvolution mode. Defaults to "robust". - dryrun (bool, optional): Do a dryrun. Defaults to False. - bmaj (float, optional): Target BMAJ. Defaults to None. - bmin (float, optional): Target BMIN. Defaults to None. - bpa (float, optional): Target BPA. Defaults to None. - log (str, optional): Input beamlog. Defaults to None. - circularise (bool, optional): Make beam circular. Defaults to False. - cutoff (float, optional): Cutoff beams. Defaults to None. - tolerance (float, optional): Common tolerance. Defaults to 0.0001. - nsamps (int, optional): Common samples. Defaults to 200. - epsilon (float, optional): Common epsilon. Defaults to 0.0005. - + infile_list (List[Path], optional): List of FITS files to convolve. Defaults to []. + prefix (Optional[str], optional): Output filename prefix. Defaults to None. + suffix (Optional[str], optional): Output filename suffix. Defaults to None. + outdir (Optional[Path], optional): Output directory. Defaults to None - same as input. + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): Convolution mode. Defaults to "robust". + dryrun (bool, optional): Don't save any images. Defaults to False. + bmaj (Optional[float], optional): Target beam major axis in arcsec. Defaults to None. + bmin (Optional[float], optional): Target beam minor axis in arcsec. Defaults to None. + bpa (Optional[float], optional): Target beam poistion angle in deg. Defaults to None. + log (Optional[str], optional): Ouput logfile. Defaults to None. + circularise (bool, optional): Set minor axis to same as major. Defaults to False. + cutoff (Optional[float], optional): Cutoff for beamsize in arcse. Defaults to None. + tolerance (float, optional): Radio beam tolerance. Defaults to 0.0001. + nsamps (int, optional): Radio beam nsamp. Defaults to 200. + epsilon (float, optional): Radio beam epsilon. Defaults to 0.0005. + ncores (Optional[int], optional): Maximum number of cores to use. Defaults to None. + executor_type (Literal["thread", "process", "mpi"], optional): Executor to use. Defaults to "thread". Raises: - Exception: If no files are found. - Exception: If invalid convolution mode is specified. - Exception: If partial target beam is specified. - Exception: If target beam cannot be used. - """ + FileNotFoundError: If no files are found. + ValueError: If target beam is not specified completely. + BeamError: If target beam is too small. + Returns: + Beam: Common beam used. + """ + # Required for multiprocessing logging + log_listener.start() if dryrun: logger.info("Doing a dry run -- no files will be saved") - # Fix up outdir - if outdir is not None: - outdir = os.path.abspath(outdir) + + # Check early as can fail + Executor = get_executor(executor_type) # Get file list if listfile: - with open(infile[0]) as f: - infile = f.read().splitlines() - files = sorted(infile) - if files == []: - raise Exception("No files found!") - - # Parse args - logger.info(f"Convolution mode: {conv_mode}") - if not conv_mode in ["robust", "scipy", "astropy", "astropy_fft"]: - raise Exception("Please select valid convolution method!") - - logger.info(f"Using convolution method {conv_mode}") - if conv_mode == "robust": - logger.info("This is the most robust method. And fast!") - elif conv_mode == "scipy": - logger.info("This fast, but not robust to NaNs or small PSF changes") - else: - logger.info("This is slower, but robust to NaNs, but not to small PSF changes") + assert len(infile_list) == 1, "Only one list file can be provided!" + with open(infile_list[0]) as f: + infile_list = [Path(line) for line in f.read().splitlines()] + files = sorted(infile_list) + if len(files) == 0: + raise FileNotFoundError("No files found!") - nonetest = [test is None for test in [bmaj, bmin, bpa]] + conv_mode = parse_conv_mode(conv_mode) + nonetest = [param is None for param in (bmaj, bmin, bpa)] if all(nonetest): target_beam = None - - elif not all(nonetest) and any(nonetest): - raise Exception("Please specify all target beam params!") - - elif not all(nonetest) and not any(nonetest): + elif any(nonetest): + raise ValueError("Please specify all target beam params!") + else: target_beam = Beam(bmaj * u.arcsec, bmin * u.arcsec, bpa * u.deg) logger.info(f"Target beam is {target_beam!r}") # Find smallest common beam - big_beam, allbeams = getmaxbeam( + common_beam, all_beams = get_common_beam( files, conv_mode=conv_mode, target_beam=target_beam, @@ -570,72 +511,45 @@ def main( ) if target_beam is not None: - logger.info("Checking that target beam will deconvolve...") - - mask_count = 0 - failed = [] - for i, (beam, file) in enumerate( - tqdm( - zip(allbeams, files), - total=len(allbeams), - desc="Deconvolving", - disable=(logger.level > logging.INFO), - ) - ): - if cutoff is not None and beam.major.to(u.arcsec) > cutoff * u.arcsec: - continue - try: - target_beam.deconvolve(beam) - except ValueError: - mask_count += 1 - failed.append(file) - except BeamError as be: - # BeamError should not be raised if beams are equal - if target_beam != beam: - raise BeamError(be) - if mask_count > 0: - logger.warning("The following images could not reach target resolution:") - logger.warning(failed) - - raise Exception("Please choose a larger target beam!") - - else: - new_beam = target_beam + if not check_target_beam(target_beam, all_beams, files, cutoff): + raise BeamError("Please choose a larger target beam!") - else: - new_beam = big_beam + common_beam = target_beam if circularise: logger.info("Circular beam requested, setting BMIN=BMAJ and BPA=0") - new_beam = Beam( - major=new_beam.major, - minor=new_beam.major, + common_beam = Beam( + major=common_beam.major, + minor=common_beam.major, pa=0 * u.deg, ) - logger.info(f"Final beam is {new_beam!r}") - - output = list( - pool.map( - partial( - worker, + logger.info(f"Final beam is {common_beam!r}") + with Executor( + max_workers=ncores, initializer=init_worker, initargs=(log_queue, verbosity) + ) as executor: + futures = [] + for file in files: + future = executor.submit( + beamcon_2d_on_fits, + file=file, outdir=outdir, - new_beam=new_beam, + new_beam=common_beam, conv_mode=conv_mode, suffix=suffix, prefix=prefix, cutoff=cutoff, dryrun=dryrun, - ), - files, - ) - ) + ) + futures.append(future) + beam_log_list = [future.result() for future in futures] if log is not None: - writelog(output, log) + writelog(beam_log_list, log) logger.info("Done!") - return new_beam + log_listener.enqueue_sentinel() + return common_beam def cli(): @@ -657,13 +571,13 @@ def cli(): # Parse the command line options parser = argparse.ArgumentParser( - description=descStr, formatter_class=argparse.RawTextHelpFormatter + description=descStr, formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument( "infile", metavar="infile", - type=str, + type=Path, help="Input FITS image(s) to smooth (can be a wildcard) - beam info must be in header.", nargs="+", ) @@ -696,7 +610,7 @@ def cli(): "-o", "--outdir", dest="outdir", - type=str, + type=Path, default=None, help="Output directory of smoothed FITS image(s) [same as input file].", ) @@ -808,42 +722,53 @@ def cli(): help="nsamps for radio_beam.commonbeam.", ) - group = parser.add_mutually_exclusive_group() - - group.add_argument( + parser.add_argument( "--ncores", - dest="n_cores", - default=1, type=int, - help="Number of processes (uses multiprocessing).", + default=None, + help="Number of cores to use for parallelisation. If None, use all available cores.", ) - group.add_argument( - "--mpi", dest="mpi", default=False, action="store_true", help="Run with MPI." + + parser.add_argument( + "--executor", + type=str, + choices=["thread", "process", "mpi"], + default="thread", + help="Executor to use for parallelisation", ) args = parser.parse_args() - # Set up logging - logger = setup_logger( - verbosity=args.verbosity, - filename=args.logfile, - ) - pool = schwimmbad.choose_pool(mpi=args.mpi, processes=args.n_cores) - if args.mpi: - if not pool.is_master(): - pool.wait() - sys.exit(0) + nonetest = [param is None for param in (args.bmaj, args.bmin, args.bpa)] + if not all(nonetest) and any(nonetest): + parser.error("Please specify all target beam params!") - arg_dict = vars(args) - # pop unwanted arguments - _ = arg_dict.pop("mpi") - _ = arg_dict.pop("n_cores") - _ = arg_dict.pop("verbosity") - _ = arg_dict.pop("logfile") + set_verbosity( + logger=logger, + verbosity=args.verbosity, + ) - _ = main(pool, **arg_dict) - pool.close() + _ = smooth_fits_files( + infile_list=args.infile, + prefix=args.prefix, + suffix=args.suffix, + outdir=args.outdir, + conv_mode=args.conv_mode, + dryrun=args.dryrun, + bmaj=args.bmaj, + bmin=args.bmin, + bpa=args.bpa, + log=args.log, + circularise=args.circularise, + cutoff=args.cutoff, + tolerance=args.tolerance, + nsamps=args.nsamps, + epsilon=args.epsilon, + ncores=args.ncores, + executor_type=args.executor, + verbosity=args.verbosity, + ) if __name__ == "__main__": - cli() + sys.exit(cli()) diff --git a/racs_tools/beamcon_3D.py b/racs_tools/beamcon_3D.py index 3aeda49..f78b0b0 100644 --- a/racs_tools/beamcon_3D.py +++ b/racs_tools/beamcon_3D.py @@ -3,14 +3,12 @@ __author__ = "Alec Thomson" import logging -import os -import stat import sys import warnings -from typing import Dict, List, Tuple +from pathlib import Path +from typing import List, Literal, NamedTuple, Optional, Tuple import numpy as np -import scipy.signal from astropy import units as u from astropy.io import ascii, fits from astropy.table import Table @@ -25,30 +23,15 @@ from racs_tools import au2 from racs_tools.beamcon_2D import my_ceil, round_up -from racs_tools.convolve_uv import smooth -from racs_tools.logging import logger, setup_logger - -mpiSwitch = False -if ( - os.environ.get("OMPI_COMM_WORLD_SIZE") is not None - or int(os.environ.get("SLURM_NTASKS") or 1) > 1 -): - mpiSwitch = True - -if mpiSwitch: - try: - from mpi4py import MPI - except ModuleNotFoundError: - raise ModuleNotFoundError( - "Script called with mpiexec/mpirun/srun, but mpi4py not installed" - ) - # Get the processing environment - comm = MPI.COMM_WORLD - nPE = comm.Get_size() - myPE = comm.Get_rank() -else: - nPE = 1 - myPE = 0 +from racs_tools.convolve_uv import parse_conv_mode, smooth +from racs_tools.logging import ( + init_worker, + log_listener, + log_queue, + logger, + set_verbosity, +) +from racs_tools.parallel import get_executor warnings.filterwarnings(action="ignore", category=SpectralCubeWarning, append=True) warnings.simplefilter("ignore", category=AstropyWarning) @@ -59,111 +42,86 @@ ############################################# -class Error(OSError): - pass - - -class SameFileError(Error): - """Raised when source and destination are the same file.""" - - -class SpecialFileError(OSError): - """Raised when trying to do a kind of operation (e.g. copying) which is - not supported on a special file (e.g. a named pipe)""" - - -class ExecError(OSError): - """Raised when a command could not be executed""" - - -class ReadError(OSError): - """Raised when an archive cannot be read""" - - -class RegistryError(Exception): - """Raised when a registry operation with the archiving - and unpacking registeries fails""" - - -def _samefile(src, dst): - # Macintosh, Unix. - if hasattr(os.path, "samefile"): - try: - return os.path.samefile(src, dst) - except OSError: - return False - - -def copyfile(src, dst, *, follow_symlinks=True): - """Copy data from src to dst. - - If follow_symlinks is not set and src is a symbolic link, a new - symlink will be created instead of copying the file it points to. - - """ - if _samefile(src, dst): - raise SameFileError(f"{src!r} and {dst!r} are the same file") - - for fn in [src, dst]: - try: - st = os.stat(fn) - except OSError: - # File most likely does not exist - pass - else: - # XXX What about other special files? (sockets, devices...) - if stat.S_ISFIFO(st.st_mode): - raise SpecialFileError("`%s` is a named pipe" % fn) - - if not follow_symlinks and os.path.islink(src): - os.symlink(os.readlink(src), dst) - else: - with open(src, "rb") as fsrc: - with open(dst, "wb") as fdst: - copyfileobj(fsrc, fdst) - return dst - - -def copyfileobj(fsrc, fdst, length=16 * 1024): - # copied = 0 - total = os.fstat(fsrc.fileno()).st_size - with tqdm( - total=total, - disable=(logger.level > logging.INFO), - unit_scale=True, - desc="Copying file", - ) as pbar: - while True: - buf = fsrc.read(length) - if not buf: - break - fdst.write(buf) - copied = len(buf) - pbar.update(copied) - - -def getbeams(file: str, header: fits.Header) -> Tuple[Table, int, str]: +class CubeBeams(NamedTuple): + """Cube beams""" + + beam_table: Table + """Beam table""" + nchan: int + """Number of channels""" + beamlog: Path + """Beamlog filename""" + + +class CubeData(NamedTuple): + """Cube data and metadata""" + + filename: Path + """Cube filename""" + outdir: Path + """Output directory""" + dx: u.Quantity + """Pixel size in x direction""" + dy: u.Quantity + """Pixel size in y direction""" + beamlog: Path + """Beamlog filename""" + beam_table: Table + """Beam table""" + beams: Beams + """Beams object""" + nchan: int + """Number of channels""" + mask: np.ndarray + """Mask array""" + + def with_options(self, **kwargs): + """Create a new CubeData instance with keywords updated + + Returns: + CubeData: New CubeData instance with updated attributes + """ + # TODO: Update the signature to have the actual attributes to + # help keep mypy and other linters happy + as_dict = self._asdict() + as_dict.update(kwargs) + + return CubeData(**as_dict) + + +class CommonBeamData(NamedTuple): + """Common beam data""" + + commonbeams: Beams + """Common beams""" + convbeams: Beams + """Convolving beams""" + facs: np.ndarray + """Scaling factors""" + commonbeamlog: Path + """Common beamlog filename""" + + +def get_beams(file: Path, header: fits.Header) -> CubeBeams: """Get beam information from a fits file or beamlog. Args: - file (str): FITS filename. - header (fits.Header): FITS header. + file (Path): FITS filename. + header (fits.Header): FITS headesr. + + Raises: + e: If beam information cannot be extracted. Returns: - Tuple[Table, int, str]: Table of beams, number of beams, and beamlog filename. + CubeBeams: Table of beams, number of beams, and beamlog filename. """ # Add beamlog info to dict just in case - dirname = os.path.dirname(file) - basename = os.path.basename(file) - if dirname == "": - dirname = "." - beamlog = f"{dirname}/beamlog.{basename}".replace(".fits", ".txt") + dirname = file.parent + basename = file.name + beamlog = (dirname / f"beamlog.{basename}").with_suffix(".txt") # First check for CASA beams - try: - headcheck = header["CASAMBM"] - except KeyError: - headcheck = False + headcheck = header.get("CASAMBM", False) if headcheck: logger.info( "CASA beamtable found in header - will use this table for beam calculations" @@ -174,7 +132,7 @@ def getbeams(file: str, header: fits.Header) -> Tuple[Table, int, str]: # Otherwise use beamlog file else: - try: + if beamlog.exists(): logger.info("No CASA beamtable found in header - looking for beamlogs") logger.info(f"Getting beams from {beamlog}") @@ -192,7 +150,7 @@ def getbeams(file: str, header: fits.Header) -> Tuple[Table, int, str]: unit = u.Unit(col[idx + 1 : -1]) beams[col].unit = unit beams[col].name = new_col - except FileNotFoundError: + else: logger.warning("No beamlog found") logger.warning("Using header beam - assuming a constant beam") try: @@ -214,7 +172,7 @@ def getbeams(file: str, header: fits.Header) -> Tuple[Table, int, str]: raise e nchan = len(beams) - return beams, nchan, beamlog + return CubeBeams(beams, nchan, beamlog) def getfacs( @@ -236,7 +194,7 @@ def getfacs( if conbm == Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg): fac = 1.0 else: - fac, amp, outbmaj, outbmin, outbpa = au2.gauss_factor( + fac, _, _, _, _ = au2.gauss_factor( beamConv=[ conbm.major.to(u.arcsec).value, conbm.minor.to(u.arcsec).value, @@ -255,44 +213,28 @@ def getfacs( return facs -def cpu_to_use(max_cpu: int, count: int) -> int: - """Find number of cpus to use. - Find the right number of cpus to use when dividing up a task, such - that there are no remainders. - Args: - max_cpu (int): Maximum number of cores to use for a process. - count (int): Number of tasks. - - Returns: - int: Maximum number of cores to be used that divides into the number - of tasks. - """ - factors_list = [] - for i in range(1, count + 1): - if count % i == 0: - factors_list.append(i) - factors = np.array(factors_list) - return max(factors[factors <= max_cpu]) - - -def worker( - filename: str, +def smooth_plane( + filename: Path, idx: int, - **kwargs, + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", ) -> np.ndarray: - """Smooth worker function. - - Extracts a single image from a FITS cube and smooths it. + """Smooth a single plane of a cube. Args: - filename (str): FITS cube filename. + filename (Path): FITS filename. idx (int): Channel index. - - Kwargs: - Passed to :func:`smooth`. + old_beam (Beam): Old beam. + new_beam (Beam): Target beam. + dx (u.Quantity): Pixel size in x direction. + dy (u.Quantity): Pixel size in y direction. + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): Convolution mode. Defaults to "robust". Returns: - np.ndarray: Smoothed channel image. + np.ndarray: Convolved plane. """ cube = SpectralCube.read(filename) # Get spectral axis @@ -306,29 +248,33 @@ def worker( plane = cube.unmasked_data[slicer].value.astype(np.float32) logger.debug(f"Size of plane is {(plane.nbytes*u.byte).to(u.MB)}") - newim = smooth(image=plane, **kwargs) + newim = smooth( + image=plane, + old_beam=old_beam, + new_beam=new_beam, + dx=dx, + dy=dy, + conv_mode=conv_mode, + ) + del plane return newim -def makedata(files: List[str], outdir: str) -> Dict[str, dict]: +def make_data(files: List[Path], outdir: List[Path]) -> List[CubeData]: """Create data dictionary. Args: - files (List[str]): List of filenames. - outdir (str): Output directory. + files (List[Path]): List of filenames. + outdir (List[Path]): Output directories. Raises: Exception: If pixel grid in X and Y is not the same. Returns: - Dict[Dict]: Data and metadata for each channel and image. + List[CubeData]: Data and metadata for each channel and image. """ - datadict = {} # type: Dict[str,dict] - for i, (file, out) in enumerate(zip(files, outdir)): - # Set up files - datadict[f"cube_{i}"] = {} - datadict[f"cube_{i}"]["filename"] = file - datadict[f"cube_{i}"]["outdir"] = out + cube_data_list: List[CubeData] = [] + for _, (file, out) in enumerate(zip(files, outdir)): # Get metadata header = fits.getheader(file) w = WCS(header) @@ -336,39 +282,50 @@ def makedata(files: List[str], outdir: str) -> Dict[str, dict]: dxas = pixelscales[0] * u.deg dyas = pixelscales[1] * u.deg - - datadict[f"cube_{i}"]["dx"] = dxas - datadict[f"cube_{i}"]["dy"] = dyas if not np.isclose(dxas, dyas): - raise Exception("GRID MUST BE SAME IN X AND Y") + raise ValueError(f"GRID MUST BE SAME IN X AND Y, got {dxas} and {dyas}") # Get beam info - beam, nchan, beamlog = getbeams(file=file, header=header) - datadict[f"cube_{i}"]["beamlog"] = beamlog - datadict[f"cube_{i}"]["beam"] = beam - datadict[f"cube_{i}"]["nchan"] = nchan - return datadict + beam_table, nchan, beamlog = get_beams(file=file, header=header) + # Construct beams + bmaj = np.array(beam_table["BMAJ"]) * beam_table["BMAJ"].unit + bmin = np.array(beam_table["BMIN"]) * beam_table["BMIN"].unit + bpa = np.array(beam_table["BPA"]) * beam_table["BPA"].unit + beams = Beams(major=bmaj, minor=bmin, pa=bpa) + cube_data = CubeData( + filename=file, + outdir=out, + dx=dxas, + dy=dyas, + beamlog=beamlog, + beam_table=beam_table, + beams=beams, + nchan=nchan, + mask=np.array([False] * nchan), + ) + cube_data_list.append(cube_data) + return cube_data_list def commonbeamer( - datadict: Dict[str, dict], + cube_data_list: List[CubeData], nchans: int, - conv_mode: str = "robust", - mode: str = "natural", - suffix: str = None, - target_beam: Beam = None, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", + mode: Literal["natural", "total"] = "natural", + suffix: Optional[str] = None, + target_beam: Optional[Beam] = None, circularise: bool = False, tolerance: float = 0.0001, nsamps: int = 200, epsilon: float = 0.0005, -) -> Dict[str, dict]: +) -> List[CommonBeamData]: """Find common beam for all channels. Computed beams will be written to convolving beam logger. Args: - datadict (Dict[str, dict]): Main data dictionary. + cube_data_list (List[CubeData]): List of cube data. nchans (int): Number of channels. - conv_mode (str, optional): Convolution mode. Defaults to "robust". - mode (str, optional): Frequency mode. Defaults to "natural". + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): Convolution mode. Defaults to "robust". + mode (Literal["natural", "total"], optional): Frequency mode. Defaults to "natural". target_beam (Beam, optional): Target PSF. Defaults to None. circularise (bool, optional): Circularise PSF. Defaults to False. tolerance (float, optional): Common beam tolerance. Defaults to 0.0001. @@ -379,7 +336,7 @@ def commonbeamer( Exception: If convolving beam will be undersampled on pixel grid. Returns: - Dict[str, dict]: Updated data dictionary. + List[CommonBeamData]: Common beam data for each channel and cube. """ if suffix is None: suffix = mode @@ -394,11 +351,11 @@ def commonbeamer( majors_list = [] minors_list = [] pas_list = [] - for key in datadict.keys(): - major = datadict[key]["beams"][n].major - minor = datadict[key]["beams"][n].minor - pa = datadict[key]["beams"][n].pa - if datadict[key]["mask"][n]: + for cube_data in cube_data_list: + major = cube_data.beams[n].major + minor = cube_data.beams[n].minor + pa = cube_data.beams[n].pa + if cube_data.mask[n]: major *= np.nan minor *= np.nan pa *= np.nan @@ -454,7 +411,7 @@ def commonbeamer( pa=round_up(commonbeam.pa.to(u.deg), decimals=2), ) - grid = datadict[key]["dy"] + grid = cube_data.dy if conv_mode != "robust": # Get the minor axis of the convolving beams minorcons = [] @@ -507,13 +464,13 @@ def commonbeamer( majors_list = [] minors_list = [] pas_list = [] - for key in datadict.keys(): - major = datadict[key]["beams"].major - minor = datadict[key]["beams"].minor - pa = datadict[key]["beams"].pa - major[datadict[key]["mask"]] *= np.nan - minor[datadict[key]["mask"]] *= np.nan - pa[datadict[key]["mask"]] *= np.nan + for cube_data in cube_data_list: + major = cube_data.beams.major + minor = cube_data.beams.minor + pa = cube_data.beams.pa + major[cube_data.mask] *= np.nan + minor[cube_data.mask] *= np.nan + pa[cube_data.mask] *= np.nan majors_list.append(major.to(u.arcsec).value) minors_list.append(minor.to(u.arcsec).value) pas_list.append(pa.to(u.deg).value) @@ -554,7 +511,7 @@ def commonbeamer( ) if conv_mode != "robust": # Get the minor axis of the convolving beams - grid = datadict[key]["dy"] + grid = cube_data.dy minorcons = [] for beam in big_beams[~np.isnan(big_beams)]: minorcons += [commonbeam.deconvolve(beam).minor.to(u.arcsec).value] @@ -604,8 +561,9 @@ def commonbeamer( for i, commonbeam in enumerate(commonbeams): logger.info(f"Channel {i}: {commonbeam!r}") - for key in tqdm( - datadict.keys(), + common_beam_data_list: List[CommonBeamData] = [] + for cube_data in tqdm( + cube_data_list, desc="Getting convolution data", disable=(logger.level > logging.INFO), ): @@ -613,8 +571,8 @@ def commonbeamer( conv_bmaj = [] conv_bmin = [] conv_bpa = [] - old_beams = datadict[key]["beams"] - masks = datadict[key]["mask"] + old_beams = cube_data.beams + masks = cube_data.mask for commonbeam, old_beam, mask in zip(commonbeams, old_beams, masks): if mask: convbeam = Beam( @@ -654,18 +612,21 @@ def commonbeamer( # Get gaussian beam factors facs = getfacs( - beams=datadict[key]["beams"], + beams=cube_data.beams, convbeams=convbeams, - dx=datadict[key]["dx"], - dy=datadict[key]["dy"], + dx=cube_data.dx, + dy=cube_data.dy, ) - datadict[key]["facs"] = facs # Setup conv beamlog - datadict[key]["convbeams"] = convbeams - commonbeam_log = datadict[key]["beamlog"].replace(".txt", f".{suffix}.txt") - datadict[key]["commonbeams"] = commonbeams - datadict[key]["commonbeamlog"] = commonbeam_log + commonbeam_log = cube_data.beamlog.with_suffix(f".{suffix}.txt") + common_beam_data = CommonBeamData( + commonbeams=commonbeams, + convbeams=convbeams, + facs=facs, + commonbeamlog=commonbeam_log, + ) + common_beam_data_list.append(common_beam_data) commonbeam_tab = Table() # Save target @@ -695,59 +656,67 @@ def commonbeamer( ) logger.info(f"Convolving log written to {commonbeam_log}") - return datadict + return common_beam_data_list -def masking(nchans: int, datadict: dict, cutoff: u.Quantity = None) -> dict: - """Apply masking to data. +def masking( + cube_data_list: List[CubeData], cutoff: Optional[u.Quantity] = None +) -> List[CubeData]: + """Apply masking to cubes Args: - nchans (int): Number of channels in cubes. - datadict (dict): Data dictionary. - cutoff (None, optional): Cutoff BMAJ size for masking. Defaults to None. + cube_data_list (List[CubeData]): List of cube data. + cutoff (Optional[u.Quantity], optional): Cutoff for PSF. Defaults to None. Returns: - dict: Updated data dictionary. + List[CubeData]: List of masked cube data. """ - for key in datadict.keys(): - mask = np.array([False] * nchans) - datadict[key]["mask"] = mask - if cutoff is not None: - for key in datadict.keys(): - majors = datadict[key]["beams"].major - cutmask = majors > cutoff - datadict[key]["mask"] += cutmask - # Check for pipeline masking nullbeam = Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg) tiny = np.finfo(np.float32).tiny # Smallest positive number - used to mask smallbeam = Beam(major=tiny * u.deg, minor=tiny * u.deg, pa=tiny * u.deg) - for key in datadict.keys(): + masked_cube_data_list: List[CubeData] = [] + for cube_data in cube_data_list: + mask = cube_data.mask + if cutoff is not None: + majors = cube_data.beams.major + cutmask = majors > cutoff + mask += cutmask nullmask = np.logical_or( - datadict[key]["beams"] == nullbeam, - datadict[key]["beams"] == smallbeam, + cube_data.beams == nullbeam, + cube_data.beams == smallbeam, ) - datadict[key]["mask"] += nullmask - return datadict + mask += nullmask + masked_cube_data = cube_data.with_options(mask=mask) + masked_cube_data_list.append(masked_cube_data) + return masked_cube_data_list def initfiles( - filename: str, + filename: Path, commonbeams: Beams, - outdir: str, - mode: str, - suffix=None, - prefix=None, - ref_chan=None, -): + outdir: Path, + mode: Literal["natural", "total"], + suffix="", + prefix="", + ref_chan: Optional[int] = None, +) -> Path: """Initialise output files Args: - datadict (dict): Main data dict - indexed - mode (str): 'total' or 'natural' + filename (Path): Original filename + commonbeams (Beams): Common beams for each channel + outdir (Path): Output directory + mode (Literal["natural", "total"]): Frequency mode - natural or total + suffix (str, optional): Output suffix. Defaults to "". + prefix (str, optional): Output prefix. Defaults to "". + ref_chan (Optional[int], optional): Reference channel index. Defaults to None. + + Raises: + ValueError: If no Stokes axis is found in the header Returns: - datadict: Updated datadict + Path: Output filename """ logger.debug(f"Reading {filename}") with fits.open(filename, memmap=True, mode="denywrite") as hdulist: @@ -785,7 +754,7 @@ def initfiles( nstokes = stokes_axis.array_shape[0] if nstokes > 1: logger.critical( - f"More than one Stokes parameter in header. Only the first one will be used." + "More than one Stokes parameter in header. Only the first one will be used." ) pols = np.zeros_like(chans) # Zeros because we take the first one if any( @@ -839,29 +808,30 @@ def initfiles( # Set up output file if suffix is None: suffix = mode - outname = os.path.basename(filename) - outname = outname.replace(".fits", "") + f".{suffix}.fits" - if prefix is not None: - outname = prefix + outname + outfile = Path(filename.name) + if suffix: + outfile = outfile.with_suffix(f".{suffix}.fits") + if prefix: + outfile = Path(prefix + outfile.name) - outfile = os.path.join(outdir, outname) + outfile = outdir / outfile.name logger.info(f"Initialising to {outfile}") new_hdulist.writeto(outfile, overwrite=True) return outfile -def readlogs(commonbeam_log: str) -> Tuple[Beams, Beams, np.ndarray]: +def readlogs(commonbeam_log: Path) -> CommonBeamData: """Read convolving log files Args: - commonbeam_log (str): Filename of the common beam log + commonbeam_log (Path): Filename of the common beam log Raises: Exception: If the log file is not found Returns: - Tuple[Beams, Beams, np.ndarray]: Common beams, convolving beams, and scaling factors + CommonBeamData: Common beams, convolving beams, and scaling factors """ logger.info(f"Reading from {commonbeam_log}") try: @@ -883,298 +853,245 @@ def readlogs(commonbeam_log: str) -> Tuple[Beams, Beams, np.ndarray]: logger.info("Final beams are:") for i, commonbeam in enumerate(commonbeams): logger.info(f"Channel {i}: {commonbeam!r}") - return commonbeams, convbeams, facs + return CommonBeamData( + commonbeams=commonbeams, + convbeams=convbeams, + facs=facs, + commonbeamlog=commonbeam_log, + ) + + +def smooth_and_write_plane( + chan: int, + cube_data: CubeData, + common_beam_data: CommonBeamData, + outfile: Path, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", +) -> None: + """Smooth a single plane of a cube and write to a output file. + + Args: + chan (int): Channel index. + cube_data (CubeData): Cube data. + common_beam_data (CommonBeamData): Common beam data. + outfile (Path): Output filename. + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): _description_. Defaults to "robust". + """ + logger.debug(f"{outfile} - channel {chan} - Started") + + newim = smooth_plane( + filename=cube_data.filename, + idx=chan, + old_beam=cube_data.beams[chan], + new_beam=common_beam_data.commonbeams[chan], + dx=cube_data.dx, + dy=cube_data.dy, + conv_mode=conv_mode, + ) + + with fits.open(outfile, mode="update", memmap=True) as outfh: + # Find which axis is the spectral and stokes + wcs = WCS(outfh[0]) + axis_type_dict = wcs.get_axis_types()[::-1] # Reverse order for fits + axis_names = [i["coordinate_type"] for i in axis_type_dict] + spec_idx = axis_names.index("spectral") + stokes_idx = axis_names.index("stokes") + slicer = [slice(None)] * len(outfh[0].data.shape) + slicer[spec_idx] = chan + slicer[stokes_idx] = 0 # only do single stokes + slicer = tuple(slicer) + + outfh[0].data[slicer] = newim.astype(np.float32) # make sure data is 32-bit + outfh.flush() + del newim + + logger.info(f"{outfile} - channel {chan} - Done") -def main( - infile: list, +def smooth_fits_cube( + infiles_list: List[Path], uselogs: bool = False, - mode: str = "natural", - conv_mode: str = "robust", + mode: Literal["natural", "total"] = "natural", + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", dryrun: bool = False, prefix: str = None, suffix: str = None, - outdir: str = None, - bmaj: float = None, - bmin: float = None, - bpa: float = None, - cutoff: float = None, + outdir: Optional[Path] = None, + bmaj: Optional[float] = None, + bmin: Optional[float] = None, + bpa: Optional[float] = None, + cutoff: Optional[float] = None, circularise: bool = False, - ref_chan: int = None, + ref_chan: Optional[int] = None, tolerance: float = 0.0001, epsilon: float = 0.0005, nsamps: int = 200, -): - """main script + ncores: Optional[int] = None, + executor_type: Literal["thread", "process", "mpi"] = "thread", + verbosity: int = 0, +) -> Tuple[List[CubeData], List[CommonBeamData]]: + """Convolve a set of FITS cubes to a common beam. Args: - args (args): Command line args + infiles_list (List[Path]): FITS files to convolve. + uselogs (bool, optional): Use beamlogs. Defaults to False. + mode (Literal["natural", "total"], optional): Frequency mode. Defaults to "natural". + conv_mode (Literal["robust", "scipy", "astropy", "astropy_fft"], optional): Convolution mode. Defaults to "robust". + dryrun (bool, optional): Do not write any images. Defaults to False. + prefix (str, optional): Output filename prefix. Defaults to None. + suffix (str, optional): Output filename suffix. Defaults to None. + outdir (Optional[Path], optional): Output directory. Defaults to None. + bmaj (Optional[float], optional): Target beam major axis in arcsec. Defaults to None. + bmin (Optional[float], optional): Target beam minor axis in arcsec. Defaults to None. + bpa (Optional[float], optional): Target beam position angle in deg. Defaults to None. + cutoff (Optional[float], optional): Beam cutoff in arcsec. Defaults to None. + circularise (bool, optional): Set minor axis to major axis. Defaults to False. + ref_chan (Optional[int], optional): Reference channel for PSF in header. Defaults to None. + tolerance (float, optional): Radio beam tolerance. Defaults to 0.0001. + epsilon (float, optional): Radio beam epsilon. Defaults to 0.0005. + nsamps (int, optional): Radio beam nsamps. Defaults to 200. + ncores (Optional[int], optional): Radio beam ncores. Defaults to None. + executor_type (Literal["thread", "process", "mpi"], optional): Executor type. Defaults to "thread". + Raises: + ValueError: If mode is not 'natural' or 'total'. + ValueError: If target beam is not fully specified. + FileNotFoundError: If no files are found. + ValueError: If number of channels are not equal. + + Returns: + Tuple[List[CubeData], List[CommonBeamData]]: Cube data and common beam data. """ + # Required for multiprocessing logging + log_listener.start() + if dryrun: + logger.info("Doing a dry run -- no files will be saved") - if myPE == 0: - logger.info(f"Total number of MPI ranks = {nPE}") - # Parse args - if dryrun: - logger.info("Doing a dry run -- no files will be saved") - - # Check mode - logger.info(f"Mode is {mode}") - if mode == "natural" and mode == "total": - raise Exception("'mode' must be 'natural' or 'total'") - if mode == "natural": - logger.info("Smoothing each channel to a common resolution") - if mode == "total": - logger.info("Smoothing all channels to a common resolution") - - # Check cutoff - if cutoff is not None: - cutoff *= u.arcsec - logger.info(f"Cutoff is: {cutoff}") - - # Check target - logger.debug(conv_mode) - if ( - not conv_mode == "robust" - and not conv_mode == "scipy" - and not conv_mode == "astropy" - and not conv_mode == "astropy_fft" - ): - raise Exception("Please select valid convolution method!") + # Check early as can fail + Executor = get_executor(executor_type) - logger.info(f"Using convolution method {conv_mode}") - if conv_mode == "robust": - logger.info("This is the most robust method. And fast!") - elif conv_mode == "scipy": - logger.info("This fast, but not robust to NaNs or small PSF changes") - else: - logger.info( - "This is slower, but robust to NaNs, but not to small PSF changes" - ) + # Check mode + logger.info(f"Mode is {mode}") + if mode == "natural": + logger.info("Smoothing each channel to a common resolution") + elif mode == "total": + logger.info("Smoothing all channels to a common resolution") + else: + raise ValueError(f"Mode must be 'natural' or 'total', not '{mode}'") + + # Check cutoff + if cutoff is not None: + cutoff *= u.arcsec + logger.info(f"Cutoff is: {cutoff}") - nonetest = [test is None for test in [bmaj, bmin, bpa]] + # Check target + conv_mode = parse_conv_mode(conv_mode) - if not all(nonetest) and mode != "total": - raise Exception("Only specify a target beam in 'total' mode") + nonetest = [param is None for param in (bmaj, bmin, bpa)] + if all(nonetest): + target_beam = None + elif any(nonetest): + raise ValueError("Please specify all target beam params!") + else: + target_beam = Beam(bmaj * u.arcsec, bmin * u.arcsec, bpa * u.deg) + logger.info(f"Target beam is {target_beam!r}") - if all(nonetest): - target_beam = None + files = sorted(infiles_list) + if len(files) == 0: + raise FileNotFoundError("No files found!") - elif not all(nonetest) and any(nonetest): - raise Exception("Please specify all target beam params!") + outdir_list: List[Path] = ( + [outdir] * len(files) if outdir is not None else [f.parent for f in files] + ) - elif not all(nonetest) and not any(nonetest): - target_beam = Beam(bmaj * u.arcsec, bmin * u.arcsec, bpa * u.deg) - logger.info(f"Target beam is {target_beam!r}") + cube_data_list = make_data(files, outdir_list) - files = sorted(infile) - if files == []: - raise Exception("No files found!") + # Sanity check channel counts + nchans = np.array([cube_data.nchan for cube_data in cube_data_list]) + check = all(nchans == nchans[0]) - if outdir is not None: - if outdir[-1] == "/": - outdir = outdir[:-1] - outdir = [outdir] * len(files) - else: - outdir = [] - for f in files: - out = os.path.dirname(f) - if out == "": - out = "." - outdir += [out] + if not check: + raise ValueError(f"Unequal number of spectral channels! Got {nchans}") - datadict = makedata(files, outdir) + nchans = nchans[0] - # Sanity check channel counts - nchans = np.array([datadict[key]["nchan"] for key in datadict.keys()]) - check = all(nchans == nchans[0]) + # Check suffix + if suffix is None: + suffix = mode - if not check: - raise Exception("Unequal number of spectral channels!") + # Apply some masking + cube_data_list = masking(cube_data_list, cutoff=cutoff) - else: - nchans = nchans[0] - - # Check suffix - if suffix is None: - suffix = mode - - # Construct Beams objects - for key in datadict.keys(): - beam = datadict[key]["beam"] - bmaj = np.array(beam["BMAJ"]) * beam["BMAJ"].unit - bmin = np.array(beam["BMIN"]) * beam["BMIN"].unit - bpa = np.array(beam["BPA"]) * beam["BPA"].unit - beams = Beams(major=bmaj, minor=bmin, pa=bpa) - datadict[key]["beams"] = beams - - # Apply some masking - datadict = masking(nchans=nchans, datadict=datadict, cutoff=cutoff) - - if not uselogs: - datadict = commonbeamer( - datadict=datadict, - nchans=nchans, - conv_mode=conv_mode, - target_beam=target_beam, - mode=mode, - suffix=suffix, - circularise=circularise, - tolerance=tolerance, - nsamps=nsamps, - epsilon=epsilon, - ) - else: - logger.info("Reading from convolve beamlog files") - for key in datadict.keys(): - commonbeam_log = datadict[key]["beamlog"].replace( - ".txt", f".{suffix}.txt" - ) - commonbeams, convbeams, facs = readlogs(commonbeam_log) - # Save to datadict - datadict[key]["facs"] = facs - datadict[key]["convbeams"] = convbeams - datadict[key]["commonbeams"] = commonbeams - datadict[key]["commonbeamlog"] = commonbeam_log + if not uselogs: + common_beam_data_list = commonbeamer( + cube_data_list=cube_data_list, + nchans=nchans, + conv_mode=conv_mode, + target_beam=target_beam, + mode=mode, + suffix=suffix, + circularise=circularise, + tolerance=tolerance, + nsamps=nsamps, + epsilon=epsilon, + ) else: - files = None - datadict = None - nchans = None + logger.info("Reading from convolve beamlog files") + common_beam_data_list: List[CommonBeamData] = [] + for cube_data in cube_data_list: + commonbeam_log = cube_data.beamlog.with_suffix(f".{suffix}.txt") + common_beam_data = readlogs(commonbeam_log) + common_beam_data_list.append(common_beam_data) if dryrun: logger.info("Doing a dryrun so all done!") - return datadict + return cube_data_list, common_beam_data_list # Init the files in parallel - if myPE == 0: - logger.info("Initialising output files") - if mpiSwitch: - files = comm.bcast(files, root=0) - datadict = comm.bcast(datadict, root=0) - nchans = comm.bcast(nchans, root=0) - - inputs = list(datadict.keys()) - dims = len(inputs) - - if nPE > dims: - my_start = myPE - my_end = myPE - - else: - count = dims // nPE - rem = dims % nPE - if myPE < rem: - # The first 'remainder' ranks get 'count + 1' tasks each - my_start = myPE * (count + 1) - my_end = my_start + count - - else: - # The remaining 'size - remainder' ranks get 'count' task each - my_start = myPE * count + rem - my_end = my_start + (count - 1) - - if myPE == 0: - logger.info(f"There are {dims} files to init") - logger.debug(f"My start is {my_start}, my end is {my_end}") - + logger.info("Initialising output files") # Init output files and retrieve file names - outfile_dict = {} - for inp in inputs[my_start : my_end + 1]: - outfile = initfiles( - filename=datadict[inp]["filename"], - commonbeams=datadict[inp]["commonbeams"], - outdir=datadict[inp]["outdir"], - mode=mode, - suffix=suffix, - prefix=prefix, - ref_chan=ref_chan, - ) - outfile_dict.update({inp: outfile}) - - if mpiSwitch: - # Send to master proc - outlist = comm.gather(outfile_dict, root=0) - else: - outlist = [outfile_dict] - - if mpiSwitch: - comm.Barrier() - - # Now do the convolution in parallel - if myPE == 0: - # Conver list to dict and save to main dict - outlist_dict = {} - for d in outlist: - outlist_dict.update(d) - # Also make inputs list - inputs = [] - for key in datadict.keys(): - datadict[key]["outfile"] = outlist_dict[key] + with Executor( + max_workers=ncores, initializer=init_worker, initargs=(log_queue, verbosity) + ) as executor: + futures = [] + for cube_data, common_beam_data in zip(cube_data_list, common_beam_data_list): + future = executor.submit( + initfiles, + filename=cube_data.filename, + commonbeams=common_beam_data.commonbeams, + outdir=cube_data.outdir, + mode=mode, + suffix=suffix, + prefix=prefix, + ref_chan=ref_chan, + ) + futures.append(future) + outfiles = [future.result() for future in futures] + + with Executor( + max_workers=ncores, initializer=init_worker, initargs=(log_queue, verbosity) + ) as executor: + futures = [] + for cube_data, common_beam_data, outfile in zip( + cube_data_list, common_beam_data_list, outfiles + ): for chan in range(nchans): - inputs.append((key, chan)) - - else: - datadict = None - inputs = None - if mpiSwitch: - comm.Barrier() - if mpiSwitch: - inputs = comm.bcast(inputs, root=0) - datadict = comm.bcast(datadict, root=0) - - dims = len(files) * nchans - assert len(inputs) == dims - count = dims // nPE - rem = dims % nPE - if myPE < rem: - # The first 'remainder' ranks get 'count + 1' tasks each - my_start = myPE * (count + 1) - my_end = my_start + count + future = executor.submit( + smooth_and_write_plane, + chan=chan, + cube_data=cube_data, + common_beam_data=common_beam_data, + outfile=outfile, + conv_mode=conv_mode, + ) + futures.append(future) + _ = [future.result() for future in futures] - else: - # The remaining 'size - remainder' ranks get 'count' task each - my_start = myPE * count + rem - my_end = my_start + (count - 1) - if myPE == 0: - logger.info(f"There are {nchans} channels, across {len(files)} files") - logger.debug(f"My start is {my_start}, my end is {my_end}") - - for inp in inputs[my_start : my_end + 1]: - key, chan = inp - outfile = datadict[key]["outfile"] - logger.debug(f"{outfile} - channel {chan} - Started") - - cubedict = datadict[key] - newim = worker( - filename=cubedict["filename"], - idx=chan, - dx=cubedict["dx"], - dy=cubedict["dy"], - old_beam=cubedict["beams"][chan], - final_beam=cubedict["commonbeams"][chan], - conbeam=cubedict["convbeams"][chan], - sfactor=cubedict["facs"][chan], - conv_mode=conv_mode, - ) + logger.info("Done!") - with fits.open(outfile, mode="update", memmap=True) as outfh: - # Find which axis is the spectral and stokes - wcs = WCS(outfh[0]) - axis_type_dict = wcs.get_axis_types()[::-1] # Reverse order for fits - axis_names = [i["coordinate_type"] for i in axis_type_dict] - spec_idx = axis_names.index("spectral") - stokes_idx = axis_names.index("stokes") - slicer = [slice(None)] * len(outfh[0].data.shape) - slicer[spec_idx] = chan - slicer[stokes_idx] = 0 # only do single stokes - slicer = tuple(slicer) - - outfh[0].data[slicer] = newim.astype(np.float32) # make sure data is 32-bit - outfh.flush() - logger.info(f"{outfile} - channel {chan} - Done") + log_listener.enqueue_sentinel() - logger.info("Done!") - return datadict + return cube_data_list, common_beam_data_list def cli(): @@ -1185,8 +1102,6 @@ def cli(): descStr = """ Smooth a field of 3D cubes to a common resolution. - - Parallelisation is done using MPI. - - Default names of output files are /path/to/beamlog{infile//.fits/.{SUFFIX}.fits} - By default, the smallest common beam will be automatically computed. @@ -1199,13 +1114,13 @@ def cli(): # Parse the command line options parser = argparse.ArgumentParser( - description=descStr, formatter_class=argparse.RawTextHelpFormatter + description=descStr, formatter_class=argparse.ArgumentDefaultsHelpFormatter ) parser.add_argument( "infile", metavar="infile", - type=str, + type=Path, help="""Input FITS image(s) to smooth (can be a wildcard) - CASA beamtable will be used if present i.e. if CASAMBM = T - Otherwise beam info must be in co-located beamlog files. @@ -1262,7 +1177,7 @@ def cli(): "--dryrun", dest="dryrun", action="store_true", - help="Compute common beam and stop [False].", + help="Compute common beam and stop.", ) parser.add_argument( @@ -1287,7 +1202,7 @@ def cli(): "-o", "--outdir", dest="outdir", - type=str, + type=Path, default=None, help="Output directory of smoothed FITS image(s) [None - same as input].", ) @@ -1368,21 +1283,51 @@ def cli(): help="nsamps for radio_beam.commonbeam.", ) + parser.add_argument( + "--ncores", + type=int, + default=None, + help="Number of cores to use for parallelisation. If None, use all available cores.", + ) + + parser.add_argument( + "--executor_type", + type=str, + default="thread", + choices=["thread", "process", "mpi"], + help="Executor type for parallelisation.", + ) + args = parser.parse_args() # Set up logging - logger = setup_logger( + set_verbosity( + logger=logger, verbosity=args.verbosity, - filename=args.logfile, ) - arg_dict = vars(args) - # Pop the verbosity argument - _ = arg_dict.pop("verbosity") - # Pop the logfile argument - _ = arg_dict.pop("logfile") - - _ = main(**arg_dict) + _ = smooth_fits_cube( + infiles_list=args.infile, + uselogs=args.uselogs, + mode=args.mode, + conv_mode=args.conv_mode, + dryrun=args.dryrun, + prefix=args.prefix, + suffix=args.suffix, + outdir=args.outdir, + bmaj=args.bmaj, + bmin=args.bmin, + bpa=args.bpa, + cutoff=args.cutoff, + circularise=args.circularise, + ref_chan=args.ref_chan, + tolerance=args.tolerance, + epsilon=args.epsilon, + ncores=args.ncores, + nsamps=args.nsamps, + executor_type=args.executor_type, + verbosity=args.verbosity, + ) if __name__ == "__main__": diff --git a/racs_tools/convolve_uv.py b/racs_tools/convolve_uv.py index 6af8d9a..0f7cc45 100644 --- a/racs_tools/convolve_uv.py +++ b/racs_tools/convolve_uv.py @@ -2,23 +2,138 @@ """ Fast convolution in the UV domain """ __author__ = "Wasim Raja" - -from typing import Tuple +import gc +from typing import Literal, NamedTuple, Optional, Tuple import astropy.units as units import numpy as np import scipy.signal from astropy import convolution from astropy import units as u -from radio_beam import Beam +from astropy.io import fits +from astropy.wcs import WCS +from astropy.wcs.utils import proj_plane_pixel_scales +from radio_beam import Beam, Beams +from radio_beam.utils import BeamError import racs_tools.gaussft as gaussft +from racs_tools import au2 from racs_tools.logging import logger +class ConvolutionResult(NamedTuple): + """Result of convolution""" + + image: np.ndarray + """Convolved image""" + scaling_factor: float + """Scaling factor for the image in Jy/beam""" + + +def round_up(n: float, decimals: int = 0) -> float: + """Round to number of decimals + + Args: + n (float): Number to round. + decimals (int, optional): Number of decimals. Defaults to 0. + + Returns: + float: Rounded number. + """ + multiplier = 10**decimals + return np.ceil(n * multiplier) / multiplier + + +def my_ceil(a: float, precision: float = 0.0) -> float: + """Modified ceil function to round up to precision + + Args: + a (float): Number to round. + precision (float, optional): Precision of rounding. Defaults to 0. + + Returns: + float: Rounded number. + """ + return np.round(a + 0.5 * 10 ** (-precision), precision) + + +def get_nyquist_beam( + target_beam: Beam, + target_header: fits.Header, + beams: Beams, +) -> Beam: + """Get the Nyquist sampled beam + + Args: + target_beam (Beam): Target beam. + target_header (fits.Header): Target header. + beams (Beams): All the beams to be convolved. + + Raises: + ValueError: If grid is not same in x and y. + ValueError: If beam is undersampled. + + Returns: + Beam: Nyquist sampled beam. + """ + wcs = WCS(target_header) + pixelscales = proj_plane_pixel_scales(wcs) + + dx = pixelscales[0] * u.deg + dy = pixelscales[1] * u.deg + if not np.isclose(dx, dy): + raise ValueError(f"GRID MUST BE SAME IN X AND Y. Got {dx=} and {dy=}") + grid = dy + # Get the minor axis of the convolving beams + minorcons = [] + for beam in beams: + try: + minorcons += [target_beam.deconvolve(beam).minor.to(u.arcsec).value] + except BeamError as err: + logger.error(err) + logger.warning( + f"Could not deconvolve. New: {target_beam!r}, Old: {beam!r} - will set convolving beam to 0.0" + ) + minorcons += [ + target_beam.deconvolve(beam, failure_returns_pointlike=True) + .minor.to(u.arcsec) + .value + ] + minorcons = np.array(minorcons) * u.arcsec + samps = minorcons / grid.to(u.arcsec) + # Check that convolving beam will be Nyquist sampled + if any(samps.value < 2): + # Set the convolving beam to be Nyquist sampled + nyq_con_beam = Beam(major=grid * 2, minor=grid * 2, pa=0 * u.deg) + # Find new target based on common beam * Nyquist beam + # Not sure if this is best - but it works + nyq_beam = target_beam.convolve(nyq_con_beam) + nyq_beam = Beam( + major=my_ceil(nyq_beam.major.to(u.arcsec).value, precision=1) * u.arcsec, + minor=my_ceil(nyq_beam.minor.to(u.arcsec).value, precision=1) * u.arcsec, + pa=round_up(nyq_beam.pa.to(u.deg), decimals=2), + ) + logger.info(f"Smallest common Nyquist sampled beam is: {nyq_beam!r}") + if target_beam is not None: + if target_beam < nyq_beam: + logger.warning("TARGET BEAM WILL BE UNDERSAMPLED!") + raise ValueError("CAN'T UNDERSAMPLE BEAM - EXITING") + else: + logger.warning("COMMON BEAM WILL BE UNDERSAMPLED!") + logger.warning("SETTING COMMON BEAM TO NYQUIST BEAM") + target_beam = nyq_beam + + return target_beam + + def convolve( - image: np.ndarray, old_beam: Beam, new_beam: Beam, dx: u.Quantity, dy: u.Quantity -) -> Tuple[np.ndarray, float]: + image: np.ndarray, + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + cutoff: Optional[float] = None, +) -> ConvolutionResult: """Convolve by X-ing in the Fourier domain. - convolution with Gaussian kernels only - no need for generation of a kernel image @@ -31,16 +146,17 @@ def convolve( new_beam (Beam): Target image PSF. dx (u.Quantity): Grid size in x (e.g. CDELT1) dy (u.Quantity): Grid size in y (e.g. CDELT2) + cutoff (Optional[float], optional): Dummy cutoff for beamszie in arcsec. Defaults to None. NOT USED. Returns: - Tuple[np.ndarray, float]: (convolved image, scaling factor) + ConvolutionResult: convolved image, scaling factor """ nanflag = np.isnan(image).any() if nanflag: # Create a mask for the NaNs - mask = np.isnan(image).astype(int) + mask = np.isnan(image).astype(np.int16) logger.warning( f"Image contains {mask.sum()} ({mask.sum()/ mask.size *100 :0.1f}%) NaNs" ) @@ -50,10 +166,9 @@ def convolve( ny = image.shape[1] # The coordinates in FT domain: - u_image = np.fft.fftfreq(nx, d=dx.to(units.rad).value) - v_image = np.fft.fftfreq(ny, d=dy.to(units.rad).value) + u_image = np.fft.fftfreq(nx, d=dx.to(units.rad).value).astype(np.complex64) + v_image = np.fft.fftfreq(ny, d=dy.to(units.rad).value).astype(np.complex64) - g_final = np.zeros((nx, ny), dtype=float) [g_final, g_ratio] = gaussft.gaussft( bmin_in=old_beam.minor.to(units.deg).value, bmaj_in=old_beam.major.to(units.deg).value, @@ -64,22 +179,33 @@ def convolve( u=u_image, v=v_image, ) + g_final = g_final.astype(np.complex64) + del u_image + del v_image + # Perform the x-ing in the FT domain - im_f = np.fft.fft2(image) + im_f = np.fft.fft2(image).astype(np.complex64) + del image # Now convolve with the desired Gaussian: - M = np.multiply(im_f, g_final) - im_conv = np.fft.ifft2(M) - im_conv = np.real(im_conv) + M = np.multiply(im_f, g_final).astype(np.complex64) + del im_f + im_conv = np.fft.ifft2(M).astype(np.complex64) + im_conv = np.real(im_conv).astype(np.float32) + del M if nanflag: # Convert the mask to the FT domain - mask_f = np.fft.fft2(mask) + mask_f = np.fft.fft2(mask).astype(np.complex64) + del mask # Multiply the mask by the FT of the Gaussian - M = np.multiply(mask_f, g_final) + M = np.multiply(mask_f, g_final).astype(np.complex64) + del g_final + del mask_f # Invert the FT of the mask - mask_conv = np.fft.ifft2(M) - mask_conv = np.real(mask_conv) + mask_conv = np.fft.ifft2(M).astype(np.complex64) + mask_conv = np.real(mask_conv).astype(np.float32) + del M # Use approx values to find the NaNs # Need this to get around numerical issues mask_conv = ~(mask_conv + 1 < 2) @@ -88,87 +214,311 @@ def convolve( ) im_conv[mask_conv > 0] = np.nan - return im_conv, g_ratio + return ConvolutionResult(image=im_conv, scaling_factor=g_ratio) -def smooth( +def convolve_scipy( image: np.ndarray, old_beam: Beam, - final_beam: Beam, + new_beam: Beam, dx: u.Quantity, dy: u.Quantity, - sfactor: float, - conbeam: Beam = None, - conv_mode: str = "robust", -) -> np.ndarray: - """Apply smoothing to image + cutoff: Optional[float] = None, +) -> ConvolutionResult: + """Convolve using scipy's convolution Args: - image (np.ndarray): 2D image array. + image (np.ndarray): Image to be convolved. old_beam (Beam): Current beam. - final_beam (Beam): Target beam. + new_beam (Beam): Target beam. dx (u.Quantity): Pixel size in x. dy (u.Quantity): Pixel size in y. - sfactor (float): Scaling factor. - conbeam (Beam, optional): Convoling beam to use. Defaults to None. - conv_mode (str, optional): Convolution mode to use. Defaults to "robust". + cutoff (Optional[float], optional): Cutoff for beamszie in arcsec. Defaults to None. Returns: - np.ndarray: Smoothed image. + ConvolutionResult: Convolved image, scaling factor. """ + conbeam, sfactor = get_convolving_beam( + old_beam=old_beam, + new_beam=new_beam, + dx=dx, + dy=dy, + cutoff=cutoff, + ) if np.isnan(sfactor): logger.warning("Beam larger than cutoff -- blanking") + newim = np.ones_like(image) * np.nan + return ConvolutionResult(newim, sfactor) + if conbeam is None: + conbeam = new_beam.deconvolve(old_beam) + if np.isnan(conbeam): + return ConvolutionResult(image * np.nan, sfactor) + if np.isnan(image).all(): + return ConvolutionResult(image, sfactor) + if conbeam == Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg) and sfactor == 1: + return ConvolutionResult(image, sfactor) + + gauss_kern = conbeam.as_kernel(dy) + conbm1 = gauss_kern.array / gauss_kern.array.max() + newim = scipy.signal.convolve(image.astype("f8"), conbm1, mode="same") + + return ConvolutionResult(newim, sfactor) + + +def convolve_astropy( + image: np.ndarray, + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + cutoff: Optional[float] = None, +) -> ConvolutionResult: + """Convolve using astropy's convolution + + Args: + image (np.ndarray): Image to be convolved. + old_beam (Beam): Current beam. + new_beam (Beam): Target beam. + dx (u.Quantity): Pixel size in x. + dy (u.Quantity): Pixel size in y. + cutoff (Optional[float], optional): Cutoff for beamszie in arcsec. Defaults to None. + Returns: + ConvolutionResult: Convolved image, scaling factor. + """ + conbeam, sfactor = get_convolving_beam( + old_beam=old_beam, + new_beam=new_beam, + dx=dx, + dy=dy, + cutoff=cutoff, + ) + if np.isnan(sfactor): + logger.warning("Beam larger than cutoff -- blanking") newim = np.ones_like(image) * np.nan - return newim + return ConvolutionResult(newim, sfactor) if conbeam is None: - conbeam = final_beam.deconvolve(old_beam) + conbeam = new_beam.deconvolve(old_beam) if np.isnan(conbeam): - return image * np.nan + return ConvolutionResult(image * np.nan, sfactor) if np.isnan(image).all(): - return image + return ConvolutionResult(image, sfactor) if conbeam == Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg) and sfactor == 1: - return image - # using Beams package - logger.debug(f"Old beam is {old_beam!r}") - logger.debug(f"Using convolving beam {conbeam!r}") - logger.debug(f"Target beam is {final_beam!r}") - logger.debug(f"Using scaling factor {sfactor}") - pix_scale = dy + return ConvolutionResult(image, sfactor) + gauss_kern = conbeam.as_kernel(dy) + conbm1 = gauss_kern.array / gauss_kern.array.max() + newim = convolution.convolve( + image.astype("f8"), + conbm1, + normalize_kernel=False, + ) + + return ConvolutionResult(newim, sfactor) + + +def convolve_astropy_fft( + image: np.ndarray, + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + cutoff: Optional[float] = None, +) -> ConvolutionResult: + """Convolve using astropy's FFT convolution + + Args: + image (np.ndarray): Image to be convolved. + old_beam (Beam): Current beam. + new_beam (Beam): Target beam. + dx (u.Quantity): Pixel size in x. + dy (u.Quantity): Pixel size in y. + cutoff (Optional[float], optional): Cutoff for beamszie in arcsec. Defaults to None. - gauss_kern = conbeam.as_kernel(pix_scale) + Returns: + ConvolutionResult: Convolved image, scaling factor. + """ + conbeam, sfactor = get_convolving_beam( + old_beam=old_beam, + new_beam=new_beam, + dx=dx, + dy=dy, + cutoff=cutoff, + ) + if np.isnan(sfactor): + logger.warning("Beam larger than cutoff -- blanking") + newim = np.ones_like(image) * np.nan + return ConvolutionResult(newim, sfactor) + if conbeam is None: + conbeam = new_beam.deconvolve(old_beam) + if np.isnan(conbeam): + return ConvolutionResult(image * np.nan, sfactor) + if np.isnan(image).all(): + return ConvolutionResult(image, sfactor) + if conbeam == Beam(major=0 * u.deg, minor=0 * u.deg, pa=0 * u.deg) and sfactor == 1: + return ConvolutionResult(image, sfactor) + gauss_kern = conbeam.as_kernel(dy) conbm1 = gauss_kern.array / gauss_kern.array.max() - fac = sfactor + newim = convolution.convolve_fft( + image.astype("f8"), + conbm1, + normalize_kernel=False, + allow_huge=True, + ) + + return ConvolutionResult(newim, sfactor) + + +CONVOLUTION_FUNCTIONS = { + "robust": convolve, + "scipy": convolve_scipy, + "astropy": convolve_astropy, + "astropy_fft": convolve_astropy_fft, +} + + +def parse_conv_mode( + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"], +) -> str: + """Parse convolution mode + + Args: + conv_mode (str): Convolution mode. + + Returns: + str: Convolution mode. + """ + logger.info(f"Convolution mode: {conv_mode}") + if conv_mode not in CONVOLUTION_FUNCTIONS.keys(): + raise ValueError( + f"Please select valid convolution method! Expected one of {list(CONVOLUTION_FUNCTIONS.keys())}, got {conv_mode}" + ) + + logger.info(f"Using convolution method {conv_mode}") if conv_mode == "robust": - newim, fac = convolve( - image.astype("f8"), - old_beam, - final_beam, - dx, - dy, + logger.info("This is the most robust method. And fast!") + elif conv_mode == "scipy": + logger.info("This fast, but not robust to NaNs or small PSF changes") + else: + logger.info("This is slower, but robust to NaNs, but not to small PSF changes") + + return conv_mode + + +def get_convolving_beam( + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + cutoff: Optional[float] = None, +) -> Tuple[Beam, float]: + """Get the beam to use for smoothing + + Args: + old_beam (Beam): Current beam. + new_beam (Beam): Target beam. + dx (u.Quantity): Pixel size in x. + dy (u.Quantity): Pixel size in y. + cutoff (float, optional): Cutoff for beamsize in arcsec. Defaults to None. + + Raises: + err: If beam deconvolution fails. + + Returns: + Tuple[Beam, float]: Convolving beam and scaling factor. + """ + logger.info(f"Current beam is {old_beam!r}") + + if cutoff is not None and old_beam.major.to(u.arcsec) > cutoff * u.arcsec: + return ( + Beam( + major=np.nan * u.deg, + minor=np.nan * u.deg, + pa=np.nan * u.deg, + ), + np.nan, + ) + + if new_beam == old_beam: + conbm = Beam( + major=0 * u.deg, + minor=0 * u.deg, + pa=0 * u.deg, ) - # keep the new sfactor computed by this method - sfactor = fac - if conv_mode == "scipy": - newim = scipy.signal.convolve(image.astype("f8"), conbm1, mode="same") - elif conv_mode == "astropy": - newim = convolution.convolve( - image.astype("f8"), - conbm1, - normalize_kernel=False, + fac = 1.0 + logger.warning( + f"New beam {new_beam!r} and old beam {old_beam!r} are the same. Won't attempt convolution." ) - elif conv_mode == "astropy_fft": - newim = convolution.convolve_fft( - image.astype("f8"), - conbm1, - normalize_kernel=False, - allow_huge=True, + return conbm, fac + try: + conbm = new_beam.deconvolve(old_beam) + except BeamError as err: + logger.error(err) + logger.warning( + f"Could not deconvolve. New: {new_beam!r}, Old: {old_beam!r} - will set convolving beam to 0.0" ) + conbm = new_beam.deconvolve(old_beam, failure_returns_pointlike=True) + fac, _, _, _, _ = au2.gauss_factor( + beamConv=[ + conbm.major.to(u.arcsec).value, + conbm.minor.to(u.arcsec).value, + conbm.pa.to(u.deg).value, + ], + beamOrig=[ + old_beam.major.to(u.arcsec).value, + old_beam.minor.to(u.arcsec).value, + old_beam.pa.to(u.deg).value, + ], + dx1=dx.to(u.arcsec).value, + dy1=dy.to(u.arcsec).value, + ) + logger.debug(f"Old beam is {old_beam!r}") + logger.debug(f"Using convolving beam {conbm!r}") + logger.debug(f"Target beam is {new_beam!r}") + logger.debug(f"Using scaling factor {fac}") + return conbm, fac + + +def smooth( + image: np.ndarray, + old_beam: Beam, + new_beam: Beam, + dx: u.Quantity, + dy: u.Quantity, + conv_mode: Literal["robust", "scipy", "astropy", "astropy_fft"] = "robust", + cutoff: Optional[float] = None, +) -> np.ndarray: + """Apply smoothing to image in Jy/beam + + Args: + image (np.ndarray): 2D image array. + old_beam (Beam): Current beam. + new_beam (Beam): Target beam. + dx (u.Quantity): Pixel size in x. + dy (u.Quantity): Pixel size in y. + conv_mode (str, optional): Convolution mode to use. Defaults to "robust". + cutoff (Optional[float], optional): Cutoff for beamsize in arcsec. Defaults to None. + + Returns: + np.ndarray: Smoothed image. + + """ + out_dtype = image.dtype + conv_func = CONVOLUTION_FUNCTIONS.get(conv_mode, convolve) + new_image, fac = conv_func( + image=image, + old_beam=old_beam, + new_beam=new_beam, + dx=dx, + dy=dy, + cutoff=cutoff, + ) + del image + gc.collect() logger.debug(f"Using scaling factor {fac}") - if np.any(np.isnan(newim)): - logger.warning(f"{np.isnan(newim).sum()} NaNs present in smoothed output") + if np.any(np.isnan(new_image)): + logger.warning(f"{np.isnan(new_image).sum()} NaNs present in smoothed output") - newim *= fac + new_image *= fac # Ensure the output data-type is the same as the input - return newim.astype(image.dtype) + return new_image.astype(out_dtype) diff --git a/racs_tools/gaussft.py b/racs_tools/gaussft.py index 33fd0d1..8ce660b 100644 --- a/racs_tools/gaussft.py +++ b/racs_tools/gaussft.py @@ -50,11 +50,13 @@ def gaussft( ) bmaj_rad, bmin_rad, bpa_rad = bmaj * deg2rad, bmin * deg2rad, bpa * deg2rad - sx, sy = bmaj_rad / (2 * np.sqrt(2.0 * np.log(2.0))), bmin_rad / ( - 2 * np.sqrt(2.0 * np.log(2.0)) + sx, sy = ( + bmaj_rad / (2 * np.sqrt(2.0 * np.log(2.0))), + bmin_rad / (2 * np.sqrt(2.0 * np.log(2.0))), ) - sx_in, sy_in = bmaj_in_rad / (2.0 * np.sqrt(2.0 * np.log(2.0))), bmin_in_rad / ( - 2.0 * np.sqrt(2.0 * np.log(2.0)) + sx_in, sy_in = ( + bmaj_in_rad / (2.0 * np.sqrt(2.0 * np.log(2.0))), + bmin_in_rad / (2.0 * np.sqrt(2.0 * np.log(2.0))), ) u_cosbpa, u_sinbpa = u * np.cos(bpa_rad), u * np.sin(bpa_rad) diff --git a/racs_tools/getnoise_list.py b/racs_tools/getnoise_list.py index 3f520e6..626c798 100644 --- a/racs_tools/getnoise_list.py +++ b/racs_tools/getnoise_list.py @@ -2,9 +2,8 @@ """ Find bad channels by checking statistics of each channel image. """ import argparse -import logging import warnings -from typing import List, Tuple, Union +from typing import Tuple, Union import astropy.units as u import numpy as np @@ -12,7 +11,7 @@ from spectral_cube import SpectralCube from spectral_cube.utils import SpectralCubeWarning -from racs_tools.logging import logger, setup_logger +from racs_tools.logging import logger, set_verbosity warnings.filterwarnings(action="ignore", category=SpectralCubeWarning, append=True) @@ -31,7 +30,7 @@ def getcube(filename: str) -> SpectralCube: SpectralCube: Data cube """ cube = SpectralCube.read(filename) - mask = ~(cube == 0 * u.jansky / u.beam) + mask = ~(cube == 0 * cube.unit) cube = cube.with_mask(mask) return cube @@ -190,7 +189,7 @@ def main( if outfile: logger.info(f"Saving bad files to {outfile}") - np.savetxt(outfile, totalbad) + np.savetxt(outfile, totalbad.astype(int), fmt="%d") def cli() -> None: @@ -257,9 +256,9 @@ def cli() -> None: args = parser.parse_args() # Set up logging - logger = setup_logger( + set_verbosity( + logger=logger, verbosity=args.verbosity, - filename=args.logfile, ) main( diff --git a/racs_tools/logging.py b/racs_tools/logging.py index 7b7bc81..2fe1693 100644 --- a/racs_tools/logging.py +++ b/racs_tools/logging.py @@ -2,47 +2,79 @@ # -*- coding: utf-8 -*- import logging -from typing import Optional - -try: - from mpi4py import MPI - - myPE = MPI.COMM_WORLD.Get_rank() -except ImportError: - myPE = 0 +import multiprocessing as mp +from logging.handlers import QueueHandler, QueueListener +from typing import Optional, Tuple logging.captureWarnings(True) -logger = logging.getLogger("racs_tools") -logger.setLevel(logging.WARNING) -formatter = logging.Formatter( - fmt=f"[{myPE}] %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", -) +# Following guide from gwerbin/multiprocessing_logging.py +# https://gist.github.com/gwerbin/e9ab7a88fef03771ab0bf3a11cf921bc def setup_logger( - verbosity: int = 0, filename: Optional[str] = None, -): - if verbosity == 0: - level = logging.WARNING - elif verbosity == 1: - level = logging.INFO - elif verbosity >= 2: - level = logging.DEBUG +) -> Tuple[logging.Logger, QueueListener, mp.Queue]: + """Setup a logger + + Args: + filename (Optional[str], optional): Output log file. Defaults to None. + + Returns: + Tuple[logging.Logger, QueueListener, mp.Queue]: Logger, log listener and log queue + """ + logger = logging.getLogger("racs_tools") + logger.setLevel(logging.WARNING) + formatter = logging.Formatter( + fmt="[%(threadName)s] %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) ch = logging.StreamHandler() - ch.setLevel(level) ch.setFormatter(formatter) - - logger.setLevel(level) logger.addHandler(ch) if filename is not None: fh = logging.FileHandler(filename) - fh.setLevel(level) fh.setFormatter(formatter) logger.addHandler(fh) - return logger + log_queue = mp.Queue() + log_listener = QueueListener(log_queue, ch) + + return logger, log_listener, log_queue + + +def set_verbosity(logger: logging.Logger, verbosity: int) -> None: + """Set the logger verbosity + + Args: + logger (logging.Logger): The logger + verbosity (int): Verbosity level + """ + if verbosity == 0: + level = logging.WARNING + elif verbosity == 1: + level = logging.INFO + elif verbosity >= 2: + level = logging.DEBUG + + logger.setLevel(level) + + +def init_worker(log_queue: mp.Queue, verbosity: int = 0) -> None: + """Initialise a worker process with a logger + + Args: + log_queue (mp.Queue): The log queue + verbosity (int, optional): Verbosity level. Defaults to 0. + """ + logger = logging.getLogger("racs_tools") + + set_verbosity(logger, verbosity) + + handler = QueueHandler(log_queue) + logger.addHandler(handler) + + +logger, log_listener, log_queue = setup_logger() diff --git a/racs_tools/parallel.py b/racs_tools/parallel.py new file mode 100644 index 0000000..e7daef6 --- /dev/null +++ b/racs_tools/parallel.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +""" Utilities for parallelism """ +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor +from typing import Dict, Literal, TypeVar + +from racs_tools.logging import logger + +# logger = setup_logger() + +try: + from mpi4py.futures import MPIPoolExecutor +except ImportError: + logger.debug("MPI not available") + MPIPoolExecutor = None + +Executor = TypeVar("Executor", ThreadPoolExecutor, ProcessPoolExecutor, MPIPoolExecutor) + +EXECUTORS: Dict[Literal["thread", "process", "mpi"], Executor] = { + "thread": ThreadPoolExecutor, + "process": ProcessPoolExecutor, + "mpi": MPIPoolExecutor, +} + + +def get_executor( + executor_type: Literal["thread", "process", "mpi"] = "thread", +) -> Executor: + """Get an executor based on the type + + Args: + executor_type (Literal["thread", "process", "mpi"], optional): Type of executor. Defaults to "thread". + + Raises: + ValueError: If the executor type is not available + + Returns: + Executor: Executor class + """ + ExecutorClass: Executor = EXECUTORS.get(executor_type, ThreadPoolExecutor) + if ExecutorClass is None: + raise ValueError(f"Executor type {executor_type} is not available!") + logger.info(f"Using {ExecutorClass.__name__} for parallelisation") + return ExecutorClass diff --git a/tests/test_2d.py b/tests/test_2d.py index bf96d1b..fd4ce6c 100644 --- a/tests/test_2d.py +++ b/tests/test_2d.py @@ -4,14 +4,12 @@ import logging import shutil import subprocess as sp -from os import path from pathlib import Path -from typing import List, NamedTuple, Tuple, Union +from typing import NamedTuple import astropy.units as u import numpy as np import pytest -import schwimmbad from astropy.io import fits from radio_beam import Beam @@ -141,16 +139,14 @@ def check_images(image_1: Path, image_2: Path) -> bool: def test_robust(make_2d_image: TestImage, mirsmooth: TestImage): """Test the robust convolution.""" - with schwimmbad.SerialPool() as pool: - beamcon_2D.main( - pool=pool, - infile=[make_2d_image.path.as_posix()], - suffix="robust", - conv_mode="robust", - bmaj=mirsmooth.beam.major.to(u.arcsec).value, - bmin=mirsmooth.beam.minor.to(u.arcsec).value, - bpa=mirsmooth.beam.pa.to(u.deg).value, - ) + beamcon_2D.smooth_fits_files( + infile_list=[make_2d_image.path], + suffix="robust", + conv_mode="robust", + bmaj=mirsmooth.beam.major.to(u.arcsec).value, + bmin=mirsmooth.beam.minor.to(u.arcsec).value, + bpa=mirsmooth.beam.pa.to(u.deg).value, + ) fname_beamcon = make_2d_image.path.with_suffix(".robust.fits") assert check_images(mirsmooth.path, fname_beamcon), "Beamcon does not match miriad" @@ -158,16 +154,14 @@ def test_robust(make_2d_image: TestImage, mirsmooth: TestImage): def test_astropy(make_2d_image: TestImage, mirsmooth: TestImage): """Test the astropy convolution.""" - with schwimmbad.SerialPool() as pool: - beamcon_2D.main( - pool=pool, - infile=[make_2d_image.path.as_posix()], - suffix="astropy", - conv_mode="astropy", - bmaj=mirsmooth.beam.major.to(u.arcsec).value, - bmin=mirsmooth.beam.minor.to(u.arcsec).value, - bpa=mirsmooth.beam.pa.to(u.deg).value, - ) + beamcon_2D.smooth_fits_files( + infile_list=[make_2d_image.path], + suffix="astropy", + conv_mode="astropy", + bmaj=mirsmooth.beam.major.to(u.arcsec).value, + bmin=mirsmooth.beam.minor.to(u.arcsec).value, + bpa=mirsmooth.beam.pa.to(u.deg).value, + ) fname_beamcon = make_2d_image.path.with_suffix(".astropy.fits") assert check_images(mirsmooth.path, fname_beamcon), "Beamcon does not match miriad" @@ -175,16 +169,14 @@ def test_astropy(make_2d_image: TestImage, mirsmooth: TestImage): def test_scipy(make_2d_image: TestImage, mirsmooth: TestImage): """Test the scipy convolution.""" - with schwimmbad.SerialPool() as pool: - beamcon_2D.main( - pool=pool, - infile=[make_2d_image.path.as_posix()], - suffix="scipy", - conv_mode="scipy", - bmaj=mirsmooth.beam.major.to(u.arcsec).value, - bmin=mirsmooth.beam.minor.to(u.arcsec).value, - bpa=mirsmooth.beam.pa.to(u.deg).value, - ) + beamcon_2D.smooth_fits_files( + infile_list=[make_2d_image.path], + suffix="scipy", + conv_mode="scipy", + bmaj=mirsmooth.beam.major.to(u.arcsec).value, + bmin=mirsmooth.beam.minor.to(u.arcsec).value, + bpa=mirsmooth.beam.pa.to(u.deg).value, + ) fname_beamcon = make_2d_image.path.with_suffix(".scipy.fits") assert check_images(mirsmooth.path, fname_beamcon), "Beamcon does not match miriad" @@ -219,10 +211,9 @@ def test_smooth(make_2d_image: TestImage, mirsmooth: TestImage): smooth_data = smooth( image=make_2d_image.data, old_beam=old_beam, - final_beam=target_beam, + new_beam=target_beam, dx=make_2d_image.pix_scale, dy=make_2d_image.pix_scale, - sfactor=fac, conv_mode=conv_mode, ) assert np.allclose( diff --git a/tests/test_3d.py b/tests/test_3d.py index 3d7e379..835db8c 100644 --- a/tests/test_3d.py +++ b/tests/test_3d.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import os import shutil import subprocess as sp -import unittest from pathlib import Path import astropy.units as u import numpy as np import pytest -import schwimmbad from astropy.io import fits from astropy.table import Table from radio_beam import Beam, Beams @@ -24,9 +21,21 @@ def make_3d_image( tmpdir: str, beams: Beams = Beams( - major=[50, 50, 50] * u.arcsec, - minor=[10, 10, 10] * u.arcsec, - pa=[0, 0, 0] * u.deg, + major=[ + 50, + ] + * 100 + * u.arcsec, + minor=[ + 10, + ] + * 100 + * u.arcsec, + pa=[ + 0, + ] + * 100 + * u.deg, ), ) -> TestImage: """Make a fake 3D image from with a Gaussian beam. @@ -42,9 +51,11 @@ def make_3d_image( freqs = np.linspace(1, 2, len(beams)) * u.GHz cube = [] - for beam in beams: + peaks = np.random.uniform(0.1, 10, len(beams)) + for beam, peak in zip(beams, peaks): data = beam.as_kernel(pixscale=pix_scale, x_size=100, y_size=100).array data /= data.max() + data *= peak cube.append(data) cube = np.array(cube)[np.newaxis] @@ -154,8 +165,8 @@ def mirsmooth_3d(make_3d_image: TestImage) -> TestImage: def test_robust_3d(make_3d_image, mirsmooth_3d): """Test the robust mode.""" - beamcon_3D.main( - infile=[make_3d_image.path.as_posix()], + beamcon_3D.smooth_fits_cube( + infiles_list=[make_3d_image.path], suffix="robust", conv_mode="robust", mode="total",