Skip to content

Commit

Permalink
Merge pull request #5 from danielfromearth/develop
Browse files Browse the repository at this point in the history
sync feature branch
  • Loading branch information
danielfromearth authored Sep 6, 2023
2 parents ccc9175 + d993f3b commit d52a2a1
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 100 deletions.
4 changes: 4 additions & 0 deletions concatenator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Convenience variables used across the package."""

GROUP_DELIM = '__'
COORD_DELIM = " "
87 changes: 87 additions & 0 deletions concatenator/attribute_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
attribute_handling.py
Functions for converting "coordinates" in netCDF variable attributes
between paths that reference a group hierarchy and flattened paths.
"""
import re

import netCDF4

from concatenator import COORD_DELIM, GROUP_DELIM


def regroup_coordinate_attribute(attribute_string: str) -> str:
"""
Examples
--------
>>> coord_att = "__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude"
>>> _flatten_coordinate_attribute(coord_att)
Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude
Parameters
----------
attribute_string : str
Returns
-------
str
"""
# Use the separator that's in the attribute string only if all separators in the string are the same.
# Otherwise, we will use our own default separator.
whitespaces = re.findall(r'\s+', attribute_string)
if len(set(whitespaces)) <= 1:
new_sep = whitespaces[0]
else:
new_sep = COORD_DELIM

return new_sep.join(
'/'.join(c.split(GROUP_DELIM))[1:]
for c
in attribute_string.split() # split on any whitespace
)


def flatten_coordinate_attribute_paths(dataset: netCDF4.Dataset,
var: netCDF4.Variable,
variable_name: str) -> None:
"""Flatten the paths of variables referenced in the coordinates attribute."""
if 'coordinates' in var.ncattrs():
coord_att = var.getncattr('coordinates')

new_coord_att = _flatten_coordinate_attribute(coord_att)

dataset.variables[variable_name].setncattr('coordinates', new_coord_att)


def _flatten_coordinate_attribute(attribute_string: str) -> str:
"""Converts attributes that specify group membership via "/" to use new group delimiter, even for the root level.
Examples
--------
>>> coord_att = "Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude"
>>> _flatten_coordinate_attribute(coord_att)
__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude
Parameters
----------
attribute_string : str
Returns
-------
str
"""
# Use the separator that's in the attribute string only if all separators in the string are the same.
# Otherwise, we will use our own default separator.
whitespaces = re.findall(r'\s+', attribute_string)
if len(set(whitespaces)) <= 1:
new_sep = whitespaces[0]
else:
new_sep = COORD_DELIM

# A new string is constructed.
return new_sep.join(
f'{GROUP_DELIM}{c.replace("/", GROUP_DELIM)}'
for c
in attribute_string.split() # split on any whitespace
)
34 changes: 24 additions & 10 deletions concatenator/bumblebee.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import netCDF4 as nc # type: ignore
import xarray as xr

from concatenator import GROUP_DELIM
from concatenator.dimension_cleanup import remove_duplicate_dims
from concatenator.file_ops import add_label_to_path
from concatenator.group_handling import (flatten_grouped_dataset,
regroup_flattened_dataset)
Expand All @@ -20,6 +22,7 @@ def bumblebee(files_to_concat: list[str],
output_file: str,
write_tmp_flat_concatenated: bool = False,
keep_tmp_files: bool = True,
concat_dim: str = "",
logger: Logger = default_logger) -> str:
"""Concatenate netCDF data files along an existing dimension.
Expand All @@ -28,6 +31,7 @@ def bumblebee(files_to_concat: list[str],
files_to_concat : list[str]
output_file : str
keep_tmp_files : bool
concat_dim : str, optional
logger : logging.Logger
Returns
Expand All @@ -46,30 +50,40 @@ def bumblebee(files_to_concat: list[str],
return ""

logger.debug("Flattening all input files...")
xrdataset_list = []
for i, filepath in enumerate(input_files):
# The group structure is flattened.
start_time = time.time()
logger.debug(" ..file %03d/%03d <%s>..", i + 1, num_input_files, filepath)
flat_dataset, coord_vars, string_vars = flatten_grouped_dataset(nc.Dataset(filepath, 'r'), filepath,
flat_dataset, coord_vars, _ = flatten_grouped_dataset(nc.Dataset(filepath, 'r'), filepath,
ensure_all_dims_are_coords=True)

flat_dataset = remove_duplicate_dims(flat_dataset)

xrds = xr.open_dataset(xr.backends.NetCDF4DataStore(flat_dataset),
decode_times=False, decode_coords=False, drop_variables=coord_vars)

benchmark_log['flattening'] = time.time() - start_time

# The flattened file is written to disk.
flat_file_path = add_label_to_path(filepath, label="_flat_intermediate")
xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars})
intermediate_flat_filepaths.append(flat_file_path)
# flat_file_path = add_label_to_path(filepath, label="_flat_intermediate")
# xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars})
# intermediate_flat_filepaths.append(flat_file_path)
# xrdataset_list.append(xr.open_dataset(flat_file_path))
xrdataset_list.append(xrds)

# Flattened files are concatenated together (Using XARRAY).
start_time = time.time()
logger.debug("Concatenating flattened files...")
combined_ds = xr.open_mfdataset(intermediate_flat_filepaths,
decode_times=False,
decode_coords=False,
data_vars='minimal',
coords='minimal',
compat='override')
# combined_ds = xr.open_mfdataset(intermediate_flat_filepaths,
# decode_times=False,
# decode_coords=False,
# data_vars='minimal',
# coords='minimal',
# compat='override')

combined_ds = xr.concat(xrdataset_list, dim=GROUP_DELIM + concat_dim, data_vars='minimal', coords='minimal')

benchmark_log['concatenating'] = time.time() - start_time

if write_tmp_flat_concatenated:
Expand Down
110 changes: 110 additions & 0 deletions concatenator/dimension_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""
dimension_cleanup.py
Functions for renaming duplicated dimension names for netCDF variables, so that xarray can handle the dataset.
"""
import collections

