PseudoCode ‐ Full Pipeline
Juan Emmanuel Johnson edited this page Feb 16, 2024 · 1 revision
This is a draft pipeline based on the one we have on the Miro board.
- GOES Download
- MODIS Downloader
- GOES Download Parameters
- MODIS Download Parameters
Note: MODIS limits the temporal frequency and GOES limits the spatial region. So, the shared parameters are:
- MODIS freq --> freq
- GOES region --> region
The period applies to both satellites and is chosen by us.
# PARAMETERS
from pathlib import Path

period: list[str] = ...
region: list[...] = ...
# a Path (not a str), so that save_path.joinpath(...) works below
save_path: Path = Path("path/to/file")
Then, we download MODIS
# download modis
modis_channels: list[...] = ...
modis_satellite = ...
modis_save_path = save_path.joinpath("...")
modis_download_params = {...}
Options:
- Case I: Aligned --> choose the MODIS time stamps and use the nearest function from goes2go
- Case II: Partially Aligned --> choose a GOES image at a rough frequency relative to MODIS, e.g., daily
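Case I amounts to a nearest-neighbour match in time: for each MODIS time stamp, pick the closest available GOES scan. The sketch below shows that matching step in plain Python, independent of goes2go. The helper `nearest_timestamps`, the `max_gap` tolerance, and the example time stamps are all hypothetical and only illustrate the idea.

```python
from bisect import bisect_left
from datetime import datetime, timedelta


def nearest_timestamps(modis_times, goes_times, max_gap=timedelta(minutes=15)):
    """Pair each MODIS time with the closest GOES time (hypothetical helper).

    Returns a list of (modis_time, goes_time) tuples; goes_time is None when
    the closest GOES scan is further away than `max_gap`.
    """
    goes_sorted = sorted(goes_times)
    pairs = []
    for t in modis_times:
        i = bisect_left(goes_sorted, t)
        # the nearest neighbour is either just before or just after position i
        candidates = goes_sorted[max(i - 1, 0): i + 1]
        best = min(candidates, key=lambda g: abs(g - t), default=None)
        if best is not None and abs(best - t) > max_gap:
            best = None
        pairs.append((t, best))
    return pairs


# made-up time stamps: GOES every 10 minutes, two MODIS overpasses
goes_times = [datetime(2020, 10, 1, 14, 0) + timedelta(minutes=10 * k) for k in range(12)]
modis_times = [datetime(2020, 10, 1, 14, 27), datetime(2020, 10, 1, 18, 5)]
pairs = nearest_timestamps(modis_times, goes_times)
```

For Case II, the same helper could be called with a coarser list of target times (e.g., one per day) and a larger `max_gap`.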
# download goes
freq: list[...] = ...
goes_channels: list[...] = ...
goes_satellite: list[...] = ...
goes_save_path = save_path.joinpath("...")
# blah blah blah
goes_download_params = {...}
Save all of this to some filepath...
Parameters
- Pre-Select Region
- Choose Spatial Resolution (per satellite)
Operations
- Create Target Grid
- Harmonize DataStructures (per satellite)
- Unit Conversion
- CRS Transformation
- Resampling Transformation
First, choose the global parameters
region_of_interest: Geometry = ...
spatial_resolution_goes: Resolution = ...
spatial_resolution_modis: Resolution = ...
crs: CRSObject = ...
MODIS_ANALYSIS_READY_BUCKET: str = ...
GOES_ANALYSIS_READY_BUCKET: str = ...
Second, choose harmonized dataset structure
ds: xr.Dataset["Channels Latitude Longitude"] = ...
CREATE TARGET GRID
GOES_TARGET_GRID = create_target_grid(goes_bounds, goes_resolution, **kwargs)
MODIS_TARGET_GRID = create_target_grid(modis_bounds, modis_resolution, **kwargs)
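A minimal sketch of what `create_target_grid` could look like for a regular latitude/longitude grid. The signature, the `(lon_min, lat_min, lon_max, lat_max)` ordering of the bounds, and the resolution in degrees are assumptions; the real function may take a geometry object or a projected CRS instead.

```python
import numpy as np


def create_target_grid(bounds, resolution):
    """Regular lat/lon grid of cell centres (hypothetical implementation).

    bounds: (lon_min, lat_min, lon_max, lat_max) in degrees
    resolution: cell size in degrees
    """
    lon_min, lat_min, lon_max, lat_max = bounds
    n_lon = int(round((lon_max - lon_min) / resolution))
    n_lat = int(round((lat_max - lat_min) / resolution))
    # cell centres sit half a cell inside the bounds
    lon = lon_min + (np.arange(n_lon) + 0.5) * resolution
    lat = lat_min + (np.arange(n_lat) + 0.5) * resolution
    return {"lat": lat, "lon": lon}


# made-up region: 10 x 5 degrees at 0.5 degree resolution
grid = create_target_grid(bounds=(-100.0, 20.0, -90.0, 25.0), resolution=0.5)
```

Using one such grid per satellite keeps the resampling step a pure function of the grid, so the GOES and MODIS harmonizers can share it.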
GOES HARMONIZER
GOES_BAND_NAMES = {
    'EV_250m....': "BAND_BLAH_1",
    ...
}
# preprocess stuff
def preprocess(ds):
    # calculate proper CRS transformation (radians [incoming angle] --> degrees [lat/lon])
    ds = calculate_crs(ds, **params)
    # make each band the same resolution (from our selection), [delta degrees]
    ds = resample(ds, GOES_TARGET_GRID, **params)
    return ds

# loop through each band
# aggregate the bands (C,H,W)
# combine="nested" + concat_dim concatenates the per-band files along a new "channels" dimension
ds: xr.Dataset["channel lat lon"] = xr.open_mfdataset(
    list_of_datasets, combine="nested", concat_dim="channels", preprocess=preprocess
)
# TODO: choose order of bands
# example: [16, lat, lon]
# save
ds.to_netcdf(GOES_ANALYSIS_READY_BUCKET)
MODIS HARMONIZER
Perform operations (per satellite)
MODIS_VARIABLES_NAMES = {
    'EV_250m....': "BAND_BLAH_1",
    ...
}
# open dataset
ds = xr.open_dataset(...)
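# select variables, rename them --> stack to channels
# (Dataset.stack combines dimensions, not variables; to_array stacks variables along a new dimension)
ds = ds[list(MODIS_VARIABLES_NAMES)].rename(MODIS_VARIABLES_NAMES).to_array(dim="channels")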
# remap to target resolution
ds = resample(ds, MODIS_TARGET_GRID, **params)
# convert to W/m^2/sr/...
ds = convert_units(ds, **params)
# save
ds.to_netcdf(MODIS_ANALYSIS_READY_BUCKET)
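The unit-conversion step is not spelled out above. MODIS Level-1B files store scaled integers, and each band carries a radiance scale and offset; the sketch below applies that linear calibration per channel. The function name and the example numbers are made up, and whether `convert_units` should do exactly this depends on the MODIS product that is downloaded.

```python
import numpy as np


def scaled_integers_to_radiance(scaled, scales, offsets):
    """Apply radiance = scale * (scaled_integer - offset) per channel.

    scaled:  array of shape (C, H, W) with the stored integer values
    scales:  array of shape (C,), one radiance scale per channel
    offsets: array of shape (C,), one radiance offset per channel
    """
    scaled = np.asarray(scaled, dtype=np.float64)
    # reshape to (C, 1, 1) so each channel uses its own scale and offset
    scales = np.asarray(scales, dtype=np.float64)[:, None, None]
    offsets = np.asarray(offsets, dtype=np.float64)[:, None, None]
    return scales * (scaled - offsets)


# made-up values: 2 channels, 1 x 2 pixels
scaled = np.array([[[100, 200]], [[300, 400]]])
radiance = scaled_integers_to_radiance(scaled, scales=[0.5, 0.01], offsets=[0.0, 100.0])
```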
Parameters
- Normalizer Function + Params
- Sub-Region
- Sub-Period
- Sub-Channels
- Patch-Size
- ML-Ready Bucket
Operators
- Create Normalizer
- Patcher
# Define Parameters
# Load Data / Load Data Files
# Choose Normalizer
normalizer: Normalizer = create_normalizer(*params)
# calculate normalizing parameters
normalizer.fit(...)
# Save Normalizer
save(normalizer, "path/to/file")
# Create Patches
# Save Patches
Normalization
# use XARRAY
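# load dataset (NOTE - USE DASK)
# open_mfdataset has no `dim` argument; use combine="nested" with concat_dim
ds_stats = xr.open_mfdataset(..., combine="nested", concat_dim=["time", ...])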
# calculate stats
mean = ds_stats.mean()
std = ds_stats.std()
# choose normalizer
normalizer: Normalizer = create_normalizer("name_of_normalizer", mean, std)
# save normalizer
save(normalizer, "path/to/file")
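The `Normalizer` object and `create_normalizer` are not defined on this page. Below is a minimal sketch of a per-channel standardizer with the same `fit` step, assuming arrays laid out as `(N, C, H, W)`. The class name and its methods are hypothetical.

```python
import numpy as np


class StandardNormalizer:
    """Per-channel (x - mean) / std normalizer (hypothetical sketch)."""

    def __init__(self, eps=1e-8):
        self.eps = eps
        self.mean = None
        self.std = None

    def fit(self, x):
        # statistics over samples and pixels, one value per channel
        self.mean = x.mean(axis=(0, 2, 3), keepdims=True)
        self.std = x.std(axis=(0, 2, 3), keepdims=True)
        return self

    def transform(self, x):
        return (x - self.mean) / (self.std + self.eps)

    def inverse_transform(self, z):
        return z * (self.std + self.eps) + self.mean


# made-up data: 16 samples, 2 channels with different means, 8 x 8 pixels
rng = np.random.default_rng(0)
x = rng.normal(loc=[[[[5.0]], [[-2.0]]]], scale=3.0, size=(16, 2, 8, 8))
normalizer = StandardNormalizer().fit(x)
z = normalizer.transform(x)
```

Keeping `mean` and `std` on the object means the saved normalizer can later undo the scaling on model outputs via `inverse_transform`.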
Patcher
Question: Datastructures
# load dataset
ds = ...
# Instantiate the patching logic for testing
patches = dict(longitude=30, latitude=30)
strides = dict(longitude=5, latitude=5)
patcher = XRDAPatcher(
    da=ds,
    patches=patches,
    strides=strides,  # Overlap
    check_full_scan=True  # check no extra dimensions
)
# save the patches
#### COPY_PASTED ####
from pathlib import Path

import numpy as np
import tqdm

save_base_path = Path.cwd().joinpath("path/to/data")
save_base_path.mkdir(parents=True, exist_ok=True)

# CREATE STEP
def save_step(ifile, ipatch):
    # transform patch: xarray object --> float32 numpy array
    ipatch: np.ndarray = ipatch.values.astype(np.float32)
    # save name
    save_path = save_base_path.joinpath(f"patch_{ifile}.npy")
    # save with numpy
    np.save(save_path, ipatch)

# SAVE
for ifile, ipatch in enumerate(tqdm.tqdm(patcher)):
    save_step(ifile, ipatch)
PARALLEL
from joblib import Parallel, delayed

n_jobs = 4
verbose = 3
results = Parallel(n_jobs=n_jobs, verbose=verbose)(
    delayed(save_step)(ifile, ipatch) for (ifile, ipatch) in enumerate(tqdm.tqdm(patcher))
)
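With a patch size of 30 and a stride of 5, consecutive patches overlap by 25 pixels, and the number of patches along a dimension of length n is floor((n - 30) / 5) + 1. The sketch below checks this count with NumPy's `sliding_window_view` on a made-up 100 x 60 array. It illustrates the arithmetic only and makes no claim about how `XRDAPatcher` works internally; `n_patches` is a hypothetical helper.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def n_patches(size, patch, stride):
    """Number of full patches that fit along one dimension."""
    return (size - patch) // stride + 1


# made-up field with dimensions (latitude, longitude)
field = np.arange(100 * 60, dtype=np.float32).reshape(100, 60)
patch, stride = 30, 5

# all 30 x 30 windows, then keep every 5th one along each axis
windows = sliding_window_view(field, (patch, patch))[::stride, ::stride]
total = n_patches(100, patch, stride) * n_patches(60, patch, stride)
```

Here (100 - 30) and (60 - 30) are both multiples of 5, so the patches cover the whole array; when that is not the case, the trailing rows or columns are left out of the scan.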
import logging
from typing import Callable, Optional

import numpy as np
from torch.utils.data import DataLoader, Dataset


def transforms(x):
    return normalize(x)


class NumpyDataReader(Dataset):
    def __init__(self, data_dir: str, ext: str = ".npy", transforms: Optional[Callable] = None):
        self.data_dir = data_dir
        self.data_filenames = get_list_filenames(data_dir, ext)
        self.transforms = transforms

    def __getitem__(self, ind) -> np.ndarray:
        img_path = self.data_filenames[ind]
        x = load_numpy_data(img_path)
        if self.transforms is not None:
            x = self.transforms(x)
        return x

    def __len__(self):
        return len(self.data_filenames)

    def sample(self, n_samples):
        # draw random patches one at a time until n_samples are collected
        it = iter(DataLoader(self, batch_size=1, shuffle=True, num_workers=4))
        samples = []
        while len(samples) < n_samples:
            try:
                samples.append(next(it).detach().numpy())
            except StopIteration:
                # dataset exhausted: stop instead of looping forever
                break
            except Exception as ex:
                logging.error(str(ex))
                continue
        del it
        return np.concatenate(samples)
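`NumpyDataReader` relies on two helpers, `get_list_filenames` and `load_numpy_data`, that are not defined on this page. Below is a minimal sketch of what they might look like, assuming the patches were written as `.npy` files into a single directory. Both implementations are hypothetical.

```python
import tempfile
from pathlib import Path

import numpy as np


def get_list_filenames(data_dir, ext=".npy"):
    """Sorted list of files in `data_dir` with extension `ext` (hypothetical)."""
    return sorted(str(p) for p in Path(data_dir).glob(f"*{ext}"))


def load_numpy_data(path):
    """Load one saved patch as a float32 array (hypothetical)."""
    return np.load(path).astype(np.float32)


# round trip on a temporary directory with three made-up patches
with tempfile.TemporaryDirectory() as tmp:
    for i in range(3):
        np.save(Path(tmp) / f"patch_{i}.npy", np.full((2, 4, 4), i))
    filenames = get_list_filenames(tmp)
    first = load_numpy_data(filenames[0])
```

The sort is lexicographic, so `patch_10.npy` comes before `patch_2.npy`; zero-padding the index in the file name keeps file order and patch order aligned if that matters.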
This research is funded through a NASA 22-MDRAIT22-0018 award (No 80NSSC23K1045) and managed by Trillium Technologies Inc (trillium.tech).