From 255271a11f058b087580ce0cf39bef6b80ed1d4f Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Fri, 21 Jul 2023 05:49:13 -0700
Subject: [PATCH 01/13] re:PR https://github.com/dbt-labs/dbt-bigquery/pull/840/files

---
 dbt/adapters/bigquery/python_submissions.py | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 0c7ce1917..2d27904b3 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -9,6 +9,8 @@
 from google.api_core.client_options import ClientOptions
 from google.cloud import storage, dataproc_v1  # type: ignore
 from google.protobuf.json_format import ParseDict
+import time
+import uuid

 OPERATION_RETRY_TIME = 10

@@ -103,7 +105,6 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
             }
         )
         response = operation.result(polling=self.result_polling_policy)
-        # check if job failed
         if response.status.state == 6:
             raise ValueError(response.status.details)
         return response
@@ -118,16 +119,27 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
     def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         batch = self._configure_batch()
         parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
+        batch_id = uuid.uuid4().hex

         request = dataproc_v1.CreateBatchRequest(
             parent=parent,
             batch=batch,
+            batch_id=batch_id
         )
         # make the request
         operation = self.job_client.create_batch(request=request)  # type: ignore
         # this takes quite a while, waiting on GCP response to resolve
         # (not a google-api-core issue, more likely a dataproc serverless issue)
-        response = operation.result(polling=self.result_polling_policy)
+
+        state = "PENDING"
+        while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
+            response = self.job_client.get_batch(
+                request = dataproc_v1.GetBatchRequest(name = ''.join([parent, "/batches/", batch_id])),
+                # retry=self.retry (This retry polls way too many times per second)
+            )
+            state = str(response.state)
+            time.sleep(2)
+
+        return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket

From 024695cfbdbcb524f4460f3209f0c0b4a33e1cb4 Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Fri, 21 Jul 2023 07:59:24 -0700
Subject: [PATCH 02/13] adding back the `# check if job failed` comment

---
 dbt/adapters/bigquery/python_submissions.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 2d27904b3..ebda962da 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -104,6 +104,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
                 "job": job,
             }
         )
+        # check if job failed
         response = operation.result(polling=self.result_polling_policy)
         if response.status.state == 6:
             raise ValueError(response.status.details)

From 9d8fe5f3a851cd9b55019e984ffdc9d508f6475f Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Fri, 21 Jul 2023 10:10:56 -0700
Subject: [PATCH 03/13] adding changelog

---
 .changes/unreleased/Fixes-20230721-101041.yaml | 6 ++++++
 dbt/adapters/bigquery/python_submissions.py    | 4 ++--
 2 files changed, 8 insertions(+), 2 deletions(-)
 create mode 100644 .changes/unreleased/Fixes-20230721-101041.yaml

diff --git a/.changes/unreleased/Fixes-20230721-101041.yaml b/.changes/unreleased/Fixes-20230721-101041.yaml
new file mode 100644
index 000000000..6db81cf50
--- /dev/null
+++ b/.changes/unreleased/Fixes-20230721-101041.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Serverless Spark to Poll with .GetBatch() instead of using operation.result()
+time: 2023-07-21T10:10:41.64843-07:00
+custom:
+  Author: wazi55
+  Issue: "734"
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index ebda962da..0bf701e20 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -128,11 +128,11 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
             batch_id=batch_id
         )
         # make the request
-        operation = self.job_client.create_batch(request=request)  # type: ignore
+        self.job_client.create_batch(request=request)
         # this takes quite a while, waiting on GCP response to resolve
         # (not a google-api-core issue, more likely a dataproc serverless issue)

-        state = "PENDING"
+        state = "State.PENDING"
         while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
             response = self.job_client.get_batch(
                 request = dataproc_v1.GetBatchRequest(name = ''.join([parent, "/batches/", batch_id])),

From 336846c2a11a6605409e901742ab782af8ba3d54 Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Fri, 21 Jul 2023 12:10:02 -0700
Subject: [PATCH 04/13] precommit code format

---
 dbt/adapters/bigquery/python_submissions.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 0bf701e20..b3f7f624a 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -122,11 +122,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
         batch_id = uuid.uuid4().hex

-        request = dataproc_v1.CreateBatchRequest(
-            parent=parent,
-            batch=batch,
-            batch_id=batch_id
-        )
+        request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)
         # make the request
         self.job_client.create_batch(request=request)
         # this takes quite a while, waiting on GCP response to resolve
@@ -135,7 +131,7 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         state = "State.PENDING"
         while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
             response = self.job_client.get_batch(
-                request = dataproc_v1.GetBatchRequest(name = ''.join([parent, "/batches/", batch_id])),
+                request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),
                 # retry=self.retry (This retry polls way too many times per second)
             )
             state = str(response.state)

From b857b94add02d099112ce99d04bb8aa56513dccb Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Sat, 19 Aug 2023 14:43:47 -0700
Subject: [PATCH 05/13] sleep(2) at the top of the while loop, before the
 request, so we skip the final two-second sleep once the response is already
 in one of the three terminal states

---
 dbt/adapters/bigquery/python_submissions.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index b3f7f624a..6fd0bb540 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -130,12 +130,12 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
         state = "State.PENDING"
         while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
+            time.sleep(2)
             response = self.job_client.get_batch(
                 request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),
-                # retry=self.retry (This retry polls way too many times per second)
             )
             state = str(response.state)
-            time.sleep(2)
+
         return response
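Taken together, patches 01-05 replace operation.result() with a GetBatch polling loop. For reviewers who want to exercise that pattern outside dbt, here is a minimal standalone sketch; it assumes the google-cloud-dataproc client library is installed, and the project, region, and gs:// URI values are placeholders rather than anything taken from the diffs above:

    import time
    import uuid

    from google.cloud import dataproc_v1

    # Placeholder values -- substitute a real project, region, and script URI.
    project, region = "my-project", "us-central1"
    parent = f"projects/{project}/locations/{region}"

    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    batch = dataproc_v1.Batch()
    batch.pyspark_batch.main_python_file_uri = "gs://my-bucket/model.py"  # placeholder

    batch_id = uuid.uuid4().hex
    client.create_batch(
        request=dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)
    )

    # Poll GetBatch until a terminal state; sleeping at the top of the loop
    # (patch 05) avoids one extra wait once the batch has already finished.
    state = "State.PENDING"
    while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
        time.sleep(2)
        response = client.get_batch(
            request=dataproc_v1.GetBatchRequest(name=f"{parent}/batches/{batch_id}")
        )
        state = str(response.state)
    print(state)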
From ee67d628a5b98a3128c6f6f89d1d1c688f5c0e2e Mon Sep 17 00:00:00 2001
From: Zi Wang
Date: Sat, 19 Aug 2023 15:09:13 -0700
Subject: [PATCH 06/13] removing empty spaces

---
 dbt/adapters/bigquery/python_submissions.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 7a0fcf386..317994489 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -138,8 +138,6 @@ def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
                 request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),
             )
             state = str(response.state)
-
-
         return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket

From 47d820cd7993a1ad6e9fc58bba866702720f29bd Mon Sep 17 00:00:00 2001
From: Colin
Date: Mon, 18 Sep 2023 21:06:14 -0700
Subject: [PATCH 07/13] update batch request to handle `GetBatchRequest`

---
 dbt/adapters/bigquery/python_submissions.py   | 40 ++++++++++++-------
 tests/functional/adapter/test_python_model.py |  2 +-
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 317994489..3feb66a60 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import Dict, Union

 from dbt.adapters.base import PythonJobHelper
@@ -9,8 +10,8 @@
 from google.api_core.client_options import ClientOptions
 from google.cloud import storage, dataproc_v1  # type: ignore
 from google.protobuf.json_format import ParseDict
+from google.cloud.dataproc_v1.types.batches import Batch
 import time
-import uuid

 OPERATION_RETRY_TIME = 10
@@ -120,24 +121,33 @@ def _get_job_client(self) -> dataproc_v1.BatchControllerClient:
+    def _get_batch_id(self) -> str:
+        return self.parsed_model["config"].get("batch_id")
+
-    def _submit_dataproc_job(self) -> dataproc_v1.types.jobs.Job:
+    def _submit_dataproc_job(self) -> Batch:
         batch = self._configure_batch()
         parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
-        batch_id = uuid.uuid4().hex
-
-        request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)
+        batch_id = self._get_batch_id()
+        request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)  # type: ignore
         # make the request
-        self.job_client.create_batch(request=request)
-        # this takes quite a while, waiting on GCP response to resolve
-        # (not a google-api-core issue, more likely a dataproc serverless issue)
-
-        state = "State.PENDING"
-        while state not in ["State.SUCCEEDED", "State.FAILED", "State.CANCELLED"]:
-            time.sleep(2)
-            response = self.job_client.get_batch(
-                request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),
-            )
-            state = str(response.state)
+        self.job_client.create_batch(request=request)  # type: ignore
+        # using the create_batch `.result()` method takes quite a while as it waits for all
+        # resources to tear down before returning, so we do the polling ourselves. This is a bit hacky but it works.
+        state = Batch.State.PENDING
+        response = None
+        run_time = 0
+        while state in [Batch.State.PENDING, Batch.State.RUNNING] and run_time < self.timeout:
+            time.sleep(1)
+            response = self.job_client.get_batch(  # type: ignore
+                request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),  # type: ignore
+            )
+            run_time = datetime.now().timestamp() - response.create_time.timestamp()  # type: ignore
+            state = response.state
+        if not response:
+            raise ValueError("No response from Dataproc")
+        if run_time >= self.timeout:
+            raise ValueError(
+                f"Operation did not complete within the designated timeout of {self.timeout} seconds."
+            )
+        if state != Batch.State.SUCCEEDED:
+            raise ValueError(response.state_message)
         return response
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket
diff --git a/tests/functional/adapter/test_python_model.py b/tests/functional/adapter/test_python_model.py
index 241082cdb..b389fe8aa 100644
--- a/tests/functional/adapter/test_python_model.py
+++ b/tests/functional/adapter/test_python_model.py
@@ -66,7 +66,7 @@ def model(dbt, spark):
 """

 models__python_array_batch_id_python = """
-import pandas
+import pandas as pd

 def model(dbt, spark):
     random_array = [

From 705ae6dca1dd460dc841975e3d5b25c6af4f75a9 Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 19 Sep 2023 16:26:05 -0700
Subject: [PATCH 08/13] conditionally run python model tests and factor out
 batch functions into their own module

---
 .github/workflows/integration.yml           | 21 +++++++
 dbt/adapters/bigquery/dataproc/__init__.py  |  0
 dbt/adapters/bigquery/dataproc/batch.py     | 63 +++++++++++++++++++
 dbt/adapters/bigquery/python_submissions.py | 67 +++++++--------------
 4 files changed, 105 insertions(+), 46 deletions(-)
 create mode 100644 dbt/adapters/bigquery/dataproc/__init__.py
 create mode 100644 dbt/adapters/bigquery/dataproc/batch.py

diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 99f78e33d..bb0211b35 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -64,6 +64,7 @@ jobs:
     outputs:
       matrix: ${{ steps.generate-matrix.outputs.result }}
+      run-python-tests: ${{ steps.filter.outputs.bigquery-python }}

     steps:
       - name: Check out the repository (non-PR)
@@ -96,6 +97,11 @@ jobs:
               - 'dbt/**'
               - 'tests/**'
               - 'dev-requirements.txt'
+            bigquery-python:
+              - 'dbt/adapters/bigquery/dataproc/**'
+              - 'dbt/adapters/bigquery/python_submissions.py'
+              - 'dbt/include/bigquery/python_model/**'
+
       - name: Generate integration test matrix
         id: generate-matrix
         uses: actions/github-script@v6
@@ -186,6 +192,21 @@ jobs:
           GCS_BUCKET: dbt-ci
         run: tox -- --ddtrace

+      # python model tests are slow so we only want to run them if we're changing them
+      - name: Run tox (python models)
+        if: needs.test-metadata.outputs.run-python-tests == 'true'
+        env:
+          BIGQUERY_TEST_SERVICE_ACCOUNT_JSON: ${{ secrets.BIGQUERY_TEST_SERVICE_ACCOUNT_JSON }}
+          BIGQUERY_TEST_ALT_DATABASE: ${{ secrets.BIGQUERY_TEST_ALT_DATABASE }}
+          BIGQUERY_TEST_NO_ACCESS_DATABASE: ${{ secrets.BIGQUERY_TEST_NO_ACCESS_DATABASE }}
+          DBT_TEST_USER_1: group:buildbot@dbtlabs.com
+          DBT_TEST_USER_2: group:engineering-core-team@dbtlabs.com
+          DBT_TEST_USER_3: serviceAccount:dbt-integration-test-user@dbt-test-env.iam.gserviceaccount.com
+          DATAPROC_REGION: us-central1
+          DATAPROC_CLUSTER_NAME: dbt-test-1
+          GCS_BUCKET: dbt-ci
+        run: tox -e python-tests -- --ddtrace
+
       - uses: actions/upload-artifact@v3
         if: always()
         with:
diff --git a/dbt/adapters/bigquery/dataproc/__init__.py b/dbt/adapters/bigquery/dataproc/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py
new file mode 100644
index 000000000..8a3a964a0
--- /dev/null
+++ b/dbt/adapters/bigquery/dataproc/batch.py
@@ -0,0 +1,63 @@
+from typing import Union, Dict
+
+import time
+from datetime import datetime
+from google.cloud.dataproc_v1 import (
+    CreateBatchRequest,
+    BatchControllerClient,
+    Batch,
+    GetBatchRequest,
+)
+from google.protobuf.json_format import ParseDict
+
+from dbt.adapters.bigquery.connections import DataprocBatchConfig
+
+_BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING]
+DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"
+
+
+def create_batch_request(batch: Batch, batch_id, project, region) -> CreateBatchRequest:
+    return CreateBatchRequest(
+        parent=f"projects/{project}/locations/{region}",  # type: ignore
+        batch_id=batch_id,
+        batch=batch,  # type: ignore
+    )
+
+
+def poll_batch_job(parent, batch_id, job_client: BatchControllerClient, timeout: int) -> Batch:
+    batch_name = "".join([parent, "/batches/", batch_id])
+    state = Batch.State.PENDING
+    response = None
+    run_time = 0
+    while state in _BATCH_RUNNING_STATES and run_time < timeout:
+        time.sleep(1)
+        response = job_client.get_batch(  # type: ignore
+            request=GetBatchRequest(name=batch_name),  # type: ignore
+        )
+        run_time = datetime.now().timestamp() - response.create_time.timestamp()  # type: ignore
+        state = response.state
+    if not response:
+        raise ValueError("No response from Dataproc")
+    if run_time >= timeout:
+        raise ValueError(
+            f"Operation did not complete within the designated timeout of {timeout} seconds."
+        )
+    if state != Batch.State.SUCCEEDED:
+        raise ValueError(response.state_message)
+    breakpoint()
+    return response
+
+
+def update_batch_from_config(config_dict: Union[Dict, DataprocBatchConfig], target: Batch):
+    try:
+        # updates in place
+        ParseDict(config_dict, target._pb)
+    except Exception as e:
+        docurl = (
+            "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1"
+            "#google.cloud.dataproc.v1.Batch"
+        )
+        raise ValueError(
+            f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
+        ) from e
+    return target
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 3feb66a60..0f49af60c 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -1,17 +1,20 @@
-from datetime import datetime
 from typing import Dict, Union

 from dbt.adapters.base import PythonJobHelper
 from google.api_core.future.polling import POLLING_PREDICATE

 from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials
-from dbt.adapters.bigquery.connections import DataprocBatchConfig
 from google.api_core import retry
 from google.api_core.client_options import ClientOptions
 from google.cloud import storage, dataproc_v1  # type: ignore
-from google.protobuf.json_format import ParseDict
 from google.cloud.dataproc_v1.types.batches import Batch
-import time
+
+from dbt.adapters.bigquery.datproc.batch import (
+    create_batch_request,
+    poll_batch_job,
+    DEFAULT_JAR_FILE_URI,
+    update_batch_from_config,
+)

 OPERATION_RETRY_TIME = 10
@@ -122,33 +125,21 @@ def _get_batch_id(self) -> str:
         return self.parsed_model["config"].get("batch_id")

     def _submit_dataproc_job(self) -> Batch:
-        batch = self._configure_batch()
-        parent = f"projects/{self.credential.execution_project}/locations/{self.credential.dataproc_region}"
         batch_id = self._get_batch_id()
-        request = dataproc_v1.CreateBatchRequest(parent=parent, batch=batch, batch_id=batch_id)  # type: ignore
+        request = create_batch_request(
+            batch=self._configure_batch(),
+            batch_id=batch_id,
+            region=self.credential.dataproc_region,
+            project=self.credential.execution_project,
+        )  # type: ignore
         # make the request
         self.job_client.create_batch(request=request)  # type: ignore
-        # using the create_batch `.result()` method takes quite a while as it waits for all
-        # resources to tear down before returning, so we do the polling ourselves. This is a bit hacky but it works.
-        state = Batch.State.PENDING
-        response = None
-        run_time = 0
-        while state in [Batch.State.PENDING, Batch.State.RUNNING] and run_time < self.timeout:
-            time.sleep(1)
-            response = self.job_client.get_batch(  # type: ignore
-                request=dataproc_v1.GetBatchRequest(name="".join([parent, "/batches/", batch_id])),  # type: ignore
-            )
-            run_time = datetime.now().timestamp() - response.create_time.timestamp()  # type: ignore
-            state = response.state
-        if not response:
-            raise ValueError("No response from Dataproc")
-        if run_time >= self.timeout:
-            raise ValueError(
-                f"Operation did not complete within the designated timeout of {self.timeout} seconds."
-            )
-        if state != Batch.State.SUCCEEDED:
-            raise ValueError(response.state_message)
-        return response
+        return poll_batch_job(
+            parent=request.parent,
+            batch_id=batch_id,
+            job_client=self.job_client,
+            timeout=self.timeout,
+        )
         # there might be useful results here that we can parse and return
         # Dataproc job output is saved to the Cloud Storage bucket
         # allocated to the job. Use regex to obtain the bucket and blob info.
@@ -179,27 +170,11 @@ def _configure_batch(self):
         batch.pyspark_batch.main_python_file_uri = self.gcs_location
         jar_file_uri = self.parsed_model["config"].get(
             "jar_file_uri",
-            "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar",
+            DEFAULT_JAR_FILE_URI,
         )
         batch.pyspark_batch.jar_file_uris = [jar_file_uri]
         # Apply configuration from dataproc_batch key, possibly overriding defaults.
         if self.credential.dataproc_batch:
-            self._update_batch_from_config(self.credential.dataproc_batch, batch)
+            batch = update_batch_from_config(self.credential.dataproc_batch, batch)

         return batch
-
-    @classmethod
-    def _update_batch_from_config(
-        cls, config_dict: Union[Dict, DataprocBatchConfig], target: dataproc_v1.Batch
-    ):
-        try:
-            # updates in place
-            ParseDict(config_dict, target._pb)
-        except Exception as e:
-            docurl = (
-                "https://cloud.google.com/dataproc-serverless/docs/reference/rpc/google.cloud.dataproc.v1"
-                "#google.cloud.dataproc.v1.Batch"
-            )
-            raise ValueError(
-                f"Unable to parse dataproc_batch as valid batch specification. See {docurl}. {str(e)}"
-            ) from e
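After patch 08, the submit and poll pieces live in dbt/adapters/bigquery/dataproc/batch.py (note the `datproc` typo in this patch's import of that module, fixed in patch 10). A hedged sketch of how those helpers compose on their own; the client construction, batch id, timeout, and gs:// URI are illustrative assumptions, and as of this patch poll_batch_job still carries the stray breakpoint() call that patch 13 removes:

    from google.cloud import dataproc_v1
    from dbt.adapters.bigquery.dataproc.batch import create_batch_request, poll_batch_job

    # Illustrative values only.
    project, region, batch_id, timeout = "my-project", "us-central1", "my-batch-id", 1800

    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    batch = dataproc_v1.Batch()
    batch.pyspark_batch.main_python_file_uri = "gs://my-bucket/model.py"  # illustrative

    request = create_batch_request(batch=batch, batch_id=batch_id, project=project, region=region)
    client.create_batch(request=request)

    # Raises ValueError on timeout or on a FAILED/CANCELLED terminal state.
    result = poll_batch_job(parent=request.parent, batch_id=batch_id, job_client=client, timeout=timeout)
    print(result.state)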
From 0cf11ab9ae52aba5845aa7a7207ef520222f2137 Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 19 Sep 2023 16:44:10 -0700
Subject: [PATCH 09/13] Move events to common

---
 dbt/adapters/bigquery/connections.py | 6 +++---
 dbt/adapters/bigquery/dataset.py     | 2 +-
 dbt/adapters/bigquery/gcloud.py      | 2 +-
 dbt/adapters/bigquery/impl.py        | 6 +++---
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index c136042c3..a19b4e2e6 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -35,9 +35,9 @@
     DbtProfileError,
 )
 from dbt.adapters.base import BaseConnectionManager, Credentials
-from dbt.events import AdapterLogger
-from dbt.events.functions import fire_event
-from dbt.events.types import SQLQuery
+from dbt.common.events import AdapterLogger
+from dbt.common.events.functions import fire_event
+from dbt.common.events.types import SQLQuery
 from dbt.version import __version__ as dbt_version
 from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum

diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py
index c886637d7..5ac2ede94 100644
--- a/dbt/adapters/bigquery/dataset.py
+++ b/dbt/adapters/bigquery/dataset.py
@@ -1,7 +1,7 @@
 from typing import List
 from google.cloud.bigquery import Dataset, AccessEntry
-from dbt.events import AdapterLogger
+from dbt.common.events import AdapterLogger

 logger = AdapterLogger("BigQuery")

diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py
index c303097bc..2dbf5e57d 100644
--- a/dbt/adapters/bigquery/gcloud.py
+++ b/dbt/adapters/bigquery/gcloud.py
@@ -1,4 +1,4 @@
-from dbt.events import AdapterLogger
+from dbt.common.events import AdapterLogger
 import dbt.exceptions
 from dbt.clients.system import run_cmd

diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index bb04c78b8..884755226 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -35,11 +35,11 @@
 )
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
 from dbt.contracts.graph.manifest import Manifest
-from dbt.events import (
+from dbt.common.events import (
     AdapterLogger,
 )
-from dbt.events.functions import fire_event
-from dbt.events.types import SchemaCreation, SchemaDrop
+from dbt.common.events.functions import fire_event
+from dbt.common.events.types import SchemaCreation, SchemaDrop
 from dbt.utils import filter_null_values

 import google.auth

From 3caa0e0aba2985bb456c29dd544efcca82cc0e79 Mon Sep 17 00:00:00 2001
From: Colin
Date: Wed, 20 Sep 2023 10:21:07 -0700
Subject: [PATCH 10/13] fix import

---
 dbt/adapters/bigquery/python_submissions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 0f49af60c..69d407187 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -9,7 +9,7 @@
 from google.cloud import storage, dataproc_v1  # type: ignore
 from google.cloud.dataproc_v1.types.batches import Batch

-from dbt.adapters.bigquery.datproc.batch import (
+from dbt.adapters.bigquery.dataproc.batch import (
     create_batch_request,
     poll_batch_job,
     DEFAULT_JAR_FILE_URI,

From 3fbd35bd06e33c8ca98300a022f69aed35842f64 Mon Sep 17 00:00:00 2001
From: Colin
Date: Wed, 20 Sep 2023 10:26:24 -0700
Subject: [PATCH 11/13] fix mistaken import change

---
 dbt/adapters/bigquery/connections.py | 6 +++---
 dbt/adapters/bigquery/dataset.py     | 2 +-
 dbt/adapters/bigquery/gcloud.py      | 2 +-
 dbt/adapters/bigquery/impl.py        | 6 +++---
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index a19b4e2e6..c136042c3 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -35,9 +35,9 @@
     DbtProfileError,
 )
 from dbt.adapters.base import BaseConnectionManager, Credentials
-from dbt.common.events import AdapterLogger
-from dbt.common.events.functions import fire_event
-from dbt.common.events.types import SQLQuery
+from dbt.events import AdapterLogger
+from dbt.events.functions import fire_event
+from dbt.events.types import SQLQuery
 from dbt.version import __version__ as dbt_version
 from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum

diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py
index 5ac2ede94..c886637d7 100644
--- a/dbt/adapters/bigquery/dataset.py
+++ b/dbt/adapters/bigquery/dataset.py
@@ -1,7 +1,7 @@
 from typing import List
 from google.cloud.bigquery import Dataset, AccessEntry
-from dbt.common.events import AdapterLogger
+from dbt.events import AdapterLogger

 logger = AdapterLogger("BigQuery")

diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py
index 2dbf5e57d..c303097bc 100644
--- a/dbt/adapters/bigquery/gcloud.py
+++ b/dbt/adapters/bigquery/gcloud.py
@@ -1,4 +1,4 @@
-from dbt.common.events import AdapterLogger
+from dbt.events import AdapterLogger
 import dbt.exceptions
 from dbt.clients.system import run_cmd

diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py
index 884755226..bb04c78b8 100644
--- a/dbt/adapters/bigquery/impl.py
+++ b/dbt/adapters/bigquery/impl.py
@@ -35,11 +35,11 @@
 )
 from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
 from dbt.contracts.graph.manifest import Manifest
-from dbt.common.events import (
+from dbt.events import (
     AdapterLogger,
 )
-from dbt.common.events.functions import fire_event
-from dbt.common.events.types import SchemaCreation, SchemaDrop
+from dbt.events.functions import fire_event
+from dbt.events.types import SchemaCreation, SchemaDrop
 from dbt.utils import filter_null_values

 import google.auth

From 0d059b67117fb9ea9721326cfd3839620169dc36 Mon Sep 17 00:00:00 2001
From: Colin
Date: Wed, 20 Sep 2023 12:07:47 -0700
Subject: [PATCH 12/13] update unit test

---
 tests/unit/test_configure_dataproc_batch.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py
index 58ff52bab..94cb28efb 100644
--- a/tests/unit/test_configure_dataproc_batch.py
+++ b/tests/unit/test_configure_dataproc_batch.py
@@ -1,6 +1,6 @@
 from unittest.mock import patch

-from dbt.adapters.bigquery.python_submissions import ServerlessDataProcHelper
+from dbt.adapters.bigquery.dataproc.batch import update_batch_from_config
 from google.cloud import dataproc_v1

 from .test_bigquery_adapter import BaseTestBigQueryAdapter
@@ -39,7 +39,7 @@ def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults):

         batch = dataproc_v1.Batch()

-        ServerlessDataProcHelper._update_batch_from_config(raw_batch_config, batch)
+        batch = update_batch_from_config(raw_batch_config, batch)

         def to_str_values(d):
             """google's protobuf types expose maps as dict[str, str]"""

From 11facf638d572da932851e1b93a9ea1ff590c792 Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 26 Sep 2023 14:52:22 -0700
Subject: [PATCH 13/13] clean up and typing

---
 dbt/adapters/bigquery/dataproc/batch.py     | 22 ++++++++++++---------
 dbt/adapters/bigquery/python_submissions.py |  6 +++---
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py
index 8a3a964a0..0dc54aa78 100644
--- a/dbt/adapters/bigquery/dataproc/batch.py
+++ b/dbt/adapters/bigquery/dataproc/batch.py
@@ -16,15 +16,19 @@
 DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.21.1.jar"


-def create_batch_request(batch: Batch, batch_id, project, region) -> CreateBatchRequest:
+def create_batch_request(
+    batch: Batch, batch_id: str, project: str, region: str
+) -> CreateBatchRequest:
     return CreateBatchRequest(
         parent=f"projects/{project}/locations/{region}",  # type: ignore
-        batch_id=batch_id,
+        batch_id=batch_id,  # type: ignore
         batch=batch,  # type: ignore
     )


-def poll_batch_job(parent, batch_id, job_client: BatchControllerClient, timeout: int) -> Batch:
+def poll_batch_job(
+    parent: str, batch_id: str, job_client: BatchControllerClient, timeout: int
+) -> Batch:
     batch_name = "".join([parent, "/batches/", batch_id])
     state = Batch.State.PENDING
     response = None
@@ -38,13 +42,13 @@ def poll_batch_job(parent, batch_id, job_client: BatchControllerClient, timeout:
         state = response.state
     if not response:
         raise ValueError("No response from Dataproc")
-    if run_time >= timeout:
-        raise ValueError(
-            f"Operation did not complete within the designated timeout of {timeout} seconds."
-        )
     if state != Batch.State.SUCCEEDED:
-        raise ValueError(response.state_message)
-    breakpoint()
+        if run_time >= timeout:
+            raise ValueError(
+                f"Operation did not complete within the designated timeout of {timeout} seconds."
+            )
+        else:
+            raise ValueError(response.state_message)
     return response
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py
index 69d407187..8fd354eb5 100644
--- a/dbt/adapters/bigquery/python_submissions.py
+++ b/dbt/adapters/bigquery/python_submissions.py
@@ -129,15 +129,15 @@ def _submit_dataproc_job(self) -> Batch:
         request = create_batch_request(
             batch=self._configure_batch(),
             batch_id=batch_id,
-            region=self.credential.dataproc_region,
-            project=self.credential.execution_project,
+            region=self.credential.dataproc_region,  # type: ignore
+            project=self.credential.execution_project,  # type: ignore
         )  # type: ignore
         # make the request
         self.job_client.create_batch(request=request)  # type: ignore
         return poll_batch_job(
             parent=request.parent,
             batch_id=batch_id,
-            job_client=self.job_client,
+            job_client=self.job_client,  # type: ignore
             timeout=self.timeout,
         )
         # there might be useful results here that we can parse and return
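Patch 12 points the unit test at update_batch_from_config directly, which makes the ParseDict override path easy to poke at in isolation. A small hedged sketch to close; the dataproc_batch keys below are illustrative of what a profiles.yml block might contain, not copied from the test file:

    from google.cloud import dataproc_v1
    from dbt.adapters.bigquery.dataproc.batch import update_batch_from_config

    # Illustrative dataproc_batch configuration.
    raw_batch_config = {
        "runtime_config": {"properties": {"spark.executor.instances": "4"}},
        "labels": {"role": "dev"},
        "environment_config": {
            "execution_config": {"service_account": "dbt@my-project.iam.gserviceaccount.com"}
        },
    }

    batch = dataproc_v1.Batch()
    # ParseDict mutates batch._pb in place; the same Batch object comes back.
    batch = update_batch_from_config(raw_batch_config, batch)

    assert batch.labels["role"] == "dev"
    assert batch.runtime_config.properties["spark.executor.instances"] == "4"

A malformed key raises ValueError with a pointer to the google.cloud.dataproc.v1.Batch reference, per the except branch in batch.py.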