From 1c7784426811f5b31ad41d1330926b7c58245bcd Mon Sep 17 00:00:00 2001
From: annajungbluth
Date: Sun, 26 May 2024 22:38:39 +0000
Subject: [PATCH] fixed issue with msg geoprocessor

---
 .../geoprocessing/msg/geoprocessor_msg.py | 380 ++++++++----------
 1 file changed, 178 insertions(+), 202 deletions(-)

diff --git a/rs_tools/_src/geoprocessing/msg/geoprocessor_msg.py b/rs_tools/_src/geoprocessing/msg/geoprocessor_msg.py
index a6c4095..45ff5f5 100644
--- a/rs_tools/_src/geoprocessing/msg/geoprocessor_msg.py
+++ b/rs_tools/_src/geoprocessing/msg/geoprocessor_msg.py
@@ -1,4 +1,5 @@
 import autoroot
+from dotenv import load_dotenv
 import os
 import numpy as np
 import rioxarray
@@ -8,26 +9,53 @@ from tqdm import tqdm
 from rs_tools._src.utils.io import get_list_filenames
 import typer
+import pygrib
 from loguru import logger
 import xarray as xr
+from satpy import Scene
 from rs_tools._src.geoprocessing.interp import resample_rioxarray
-from rs_tools._src.geoprocessing.goes import parse_goes16_dates_from_file, format_goes_dates
-from rs_tools._src.geoprocessing.goes.validation import correct_goes16_bands, correct_goes16_satheight
-from rs_tools._src.geoprocessing.goes.reproject import add_goes16_crs
+from rs_tools._src.geoprocessing.msg.reproject import add_msg_crs
 from rs_tools._src.geoprocessing.reproject import convert_lat_lon_to_x_y, calc_latlon
 from rs_tools._src.geoprocessing.utils import check_sat_FOV
+from rs_tools._src.geoprocessing.match import match_timestamps
+from rs_tools._src.geoprocessing.msg import MSG_WAVELENGTHS
 import pandas as pd
+from datetime import datetime
+from functools import partial
+from pathlib import Path
 import dask
 import warnings

+load_dotenv()
+
 dask.config.set(**{'array.slicing.split_large_chunks': False})
 warnings.filterwarnings('ignore', category=FutureWarning)
+warnings.filterwarnings('ignore', category=UserWarning)

 # TODO: Add unit conversion?
+
+def parse_msg_dates_from_file(file: str):
+    """
+    Parses the date and time information from an MSG file name.
+
+    Args:
+        file (str): The file name to parse.
+
+    Returns:
+        str: The parsed date and time in the format 'YYYYMMDDHHMMSS'.
+    """
+    timestamp = Path(file).name.split("-")[-2]
+    timestamp = timestamp.split(".")[0]
+    return timestamp
+
+
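On a typical MSG level-1.5 native file name, the parser above slices out the second-to-last dash-separated field and trims the sub-second suffix (the file name below is illustrative; only the dash/dot layout matters):

```python
from pathlib import Path

# Illustrative MSG native file name, following the usual dash-separated pattern.
file = "/home/data/MSG4-SEVI-MSG15-0100-NA-20201231124243.306000000Z-NA.nat"
timestamp = Path(file).name.split("-")[-2]  # '20201231124243.306000000Z'
timestamp = timestamp.split(".")[0]         # '20201231124243'
print(timestamp)  # YYYYMMDDHHMMSS
```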
""" - # get a list of all GOES files from specified path - files = get_list_filenames(self.read_path, ".nc") - return files + # get a list of all MSG radiance files from specified path + files_radiances = get_list_filenames(self.read_path, ".nat") + # get a list of all MSG cloud mask files from specified path + files_cloudmask = get_list_filenames(self.read_path, ".grb") + return files_radiances, files_cloudmask def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]: """ @@ -73,29 +103,19 @@ def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]: Returns: Tuple[xr.Dataset, xr.Dataset]: The preprocessed dataset and the original dataset. """ - # logger.debug('copying ds') - # # copy to avoid modifying original dataset - # ds = ds.copy() - - logger.debug('correcting satheight') - # convert measurement angles to horizontal distance in meters - ds = correct_goes16_satheight(ds) - try: - # correct band coordinates to reorganize xarray dataset - ds = correct_goes16_bands(ds) - except AttributeError: - pass + # copy to avoid modifying original dataset + ds = ds.copy() + # assign coordinate reference system - ds = add_goes16_crs(ds) + ds = add_msg_crs(ds) - logger.debug('region subsetting') if self.region is not None: logger.info(f"Subsetting data to region: {self.region}") # subset data lon_bnds = (self.region[0], self.region[2]) lat_bnds = (self.region[1], self.region[3]) # convert lat lon bounds to x y (in meters) - x_bnds, y_bnds = convert_lat_lon_to_x_y(ds.FOV.crs, lon=lon_bnds, lat=lat_bnds, ) + x_bnds, y_bnds = convert_lat_lon_to_x_y(ds.rio.crs, lon=lon_bnds, lat=lat_bnds, ) # check that region is within the satellite field of view # compile satellite FOV satellite_FOV = (min(ds.x.values), min(ds.y.values), max(ds.x.values), max(ds.y.values)) @@ -114,88 +134,104 @@ def preprocess_fn(self, ds: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]: logger.info(f"Resampling data to resolution: {self.resolution} m") # resampling ds_subset = resample_rioxarray(ds_subset, resolution=(self.resolution, self.resolution), method=self.resample_method) - - logger.debug('assigning lat lon') + # assign coordinates ds_subset = calc_latlon(ds_subset) - logger.debug('assigned lat lon, deleting ds') - del ds - logger.debug('returning ds_subset') - return ds_subset #, ds - def preprocess_fn_radiances(self, ds: xr.Dataset) -> xr.Dataset: + return ds_subset, ds + + def preprocess_fn_radiances(self, file: List[str], cloud_mask: np.array) -> xr.Dataset: """ - Preprocesses the GOES16 radiance dataset. + Preprocesses the MSG radiance dataset. Args: - ds (xr.Dataset): The input dataset. + file (List[str]): The input file. Returns: xr.Dataset: The preprocessed dataset. """ - variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag - - # TODO comment - time_stamp = pd.to_datetime(ds.t.values) - band_attributes = ds.band.attrs - band_wavelength_attributes = ds.band_wavelength.attrs - band_wavelength_values = ds.band_wavelength.values - band_values = ds.band.values - - # do core preprocess function (e.g. to correct band coordinates, subset data, resample, etc.) 
+    def preprocess_fn_radiances(self, file: List[str], cloud_mask: np.ndarray) -> xr.Dataset:
         """
-        Preprocesses the GOES16 radiance dataset.
+        Preprocesses the MSG radiance dataset.

         Args:
-            ds (xr.Dataset): The input dataset.
+            file (List[str]): The input file.
+            cloud_mask (np.ndarray): The cloud mask to attach to the dataset.

         Returns:
             xr.Dataset: The preprocessed dataset.
         """
-        variables = ["Rad", "DQF"] # "Rad" = radiance, "DQF" = data quality flag
-
-        # TODO comment
-        time_stamp = pd.to_datetime(ds.t.values)
-        band_attributes = ds.band.attrs
-        band_wavelength_attributes = ds.band_wavelength.attrs
-        band_wavelength_values = ds.band_wavelength.values
-        band_values = ds.band.values
-
-        # do core preprocess function (e.g. to correct band coordinates, subset data, resample, etc.)
-        ds_subset = self.preprocess_fn(ds)
-
-        # select relevant variables
-        ds_subset = ds_subset[variables]
-        # convert measurement time (in seconds) to datetime
-        # time_stamp = pd.to_datetime(ds.t.values)
-        time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M")
-        # assign bands data to each variable
-        ds_subset[variables] = ds_subset[variables].expand_dims({"band": band_values})
-        # attach time coordinate
-        ds_subset = ds_subset.assign_coords({"time": [time_stamp]})
+
+        # load file using satpy Scene
+        scn = Scene(
+            reader="seviri_l1b_native",
+            filenames=file
+        )
+        # load radiance bands (all SEVIRI channels except the high-resolution HRV)
+        channels = [x for x in scn.available_dataset_names() if x != 'HRV']
+        assert len(channels) == 11, "Number of channels is not 11"
+
+        scn.load(channels, generate=False, calibration='radiance')
+
+        # convert to an xarray dataset
+        ds = scn.to_xarray()
+
+        # attach cloud mask as data variable before preprocessing
+        ds = ds.assign(cloud_mask=(("y", "x"), cloud_mask))
+
+        # reset coordinates for resampling/reprojecting
+        # this drops all {channel}_acq_time coordinates
+        ds = ds.reset_coords(drop=True)
+
+        # do core preprocess function (e.g. resample, add crs etc.)
+        ds_subset, ds = self.preprocess_fn(ds)
+
+        # store the per-channel attributes in a dict before concatenation
+        attrs_dict = {x: ds_subset[x].attrs for x in channels}
+
+        # concatenate channels in a new band dimension
+        # NOTE: Concatenation overwrites the attrs of the individual bands.
+        ds_subset = ds_subset.assign(Rad=xr.concat([ds_subset[x] for x in channels], dim="band"))
+        # label the band dimension with the channel names
+        ds_subset = ds_subset.assign_coords(band=channels)
+
+        # re-index coordinates
+        ds_subset = ds_subset.set_coords(['latitude', 'longitude', 'cloud_mask'])
+
         # drop variables that will no longer be needed
-        ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"])
-        # assign band attributes to dataset
-        ds_subset.band.attrs = band_attributes
-        # TODO: Correct wavelength assignment. This attaches 16 wavelengths to each band.
-        # assign band wavelength to each variable
-        ds_subset = ds_subset.assign_coords({"band_wavelength": band_wavelength_values})
-        ds_subset.band_wavelength.attrs = band_wavelength_attributes
-        logger.debug('finished preprocess fn radiances')
+        ds_subset = ds_subset.drop_vars(channels)
+
+        # extract measurement time from the first channel's attributes
+        ref_attrs = attrs_dict[channels[0]]
+        time_stamp = ref_attrs['start_time']
+        # assign time coordinate
+        ds_subset = ds_subset.assign_coords({"time": [time_stamp]})
+
+        # NOTE: Keep only certain relevant attributes
+        ds_subset.attrs = dict(
+            calibration=ref_attrs["calibration"],
+            standard_name=ref_attrs["standard_name"],
+            platform_name=ref_attrs["platform_name"],
+            sensor=ref_attrs["sensor"],
+            units=ref_attrs["units"],
+            orbital_parameters=ref_attrs["orbital_parameters"]
+        )
+
+        # TODO: Correct wavelength assignment. This attaches 36+ wavelengths to each band.
+        # assign band wavelengths
+        ds_subset = ds_subset.assign_coords({"band_wavelength": list(MSG_WAVELENGTHS.values())})
+
         return ds_subset
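The `attrs_dict` bookkeeping above exists because `xr.concat` keeps only a single attribute set on the combined array; a minimal synthetic sketch of that behavior (channel names and attrs invented for illustration):

```python
import numpy as np
import xarray as xr

channels = ["IR_108", "IR_120"]  # two example SEVIRI channel names
ds = xr.Dataset(
    {c: (("y", "x"), np.random.rand(3, 3), {"channel": c}) for c in channels}
)

attrs_dict = {c: ds[c].attrs for c in channels}  # stash per-channel attrs first
rad = xr.concat([ds[c] for c in channels], dim="band").assign_coords(band=channels)

print(rad.attrs)    # only one attrs set survives the concat
print(attrs_dict)   # per-channel attrs preserved separately
```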
-    def preprocess_fn_cloudmask(self, ds: xr.Dataset) -> xr.Dataset:
+    def preprocess_fn_cloudmask(self, file: List[str]) -> np.ndarray:
         """
-        Preprocesses the input dataset for GOES16 cloud masks.
+        Preprocesses the input dataset for MSG cloud masks.

         Args:
-            ds (xr.Dataset): The input dataset.
+            file (List[str]): The input file.

         Returns:
-            xr.Dataset: The preprocessed dataset.
+            np.ndarray: The extracted cloud mask.
         """
-        variables = ["BCM"]
-
-        time_stamp = pd.to_datetime(ds.t.values)
-        # do core preprocess function
-        ds_subset = self.preprocess_fn(ds)
-
-        # select relevant variable
-        ds_subset = ds_subset[variables]
-        # convert measurement time (in seconds) to datetime
-        # logger.debug(f'{ds.t.values} --- {ds_subset.t.values}')
-        # time_stamp = pd.to_datetime(ds.t.values)
-        time_stamp = time_stamp.strftime("%Y-%m-%d %H:%M")
-        # assign time data to variable
-        ds_subset = ds_subset.assign_coords({"time": [time_stamp]})
-        # drop variables that will no longer be needed
-        ds_subset = ds_subset.drop_vars(["t", "y_image", "x_image", "goes_imager_projection"])
-
-        return ds_subset
+        grbs = pygrib.open(file[0])
+        # loop over all messages in the GRIB file
+        for grb in grbs:
+            if grb.name == 'Cloud mask':
+                # extract values from grb as an np.ndarray
+                cloud_mask = grb.values
+        grbs.close()
+        return cloud_mask

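A quick way to confirm that a given cloud mask GRIB file actually carries a 'Cloud mask' message before relying on the loop above (the path is illustrative):

```python
import pygrib

# List every message name in the GRIB file; one of them should be 'Cloud mask'.
grbs = pygrib.open("/home/data/MSG4-SEVI-MSGCLMK-0100-0100-20201231124500.000000000Z-NA.grb")
for grb in grbs:
    print(grb.name)
grbs.close()
```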
-    def preprocess_radiances(self, files: List[str]) -> xr.Dataset:
+    def preprocess_radiances(self, files: List[str], cloud_mask: np.ndarray) -> xr.Dataset:
         """
         Preprocesses radiances from the input files.

         Args:
@@ -206,60 +242,14 @@ def preprocess_radiances(self, files: List[str]) -> xr.Dataset:
             xr.Dataset: The preprocessed dataset.
         """
         # Check that all files contain radiance data
-        files = list(filter(lambda x: "Rad" in x, files))
-
-        # Check that all 16 bands are present
-        logger.info(f"Number of radiance files: {len(files)}")
-        assert len(files) == 16
-
-        # ds_list = []
-        for i, ifile in enumerate(files):
-            logger.debug(f"Loading file {i}: {ifile}")
-            # ds_file = xr.load_dataset(ifile, engine='h5netcdf')#, preprocess=self.preprocess_fn_radiances, concat_dim="band", combine="nested")
-            with xr.load_dataset(ifile, engine='h5netcdf') as ds_file:
-                ds_file = self.preprocess_fn_radiances(ds_file)
-                if i == 0:
-                    # ds_list = [ds_file]
-                    ds = ds_file
-                else:
-                    logger.debug("Starting interpolation {i}...")
-                    # reinterpolate to match coordinates of the first image
-                    ds_file = ds_file.interp(x=ds.x, y=ds.y)
-                    logger.debug("appending ds {i}...")
-                    # ds_list.append(ds_file)
-                    ds = xr.concat([ds, ds_file], dim="band")
-                    del ds_file
-        # concatenate in new band dimension
-        # ds = xr.concat(ds_list, dim="band")
-
-        # # open multiple files as a single dataset
-        # ds = [xr.open_mfdataset(ifile, preprocess=self.preprocess_fn_radiances, concat_dim="band", combine="nested") for
-        #       ifile in files]
-
-        # logger.debug("Starting interpolation...")
-        # # reinterpolate to match coordinates of the first image
-        # ds = [ds[0]] + [ids.interp(x=ds[0].x, y=ds[0].y) for ids in ds[1:]]
-        # logger.debug("Starting concatenation...")
-        # # concatenate in new band dimension
-        # ds = xr.concat(ds, dim="band")
-
-        logger.debug("Further processing...")
-        # Correct latitude longitude assignment after multiprocessing
-        ds['latitude'] = ds.latitude.isel(band=0)
-        ds['longitude'] = ds.longitude.isel(band=0)
-
-        # NOTE: Keep only certain relevant attributes
-        attrs_rad = ds["Rad"].attrs
-
-        ds["Rad"].attrs = {}
-        ds["Rad"].attrs = dict(
-            long_name=attrs_rad["long_name"],
-            standard_name=attrs_rad["standard_name"],
-            units=attrs_rad["units"],
-        )
-        ds["DQF"].attrs = {}
-        logger.debug("Finished preprocess radiances")
+        file = list(filter(lambda x: ".nat" in x, files))
+
+        # Check that only one file is selected
+        logger.info(f"Number of radiance files: {len(file)}")
+        assert len(file) == 1
+
+        # load file using satpy, convert to xarray dataset, and preprocess
+        ds = self.preprocess_fn_radiances(file, cloud_mask=cloud_mask)

         return ds

@@ -274,25 +264,14 @@ def preprocess_cloud_mask(self, files: List[str]) -> xr.Dataset:
             xr.Dataset: The preprocessed dataset.
         """
         # Check that all files contain cloud mask data
-        files = list(filter(lambda x: "ACMF" in x, files))
+        file = list(filter(lambda x: "CLMK" in x, files))

         # Check that only one file is present
-        logger.info(f"Number of cloud mask files: {len(files)}")
-        assert len(files) == 1
+        logger.info(f"Number of cloud mask files: {len(file)}")
+        assert len(file) == 1

-        # open multiple files as a single dataset
-        ds = xr.open_mfdataset(files[0])
-        ds = self.preprocess_fn_cloudmask(ds)
-
-        # NOTE: Keep only certain relevant attributes
-        attrs_bcm = ds["BCM"].attrs
-        ds = ds.rename({"BCM": "cloud_mask"})
-        ds["cloud_mask"].attrs = {}
-        ds["cloud_mask"].attrs = dict(
-            long_name=attrs_bcm["long_name"],
-            standard_name=attrs_bcm["standard_name"],
-            units=attrs_bcm["units"],
-        )
+        # load file using pygrib and extract the cloud mask array
+        ds = self.preprocess_fn_cloudmask(file)

         return ds

@@ -301,60 +280,58 @@ def preprocess_files(self):
         Preprocesses multiple files in read path and saves processed files to save path.
         """
         # get unique times from read path
-        unique_times = list(set(map(parse_goes16_dates_from_file, self.goes_files)))
+        files_radiances, files_cloudmask = self.msg_files
+        unique_times_radiances = list(set(map(parse_msg_dates_from_file, files_radiances)))
+        unique_times_cloudmask = list(set(map(parse_msg_dates_from_file, files_cloudmask)))
+
+        df_matches = match_timestamps(unique_times_radiances, unique_times_cloudmask, cutoff=15)

-        pbar_time = tqdm(unique_times)
+        pbar_time = tqdm(df_matches["timestamps_data"].values)

         for itime in pbar_time:
             pbar_time.set_description(f"Processing: {itime}")
-            # get files from unique times
-            files = list(filter(lambda x: itime in x, self.goes_files))
-            logger.debug(f"Loading radiances ...")
+            # get cloud mask file for specific time
+            itime_cloud = df_matches.loc[df_matches["timestamps_data"] == itime, "timestamps_cloudmask"].values[0]
+            files_cloud = list(filter(lambda x: itime_cloud in x, files_cloudmask))
+
             try:
-                # load radiances
-                ds = self.preprocess_radiances(files)
+                # load cloud mask
+                cloud_mask = self.preprocess_cloud_mask(files_cloud)
             except AssertionError:
-                logger.error(f"Skipping {itime} due to missing bands")
+                logger.error(f"Skipping {itime} due to missing cloud mask")
                 continue
+
+            # get data files for specific time
+            files = list(filter(lambda x: itime in x, files_radiances))
+
             try:
-                # load cloud mask
-                ds_clouds = self.preprocess_cloud_mask(files)["cloud_mask"]
+                # load radiances and attach cloud mask
+                ds = self.preprocess_radiances(files, cloud_mask=cloud_mask)
             except AssertionError:
-                logger.error(f"Skipping {itime} due to missing cloud mask")
+                logger.error(f"Skipping {itime} due to a radiance loading error")
                 continue
-            pbar_time.set_description(f"Loaded data...")
-            # interpolate cloud mask to data
-            # fill in zeros for all nan values
-            ds_clouds = ds_clouds.fillna(0)
-            # NOTE: Interpolation changes values from integers to floats
-            # NOTE: This is fixed through rounding
-            ds_clouds = ds_clouds.interp(x=ds.x, y=ds.y)
-            ds_clouds = ds_clouds.round()
-
-            # save cloud mask as data coordinate
-            ds = ds.assign_coords({"cloud_mask": (("y", "x"), ds_clouds.values.squeeze())})
-            ds["cloud_mask"].attrs = ds_clouds.attrs
-
-            logger.debug("Attempts saving to file...")
+
+            # remove crs from dataset
+            ds = ds.drop_vars('msg_seviri_fes_3km')
+
+            # remove attrs that cause netCDF write errors
+            for var in ds.data_vars:
+                ds[var].attrs.pop('grid_mapping', None)
+
             # check if save path exists, and create if not
             if not os.path.exists(self.save_path):
                 os.makedirs(self.save_path)

             # remove file if it already exists
-            itime_name = format_goes_dates(itime)
-            save_filename = Path(self.save_path).joinpath(f"{itime_name}_goes16.nc")
+            save_filename = Path(self.save_path).joinpath(f"{itime}_msg.nc")
             if os.path.exists(save_filename):
                 logger.info(f"File already exists. Overwriting file: {save_filename}")
                 os.remove(save_filename)

             # save to netcdf
-            pbar_time.set_description(f"Saving to file...:{save_filename}")
-            # TODO: Add "metrics" for printing (e.g., filesize)
             ds.to_netcdf(save_filename, engine="netcdf4")
-            logger.debug("Saved to file.")
-            del ds
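`match_timestamps` is internal to rs_tools; a rough sketch of the matching it needs to perform, assuming the 'YYYYMMDDHHMMSS' timestamps parsed above, a cutoff in minutes, and the column names `preprocess_files` reads back:

```python
import pandas as pd

def match_timestamps_sketch(times_data, times_cloudmask, cutoff=15):
    """Pair each data timestamp with the nearest cloud mask timestamp within `cutoff` minutes."""
    df_data = pd.DataFrame({"timestamps_data": times_data})
    df_cloud = pd.DataFrame({"timestamps_cloudmask": times_cloudmask})
    df_data["t"] = pd.to_datetime(df_data["timestamps_data"], format="%Y%m%d%H%M%S")
    df_cloud["t"] = pd.to_datetime(df_cloud["timestamps_cloudmask"], format="%Y%m%d%H%M%S")
    # merge_asof requires sorted keys; direction="nearest" pairs closest timestamps
    merged = pd.merge_asof(
        df_data.sort_values("t"), df_cloud.sort_values("t"),
        on="t", direction="nearest", tolerance=pd.Timedelta(minutes=cutoff),
    )
    # drop data timestamps with no cloud mask inside the cutoff window
    return merged.dropna(subset=["timestamps_cloudmask"]).drop(columns="t")
```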

 def geoprocess(
     resolution: float = None,  # defined in meters
@@ -364,7 +341,7 @@ def geoprocess(
     resample_method: str = "bilinear",
 ):
     """
-    Geoprocesses GOES 16 files
+    Geoprocesses MSG files

     Args:
         resolution (float, optional): The resolution in meters to resample data to. Defaults to None.
@@ -376,15 +353,13 @@ def geoprocess(
     Returns:
         None
     """
-    print("READ PATH!!!:", read_path)
-    print("SAVE PATH!!!:", save_path)
-    # Initialize GOES 16 GeoProcessor
-    logger.info(f"Initializing GOES16 GeoProcessor...")
+    # Initialize MSG GeoProcessor
+    logger.info(f"Initializing MSG GeoProcessor...")
     # Extracting region from str
     if region is not None:
         region = tuple(map(lambda x: int(x), region.split(" ")))
-    goes16_geoprocessor = GOES16GeoProcessing(
+    msg_geoprocessor = MSGGeoProcessing(
         resolution=resolution,
         read_path=read_path,
         save_path=save_path,
         region=region,
         resample_method=resample_method
     )
     logger.info(f"GeoProcessing Files...")
-    goes16_geoprocessor.preprocess_files()
+    msg_geoprocessor.preprocess_files()

-    logger.info(f"Finished GOES 16 GeoProcessing Script...!")
+    logger.info(f"Finished MSG GeoProcessing Script...!")


 if __name__ == '__main__':
     """
     # =========================
     # Test Cases
     # =========================
-    python geoprocessor_goes16.py --read-path "/home/data" --save-path /home/data/goes/geoprocessed
-    python geoprocessor_goes16.py --read-path "/home/data" --save-path /home/data/goes/geoprocessed --resolution 1000
-    python geoprocessor_goes16.py --read-path "/home/data" --save-path /home/data/goes/geoprocessed --resolution 5000
-    python geoprocessor_goes16.py --read-path "/home/data" --save-path /home/data/goes/geoprocessed --resolution 5000 --region -130 -15 -90 5
-
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed --resolution 5000
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed --resolution 10000
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed --region "-10 -10 10 10"
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed --resolution 2000 --region "-10 -10 10 10"
+
     # =========================
     # FAILURE TEST CASES
     # =========================
-    python geoprocessor_goes16.py --read-path "/home/data" --save-path /home/data/goes/geoprocessed --resolution 5000 --region -200 -15 90 5
+    python geoprocessor_msg.py --read-path "/home/data" --save-path /home/data/msg/geoprocessed --resolution 2000 --region "-100 -10 -90 10"
     """
typer.run(geoprocess)
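The CLI wraps the dataclass one-to-one, so the same run can also be scripted directly (paths and values illustrative):

```python
from rs_tools._src.geoprocessing.msg.geoprocessor_msg import MSGGeoProcessing

geoprocessor = MSGGeoProcessing(
    resolution=5000,                  # meters
    read_path="/home/data",
    save_path="/home/data/msg/geoprocessed",
    region=None,                      # or e.g. (-10, -10, 10, 10) as (lon_min, lat_min, lon_max, lat_max)
    resample_method="bilinear",
)
geoprocessor.preprocess_files()       # writes {timestamp}_msg.nc files to save_path
```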