import netCDF4 as nc


def remove_duplicate_dims(nc_dataset: nc.Dataset) -> nc.Dataset:
"""
xarray cannot read netCDF4 datasets with duplicate dimensions.
Function goes through a dataset to catch any variables with duplicate dimensions.
creates an exact copy of the dimension duplicated with a new name. Variable
is reset with new dimensions without duplicates. Old variable deleted, new variable's name
is changed to the original name.
Notes
-----
Requires the dataset to be 'flat', i.e., with no groups and every variable at the root-level.
"""
dup_vars = {}
dup_new_varnames = []

for var_name, var in nc_dataset.variables.items():
dim_list = list(var.dimensions)
if len(set(dim_list)) != len(dim_list): # get true if var.dimensions has a duplicate
dup_vars[var_name] = var # populate dictionary with variables with vars with dup dims

for dup_var_name, dup_var in dup_vars.items():
dim_list = list(dup_var.dimensions) # original dimensions of the variable with duplicated dims

# Dimension(s) that are duplicated are retrieved.
# Note: this is not yet tested for more than one duplicated dimension.
dim_dup = [item for item, count in collections.Counter(dim_list).items() if count > 1][0]
dim_dup_length = dup_var.shape[dup_var.dimensions.index(dim_dup)] # length of the duplicated dimension

# New dimension and variable names are created.
dim_dup_new = dim_dup+'_1'
var_name_new = dup_var_name+'_1'
dup_new_varnames.append(var_name_new)

# The last dimension for the variable is replaced with the new name in a temporary list.
new_dim_list = dim_list[:-1]
new_dim_list.extend([dim_dup_new])

new_dup_var = {}

# Attributes for the original variable are retrieved.
attrs_contents = get_attributes_minus_fillvalue_and_renamed_coords(original_var_name=dup_var_name,
new_var_name=dim_dup_new,
original_dataset=nc_dataset)
# for attrname in dup_var.ncattrs():
# if attrname != '_FillValue':
# contents: str = nc_dataset.variables[dup_var_name].getncattr(attrname)
# if attrname == 'coordinates':
# contents.replace(dim_dup, dim_dup_new)
#
# attrs_contents[attrname] = contents

fill_value = dup_var._FillValue # pylint: disable=W0212

# Only create a new *Dimension* if it doesn't already exist.
if dim_dup_new not in nc_dataset.dimensions.keys():

# New dimension is created by copying from the duplicated dimension.
nc_dataset.createDimension(dim_dup_new, dim_dup_length)

# Only create a new dimension *Variable* if it existed originally in the NetCDF structure.
if dim_dup in nc_dataset.variables.keys():

# New variable object is created for the renamed, previously duplicated dimension.
new_dup_var[dim_dup_new] = nc_dataset.createVariable(dim_dup_new, nc_dataset.variables[dim_dup].dtype,
(dim_dup_new,), fill_value=fill_value)
dim_var_attr_contents = get_attributes_minus_fillvalue_and_renamed_coords(original_var_name=dim_dup,
new_var_name=dim_dup_new,
original_dataset=nc_dataset)
for attr_name, contents in dim_var_attr_contents.items():
new_dup_var[dim_dup_new].setncattr(attr_name, contents)

new_dup_var[dim_dup_new][:] = nc_dataset.variables[dim_dup][:]

# Delete existing Variable
del nc_dataset.variables[dup_var_name]

# Replace original *Variable* with new variable with no duplicated dimensions.
new_dup_var[dup_var_name] = nc_dataset.createVariable(dup_var_name, str(dup_var[:].dtype),
tuple(new_dim_list), fill_value=fill_value)
for attr_name, contents in attrs_contents.items():
new_dup_var[dup_var_name].setncattr(attr_name, contents)
new_dup_var[dup_var_name][:] = dup_var[:]

return nc_dataset


def get_attributes_minus_fillvalue_and_renamed_coords(original_var_name: str,
new_var_name: str,
original_dataset: nc.Dataset) -> dict:
"""Variable attributes are retrieved."""
attrs_contents = {}

for ncattr in original_dataset.variables[original_var_name].ncattrs():
if ncattr != '_FillValue':
contents: str = original_dataset.variables[original_var_name].getncattr(ncattr)
if ncattr == 'coordinates':
contents.replace(original_var_name, new_var_name)
attrs_contents[ncattr] = contents

return attrs_contents
Loading

0 comments on commit d52a2a1

Please sign in to comment.