From bee27adb8d1dccc1db998be3bbabdd6cca196265 Mon Sep 17 00:00:00 2001 From: annajungbluth Date: Sun, 26 May 2024 22:40:10 +0000 Subject: [PATCH] pushed changes from memory leakage experiments --- .../geoprocessing/goes/geoprocessor_goes16.py | 89 +++++++++---------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py b/rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py index d21b62c..a6c4095 100644 --- a/rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py +++ b/rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py @@ -9,12 +9,11 @@ from rs_tools._src.utils.io import get_list_filenames import typer from loguru import logger -from satpy import Scene import xarray as xr from rs_tools._src.geoprocessing.interp import resample_rioxarray from rs_tools._src.geoprocessing.goes import parse_goes16_dates_from_file, format_goes_dates from rs_tools._src.geoprocessing.goes.validation import correct_goes16_bands, correct_goes16_satheight -from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs, add_goes16_crs_satpy +from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon from rs_tools._src.geoprocessing.utils import check_sat_FOV import pandas as pd @@ -24,6 +23,7 @@ dask.config.set(**{'array.slicing.split_large_chunks': False}) warnings.filterwarnings('ignore', category=FutureWarning) +# TODO: Add unit conversion? @dataclass class GOES16GeoProcessing: """ @@ -73,18 +73,20 @@ def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]: Returns: Tuple[xr.Dataset, xr.Dataset]: The preprocessed dataset and the original dataset. """ + # logger.debug('copying ds') + # # copy to avoid modifying original dataset + # ds = ds.copy() logger.debug('correcting satheight') - ds = add_goes16_crs_satpy(ds) # convert measurement angles to horizontal distance in meters - # ds = correct_goes16_satheight(ds) - # try: - # # correct band coordinates to reorganize xarray dataset - # ds = correct_goes16_bands(ds) - # except AttributeError: - # pass + ds = correct_goes16_satheight(ds) + try: + # correct band coordinates to reorganize xarray dataset + ds = correct_goes16_bands(ds) + except AttributeError: + pass # assign coordinate reference system - # ds = add_goes16_crs(ds) + ds = add_goes16_crs(ds) logger.debug('region subsetting') if self.region is not None: @@ -131,37 +133,35 @@ def preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset: Returns: xr.Dataset: The preprocessed dataset. """ - # variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag - variables = list(ds.data_vars.keys())[0] + variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag - # time_stamp = pd.to_datetime(ds.t.values) - # band_attributes = ds.band.attrs - # band_wavelength_attributes = ds.band_wavelength.attrs - # band_wavelength_values = ds.band_wavelength.values - # band_values = ds.band.values - - time_stamp = ds.attrs['start_time'] + # TODO comment + time_stamp = pd.to_datetime(ds.t.values) + band_attributes = ds.band.attrs + band_wavelength_attributes = ds.band_wavelength.attrs + band_wavelength_values = ds.band_wavelength.values + band_values = ds.band.values # do core preprocess function (e.g. to correct band coordinates, subset data, resample, etc.) ds_subset = self.preprocess_fn(ds) # select relevant variables - # ds_subset = ds_subset[variables] + ds_subset = ds_subset[variables] # convert measurement time (in seconds) to datetime # time_stamp = pd.to_datetime(ds.t.values) time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M") # assign bands data to each variable - # ds_subset[variables] = ds_subset[variables].expand_dims({"band": band_values}) + ds_subset[variables] = ds_subset[variables].expand_dims({"band": band_values}) # attach time coordinate ds_subset = ds_subset.assign_coords({"time": [time_stamp]}) # drop variables that will no longer be needed - # ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"]) - ds_subset = ds_subset.drop_vars(["goes_imager_projection"]) + ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"]) # assign band attributes to dataset - # ds_subset.band.attrs = band_attributes + ds_subset.band.attrs = band_attributes + # TODO: Correct wavelength assignment. This attaches 16 wavelengths to each band. # assign band wavelength to each variable - # ds_subset = ds_subset.assign_coords({"band_wavelength": band_wavelength_values}) - # ds_subset.band_wavelength.attrs = band_wavelength_attributes + ds_subset = ds_subset.assign_coords({"band_wavelength": band_wavelength_values}) + ds_subset.band_wavelength.attrs = band_wavelength_attributes logger.debug('finished preprocess fn radiances') return ds_subset @@ -185,6 +185,7 @@ def preprocess_fn_cloudmask(self, ds: xr.Dataset) -> xr.Dataset: # select relevant variable ds_subset = ds_subset[variables] # convert measurement time (in seconds) to datetime + # logger.debug(f'{ds.t.values} --- {ds_subset.t.values}') # time_stamp = pd.to_datetime(ds.t.values) time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M") # assign time data to variable @@ -215,27 +216,19 @@ def preprocess_radiances(self, files: List[str]) -> xr.Dataset: for i, ifile in enumerate(files): logger.debug(f"Loading file {i}: {ifile}") # ds_file = xr.load_dataset(ifile, engine='h5netcdf')#, preprocess=self.preprocess_fn_radiances, concat_dim="band", combine="nested") - scn = Scene( - reader='abi_l1b', - filenames=[ifile] - ) - # Load radiance bands - channels = [x for x in scn.available_dataset_names()] - scn.load(channels, generate=False, calibration='radiance') - # conver to xarray dataset - ds_file = scn.to_xarray_dataset() - ds_file = self.preprocess_fn_radiances(ds_file) - if i == 0: - # ds_list = [ds_file] - ds = ds_file - else: - logger.debug("Starting interpolation {i}...") - # reinterpolate to match coordinates of the first image - ds_file = ds_file.interp(x=ds.x, y=ds.y) - logger.debug("appending ds {i}...") - # ds_list.append(ds_file) - ds = xr.concat([ds, ds_file], dim="band") - del ds_file + with xr.load_dataset(ifile, engine='h5netcdf') as ds_file: + ds_file = self.preprocess_fn_radiances(ds_file) + if i == 0: + # ds_list = [ds_file] + ds = ds_file + else: + logger.debug("Starting interpolation {i}...") + # reinterpolate to match coordinates of the first image + ds_file = ds_file.interp(x=ds.x, y=ds.y) + logger.debug("appending ds {i}...") + # ds_list.append(ds_file) + ds = xr.concat([ds, ds_file], dim="band") + del ds_file # concatenate in new band dimension # ds = xr.concat(ds_list, dim="band") @@ -365,7 +358,7 @@ def preprocess_files(self): def geoprocess( resolution: float = None, # defined in meters - read_path: str = "/mnt/disks/data/miniset/goes16/raw", + read_path: str = "./", save_path: str = "./", region: str = None, resample_method: str = "bilinear",