Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support retryable exceptions during query execution #368

Merged
merged 13 commits into from
Dec 17, 2024
85 changes: 70 additions & 15 deletions dbt/adapters/sql/connections.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import abc
import time
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING, Callable, Type, Union

from dbt_common.events.contextvars import get_node_info
from dbt_common.events.functions import fire_event
from dbt_common.exceptions import DbtInternalError, NotImplementedError
from dbt_common.exceptions import DbtInternalError, NotImplementedError, DbtRuntimeError
from dbt_common.utils import cast_to_str

from dbt.adapters.base import BaseConnectionManager
from dbt.adapters.base.connections import SleepTime
from dbt.adapters.contracts.connection import (
AdapterResponse,
Connection,
ConnectionState,
)
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import (
ConnectionUsed,
SQLCommit,
SQLQuery,
SQLQueryStatus,
SQLQueryStatus, AdapterEventDebug,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -56,11 +58,14 @@ def cancel_open(self) -> List[str]:
return names

def add_query(
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
self,
sql: str,
auto_begin: bool = True,
bindings: Optional[Any] = None,
abridge_sql_log: bool = False,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
) -> Tuple[Connection, Any]:
connection = self.get_thread_connection()
if auto_begin and connection.transaction_open is False:
Expand Down Expand Up @@ -90,7 +95,14 @@ def add_query(
pre = time.perf_counter()

cursor = connection.handle.cursor()
cursor.execute(sql, bindings)
self._retryable_cursor_execute(
execute_fn=cursor.execute,
sql=sql,
bindings=bindings,
retryable_exceptions=retryable_exceptions,
retry_limit=retry_limit,
retry_timeout=retry_timeout
)

result = self.get_response(cursor)

Expand All @@ -113,7 +125,7 @@ def get_response(cls, cursor: Any) -> AdapterResponse:

@classmethod
def process_results(
cls, column_names: Iterable[str], rows: Iterable[Any]
cls, column_names: Iterable[str], rows: Iterable[Any]
) -> Iterator[Dict[str, Any]]:
unique_col_names = dict() # type: ignore[var-annotated]
for idx in range(len(column_names)): # type: ignore[arg-type]
Expand Down Expand Up @@ -145,11 +157,11 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab
return table_from_data_flat(data, column_names)

def execute(
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
) -> Tuple[AdapterResponse, "agate.Table"]:
from dbt_common.clients.agate_helper import empty_table

Expand Down Expand Up @@ -199,3 +211,46 @@ def commit(self):
connection.transaction_open = False

return connection

def _retryable_cursor_execute(self,
execute_fn: Callable,
sql: str,
bindings: Optional[Any] = None,
retryable_exceptions: Iterable[Type[Exception]] = [],
retry_limit: int = 1,
retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
_attempts: int = 0,
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
) -> None:
timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
if timeout < 0:
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
raise DbtRuntimeError(
"retry_timeout cannot be negative or return a negative time."
)

try:
execute_fn(sql, bindings)

except tuple(retryable_exceptions) as e:
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
retry_limit -= 1
if retry_limit <= 0:
raise e
fire_event(
AdapterEventDebug(
message=f"Got a retryable error {type(e)} when attempting to execute a query.\n"
f"{retry_limit} attempts remaining. Retrying in {timeout} seconds.\n"
f"Error:\n{e}"
)
)

time.sleep(timeout)
return self._retryable_cursor_execute(
execute_fn=execute_fn,
sql=sql,
retry_limit=retry_limit - 1,
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
retry_timeout=retry_timeout,
retryable_exceptions=retryable_exceptions,
_attempts=_attempts + 1,
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
)

except Exception as e:
raise e
Loading