
Merge pull request #10 from digitalearthafrica/px_wet
Always record wet and dry pixel numbers and areas
vikineema authored Oct 16, 2023
2 parents a96f0ed + 9fec951 commit c3d4fdb
Showing 18 changed files with 274 additions and 256 deletions.
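The substance of the commit is that the drill outputs now always record wet and dry pixel counts and areas. None of the diffs that loaded below touch that logic (they are an import-style cleanup); the metric change lives in the files that did not load. As orientation only, a rough sketch of deriving wet/dry counts and areas from a boolean water mask. The function, the column names, and the 30 m pixel size are assumptions for the example, not this repo's code:

```python
import numpy as np

# Illustrative sketch only, not code from this commit. Assumes `water` is a
# boolean mask over one waterbody polygon (True = wet, False = dry) and a
# 30 m x 30 m pixel footprint.
PIXEL_AREA_M2 = 30 * 30

def wet_dry_summary(water: np.ndarray) -> dict:
    """Count wet and dry pixels and convert the counts to areas in m^2."""
    px_wet = int(np.count_nonzero(water))
    px_dry = int(water.size) - px_wet
    return {
        "px_wet": px_wet,
        "px_dry": px_dry,
        "area_wet_m2": px_wet * PIXEL_AREA_M2,
        "area_dry_m2": px_dry * PIXEL_AREA_M2,
    }

# Example: a 3x3 mask with 4 wet pixels.
print(wet_dry_summary(np.array([[True, False, True],
                                [False, True, False],
                                [True, False, False]])))
```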
deafrica_conflux/cli/db_to_csv.py (2 additions, 2 deletions)
@@ -1,7 +1,7 @@
 import click
 
-import deafrica_conflux.stack
 from deafrica_conflux.cli.logs import logging_setup
+from deafrica_conflux.stack import stack_waterbodies_db_to_csv
 
 
 @click.command("db-to-csv", no_args_is_help=True)
@@ -42,7 +42,7 @@ def db_to_csv(output_directory, verbose, jobs, index_num, split_num, remove_dupl
     """
     logging_setup(verbose)
 
-    deafrica_conflux.stack.stack_waterbodies_db_to_csv(
+    stack_waterbodies_db_to_csv(
         output_directory=output_directory,
         verbose=verbose > 0,
         remove_duplicated_data=remove_duplicated_data,
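Every file in this commit applies the same mechanical refactor: a module import plus fully qualified call sites becomes a direct `from ... import` of the functions actually used. The trade-off, shown with a standard-library example rather than this repo's modules:

```python
# Module import: call sites spell out the full dotted path.
import os.path

print(os.path.join("waterbodies", "outputs"))

# Direct import: shorter call sites, and the file's dependencies are
# explicit at the top; the cost is losing the namespace at the call site.
from os.path import join

print(join("waterbodies", "outputs"))
```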
deafrica_conflux/cli/delete_queue.py (6 additions, 10 deletions)
@@ -1,7 +1,7 @@
 import boto3
 import click
 
-import deafrica_conflux.queues
+from deafrica_conflux.queues import delete_queue, get_queue_attribute, get_queue_url
 
 
 @click.command("delete-sqs-queue", no_args_is_help=True)
@@ -14,18 +14,16 @@ def delete_sqs_queue(queue_name):
     sqs_client = boto3.client("sqs")
 
     # Get the Amazon Resource Name (ARN) of the source queue.
-    source_queue_arn = deafrica_conflux.queues.get_queue_attribute(
+    source_queue_arn = get_queue_attribute(
         queue_name=queue_name, attribute_name="QueueArn", sqs_client=sqs_client
     )
     # Delete the source queue.
-    deafrica_conflux.queues.delete_queue(queue_name=queue_name, sqs_client=sqs_client)
+    delete_queue(queue_name=queue_name, sqs_client=sqs_client)
 
     # Get the Amazon Resource Name (ARN) of the dead-letter queue.
     dead_letter_queue_name = queue_name + "_deadletter"
-    dead_letter_queue_url = deafrica_conflux.queues.get_queue_url(
-        queue_name=dead_letter_queue_name, sqs_client=sqs_client
-    )
-    dead_letter_queue_arn = deafrica_conflux.queues.get_queue_attribute(
+    dead_letter_queue_url = get_queue_url(queue_name=dead_letter_queue_name, sqs_client=sqs_client)
+    dead_letter_queue_arn = get_queue_attribute(
         queue_name=dead_letter_queue_name, attribute_name="QueueArn", sqs_client=sqs_client
     )
     # Check if the deadletter queue is empty or not.
@@ -37,9 +35,7 @@ def delete_sqs_queue(queue_name):
     messages = response["Messages"]
 
     if len(messages) == 0:
-        deafrica_conflux.queues.delete_queue(
-            queue_name=dead_letter_queue_name, sqs_client=sqs_client
-        )
+        delete_queue(queue_name=dead_letter_queue_name, sqs_client=sqs_client)
     arn = ",".join([source_queue_arn, dead_letter_queue_arn])
 
     return arn
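The command above resolves the queue and its `_deadletter` partner through the repo's own helpers, whose bodies are not shown in this diff. For orientation, a minimal boto3-only sketch of the same empty-check-then-delete step; the queue name is a placeholder:

```python
import boto3

sqs = boto3.client("sqs")

# Placeholder queue name; any existing queue works the same way.
queue_url = sqs.get_queue_url(QueueName="example-queue_deadletter")["QueueUrl"]

# ApproximateNumberOfMessages is the direct way to probe emptiness. Note
# that receive_message omits the "Messages" key entirely when nothing is
# returned, so an attribute check avoids guarding that lookup.
attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["ApproximateNumberOfMessages"]
)
if int(attrs["Attributes"]["ApproximateNumberOfMessages"]) == 0:
    sqs.delete_queue(QueueUrl=queue_url)
```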
deafrica_conflux/cli/filter_from_queue.py (11 additions, 15 deletions)
@@ -6,10 +6,10 @@
 import datacube
 import geopandas as gpd
 
-import deafrica_conflux.drill
-import deafrica_conflux.id_field
-import deafrica_conflux.queues
 from deafrica_conflux.cli.logs import logging_setup
+from deafrica_conflux.drill import filter_datasets
+from deafrica_conflux.id_field import guess_id_field
+from deafrica_conflux.queues import delete_batch, get_queue_url, send_batch_with_retry
 
 
 @click.command("filter-from-sqs-queue", no_args_is_help=True)
@@ -60,7 +60,7 @@ def filter_from_queue(
         raise error
 
     # Guess the ID field.
-    id_field = deafrica_conflux.id_field.guess_id_field(polygons_gdf, use_id)
+    id_field = guess_id_field(polygons_gdf, use_id)
     _log.debug(f"Guessed ID field: {id_field}")
 
     # Set the ID field as the index.
@@ -69,13 +69,9 @@ def filter_from_queue(
     sqs_client = boto3.client("sqs")
 
     # Input queue should have a dead letter queue configured in its RedrivePolicy.
-    input_queue_url = deafrica_conflux.queues.get_queue_url(
-        queue_name=input_queue, sqs_client=sqs_client
-    )
+    input_queue_url = get_queue_url(queue_name=input_queue, sqs_client=sqs_client)
 
-    output_queue_url = deafrica_conflux.queues.get_queue_url(
-        queue_name=output_queue, sqs_client=sqs_client
-    )
+    output_queue_url = get_queue_url(queue_name=output_queue, sqs_client=sqs_client)
 
     # Maximum number of retries to get messages from the input queue.
     message_retries = 10
@@ -111,7 +107,7 @@ def filter_from_queue(
         dss = [dc.index.datasets.get(dataset_id) for dataset_id in dataset_ids]
 
         # Filter the Datasets.
-        filtered_dataset_ids = deafrica_conflux.drill.filter_datasets(
+        filtered_dataset_ids = filter_datasets(
             dss=dss, polygons_gdf=polygons_gdf, worker_num=num_worker
         )
         _log.info(f"After filter {' '.join(filtered_dataset_ids)}")
@@ -121,7 +117,7 @@ def filter_from_queue(
         for idx, filtered_dataset_id in enumerate(filtered_dataset_ids):
             messages_to_send.append(filtered_dataset_id)
             if (idx + 1) % 10 == 0:
-                successful, failed = deafrica_conflux.queues.send_batch_with_retry(
+                successful, failed = send_batch_with_retry(
                     queue_url=output_queue_url,
                     messages=messages_to_send,
                     max_retries=10,
@@ -131,7 +127,7 @@ def filter_from_queue(
                 messages_to_delete = [
                     retrieved_receipt_handles[dataset_ids.index(msg)] for msg in successful
                 ]
-                deafrica_conflux.queues.delete_batch(
+                delete_batch(
                     queue_url=input_queue_url,
                     receipt_handles=messages_to_delete,
                     sqs_client=sqs_client,
@@ -140,7 +136,7 @@ def filter_from_queue(
         messages_to_send = []
 
     # Send the remaining messages if there are any.
-    successful, failed = deafrica_conflux.queues.send_batch_with_retry(
+    successful, failed = send_batch_with_retry(
         queue_url=output_queue_url,
         messages=messages_to_send,
         max_retries=10,
@@ -150,6 +146,6 @@ def filter_from_queue(
     messages_to_delete = [
         retrieved_receipt_handles[dataset_ids.index(msg)] for msg in successful
    ]
-    deafrica_conflux.queues.delete_batch(
+    delete_batch(
         queue_url=input_queue_url, receipt_handles=messages_to_delete, sqs_client=sqs_client
     )
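The `(idx + 1) % 10 == 0` flush in the loop above reflects a hard SQS limit: batch calls accept at most 10 entries per request. A standalone boto3 sketch of that chunked sending, independent of the repo's `send_batch_with_retry` helper; the queue URL and message bodies are placeholders:

```python
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.af-south-1.amazonaws.com/123456789012/example"  # placeholder

messages = [f"dataset-{i}" for i in range(25)]  # placeholder bodies

# send_message_batch accepts at most 10 entries, so chunk the payload
# and flush every 10 messages, mirroring the modulo check above.
for start in range(0, len(messages), 10):
    chunk = messages[start : start + 10]
    entries = [{"Id": str(i), "MessageBody": body} for i, body in enumerate(chunk)]
    response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
    failed = response.get("Failed", [])
    if failed:
        print(f"{len(failed)} messages failed; retry or dead-letter them")
```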
deafrica_conflux/cli/get_dataset_ids.py (9 additions, 11 deletions)
@@ -8,11 +8,11 @@
 import geopandas as gpd
 from datacube.ui import click as ui
 
-import deafrica_conflux.drill
-import deafrica_conflux.hopper
-import deafrica_conflux.id_field
-import deafrica_conflux.io
 from deafrica_conflux.cli.logs import logging_setup
+from deafrica_conflux.drill import filter_datasets
+from deafrica_conflux.hopper import find_datasets
+from deafrica_conflux.id_field import guess_id_field
+from deafrica_conflux.io import check_file_exists, check_if_s3_uri
 
 
 @click.command(
@@ -63,7 +63,7 @@ def get_dataset_ids(
     logging_setup(verbose)
     _log = logging.getLogger(__name__)
 
-    dss = deafrica_conflux.hopper.find_datasets(expressions, [product])
+    dss = find_datasets(expressions, [product])
 
     if polygons_vector_file is not None:
         # Read the vector file.
@@ -74,7 +74,7 @@ def get_dataset_ids(
             raise error
         else:
             # Guess the ID field.
-            id_field = deafrica_conflux.id_field.guess_id_field(polygons_gdf, use_id)
+            id_field = guess_id_field(polygons_gdf, use_id)
             _log.info(f"Guessed ID field: {id_field}")
 
             # Set the ID field as the index.
@@ -89,20 +89,18 @@ def get_dataset_ids(
         _log.info(
             f"Filtering out datasets that are not near the polygons in {polygons_vector_file}"
         )
-        dataset_ids = deafrica_conflux.drill.filter_datasets(
-            dss, polygons_gdf, worker_num=num_worker
-        )
+        dataset_ids = filter_datasets(dss, polygons_gdf, worker_num=num_worker)
     else:
         dataset_ids = [str(ds.id) for ds in dss]
 
     _log.info(f"Found {len(dataset_ids)} datasets.")
 
     # Check if the output file exists.
-    if deafrica_conflux.io.check_file_exists(output_file_path):
+    if check_file_exists(output_file_path):
         _log.error(f"{output_file_path} exists!")
         raise FileExistsError(f"{output_file_path} exists!")
     else:
-        if deafrica_conflux.io.check_if_s3_uri(output_file_path):
+        if check_if_s3_uri(output_file_path):
             fs = fsspec.filesystem("s3")
             _log.info("Dataset ids will be saved to a s3 text file")
         else:
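The `check_if_s3_uri` branch above only chooses an fsspec filesystem; writing the ids is the same either way. A minimal sketch of that dispatch-by-scheme idea using `fsspec.open` (not the exact calls in this file; path and ids are placeholders):

```python
import fsspec

dataset_ids = ["id-1", "id-2"]  # placeholder ids
output_file_path = "s3://example-bucket/dataset_ids.txt"  # or a local path

# fsspec dispatches on the URL scheme, so the same open() call writes
# to S3 or the local filesystem without branching in the caller.
with fsspec.open(output_file_path, "w") as f:
    f.write("\n".join(dataset_ids))
```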
deafrica_conflux/cli/logs.py (0 additions, 32 deletions)
@@ -27,35 +27,3 @@ def logging_setup(verbose: int = 1):
     # Suppress all (other than CRITICAL) errors for boto3
     # logging.getLogger('botocore').setLevel(logging.WARNING)
     # logging.getLogger('boto3').setLevel(logging.WARNING)
-
-
-# Keeping this here incase its needed.
-def setup_logging(verbose: int):
-    """
-    Set up logging.
-    Arguments
-    ---------
-    verbose : int
-        Verbosity level (0, 1, 2).
-    """
-    loggers = [
-        logging.getLogger(name)
-        for name in logging.root.manager.loggerDict
-        if not name.startswith("fiona")
-        and not name.startswith("sqlalchemy")
-        and not name.startswith("boto")
-    ]
-    # For compatibility with docker+pytest+click stack...
-    stdout_hdlr = logging.StreamHandler(sys.stdout)
-    for logger in loggers:
-        if verbose == 0:
-            logging.basicConfig(level=logging.WARNING)
-        elif verbose == 1:
-            logging.basicConfig(level=logging.INFO)
-        elif verbose == 2:
-            logging.basicConfig(level=logging.DEBUG)
-        else:
-            raise click.ClickException("Maximum verbosity is -vv")
-        logger.addHandler(stdout_hdlr)
-        logger.propagate = False
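The deleted `setup_logging` largely duplicated the surviving `logging_setup`: both map a `-v` count onto a log level. The core idiom, stripped of the per-logger loop, might look like this sketch (an assumption about the remaining implementation, not a copy of it):

```python
import logging
import sys

def logging_setup_sketch(verbose: int = 1) -> None:
    """Map a -v count to a root log level (sketch, not the repo's code)."""
    levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
    if verbose not in levels:
        raise ValueError("Maximum verbosity is -vv")
    # Route everything through one stdout handler, matching the old
    # function's docker+pytest+click compatibility concern.
    logging.basicConfig(level=levels[verbose], stream=sys.stdout)
```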
deafrica_conflux/cli/make_queue.py (2 additions, 2 deletions)
@@ -1,6 +1,6 @@
 import click
 
-import deafrica_conflux.queues
+from deafrica_conflux.queues import make_source_queue
 
 
 @click.command("make-sqs-queue", no_args_is_help=True)
@@ -20,7 +20,7 @@ def make_sqs_queue(queue_name, timeout, retention_period, retries):
     # Verify dead-letter queue name.
     dead_letter_queue_name = queue_name + "_deadletter"
 
-    deafrica_conflux.queues.make_source_queue(
+    make_source_queue(
         queue_name=queue_name,
         dead_letter_queue_name=dead_letter_queue_name,
         timeout=timeout,
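`make_source_queue` pairs the new queue with a `<queue_name>_deadletter` queue, presumably wiring the dead-letter ARN into the source queue's RedrivePolicy. Doing that directly with boto3 looks roughly like the following; all names and attribute values are placeholders, not this repo's defaults:

```python
import json

import boto3

sqs = boto3.client("sqs")

# Create the dead-letter queue first so its ARN can go into the
# source queue's RedrivePolicy.
dlq_url = sqs.create_queue(QueueName="example-queue_deadletter")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

sqs.create_queue(
    QueueName="example-queue",
    Attributes={
        "VisibilityTimeout": "300",         # placeholder timeout (seconds)
        "MessageRetentionPeriod": "86400",  # placeholder retention (seconds)
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "5"}
        ),
    },
)
```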
deafrica_conflux/cli/push_to_queue.py (8 additions, 4 deletions)
@@ -3,8 +3,12 @@
 import boto3
 import click
 
-import deafrica_conflux.queues
 from deafrica_conflux.cli.logs import logging_setup
+from deafrica_conflux.queues import (
+    get_queue_url,
+    move_to_dead_letter_queue,
+    push_dataset_ids_to_queue_from_txt,
+)
 
 
 @click.command("push-to-sqs-queue", no_args_is_help=True)
@@ -32,7 +36,7 @@ def push_to_sqs_queue(text_file_path, queue_name, max_retries, verbose):
     # Create an sqs client.
     sqs_client = boto3.client("sqs")
 
-    failed_to_push = deafrica_conflux.queues.push_dataset_ids_to_queue_from_txt(
+    failed_to_push = push_dataset_ids_to_queue_from_txt(
         text_file_path=text_file_path,
         queue_name=queue_name,
         max_retries=max_retries,
@@ -42,12 +46,12 @@ def push_to_sqs_queue(text_file_path, queue_name, max_retries, verbose):
     if failed_to_push:
         # Push the failed dataset ids to the deadletter queue.
         dead_letter_queue_name = f"{queue_name}-deadletter"
-        dead_letter_queue_url = deafrica_conflux.queues.get_queue_url(
+        dead_letter_queue_url = get_queue_url(
             queue_name=dead_letter_queue_name, sqs_client=sqs_client
         )
 
         for idx in failed_to_push:
-            deafrica_conflux.queues.move_to_dead_letter_queue(
+            move_to_dead_letter_queue(
                 dead_letter_queue_url=dead_letter_queue_url,
                 message_body=idx,
                 max_retries=max_retries,
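For completeness, a boto3-only sketch of the final step above: forwarding ids that failed to push onto the dead-letter queue. Names are placeholders, and a real implementation would retry and log failures, as `move_to_dead_letter_queue` presumably does:

```python
import boto3

sqs = boto3.client("sqs")

# Placeholder name; this file uses the hyphenated "<queue>-deadletter" form.
dlq_url = sqs.get_queue_url(QueueName="example-queue-deadletter")["QueueUrl"]

failed_to_push = ["dataset-id-1", "dataset-id-2"]  # placeholder ids

# Forward each failed id to the dead-letter queue so nothing is
# silently dropped.
for dataset_id in failed_to_push:
    sqs.send_message(QueueUrl=dlq_url, MessageBody=dataset_id)
```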
(The remaining 11 changed files did not load and are not shown.)