replace accidentally removed stitchee and run_stitchee.py modules
danielfromearth committed Sep 13, 2023
1 parent 3ead854 commit 907484f
Showing 2 changed files with 310 additions and 0 deletions.
164 changes: 164 additions & 0 deletions concatenator/run_stitchee.py
@@ -0,0 +1,164 @@
"""A simple CLI wrapper around the main concatenation process."""
import logging
import os
import shutil
import sys
import uuid
from argparse import ArgumentParser
from pathlib import Path
from typing import Tuple, Union

from concatenator.file_ops import add_label_to_path
from concatenator.stitchee import stitchee


def parse_args(args: list) -> Tuple[list[str], str, str, bool, Union[str, None]]:
    """
    Parse command-line arguments for this script.

    Returns
    -------
    tuple
        (input_files, output_path, concat_dim, keep_tmp_files, temporary_dir_to_remove)
    """
parser = ArgumentParser(
prog='stitchee',
description='Run the along-existing-dimension concatenator.')

# Required arguments
req_grp = parser.add_argument_group(title='Required')
req_grp.add_argument(
'input',
metavar='path/directory or path list',
nargs='+',
help='Files to be concatenated, specified via a '
'(1) single directory containing the files to be concatenated, '
'(2) single text file containing linebreak-separated paths of the files to be concatenated, '
'or (3) multiple filepaths of the files to be concatenated.')
req_grp.add_argument(
'-o',
'--output_path',
metavar='output_path',
required=True,
help='The output filename for the merged output.')

# Optional arguments
parser.add_argument(
'--concat_dim',
metavar='concat_dim',
nargs=1,
help='Dimension to concatenate along, if possible.')
parser.add_argument(
'--make_dir_copy',
action='store_true',
help='Make a duplicate of the input directory to avoid modification of input files. '
'This is useful for testing, but uses more disk space.')
parser.add_argument(
'--keep_tmp_files',
action='store_true',
help="Prevents removal, after successful execution, of "
"(1) the flattened concatenated file and "
"(2) the input directory copy if created by '--make_dir_copy'.")
parser.add_argument(
'-O', '--overwrite',
action='store_true',
help='Overwrite output file if it already exists.')
parser.add_argument(
'-v', '--verbose',
help='Enable verbose output to stdout; useful for debugging',
action='store_true'
)

parsed = parser.parse_args(args)

if parsed.verbose:
logging.basicConfig(level=logging.DEBUG)

# The output file path is validated.
output_path = Path(parsed.output_path).resolve()
if output_path.is_file(): # the file already exists
if parsed.overwrite:
os.remove(output_path)
else:
raise FileExistsError(f"File already exists at <{output_path}>. Run again with option '-O' to overwrite.")

# The input directory or file is validated.
print(f"parsed_input === {parsed.input}")
if len(parsed.input) > 1:
input_files = parsed.input
elif len(parsed.input) == 1:
directory_or_path = Path(parsed.input[0]).resolve()
if directory_or_path.is_dir():
input_files = _get_list_of_filepaths_from_dir(directory_or_path)
elif directory_or_path.is_file():
input_files = _get_list_of_filepaths_from_file(directory_or_path)
else:
            raise TypeError("if a single path is provided as 'input', "
                            "it must be an existing directory or file.")
else:
raise TypeError("input argument must be one path/directory or a list of paths.")

# If requested, make a temporary directory with copies of the original input files
temporary_dir_to_remove = None
if parsed.make_dir_copy:
new_data_dir = Path(add_label_to_path(str(output_path.parent / "temp_copy"), label=str(uuid.uuid4()))).resolve()
os.makedirs(new_data_dir, exist_ok=True)
        print(f"Created temporary directory: {new_data_dir}")

new_input_files = []
for file in input_files:
new_path = new_data_dir / Path(file).name
shutil.copyfile(file, new_path)
new_input_files.append(str(new_path))

input_files = new_input_files
        print(f"Copied files to temporary directory: {new_data_dir}")
temporary_dir_to_remove = str(new_data_dir)

    # '--concat_dim' is optional on the command line, so guard against it being omitted.
    concat_dim = parsed.concat_dim[0] if parsed.concat_dim else ""
    return input_files, str(output_path), concat_dim, bool(parsed.keep_tmp_files), temporary_dir_to_remove


def _get_list_of_filepaths_from_file(file_with_paths: Path):
# Each path listed in the specified file is resolved using pathlib for validation.
paths_list = []
with open(file_with_paths, encoding="utf-8") as file:
while line := file.readline():
paths_list.append(str(Path(line.rstrip()).resolve()))

return paths_list

def _get_list_of_filepaths_from_dir(data_dir: Path):
# Get list of files (ignoring hidden files) in directory.
input_files = [str(f) for f in data_dir.iterdir() if not f.name.startswith(".")]
return input_files


def run_stitchee(args: list) -> None:
    """
    Parse arguments and run the concatenator on the specified input files.
    """
input_files, output_path, concat_dim, keep_tmp_files, temporary_dir_to_remove = parse_args(args)
num_inputs = len(input_files)

logging.info('Executing stitchee concatenation on %d files...', num_inputs)
stitchee(input_files, output_path,
write_tmp_flat_concatenated=keep_tmp_files,
keep_tmp_files=keep_tmp_files,
concat_dim=concat_dim)
logging.info('STITCHEE complete. Result in %s', output_path)

if not keep_tmp_files and temporary_dir_to_remove:
shutil.rmtree(temporary_dir_to_remove)


def main() -> None:
"""Entry point to the script"""
logging.basicConfig(
stream=sys.stdout,
format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
level=logging.DEBUG
)
run_stitchee(sys.argv[1:])


if __name__ == '__main__':
main()
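
For reference, a minimal usage sketch of the CLI wrapper above, called from Python. The directory, output path, and dimension name are placeholders for illustration, not values taken from this commit:

# Hypothetical invocation of the CLI wrapper defined in run_stitchee.py.
from concatenator.run_stitchee import run_stitchee

run_stitchee([
    "/tmp/granules",                 # directory containing the netCDF files to concatenate
    "-o", "/tmp/concatenated.nc",    # required output path for the merged file
    "--concat_dim", "mirror_step",   # existing dimension to concatenate along (placeholder name)
    "--make_dir_copy",               # operate on copies so the input files are left untouched
    "-v",                            # verbose logging
])
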
146 changes: 146 additions & 0 deletions concatenator/stitchee.py
@@ -0,0 +1,146 @@
"""Concatenation service that appends data along an existing dimension, using netCDF4 and xarray."""

import logging
import os
import time
from logging import Logger
from typing import Tuple, Union

import netCDF4 as nc # type: ignore
import xarray as xr

from concatenator import GROUP_DELIM
from concatenator.dimension_cleanup import remove_duplicate_dims
from concatenator.file_ops import add_label_to_path
from concatenator.group_handling import (flatten_grouped_dataset,
regroup_flattened_dataset)

default_logger = logging.getLogger(__name__)


def stitchee(files_to_concat: list[str],
output_file: str,
write_tmp_flat_concatenated: bool = False,
keep_tmp_files: bool = True,
concat_dim: str = "",
             logger: Logger = default_logger) -> str:
    """Concatenate netCDF data files along an existing dimension.

    Parameters
    ----------
    files_to_concat : list[str]
        Paths of the netCDF files to concatenate.
    output_file : str
        Path for the concatenated output file.
    write_tmp_flat_concatenated : bool, optional
        Whether to write the intermediate flattened-and-concatenated file to disk.
    keep_tmp_files : bool, optional
        Whether to retain temporary intermediate files after a successful run.
    concat_dim : str, optional
        Name of the existing dimension to concatenate along.
    logger : logging.Logger, optional
        Logger to use; defaults to this module's logger.

    Returns
    -------
    str
        Path of the concatenated output file, or an empty string if no workable input files were found.
    """
intermediate_flat_filepaths: list[str] = []
benchmark_log = {"flattening": 0.0, "concatenating": 0.0, "reconstructing_groups": 0.0}

# Proceed to concatenate only files that are workable (can be opened and are not empty).
input_files, num_input_files = _validate_workable_files(files_to_concat, logger)

# Exit cleanly if no workable netCDF files found.
if num_input_files < 1:
logger.info("No non-empty netCDF files found. Exiting.")
return ""

logger.debug("Flattening all input files...")
xrdataset_list = []
for i, filepath in enumerate(input_files):
# The group structure is flattened.
start_time = time.time()
logger.debug(" ..file %03d/%03d <%s>..", i + 1, num_input_files, filepath)
flat_dataset, coord_vars, _ = flatten_grouped_dataset(nc.Dataset(filepath, 'r'), filepath,
ensure_all_dims_are_coords=True)

flat_dataset = remove_duplicate_dims(flat_dataset)

xrds = xr.open_dataset(xr.backends.NetCDF4DataStore(flat_dataset),
decode_times=False, decode_coords=False, drop_variables=coord_vars)

benchmark_log['flattening'] = time.time() - start_time

# The flattened file is written to disk.
# flat_file_path = add_label_to_path(filepath, label="_flat_intermediate")
# xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars})
# intermediate_flat_filepaths.append(flat_file_path)
# xrdataset_list.append(xr.open_dataset(flat_file_path))
xrdataset_list.append(xrds)

# Flattened files are concatenated together (Using XARRAY).
start_time = time.time()
logger.debug("Concatenating flattened files...")
# combined_ds = xr.open_mfdataset(intermediate_flat_filepaths,
# decode_times=False,
# decode_coords=False,
# data_vars='minimal',
# coords='minimal',
# compat='override')
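    # The flattening step prefixes variable and dimension names with GROUP_DELIM,
    # so the requested concatenation dimension is referenced with the same prefix
    # in the xr.concat call below.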

combined_ds = xr.concat(xrdataset_list, dim=GROUP_DELIM + concat_dim, data_vars='minimal', coords='minimal')

benchmark_log['concatenating'] = time.time() - start_time

if write_tmp_flat_concatenated:
logger.debug("Writing concatenated flattened temporary file to disk...")
# Concatenated, yet still flat, file is written to disk for debugging.
tmp_flat_concatenated_path = add_label_to_path(output_file, label="_flat_intermediate")
combined_ds.to_netcdf(tmp_flat_concatenated_path, format="NETCDF4")
else:
tmp_flat_concatenated_path = None

# The group hierarchy of the concatenated file is reconstructed (using XARRAY).
start_time = time.time()
logger.debug("Reconstructing groups within concatenated file...")
regroup_flattened_dataset(combined_ds, output_file)
benchmark_log['reconstructing_groups'] = time.time() - start_time

logger.info("--- Benchmark results ---")
total_time = 0.0
for k, v in benchmark_log.items():
logger.info("%s: %f", k, v)
total_time += v
logger.info("-- total processing time: %f", total_time)

# If requested, remove temporary intermediate files.
if not keep_tmp_files:
for file in intermediate_flat_filepaths:
os.remove(file)
if tmp_flat_concatenated_path:
os.remove(tmp_flat_concatenated_path)

return output_file


def _validate_workable_files(files_to_concat, logger) -> Tuple[list[str], int]:
"""Remove files from list that are not open-able as netCDF or that are empty."""
workable_files = []
for file in files_to_concat:
try:
with nc.Dataset(file, 'r') as dataset:
is_empty = _is_file_empty(dataset)
if is_empty is False:
workable_files.append(file)
except OSError:
logger.debug("Error opening <%s> as a netCDF dataset. Skipping.", file)

number_of_workable_files = len(workable_files)

return workable_files, number_of_workable_files


def _is_file_empty(parent_group: Union[nc.Dataset, nc.Group]) -> bool:
    """Return True if all variables in the dataset, including those in nested groups, have size 0."""
    for var in parent_group.variables.values():
        if var.size != 0:
            return False
    # Recurse into every child group; any non-empty group means the file is not empty.
    for child_group in parent_group.groups.values():
        if not _is_file_empty(child_group):
            return False
    return True
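
A minimal sketch of calling stitchee() directly from Python. The granule paths and dimension name are placeholders for illustration, not values taken from this commit:

# Hypothetical programmatic usage of the stitchee() function defined above.
from concatenator.stitchee import stitchee

output = stitchee(
    ["granule_001.nc", "granule_002.nc"],  # input files, in the desired concatenation order
    "combined.nc",                         # destination for the merged netCDF file
    keep_tmp_files=False,                  # remove intermediate artifacts on success
    concat_dim="mirror_step",              # existing dimension to append along (placeholder name)
)
print(f"Concatenated file written to: {output}")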
