Named query generation #655

Open · wants to merge 7 commits into main
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
# Query results
dc_logging_aws/named_queries/results

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
32 changes: 32 additions & 0 deletions dc_logging_aws/named_queries/README.md
@@ -0,0 +1,32 @@
### Make report queries

You'll probably need to update some table names, but this is the gist of it:

Make the query files, e.g.:

```shell
python dc_logging_aws/named_queries/commands/create_election_query_files.py 2024-05-02 2024-04-01
```

or

```shell
python dc_logging_aws/named_queries/commands/create_election_query_files.py 2024-07-04 2024-05-22
```

Check them
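
The generated files land in `dc_logging_aws/named_queries/queries/<polling_day>/`, so for the second example above you can eyeball them with something like:

```shell
ls dc_logging_aws/named_queries/queries/2024-07-04/*.sql
```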

Then send to Athena:

```shell
UPDOWN_API_KEY=1234 python dc_logging_aws/named_queries/commands/create_athena_queries.py --profile prod-monitoring-dc 2024-07-04
```
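
The `UPDOWN_API_KEY` value is swapped in for the literal `UPDOWN_API_KEY` placeholder in any query file that references it (see `get_query_string` in `create_athena_queries.py`).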


Run them:

```shell
RESULTS_BUCKET=**** python dc_logging_aws/named_queries/commands/run_queries.py 2024-07-04 --profile prod-monitoring-dc
```
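
Based on the new `.gitignore` entry, the downloaded results presumably end up under `dc_logging_aws/named_queries/results` (that path is git-ignored above), e.g.:

```shell
ls dc_logging_aws/named_queries/results/
```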
Empty file.
Empty file.
117 changes: 117 additions & 0 deletions dc_logging_aws/named_queries/commands/create_athena_queries.py
@@ -0,0 +1,117 @@
import argparse
import os
from pathlib import Path

import boto3


def get_existing_queries(athena_client):
existing_queries = {}
print("Getting existing queries...")
paginator = athena_client.get_paginator("list_named_queries")
for page in paginator.paginate():
for query_id in page["NamedQueryIds"]:
query = athena_client.get_named_query(NamedQueryId=query_id)
existing_queries[query["NamedQuery"]["Name"]] = query_id
return existing_queries


def get_queries_dir(subdirectory):
script_path = Path(__file__).resolve()
queries_dir = script_path.parent.parent / "queries" / subdirectory

if not queries_dir.is_dir():
raise FileNotFoundError(
f"Error: Directory '{queries_dir}' does not exist."
)

return queries_dir


def update_named_query(
athena_client, query_name, query_string, existing_queries
):
query_id = existing_queries[query_name]
try:
athena_client.update_named_query(
NamedQueryId=query_id,
Name=query_name,
QueryString=query_string,
)
print(f"Updated named query: {query_name}")
except Exception as e:
print(f"Error updating named query '{query_name}': {str(e)}")


def create_named_query(athena_client, query_name, query_string):
try:
athena_client.create_named_query(
Name=query_name,
Database="dc-wide-logs",
QueryString=query_string,
)
print(f"Created named query: {query_name}")
except athena_client.exceptions.NamedQueryAlreadyExistsException:
print(
f"Named query '{query_name}' already exists. Use update function to modify."
)
except Exception as e:
print(f"Error creating named query '{query_name}': {str(e)}")


def get_query_string(file_path):
query_string = file_path.read_text()

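    # If UPDOWN_API_KEY is set in the environment, substitute it for the
    # literal UPDOWN_API_KEY placeholder in the SQL text.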
if updown_api_key := os.environ.get("UPDOWN_API_KEY"):
query_string = query_string.replace("UPDOWN_API_KEY", updown_api_key)
return query_string


def create_athena_queries(subdirectory, profile, overwrite):
queries_dir = get_queries_dir(subdirectory)

session = boto3.Session(profile_name=profile)
athena_client = session.client("athena")

# Get list of existing named queries
existing_queries = get_existing_queries(athena_client)

for file_path in queries_dir.glob("*.sql"):
query_name = f"{subdirectory}/{file_path.stem}"
query_string = get_query_string(file_path)

if query_name in existing_queries and not overwrite:
print(
f"Query '{query_name}' already exists. Use --overwrite to replace it."
)
continue

if query_name in existing_queries:
update_named_query(
athena_client, query_name, query_string, existing_queries
)
continue

create_named_query(athena_client, query_name, query_string)


def handle():
parser = argparse.ArgumentParser(
description="Create Athena named queries from SQL files in a specified subdirectory."
)
parser.add_argument(
"subdirectory", help="Name of the subdirectory in the queries folder"
)
parser.add_argument("--profile", required=True, help="AWS profile to use")
parser.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing queries with the same name",
)
args = parser.parse_args()

create_athena_queries(args.subdirectory, args.profile, args.overwrite)


if __name__ == "__main__":
handle()
176 changes: 176 additions & 0 deletions dc_logging_aws/named_queries/commands/create_election_query_files.py
@@ -0,0 +1,176 @@
import argparse
import sys
from datetime import datetime, time
from pathlib import Path

# Add the project root to the Python path so the package import below works when run as a script
script_dir = Path(__file__).resolve().parent
project_root = script_dir.parent.parent.parent
sys.path.insert(0, str(project_root))

from dc_logging_aws.named_queries.query_template import ( # noqa: E402
QueryTemplate,
)


class QueryFileCreator:
def __init__(self, polling_day, start_of_election_period, overwrite=False):
self.polling_day = polling_day
self.start_of_election_period = start_of_election_period
self.overwrite = overwrite
self.date_str = polling_day.strftime("%Y-%m-%d")
self.query_template = QueryTemplate(
polling_day, start_of_election_period
)
self.script_dir = Path(__file__).resolve().parent

@staticmethod
def valid_date(s):
try:
return datetime.strptime(s, "%Y-%m-%d").date()
except ValueError:
msg = f"Not a valid date: '{s}'. Please use YYYY-MM-DD format."
raise argparse.ArgumentTypeError(msg)

def create_query_directory(self):
queries_dir = self.script_dir.parent / "queries"
Author

The outstanding question I have is whether we want the queries directory in the GitHub repo or not. I err towards 'yes', because it's nice to keep a record of these things, and it's easy to delete things in Athena. But maybe it should be 'no' until we're actually using CI (or some other automation) to run the queries, because until then we'll have two sources of truth (Athena and GitHub)...

directory = queries_dir / self.date_str
directory.mkdir(parents=True, exist_ok=True)
return directory

def create_query_files(self):
directory = self.create_query_directory()

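        # Each dict below maps an output filename to rendered SQL; the 'week',
        # 'day' and 'period' variants differ only in the start timestamp
        # passed to the query template.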
count_files = {
"election-week-count.sql": self.query_template.postcode_search_count(
self.query_template.start_of_polling_week
),
"election-day-count.sql": self.query_template.postcode_search_count(
datetime.combine(self.polling_day, time(0, 0)).replace(
tzinfo=self.query_template.close_of_polls.tzinfo
)
),
"election-period-count.sql": self.query_template.postcode_search_count(
datetime.combine(
self.start_of_election_period, time(0, 0)
).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
),
}

by_product_files = {
"election-week-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
self.query_template.start_of_polling_week
),
"election-day-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
datetime.combine(self.polling_day, time(0, 0)).replace(
tzinfo=self.query_template.close_of_polls.tzinfo
)
),
"election-period-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
datetime.combine(
self.start_of_election_period, time(0, 0)
).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
),
}

by_local_authority = {
"election-week-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
self.query_template.start_of_polling_week
),
"election-day-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
datetime.combine(self.polling_day, time(0, 0)).replace(
tzinfo=self.query_template.close_of_polls.tzinfo
)
),
"election-period-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
datetime.combine(
self.start_of_election_period, time(0, 0)
).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
),
}

by_constituency = {
"election-week-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
self.query_template.start_of_polling_week
),
"election-day-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
datetime.combine(self.polling_day, time(0, 0)).replace(
tzinfo=self.query_template.close_of_polls.tzinfo
)
),
"election-period-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
datetime.combine(
self.start_of_election_period, time(0, 0)
).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
),
}

by_product_timeseries = {
"election-week-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product(
self.query_template.start_of_polling_week
),
"election-day-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product(
datetime.combine(self.polling_day, time(0, 0)).replace(
tzinfo=self.query_template.close_of_polls.tzinfo
),
),
"election-period-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product(
datetime.combine(
self.start_of_election_period, time(0, 0)
).replace(tzinfo=self.query_template.close_of_polls.tzinfo),
),
}

files_to_create = {
**count_files,
**by_product_files,
**by_local_authority,
**by_constituency,
**by_product_timeseries,
}

for filename, content in files_to_create.items():
file_path = directory / filename
if file_path.exists() and not self.overwrite:
print(
f"File {file_path} already exists. Use --overwrite to replace existing files."
)
else:
with open(file_path, "w") as f:
f.write(content)
print(f"Created {filename} in {directory}")

print(f"Process completed for {self.date_str}")

@classmethod
def handle(cls):
parser = argparse.ArgumentParser(
description="Create election query files for a specific polling day."
)
parser.add_argument(
"polling_day",
type=cls.valid_date,
help="The polling day in YYYY-MM-DD format",
)
parser.add_argument(
"start_of_election_period",
type=cls.valid_date,
help="The start date of the election period in YYYY-MM-DD format",
)
parser.add_argument(
"-o",
"--overwrite",
action="store_true",
help="Overwrite existing files",
)

args = parser.parse_args()

creator = cls(
args.polling_day, args.start_of_election_period, args.overwrite
)
creator.create_query_files()


if __name__ == "__main__":
QueryFileCreator.handle()