From 78dee30051b6a42acd4834eb5093daae1c076a51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Guiselin?= <9251353+Jrmyy@users.noreply.github.com> Date: Fri, 5 Jan 2024 11:31:17 +0100 Subject: [PATCH] feat: optimize retry if needed (#532) --- dbt/adapters/athena/impl.py | 30 +++++++++++++++---- .../athena/macros/materializations/hooks.sql | 15 ++++++++++ 2 files changed, 39 insertions(+), 6 deletions(-) create mode 100644 dbt/include/athena/macros/materializations/hooks.sql diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py index 6ce13fff..76b92794 100755 --- a/dbt/adapters/athena/impl.py +++ b/dbt/adapters/athena/impl.py @@ -29,6 +29,7 @@ from dbt.adapters.athena import AthenaConnectionManager from dbt.adapters.athena.column import AthenaColumn from dbt.adapters.athena.config import get_boto3_config +from dbt.adapters.athena.connections import AthenaCursor from dbt.adapters.athena.constants import LOGGER from dbt.adapters.athena.exceptions import ( S3LocationException, @@ -1247,14 +1248,9 @@ def _get_table_input(table: TableTypeDef) -> TableInputTypeDef: @available def run_query_with_partitions_limit_catching(self, sql: str) -> str: - query = self.connections._add_query_comment(sql) - conn = self.connections.get_thread_connection() - cursor = conn.handle.cursor() - LOGGER.debug(f"Running Athena query:\n{query}") try: - cursor.execute(query, catch_partitions_limit=True) + cursor = self._run_query(sql, catch_partitions_limit=True) except OperationalError as e: - LOGGER.debug(f"CAUGHT EXCEPTION: {e}") if "TOO_MANY_OPEN_PARTITIONS" in str(e): return "TOO_MANY_OPEN_PARTITIONS" raise e @@ -1314,3 +1310,25 @@ def format_value_for_partition(self, value: Any, column_type: str) -> Tuple[str, else: # Raise an error for unsupported column types raise ValueError(f"Unsupported column type: {column_type}") + + @available + def run_optimize_with_partition_limit_catching(self, optimize_query: str) -> None: + while True: + try: + self._run_query(optimize_query, catch_partitions_limit=False) + break + except OperationalError as e: + if "ICEBERG_OPTIMIZE_MORE_RUNS_NEEDED" not in str(e): + raise e + + def _run_query(self, sql: str, catch_partitions_limit: bool) -> AthenaCursor: + query = self.connections._add_query_comment(sql) + conn = self.connections.get_thread_connection() + cursor: AthenaCursor = conn.handle.cursor() + LOGGER.debug(f"Running Athena query:\n{query}") + try: + cursor.execute(query, catch_partitions_limit=catch_partitions_limit) + except OperationalError as e: + LOGGER.debug(f"CAUGHT EXCEPTION: {e}") + raise e + return cursor diff --git a/dbt/include/athena/macros/materializations/hooks.sql b/dbt/include/athena/macros/materializations/hooks.sql new file mode 100644 index 00000000..1c132f9d --- /dev/null +++ b/dbt/include/athena/macros/materializations/hooks.sql @@ -0,0 +1,15 @@ +{% macro run_hooks(hooks, inside_transaction=True) %} + {% set re = modules.re %} + {% for hook in hooks %} + {% set rendered = render(hook.get('sql')) | trim %} + {% if (rendered | length) > 0 %} + {%- if re.match("optimize\W+\w+\W+rewrite data using bin_pack", rendered.lower(), re.MULTILINE) -%} + {%- do adapter.run_optimize_with_partition_limit_catching(rendered) -%} + {%- else -%} + {% call statement(auto_begin=inside_transaction) %} + {{ rendered }} + {% endcall %} + {%- endif -%} + {% endif %} + {% endfor %} +{% endmacro %}