diff --git a/concatenator/harmony/__init__.py b/concatenator/harmony/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/concatenator/harmony/cli.py b/concatenator/harmony/cli.py
new file mode 100644
index 0000000..3329095
--- /dev/null
+++ b/concatenator/harmony/cli.py
@@ -0,0 +1,33 @@
+"""A Harmony CLI wrapper around the STITCHEE concatenation service"""
+from argparse import ArgumentParser
+
+import harmony
+
+from concatenator.harmony.service_adapter import StitcheeAdapter as HarmonyAdapter
+
+
+def main(config: harmony.util.Config = None) -> None:
+    """Parse command line arguments and invoke the service to respond to them.
+
+    Parameters
+    ----------
+    config : harmony.util.Config
+        harmony.util.Config is injectable for tests
+
+    Returns
+    -------
+    None
+    """
+    parser = ArgumentParser(
+        prog="Stitchee", description="Run the STITCH by Extending a dimEnsion service"
+    )
+    harmony.setup_cli(parser)
+    args = parser.parse_args()
+    if harmony.is_harmony_cli(args):
+        harmony.run_cli(parser, args, HarmonyAdapter, cfg=config)
+    else:
+        parser.error("Only --harmony CLIs are supported")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/concatenator/harmony/download_worker.py b/concatenator/harmony/download_worker.py
new file mode 100644
index 0000000..0c177af
--- /dev/null
+++ b/concatenator/harmony/download_worker.py
@@ -0,0 +1,113 @@
+"""A utility for downloading multiple granules simultaneously"""
+
+import queue
+import re
+from copy import deepcopy
+from multiprocessing import Manager, Process
+from os import cpu_count
+from pathlib import Path
+from urllib.parse import urlparse
+
+from harmony.logging import build_logger
+from harmony.util import download
+
+
+def multi_core_download(urls, destination_dir, access_token, cfg, process_count=None):
+    """
+    A method which automatically scales downloads to the number of CPU
+    cores. For further explanation, see documentation on "multi-track
+    drifting".
+
+    Parameters
+    ----------
+    urls : list
+        list of urls to download
+    destination_dir : str
+        output path for downloaded files
+    access_token : str
+        access token as provided in Harmony input
+    cfg : dict
+        Harmony configuration information
+    process_count : int
+        Number of worker processes to run (expected >= 1)
+
+    Returns
+    -------
+    list
+        list of downloaded files as pathlib.Path objects
+    """
+
+    if process_count is None:
+        process_count = cpu_count()
+
+    with Manager() as manager:
+        url_queue = manager.Queue(len(urls))
+        path_list = manager.list()
+
+        for url in urls:
+            url_queue.put(url)
+
+        # Spawn worker processes
+        processes = []
+        for _ in range(process_count):
+            download_process = Process(
+                target=_download_worker,
+                args=(url_queue, path_list, destination_dir, access_token, cfg),
+            )
+            processes.append(download_process)
+            download_process.start()
+
+        # Ensure worker processes exit successfully
+        for process in processes:
+            process.join()
+            if process.exitcode != 0:
+                raise RuntimeError(f"Download failed - exit code: {process.exitcode}")
+
+            process.close()
+
+        path_list = deepcopy(path_list)  # ensure GC can clean up multiprocessing
+
+    return [Path(path) for path in path_list]
+
+
+def _download_worker(url_queue, path_list, destination_dir, access_token, cfg):
+    """
+    A method to be executed in a separate process which consumes the url_queue
+    and places paths to completed downloads into the path_list. Downloads are
+    handled by harmony.util.download.
+
+    Parameters
+    ----------
+    url_queue : queue.Queue
+        URLs to process - should be filled from the start and only decrease
+    path_list : list
+        paths to completed file downloads
+    destination_dir : str
+        output path for downloaded files
+    access_token : str
+        access token as provided in Harmony input
+    cfg : dict
+        Harmony configuration information
+    """
+
+    logger = build_logger(cfg)
+
+    while not url_queue.empty():
+        try:
+            url = url_queue.get_nowait()
+        except queue.Empty:
+            break
+
+        path = Path(
+            download(url, destination_dir, logger=logger, access_token=access_token, cfg=cfg)
+        )
+        filename_match = re.match(r".*\/(.+\..+)", urlparse(url).path)
+
+        if filename_match is not None:
+            filename = filename_match.group(1)
+            dest_path = path.parent.joinpath(filename)
+            path = path.rename(dest_path)
+        else:
+            logger.warning("Origin filename could not be ascertained - %s", url)
+
+        path_list.append(str(path))
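Outside of a full Harmony request, the downloader can be exercised directly. A minimal sketch, assuming harmony-service-lib's config(validate=False) test convention; the URLs, directory, and token below are hypothetical:

    from harmony.util import config

    from concatenator.harmony.download_worker import multi_core_download

    cfg = config(validate=False)  # skip environment validation for local experiments
    paths = multi_core_download(
        ["https://example.com/g1.nc4", "https://example.com/g2.nc4"],  # urls
        "/tmp/granules",  # destination_dir
        "fake-token",     # access_token (normally taken from the Harmony message)
        cfg,
        process_count=2,  # cap the worker pool instead of using every core
    )
    # Each download is renamed to its origin filename, e.g. /tmp/granules/g1.nc4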
diff --git a/concatenator/harmony/service_adapter.py b/concatenator/harmony/service_adapter.py
new file mode 100644
index 0000000..5905229
--- /dev/null
+++ b/concatenator/harmony/service_adapter.py
@@ -0,0 +1,128 @@
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from uuid import uuid4
+
+import pystac
+from harmony.adapter import BaseHarmonyAdapter
+from harmony.util import bbox_to_geometry
+from pystac import Item
+from pystac.item import Asset
+
+from concatenator.harmony.download_worker import multi_core_download
+from concatenator.harmony.util import (
+    _get_netcdf_urls,
+    _get_output_bounding_box,
+    _get_output_date_range,
+)
+from concatenator.stitchee import stitchee
+
+
+class StitcheeAdapter(BaseHarmonyAdapter):
+    """
+    A harmony-service-lib wrapper around the STITCHEE concatenation module.
+    This wrapper does not support Harmony calls that do not have STAC catalogs,
+    as support for that behavior is being deprecated in harmony-service-lib.
+    """
+
+    def __init__(self, message, catalog=None, config=None):
+        """
+        Constructs the adapter
+
+        Parameters
+        ----------
+        message : harmony.Message
+            The Harmony input which needs acting upon
+        catalog : pystac.Catalog
+            A STAC catalog containing the files on which to act
+        config : harmony.util.Config
+            The configuration values for this runtime environment.
+        """
+        super().__init__(message, catalog=catalog, config=config)
+
+    def invoke(self):
+        """
+        Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke
+        """
+        if not self.catalog:
+            # Message-only support is being deprecated in Harmony, so we should expect to
+            # only see requests with catalogs when invoked with a newer Harmony instance
+            # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
+            raise RuntimeError("Invoking STITCHEE without a STAC catalog is not supported")
+
+        return self.message, self.process_catalog(self.catalog)
+
+    def process_catalog(self, catalog: pystac.Catalog):
+        """Merge the granules in a STAC catalog into a single concatenated output item."""
+        self.logger.info("process_catalog() started.")
+        try:
+            result = catalog.clone()
+            result.id = str(uuid4())
+            result.clear_children()
+
+            # Get all the items from the catalog, including from child or linked catalogs
+            items = list(self.get_all_catalog_items(catalog))
+
+            self.logger.info(f"length of items==={len(items)}.")
+
+            # Quick return if the catalog contains no items
+            if len(items) == 0:
+                return result
+
+            # --- Get granule filepaths (urls) ---
+            netcdf_urls: list[str] = _get_netcdf_urls(items)
+            self.logger.info(f"netcdf_urls==={netcdf_urls}.")
+
+            # --- Process metadata ---
+            bounding_box: list | None = _get_output_bounding_box(items)
+            datetimes = _get_output_date_range(items)
+
+            # Items did not have a bbox; valid under the STAC spec
+            if bounding_box is not None and len(bounding_box) == 0:
+                bounding_box = None
+
+            # --- Perform merging ---
+            collection = self._get_item_source(items[0]).collection
+            filename = f"{collection}_merged.nc4"
+
+            with TemporaryDirectory() as temp_dir:
+                self.logger.info("Starting granule downloads")
+                input_files = multi_core_download(
+                    netcdf_urls, temp_dir, self.message.accessToken, self.config
+                )
+                self.logger.info("Finished granule downloads")
+
+                output_path = str(Path(temp_dir).joinpath(filename).resolve())
+
+                # --- Run STITCHEE ---
+                stitchee(
+                    input_files,
+                    output_path,
+                    write_tmp_flat_concatenated=False,
+                    keep_tmp_files=False,
+                    concat_dim="mirror_step",  # This is currently set only for TEMPO
+                    logger=self.logger,
+                )
+                staged_url = self._stage(output_path, filename, "application/x-netcdf4")
+
+            # --- Output to STAC catalog ---
+            result.clear_items()
+            properties = dict(
+                start_datetime=datetimes["start_datetime"], end_datetime=datetimes["end_datetime"]
+            )
+
+            item = Item(
+                str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
+            )
+            asset = Asset(
+                staged_url, title=filename, media_type="application/x-netcdf4", roles=["data"]
+            )
+            item.add_asset("data", asset)
+            result.add_item(item)
+
+            self.logger.info("STAC catalog creation complete.")
+
+            return result
+
+        except Exception as service_exception:
+            self.logger.error(service_exception, exc_info=1)
+            raise service_exception
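A rough sketch of driving the adapter from a test harness, assuming a hypothetical local STAC fixture "catalog.json" and a fake token; invoke() performs real downloads, stitching, and staging, so the granule URLs must resolve (or the relevant calls be mocked):

    from harmony.message import Message
    from harmony.util import config
    from pystac import Catalog

    from concatenator.harmony.service_adapter import StitcheeAdapter

    adapter = StitcheeAdapter(
        Message({"accessToken": "fake-token"}),     # minimal hypothetical message
        catalog=Catalog.from_file("catalog.json"),  # hypothetical input catalog
        config=config(validate=False),
    )
    message, result_catalog = adapter.invoke()  # (message, catalog with one merged item)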
diff --git a/concatenator/harmony/util.py b/concatenator/harmony/util.py
new file mode 100644
index 0000000..f77aeb9
--- /dev/null
+++ b/concatenator/harmony/util.py
@@ -0,0 +1,98 @@
+"""Misc utility functions"""
+from datetime import datetime
+
+from pystac import Asset, Item
+
+VALID_EXTENSIONS = (".nc4", ".nc")
+VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"]
+
+
+def _is_netcdf_asset(asset: Asset) -> bool:
+    """Check that a `pystac.Asset` is a valid NetCDF-4 granule. This can be
+    ascertained via either the media type or by checking the extension of
+    the granule itself if that media type is absent.
+
+    """
+    return asset.media_type in VALID_MEDIA_TYPES or (
+        asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS)
+    )
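For instance, the check accepts either signal (hypothetical hrefs; a sketch of expected behavior):

    # _is_netcdf_asset(Asset("s3://bucket/granule.NC4"))                                   -> True, via extension
    # _is_netcdf_asset(Asset("s3://bucket/granule", media_type="application/x-netcdf4"))   -> True, via media type
    # _is_netcdf_asset(Asset("s3://bucket/granule.h5"))                                    -> False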
+
+
+def _get_item_url(item: Item) -> str | None:
+    """Check the `pystac.Item` for the first asset with the `data` role and a
+    valid input format. If there are no matching assets, return None.
+
+    """
+    return next(
+        (
+            asset.href
+            for asset in item.assets.values()
+            if "data" in (asset.roles or []) and _is_netcdf_asset(asset)
+        ),
+        None,
+    )
+
+
+def _get_netcdf_urls(items: list[Item]) -> list[str]:
+    """Iterate through a list of `pystac.Item` instances, from the input
+    `pystac.Catalog`. Extract the `pystac.Asset.href` for the first asset
+    of each item that has a role of "data". If there are any items that do
+    not have a data asset, then raise an exception.
+
+    """
+    catalog_urls = [_get_item_url(item) for item in items]
+
+    if None in catalog_urls:
+        raise RuntimeError("Some input granules do not have NetCDF-4 assets.")
+
+    return catalog_urls  # type: ignore[return-value]
+
+
+def _get_output_bounding_box(input_items: list[Item]) -> list[float]:
+    """Create a bounding box that is the maximum combined extent of all input
+    `pystac.Item` bounding box extents.
+
+    """
+    bounding_box = input_items[0].bbox
+
+    for item in input_items:
+        bounding_box[0] = min(bounding_box[0], item.bbox[0])
+        bounding_box[1] = min(bounding_box[1], item.bbox[1])
+        bounding_box[2] = max(bounding_box[2], item.bbox[2])
+        bounding_box[3] = max(bounding_box[3], item.bbox[3])
+
+    return bounding_box
+
+
+def _get_output_date_range(input_items: list[Item]) -> dict[str, str]:
+    """Create a dictionary of start and end datetime, which encompasses the
+    full temporal range of all input `pystac.Item` instances. This output
+    dictionary will be used for the `properties` of the output merged
+    `pystac.Item`.
+
+    """
+    start_datetime, end_datetime = _get_item_date_range(input_items[0])
+
+    for item in input_items:
+        new_start_datetime, new_end_datetime = _get_item_date_range(item)
+        start_datetime = min(start_datetime, new_start_datetime)
+        end_datetime = max(end_datetime, new_end_datetime)
+
+    return {"start_datetime": start_datetime.isoformat(), "end_datetime": end_datetime.isoformat()}
+
+
+def _get_item_date_range(item: Item) -> tuple[datetime, datetime]:
+    """A helper function to retrieve the temporal range from a `pystac.Item`
+    instance. If the `pystac.Item.datetime` property exists, there is a
+    single datetime associated with the granule, otherwise there will be a
+    start and end time contained within the `pystac.Item` metadata.
+
+    """
+    if item.datetime is None:
+        start_datetime = item.common_metadata.start_datetime
+        end_datetime = item.common_metadata.end_datetime
+    else:
+        start_datetime = item.datetime
+        end_datetime = item.datetime
+
+    return start_datetime, end_datetime
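As a quick sanity check of the two aggregation helpers, a sketch with invented bounding boxes and datetimes (bbox_to_geometry comes from harmony.util, as in the adapter above):

    from datetime import datetime, timezone

    from harmony.util import bbox_to_geometry
    from pystac import Item

    from concatenator.harmony.util import _get_output_bounding_box, _get_output_date_range

    bboxes = [[-10.0, -5.0, 10.0, 5.0], [0.0, 0.0, 20.0, 15.0]]
    items = [
        Item(str(i), bbox_to_geometry(bbox), bbox, datetime(2023, 6, 1, i, tzinfo=timezone.utc), {})
        for i, bbox in enumerate(bboxes)
    ]

    print(_get_output_bounding_box(items))  # [-10.0, -5.0, 20.0, 15.0]
    print(_get_output_date_range(items))
    # {'start_datetime': '2023-06-01T00:00:00+00:00', 'end_datetime': '2023-06-01T01:00:00+00:00'}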