Skip to content

Commit

Permalink
Command to sync query files up to athena
Browse files Browse the repository at this point in the history
  • Loading branch information
GeoWill committed Jul 3, 2024
1 parent 0926721 commit 6d9280c
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 0 deletions.
5 changes: 5 additions & 0 deletions dc_logging_aws/named_queries/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ python dc_logging_aws/named_queries/commands/create_election_query_files.py 2024

Check them

Then send to Athena:

```shell
UPDOWN_API_KEY=1234 python dc_logging_aws/named_queries/commands/create_athena_queries.py --profile prod-monitoring-dc 2024-07-04
```
117 changes: 117 additions & 0 deletions dc_logging_aws/named_queries/commands/create_athena_queries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import argparse
import os
from pathlib import Path

import boto3


def get_existing_queries(athena_client):
existing_queries = {}
print("Getting existing queries...")
paginator = athena_client.get_paginator("list_named_queries")
for page in paginator.paginate():
for query_id in page["NamedQueryIds"]:
query = athena_client.get_named_query(NamedQueryId=query_id)
existing_queries[query["NamedQuery"]["Name"]] = query_id
return existing_queries


def get_queries_dir(subdirectory):
script_path = Path(__file__).resolve()
queries_dir = script_path.parent.parent / "queries" / subdirectory

if not queries_dir.is_dir():
raise FileNotFoundError(
f"Error: Directory '{queries_dir}' does not exist."
)

return queries_dir


def update_named_query(
athena_client, query_name, query_string, existing_queries
):
query_id = existing_queries[query_name]
try:
athena_client.update_named_query(
NamedQueryId=query_id,
Name=query_name,
QueryString=query_string,
)
print(f"Updated named query: {query_name}")
except Exception as e:
print(f"Error updating named query '{query_name}': {str(e)}")


def create_named_query(athena_client, query_name, query_string):
try:
athena_client.create_named_query(
Name=query_name,
Database="dc-wide-logs",
QueryString=query_string,
)
print(f"Created named query: {query_name}")
except athena_client.exceptions.NamedQueryAlreadyExistsException:
print(
f"Named query '{query_name}' already exists. Use update function to modify."
)
except Exception as e:
print(f"Error creating named query '{query_name}': {str(e)}")


def get_query_string(file_path):
query_string = file_path.read_text()

if updown_api_key := os.environ.get("UPDOWN_API_KEY"):
query_string = query_string.replace("UPDOWN_API_KEY", updown_api_key)
return query_string


def create_athena_queries(subdirectory, profile, overwrite):
queries_dir = get_queries_dir(subdirectory)

session = boto3.Session(profile_name=profile)
athena_client = session.client("athena")

# Get list of existing named queries
existing_queries = get_existing_queries(athena_client)

for file_path in queries_dir.glob("*.sql"):
query_name = f"{subdirectory}/{file_path.stem}"
query_string = get_query_string(file_path)

if query_name in existing_queries and not overwrite:
print(
f"Query '{query_name}' already exists. Use --overwrite to replace it."
)
continue

if query_name in existing_queries:
update_named_query(
athena_client, query_name, query_string, existing_queries
)
continue

create_named_query(athena_client, query_name, query_string)


def handle():
parser = argparse.ArgumentParser(
description="Create Athena named queries from SQL files in a specified subdirectory."
)
parser.add_argument(
"subdirectory", help="Name of the subdirectory in the queries folder"
)
parser.add_argument("--profile", required=True, help="AWS profile to use")
parser.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing queries with the same name",
)
args = parser.parse_args()

create_athena_queries(args.subdirectory, args.profile, args.overwrite)


if __name__ == "__main__":
handle()

0 comments on commit 6d9280c

Please sign in to comment.