From 0f158f0468a91ac5e3e8281456e0657f18865268 Mon Sep 17 00:00:00 2001
From: GeoWill
Date: Wed, 3 Jul 2024 15:16:17 +0100
Subject: [PATCH 1/7] Templates for named queries for election reports

---
 dc_logging_aws/named_queries/__init__.py      |   0
 .../named_queries/query_template.py           | 229 +++++++++++
 .../named_queries/tests/__init__.py           |   0
 ...test_datetime_to_athena_datetime_string.py |  63 ++++
 .../tests/test_query_templates.py             | 354 ++++++++++++++++++
 5 files changed, 646 insertions(+)
 create mode 100644 dc_logging_aws/named_queries/__init__.py
 create mode 100644 dc_logging_aws/named_queries/query_template.py
 create mode 100644 dc_logging_aws/named_queries/tests/__init__.py
 create mode 100644 dc_logging_aws/named_queries/tests/test_datetime_to_athena_datetime_string.py
 create mode 100644 dc_logging_aws/named_queries/tests/test_query_templates.py

diff --git a/dc_logging_aws/named_queries/__init__.py b/dc_logging_aws/named_queries/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dc_logging_aws/named_queries/query_template.py b/dc_logging_aws/named_queries/query_template.py
new file mode 100644
index 0000000..bcae437
--- /dev/null
+++ b/dc_logging_aws/named_queries/query_template.py
@@ -0,0 +1,229 @@
+import textwrap
+from datetime import date, datetime, time, timedelta
+
+from zoneinfo import ZoneInfo
+
+
+def datetime_to_athena_datetime_string(
+    dt: datetime, tz_target: ZoneInfo
+) -> str:
+    """
+    Converts a timezone-aware datetime to an Athena-compatible string.
+
+    Parameters:
+        dt (datetime): Timezone-aware datetime object.
+        tz_target (ZoneInfo): Target timezone (in this case, the timezone
+            your logs are in).
+
+    Returns:
+        str: Datetime string formatted as "YYYY-MM-DD HH:MM".
+
+    Raises:
+        ValueError: If `dt` is not timezone-aware.
+    """
+    if dt.tzinfo is None:
+        raise ValueError("Input datetime must be timezone-aware")
+
+    # Convert to target timezone
+    dt_target = dt.astimezone(tz_target)
+
+    # Format the datetime string
+    return dt_target.strftime("%Y-%m-%d %H:%M")
+
+
+def indent_cte_string(multiline_string: str, indent_length: int = 8) -> str:
+    """
+    Indents all lines of a multiline string except the first one.
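+
+    For example, ``indent_cte_string("a\nb", 4)`` returns ``"a\n    b"``:
+    the first line is left untouched and each subsequent line gains the
+    extra indent.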
+    """
+    lines = multiline_string.split("\n")
+    indented_lines = lines[:1] + [
+        " " * indent_length + line for line in lines[1:]
+    ]
+    return "\n".join(indented_lines)
+
+
+def utc_athena_time(dt):
+    return datetime_to_athena_datetime_string(dt, ZoneInfo("UTC"))
+
+
+def london_athena_time(dt):
+    return datetime_to_athena_datetime_string(dt, ZoneInfo("Europe/London"))
+
+
+class QueryTemplate:
+    def __init__(self, polling_day: date, start_of_election_period: date):
+        self.polling_day = polling_day
+        self.start_of_election_period = start_of_election_period
+        self.close_of_polls = datetime.combine(
+            self.polling_day,
+            time(22, 0, tzinfo=ZoneInfo("Europe/London")),
+        )
+
+    @property
+    def start_of_polling_week(self):
+        monday = self.polling_day - timedelta(days=3)
+        return datetime.combine(
+            monday, time(0, 0, tzinfo=ZoneInfo("Europe/London"))
+        )
+
+    def election_period_cte(self, exclude_calls_devs_dc_api=True):
+        start_date = self.start_of_election_period - timedelta(days=1)
+        cte = textwrap.dedent(
+            f"""
+            SELECT *
+            FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+            WHERE "day" >= '{start_date.strftime('%Y/%m/%d')}'
+            AND "day" <= '{self.polling_day.strftime('%Y/%m/%d')}'
+            AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+            AND (
+                (all_logs."dc_product" != 'WDIV')
+                OR
+                (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+            )
+            """
+        ).strip()
+
+        if exclude_calls_devs_dc_api:
+            cte = cte + """\nAND LOWER("calls_devs_dc_api") = 'false'"""
+        return cte
+
+    def logs_cte(self, start_time, from_source="ELECTION_PERIOD"):
+        return textwrap.dedent(
+            f"""
+            SELECT *
+            FROM {from_source}
+            WHERE (
+                "timestamp" >= cast('{utc_athena_time(start_time)}' AS timestamp)
+                AND "timestamp" <= cast('{utc_athena_time(self.close_of_polls)}' AS timestamp)
+                AND "dc_product" != 'WDIV'
+            ) OR (
+                "timestamp" >= cast('{london_athena_time(start_time)}' AS timestamp)
+                AND "timestamp" <= cast('{london_athena_time(self.close_of_polls)}' AS timestamp)
+                AND "dc_product" = 'WDIV'
+            )
+            """
+        ).strip()
+
+    def postcode_search_count(self, start_time):
+        logs_indent_cte_string = indent_cte_string(
+            self.logs_cte(start_time), 12
+        )
+        election_period_cte = indent_cte_string(self.election_period_cte(), 12)
+        return textwrap.dedent(
+            f"""
+            WITH ELECTION_PERIOD AS (
+            {textwrap.indent(election_period_cte,'    ')}
+            ), LOGS AS (
+            {textwrap.indent(logs_indent_cte_string,'    ')}
+            )
+            SELECT
+                count(*) as count
+            FROM
+                LOGS;
+            """
+        ).strip()
+
+    def product_count_cte(self, logs_cte_name="LOGS"):
+        return textwrap.dedent(
+            f"""
+            SELECT
+                count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+            FROM {logs_cte_name}
+            WHERE dc_product = 'WDIV'
+            GROUP BY "dc_product", "api_key", "utm_source"
+            UNION SELECT
+                count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+            FROM {logs_cte_name}
+            WHERE dc_product = 'WCIVF'
+            GROUP BY "dc_product", "api_key", "utm_source"
+            UNION SELECT
+                count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+            FROM {logs_cte_name}
+            JOIN "dc-wide-logs"."api-users-ec-api" as api_users ON {logs_cte_name}."api_key" = api_users."key"
+            WHERE dc_product = 'EC_API'
+            GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+            UNION SELECT
+                count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+            FROM {logs_cte_name}
+            JOIN "dc-wide-logs"."api-users-aggregator-api" as api_users ON {logs_cte_name}."api_key" = api_users."key"
+            WHERE
+                dc_product = 'AGGREGATOR_API'
+                AND api_users."key_name" NOT IN (
+                    'EC postcode pages - Dev', 'WhoCanIVoteFor', 'Updown', 'EC API'
+                )
+            GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+            """
+        ).strip()
+
+    def postcode_searches_by_product(self, start_time):
+        election_period_cte = indent_cte_string(
+            self.election_period_cte(exclude_calls_devs_dc_api=False), 12
+        )
+        logs_indent_cte_string = indent_cte_string(
+            self.logs_cte(start_time), 12
+        )
+        product_indent_cte_string = indent_cte_string(
+            self.product_count_cte(), 12
+        )
+
+        return textwrap.dedent(
+            f"""
+            WITH ELECTION_PERIOD AS (
+            {textwrap.indent(election_period_cte,'    ')}
+            ), LOGS AS (
+            {textwrap.indent(logs_indent_cte_string,'    ')}
+            ), PRODUCT_COUNTS AS (
+            {textwrap.indent(product_indent_cte_string, '    ')}
+            )
+            SELECT *
+            FROM
+                PRODUCT_COUNTS
+            ORDER BY count DESC;
+            """
+        ).strip()
+
+    def postcode_searches_by_local_authority(self, start_time: datetime):
+        election_period_cte = indent_cte_string(self.election_period_cte(), 12)
+        logs_indent_cte_string = indent_cte_string(
+            self.logs_cte(start_time), 12
+        )
+        return textwrap.dedent(
+            f"""
+            WITH ELECTION_PERIOD AS (
+            {textwrap.indent(election_period_cte,'    ')}
+            ), LOGS AS (
+            {textwrap.indent(logs_indent_cte_string,'    ')}
+            )
+            SELECT
+                oslaua as gss, count(*) as postcode_searches
+            FROM
+                LOGS JOIN "pollingstations.public.data"."onspd-2024-feb"
+                ON upper(replace(replace("postcode",' ', '' ),'+','')) = upper(replace( "pcds",' ', ''))
+            GROUP BY oslaua
+            ORDER BY postcode_searches DESC;
+            """
+        ).strip()
+
+    def postcode_searches_by_constituency(self, start_time: datetime):
+        election_period_cte = indent_cte_string(self.election_period_cte(), 12)
+        logs_indent_cte_string = indent_cte_string(
+            self.logs_cte(start_time), 12
+        )
+        return textwrap.dedent(
+            f"""
+            WITH ELECTION_PERIOD AS (
+            {textwrap.indent(election_period_cte,'    ')}
+            ), LOGS AS (
+            {textwrap.indent(logs_indent_cte_string,'    ')}
+            )
+            SELECT
+                c.name,
+                c.official_identifier,
+                count(*) as postcode_searches
+            FROM
+                LOGS
+                JOIN "ee.public.data"."postcode-to-2024-parl-constituency" c
+                ON upper(replace(replace(LOGS."postcode",' ', '' ),'+','')) = upper(replace(c.pcds,' ', ''))
+            GROUP BY c.name, c.official_identifier
+            ORDER BY postcode_searches DESC;
+            """
+        ).strip()
diff --git a/dc_logging_aws/named_queries/tests/__init__.py b/dc_logging_aws/named_queries/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dc_logging_aws/named_queries/tests/test_datetime_to_athena_datetime_string.py b/dc_logging_aws/named_queries/tests/test_datetime_to_athena_datetime_string.py
new file mode 100644
index 0000000..ae8786d
--- /dev/null
+++ b/dc_logging_aws/named_queries/tests/test_datetime_to_athena_datetime_string.py
@@ -0,0 +1,63 @@
+from datetime import datetime
+
+import pytest
+from zoneinfo import ZoneInfo
+
+from dc_logging_aws.named_queries.query_template import (
+    datetime_to_athena_datetime_string,
+)
+
+
+@pytest.fixture
+def test_dates():
+    london_tz = ZoneInfo("Europe/London")
+    return [
+        datetime(2024, 4, 29, 0, 0, tzinfo=london_tz),
+        datetime(2024, 5, 2, 22, 0, tzinfo=london_tz),
+        datetime(2019, 12, 12, 22, 0, tzinfo=london_tz),
+        datetime(2019, 12, 9, 0, 0, tzinfo=london_tz),
+    ]
+
+
+@pytest.fixture
+def utc_tz():
+    return ZoneInfo("UTC")
+
+
+@pytest.fixture
+def london_tz():
+    return ZoneInfo("Europe/London")
+
+
+@pytest.mark.parametrize(
+    "index, expected",
+    [
+        (0, "2024-04-28 23:00"),  # BST to UTC
+        (1, "2024-05-02 21:00"),  # BST to UTC
+        (2, "2019-12-12 22:00"),  # GMT to UTC (no change)
+        (3, "2019-12-09 00:00"),  # GMT to UTC (no change)
+    ],
+)
+def test_london_to_utc(test_dates, utc_tz, index, expected):
+    result = datetime_to_athena_datetime_string(test_dates[index], utc_tz)
+    assert result == expected
+
+
+@pytest.mark.parametrize(
+    "index, expected",
+    [
+        (0, "2024-04-29 00:00"),
+        (1, "2024-05-02 22:00"),
+        (2, "2019-12-12 22:00"),
+        (3, "2019-12-09 00:00"),
+    ],
+)
+def test_london_to_london(test_dates, london_tz, index, expected):
+    result = datetime_to_athena_datetime_string(test_dates[index], london_tz)
+    assert result == expected
+
+
+def test_non_tz_aware_input(utc_tz):
+    naive_dt = datetime(2024, 4, 29, 0, 0)
+    with pytest.raises(ValueError):
+        datetime_to_athena_datetime_string(naive_dt, utc_tz)
diff --git a/dc_logging_aws/named_queries/tests/test_query_templates.py b/dc_logging_aws/named_queries/tests/test_query_templates.py
new file mode 100644
index 0000000..cf20ecf
--- /dev/null
+++ b/dc_logging_aws/named_queries/tests/test_query_templates.py
@@ -0,0 +1,354 @@
+import textwrap
+from datetime import date, datetime
+
+import pytest
+from zoneinfo import ZoneInfo
+
+from dc_logging_aws.named_queries.query_template import QueryTemplate
+
+
+@pytest.fixture
+def query_template_2024_05_02():
+    polling_day = date(2024, 5, 2)
+    start_of_election_period = date(2024, 4, 1)
+    return QueryTemplate(polling_day, start_of_election_period)
+
+
+@pytest.fixture
+def query_template_2019_12_12():
+    polling_day = date(2019, 12, 12)
+    start_of_election_period = date(2019, 11, 1)
+    return QueryTemplate(polling_day, start_of_election_period)
+
+
+@pytest.mark.parametrize(
+    "query_template_fixture, expected_close_of_polls, expected_start_of_polling_week",
+    [
+        (
+            "query_template_2024_05_02",
+            datetime(2024, 5, 2, 22, tzinfo=ZoneInfo("Europe/London")),
+            datetime(2024, 4, 29, 0, tzinfo=ZoneInfo("Europe/London")),
+        ),
+        (
+            "query_template_2019_12_12",
+            datetime(2019, 12, 12, 22, tzinfo=ZoneInfo("Europe/London")),
+            datetime(2019, 12, 9, 0, tzinfo=ZoneInfo("Europe/London")),
+        ),
+    ],
+)
+def test_named_query_templates(
+    request,
+    query_template_fixture,
+    expected_close_of_polls,
+    expected_start_of_polling_week,
+):
+    query_template = request.getfixturevalue(query_template_fixture)
+
+    assert (
+        query_template.close_of_polls == expected_close_of_polls
+    ), f"Expected {expected_close_of_polls}, but got {query_template.close_of_polls}"
+    assert (
+        query_template.start_of_polling_week == expected_start_of_polling_week
+    ), f"Expected {expected_start_of_polling_week}, but got {query_template.start_of_polling_week}"
+
+
+def test_election_period_cte(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        SELECT *
+        FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+        WHERE "day" >= '2024/03/31'
+        AND "day" <= '2024/05/02'
+        AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+        AND (
+            (all_logs."dc_product" != 'WDIV')
+            OR
+            (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+        )
+        AND LOWER("calls_devs_dc_api") = 'false'
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.election_period_cte()
+    assert result == expected_output
+
+
+def test_election_period_cte_no_exclude_api(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        SELECT *
+        FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+        WHERE "day" >= '2024/03/31'
+        AND "day" <= '2024/05/02'
+        AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+        AND (
+            (all_logs."dc_product" != 'WDIV')
+            OR
+            (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+        )
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.election_period_cte(
+        exclude_calls_devs_dc_api=False
+    )
+    assert result == expected_output
+
+
+def test_logs_cte(query_template_2024_05_02):
+    expected_output = """
+    SELECT *
+    FROM ELECTION_PERIOD
+    WHERE (
+        "timestamp" >= cast('2024-04-28 23:00' AS timestamp)
+        AND "timestamp" <= cast('2024-05-02 21:00' AS timestamp)
+        AND "dc_product" != 'WDIV'
+    ) OR (
+        "timestamp" >= cast('2024-04-29 00:00' AS timestamp)
+        AND "timestamp" <= cast('2024-05-02 22:00' AS timestamp)
+        AND "dc_product" = 'WDIV'
+    )
+    """
+
+    result = query_template_2024_05_02.logs_cte(
+        query_template_2024_05_02.start_of_polling_week,
+    )
+
+    # Remove leading/trailing whitespace and compare
+    assert result == textwrap.dedent(expected_output).strip()
+
+
+def test_postcode_search_count(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        WITH ELECTION_PERIOD AS (
+            SELECT *
+            FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+            WHERE "day" >= '2024/03/31'
+            AND "day" <= '2024/05/02'
+            AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+            AND (
+                (all_logs."dc_product" != 'WDIV')
+                OR
+                (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+            )
+            AND LOWER("calls_devs_dc_api") = 'false'
+        ), LOGS AS (
+            SELECT *
+            FROM ELECTION_PERIOD
+            WHERE (
+                "timestamp" >= cast('2024-04-28 23:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 21:00' AS timestamp)
+                AND "dc_product" != 'WDIV'
+            ) OR (
+                "timestamp" >= cast('2024-04-29 00:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 22:00' AS timestamp)
+                AND "dc_product" = 'WDIV'
+            )
+        )
+        SELECT
+            count(*) as count
+        FROM
+            LOGS;
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.postcode_search_count(
+        query_template_2024_05_02.start_of_polling_week
+    )
+    assert result == expected_output
+
+
+def test_product_count_cte(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        SELECT
+            count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+        FROM LOGS
+        WHERE dc_product = 'WDIV'
+        GROUP BY "dc_product", "api_key", "utm_source"
+        UNION SELECT
+            count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+        FROM LOGS
+        WHERE dc_product = 'WCIVF'
+        GROUP BY "dc_product", "api_key", "utm_source"
+        UNION SELECT
+            count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+        FROM LOGS
+        JOIN "dc-wide-logs"."api-users-ec-api" as api_users ON LOGS."api_key" = api_users."key"
+        WHERE dc_product = 'EC_API'
+        GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+        UNION SELECT
+            count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+        FROM LOGS
+        JOIN "dc-wide-logs"."api-users-aggregator-api" as api_users ON LOGS."api_key" = api_users."key"
+        WHERE
+            dc_product = 'AGGREGATOR_API'
+            AND api_users."key_name" NOT IN (
+                'EC postcode pages - Dev', 'WhoCanIVoteFor', 'Updown', 'EC API'
+            )
+        GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.product_count_cte()
+
+    assert result.strip() == expected_output
+
+
+def test_postcode_searches_by_product(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        WITH ELECTION_PERIOD AS (
+            SELECT *
+            FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+            WHERE "day" >= '2024/03/31'
+            AND "day" <= '2024/05/02'
+            AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+            AND (
+                (all_logs."dc_product" != 'WDIV')
+                OR
+                (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+            )
+        ), LOGS AS (
+            SELECT *
+            FROM ELECTION_PERIOD
+            WHERE (
+                "timestamp" >= cast('2024-04-28 23:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 21:00' AS timestamp)
+                AND "dc_product" != 'WDIV'
+            ) OR (
+                "timestamp" >= cast('2024-04-29 00:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 22:00' AS timestamp)
+                AND "dc_product" = 'WDIV'
+            )
+        ), PRODUCT_COUNTS AS (
+            SELECT
+                count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+            FROM LOGS
+            WHERE dc_product = 'WDIV'
+            GROUP BY "dc_product", "api_key", "utm_source"
+            UNION SELECT
+                count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source
+            FROM LOGS
+            WHERE dc_product = 'WCIVF'
+            GROUP BY "dc_product", "api_key", "utm_source"
+            UNION SELECT
+                count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+            FROM LOGS
+            JOIN "dc-wide-logs"."api-users-ec-api" as api_users ON LOGS."api_key" = api_users."key"
+            WHERE dc_product = 'EC_API'
+            GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+            UNION SELECT
+                count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source
+            FROM LOGS
+            JOIN "dc-wide-logs"."api-users-aggregator-api" as api_users ON LOGS."api_key" = api_users."key"
+            WHERE
+                dc_product = 'AGGREGATOR_API'
+                AND api_users."key_name" NOT IN (
+                    'EC postcode pages - Dev', 'WhoCanIVoteFor', 'Updown', 'EC API'
+                )
+            GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"
+        )
+        SELECT *
+        FROM
+            PRODUCT_COUNTS
+        ORDER BY count DESC;
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.postcode_searches_by_product(
+        query_template_2024_05_02.start_of_polling_week
+    )
+    assert result == expected_output
+
+
+def test_postcode_searches_by_local_authority(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        WITH ELECTION_PERIOD AS (
+            SELECT *
+            FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+            WHERE "day" >= '2024/03/31'
+            AND "day" <= '2024/05/02'
+            AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+            AND (
+                (all_logs."dc_product" != 'WDIV')
+                OR
+                (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+            )
+            AND LOWER("calls_devs_dc_api") = 'false'
+        ), LOGS AS (
+            SELECT *
+            FROM ELECTION_PERIOD
+            WHERE (
+                "timestamp" >= cast('2024-04-28 23:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 21:00' AS timestamp)
+                AND "dc_product" != 'WDIV'
+            ) OR (
+                "timestamp" >= cast('2024-04-29 00:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 22:00' AS timestamp)
+                AND "dc_product" = 'WDIV'
+            )
+        )
+        SELECT
+            oslaua as gss, count(*) as postcode_searches
+        FROM
+            LOGS JOIN "pollingstations.public.data"."onspd-2024-feb"
+            ON upper(replace(replace("postcode",' ', '' ),'+','')) = upper(replace( "pcds",' ', ''))
+        GROUP BY oslaua
+        ORDER BY postcode_searches DESC;
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.postcode_searches_by_local_authority(
+        query_template_2024_05_02.start_of_polling_week
+    )
+    assert result == expected_output
+
+
+def test_postcode_searches_by_constituency(query_template_2024_05_02):
+    expected_output = textwrap.dedent(
+        """
+        WITH ELECTION_PERIOD AS (
+            SELECT *
+            FROM "dc-wide-logs"."dc_postcode_searches_table" all_logs
+            WHERE "day" >= '2024/03/31'
+            AND "day" <= '2024/05/02'
+            AND all_logs."api_key" != 'UPDOWN_API_KEY' --updown
+            AND (
+                (all_logs."dc_product" != 'WDIV')
+                OR
+                (all_logs."dc_product" = 'WDIV' AND replace(all_logs."postcode",' ','') != 'BS44NN') --updown
+            )
+            AND LOWER("calls_devs_dc_api") = 'false'
+        ), LOGS AS (
+            SELECT *
+            FROM ELECTION_PERIOD
+            WHERE (
+                "timestamp" >= cast('2024-04-28 23:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 21:00' AS timestamp)
+                AND "dc_product" != 'WDIV'
+            ) OR (
+                "timestamp" >= cast('2024-04-29 00:00' AS timestamp)
+                AND "timestamp" <= cast('2024-05-02 22:00' AS timestamp)
+                AND "dc_product" = 'WDIV'
+            )
+        )
+        SELECT
+            c.name,
+            c.official_identifier,
+            count(*) as postcode_searches
+        FROM
+            LOGS
+            JOIN "ee.public.data"."postcode-to-2024-parl-constituency" c
+            ON upper(replace(replace(LOGS."postcode",' ', '' ),'+','')) = upper(replace(c.pcds,' ', ''))
+        GROUP BY c.name, c.official_identifier
+        ORDER BY postcode_searches DESC;
+        """
+    ).strip()
+
+    result = query_template_2024_05_02.postcode_searches_by_constituency(
+        query_template_2024_05_02.start_of_polling_week
+    )
+    assert result == expected_output

From d73fb6fe2def94cd30df3311864c3111cb9ac1ac Mon Sep 17 00:00:00 2001
From: GeoWill
Date: Wed, 3 Jul 2024 15:18:08 +0100
Subject: [PATCH 2/7] Command to create queries for election

---
 dc_logging_aws/named_queries/README.md        |  20 +++
 .../named_queries/commands/__init__.py        |   0
 .../commands/create_election_query_files.py   | 143 ++++++++++++++++++
 3 files changed, 163 insertions(+)
 create mode 100644 dc_logging_aws/named_queries/README.md
 create mode 100644 dc_logging_aws/named_queries/commands/__init__.py
 create mode 100644 dc_logging_aws/named_queries/commands/create_election_query_files.py

diff --git a/dc_logging_aws/named_queries/README.md b/dc_logging_aws/named_queries/README.md
new file mode 100644
index 0000000..a9ce295
--- /dev/null
+++ b/dc_logging_aws/named_queries/README.md
@@ -0,0 +1,20 @@
+### Make report Queries
+
+You'll probably need to update some table names, but this is the gist of it:
+
+Make the files:
+
+e.g.
+
+```shell
+python dc_logging_aws/named_queries/commands/create_election_query_files.py 2024-05-02 2024-04-01
+```
+
+or
+
+```shell
+python dc_logging_aws/named_queries/commands/create_election_query_files.py 2024-07-04 2024-05-22
+```
+
+Check them over before uploading.
+
diff --git a/dc_logging_aws/named_queries/commands/__init__.py b/dc_logging_aws/named_queries/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dc_logging_aws/named_queries/commands/create_election_query_files.py b/dc_logging_aws/named_queries/commands/create_election_query_files.py
new file mode 100644
index 0000000..108ac21
--- /dev/null
+++ b/dc_logging_aws/named_queries/commands/create_election_query_files.py
@@ -0,0 +1,143 @@
+import argparse
+import sys
+from datetime import datetime, time
+from pathlib import Path
+
+# Add the parent directory to the Python path
+script_dir = Path(__file__).resolve().parent
+project_root = script_dir.parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from dc_logging_aws.named_queries.query_template import (  # noqa: E402
+    QueryTemplate,
+)
+
+
+class QueryFileCreator:
+    def __init__(self, polling_day, start_of_election_period, overwrite=False):
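+        # Both dates arrive as datetime.date objects (parsed by valid_date
+        # below); times of day are attached later using the Europe/London
+        # tzinfo from QueryTemplate.close_of_polls.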
+        self.polling_day = polling_day
+        self.start_of_election_period = start_of_election_period
+        self.overwrite = overwrite
+        self.date_str = polling_day.strftime("%Y-%m-%d")
+        self.query_template = QueryTemplate(
+            polling_day, start_of_election_period
+        )
+        self.script_dir = Path(__file__).resolve().parent
+
+    @staticmethod
+    def valid_date(s):
+        try:
+            return datetime.strptime(s, "%Y-%m-%d").date()
+        except ValueError:
+            msg = f"Not a valid date: '{s}'. Please use YYYY-MM-DD format."
+            raise argparse.ArgumentTypeError(msg)
+
+    def create_query_directory(self):
+        queries_dir = self.script_dir.parent / "queries"
+        directory = queries_dir / self.date_str
+        directory.mkdir(parents=True, exist_ok=True)
+        return directory
+
+    def create_query_files(self):
+        directory = self.create_query_directory()
+
+        files_to_create = {
+            "election-week-count.sql": self.query_template.postcode_search_count(
+                self.query_template.start_of_polling_week
+            ),
+            "election-day-count.sql": self.query_template.postcode_search_count(
+                datetime.combine(self.polling_day, time(0, 0)).replace(
+                    tzinfo=self.query_template.close_of_polls.tzinfo
+                )
+            ),
+            "election-period-count.sql": self.query_template.postcode_search_count(
+                datetime.combine(
+                    self.start_of_election_period, time(0, 0)
+                ).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
+            ),
+            "election-week-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
+                self.query_template.start_of_polling_week
+            ),
+            "election-day-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
+                datetime.combine(self.polling_day, time(0, 0)).replace(
+                    tzinfo=self.query_template.close_of_polls.tzinfo
+                )
+            ),
+            "election-period-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product(
+                datetime.combine(
+                    self.start_of_election_period, time(0, 0)
+                ).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
+            ),
+            "election-week-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
+                self.query_template.start_of_polling_week
+            ),
+            "election-day-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
+                datetime.combine(self.polling_day, time(0, 0)).replace(
+                    tzinfo=self.query_template.close_of_polls.tzinfo
+                )
+            ),
+            "election-period-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority(
+                datetime.combine(
+                    self.start_of_election_period, time(0, 0)
+                ).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
+            ),
+            "election-week-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
+                self.query_template.start_of_polling_week
+            ),
+            "election-day-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
+                datetime.combine(self.polling_day, time(0, 0)).replace(
+                    tzinfo=self.query_template.close_of_polls.tzinfo
+                )
+            ),
+            "election-period-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency(
+                datetime.combine(
+                    self.start_of_election_period, time(0, 0)
+                ).replace(tzinfo=self.query_template.close_of_polls.tzinfo)
+            ),
+        }
+
+        for filename, content in files_to_create.items():
+            file_path = directory / filename
+            if file_path.exists() and not self.overwrite:
+                print(
+                    f"File {file_path} already exists. Use --overwrite to replace existing files."
+                )
+            else:
+                with open(file_path, "w") as f:
+                    f.write(content)
+                print(f"Created {filename} in {directory}")
+
+        print(f"Process completed for {self.date_str}")
+
+    @classmethod
+    def handle(cls):
+        parser = argparse.ArgumentParser(
+            description="Create election query files for a specific polling day."
+        )
+        parser.add_argument(
+            "polling_day",
+            type=cls.valid_date,
+            help="The polling day in YYYY-MM-DD format",
+        )
+        parser.add_argument(
+            "start_of_election_period",
+            type=cls.valid_date,
+            help="The start date of the election period in YYYY-MM-DD format",
+        )
+        parser.add_argument(
+            "-o",
+            "--overwrite",
+            action="store_true",
+            help="Overwrite existing files",
+        )
+
+        args = parser.parse_args()
+
+        creator = cls(
+            args.polling_day, args.start_of_election_period, args.overwrite
+        )
+        creator.create_query_files()
+
+
+if __name__ == "__main__":
+    QueryFileCreator.handle()

From c562c80f0b35634a7ac9408ed1ff57c88fab4ae4 Mon Sep 17 00:00:00 2001
From: GeoWill
Date: Wed, 3 Jul 2024 15:18:31 +0100
Subject: [PATCH 3/7] Command to sync query files up to athena

---
 dc_logging_aws/named_queries/README.md        |   5 +
 .../commands/create_athena_queries.py         | 117 ++++++++++++++++++
 2 files changed, 122 insertions(+)
 create mode 100644 dc_logging_aws/named_queries/commands/create_athena_queries.py

diff --git a/dc_logging_aws/named_queries/README.md b/dc_logging_aws/named_queries/README.md
index a9ce295..a34b587 100644
--- a/dc_logging_aws/named_queries/README.md
+++ b/dc_logging_aws/named_queries/README.md
@@ -18,3 +18,8 @@
 
 Check them over before uploading.
 
+Then send to Athena:
+
+```shell
+UPDOWN_API_KEY=1234 python dc_logging_aws/named_queries/commands/create_athena_queries.py --profile prod-monitoring-dc 2024-07-04
+```
\ No newline at end of file
diff --git a/dc_logging_aws/named_queries/commands/create_athena_queries.py b/dc_logging_aws/named_queries/commands/create_athena_queries.py
new file mode 100644
index 0000000..50e8d4d
--- /dev/null
+++ b/dc_logging_aws/named_queries/commands/create_athena_queries.py
@@ -0,0 +1,117 @@
+import argparse
+import os
+from pathlib import Path
+
+import boto3
+
+
+def get_existing_queries(athena_client):
+    existing_queries = {}
+    print("Getting existing queries...")
+    paginator = athena_client.get_paginator("list_named_queries")
+    for page in paginator.paginate():
+        for query_id in page["NamedQueryIds"]:
+            query = athena_client.get_named_query(NamedQueryId=query_id)
+            existing_queries[query["NamedQuery"]["Name"]] = query_id
+    return existing_queries
+
+
+def get_queries_dir(subdirectory):
+    script_path = Path(__file__).resolve()
+    queries_dir = script_path.parent.parent / "queries" / subdirectory
+
+    if not queries_dir.is_dir():
+        raise FileNotFoundError(
+            f"Error: Directory '{queries_dir}' does not exist."
+        )
+
+    return queries_dir
+
+
+def update_named_query(
+    athena_client, query_name, query_string, existing_queries
+):
+    query_id = existing_queries[query_name]
+    try:
+        athena_client.update_named_query(
+            NamedQueryId=query_id,
+            Name=query_name,
+            QueryString=query_string,
+        )
+        print(f"Updated named query: {query_name}")
+    except Exception as e:
+        print(f"Error updating named query '{query_name}': {str(e)}")
+
+
+def create_named_query(athena_client, query_name, query_string):
+    try:
+        athena_client.create_named_query(
+            Name=query_name,
+            Database="dc-wide-logs",
+            QueryString=query_string,
+        )
+        print(f"Created named query: {query_name}")
+    except athena_client.exceptions.NamedQueryAlreadyExistsException:
+        print(
+            f"Named query '{query_name}' already exists. Use update function to modify."
+        )
+    except Exception as e:
+        print(f"Error creating named query '{query_name}': {str(e)}")
+
+
+def get_query_string(file_path):
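+    # The committed SQL files contain the literal placeholder UPDOWN_API_KEY;
+    # substituting the real key from the environment keeps the secret out of
+    # the repo while still letting the deployed query exclude updown traffic.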
+    query_string = file_path.read_text()
+
+    if updown_api_key := os.environ.get("UPDOWN_API_KEY"):
+        query_string = query_string.replace("UPDOWN_API_KEY", updown_api_key)
+    return query_string
+
+
+def create_athena_queries(subdirectory, profile, overwrite):
+    queries_dir = get_queries_dir(subdirectory)
+
+    session = boto3.Session(profile_name=profile)
+    athena_client = session.client("athena")
+
+    # Get list of existing named queries
+    existing_queries = get_existing_queries(athena_client)
+
+    for file_path in queries_dir.glob("*.sql"):
+        query_name = f"{subdirectory}/{file_path.stem}"
+        query_string = get_query_string(file_path)
+
+        if query_name in existing_queries and not overwrite:
+            print(
+                f"Query '{query_name}' already exists. Use --overwrite to replace it."
+            )
+            continue
+
+        if query_name in existing_queries:
+            update_named_query(
+                athena_client, query_name, query_string, existing_queries
+            )
+            continue
+
+        create_named_query(athena_client, query_name, query_string)
+
+
+def handle():
+    parser = argparse.ArgumentParser(
+        description="Create Athena named queries from SQL files in a specified subdirectory."
+    )
+    parser.add_argument(
+        "subdirectory", help="Name of the subdirectory in the queries folder"
+    )
+    parser.add_argument("--profile", required=True, help="AWS profile to use")
+    parser.add_argument(
+        "--overwrite",
+        action="store_true",
+        help="Overwrite existing queries with the same name",
+    )
+    args = parser.parse_args()
+
+    create_athena_queries(args.subdirectory, args.profile, args.overwrite)
+
+
+if __name__ == "__main__":
+    handle()

From 9bbd7a55b80893469aef804eb567b6e580ccda93 Mon Sep 17 00:00:00 2001
From: GeoWill
Date: Thu, 4 Jul 2024 19:31:55 +0100
Subject: [PATCH 4/7] Command to run queries and save results

---
 .gitignore                                    |   3 +
 dc_logging_aws/named_queries/README.md        |   7 +
 .../named_queries/commands/run_queries.py     | 128 ++++++++++++++++++
 3 files changed, 138 insertions(+)
 create mode 100644 dc_logging_aws/named_queries/commands/run_queries.py

diff --git a/.gitignore b/.gitignore
index a9dd61f..f79835d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# Query results
+dc_logging_aws/named_queries/results
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
diff --git a/dc_logging_aws/named_queries/README.md b/dc_logging_aws/named_queries/README.md
index a34b587..88bacad 100644
--- a/dc_logging_aws/named_queries/README.md
+++ b/dc_logging_aws/named_queries/README.md
@@ -22,4 +22,11 @@ Then send to Athena:
 
 ```shell
 UPDOWN_API_KEY=1234 python dc_logging_aws/named_queries/commands/create_athena_queries.py --profile prod-monitoring-dc 2024-07-04
-```
\ No newline at end of file
+```
+
+
+Run them:
+
+```shell
+ATHENA_RESULTS_BUCKET=**** python dc_logging_aws/named_queries/commands/run_queries.py 2024-07-04 --profile prod-monitoring-dc
+```
\ No newline at end of file
diff --git a/dc_logging_aws/named_queries/commands/run_queries.py b/dc_logging_aws/named_queries/commands/run_queries.py
new file mode 100644
index 0000000..92eb4db
--- /dev/null
+++ b/dc_logging_aws/named_queries/commands/run_queries.py
@@ -0,0 +1,128 @@
+import argparse
+import csv
+import os
+import time
+from pathlib import Path
+
+import boto3
+
+# Get the S3 bucket name from environment variable
+RESULTS_BUCKET = os.environ.get(
+    "ATHENA_RESULTS_BUCKET", "dc-monitoring-query-results"
+)
+
+
+def get_query_execution(client, query_execution_id):
+    return client.get_query_execution(QueryExecutionId=query_execution_id)
+
+
+def wait_for_query_to_complete(client, query_execution_id):
+    while True:
+        query_execution = get_query_execution(client, query_execution_id)
+        state = query_execution["QueryExecution"]["Status"]["State"]
+        if state in ["SUCCEEDED", "FAILED", "CANCELLED"]:
+            return state
+        time.sleep(5)
+
+
+def save_query_results(client, query_execution_id, output_location):
+    result = client.get_query_results(QueryExecutionId=query_execution_id)
+    rows = result["ResultSet"]["Rows"]
+    csv_rows = [
+        [col.get("VarCharValue", "") for col in row["Data"]] for row in rows
+    ]
+    output_location.parent.mkdir(parents=True, exist_ok=True)
+    with open(output_location, "w", newline="") as out_file:
+        csv_writer = csv.writer(out_file, delimiter="\t")
+        csv_writer.writerows(csv_rows)
+
+
+def run_election_queries(election_date, profile):
+    start_time = time.time()
+    session = boto3.Session(profile_name=profile)
+    athena_client = session.client("athena")
+
+    base_dir = Path("dc_logging_aws/named_queries")
+    results_dir = base_dir / "results" / election_date
+
+    query_types = {
+        "count": results_dir / "counts",
+        "postcode-searches-by-product": results_dir / "product_searches",
+        "postcode-searches-by-local-authority": results_dir
+        / "local_authority_searches",
+        "postcode-searches-by-constituency": results_dir
+        / "constituency_searches",
+    }
+
+    named_queries = athena_client.list_named_queries()
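+    # NOTE: unlike get_existing_queries in create_athena_queries.py, this
+    # reads only the first page of list_named_queries results, so with a
+    # large number of saved queries some may be missed here.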
"local_authority_searches", + "postcode-searches-by-constituency": results_dir + / "constituency_searches", + } + + named_queries = athena_client.list_named_queries() + + print(f"Running queries that start with {election_date}") + query_attempt_count = 0 + query_success_count = 0 + query_failure_count = 0 + execution_ids = {} + for query_id in named_queries["NamedQueryIds"]: + query = athena_client.get_named_query(NamedQueryId=query_id) + query_name = query["NamedQuery"]["Name"] + + if query_name.startswith(election_date): + print(f"Starting query: {query_name}") + + response = athena_client.start_query_execution( + QueryString=query["NamedQuery"]["QueryString"], + QueryExecutionContext={"Database": "dc-wide-logs"}, + ResultConfiguration={ + "OutputLocation": f"s3://{RESULTS_BUCKET}/" + }, + ) + + query_execution_id = response["QueryExecutionId"] + execution_ids[query_name] = query_execution_id + query_attempt_count += 1 + + for query_name, query_execution_id in execution_ids.items(): + query_status = wait_for_query_to_complete( + athena_client, query_execution_id + ) + + if query_status == "SUCCEEDED": + for query_type, output_dir in query_types.items(): + if query_type in query_name: + output_file = ( + output_dir + / f"{query_name.replace(f'{election_date}/', '')}.tsv" + ) + save_query_results( + athena_client, query_execution_id, output_file + ) + print(f"Results saved to: {output_file}") + query_success_count += 1 + break + else: + query_failure_count += 1 + print(f"Query failed with status: {query_status}") + + end_time = time.time() + duration = end_time - start_time + + print(f"{query_attempt_count} queries attempted") + print(f"{query_success_count} queries successfully run") + print(f"{query_failure_count} queries failed") + print(f"Total execution time: {duration:.2f} seconds") + + +def handle(): + parser = argparse.ArgumentParser( + description="Run Athena queries for a specific election and save results." 
+ ) + parser.add_argument( + "election_date", help="The election date in YYYY-MM-DD format" + ) + parser.add_argument("--profile", required=True, help="AWS profile to use") + args = parser.parse_args() + + run_election_queries(args.election_date, args.profile) + + +if __name__ == "__main__": + handle() From a805ff0508e36d4883a7968927e0e4a0a65431b9 Mon Sep 17 00:00:00 2001 From: GeoWill Date: Thu, 4 Jul 2024 20:59:48 +0100 Subject: [PATCH 5/7] add timeseries queries --- .../commands/create_election_query_files.py | 35 ++++++++++- .../named_queries/commands/run_queries.py | 1 + .../named_queries/query_template.py | 61 +++++++++++++++---- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/dc_logging_aws/named_queries/commands/create_election_query_files.py b/dc_logging_aws/named_queries/commands/create_election_query_files.py index 108ac21..c43c5c9 100644 --- a/dc_logging_aws/named_queries/commands/create_election_query_files.py +++ b/dc_logging_aws/named_queries/commands/create_election_query_files.py @@ -41,7 +41,7 @@ def create_query_directory(self): def create_query_files(self): directory = self.create_query_directory() - files_to_create = { + count_files = { "election-week-count.sql": self.query_template.postcode_search_count( self.query_template.start_of_polling_week ), @@ -55,6 +55,9 @@ def create_query_files(self): self.start_of_election_period, time(0, 0) ).replace(tzinfo=self.query_template.close_of_polls.tzinfo) ), + } + + by_product_files = { "election-week-postcode-searches-by-product.sql": self.query_template.postcode_searches_by_product( self.query_template.start_of_polling_week ), @@ -68,6 +71,9 @@ def create_query_files(self): self.start_of_election_period, time(0, 0) ).replace(tzinfo=self.query_template.close_of_polls.tzinfo) ), + } + + by_local_authority = { "election-week-postcode-searches-by-local-authority.sql": self.query_template.postcode_searches_by_local_authority( self.query_template.start_of_polling_week ), @@ -81,6 +87,9 @@ def create_query_files(self): self.start_of_election_period, time(0, 0) ).replace(tzinfo=self.query_template.close_of_polls.tzinfo) ), + } + + by_constituency = { "election-week-postcode-searches-by-constituency.sql": self.query_template.postcode_searches_by_constituency( self.query_template.start_of_polling_week ), @@ -96,6 +105,30 @@ def create_query_files(self): ), } + by_product_timeseries = { + "election-week-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product( + self.query_template.start_of_polling_week + ), + "election-day-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product( + datetime.combine(self.polling_day, time(0, 0)).replace( + tzinfo=self.query_template.close_of_polls.tzinfo + ), + ), + "election-period-timeseries-by-product.sql": self.query_template.postcode_timeseries_by_product( + datetime.combine( + self.start_of_election_period, time(0, 0) + ).replace(tzinfo=self.query_template.close_of_polls.tzinfo), + ), + } + + files_to_create = { + **count_files, + **by_product_files, + **by_local_authority, + **by_constituency, + **by_product_timeseries, + } + for filename, content in files_to_create.items(): file_path = directory / filename if file_path.exists() and not self.overwrite: diff --git a/dc_logging_aws/named_queries/commands/run_queries.py b/dc_logging_aws/named_queries/commands/run_queries.py index 92eb4db..1b43c2d 100644 --- a/dc_logging_aws/named_queries/commands/run_queries.py +++ b/dc_logging_aws/named_queries/commands/run_queries.py @@ -52,6 +52,7 @@ def 
run_election_queries(election_date, profile): / "local_authority_searches", "postcode-searches-by-constituency": results_dir / "constituency_searches", + "timeseries-by-product": results_dir / "time_series_by_product", } named_queries = athena_client.list_named_queries() diff --git a/dc_logging_aws/named_queries/query_template.py b/dc_logging_aws/named_queries/query_template.py index bcae437..9bc3cda 100644 --- a/dc_logging_aws/named_queries/query_template.py +++ b/dc_logging_aws/named_queries/query_template.py @@ -86,10 +86,17 @@ def election_period_cte(self, exclude_calls_devs_dc_api=True): cte = cte + """\n AND LOWER("calls_devs_dc_api") = 'false'""" return cte - def logs_cte(self, start_time, from_source="ELECTION_PERIOD"): + def logs_cte( + self, start_time, from_source="ELECTION_PERIOD", timeseries=False + ): + extra_select = "" + if timeseries: + extra_select = ( + ", date_format(timestamp, '%Y-%m-%d %H') as hour_segment" + ) return textwrap.dedent( f""" - SELECT * + SELECT *{extra_select} FROM {from_source} WHERE ( "timestamp" >= cast('{utc_athena_time(start_time)}' AS timestamp) @@ -122,27 +129,32 @@ def postcode_search_count(self, start_time): """ ).strip() - def product_count_cte(self, logs_cte_name="LOGS"): + def product_count_cte(self, logs_cte_name="LOGS", timeseries=False): + extra_select = "" + extra_group_by = "" + if timeseries: + extra_select = ", hour_segment" + extra_group_by = ", hour_segment" return textwrap.dedent( f""" SELECT - count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source + count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source{extra_select} FROM {logs_cte_name} WHERE dc_product = 'WDIV' - GROUP BY "dc_product", "api_key", "utm_source" + GROUP BY "dc_product", "api_key", "utm_source"{extra_group_by} UNION SELECT - count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source + count(*) AS count, "dc_product", '' AS key_name, '' AS user_name, '' AS email, utm_source{extra_select} FROM {logs_cte_name} WHERE dc_product = 'WCIVF' - GROUP BY "dc_product", "api_key", "utm_source" + GROUP BY "dc_product", "api_key", "utm_source"{extra_group_by} UNION SELECT - count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source + count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source{extra_select} FROM {logs_cte_name} JOIN "dc-wide-logs"."api-users-ec-api" as api_users ON {logs_cte_name}."api_key" = api_users."key" WHERE dc_product = 'EC_API' - GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email" + GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"{extra_group_by} UNION SELECT - count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source + count(*) AS count, "dc_product", api_users."key_name", api_users."user_name", api_users."email", utm_source{extra_select} FROM {logs_cte_name} JOIN "dc-wide-logs"."api-users-aggregator-api" as api_users ON {logs_cte_name}."api_key" = api_users."key" WHERE @@ -150,7 +162,7 @@ def product_count_cte(self, logs_cte_name="LOGS"): AND api_users."key_name" NOT IN ( 'EC postcode pages - Dev', 'WhoCanIVoteFor', 'Updown', 'EC API' ) - GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email" + GROUP BY "dc_product", "key_name", "user_name", "utm_source", "email"{extra_group_by} """ ).strip() @@ -181,6 +193,33 @@ def postcode_searches_by_product(self, 
start_time): """ ).strip() + def postcode_timeseries_by_product(self, start_time): + election_period_cte = indent_cte_string( + self.election_period_cte(exclude_calls_devs_dc_api=False), 12 + ) + + logs_indent_cte_string = indent_cte_string( + self.logs_cte(start_time, timeseries=True), 12 + ) + product_indent_cte_string = indent_cte_string( + self.product_count_cte(timeseries=True), 12 + ) + return textwrap.dedent( + f""" + WITH ELECTION_PERIOD AS ( + {textwrap.indent(election_period_cte, ' ')} + ), LOGS AS ( + {textwrap.indent(logs_indent_cte_string, ' ')} + ), PRODUCT_COUNTS AS ( + {textwrap.indent(product_indent_cte_string, ' ')} + ) + SELECT hour_segment, count, dc_product, key_name, email, utm_source + FROM + PRODUCT_COUNTS + ORDER BY hour_segment, dc_product,key_name, email, utm_source; + """ + ).strip() + def postcode_searches_by_local_authority(self, start_time: datetime): election_period_cte = indent_cte_string(self.election_period_cte(), 12) logs_indent_cte_string = indent_cte_string( From 89feacc8fb4bfd655bc2f4ee619e27c511d7f28b Mon Sep 17 00:00:00 2001 From: GeoWill Date: Thu, 4 Jul 2024 21:12:39 +0100 Subject: [PATCH 6/7] Add pagination to results --- .../named_queries/commands/run_queries.py | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/dc_logging_aws/named_queries/commands/run_queries.py b/dc_logging_aws/named_queries/commands/run_queries.py index 1b43c2d..da6bfcd 100644 --- a/dc_logging_aws/named_queries/commands/run_queries.py +++ b/dc_logging_aws/named_queries/commands/run_queries.py @@ -26,15 +26,42 @@ def wait_for_query_to_complete(client, query_execution_id): def save_query_results(client, query_execution_id, output_location): - result = client.get_query_results(QueryExecutionId=query_execution_id) - rows = result["ResultSet"]["Rows"] - csv_rows = [ - [col.get("VarCharValue", "") for col in row["Data"]] for row in rows - ] output_location.parent.mkdir(parents=True, exist_ok=True) with open(output_location, "w", newline="") as out_file: + page_number = 1 + print(f"Fetching page {page_number} of results for {output_location.name}") csv_writer = csv.writer(out_file, delimiter="\t") - csv_writer.writerows(csv_rows) + + # Get the first page of results + response = client.get_query_results(QueryExecutionId=query_execution_id) + + # Write the header + header = [ + col["Label"] + for col in response["ResultSet"]["ResultSetMetadata"]["ColumnInfo"] + ] + csv_writer.writerow(header) + + # Write the data from the first page + for row in response["ResultSet"]["Rows"][1:]: # Skip the header row + csv_writer.writerow( + [col.get("VarCharValue", "") for col in row["Data"]] + ) + + # Continue fetching and writing data while there's a NextToken + while "NextToken" in response: + page_number += 1 + print(f"Fetching page {page_number} of results for {output_location.name}") + response = client.get_query_results( + QueryExecutionId=query_execution_id, + NextToken=response["NextToken"], + ) + for row in response["ResultSet"]["Rows"]: + csv_writer.writerow( + [col.get("VarCharValue", "") for col in row["Data"]] + ) + + print(f"Results saved to: {output_location}") def run_election_queries(election_date, profile): @@ -96,7 +123,6 @@ def run_election_queries(election_date, profile): save_query_results( athena_client, query_execution_id, output_file ) - print(f"Results saved to: {output_file}") query_success_count += 1 break else: From d05114bd1f893270eeee20adf0091d7b26f72e96 Mon Sep 17 00:00:00 2001 From: GeoWill Date: Thu, 11 Jul 2024 09:04:08 +0100 
Subject: [PATCH 7/7] fixup! add timeseries queries --- dc_logging_aws/named_queries/commands/run_queries.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dc_logging_aws/named_queries/commands/run_queries.py b/dc_logging_aws/named_queries/commands/run_queries.py index da6bfcd..605a23d 100644 --- a/dc_logging_aws/named_queries/commands/run_queries.py +++ b/dc_logging_aws/named_queries/commands/run_queries.py @@ -29,7 +29,9 @@ def save_query_results(client, query_execution_id, output_location): output_location.parent.mkdir(parents=True, exist_ok=True) with open(output_location, "w", newline="") as out_file: page_number = 1 - print(f"Fetching page {page_number} of results for {output_location.name}") + print( + f"Fetching page {page_number} of results for {output_location.name}" + ) csv_writer = csv.writer(out_file, delimiter="\t") # Get the first page of results @@ -51,7 +53,9 @@ def save_query_results(client, query_execution_id, output_location): # Continue fetching and writing data while there's a NextToken while "NextToken" in response: page_number += 1 - print(f"Fetching page {page_number} of results for {output_location.name}") + print( + f"Fetching page {page_number} of results for {output_location.name}" + ) response = client.get_query_results( QueryExecutionId=query_execution_id, NextToken=response["NextToken"],