Skip to content

Commit

Permalink
fixed small issues after memory investigation
Browse files Browse the repository at this point in the history
  • Loading branch information
annajungbluth committed May 27, 2024
1 parent a6f3237 commit d75c511
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
39 changes: 29 additions & 10 deletions rs_tools/_src/geoprocessing/goes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,41 @@
GOES_WAVELENGTHS = {
"1": 0.47,
"2": 0.64,
"3": 0.865,
"4": 1.378,
"3": 0.87,
"4": 1.38,
"5": 1.61,
"6": 2.25,
"7": 3.9,
"8": 6.19,
"9": 6.95,
"7": 3.89,
"8": 6.17,
"9": 6.93,
"10": 7.34,
"11": 8.5,
"11": 8.44,
"12": 9.61,
"13": 10.35,
"14": 11.2,
"15": 12.3,
"16": 13.3,
"13": 10.33,
"14": 11.19,
"15": 12.27,
"16": 13.27,
}

GOES_CHANNELS = {
0.47: 1,
0.64: 2,
0.87: 3,
1.38: 4,
1.61: 5,
2.25: 6,
3.89: 7,
6.17: 8,
6.93: 9,
7.34: 10,
8.44: 11,
9.61: 12,
10.33: 13,
11.19: 14,
12.27: 15,
13.27: 16,
}

def parse_goes16_dates_from_file(file: str):
"""
Parses the date and time information from a GOES-16 file name.
Expand Down
40 changes: 28 additions & 12 deletions rs_tools/_src/geoprocessing/goes/geoprocessor_goes16.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import autoroot
import os
import gc
import numpy as np
import rioxarray
from pathlib import Path
Expand All @@ -16,6 +17,7 @@
from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs
from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon
from rs_tools._src.geoprocessing.utils import check_sat_FOV
from rs_tools._src.geoprocessing.goes import GOES_WAVELENGTHS, GOES_CHANNELS
import pandas as pd
import dask
import warnings
Expand Down Expand Up @@ -62,12 +64,13 @@ def goes_files(self) -> List[str]:
files = get_list_filenames(self.read_path, ".nc")
return files

def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:
def preprocess_fn(self, ds: xr.Dataset, calc_coords=False) -> Tuple[xr.Dataset, xr.Dataset]:
"""
Preprocesses the input dataset by applying corrections, subsetting, and resampling etc.
Args:
ds (xr.Dataset): The input dataset.
calc_coords (bool): Whether to calculate latitude and longitude coordinates. Defaults to False.
Returns:
Tuple[xr.Dataset, xr.Dataset]: The preprocessed dataset and the original dataset.
Expand Down Expand Up @@ -109,16 +112,19 @@ def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:
ds_subset = resample_rioxarray(ds_subset, resolution=(self.resolution, self.resolution), method=self.resample_method)

# assign coordinates
# ds_subset = calc_latlon(ds_subset)
if calc_coords:
logger.info("Assigning latitude and longitude coordinates.")
ds_subset = calc_latlon(ds_subset)
del ds # delete to avoid memory problems
return ds_subset

def preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset:
def preprocess_fn_radiances(self, ds: xr.Dataset, calc_coords=False) -> xr.Dataset:
"""
Preprocesses the GOES16 radiance dataset.
Args:
ds (xr.Dataset): The input dataset.
calc_coords (bool): Whether to calculate latitude and longitude coordinates. Defaults to False.
Returns:
xr.Dataset: The preprocessed dataset.
Expand All @@ -131,9 +137,13 @@ def preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset:
band_wavelength_attributes = ds.band_wavelength.attrs
band_wavelength_values = ds.band_wavelength.values
band_values = ds.band.values
# # Convert keys in dictionary to strings for easier comparison
# GOES_CHANNELS_STR = {f'{round(key, 1)}': value for key, value in GOES_CHANNELS.items()}
# # Round value and extract channel number
# band_values = int(GOES_CHANNELS_STR[f'{round(band_wavelength_values[0], 1):.1f}'])

# do core preprocess function (e.g. to correct band coordinates, subset data, resample, etc.)
ds_subset = self.preprocess_fn(ds)
ds_subset = self.preprocess_fn(ds, calc_coords=calc_coords)

# convert measurement time (in seconds) to datetime
time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M")
Expand Down Expand Up @@ -196,21 +206,25 @@ def preprocess_radiances(self, files: List[str]) -> xr.Dataset:
logger.info(f"Number of radiance files: {len(files)}")
assert len(files) == 16

for i, ifile in tqdm(enumerate(files)):
for i, ifile in enumerate(files):
with xr.load_dataset(ifile, engine='h5netcdf') as ds_file:
ds_file = self.preprocess_fn_radiances(ds_file)
logger.info(f"Loading file {i}/{len(files)}: {ifile}")
if i == 0:
# Preprocess first file and assign lat lon coords
ds_file = self.preprocess_fn_radiances(ds_file, calc_coords=True)
ds = ds_file
else:
# Preprocess other files without calculating lat lon coords
ds_file = self.preprocess_fn_radiances(ds_file, calc_coords=False)
# reinterpolate to match coordinates of the first image
ds_file = ds_file.interp(x=ds.x, y=ds.y)
# concatenate in new band dimension
ds = xr.concat([ds, ds_file], dim="band")
del ds_file # delete to avoid memory problems
gc.collect() # Call the garbage collector to avoid memory problems

# assign coordinates
logger.info("Assigning latitude and longitude coordinates.")
ds = calc_latlon(ds)
# Fix band naming
ds = ds.assign_coords(band=list(GOES_CHANNELS.values()))

# # open multiple files as a single dataset
# ds = [xr.open_mfdataset(ifile, preprocess=self.preprocess_fn_radiances, concat_dim="band", combine="nested") for
Expand All @@ -222,10 +236,10 @@ def preprocess_radiances(self, files: List[str]) -> xr.Dataset:
# ds = xr.concat(ds, dim="band")

# Correct latitude longitude assignment after multiprocessing
ds['latitude'] = ds.latitude.isel(band=0)
ds['longitude'] = ds.longitude.isel(band=0)
# ds['latitude'] = ds.latitude.isel(band=0)
# ds['longitude'] = ds.longitude.isel(band=0)

# NOTE: Keep only certain relevant attributes
# Keep only certain relevant attributes
attrs_rad = ds["Rad"].attrs

ds["Rad"].attrs = {}
Expand Down Expand Up @@ -324,6 +338,8 @@ def preprocess_files(self):
# save to netcdf
pbar_time.set_description(f"Saving to file...:{save_filename}")
ds.to_netcdf(save_filename, engine="netcdf4")
del ds # delete to avoid memory problems
gc.collect() # Call the garbage collector to avoid memory problems

def geoprocess(
resolution: float = None, # defined in meters
Expand Down

0 comments on commit d75c511

Please sign in to comment.