From 79fbd9062c2d927fce2b105902466453e1d8b55f Mon Sep 17 00:00:00 2001 From: jaejun <63435794+jx2lee@users.noreply.github.com> Date: Fri, 25 Oct 2024 05:50:37 +0900 Subject: [PATCH 1/5] add test case when raise ServiceUnavailable in is_retryable (#1224) * raise ServiceUnavailable and test that it is retry-able --------- Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> --- .changes/unreleased/Features-20240505-011838.yaml | 6 ++++++ tests/unit/test_bigquery_connection_manager.py | 2 ++ 2 files changed, 8 insertions(+) create mode 100644 .changes/unreleased/Features-20240505-011838.yaml diff --git a/.changes/unreleased/Features-20240505-011838.yaml b/.changes/unreleased/Features-20240505-011838.yaml new file mode 100644 index 000000000..66411853f --- /dev/null +++ b/.changes/unreleased/Features-20240505-011838.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add is_retryable test case when raise ServiceUnavailable +time: 2024-05-05T01:18:38.737882+09:00 +custom: + Author: jx2lee + Issue: "682" diff --git a/tests/unit/test_bigquery_connection_manager.py b/tests/unit/test_bigquery_connection_manager.py index d09cb1635..1c14100f6 100644 --- a/tests/unit/test_bigquery_connection_manager.py +++ b/tests/unit/test_bigquery_connection_manager.py @@ -84,12 +84,14 @@ def test_is_retryable(self): rate_limit_error = exceptions.Forbidden( "code broke", errors=[{"reason": "rateLimitExceeded"}] ) + service_unavailable_error = exceptions.ServiceUnavailable("service is unavailable") self.assertTrue(_is_retryable(internal_server_error)) self.assertTrue(_is_retryable(bad_request_error)) self.assertTrue(_is_retryable(connection_error)) self.assertFalse(_is_retryable(client_error)) self.assertTrue(_is_retryable(rate_limit_error)) + self.assertTrue(_is_retryable(service_unavailable_error)) def test_drop_dataset(self): mock_table = Mock() From a09a8faefe247a4793918a0b857dc27f78cf8602 Mon Sep 17 00:00:00 2001 From: Colin Rogers <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:41:02 -0700 Subject: [PATCH 2/5] use "direct" write for non-partitioned python model materializations (#1388) * use dynamic schema in test_grant_access_to.py * use dynamic schema in test_grant_access_to.py * revert setup * use "direct" write for non-partitioned python model materializations * add changie log * add code comment * make code comment inline * make code comment inline * remove code comment * use set write_method instead of inline conditional * use set write_method instead of inline conditional --- .changes/unreleased/Fixes-20241028-172719.yaml | 6 ++++++ .../bigquery/macros/materializations/table.sql | 11 ++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 .changes/unreleased/Fixes-20241028-172719.yaml diff --git a/.changes/unreleased/Fixes-20241028-172719.yaml b/.changes/unreleased/Fixes-20241028-172719.yaml new file mode 100644 index 000000000..87ee2c25d --- /dev/null +++ b/.changes/unreleased/Fixes-20241028-172719.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: use "direct" write for non-partitioned python model materializations +time: 2024-10-28T17:27:19.306348-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1318" diff --git a/dbt/include/bigquery/macros/materializations/table.sql b/dbt/include/bigquery/macros/materializations/table.sql index e3c5b3598..41bb69770 100644 --- a/dbt/include/bigquery/macros/materializations/table.sql +++ b/dbt/include/bigquery/macros/materializations/table.sql @@ -113,10 +113,19 @@ else: msg = f"{type(df)} is not a 
supported type for dbt Python materialization" raise Exception(msg) +# For writeMethod we need to use "indirect" if materializing a partitioned table +# otherwise we can use "direct". Note that indirect will fail if the GCS bucket has a retention policy set on it. +{%- if partition_config %} + {%- set write_method = 'indirect' -%} +{%- else %} + {% set write_method = 'direct' -%} +{%- endif %} + df.write \ .mode("overwrite") \ .format("bigquery") \ - .option("writeMethod", "indirect").option("writeDisposition", 'WRITE_TRUNCATE') \ + .option("writeMethod", "{{ write_method }}") \ + .option("writeDisposition", 'WRITE_TRUNCATE') \ {%- if partition_config is not none %} {%- if partition_config.data_type | lower in ('date','timestamp','datetime') %} .option("partitionField", "{{- partition_config.field -}}") \ From 4e3f86e02a98e7c0ae9b17c7e14ee38de08314e3 Mon Sep 17 00:00:00 2001 From: Jack Date: Fri, 1 Nov 2024 20:05:23 +0000 Subject: [PATCH 3/5] Feature/add quota project option (#1345) * add ability to specify quota project in profile --------- Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com> --- .../unreleased/Features-20240911-234859.yaml | 6 +++++ dbt/adapters/bigquery/connections.py | 6 ++++- tests/functional/test_quota_project.py | 27 +++++++++++++++++++ tests/unit/test_bigquery_adapter.py | 5 +++- 4 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 .changes/unreleased/Features-20240911-234859.yaml create mode 100644 tests/functional/test_quota_project.py diff --git a/.changes/unreleased/Features-20240911-234859.yaml b/.changes/unreleased/Features-20240911-234859.yaml new file mode 100644 index 000000000..5351c3315 --- /dev/null +++ b/.changes/unreleased/Features-20240911-234859.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Adds the ability to set optional `quota_project` in profile +time: 2024-09-11T23:48:59.767649+01:00 +custom: + Author: jcarpenter12 + Issue: 1343 1344 diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index d3eee3ef3..58b3dbe41 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -17,7 +17,7 @@ import google.auth.exceptions import google.cloud.bigquery import google.cloud.exceptions -from google.api_core import retry, client_info +from google.api_core import retry, client_info, client_options from google.auth import impersonated_credentials from google.oauth2 import ( credentials as GoogleCredentials, @@ -125,6 +125,7 @@ class BigQueryCredentials(Credentials): database: Optional[str] = None schema: Optional[str] = None execution_project: Optional[str] = None + quota_project: Optional[str] = None location: Optional[str] = None priority: Optional[Priority] = None maximum_bytes_billed: Optional[int] = None @@ -408,14 +409,17 @@ def get_credentials(cls, profile_credentials): def get_bigquery_client(cls, profile_credentials): creds = cls.get_credentials(profile_credentials) execution_project = profile_credentials.execution_project + quota_project = profile_credentials.quota_project location = getattr(profile_credentials, "location", None) info = client_info.ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}") + options = client_options.ClientOptions(quota_project_id=quota_project) return google.cloud.bigquery.Client( execution_project, creds, location=location, client_info=info, + client_options=options, ) @classmethod diff --git a/tests/functional/test_quota_project.py b/tests/functional/test_quota_project.py new file mode 100644 index 
000000000..0b4bb90c4 --- /dev/null +++ b/tests/functional/test_quota_project.py @@ -0,0 +1,27 @@ +import os + +import pytest + +from dbt.tests.util import run_dbt + +_QUOTA_PROJECT = os.getenv("BIGQUERY_TEST_ALT_DATABASE") + + +class TestNoQuotaProject: + def test_no_quota_project(self, project): + results = run_dbt() + for result in results: + assert None == result.adapter_response["quota_project"] + + +class TestQuotaProjectOption: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + outputs = {"default": dbt_profile_target} + outputs["default"]["quota_project"] = _QUOTA_PROJECT + yield + + def test_quota_project_option(self, project): + results = run_dbt() + for result in results: + assert _QUOTA_PROJECT == result.adapter_response["quota_project"] diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index a922525fd..da499d266 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -386,15 +386,17 @@ def test_cancel_open_connections_single(self): adapter.connections.thread_connections.update({key: master, 1: model}) self.assertEqual(len(list(adapter.cancel_open_connections())), 1) + @patch("dbt.adapters.bigquery.impl.google.api_core.client_options.ClientOptions") @patch("dbt.adapters.bigquery.impl.google.auth.default") @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_location_user_agent(self, mock_bq, mock_auth_default): + def test_location_user_agent(self, mock_bq, mock_auth_default, MockClientOptions): creds = MagicMock() mock_auth_default.return_value = (creds, MagicMock()) adapter = self.get_adapter("loc") connection = adapter.acquire_connection("dummy") mock_client = mock_bq.Client + mock_client_options = MockClientOptions.return_value mock_client.assert_not_called() connection.handle @@ -403,6 +405,7 @@ def test_location_user_agent(self, mock_bq, mock_auth_default): creds, location="Luna Station", client_info=HasUserAgent(), + client_options=mock_client_options, ) From 23e8020801526787a72407d9dea3d42016e49ad0 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Mon, 4 Nov 2024 19:25:40 -0500 Subject: [PATCH 4/5] update macos runners from macos-12 to macos-14 (#1394) --- .github/scripts/integration-test-matrix.js | 4 ++-- .github/workflows/main.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/scripts/integration-test-matrix.js b/.github/scripts/integration-test-matrix.js index 49db45575..bebe08569 100644 --- a/.github/scripts/integration-test-matrix.js +++ b/.github/scripts/integration-test-matrix.js @@ -44,7 +44,7 @@ module.exports = ({ context }) => { if (labels.includes("test macos") || testAllLabel) { include.push({ - os: "macos-12", + os: "macos-14", adapter, "python-version": pythonVersion, }); @@ -78,7 +78,7 @@ module.exports = ({ context }) => { // additionally include runs for all adapters, on macos and windows, // but only for the default python version for (const adapter of supportedAdapters) { - for (const operatingSystem of ["windows-latest", "macos-12"]) { + for (const operatingSystem of ["windows-latest", "macos-14"]) { include.push({ os: operatingSystem, adapter: adapter, diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6bfed5df6..7b82f3e0f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -174,7 +174,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-12, windows-latest] + os: 
[ubuntu-latest, macos-14, windows-latest] python-version: ['3.9', '3.10', '3.11', '3.12'] dist-type: ["whl", "gz"] From 35c32f181b97f69d2b20a374c7a0dca765455c81 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:03:44 -0500 Subject: [PATCH 5/5] Break out credentials as a separate module (#1391) * break out credentials into its own module * fix imports * update unit test mocks for new location of get_bigquery_defaults --- .../Under the Hood-20241104-173815.yaml | 7 + dbt/adapters/bigquery/__init__.py | 10 +- dbt/adapters/bigquery/column.py | 5 +- dbt/adapters/bigquery/connections.py | 197 ++---------------- dbt/adapters/bigquery/credentials.py | 187 +++++++++++++++++ dbt/adapters/bigquery/dataproc/batch.py | 13 +- dbt/adapters/bigquery/dataset.py | 4 +- dbt/adapters/bigquery/gcloud.py | 29 --- dbt/adapters/bigquery/impl.py | 51 +++-- dbt/adapters/bigquery/python_submissions.py | 14 +- dbt/adapters/bigquery/relation.py | 9 +- tests/unit/test_bigquery_adapter.py | 4 +- tests/unit/test_configure_dataproc_batch.py | 4 +- 13 files changed, 275 insertions(+), 259 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20241104-173815.yaml create mode 100644 dbt/adapters/bigquery/credentials.py delete mode 100644 dbt/adapters/bigquery/gcloud.py diff --git a/.changes/unreleased/Under the Hood-20241104-173815.yaml b/.changes/unreleased/Under the Hood-20241104-173815.yaml new file mode 100644 index 000000000..e3e81dec1 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241104-173815.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Separate credentials functionality into its own module for reuse in retry and + python submissions +time: 2024-11-04T17:38:15.940962-05:00 +custom: + Author: mikealfare + Issue: "1391" diff --git a/dbt/adapters/bigquery/__init__.py b/dbt/adapters/bigquery/__init__.py index 5fe68e786..74fa17cda 100644 --- a/dbt/adapters/bigquery/__init__.py +++ b/dbt/adapters/bigquery/__init__.py @@ -1,8 +1,8 @@ -from dbt.adapters.bigquery.connections import BigQueryConnectionManager # noqa -from dbt.adapters.bigquery.connections import BigQueryCredentials -from dbt.adapters.bigquery.relation import BigQueryRelation # noqa -from dbt.adapters.bigquery.column import BigQueryColumn # noqa -from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig # noqa +from dbt.adapters.bigquery.column import BigQueryColumn +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials +from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig +from dbt.adapters.bigquery.relation import BigQueryRelation from dbt.adapters.base import AdapterPlugin from dbt.include import bigquery diff --git a/dbt/adapters/bigquery/column.py b/dbt/adapters/bigquery/column.py index 4a12e211f..a676fef4b 100644 --- a/dbt/adapters/bigquery/column.py +++ b/dbt/adapters/bigquery/column.py @@ -1,9 +1,10 @@ from dataclasses import dataclass -from typing import Optional, List, TypeVar, Iterable, Type, Any, Dict, Union +from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union + +from google.cloud.bigquery import SchemaField from dbt.adapters.base.column import Column -from google.cloud.bigquery import SchemaField _PARENT_DATA_TYPE_KEY = "__parent_data_type" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 58b3dbe41..bda54080b 100644 --- 
a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -1,58 +1,54 @@ from collections import defaultdict from concurrent.futures import TimeoutError +from contextlib import contextmanager +from dataclasses import dataclass import json +from multiprocessing.context import SpawnContext import re -from contextlib import contextmanager -from dataclasses import dataclass, field +from typing import Dict, Hashable, List, Optional, Tuple, TYPE_CHECKING import uuid -from mashumaro.helper import pass_through - -from functools import lru_cache -from requests.exceptions import ConnectionError - -from multiprocessing.context import SpawnContext -from typing import Optional, Any, Dict, Tuple, Hashable, List, TYPE_CHECKING +from google.api_core import client_info, client_options, retry import google.auth +from google.auth import impersonated_credentials import google.auth.exceptions import google.cloud.bigquery import google.cloud.exceptions -from google.api_core import retry, client_info, client_options -from google.auth import impersonated_credentials from google.oauth2 import ( credentials as GoogleCredentials, service_account as GoogleServiceAccountCredentials, ) +from requests.exceptions import ConnectionError from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event -from dbt_common.exceptions import ( - DbtRuntimeError, - DbtConfigError, - DbtDatabaseError, -) +from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError from dbt_common.invocation import get_invocation_id -from dbt.adapters.bigquery import gcloud +from dbt.adapters.base import BaseConnectionManager from dbt.adapters.contracts.connection import ( - ConnectionState, - AdapterResponse, - Credentials, AdapterRequiredConfig, + AdapterResponse, + ConnectionState, ) -from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt.adapters.base import BaseConnectionManager from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import SQLQuery -from dbt.adapters.bigquery import __version__ as dbt_version -from dbt.adapters.bigquery.utility import is_base64, base64_to_string +from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +import dbt.adapters.bigquery.__version__ as dbt_version +from dbt.adapters.bigquery.credentials import ( + BigQueryConnectionMethod, + Priority, + get_bigquery_defaults, + setup_default_credentials, +) +from dbt.adapters.bigquery.utility import is_base64, base64_to_string if TYPE_CHECKING: # Indirectly imported via agate_helper, which is lazy loaded further downfile. # Used by mypy for earlier type hints. 
import agate + logger = AdapterLogger("BigQuery") BQ_QUERY_JOB_SPLIT = "-----Query Job SQL Follows-----" @@ -73,33 +69,6 @@ ) -@lru_cache() -def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: - """ - Returns (credentials, project_id) - - project_id is returned available from the environment; otherwise None - """ - # Cached, because the underlying implementation shells out, taking ~1s - try: - credentials, _ = google.auth.default(scopes=scopes) - return credentials, _ - except google.auth.exceptions.DefaultCredentialsError as e: - raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") - - -class Priority(StrEnum): - Interactive = "interactive" - Batch = "batch" - - -class BigQueryConnectionMethod(StrEnum): - OAUTH = "oauth" - SERVICE_ACCOUNT = "service-account" - SERVICE_ACCOUNT_JSON = "service-account-json" - OAUTH_SECRETS = "oauth-secrets" - - @dataclass class BigQueryAdapterResponse(AdapterResponse): bytes_processed: Optional[int] = None @@ -110,128 +79,6 @@ class BigQueryAdapterResponse(AdapterResponse): slot_ms: Optional[int] = None -@dataclass -class DataprocBatchConfig(ExtensibleDbtClassMixin): - def __init__(self, batch_config): - self.batch_config = batch_config - - -@dataclass -class BigQueryCredentials(Credentials): - method: BigQueryConnectionMethod = None # type: ignore - - # BigQuery allows an empty database / project, where it defers to the - # environment for the project - database: Optional[str] = None - schema: Optional[str] = None - execution_project: Optional[str] = None - quota_project: Optional[str] = None - location: Optional[str] = None - priority: Optional[Priority] = None - maximum_bytes_billed: Optional[int] = None - impersonate_service_account: Optional[str] = None - - job_retry_deadline_seconds: Optional[int] = None - job_retries: Optional[int] = 1 - job_creation_timeout_seconds: Optional[int] = None - job_execution_timeout_seconds: Optional[int] = None - - # Keyfile json creds (unicode or base 64 encoded) - keyfile: Optional[str] = None - keyfile_json: Optional[Dict[str, Any]] = None - - # oauth-secrets - token: Optional[str] = None - refresh_token: Optional[str] = None - client_id: Optional[str] = None - client_secret: Optional[str] = None - token_uri: Optional[str] = None - - dataproc_region: Optional[str] = None - dataproc_cluster_name: Optional[str] = None - gcs_bucket: Optional[str] = None - - dataproc_batch: Optional[DataprocBatchConfig] = field( - metadata={ - "serialization_strategy": pass_through, - }, - default=None, - ) - - scopes: Optional[Tuple[str, ...]] = ( - "https://www.googleapis.com/auth/bigquery", - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/drive", - ) - - _ALIASES = { - # 'legacy_name': 'current_name' - "project": "database", - "dataset": "schema", - "target_project": "target_database", - "target_dataset": "target_schema", - "retries": "job_retries", - "timeout_seconds": "job_execution_timeout_seconds", - } - - def __post_init__(self): - if self.keyfile_json and "private_key" in self.keyfile_json: - self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( - "\\n", "\n" - ) - if not self.method: - raise DbtRuntimeError("Must specify authentication method") - - if not self.schema: - raise DbtRuntimeError("Must specify schema") - - @property - def type(self): - return "bigquery" - - @property - def unique_field(self): - return self.database - - def _connection_keys(self): - return ( - "method", - "database", - "execution_project", - 
"schema", - "location", - "priority", - "maximum_bytes_billed", - "impersonate_service_account", - "job_retry_deadline_seconds", - "job_retries", - "job_creation_timeout_seconds", - "job_execution_timeout_seconds", - "timeout_seconds", - "client_id", - "token_uri", - "dataproc_region", - "dataproc_cluster_name", - "gcs_bucket", - "dataproc_batch", - ) - - @classmethod - def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: - # We need to inject the correct value of the database (aka project) at - # this stage, ref - # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. - - # `database` is an alias of `project` in BigQuery - if "database" not in d: - _, database = get_bigquery_defaults() - d["database"] = database - # `execution_project` default to dataset/project - if "execution_project" not in d: - d["execution_project"] = d["database"] - return d - - class BigQueryConnectionManager(BaseConnectionManager): TYPE = "bigquery" @@ -433,7 +280,7 @@ def open(cls, connection): except google.auth.exceptions.DefaultCredentialsError: logger.info("Please log into GCP to continue") - gcloud.setup_default_credentials() + setup_default_credentials() handle = cls.get_bigquery_client(connection.credentials) diff --git a/dbt/adapters/bigquery/credentials.py b/dbt/adapters/bigquery/credentials.py new file mode 100644 index 000000000..32f172dac --- /dev/null +++ b/dbt/adapters/bigquery/credentials.py @@ -0,0 +1,187 @@ +from dataclasses import dataclass, field +from functools import lru_cache +from typing import Any, Dict, Optional, Tuple + +import google.auth +from google.auth.exceptions import DefaultCredentialsError +from mashumaro import pass_through + +from dbt_common.clients.system import run_cmd +from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +from dbt_common.exceptions import DbtConfigError, DbtRuntimeError +from dbt.adapters.contracts.connection import Credentials +from dbt.adapters.events.logging import AdapterLogger + + +_logger = AdapterLogger("BigQuery") + + +class Priority(StrEnum): + Interactive = "interactive" + Batch = "batch" + + +class BigQueryConnectionMethod(StrEnum): + OAUTH = "oauth" + SERVICE_ACCOUNT = "service-account" + SERVICE_ACCOUNT_JSON = "service-account-json" + OAUTH_SECRETS = "oauth-secrets" + + +@dataclass +class DataprocBatchConfig(ExtensibleDbtClassMixin): + def __init__(self, batch_config): + self.batch_config = batch_config + + +@lru_cache() +def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: + """ + Returns (credentials, project_id) + + project_id is returned available from the environment; otherwise None + """ + # Cached, because the underlying implementation shells out, taking ~1s + try: + credentials, _ = google.auth.default(scopes=scopes) + return credentials, _ + except DefaultCredentialsError as e: + raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") + + +def setup_default_credentials(): + if _gcloud_installed(): + run_cmd(".", ["gcloud", "auth", "application-default", "login"]) + else: + msg = """ + dbt requires the gcloud SDK to be installed to authenticate with BigQuery. + Please download and install the SDK, or use a Service Account instead. 
+ + https://cloud.google.com/sdk/ + """ + raise DbtRuntimeError(msg) + + +def _gcloud_installed(): + try: + run_cmd(".", ["gcloud", "--version"]) + return True + except OSError as e: + _logger.debug(e) + return False + + +@dataclass +class BigQueryCredentials(Credentials): + method: BigQueryConnectionMethod = None # type: ignore + + # BigQuery allows an empty database / project, where it defers to the + # environment for the project + database: Optional[str] = None + schema: Optional[str] = None + execution_project: Optional[str] = None + quota_project: Optional[str] = None + location: Optional[str] = None + priority: Optional[Priority] = None + maximum_bytes_billed: Optional[int] = None + impersonate_service_account: Optional[str] = None + + job_retry_deadline_seconds: Optional[int] = None + job_retries: Optional[int] = 1 + job_creation_timeout_seconds: Optional[int] = None + job_execution_timeout_seconds: Optional[int] = None + + # Keyfile json creds (unicode or base 64 encoded) + keyfile: Optional[str] = None + keyfile_json: Optional[Dict[str, Any]] = None + + # oauth-secrets + token: Optional[str] = None + refresh_token: Optional[str] = None + client_id: Optional[str] = None + client_secret: Optional[str] = None + token_uri: Optional[str] = None + + dataproc_region: Optional[str] = None + dataproc_cluster_name: Optional[str] = None + gcs_bucket: Optional[str] = None + + dataproc_batch: Optional[DataprocBatchConfig] = field( + metadata={ + "serialization_strategy": pass_through, + }, + default=None, + ) + + scopes: Optional[Tuple[str, ...]] = ( + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/drive", + ) + + _ALIASES = { + # 'legacy_name': 'current_name' + "project": "database", + "dataset": "schema", + "target_project": "target_database", + "target_dataset": "target_schema", + "retries": "job_retries", + "timeout_seconds": "job_execution_timeout_seconds", + } + + def __post_init__(self): + if self.keyfile_json and "private_key" in self.keyfile_json: + self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( + "\\n", "\n" + ) + if not self.method: + raise DbtRuntimeError("Must specify authentication method") + + if not self.schema: + raise DbtRuntimeError("Must specify schema") + + @property + def type(self): + return "bigquery" + + @property + def unique_field(self): + return self.database + + def _connection_keys(self): + return ( + "method", + "database", + "execution_project", + "schema", + "location", + "priority", + "maximum_bytes_billed", + "impersonate_service_account", + "job_retry_deadline_seconds", + "job_retries", + "job_creation_timeout_seconds", + "job_execution_timeout_seconds", + "timeout_seconds", + "client_id", + "token_uri", + "dataproc_region", + "dataproc_cluster_name", + "gcs_bucket", + "dataproc_batch", + ) + + @classmethod + def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: + # We need to inject the correct value of the database (aka project) at + # this stage, ref + # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. 
+ + # `database` is an alias of `project` in BigQuery + if "database" not in d: + _, database = get_bigquery_defaults() + d["database"] = database + # `execution_project` default to dataset/project + if "execution_project" not in d: + d["execution_project"] = d["database"] + return d diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py index e7f13c913..59f40d246 100644 --- a/dbt/adapters/bigquery/dataproc/batch.py +++ b/dbt/adapters/bigquery/dataproc/batch.py @@ -1,16 +1,17 @@ -from typing import Union, Dict - -import time from datetime import datetime +import time +from typing import Dict, Union + from google.cloud.dataproc_v1 import ( - CreateBatchRequest, - BatchControllerClient, Batch, + BatchControllerClient, + CreateBatchRequest, GetBatchRequest, ) from google.protobuf.json_format import ParseDict -from dbt.adapters.bigquery.connections import DataprocBatchConfig +from dbt.adapters.bigquery.credentials import DataprocBatchConfig + _BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING] DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.34.0.jar" diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py index 4ecd6daa5..a4504294a 100644 --- a/dbt/adapters/bigquery/dataset.py +++ b/dbt/adapters/bigquery/dataset.py @@ -1,8 +1,10 @@ from typing import List -from google.cloud.bigquery import Dataset, AccessEntry + +from google.cloud.bigquery import AccessEntry, Dataset from dbt.adapters.events.logging import AdapterLogger + logger = AdapterLogger("BigQuery") diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py deleted file mode 100644 index ea1f644ba..000000000 --- a/dbt/adapters/bigquery/gcloud.py +++ /dev/null @@ -1,29 +0,0 @@ -from dbt_common.exceptions import DbtRuntimeError - -from dbt.adapters.events.logging import AdapterLogger -from dbt_common.clients.system import run_cmd - -NOT_INSTALLED_MSG = """ -dbt requires the gcloud SDK to be installed to authenticate with BigQuery. -Please download and install the SDK, or use a Service Account instead. 
- -https://cloud.google.com/sdk/ -""" - -logger = AdapterLogger("BigQuery") - - -def gcloud_installed(): - try: - run_cmd(".", ["gcloud", "--version"]) - return True - except OSError as e: - logger.debug(e) - return False - - -def setup_default_credentials(): - if gcloud_installed(): - run_cmd(".", ["gcloud", "auth", "application-default", "login"]) - else: - raise DbtRuntimeError(NOT_INSTALLED_MSG) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 0b49c0373..cf5800fd3 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -1,27 +1,41 @@ from dataclasses import dataclass from datetime import datetime import json -import threading from multiprocessing.context import SpawnContext - +import threading import time from typing import ( Any, Dict, + FrozenSet, + Iterable, List, Optional, + Tuple, + TYPE_CHECKING, Type, Set, Union, - FrozenSet, - Tuple, - Iterable, - TYPE_CHECKING, ) -from dbt.adapters.contracts.relation import RelationConfig +import google.api_core +import google.auth +import google.oauth2 +import google.cloud.bigquery +from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable +import google.cloud.exceptions +import pytz +from dbt_common.contracts.constraints import ( + ColumnLevelConstraint, + ConstraintType, + ModelLevelConstraint, +) +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.events.functions import fire_event +import dbt_common.exceptions import dbt_common.exceptions.base +from dbt_common.utils import filter_null_values from dbt.adapters.base import ( AdapterConfig, BaseAdapter, @@ -37,28 +51,12 @@ from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.contracts.macros import MacroResolverProtocol -from dbt_common.contracts.constraints import ( - ColumnLevelConstraint, - ConstraintType, - ModelLevelConstraint, -) -from dbt_common.dataclass_schema import dbtClassMixin +from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.events.logging import AdapterLogger -from dbt_common.events.functions import fire_event from dbt.adapters.events.types import SchemaCreation, SchemaDrop -import dbt_common.exceptions -from dbt_common.utils import filter_null_values -import google.api_core -import google.auth -import google.oauth2 -import google.cloud.bigquery -from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable -import google.cloud.exceptions -import pytz -from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager -from dbt.adapters.bigquery.column import get_nested_column_data_types -from dbt.adapters.bigquery.connections import BigQueryAdapterResponse +from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types +from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset from dbt.adapters.bigquery.python_submissions import ( ClusterDataprocHelper, @@ -77,6 +75,7 @@ # Used by mypy for earlier type hints. import agate + logger = AdapterLogger("BigQuery") # Write dispositions for bigquery. 
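For orientation, here is a minimal sketch (not part of the patch itself) of the consolidated import surface this refactor produces: `get_bigquery_defaults` and `setup_default_credentials` now both resolve from `dbt.adapters.bigquery.credentials` rather than from `connections` and the deleted `gcloud` module. The fallback flow below is an illustration modeled on the `DefaultCredentialsError` handling in `BigQueryConnectionManager.open`; `DbtConfigError` is the wrapper that the new `get_bigquery_defaults` raises, per the module shown above.

    # Illustrative sketch only -- names are taken from the diffs in this patch.
    from dbt_common.exceptions import DbtConfigError

    from dbt.adapters.bigquery.credentials import (
        get_bigquery_defaults,
        setup_default_credentials,
    )

    try:
        # Cached helper; the underlying implementation shells out (~1s on first call)
        credentials, project_id = get_bigquery_defaults()
    except DbtConfigError:
        # Raised when no application-default credentials are found. This helper
        # runs `gcloud auth application-default login` if gcloud is installed,
        # and raises DbtRuntimeError otherwise.
        setup_default_credentials()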
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 368ed9d07..93c82ca92 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -1,21 +1,21 @@ import uuid from typing import Dict, Union -from dbt.adapters.events.logging import AdapterLogger - -from dbt.adapters.base import PythonJobHelper -from google.api_core.future.polling import POLLING_PREDICATE - -from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials from google.api_core import retry from google.api_core.client_options import ClientOptions +from google.api_core.future.polling import POLLING_PREDICATE from google.cloud import storage, dataproc_v1 from google.cloud.dataproc_v1.types.batches import Batch +from dbt.adapters.base import PythonJobHelper +from dbt.adapters.events.logging import AdapterLogger + +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials from dbt.adapters.bigquery.dataproc.batch import ( + DEFAULT_JAR_FILE_URI, create_batch_request, poll_batch_job, - DEFAULT_JAR_FILE_URI, update_batch_from_config, ) diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py index 0e2c17670..4edc8d7ac 100644 --- a/dbt/adapters/bigquery/relation.py +++ b/dbt/adapters/bigquery/relation.py @@ -1,9 +1,13 @@ from dataclasses import dataclass, field +from itertools import chain, islice from typing import FrozenSet, Optional, TypeVar -from itertools import chain, islice +from dbt_common.exceptions import CompilationError +from dbt_common.utils.dict import filter_null_values from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema +from dbt.adapters.contracts.relation import RelationConfig, RelationType from dbt.adapters.relation_configs import RelationConfigChangeAction + from dbt.adapters.bigquery.relation_configs import ( BigQueryClusterConfigChange, BigQueryMaterializedViewConfig, @@ -11,9 +15,6 @@ BigQueryOptionsConfigChange, BigQueryPartitionConfigChange, ) -from dbt.adapters.contracts.relation import RelationType, RelationConfig -from dbt_common.exceptions import CompilationError -from dbt_common.utils.dict import filter_null_values Self = TypeVar("Self", bound="BigQueryRelation") diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index da499d266..ca3bfc24c 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -203,7 +203,7 @@ def get_adapter(self, target) -> BigQueryAdapter: class TestBigQueryAdapterAcquire(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) @@ -244,7 +244,7 @@ def test_acquire_connection_oauth_validations(self, mock_open_connection): mock_open_connection.assert_called_once() @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py index 94cb28efb..f56aee129 100644 --- a/tests/unit/test_configure_dataproc_batch.py +++ 
b/tests/unit/test_configure_dataproc_batch.py @@ -12,7 +12,7 @@ # parsed credentials class TestConfigureDataprocBatch(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults): @@ -64,7 +64,7 @@ def to_str_values(d): ) @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_default_dataproc_serverless_batch(self, mock_get_bigquery_defaults):
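One practical consequence of the module split, visible in the two test hunks above: mocks must target the module where the name is now looked up. `BigQueryCredentials.__pre_deserialize__` resolves `get_bigquery_defaults` inside `dbt.adapters.bigquery.credentials`, so patching the old `connections` path would no longer intercept the call. A minimal sketch of the pattern, assuming the same `unittest.mock` usage these tests already rely on:

    from unittest.mock import patch

    # Patch the symbol where it is looked up -- after this refactor, that is
    # the credentials module, not connections.
    with patch(
        "dbt.adapters.bigquery.credentials.get_bigquery_defaults",
        return_value=("credentials", "project_id"),
    ):
        ...  # exercise code that deserializes BigQueryCredentials here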