Skip to content

Commit

Permalink
Implement retry for query execution (#42)
Browse files Browse the repository at this point in the history
* Retry query execution

* Add tenacity as direct dependency
  • Loading branch information
Dandandan authored Nov 24, 2021
1 parent 3b0d2a8 commit 01710c8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
59 changes: 40 additions & 19 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
from dbt.adapters.sql import SQLConnectionManager
from dbt.exceptions import RuntimeException, FailedToConnectException
from dbt.logger import GLOBAL_LOGGER as logger
import tenacity
from tenacity.retry import retry_if_exception
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential


@dataclass
class AthenaCredentials(Credentials):
Expand Down Expand Up @@ -69,26 +74,42 @@ def execute(
cache_size: int = 0,
cache_expiration_time: int = 0,
):
query_id = self._execute(
operation,
parameters=parameters,
work_group=work_group,
s3_staging_dir=s3_staging_dir,
cache_size=cache_size,
cache_expiration_time=cache_expiration_time,
)
query_execution = self._executor.submit(self._collect_result_set, query_id).result()
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
self.result_set = self._result_set_class(
self._connection,
self._converter,
query_execution,
self.arraysize,
self._retry_config,
def inner():
query_id = self._execute(
operation,
parameters=parameters,
work_group=work_group,
s3_staging_dir=s3_staging_dir,
cache_size=cache_size,
cache_expiration_time=cache_expiration_time,
)
else:
raise OperationalError(query_execution.state_change_reason)
return self
query_execution = self._executor.submit(
self._collect_result_set, query_id
).result()
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
self.result_set = self._result_set_class(
self._connection,
self._converter,
query_execution,
self.arraysize,
self._retry_config,
)

else:
raise OperationalError(query_execution.state_change_reason)
return self

retry = tenacity.Retrying(
retry=retry_if_exception(lambda _: True),
stop=stop_after_attempt(self._retry_config.attempt),
wait=wait_exponential(
multiplier=self._retry_config.attempt,
max=self._retry_config.max_delay,
exp_base=self._retry_config.exponential_base,
),
reraise=True,
)
return retry(inner)


class AthenaConnectionManager(SQLConnectionManager):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dbt-core==0.21.0
pyathena==2.2.0
boto3==1.18.12
tenacity==6.3.1

0 comments on commit 01710c8

Please sign in to comment.