Merge pull request #6 from NERC-CEH/feature/FW-396-duckdb-queries
Feature/fw 396 duckdb queries
nkshaw23 authored Nov 6, 2024
2 parents df78ae6 + 39e8fa0 commit 10c0b75
Showing 2 changed files with 410 additions and 0 deletions.
205 changes: 205 additions & 0 deletions src/driutils/benchmarking/partitioned_date_queries.py
@@ -0,0 +1,205 @@
"""Script to benchmark parquet file ingress from s3 using duckdb queries.
Two different bucket structures have been created for testing.
(current format)
'partitioned_date': cosmos-test/structure/dataset=dataset_type/date=YYYY-MM-DD/data.parquet
(proposed format)
'partitioned_date_site': cosmos-test/structure/dataset=dataset_type/site=site/date=YYYY-MM-DD/data.parquet
User can select which structure to query.
Each query profile is saved to ./profile.json. Final metrics are written to csv.
"""

import json
import os

import duckdb
import polars as pl

# User defined inputs
BUCKET = "ukceh-fdri"
PREFIX = "cosmos-test"
DATASET = "PRECIP_1MIN_2024_LOOPED"
OUTPUT_PROFILE = "profile.json"
OUTPUT_CSV = "metrics.csv"
# Columns to select. Provide a list to select a subset, or an empty list to select all.
COLUMNS = ["SITE_ID", "time", "P_INTENSITY_RT"]

# Derived constants
BASE_BUCKET_PATH = f"s3://{BUCKET}/{PREFIX}/partitioned_date"
COLUMNS_SQL = ", ".join(COLUMNS) if COLUMNS else "*"  # an empty COLUMNS list falls back to selecting all columns
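# The proposed 'partitioned_date_site' layout described in the module docstring could be
# targeted the same way. The constant below is only an illustrative assumption and is not
# exercised by this script:
# BASE_BUCKET_PATH_SITE = f"s3://{BUCKET}/{PREFIX}/partitioned_date_site"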


def extract_metrics(profile: str | os.PathLike) -> pl.DataFrame:
"""Extract the relevant metrics into a polars datframe.
Args:
profile: the saved query profile json.
Returns:
A polars dataframe with the required profile metrics.
"""

with open(profile) as f:
p = json.load(f)

metrics = {}
metrics["query"] = p["query_name"]
metrics["total_elapsed_query_time_(s)"] = p["latency"]
metrics["rows_returned"] = p["rows_returned"]
metrics["result_set_size_(Mb)"] = p["result_set_size"] / 1048576
metrics["rows_scanned"] = p["cumulative_rows_scanned"]
metrics["cpu_time_(s)"] = p["cpu_time"]

return pl.DataFrame(metrics)
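# For reference, extract_metrics() relies only on these top-level keys of the DuckDB
# JSON profile (a minimal sketch with illustrative values, not real output):
# {
#   "query_name": "SELECT ...",
#   "latency": 0.42,
#   "rows_returned": 1440,
#   "result_set_size": 123456,
#   "cumulative_rows_scanned": 525600,
#   "cpu_time": 0.31
# }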


def query_one_site_one_date(base_path, dataset): # noqa: ANN001, ANN201
# Test a very small return with partition filter
return f"""SELECT {COLUMNS_SQL} FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date='2023-09-27' AND SITE_ID='BUNNY'"""
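# With the user-defined inputs above, query_one_site_one_date() renders to roughly:
#   SELECT SITE_ID, time, P_INTENSITY_RT
#   FROM read_parquet('s3://ukceh-fdri/cosmos-test/partitioned_date/dataset=PRECIP_1MIN_2024_LOOPED/*/*.parquet')
#   WHERE date='2023-09-27' AND SITE_ID='BUNNY'
# Because 'date' is a hive partition key, only matching date=YYYY-MM-DD prefixes should be read.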


def query_multi_dates_using_conditionals_month(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are filtered using conditionals
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date >= '2019-01-01' AND date <= '2019-01-31' AND SITE_ID='BUNNY'
"""


def query_multi_dates_using_conditionals_year(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are filtered using conditionals
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date >= '2019-01-01' AND date <= '2019-12-31' AND SITE_ID='BUNNY'
"""


def query_multi_sites_and_multi_dates_using_conditionals_month(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are filtered using conditionals
# A non-partitioned column (SITE_ID) is also used in the filter
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date >= '2019-01-01' AND date <= '2019-01-31'
AND SITE_ID IN ('BUNNY', 'ALIC1')
"""


def query_multi_sites_and_multi_dates_using_conditionals_year(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are filtered using conditionals
# A non-partitioned column (SITE_ID) is also used in the filter
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date >= '2019-01-01' AND date <= '2019-12-31'
AND SITE_ID IN ('BUNNY', 'ALIC1')
"""


def query_multi_dates_using_hive_types_month(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are hive types and filtered using BETWEEN
# Fields of type DATE are automatically detected by DuckDB, so there is no need to declare them as hive types
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date BETWEEN '2019-01-01' AND '2019-01-31' AND SITE_ID='BUNNY'
"""


def query_multi_dates_using_hive_types_year(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are hive types and filtered using BETWEEN
# Fields of type DATE are automatically detected by DuckDB, so there is no need to declare them as hive types
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date BETWEEN '2019-01-01' AND '2019-12-31' AND SITE_ID='BUNNY'
"""


def query_multi_sites_and_multi_dates_using_hive_types_month(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are hive types and filtered using BETWEEN
# A non-partitioned column (SITE_ID) is also used in the filter
# Fields of type DATE are automatically detected by DuckDB, so there is no need to declare them as hive types
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date BETWEEN '2019-01-01' AND '2019-01-31'
AND SITE_ID IN ('BUNNY', 'ALIC1')
"""


def query_multi_sites_and_multi_dates_using_hive_types_year(base_path, dataset): # noqa: ANN001, ANN201
# Test larger and more complex query parameters
# Dates are hive types and filtered using BETWEEN
# A non-partitioned column (SITE_ID) is also used in the filter
# Fields of type DATE are automatically detected by DuckDB, so there is no need to declare them as hive types
return f"""
SELECT {COLUMNS_SQL}
FROM read_parquet('{base_path}/dataset={dataset}/*/*.parquet')
WHERE date BETWEEN '2019-01-01' AND '2019-12-31'
AND SITE_ID IN ('BUNNY', 'ALIC1')
"""


if __name__ == "__main__":
# Set up a basic DuckDB connection
conn = duckdb.connect()

conn.execute("""
INSTALL httpfs;
LOAD httpfs;
SET force_download = true;
SET enable_profiling = json;
SET profiling_output = 'profile.json';
""")

# Add s3 connection details
conn.execute("""
CREATE SECRET aws_secret (
TYPE S3,
PROVIDER CREDENTIAL_CHAIN,
CHAIN 'sts'
);
""")

queries = [
query_one_site_one_date(BASE_BUCKET_PATH, DATASET),
query_multi_dates_using_conditionals_month(BASE_BUCKET_PATH, DATASET),
query_multi_dates_using_conditionals_year(BASE_BUCKET_PATH, DATASET),
query_multi_sites_and_multi_dates_using_conditionals_month(BASE_BUCKET_PATH, DATASET),
query_multi_sites_and_multi_dates_using_conditionals_year(BASE_BUCKET_PATH, DATASET),
query_multi_dates_using_hive_types_month(BASE_BUCKET_PATH, DATASET),
query_multi_dates_using_hive_types_year(BASE_BUCKET_PATH, DATASET),
query_multi_sites_and_multi_dates_using_hive_types_month(BASE_BUCKET_PATH, DATASET),
query_multi_sites_and_multi_dates_using_hive_types_year(BASE_BUCKET_PATH, DATASET),
]

# Create empty dataframe to store the results
data = pl.DataFrame()

for query in queries:
print(f"Running {query}\n")

# Query profile is saved to ./profile.json
conn.execute(query).pl()

# Extract what's needed from the profiler
df = extract_metrics(profile=OUTPUT_PROFILE)
print(df.glimpse())

data = pl.concat([data, df], how="diagonal")

data.write_csv(OUTPUT_CSV)