Skip to content

Commit

Permalink
pushed changes from memory leakage experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
annajungbluth committed May 26, 2024
1 parent 1c77844 commit bee27ad
Showing 1 changed file with 41 additions and 48 deletions.
89 changes: 41 additions & 48 deletions rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
from rs_tools._src.utils.io import get_list_filenames
import typer
from loguru import logger
from satpy import Scene
import xarray as xr
from rs_tools._src.geoprocessing.interp import resample_rioxarray
from rs_tools._src.geoprocessing.goes import parse_goes16_dates_from_file, format_goes_dates
from rs_tools._src.geoprocessing.goes.validation import correct_goes16_bands, correct_goes16_satheight
from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs, add_goes16_crs_satpy
from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs
from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon
from rs_tools._src.geoprocessing.utils import check_sat_FOV
import pandas as pd
Expand All @@ -24,6 +23,7 @@
dask.config.set(**{'array.slicing.split_large_chunks': False})
warnings.filterwarnings('ignore', category=FutureWarning)

# TODO: Add unit conversion?
@dataclass
class GOES16GeoProcessing:
"""
Expand Down Expand Up @@ -73,18 +73,20 @@ def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:
Returns:
Tuple[xr.Dataset, xr.Dataset]: The preprocessed dataset and the original dataset.
"""
# logger.debug('copying ds')
# # copy to avoid modifying original dataset
# ds = ds.copy()

logger.debug('correcting satheight')
ds = add_goes16_crs_satpy(ds)
# convert measurement angles to horizontal distance in meters
# ds = correct_goes16_satheight(ds)
# try:
# # correct band coordinates to reorganize xarray dataset
# ds = correct_goes16_bands(ds)
# except AttributeError:
# pass
ds = correct_goes16_satheight(ds)
try:
# correct band coordinates to reorganize xarray dataset
ds = correct_goes16_bands(ds)
except AttributeError:
pass
# assign coordinate reference system
# ds = add_goes16_crs(ds)
ds = add_goes16_crs(ds)

logger.debug('region subsetting')
if self.region is not None:
Expand Down Expand Up @@ -131,37 +133,35 @@ def preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset:
Returns:
xr.Dataset: The preprocessed dataset.
"""
# variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag
variables = list(ds.data_vars.keys())[0]
variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag

# time_stamp = pd.to_datetime(ds.t.values)
# band_attributes = ds.band.attrs
# band_wavelength_attributes = ds.band_wavelength.attrs
# band_wavelength_values = ds.band_wavelength.values
# band_values = ds.band.values

time_stamp = ds.attrs['start_time']
# TODO comment
time_stamp = pd.to_datetime(ds.t.values)
band_attributes = ds.band.attrs
band_wavelength_attributes = ds.band_wavelength.attrs
band_wavelength_values = ds.band_wavelength.values
band_values = ds.band.values

# do core preprocess function (e.g. to correct band coordinates, subset data, resample, etc.)
ds_subset = self.preprocess_fn(ds)

# select relevant variables
# ds_subset = ds_subset[variables]
ds_subset = ds_subset[variables]
# convert measurement time (in seconds) to datetime
# time_stamp = pd.to_datetime(ds.t.values)
time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M")
# assign bands data to each variable
# ds_subset[variables] = ds_subset[variables].expand_dims({"band": band_values})
ds_subset[variables] = ds_subset[variables].expand_dims({"band": band_values})
# attach time coordinate
ds_subset = ds_subset.assign_coords({"time": [time_stamp]})
# drop variables that will no longer be needed
# ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"])
ds_subset = ds_subset.drop_vars(["goes_imager_projection"])
ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"])
# assign band attributes to dataset
# ds_subset.band.attrs = band_attributes
ds_subset.band.attrs = band_attributes
# TODO: Correct wavelength assignment. This attaches 16 wavelengths to each band.
# assign band wavelength to each variable
# ds_subset = ds_subset.assign_coords({"band_wavelength": band_wavelength_values})
# ds_subset.band_wavelength.attrs = band_wavelength_attributes
ds_subset = ds_subset.assign_coords({"band_wavelength": band_wavelength_values})
ds_subset.band_wavelength.attrs = band_wavelength_attributes
logger.debug('finished preprocess fn radiances')
return ds_subset

Expand All @@ -185,6 +185,7 @@ def preprocess_fn_cloudmask(self, ds: xr.Dataset) -> xr.Dataset:
# select relevant variable
ds_subset = ds_subset[variables]
# convert measurement time (in seconds) to datetime
# logger.debug(f'{ds.t.values} --- {ds_subset.t.values}')
# time_stamp = pd.to_datetime(ds.t.values)
time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M")
# assign time data to variable
Expand Down Expand Up @@ -215,27 +216,19 @@ def preprocess_radiances(self, files: List[str]) -> xr.Dataset:
for i, ifile in enumerate(files):
logger.debug(f"Loading file {i}: {ifile}")
# ds_file = xr.load_dataset(ifile, engine='h5netcdf')#, preprocess=self.preprocess_fn_radiances, concat_dim="band", combine="nested")
scn = Scene(
reader='abi_l1b',
filenames=[ifile]
)
# Load radiance bands
channels = [x for x in scn.available_dataset_names()]
scn.load(channels, generate=False, calibration='radiance')
# conver to xarray dataset
ds_file = scn.to_xarray_dataset()
ds_file = self.preprocess_fn_radiances(ds_file)
if i == 0:
# ds_list = [ds_file]
ds = ds_file
else:
logger.debug("Starting interpolation {i}...")
# reinterpolate to match coordinates of the first image
ds_file = ds_file.interp(x=ds.x, y=ds.y)
logger.debug("appending ds {i}...")
# ds_list.append(ds_file)
ds = xr.concat([ds, ds_file], dim="band")
del ds_file
with xr.load_dataset(ifile, engine='h5netcdf') as ds_file:
ds_file = self.preprocess_fn_radiances(ds_file)
if i == 0:
# ds_list = [ds_file]
ds = ds_file
else:
logger.debug("Starting interpolation {i}...")
# reinterpolate to match coordinates of the first image
ds_file = ds_file.interp(x=ds.x, y=ds.y)
logger.debug("appending ds {i}...")
# ds_list.append(ds_file)
ds = xr.concat([ds, ds_file], dim="band")
del ds_file
# concatenate in new band dimension
# ds = xr.concat(ds_list, dim="band")

Expand Down Expand Up @@ -365,7 +358,7 @@ def preprocess_files(self):

def geoprocess(
resolution: float = None, # defined in meters
read_path: str = "/mnt/disks/data/miniset/goes16/raw",
read_path: str = "./",
save_path: str = "./",
region: str = None,
resample_method: str = "bilinear",
Expand Down

0 comments on commit bee27ad

Please sign in to comment.