Skip to content

Commit

Permalink
Add preprocessor #3
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjbr123 committed Aug 27, 2024
1 parent 0a1cf08 commit b55a161
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion scripts/convert_GEAR_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
StoreToZarr,
ConsolidateDimensionCoordinates,
ConsolidateMetadata,
Indexed,
T,
)

startyear = 1990
Expand Down Expand Up @@ -44,9 +46,46 @@ def make_path(time):
if prune > 0:
pattern = pattern.prune(nkeep=prune)

# Add in our own custom Beam PTransform (Parallel Transform) to apply
# some preprocessing to the dataset. In this case to convert the
# 'bounds' variables to coordinate rather than data variables.

# They are implemented as subclasses of the beam.PTransform class
class DataVarToCoordVar(beam.PTransform):

# not sure why it needs to be a staticmethod
@staticmethod
# the preprocess function should take in and return an
# object of type Indexed[T]. These are pangeo-forge-recipes
# derived types, internal to the functioning of the
# pangeo-forge-recipes transforms.
# I think they consist of a list of 2-item tuples,
# each containing some type of 'index' and a 'chunk' of
# the dataset or a reference to it, as can be seen in
# the first line of the function below
def _datavar_to_coordvar(item: Indexed[T]) -> Indexed[T]:
index, ds = item
# do something to each ds chunk here
# and leave index untouched.
# Here we convert some of the variables in the file
# to coordinate variables so that pangeo-forge-recipes
# can process them
print(f'Preprocessing before {ds =}')
ds = ds.set_coords(['x_bnds', 'y_bnds', 'time_bnds', 'crs'])
print(f'Preprocessing after {ds =}')
return index, ds

# this expand function is a necessary part of
# developing your own Beam PTransforms, I think
# it wraps the above preprocess function and applies
# it to the PCollection, i.e. all the 'ds' chunks in Indexed
def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._datavar_to_coordvar)

recipe = (
beam.Create(pattern.items())
| OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={'preprocess':lambda ds: ds.set_coords(['x_bnds', 'y_bnds', 'time_bnds', 'crs'])})
| OpenWithXarray(file_type=pattern.file_type)
| DataVarToCoordVar() # the preprocess
| StoreToZarr(
target_root=td,
store_name=tn,
Expand Down

0 comments on commit b55a161

Please sign in to comment.