PseudoCode ‐ Full Pipeline

Draft Pipeline

This is a draft pipeline based on the one sketched in the Miro board.

Steps

Data Downloader

  • GOES Downloader
  • MODIS Downloader
  • GOES Download Parameters
  • MODIS Download Parameters

Note: MODIS sets the limiting frequency and GOES sets the limiting region. So, the parameters:

  • MODIS freq --> freq
  • GOES region --> region

The period applies to both (determined by us).


# PARAMETERS
from pathlib import Path

period: list[str] = ...
region: list[...] = ...
save_path: Path = Path("path/to/file")

Then, we download MODIS

# download modis
modis_channels: list[...] = ...
modis_satellite = ...
modis_save_path = save_path.joinpath("...")
modis_download_params = {...}
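
As a concrete sketch, the MODIS granules could be pulled with the earthaccess package; the short_name, bounding box, and login flow below are placeholder assumptions, not settled choices:

# hypothetical MODIS download via earthaccess (MOD021KM = Terra L1B 1km radiances)
import earthaccess

earthaccess.login()
granules = earthaccess.search_data(
    short_name="MOD021KM",
    temporal=(period[0], period[-1]),
    bounding_box=(-130.0, 20.0, -60.0, 55.0),  # placeholder region
)
earthaccess.download(granules, str(modis_save_path))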

Options:

  • Case I: Aligned --> choose the MODIS timestamps and use the nearest function from goes2go (see the sketch below)
  • Case II: Partially Aligned --> choose a GOES image at a coarser frequency than MODIS, e.g., daily
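
For Case I, a minimal sketch using the goes2go package (the satellite, product, domain, and modis_timestamps list are placeholder assumptions):

# Case I sketch: fetch the GOES scene nearest to each MODIS timestamp
from goes2go import GOES

G = GOES(satellite=16, product="ABI-L1b-Rad", domain="F")
for t in modis_timestamps:  # timestamps taken from the MODIS downloads
    ds_goes = G.nearesttime(t)

Then set the GOES download parameters: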
# download goes
freq: list[...] = ...
goes_channels: list[...] = ...
goes_satellite: list[...] = ...
goes_save_path = save_path.joinpath("...")
goes_download_params = {...}

Save all of this to some filepath...
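
One way to keep the run reproducible is to dump the chosen parameters next to the data (the file name here is a placeholder):

import json

# record the download parameters alongside the downloaded files
params = {"period": period, "modis": modis_download_params, "goes": goes_download_params}
with open(save_path.joinpath("download_params.json"), "w") as f:
    json.dump(params, f, indent=2)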


GeoProcessing

Parameters

  • Pre-Select Region
  • Choose Spatial Resolution (per satellite)

Operations

  • Create Target Grid
  • Harmonize DataStructures (per satellite)
  • Unit Conversion
  • CRS Transformation
  • Resampling Transformation

First, choose the global parameters:

region_of_interest: Geometry = ...
spatial_resolution_goes: Resolution = ...
spatial_resolution_modis: Resolution = ...
crs: CRSObject = ...
MODIS_ANALYSIS_READY_BUCKET: str = ...
GOES_ANALYSIS_READY_BUCKET: str = ...

Second, choose the harmonized dataset structure:

ds: xr.Dataset["Channels Latitude Longitude"] = ...

CREATE TARGET GRID

GOES_TARGET_GRID = create_target_grid(goes_bounds, goes_resolution, **kwargs)
MODIS_TARGET_GRID = create_target_grid(modis_bounds, modis_resolution, **kwargs)
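
create_target_grid is still unspecified; a minimal sketch that builds a regular lat/lon grid from bounds and a resolution in degrees could be:

import numpy as np
import xarray as xr

def create_target_grid(bounds, resolution, **kwargs) -> xr.Dataset:
    # bounds = (lon_min, lat_min, lon_max, lat_max)
    lon_min, lat_min, lon_max, lat_max = bounds
    return xr.Dataset(coords={
        "latitude": np.arange(lat_min, lat_max, resolution),
        "longitude": np.arange(lon_min, lon_max, resolution),
    })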

GOES HARMONIZER

GOES_BAND_NAMES = {
    'EV_250m....': "BAND_BLAH_1", 
    ...
}
# preprocessing applied to each file before concatenation
def preprocess(ds: xr.Dataset) -> xr.Dataset:
    # calculate proper CRS transformation (radians [scan angle] --> degrees [lat/lon])
    ds = calculate_crs(ds, **params)
    # resample each band to the same resolution (from our selection), [delta degrees]
    ds = resample(ds, GOES_TARGET_GRID, **params)
    return ds

# loop through each band
# aggregate the bands (C,H,W)
ds: xr.Dataset["channel lat lon"] = xr.open_mfdataset(list_of_files, concat_dim="channel", combine="nested", preprocess=preprocess)
# TODO: choose order of bands
# example: [16, lat, lon]
# save
ds.to_netcdf(GOES_ANALYSIS_READY_BUCKET)
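
The calculate_crs step above is the standard GOES fixed-grid transformation (scan angles in radians --> lat/lon in degrees); a sketch using pyproj's geostationary projection, with attribute names following the GOES-R netCDF convention:

import numpy as np
import pyproj
import xarray as xr

def calculate_crs(ds: xr.Dataset, **params) -> xr.Dataset:
    # GOES files store x/y as scan angles in radians;
    # scale by the satellite height to get projection coordinates in metres
    proj_info = ds.goes_imager_projection
    h = proj_info.perspective_point_height
    p = pyproj.Proj(
        proj="geos",
        h=h,
        lon_0=proj_info.longitude_of_projection_origin,
        sweep=proj_info.sweep_angle_axis,
    )
    xx, yy = np.meshgrid(ds.x.values * h, ds.y.values * h)
    lons, lats = p(xx, yy, inverse=True)
    return ds.assign_coords(latitude=(("y", "x"), lats), longitude=(("y", "x"), lons))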

MODIS HARMONIZER

Perform operations (per satellite)

MODIS_VARIABLES_NAMES = {
    'EV_250m....': "BAND_BLAH_1", 
    ...
}
# open dataset
ds = xr.open_dataset(...)
# select variables --> rename --> stack into a channel dimension
ds = ds[list(MODIS_VARIABLES_NAMES)].rename(MODIS_VARIABLES_NAMES).to_array(dim="channel")
# remap to target resolution
ds = resample(ds, MODIS_TARGET_GRID, **params)
# convert to W/m^2/sr/...
ds = convert_units(ds, **params)
# save
ds.to_netcdf(MODIS_ANALYSIS_READY_BUCKET)
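
resample is also unspecified; assuming the target grids above have 1D latitude/longitude dimension coordinates, a simple xarray sketch is (curvilinear grids would need something like pyresample instead):

import xarray as xr

def resample(ds: xr.Dataset, target_grid: xr.Dataset, **params) -> xr.Dataset:
    # nearest-neighbour regridding onto the target coordinates
    return ds.interp(
        latitude=target_grid.latitude,
        longitude=target_grid.longitude,
        method=params.get("method", "nearest"),
    )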

Pre-Processing

Parameters

  • Normalizer Function + Params
  • Sub-Region
  • Sub-Period
  • Sub-Channels
  • Patch-Size
  • ML-Ready Bucket

Operators

  • Create Normalizer
  • Patcher

# Define Parameters
# Load Data / Load Data Files
# Choose Normalizer
normalizer: Normalizer = create_normalizer(*params)
# calculate normalizing parameters
normalizer.fit(...)
# Save Normalizer
save(normalizer, "path/to/file")
# Create Patches
# Save Patches

Normalization

# use XARRAY
# load dataset (NOTE - USE DASK)
ds_stats = xr.open_mfdataset(..., concat_dim="time", combine="nested")
# calculate stats
mean = ds_stats.mean()
std = ds_stats.std()
# choose normalizer
normalizer: Normalizer = create_normalizer("name_of_normalizer", mean, std)
# save normalizer
save(normalizer, "path/to/file")
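
Normalizer, create_normalizer, and save are our own placeholder names; a minimal standardizing sketch:

import pickle

class Normalizer:
    """Standardize with precomputed (per-channel) mean/std."""
    def __init__(self, mean=None, std=None):
        self.mean, self.std = mean, std

    def fit(self, ds):
        # materialize the (dask-backed) statistics
        self.mean = ds.mean().compute()
        self.std = ds.std().compute()
        return self

    def __call__(self, x):
        return (x - self.mean) / self.std

def create_normalizer(name: str, mean=None, std=None) -> Normalizer:
    if name == "standardize":
        return Normalizer(mean, std)
    raise ValueError(f"unknown normalizer: {name}")

def save(obj, path: str) -> None:
    with open(path, "wb") as f:
        pickle.dump(obj, f)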

Patcher

Question: which data structures to use?

# load dataset
ds = ...
# Instantiate the patching logic for testing
patches = dict(longitude=30, latitude=30)
strides = dict(longitude=5, latitude=5)

patcher = XRDAPatcher(
    da=ds,
    patches=patches,
    strides=strides,        # stride < patch size --> overlapping patches
    check_full_scan=True    # check that the patches scan the full domain
)
# save the patches

#### COPY_PASTED ####
import os
from pathlib import Path

import numpy as np
import tqdm

save_base_path = Path(os.getcwd()).joinpath("path/to/data")
save_base_path.mkdir(parents=True, exist_ok=True)

# CREATE STEP
def save_step(ifile, ipatch):

    # transform patch to a float32 NumPy array
    ipatch: np.ndarray = ipatch.values.astype(np.float32)

    # save name
    save_path = save_base_path.joinpath(f"patch_{ifile}.npy")
    # save with numpy
    np.save(save_path, ipatch)


# SAVE
for ifile, ipatch in enumerate(tqdm.tqdm(patcher)):
    save_step(ifile, ipatch)

PARALLEL

from joblib import Parallel, delayed


n_jobs = 4
verbose = 3
results = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(save_step)(ifile, ipatch) for (ifile, ipatch) in enumerate(tqdm.tqdm(patcher))
    )

DATASET

import logging
from typing import Callable, Optional

import numpy as np
from torch.utils.data import DataLoader, Dataset


def transforms(x):
    return normalize(x)


class NumpyDataReader(Dataset):
    def __init__(self, data_dir: str, ext: str=".npy", transforms: Optional[Callable]=None):
        self.data_dir = data_dir
        self.data_filenames = get_list_filenames(data_dir, ext)
        self.transforms = transforms

    def __getitem__(self, ind) -> np.ndarray:
        img_path = self.data_filenames[ind]
        x = load_numpy_data(img_path)
        if self.transforms is not None:
            x = self.transforms(x)
        return x

    def __len__(self):
        return len(self.data_filenames)

    def sample(self, n_samples):
        it = iter(DataLoader(self, batch_size=1, shuffle=True, num_workers=4))
        samples = []
        while len(samples) < n_samples:
            try:
                samples.append(next(it).detach().numpy())
            except StopIteration:
                break
            except Exception as ex:
                logging.error(str(ex))
                continue
        del it
        return np.concatenate(samples)
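
Example usage of the reader (paths and batch size are placeholders):

dataset = NumpyDataReader(data_dir="path/to/data", transforms=transforms)
loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)
batch = next(iter(loader))  # one batch of normalized patches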

Model Training, etc.