JP-3768: Move outlier detection median computers to stcal #292

Merged: 16 commits, Oct 1, 2024
Changes from 7 commits

1 change: 1 addition & 0 deletions changes/292.apichange.rst
@@ -0,0 +1 @@
Add `outlier_detection` median calculators from jwst.
355 changes: 355 additions & 0 deletions src/stcal/outlier_detection/median.py
@@ -0,0 +1,355 @@
"""Compute median of large datasets in memory- and runtime-efficient ways."""

from __future__ import annotations

import logging
import tempfile
import warnings
from pathlib import Path
from typing import Any

import numpy as np

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

_ONE_MB = 1 << 20


def nanmedian3D(cube: np.ndarray, overwrite_input: bool = True) -> np.ndarray:
"""Compute the median of a cube ignoring warnings and with
memory efficiency optimizations. np.nanmedian always uses at least 64-bit
precision internally, and this is too memory-intensive. Instead, loop over
the median calculation to avoid the memory usage of the internal upcasting
and temporary array allocation. The additional runtime of this loop is
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the largest index of shape[0] tested? When testing ramp fitting the file jw02589006001_04101_00001-seg001_nrs1_uncal.fits has 6103 integrations. This seriously slowed down computation time in ramp fitting when running python compared to data with the same dimensions, but a much lower number of integrations.

Copy link
Collaborator Author

@emolter emolter Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the question you are asking about the runtime performance of the nanmedian3D function, right? This conversation on the JWST PR may be relevant, although no test was done with the zeroth (time/n_groups) axis being as large as that.

When you say that it seriously slowed down runtime in ramp fitting, do you mean that a median calculation was done, or just that the entire step scaled poorly with the number of integrations?

For imaging data, where we've seen the slowest processing, the largest n_groups we have seen is only ~100, since the step takes _cal files as input instead of _calints. However, I can look into whether there would be any coronagraphic data that had a huge number of integrations in their _calints files, as this is also used by jwst for coronagraphic data

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this on a relatively large coronagraphy dataset I had lying around from JWSTDMS-921. The shape of the array going into this function was (1250, 224, 288), so still not 6000 but this is on the large side of what is processed here in practice. The median calculation does not blow up the runtime in this case: the entire outlier detection step took only 5 seconds to run, as compared with ~1 hour for the coronagraphy-specific align_refs and klip steps. So I don't think this is a concern for runtime but let me know if you would like to see additional tests run.

indistinguishable from zero, but this loop cuts overall step memory usage
roughly in half for at least one test association.
"""
braingram marked this conversation as resolved.
Show resolved Hide resolved
    with warnings.catch_warnings():
        warnings.filterwarnings(action="ignore",
                                message="All-NaN slice encountered",
                                category=RuntimeWarning)
        output_arr = np.empty(cube.shape[1:], dtype=np.float32)
        for i in range(output_arr.shape[0]):
            # this for loop looks silly, but see docstring above
            np.nanmedian(cube[:, i, :],
                         axis=0,
                         overwrite_input=overwrite_input,
                         out=output_arr[i, :])
    return output_arr
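
A minimal usage sketch (not part of the diff; the data below are made up) to illustrate the expected shapes and dtypes:

    import numpy as np

    # hypothetical input: 10 images of 50x60 pixels with some NaNs mixed in
    cube = np.random.rand(10, 50, 60).astype(np.float32)
    cube[cube < 0.1] = np.nan

    median_image = nanmedian3D(cube)
    assert median_image.shape == cube.shape[1:]   # (50, 60)
    assert median_image.dtype == np.float32      # output stays float32

    # overwrite_input=True (the default) lets np.nanmedian reorder the input
    # buffer, so `cube` should not be reused after this call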


class MedianComputer:
"""
Top-level class to treat median computation uniformly, whether in
memory or on disk.
"""

def __init__(self: MedianComputer,
braingram marked this conversation as resolved.
Show resolved Hide resolved
full_shape: tuple,
in_memory: bool,
buffer_size: int | None = None,
dtype: str | np.dtype = "float32"
braingram marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
"""
Parameters
----------
full_shape
The shape of the full input dataset.

in_memory
Whether to perform the median computation in memory or using
temporary files on disk to save memory.

buffer_size
The buffer size for the median computation, units of bytes.
Has no effect if in_memory is True.

dtype
The data type of the input data.
"""
self.full_shape = full_shape
self.in_memory = in_memory
if buffer_size is None:
buffer_size = 0
self.buffer_size = buffer_size
self.dtype = dtype
if self.in_memory:
computer: Any = np.empty(full_shape, dtype=dtype)
else:
computer = OnDiskMedian(full_shape,
dtype=dtype,
buffer_size=buffer_size)
self._median_computer: Any = computer

    def append(self: MedianComputer,
               data: np.ndarray,
               idx: int | None = None
               ) -> None:
        """
        Append a data array to the median computer.

        Parameters
        ----------
        data
            The data to append to the median computer.

        idx
            The index at which to append the data. Required if using
            in-memory median computation.
        """

[Review comment] Collaborator:
Missing docstring describing method.

[Reply] emolter (Author):
Let me know if the updated docstring looks ok to you.

        if self.in_memory:
            if idx is None:
                msg = "Index must be provided when using in-memory median"
                raise ValueError(msg)

[Codecov / codecov/patch] Check warning: added lines #L101 - L102 in src/stcal/outlier_detection/median.py were not covered by tests.

            # populate pre-allocated memory with the drizzled data
            self._median_computer[idx] = data
        else:
            # distribute the drizzled data into the temporary storage
            self._median_computer.add_image(data)

    def evaluate(self: MedianComputer) -> np.ndarray:
        """
        Compute the median of the data that has been appended.

        Returns
        -------
        median_data
            The median data computed from the input data.
        """

[Review comment] Collaborator:
Missing docstring describing method.

[Reply] emolter (Author):
Let me know if the updated docstring looks ok to you.

        if self.in_memory:
            median_data = nanmedian3D(self._median_computer)
        else:
            median_data = self._median_computer.compute_median()
            self._median_computer.cleanup()
        return median_data

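A hedged usage sketch of both code paths of MedianComputer (shapes and values here are illustrative, not from the PR):

    import numpy as np

    frames = [np.full((4, 5), i, dtype=np.float32) for i in range(3)]

    # in-memory path: the caller supplies the stacking index for each frame
    comp = MedianComputer((3, 4, 5), in_memory=True)
    for i, frame in enumerate(frames):
        comp.append(frame, idx=i)
    median = comp.evaluate()   # computed via nanmedian3D

    # on-disk path: frames are split into temporary per-section files;
    # buffer_size=None falls back to the size of one input image
    comp = MedianComputer((3, 4, 5), in_memory=False)
    for frame in frames:
        comp.append(frame)
    median = comp.evaluate()   # also removes the temporary files
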

class DiskAppendableArray:
    """
    Creates a temporary file to which to append data, in order to perform
    timewise operations on a stack of input images without holding all of
    them in memory.

    This class is purpose-built for median computation during outlier
    detection and is not very flexible. It is assumed that each data array
    passed to `append` represents the same spatial segment of the full
    dataset. It is also assumed that each array passed to `append`
    represents only a single instant in time; the append operation will
    stack them along a new axis.

    The `read` operation is only capable of loading the full array back
    into memory. When working with large datasets that do not fit in
    memory, the required workflow is to create many DiskAppendableArray
    objects, each holding a small spatial segment of the full dataset.
    """

    def __init__(self: DiskAppendableArray,
                 slice_shape: tuple,
                 dtype: str | np.dtype,
                 filename: str | Path
                 ) -> None:
        """
        Parameters
        ----------
        slice_shape
            The shape of the spatial section of input data to be appended
            to the array.

        dtype
            The data type of the array. Must be a valid numpy array datatype.

        filename
            The full file path in which to store the array.
        """
        if len(slice_shape) != 2:
            msg = f"Invalid slice shape {slice_shape}. Only 2-D arrays "
            msg += "are supported."
            raise ValueError(msg)
        self._filename = Path(filename)
        with Path.open(self._filename, "wb") as f:  # noqa: F841
            pass
        self._slice_shape = slice_shape
        self._dtype = np.dtype(dtype)
        self._append_count = 0

    @property
    def shape(self: DiskAppendableArray) -> tuple:
        return (self._append_count, *self._slice_shape)

    def append(self: DiskAppendableArray, data: np.ndarray) -> None:
        """Add a new slice to the temporary file."""
        if data.shape != self._slice_shape:
            msg = f"Data shape {data.shape} does not match slice shape "
            msg += f"{self._slice_shape}"
            raise ValueError(msg)
        if data.dtype != self._dtype:
            msg = f"Data dtype {data.dtype} does not match array dtype "
            msg += f"{self._dtype}"
            raise ValueError(msg)
        with Path.open(self._filename, "ab") as f:
            data.tofile(f, sep="")
        self._append_count += 1

    def read(self: DiskAppendableArray) -> np.ndarray:
        """Read the 3-D array into memory."""
        shp = (self._append_count, *self._slice_shape)
        with Path.open(self._filename, "rb") as f:
            return np.fromfile(f, dtype=self._dtype).reshape(shp)

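A short round-trip sketch for DiskAppendableArray (the temp path is illustrative):

    import numpy as np
    import tempfile
    from pathlib import Path

    tmpdir = tempfile.mkdtemp()
    arr = DiskAppendableArray((2, 3), "float32", Path(tmpdir) / "section.bin")
    arr.append(np.zeros((2, 3), dtype="float32"))
    arr.append(np.ones((2, 3), dtype="float32"))

    stack = arr.read()          # appended slices stack along a new axis 0
    assert stack.shape == (2, 2, 3)
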

class OnDiskMedian:

    def __init__(self: OnDiskMedian,
                 shape: tuple,
                 dtype: str | np.dtype = "float32",
                 tempdir: str = "",
                 buffer_size: int = 0
                 ) -> None:
        """
        Set up temporary files to perform operations on a stack of 2-D
        input arrays along the stacking axis (e.g., a time axis) without
        holding all of them in memory. Currently the only supported
        operation is the median.

        Parameters
        ----------
        shape
            The shape of the entire input, (n_images, imrows, imcols).

        dtype
            The data type of the input data.

        tempdir
            The parent directory in which to create the temporary directory,
            which itself holds all the DiskAppendableArray tempfiles.
            Default is the current working directory.

        buffer_size
            The buffer size, units of bytes.
            Default is the size of one input image.
        """
        if len(shape) != 3:
            msg = f"Invalid input shape {shape}; "
            msg += "only three-dimensional data are supported."
            raise ValueError(msg)
        self._expected_nframes = shape[0]
        self.frame_shape = shape[1:]
        self.dtype = np.dtype(dtype)
        self.itemsize = self.dtype.itemsize
        self._temp_dir = tempfile.TemporaryDirectory(dir=tempdir)
        self._temp_path = Path(self._temp_dir.name)

        # figure out number of sections and rows per section that are needed
        self.nsections, self.section_nrows = \
            self._get_buffer_indices(buffer_size=buffer_size)
        self.slice_shape = (self.section_nrows, shape[2])
        self._n_adds = 0

        # instantiate a temporary DiskAppendableArray for each section
        self._temp_arrays = self._temparray_setup(dtype)

    def _get_buffer_indices(self: OnDiskMedian,
                            buffer_size: int = 0
                            ) -> tuple[int, int]:
        """
        Parameters
        ----------
        buffer_size
            The buffer size for the median computation, units of bytes.

        Returns
        -------
        nsections
            The number of sections to divide the input data into.

        section_nrows
            The number of rows in each section (except the last one).
        """
        imrows, imcols = self.frame_shape
        if buffer_size == 0:
            buffer_size = imrows * imcols * self.itemsize
        per_model_buffer_size = buffer_size / self._expected_nframes
        min_buffer_size = imcols * self.itemsize
        section_nrows = \
            min(imrows, int(per_model_buffer_size // min_buffer_size))

        if section_nrows <= 0:
            buffer_size = min_buffer_size * self._expected_nframes
            msg = "Buffer size is too small to hold a single row. "
            msg += f"Increasing buffer size to {buffer_size / _ONE_MB} MB"
            log.warning(msg)
            section_nrows = 1
        self.buffer_size = buffer_size

        nsections = int(np.ceil(imrows / section_nrows))
        msg = f"Computing median over {self._expected_nframes} "
        msg += f"groups in {nsections} sections "
        msg += f"with total memory buffer {buffer_size / _ONE_MB} MB"
        log.info(msg)
        return nsections, section_nrows

    def _temparray_setup(self: OnDiskMedian,
                         dtype: str | np.dtype
                         ) -> list[DiskAppendableArray]:
        """Set up temp file handlers for each spatial section."""
        temp_arrays = []
        for i in range(self.nsections):
            shp = self.slice_shape
            if i == self.nsections - 1:
                # last section has whatever shape is left over
                shp = (self.frame_shape[0] - (self.nsections - 1) *
                       self.section_nrows, self.frame_shape[1])
            arr = DiskAppendableArray(shp, dtype, self._temp_path / f"{i}.bin")
            temp_arrays.append(arr)
        return temp_arrays

    def add_image(self: OnDiskMedian, data: np.ndarray) -> None:
        """
        Split resampled model data into spatial sections
        and write to disk.
        """
        if self._n_adds >= self._expected_nframes:
            msg = "Too many calls to add_image. "
            msg += f"Expected at most {self._expected_nframes} input models."
            raise IndexError(msg)
        self._validate_data(data)
        self._n_adds += 1
        for i in range(self.nsections):
            row1 = i * self.section_nrows
            row2 = min(row1 + self.section_nrows, self.frame_shape[0])
            arr = self._temp_arrays[i]
            arr.append(data[row1:row2])

    def _validate_data(self: OnDiskMedian, data: np.ndarray) -> None:
        """Ensure data array being appended has correct shape and dtype."""
        if data.shape != self.frame_shape:
            msg = f"Data shape {data.shape} does not match expected shape "
            msg += f"{self.frame_shape}"
            raise ValueError(msg)
        if data.dtype != self.dtype:
            msg = f"Data dtype {data.dtype} does not match expected dtype "
            msg += f"{self.dtype}"
            raise ValueError(msg)

    def cleanup(self: OnDiskMedian) -> None:
        """Remove the temporary files and directory when finished."""
        self._temp_dir.cleanup()

    def compute_median(self: OnDiskMedian) -> np.ndarray:
        """
        Read spatial sections from disk and compute the median across groups
        (median over number of exposures on a per-pixel basis).
        """
        row_indices = [(i * self.section_nrows,
                        min((i + 1) * self.section_nrows, self.frame_shape[0]))
                       for i in range(self.nsections)]

        output_rows = row_indices[-1][1]
        output_cols = self._temp_arrays[0].shape[2]
        median_image = np.full((output_rows, output_cols),
                               np.nan,
                               dtype=self.dtype)

        for i, disk_arr in enumerate(self._temp_arrays):
            row1, row2 = row_indices[i]
            arr = disk_arr.read()
            median_image[row1:row2] = nanmedian3D(arr)
            del arr, disk_arr

        return median_image
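
To make the sectioning arithmetic concrete, a worked example with made-up numbers: for input of shape (10, 100, 64) in float32 with the default buffer (one image, 100 * 64 * 4 = 25600 bytes), the per-model budget is 25600 / 10 = 2560 bytes, one row costs 64 * 4 = 256 bytes, so section_nrows = min(100, 2560 // 256) = 10 and nsections = ceil(100 / 10) = 10. A hedged end-to-end sketch:

    import numpy as np

    shape = (10, 100, 64)                 # (n_images, imrows, imcols)
    med = OnDiskMedian(shape, dtype="float32")
    assert (med.nsections, med.section_nrows) == (10, 10)

    rng = np.random.default_rng(0)        # hypothetical input frames
    for _ in range(shape[0]):
        med.add_image(rng.normal(size=shape[1:]).astype("float32"))

    median_image = med.compute_median()   # shape (100, 64)
    med.cleanup()                         # remove the temporary directory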