diff --git a/changes/292.apichange.rst b/changes/292.apichange.rst new file mode 100644 index 000000000..63c63b169 --- /dev/null +++ b/changes/292.apichange.rst @@ -0,0 +1 @@ +Add `outlier_detection` median calculators from jwst. \ No newline at end of file diff --git a/docs/stcal/outlier_detection/index.rst b/docs/stcal/outlier_detection/index.rst index 7c3bac073..a72b7a446 100644 --- a/docs/stcal/outlier_detection/index.rst +++ b/docs/stcal/outlier_detection/index.rst @@ -9,4 +9,5 @@ Outlier Detection Utils description.rst +.. automodapi:: stcal.outlier_detection.median .. automodapi:: stcal.outlier_detection.utils diff --git a/pyproject.toml b/pyproject.toml index 50fdbba51..c0435f984 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,11 @@ norecursedirs = [ ] filterwarnings = [ "error::ResourceWarning", + # Files left open by tests can trigger ResourceWarnings during test + # teardown which otherwise would be converted back into a warning + # by pytest. Turn these PytestUnraisableExceptionWarnings back + # into errors + "error::pytest.PytestUnraisableExceptionWarning", ] markers = [ "soctests", @@ -166,6 +171,8 @@ ignore = [ "PLR0913", # Too many arguments "PLR0915", # Too many statements "PLR2004", # Magic value used in comparison + "ANN101", # Missing type annotation for self in method + "ANN102", # Missing type annotation for cls in classmethod # Pydocstyle (to fix over time "D100", # Undocumented public module diff --git a/src/stcal/outlier_detection/median.py b/src/stcal/outlier_detection/median.py new file mode 100644 index 000000000..2ff899fe1 --- /dev/null +++ b/src/stcal/outlier_detection/median.py @@ -0,0 +1,400 @@ +"""Compute median of large datasets in memory- and runtime-efficient ways.""" + +from __future__ import annotations + +import logging +import tempfile +import warnings +from pathlib import Path +from typing import Any + +import numpy as np + +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + +_ONE_MB = 1 << 20 + + +__all__ = ["MedianComputer", "nanmedian3D"] + + +def nanmedian3D(cube: np.ndarray, overwrite_input: bool = True) -> np.ndarray: + """Compute the nanmedian of a cube. + + This produces identical results to np.nanmedian but is much more + memory efficient for large cubes. + + RuntimeWarnings reporting "All-Nan slice" will be ignored. + + Parameters + ---------- + cube + 3-dimensional array. Will be modified in-place if `overwrite_input` is True + overwrite_input + Passed to ``np.nanmedian``, if True the input cube will be modified + + Returns + ------- + np.ndarray + 2-dimensional computed median array + """ + with warnings.catch_warnings(): + warnings.filterwarnings(action="ignore", + message="All-NaN slice encountered", + category=RuntimeWarning) + output_arr = np.empty(cube.shape[1:], dtype=cube.dtype) + for i in range(output_arr.shape[0]): + # np.nanmedian allocates lots of memory; this for loop gets around that + np.nanmedian(cube[:, i, :], + axis=0, + overwrite_input=overwrite_input, + out=output_arr[i, :]) + return output_arr + + +class MedianComputer: + """ + Top-level class to treat median computation uniformly, whether in + memory or on disk. + """ + + def __init__(self, + full_shape: tuple, + in_memory: bool, + buffer_size: int | None = None, + dtype: str | np.dtype = "float32", + tempdir: str = "", + ) -> None: + """ + Parameters + ---------- + full_shape + The shape of the full input dataset. + + in_memory + Whether to perform the median computation in memory or using + temporary files on disk to save memory. + + buffer_size + The buffer size for the median computation, units of bytes. + Has no effect if `in_memory` is True. + + dtype + The data type of the input data. + + tempdir + The parent directory in which to create the temporary directory. + Default is the current working directory. + """ + self.full_shape = full_shape + self.in_memory = in_memory + if buffer_size is None: + buffer_size = 0 + self.buffer_size = buffer_size + self.dtype = dtype + if self.in_memory: + computer: Any = np.empty(full_shape, dtype=dtype) + else: + computer = _OnDiskMedian(full_shape, + dtype=dtype, + buffer_size=buffer_size, + tempdir=tempdir) + self._median_computer: Any = computer + + def append(self, + data: np.ndarray, + idx: int | None = None + ) -> None: + """ + Append data to the median computer. + + Parameters + ---------- + data + The data to append to the median computer. Must have shape full_shape[1:]. + + idx + The index at which to append the data. Must be between 0 and full_shape[0]. + Required if using in-memory median computation. + """ + if self.in_memory: + if idx is None: + msg = "Index must be provided when using in-memory median" + raise ValueError(msg) + # populate pre-allocated memory with the drizzled data + self._median_computer[idx] = data + else: + # distribute the drizzled data into the temporary storage + self._median_computer.add_image(data) + + def evaluate(self) -> np.ndarray: + """ + Compute the median data from the input data. + + Returns + ------- + np.ndarray + The median data computed from the input data. + """ + if self.in_memory: + median_data = nanmedian3D(self._median_computer) + else: + median_data = self._median_computer.compute_median() + self._median_computer.cleanup() + return median_data + + +class _DiskAppendableArray: + """ + Creates a temporary file to which to append data, in order to perform + timewise operations on a stack of input images without holding all of them + in memory. + + This class is purpose-built for median computation during outlier detection + and is not very flexible. It is assumed that each data array passed to + `append` represents the same spatial segment of the full dataset. It is + also assumed that each array passed to `append` represents only a single + instant in time; the append operation will stack them along a new axis. + + The `read` operation is only capable of loading the full array back + into memory. When working with large datasets that do not fit in memory + the required workflow is to create many DiskAppendableArray objects, each + holding a small spatial segment of the full dataset. + """ + + def __init__(self, + slice_shape: tuple, + dtype: str | np.dtype, + filename: str | Path + ) -> None: + """ + Parameters + ---------- + slice_shape + The required shape of each appended slice. + + dtype + The data type of the array. Must be a valid numpy array datatype. + + filename + The full file path in which to store the array + """ + if len(slice_shape) != 2: + msg = f"Invalid slice shape {slice_shape}. Only 2-D arrays are supported." + raise ValueError(msg) + self._filename = Path(filename) + with Path.open(self._filename, "wb") as f: # noqa: F841 + pass + self._slice_shape = slice_shape + self._dtype = np.dtype(dtype) + self._append_count = 0 + + @property + def shape(self) -> tuple: + return (self._append_count, *self._slice_shape) + + def append(self, data: np.ndarray) -> None: + """ + Add a new slice to the temporary file. + + Parameters + ---------- + data + The data to append to the array. Must have shape ``slice_shape`` + and dtype matching the one provided to ``__init__``. + """ + if data.shape != self._slice_shape: + msg = f"Data shape {data.shape} does not match slice shape {self._slice_shape}" + raise ValueError(msg) + if data.dtype != self._dtype: + msg = f"Data dtype {data.dtype} does not match array dtype {self._dtype}" + raise ValueError(msg) + with Path.open(self._filename, "ab") as f: + data.tofile(f, sep="") + self._append_count += 1 + + def read(self) -> np.ndarray: + """ + Read the 3-D array into memory. + + Returns + ------- + array + The full array stored in the temporary file, shape (append_count, *slice_shape). + """ + shp = (self._append_count, *self._slice_shape) + with Path.open(self._filename, "rb") as f: + return np.fromfile(f, dtype=self._dtype).reshape(shp) + + +class _OnDiskMedian: + + def __init__(self, + shape: tuple, + dtype: str | np.dtype = "float32", + tempdir: str = "", + buffer_size: int = 0 + ) -> None: + """ + Set up temporary files to perform operations on a stack of 2-D input + arrays along the first dimension (e.g., a time axis) without + holding all of them in memory. Currently the only supported operation + is the median. + + Parameters + ---------- + shape + The shape of the entire input, (n_images, imrows, imcols). + + dtype + The data type of the input data. + + tempdir + The parent directory in which to create the temporary directory, + which itself holds all the DiskAppendableArray tempfiles. + Default is the current working directory. + + buffer_size + The buffer size, units of bytes. + Default is the size of one input image. + """ + if len(shape) != 3: + msg = f"Invalid input shape {shape}; only three-dimensional data are supported." + raise ValueError(msg) + self._expected_nframes = shape[0] + self.frame_shape = shape[1:] + self.dtype = np.dtype(dtype) + self.itemsize = self.dtype.itemsize + self._temp_dir = tempfile.TemporaryDirectory(dir=tempdir) + self._temp_path = Path(self._temp_dir.name) + + # figure out number of sections and rows per section that are needed + self.nsections, self.section_nrows = \ + self._get_buffer_indices(buffer_size=buffer_size) + self.slice_shape = (self.section_nrows, shape[2]) + self._n_adds = 0 + + # instantiate a temporary DiskAppendableArray for each section + self._temp_arrays = self._temparray_setup(dtype) + + def _get_buffer_indices(self, + buffer_size: int = 0 + ) -> tuple[int, int]: + """ + Determine the number of sections and rows per section needed to + divide the input data into sections that fit within the specified + buffer size. + + Parameters + ---------- + buffer_size + The buffer size for the median computation, units of bytes. + + Returns + ------- + nsections + The number of sections to divide the input data into. + + section_nrows + The number of rows in each section (except the last one). + """ + imrows, imcols = self.frame_shape + if buffer_size == 0: + buffer_size = imrows * imcols * self.itemsize + per_model_buffer_size = buffer_size / self._expected_nframes + min_buffer_size = imcols * self.itemsize + section_nrows = \ + min(imrows, int(per_model_buffer_size // min_buffer_size)) + + if section_nrows <= 0: + buffer_size = min_buffer_size * self._expected_nframes + msg = ("Buffer size is too small to hold a single row. " + f"Increasing buffer size to {buffer_size / _ONE_MB} MB") + log.warning(msg) + section_nrows = 1 + self.buffer_size = buffer_size + + nsections = int(np.ceil(imrows / section_nrows)) + msg = (f"Computing median over {self._expected_nframes} " + f"groups in {nsections} sections " + f"with total memory buffer {buffer_size / _ONE_MB} MB") + log.info(msg) + return nsections, section_nrows + + def _temparray_setup(self, + dtype: str | np.dtype + ) -> list[_DiskAppendableArray]: + """Set up temp file handlers, one for each section.""" + temp_arrays = [] + for i in range(self.nsections): + shp = self.slice_shape + if i == self.nsections - 1: + # last section has whatever shape is left over + shp = (self.frame_shape[0] - (self.nsections-1) * + self.section_nrows, self.frame_shape[1]) + arr = _DiskAppendableArray(shp, dtype, self._temp_path / f"{i}.bin") + temp_arrays.append(arr) + return temp_arrays + + def add_image(self, data: np.ndarray) -> None: + """ + Split data into ``nsections`` sections and write/append each section to + the corresponding temporary file. + """ + if self._n_adds >= self.nsections: + msg = f"Too many calls to add_image. Expected at most {self.nsections} input models." + raise IndexError(msg) + self._validate_data(data) + self._n_adds += 1 + for i in range(self.nsections): + row1 = i * self.section_nrows + row2 = min(row1 + self.section_nrows, self.frame_shape[0]) + arr = self._temp_arrays[i] + arr.append(data[row1:row2]) + + def _validate_data(self, data: np.ndarray) -> None: + """Ensure data array being appended has correct shape and dtype.""" + if data.shape != self.frame_shape: + msg = f"Data shape {data.shape} does not match expected shape {self.frame_shape}." + raise ValueError(msg) + if data.dtype != self.dtype: + msg = f"Data dtype {data.dtype} does not match expected dtype {self.dtype}." + raise ValueError(msg) + + def cleanup(self) -> None: + """ + Remove the temporary files and directory when finished. + + This method should be called when this instance is no longer needed, + and the instance should not be used after this method is called. + """ + self._temp_dir.cleanup() + + def compute_median(self) -> np.ndarray: + """ + Compute the median of the previously added data. + + Returns + ------- + median_image + 2-dimensional array of shape [``imrows``, ``imcols``] containing + median values computed across all images previously provided to ``add_image``. + """ + row_indices = [(i * self.section_nrows, + min((i+1) * self.section_nrows, self.frame_shape[0])) + for i in range(self.nsections)] + + output_rows = row_indices[-1][1] + output_cols = self._temp_arrays[0].shape[2] + median_image = np.full((output_rows, output_cols), + np.nan, + dtype=self.dtype) + + for i, disk_arr in enumerate(self._temp_arrays): + row1, row2 = row_indices[i] + arr = disk_arr.read() + median_image[row1:row2] = nanmedian3D(arr) + del arr, disk_arr + + return median_image diff --git a/tests/outlier_detection/test_median.py b/tests/outlier_detection/test_median.py new file mode 100644 index 000000000..5361e44b4 --- /dev/null +++ b/tests/outlier_detection/test_median.py @@ -0,0 +1,196 @@ +import os +from pathlib import Path + +import numpy as np +import pytest + +from stcal.outlier_detection.median import ( + MedianComputer, + _DiskAppendableArray, + _OnDiskMedian, + nanmedian3D, +) + + +def test_disk_appendable_array(tmp_path): + + slice_shape = (8, 7) + dtype = "float32" + tempdir = tmp_path / Path("tmptest") + Path.mkdir(tempdir) + fname = tempdir / "test.bin" + + arr = _DiskAppendableArray(slice_shape, dtype, fname) + + # check temporary file setup + assert str(arr._filename).split("/")[-1] in os.listdir(tempdir) # noqa: SLF001 + assert len(os.listdir(tempdir)) == 1 + assert arr.shape == (0, *slice_shape) + + # check cwd contains no files + assert all(not Path.is_file(Path(f)) for f in os.listdir(tmp_path)) + + # check expected append failures + candidate = np.zeros((7, 7), dtype=dtype) + with pytest.raises(ValueError, match="shape"): + arr.append(candidate) + candidate = np.zeros((8, 7), dtype="float64") + with pytest.raises(ValueError, match="dtype"): + arr.append(candidate) + + # check append and read + candidate0 = np.zeros(slice_shape, dtype=dtype) + candidate1 = np.full(slice_shape, 2, dtype=dtype) + candidate2 = np.full(slice_shape, np.nan, dtype=dtype) + for candidate in [candidate0, candidate1, candidate2]: + arr.append(candidate) + + arr_in_memory = arr.read() + + assert arr_in_memory.shape == (3, *slice_shape) + assert np.all(arr_in_memory[0] == candidate0) + assert np.all(arr_in_memory[1] == candidate1) + assert np.allclose(arr_in_memory[2], candidate2, equal_nan=True) + + +def test_disk_appendable_array_bad_inputs(tmp_path): + + slice_shape = (8, 7) + dtype = "float32" + tempdir = tmp_path / Path("tmptest") + fname = "test.bin" + + # test input directory does not exist + with pytest.raises(FileNotFoundError): + _DiskAppendableArray(slice_shape, dtype, tempdir / fname) + + # make the input directory + Path.mkdir(tempdir) + + # ensure failure if slice_shape is not 2-D + with pytest.raises(ValueError, match="slice shape"): + _DiskAppendableArray((3, 5, 7), dtype, tempdir / fname) + + # ensure failure if dtype is not valid + with pytest.raises(TypeError): + _DiskAppendableArray(slice_shape, "float3", tempdir / fname) + + # ensure failure if pass directory instead of filename + with pytest.raises(IsADirectoryError): + _DiskAppendableArray(slice_shape, "float3", tempdir) + + +def test_on_disk_median(tmp_path): + + library_length = 3 + frame_shape = (21, 20) + dtype = "float32" + tempdir = tmp_path / Path("tmptest") + Path.mkdir(tempdir) + shape = (library_length, *frame_shape) + + median_computer = _OnDiskMedian(shape, dtype=dtype, tempdir=tempdir) + + # test compute buffer indices + # buffer size equals size of single input model by default + # which means we expect same number of sections as library length + # in reality there is often one more section than that because + # need integer number of rows per section, but math is exact in this case + expected_buffer_size = frame_shape[0] * frame_shape[1] * \ + np.dtype(dtype).itemsize + expected_section_nrows = frame_shape[0] // library_length + assert median_computer.nsections == library_length + assert median_computer.section_nrows == expected_section_nrows + assert median_computer.buffer_size == expected_buffer_size + + # test temp file setup + assert len(os.listdir(tempdir)) == 1 + assert str(median_computer._temp_path)\ + .startswith(str(tempdir)) # noqa: SLF001 + assert len(os.listdir(median_computer._temp_path)) \ + == library_length # noqa: SLF001 + # check cwd and parent tempdir contain no files + assert all(not Path.is_file(Path(f)) for f in os.listdir(tmp_path)) + assert all(not Path.is_file(Path(f)) for f in os.listdir(tempdir)) + + # test validate data + candidate = np.zeros((20, 20), dtype=dtype) + with pytest.raises(ValueError, match="shape"): + median_computer.add_image(candidate) + candidate = np.zeros((21, 20), dtype="float64") + with pytest.raises(ValueError, match="dtype"): + median_computer.add_image(candidate) + + # test add and compute + candidate0 = np.full(frame_shape, 3, dtype=dtype) + candidate1 = np.full(frame_shape, 2, dtype=dtype) + candidate2 = np.full(frame_shape, np.nan, dtype=dtype) + for candidate in [candidate0, candidate1, candidate2]: + median_computer.add_image(candidate) + median = median_computer.compute_median() + median_computer.cleanup() + assert median.shape == frame_shape + assert np.all(median == 2.5) + + # test expected error trying to add too many frames + # for loop to ensure always happens, not just the first time + candidate3 = np.zeros_like(candidate0) + for _ in range(2): + with pytest.raises(IndexError): + median_computer.add_image(candidate3) + + # test cleanup of tmpdir and everything else + assert not Path.exists(median_computer._temp_path) # noqa: SLF001 + assert len(os.listdir(tempdir)) == 0 + + +def test_computer(): + """Ensure MedianComputer works the same on disk and in memory""" + full_shape = (3, 21, 20) + comp_memory = MedianComputer(full_shape, True) + comp_disk = MedianComputer(full_shape, False) + for i in range(full_shape[0]): + frame = np.full((21, 20), i, dtype=np.float32) + comp_memory.append(frame, i) + comp_disk.append(frame, i) + assert np.allclose(comp_memory.evaluate(), comp_disk.evaluate()) + + +def test_on_disk_median_bad_inputs(tmp_path): + + library_length = 3 + frame_shape = (21, 20) + dtype = "float32" + tempdir = tmp_path / Path("tmptest") + Path.mkdir(tempdir) + shape = (library_length, *frame_shape) + + with pytest.raises(ValueError, match="shape"): + _OnDiskMedian(frame_shape, dtype=dtype, tempdir=tempdir) + + with pytest.raises(TypeError): + _OnDiskMedian(shape, dtype="float3", tempdir=tempdir) + + with pytest.raises(FileNotFoundError): + _OnDiskMedian(shape, dtype="float32", tempdir="dne") + + # ensure unreasonable buffer size will get set to minimum reasonable buffer + min_buffer = np.dtype(dtype).itemsize*frame_shape[1]*library_length + median_computer = _OnDiskMedian(shape, + dtype=dtype, + tempdir=tempdir, + buffer_size=-1) + assert median_computer.buffer_size == min_buffer + median_computer.cleanup() + + +def test_nanmedian3D(): + + shp = (11, 50, 60) + generator = np.random.default_rng(77) + cube = generator.normal(size=shp) + cube[5, 5:7, 5:8] = np.nan + med = nanmedian3D(cube.astype(np.float32)) + + assert med.dtype == np.float32 + assert np.allclose(med, np.nanmedian(cube, axis=0), equal_nan=True)