Skip to content

Commit

Permalink
retry wait for result independently from job creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kayrnt committed Dec 6, 2023
1 parent c489332 commit 36051fb
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 32 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231206-172009.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: retry wait for result independently from job creation
time: 2023-12-06T17:20:09.264805+01:00
custom:
Author: Kayrnt
Issue: "1045"
34 changes: 18 additions & 16 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,17 +492,23 @@ def raw_execute(
job_creation_timeout = self.get_job_creation_timeout_seconds(conn)
job_execution_timeout = self.get_job_execution_timeout_seconds(conn)

def fn():
def job_creation_fn():
return self._query_and_results(
client,
sql,
job_params,
job_creation_timeout=job_creation_timeout,
client, sql, job_params, job_creation_timeout=job_creation_timeout
)

query_job = self._retry_and_handle(msg=sql, conn=conn, fn=job_creation_fn)

def wait_for_job_results_fn():
return self._wait_for_results(
query_job=query_job,
job_execution_timeout=job_execution_timeout,
limit=limit,
)

query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)
query_job, iterator = self._retry_and_handle(
msg=sql, conn=conn, fn=wait_for_job_results_fn
)

return query_job, iterator

Expand Down Expand Up @@ -729,15 +735,7 @@ def query_schemas():

return self._retry_and_handle(msg="list dataset", conn=conn, fn=query_schemas)

def _query_and_results(
self,
client,
sql,
job_params,
job_creation_timeout=None,
job_execution_timeout=None,
limit: Optional[int] = None,
):
def _query_and_results(self, client, sql, job_params, job_creation_timeout=None):
"""Query the client and wait for results."""
# Cannot reuse job_config if destination is set and ddl is used
job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
Expand All @@ -750,7 +748,11 @@ def _query_and_results(
logger.debug(
self._bq_job_link(query_job.location, query_job.project, query_job.job_id)
)
return query_job

def _wait_for_results(
self, query_job, job_execution_timeout=None, limit: Optional[int] = None
):
# only use async logic if user specifies a timeout
if job_execution_timeout:
loop = asyncio.new_event_loop()
Expand Down Expand Up @@ -787,7 +789,7 @@ def reopen_conn_on_error(error):
target=fn,
predicate=_ErrorCounter(self.get_job_retries(conn)).count_error,
sleep_generator=self._retry_generator(),
deadline=self.get_job_retry_deadline_seconds(conn),
timeout=self.get_job_retry_deadline_seconds(conn),
on_error=reopen_conn_on_error,
)

Expand Down
26 changes: 10 additions & 16 deletions tests/unit/test_bigquery_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def setUp(self):
self.connections = BigQueryConnectionManager(profile=profile)

self.mock_client = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.Client)
self.query_job = Mock(dbt.adapters.bigquery.impl.google.cloud.bigquery.QueryJob)
self.mock_connection = MagicMock()

self.mock_connection.handle = self.mock_client
Expand Down Expand Up @@ -114,7 +115,6 @@ def test_query_and_results(self, mock_bq):
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=3,
)

mock_bq.QueryJobConfig.assert_called_once()
Expand All @@ -123,23 +123,17 @@ def test_query_and_results(self, mock_bq):
)

@patch("dbt.adapters.bigquery.impl.google.cloud.bigquery")
def test_query_and_results_timeout(self, mock_bq):
self.mock_client.query = Mock(
return_value=Mock(result=lambda *args, **kwargs: time.sleep(4))
)
def test_wait_for_results_timeout(self, mock_bq):
def mock_result(max_results):
time.sleep(3)
return None

self.query_job.result = MagicMock(side_effect=mock_result)

with pytest.raises(dbt.exceptions.DbtRuntimeError) as exc:
self.connections._query_and_results(
self.mock_client,
"sql",
{"job_param_1": "blah"},
job_creation_timeout=15,
job_execution_timeout=1,
)
self.connections._wait_for_results(self.query_job, job_execution_timeout=1, limit=100)

mock_bq.QueryJobConfig.assert_called_once()
self.mock_client.query.assert_called_once_with(
query="sql", job_config=mock_bq.QueryJobConfig(), timeout=15
)
self.query_job.result.assert_called_once_with(max_results=100)
assert "Query exceeded configured timeout of 1s" in str(exc.value)

def test_copy_bq_table_appends(self):
Expand Down

0 comments on commit 36051fb

Please sign in to comment.