Skip to content

Commit

Permalink
Update bucket code
Browse files (browse the repository at this point in the history)
  • Loading branch information
ghiggi committed Aug 15, 2023
1 parent 9c7dc01 commit c1f2392
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
10 changes: 9 additions & 1 deletion gpm_api/bucket/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,17 @@ def _check_is_callable_or_none(argument, argument_name):
raise TypeError(f"{argument_name} must be a function (or None).")


def convert_ds_to_df(ds, preprocessing_function, ds_to_df_function, filtering_function):
def convert_ds_to_df(
ds, preprocessing_function, ds_to_df_function, filtering_function, precompute_granule=False
):
# Check inputs
_check_is_callable_or_none(preprocessing_function, argument_name="preprocessing_function")
_check_is_callable_or_none(ds_to_df_function, argument_name="ds_to_df_function")
_check_is_callable_or_none(filtering_function, argument_name="filtering_function")

if precompute_granule:
ds = ds.compute()

# Preprocess xarray Dataset
if callable(preprocessing_function):
ds = preprocessing_function(ds)
Expand All @@ -150,15 +155,18 @@ def get_granule_dataframe(
preprocessing_function=None,
ds_to_df_function=ds_to_dask_df_function,
filtering_function=None,
precompute_granule=False,
):
# Open granule
ds = gpm_api.open_granule(fpath, **open_granule_kwargs)

# Convert to dataframe
df = convert_ds_to_df(
ds=ds,
preprocessing_function=preprocessing_function,
ds_to_df_function=ds_to_df_function,
filtering_function=filtering_function,
precompute_granule=precompute_granule,
)

return df
Expand Down
2 changes: 2 additions & 0 deletions gpm_api/bucket/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def write_granule_bucket(
preprocessing_function=None,
ds_to_df_function=ds_to_pd_df_function,
filtering_function=None,
precompute_granule=True,
# Partitioning arguments
xbin_size=15,
ybin_size=15,
Expand All @@ -59,6 +60,7 @@ def write_granule_bucket(
preprocessing_function=preprocessing_function,
ds_to_df_function=ds_to_df_function,
filtering_function=filtering_function,
precompute_granule=precompute_granule,
)

# Define partitioning columns names
Expand Down

0 comments on commit c1f2392

Please sign in to comment.