-
-
Notifications
You must be signed in to change notification settings - Fork 129
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f215439
commit 993d35b
Showing
1 changed file
with
89 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# a hack around tabular-fits that supports putting header info in the primary hdu | ||
|
||
from astropy.wcs import WCS | ||
from specutils import Spectrum1D | ||
import astropy.units as u | ||
from astropy.io import fits | ||
import numpy as np | ||
from astropy.table import Table | ||
|
||
def mef_tabular_fits_writer(spectrum, file_name, hdu=0, update_header=False, **kwargs): | ||
|
||
# no matter what the dimensionality of 'flux', there will always be a primary header and | ||
# a single BinTableHDU extension. the `hdu` arg controls where `meta.header` is dumped. | ||
hdulist = [fits.PrimaryHDU(), None] | ||
|
||
if hdu > 1: | ||
raise ValueError('`hdu`, which controls which extension `meta.header` is written to, must either be 0 or 1') | ||
|
||
# we expect anything that should be written out to the file header to be in `meta.header` | ||
# This should be a `fits.header.Header` object, so convert to if meta.header is a dictionary | ||
header = spectrum.meta.get('header', fits.header.Header()) # if no meta.header, create empty `fits.header.Header` | ||
if not isinstance(header, fits.header.Header): | ||
if isinstance(header, dict): | ||
header = fits.header.Header(header) | ||
else: | ||
raise ValueError('`Spectrum1d.meta.header must be `fits.header.Header` or dictionary.') | ||
|
||
if update_header: | ||
hdr_types = (str, int, float, complex, bool, | ||
np.floating, np.integer, np.complexfloating, np.bool_) | ||
header.update([keyword for keyword in spectrum.meta.items() if | ||
isinstance(keyword[1], hdr_types)]) | ||
|
||
# Strip header of FITS reserved keywords | ||
for keyword in ['NAXIS', 'NAXIS1', 'NAXIS2']: | ||
header.remove(keyword, ignore_missing=True) | ||
|
||
# Add dispersion array and unit | ||
wtype = kwargs.pop('wtype', spectrum.spectral_axis.dtype) | ||
wunit = u.Unit(kwargs.pop('wunit', spectrum.spectral_axis.unit)) | ||
|
||
disp = spectrum.spectral_axis.to(wunit, equivalencies=u.spectral()) | ||
|
||
# Mapping of spectral_axis types to header TTYPE1 (no "torque/work" types!) | ||
dispname = str(wunit.physical_type) | ||
if dispname == "length": | ||
dispname = "wavelength" | ||
elif "energy" in dispname: | ||
dispname = "energy" | ||
|
||
# Add flux array and unit | ||
ftype = kwargs.pop('ftype', spectrum.flux.dtype) | ||
funit = u.Unit(kwargs.pop('funit', spectrum.flux.unit)) | ||
flux = spectrum.flux.to(funit, equivalencies=u.spectral_density(disp)) | ||
|
||
columns = [disp.astype(wtype), flux.astype(ftype)] | ||
colnames = [dispname, "flux"] | ||
|
||
# Include uncertainty - units to be inferred from spectrum.flux | ||
if spectrum.uncertainty is not None: | ||
try: | ||
unc = ( | ||
spectrum | ||
.uncertainty | ||
.represent_as(StdDevUncertainty) | ||
.quantity | ||
.to(funit, equivalencies=u.spectral_density(disp)) | ||
) | ||
columns.append(unc.astype(ftype)) | ||
colnames.append("uncertainty") | ||
except RuntimeWarning: | ||
raise ValueError("Could not convert uncertainty to StdDevUncertainty due" | ||
" to divide-by-zero error.") | ||
|
||
# For > 1D data transpose from row-major format | ||
for c in range(1, len(columns)): | ||
if columns[c].ndim > 1: | ||
columns[c] = columns[c].T | ||
|
||
tab = Table(columns, names=colnames, meta=header) | ||
hdulist[1] = fits.BinTableHDU(tab) | ||
|
||
# now figure out where meta.header (and if specified, addl meta keys) should go | ||
if hdu==0: | ||
hdulist[0].header = header | ||
else: # must otherwise be 1, we already checked this | ||
hdulist[1].header = header | ||
|
||
hdulist = fits.HDUList(hdulist) |