diff --git a/notebooks/SaveTasks.ipynb b/notebooks/SaveTasks.ipynb
new file mode 100644
index 00000000..eab39c8e
--- /dev/null
+++ b/notebooks/SaveTasks.ipynb
@@ -0,0 +1,494 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "id": "0de3635b-fc48-4e50-ac81-ceb865f32dc1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "# These are the default AWS configurations for the Analysis Sandbox\n",
+    "# that are set in the environment variables.\n",
+    "aws_default_config = {\n",
+    "    # 'AWS_NO_SIGN_REQUEST': 'YES',\n",
+    "    'AWS_SECRET_ACCESS_KEY': 'fake',\n",
+    "    'AWS_ACCESS_KEY_ID': 'fake',\n",
+    "}\n",
+    "\n",
+    "# To access a public bucket, the AWS credentials need to be removed from\n",
+    "# the environment variables or the following error will occur:\n",
+    "# PermissionError: The AWS Access Key Id you provided does not exist in our records.\n",
+    "\n",
+    "for key in aws_default_config.keys():\n",
+    "    if key in os.environ:\n",
+    "        del os.environ[key]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "id": "f5ba7e34-981d-4174-aff8-1313978bb357",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import logging\n",
+    "import os\n",
+    "import queue\n",
+    "from threading import Thread\n",
+    "\n",
+    "import click\n",
+    "import datacube\n",
+    "import fsspec\n",
+    "from odc.dscache import create_cache\n",
+    "from odc.dscache.apps.slurpy import EOS, qmap\n",
+    "from odc.dscache.tools import (\n",
+    "    bin_dataset_stream,\n",
+    "    dataset_count,\n",
+    "    db_connect,\n",
+    "    dictionary_from_product_list,\n",
+    "    mk_raw2ds,\n",
+    "    ordered_dss,\n",
+    "    raw_dataset_stream,\n",
+    ")\n",
+    "from odc.dscache.tools.tiling import parse_gridspec_with_name\n",
+    "from odc.stats.model import DateTimeRange\n",
+    "from tqdm import tqdm\n",
+    "\n",
+    "from deafrica_conflux.cli.logs import logging_setup\n",
+    "from deafrica_conflux.hopper import bin_solar_day, persist\n",
+    "from deafrica_conflux.io import (\n",
+    "    check_dir_exists,\n",
+    "    check_file_exists,\n",
+    "    check_if_s3_uri,\n",
+    "    find_geotiff_files,\n",
+    ")\n",
+    "from deafrica_conflux.text import parse_tile_ids"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "id": "1f678da9-eaf3-4811-b0f6-e0dcc650a4fc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "verbose = 1\n",
+    "# Grid name: africa_{10|20|30|60}\n",
+    "grid_name = \"africa_30\"\n",
+    "# Datacube product to search datasets for.\n",
+    "product = \"wofs_ls\"\n",
+    "# Only extract datasets for a given time range, e.g. '2020-05--P1M' for the month of May 2020.\n",
+    "temporal_range = \"2023-03--P3M\"\n",
+    "# Compression setting for zstandard: 1 is fast, 9+ is good but slow.\n",
+    "complevel = 6\n",
+    "# Path to the directory containing the polygons raster files.\n",
+    "polygons_rasters_directory = \"s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/senegal_basin/conflux/historical_extent_rasters\"\n",
+    "# Regular expression for filename matching when searching for the polygons raster files.\n",
+    "pattern = \".*\"\n",
+    "# Overwrite existing cache file.\n",
+    "overwrite = True\n",
+    "# Directory to write the cache file to.\n",
+    "output_directory = \"s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/senegal_basin/conflux/\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "dff56126-bec4-4ef4-866d-31882088c74f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Set up the logger.\n",
+    "logging_setup(verbose)\n",
+    "_log = logging.getLogger(__name__)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "d10883bf-c871-4be6-8c8d-0a9f0755caec",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Support pathlib Paths.\n",
+    "polygons_rasters_directory = str(polygons_rasters_directory)\n",
+    "output_directory = str(output_directory)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "id": "6e14e4c0-7229-4050-ad46-7b856f690724",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:12:53,642] {credentials.py:611} INFO - Found credentials in shared credentials file: ~/.aws/credentials\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Create the output directory if it does not exist.\n",
+    "is_s3 = check_if_s3_uri(output_directory)\n",
+    "if is_s3:\n",
+    "    fs = fsspec.filesystem(\"s3\")\n",
+    "else:\n",
+    "    fs = fsspec.filesystem(\"file\")\n",
+    "\n",
+    "if not check_dir_exists(output_directory):\n",
+    "    fs.makedirs(output_directory, exist_ok=True)\n",
+    "    _log.info(f\"Created directory {output_directory}\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "id": "49fba8ab-edd1-4f9d-9a09-bf410e251689",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Validate the product.\n",
+    "products = [product]\n",
+    "# Connect to the datacube.\n",
+    "dc = datacube.Datacube()\n",
+    "# Get all products.\n",
+    "all_products = {p.name: p for p in dc.index.products.get_all()}\n",
+    "if len(products) == 0:\n",
+    "    raise ValueError(\"Have to supply at least one product\")\n",
+    "else:\n",
+    "    for p in products:\n",
+    "        if p not in all_products:\n",
+    "            raise ValueError(f\"No such product found: {p}\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "id": "ea6ea2dc-e82d-4497-8ea5-e44edb5a2daf",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:12:54,194] {1838406043.py:11} INFO - Deleted s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/senegal_basin/conflux/wofs_ls_2023-03--P3M.db\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Parse the temporal range.\n",
+    "temporal_range_ = DateTimeRange(temporal_range)\n",
+    "\n",
+    "output_db_fn = f\"{product}_{temporal_range_.short}.db\"\n",
+    "output_db_fp = os.path.join(output_directory, output_db_fn)\n",
+    "\n",
+    "# Check if the output file exists.\n",
+    "if check_file_exists(output_db_fp):\n",
+    "    if overwrite:\n",
+    "        fs.delete(output_db_fp, recursive=True)\n",
+    "        _log.info(f\"Deleted {output_db_fp}\")\n",
+    "        # Delete the local file created before uploading to s3.\n",
+    "        if is_s3:\n",
+    "            if check_file_exists(output_db_fn):\n",
+    "                fsspec.filesystem(\"file\").delete(output_db_fn)\n",
+    "                _log.info(f\"Deleted local file created before uploading to s3 {output_db_fn}\")\n",
+    "    else:\n",
+    "        raise FileExistsError(f\"{output_db_fp} exists!\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "id": "45cdca5a-3047-4b37-883c-4e99d26639e8",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:12:54,199] {2478973304.py:3} INFO - Query: {'time': (datetime.datetime(2023, 3, 1, 0, 0), datetime.datetime(2023, 5, 31, 23, 59, 59, 999999))}\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Create the query to find the datasets.\n",
+    "query = {\"time\": (temporal_range_.start, temporal_range_.end)}\n",
+    "_log.info(f\"Query: {query}\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "id": "5e71ea03-f7f7-42a4-a2d8-d88606e3bc9e",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:12:54,205] {279200897.py:1} INFO - Getting dataset counts\n",
+      "[2023-12-04 13:12:54,263] {279200897.py:6} INFO - ..wofs_ls: 22,852\n"
+     ]
+    },
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "/usr/local/lib/python3.10/dist-packages/datacube/drivers/postgres/_api.py:752: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to \"sqlalchemy<2.0\". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)\n",
+      "  select(\n"
+     ]
+    }
+   ],
+   "source": [
+    "_log.info(\"Getting dataset counts\")\n",
+    "counts = {p: dataset_count(dc.index, product=p, **query) for p in products}\n",
+    "\n",
+    "n_total = 0\n",
+    "for p, c in counts.items():\n",
+    "    _log.info(f\"..{p}: {c:8,d}\")\n",
+    "    n_total += c\n",
+    "\n",
+    "if n_total == 0:\n",
+    "    raise ValueError(\"No datasets found\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "id": "8c500ee6-fb6e-41c6-87ad-273ebcf7c016",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:12:54,268] {2806460463.py:1} INFO - Training compression dictionary...\n",
+      "[2023-12-04 13:12:54,479] {2806460463.py:3} INFO - Done\n"
+     ]
+    }
+   ],
+   "source": [
+    "_log.info(\"Training compression dictionary...\")\n",
+    "zdict = dictionary_from_product_list(dc, products, samples_per_product=50, query=query)\n",
+    "_log.info(\"Done\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "id": "fafe21e5-2f5a-4fa7-8574-38aee3af329c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "if is_s3:\n",
+    "    cache = create_cache(output_db_fn, zdict=zdict, complevel=complevel, truncate=True)\n",
+    "else:\n",
+    "    cache = create_cache(output_db_fp, zdict=zdict, complevel=complevel, truncate=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "id": "5e3774fd-422d-470a-9dd0-c0dbb22cffe5",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "raw2ds = mk_raw2ds(all_products)\n",
+    "\n",
+    "def db_task(products, conn, q):\n",
+    "    for p in products:\n",
+    "        if len(query) == 0:\n",
+    "            dss = map(raw2ds, raw_dataset_stream(p, conn))\n",
+    "        else:\n",
+    "            dss = ordered_dss(dc, product=p, **query)\n",
+    "\n",
+    "        for ds in dss:\n",
+    "            q.put(ds)\n",
+    "    q.put(EOS)\n",
+    "\n",
+    "conn = db_connect()\n",
+    "q = queue.Queue(maxsize=10_000)\n",
+    "db_thread = Thread(target=db_task, args=(products, conn, q))\n",
+    "db_thread.start()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "id": "be571216-39da-4107-b3c6-2681d87151a6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "dss = qmap(lambda ds: ds, q, eos_marker=EOS)\n",
+    "dss = cache.tee(dss)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "id": "ffe57a0f-1a78-4344-b0cd-c73c7ae91441",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "Processing 22,852 wofs_ls datasets: 100%|██████████| 22852/22852 [00:35<00:00, 639.99it/s]\n"
+     ]
+    }
+   ],
+   "source": [
+    "cells = {}\n",
+    "grid, gridspec = parse_gridspec_with_name(grid_name)\n",
+    "cache.add_grid(gridspec, grid)\n",
+    "\n",
+    "cfg = dict(grid=grid)\n",
+    "cache.append_info_dict(\"stats/\", dict(config=cfg))\n",
+    "\n",
+    "dss = bin_dataset_stream(gridspec, dss, cells, persist=persist)\n",
+    "\n",
+    "label = f\"Processing {n_total:8,d} {product} datasets\"\n",
+    "with tqdm(dss, desc=label, total=n_total) as dss:\n",
+    "    for _ in dss:\n",
+    "        pass"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 16,
+   "id": "8aff909f-2855-4369-970e-c37fdabf5bf0",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:13:30,265] {250296155.py:2} INFO - Total bins: 4511\n",
+      "[2023-12-04 13:13:30,266] {250296155.py:3} INFO - Filtering bins by required tiles...\n",
+      "[2023-12-04 13:13:30,394] {io.py:460} INFO - Found 60 GeoTIFF files.\n",
+      "[2023-12-04 13:13:30,395] {250296155.py:7} INFO - Found 60 tiles.\n",
+      "[2023-12-04 13:13:30,419] {250296155.py:12} INFO - Total bins: 60\n"
+     ]
+    }
+   ],
+   "source": [
+    "# Find the required tiles.\n",
+    "_log.info(f\"Total bins: {len(cells):d}\")\n",
+    "_log.info(\"Filtering bins by required tiles...\")\n",
+    "geotiff_files = find_geotiff_files(path=polygons_rasters_directory, pattern=pattern)\n",
+    "\n",
+    "tiles_ids = [parse_tile_ids(file) for file in geotiff_files]\n",
+    "_log.info(f\"Found {len(tiles_ids)} tiles.\")\n",
+    "_log.debug(f\"Tile ids: {tiles_ids}\")\n",
+    "\n",
+    "# Filter cells by tile ids.\n",
+    "cells = {k: v for k, v in cells.items() if k in tiles_ids}\n",
+    "_log.info(f\"Total bins: {len(cells):d}\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 17,
+   "id": "a82a1b48-396e-498b-8c39-4838dd831cd7",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:13:30,436] {2957611989.py:13} INFO - Total of 572 unique dataset IDs after filtering\n"
+     ]
+    },
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "Saving 1715 tasks to disk: 100%|██████████| 1715/1715 [00:02<00:00, 599.19it/s]\n"
+     ]
+    }
+   ],
+   "source": [
+    "tasks = bin_solar_day(cells)\n",
+    "\n",
+    "# Remove duplicate source uuids.\n",
+    "# Duplicates occur when queried datasets are captured around UTC midnight\n",
+    "# and around the weekly boundary.\n",
+    "tasks = {k: set(dss) for k, dss in tasks.items()}\n",
+    "\n",
+    "tasks_uuid = {k: [ds.id for ds in dss] for k, dss in tasks.items()}\n",
+    "\n",
+    "all_ids = set()\n",
+    "for k, dss in tasks_uuid.items():\n",
+    "    all_ids.update(dss)\n",
+    "_log.info(f\"Total of {len(all_ids):,d} unique dataset IDs after filtering\")\n",
+    "\n",
+    "label = f\"Saving {len(tasks)} tasks to disk\"\n",
+    "with tqdm(tasks_uuid.items(), desc=label, total=len(tasks_uuid)) as groups:\n",
+    "    for group in groups:\n",
+    "        cache.add_grid_tile(grid, group[0], group[1])\n",
+    "\n",
+    "db_thread.join()\n",
+    "cache.close()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 18,
+   "id": "d7c249cc-dee2-4ab8-bd2f-3f9fb99dcbc5",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[2023-12-04 13:13:33,586] {2328381222.py:5} INFO - Cache file written to s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/senegal_basin/conflux/wofs_ls_2023-03--P3M.db\n",
+      "[2023-12-04 13:13:33,587] {2328381222.py:9} INFO - Writing summary to s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/senegal_basin/conflux/wofs_ls_2023-03--P3M.csv\n"
+     ]
+    }
+   ],
+   "source": [
+    "if is_s3:\n",
+    "    fs.upload(output_db_fn, output_db_fp, recursive=False)\n",
+    "    fsspec.filesystem(\"file\").delete(output_db_fn)\n",
+    "\n",
+    "_log.info(f\"Cache file written to {output_db_fp}\")\n",
+    "\n",
+    "# pylint:disable=too-many-locals\n",
+    "csv_path = os.path.join(output_directory, f\"{product}_{temporal_range}.csv\")\n",
+    "_log.info(f\"Writing summary to {csv_path}\")\n",
+    "with fs.open(csv_path, \"wt\", encoding=\"utf8\") as f:\n",
+    "    f.write('\"T\",\"X\",\"Y\",\"datasets\",\"days\"\\n')\n",
+    "    for p, x, y in sorted(tasks):\n",
+    "        dss = tasks[(p, x, y)]\n",
+    "        n_dss = len(dss)\n",
+    "        n_days = len(set(ds.time.date() for ds in dss))\n",
+    "        line = f'\"{p}\", {x:+05d}, {y:+05d}, {n_dss:4d}, {n_days:4d}\\n'\n",
+    "        f.write(line)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.12"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}