From 01710c80888871702e902a2a8dc9dc5177f6b749 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 24 Nov 2021 14:22:04 +0100 Subject: [PATCH] Implement retry for query execution (#42) * Retry query execution * Add tenacity as direct dependency --- dbt/adapters/athena/connections.py | 59 ++++++++++++++++++++---------- requirements.txt | 1 + 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py index c05a2887..9b6952f1 100644 --- a/dbt/adapters/athena/connections.py +++ b/dbt/adapters/athena/connections.py @@ -21,6 +21,11 @@ from dbt.adapters.sql import SQLConnectionManager from dbt.exceptions import RuntimeException, FailedToConnectException from dbt.logger import GLOBAL_LOGGER as logger +import tenacity +from tenacity.retry import retry_if_exception +from tenacity.stop import stop_after_attempt +from tenacity.wait import wait_exponential + @dataclass class AthenaCredentials(Credentials): @@ -69,26 +74,42 @@ def execute( cache_size: int = 0, cache_expiration_time: int = 0, ): - query_id = self._execute( - operation, - parameters=parameters, - work_group=work_group, - s3_staging_dir=s3_staging_dir, - cache_size=cache_size, - cache_expiration_time=cache_expiration_time, - ) - query_execution = self._executor.submit(self._collect_result_set, query_id).result() - if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED: - self.result_set = self._result_set_class( - self._connection, - self._converter, - query_execution, - self.arraysize, - self._retry_config, + def inner(): + query_id = self._execute( + operation, + parameters=parameters, + work_group=work_group, + s3_staging_dir=s3_staging_dir, + cache_size=cache_size, + cache_expiration_time=cache_expiration_time, ) - else: - raise OperationalError(query_execution.state_change_reason) - return self + query_execution = self._executor.submit( + self._collect_result_set, query_id + ).result() + if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED: + self.result_set = self._result_set_class( + self._connection, + self._converter, + query_execution, + self.arraysize, + self._retry_config, + ) + + else: + raise OperationalError(query_execution.state_change_reason) + return self + + retry = tenacity.Retrying( + retry=retry_if_exception(lambda _: True), + stop=stop_after_attempt(self._retry_config.attempt), + wait=wait_exponential( + multiplier=self._retry_config.attempt, + max=self._retry_config.max_delay, + exp_base=self._retry_config.exponential_base, + ), + reraise=True, + ) + return retry(inner) class AthenaConnectionManager(SQLConnectionManager): diff --git a/requirements.txt b/requirements.txt index 5e07facd..580ac925 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ dbt-core==0.21.0 pyathena==2.2.0 boto3==1.18.12 +tenacity==6.3.1 \ No newline at end of file