From 23e8020801526787a72407d9dea3d42016e49ad0 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Mon, 4 Nov 2024 19:25:40 -0500 Subject: [PATCH 1/2] update macos runners from macos-12 to macos-14 (#1394) --- .github/scripts/integration-test-matrix.js | 4 ++-- .github/workflows/main.yml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/scripts/integration-test-matrix.js b/.github/scripts/integration-test-matrix.js index 49db45575..bebe08569 100644 --- a/.github/scripts/integration-test-matrix.js +++ b/.github/scripts/integration-test-matrix.js @@ -44,7 +44,7 @@ module.exports = ({ context }) => { if (labels.includes("test macos") || testAllLabel) { include.push({ - os: "macos-12", + os: "macos-14", adapter, "python-version": pythonVersion, }); @@ -78,7 +78,7 @@ module.exports = ({ context }) => { // additionally include runs for all adapters, on macos and windows, // but only for the default python version for (const adapter of supportedAdapters) { - for (const operatingSystem of ["windows-latest", "macos-12"]) { + for (const operatingSystem of ["windows-latest", "macos-14"]) { include.push({ os: operatingSystem, adapter: adapter, diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6bfed5df6..7b82f3e0f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -174,7 +174,7 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-12, windows-latest] + os: [ubuntu-latest, macos-14, windows-latest] python-version: ['3.9', '3.10', '3.11', '3.12'] dist-type: ["whl", "gz"] From 35c32f181b97f69d2b20a374c7a0dca765455c81 Mon Sep 17 00:00:00 2001 From: Mike Alfare <13974384+mikealfare@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:03:44 -0500 Subject: [PATCH 2/2] Break out credentials as a separate module (#1391) * break out credentials into its own module * fix imports * update unit test mocks for new location of get_bigquery_defaults --- .../Under the Hood-20241104-173815.yaml | 7 + dbt/adapters/bigquery/__init__.py | 10 +- dbt/adapters/bigquery/column.py | 5 +- dbt/adapters/bigquery/connections.py | 197 ++---------------- dbt/adapters/bigquery/credentials.py | 187 +++++++++++++++++ dbt/adapters/bigquery/dataproc/batch.py | 13 +- dbt/adapters/bigquery/dataset.py | 4 +- dbt/adapters/bigquery/gcloud.py | 29 --- dbt/adapters/bigquery/impl.py | 51 +++-- dbt/adapters/bigquery/python_submissions.py | 14 +- dbt/adapters/bigquery/relation.py | 9 +- tests/unit/test_bigquery_adapter.py | 4 +- tests/unit/test_configure_dataproc_batch.py | 4 +- 13 files changed, 275 insertions(+), 259 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20241104-173815.yaml create mode 100644 dbt/adapters/bigquery/credentials.py delete mode 100644 dbt/adapters/bigquery/gcloud.py diff --git a/.changes/unreleased/Under the Hood-20241104-173815.yaml b/.changes/unreleased/Under the Hood-20241104-173815.yaml new file mode 100644 index 000000000..e3e81dec1 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241104-173815.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Separate credentials functionality into its own module for reuse in retry and + python submissions +time: 2024-11-04T17:38:15.940962-05:00 +custom: + Author: mikealfare + Issue: "1391" diff --git a/dbt/adapters/bigquery/__init__.py b/dbt/adapters/bigquery/__init__.py index 5fe68e786..74fa17cda 100644 --- a/dbt/adapters/bigquery/__init__.py +++ b/dbt/adapters/bigquery/__init__.py @@ -1,8 +1,8 @@ -from dbt.adapters.bigquery.connections import BigQueryConnectionManager # noqa -from dbt.adapters.bigquery.connections import BigQueryCredentials -from dbt.adapters.bigquery.relation import BigQueryRelation # noqa -from dbt.adapters.bigquery.column import BigQueryColumn # noqa -from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig # noqa +from dbt.adapters.bigquery.column import BigQueryColumn +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials +from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig +from dbt.adapters.bigquery.relation import BigQueryRelation from dbt.adapters.base import AdapterPlugin from dbt.include import bigquery diff --git a/dbt/adapters/bigquery/column.py b/dbt/adapters/bigquery/column.py index 4a12e211f..a676fef4b 100644 --- a/dbt/adapters/bigquery/column.py +++ b/dbt/adapters/bigquery/column.py @@ -1,9 +1,10 @@ from dataclasses import dataclass -from typing import Optional, List, TypeVar, Iterable, Type, Any, Dict, Union +from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union + +from google.cloud.bigquery import SchemaField from dbt.adapters.base.column import Column -from google.cloud.bigquery import SchemaField _PARENT_DATA_TYPE_KEY = "__parent_data_type" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 58b3dbe41..bda54080b 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -1,58 +1,54 @@ from collections import defaultdict from concurrent.futures import TimeoutError +from contextlib import contextmanager +from dataclasses import dataclass import json +from multiprocessing.context import SpawnContext import re -from contextlib import contextmanager -from dataclasses import dataclass, field +from typing import Dict, Hashable, List, Optional, Tuple, TYPE_CHECKING import uuid -from mashumaro.helper import pass_through - -from functools import lru_cache -from requests.exceptions import ConnectionError - -from multiprocessing.context import SpawnContext -from typing import Optional, Any, Dict, Tuple, Hashable, List, TYPE_CHECKING +from google.api_core import client_info, client_options, retry import google.auth +from google.auth import impersonated_credentials import google.auth.exceptions import google.cloud.bigquery import google.cloud.exceptions -from google.api_core import retry, client_info, client_options -from google.auth import impersonated_credentials from google.oauth2 import ( credentials as GoogleCredentials, service_account as GoogleServiceAccountCredentials, ) +from requests.exceptions import ConnectionError from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event -from dbt_common.exceptions import ( - DbtRuntimeError, - DbtConfigError, - DbtDatabaseError, -) +from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError from dbt_common.invocation import get_invocation_id -from dbt.adapters.bigquery import gcloud +from dbt.adapters.base import BaseConnectionManager from dbt.adapters.contracts.connection import ( - ConnectionState, - AdapterResponse, - Credentials, AdapterRequiredConfig, + AdapterResponse, + ConnectionState, ) -from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt.adapters.base import BaseConnectionManager from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import SQLQuery -from dbt.adapters.bigquery import __version__ as dbt_version -from dbt.adapters.bigquery.utility import is_base64, base64_to_string +from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +import dbt.adapters.bigquery.__version__ as dbt_version +from dbt.adapters.bigquery.credentials import ( + BigQueryConnectionMethod, + Priority, + get_bigquery_defaults, + setup_default_credentials, +) +from dbt.adapters.bigquery.utility import is_base64, base64_to_string if TYPE_CHECKING: # Indirectly imported via agate_helper, which is lazy loaded further downfile. # Used by mypy for earlier type hints. import agate + logger = AdapterLogger("BigQuery") BQ_QUERY_JOB_SPLIT = "-----Query Job SQL Follows-----" @@ -73,33 +69,6 @@ ) -@lru_cache() -def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: - """ - Returns (credentials, project_id) - - project_id is returned available from the environment; otherwise None - """ - # Cached, because the underlying implementation shells out, taking ~1s - try: - credentials, _ = google.auth.default(scopes=scopes) - return credentials, _ - except google.auth.exceptions.DefaultCredentialsError as e: - raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") - - -class Priority(StrEnum): - Interactive = "interactive" - Batch = "batch" - - -class BigQueryConnectionMethod(StrEnum): - OAUTH = "oauth" - SERVICE_ACCOUNT = "service-account" - SERVICE_ACCOUNT_JSON = "service-account-json" - OAUTH_SECRETS = "oauth-secrets" - - @dataclass class BigQueryAdapterResponse(AdapterResponse): bytes_processed: Optional[int] = None @@ -110,128 +79,6 @@ class BigQueryAdapterResponse(AdapterResponse): slot_ms: Optional[int] = None -@dataclass -class DataprocBatchConfig(ExtensibleDbtClassMixin): - def __init__(self, batch_config): - self.batch_config = batch_config - - -@dataclass -class BigQueryCredentials(Credentials): - method: BigQueryConnectionMethod = None # type: ignore - - # BigQuery allows an empty database / project, where it defers to the - # environment for the project - database: Optional[str] = None - schema: Optional[str] = None - execution_project: Optional[str] = None - quota_project: Optional[str] = None - location: Optional[str] = None - priority: Optional[Priority] = None - maximum_bytes_billed: Optional[int] = None - impersonate_service_account: Optional[str] = None - - job_retry_deadline_seconds: Optional[int] = None - job_retries: Optional[int] = 1 - job_creation_timeout_seconds: Optional[int] = None - job_execution_timeout_seconds: Optional[int] = None - - # Keyfile json creds (unicode or base 64 encoded) - keyfile: Optional[str] = None - keyfile_json: Optional[Dict[str, Any]] = None - - # oauth-secrets - token: Optional[str] = None - refresh_token: Optional[str] = None - client_id: Optional[str] = None - client_secret: Optional[str] = None - token_uri: Optional[str] = None - - dataproc_region: Optional[str] = None - dataproc_cluster_name: Optional[str] = None - gcs_bucket: Optional[str] = None - - dataproc_batch: Optional[DataprocBatchConfig] = field( - metadata={ - "serialization_strategy": pass_through, - }, - default=None, - ) - - scopes: Optional[Tuple[str, ...]] = ( - "https://www.googleapis.com/auth/bigquery", - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/drive", - ) - - _ALIASES = { - # 'legacy_name': 'current_name' - "project": "database", - "dataset": "schema", - "target_project": "target_database", - "target_dataset": "target_schema", - "retries": "job_retries", - "timeout_seconds": "job_execution_timeout_seconds", - } - - def __post_init__(self): - if self.keyfile_json and "private_key" in self.keyfile_json: - self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( - "\\n", "\n" - ) - if not self.method: - raise DbtRuntimeError("Must specify authentication method") - - if not self.schema: - raise DbtRuntimeError("Must specify schema") - - @property - def type(self): - return "bigquery" - - @property - def unique_field(self): - return self.database - - def _connection_keys(self): - return ( - "method", - "database", - "execution_project", - "schema", - "location", - "priority", - "maximum_bytes_billed", - "impersonate_service_account", - "job_retry_deadline_seconds", - "job_retries", - "job_creation_timeout_seconds", - "job_execution_timeout_seconds", - "timeout_seconds", - "client_id", - "token_uri", - "dataproc_region", - "dataproc_cluster_name", - "gcs_bucket", - "dataproc_batch", - ) - - @classmethod - def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: - # We need to inject the correct value of the database (aka project) at - # this stage, ref - # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. - - # `database` is an alias of `project` in BigQuery - if "database" not in d: - _, database = get_bigquery_defaults() - d["database"] = database - # `execution_project` default to dataset/project - if "execution_project" not in d: - d["execution_project"] = d["database"] - return d - - class BigQueryConnectionManager(BaseConnectionManager): TYPE = "bigquery" @@ -433,7 +280,7 @@ def open(cls, connection): except google.auth.exceptions.DefaultCredentialsError: logger.info("Please log into GCP to continue") - gcloud.setup_default_credentials() + setup_default_credentials() handle = cls.get_bigquery_client(connection.credentials) diff --git a/dbt/adapters/bigquery/credentials.py b/dbt/adapters/bigquery/credentials.py new file mode 100644 index 000000000..32f172dac --- /dev/null +++ b/dbt/adapters/bigquery/credentials.py @@ -0,0 +1,187 @@ +from dataclasses import dataclass, field +from functools import lru_cache +from typing import Any, Dict, Optional, Tuple + +import google.auth +from google.auth.exceptions import DefaultCredentialsError +from mashumaro import pass_through + +from dbt_common.clients.system import run_cmd +from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +from dbt_common.exceptions import DbtConfigError, DbtRuntimeError +from dbt.adapters.contracts.connection import Credentials +from dbt.adapters.events.logging import AdapterLogger + + +_logger = AdapterLogger("BigQuery") + + +class Priority(StrEnum): + Interactive = "interactive" + Batch = "batch" + + +class BigQueryConnectionMethod(StrEnum): + OAUTH = "oauth" + SERVICE_ACCOUNT = "service-account" + SERVICE_ACCOUNT_JSON = "service-account-json" + OAUTH_SECRETS = "oauth-secrets" + + +@dataclass +class DataprocBatchConfig(ExtensibleDbtClassMixin): + def __init__(self, batch_config): + self.batch_config = batch_config + + +@lru_cache() +def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: + """ + Returns (credentials, project_id) + + project_id is returned available from the environment; otherwise None + """ + # Cached, because the underlying implementation shells out, taking ~1s + try: + credentials, _ = google.auth.default(scopes=scopes) + return credentials, _ + except DefaultCredentialsError as e: + raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") + + +def setup_default_credentials(): + if _gcloud_installed(): + run_cmd(".", ["gcloud", "auth", "application-default", "login"]) + else: + msg = """ + dbt requires the gcloud SDK to be installed to authenticate with BigQuery. + Please download and install the SDK, or use a Service Account instead. + + https://cloud.google.com/sdk/ + """ + raise DbtRuntimeError(msg) + + +def _gcloud_installed(): + try: + run_cmd(".", ["gcloud", "--version"]) + return True + except OSError as e: + _logger.debug(e) + return False + + +@dataclass +class BigQueryCredentials(Credentials): + method: BigQueryConnectionMethod = None # type: ignore + + # BigQuery allows an empty database / project, where it defers to the + # environment for the project + database: Optional[str] = None + schema: Optional[str] = None + execution_project: Optional[str] = None + quota_project: Optional[str] = None + location: Optional[str] = None + priority: Optional[Priority] = None + maximum_bytes_billed: Optional[int] = None + impersonate_service_account: Optional[str] = None + + job_retry_deadline_seconds: Optional[int] = None + job_retries: Optional[int] = 1 + job_creation_timeout_seconds: Optional[int] = None + job_execution_timeout_seconds: Optional[int] = None + + # Keyfile json creds (unicode or base 64 encoded) + keyfile: Optional[str] = None + keyfile_json: Optional[Dict[str, Any]] = None + + # oauth-secrets + token: Optional[str] = None + refresh_token: Optional[str] = None + client_id: Optional[str] = None + client_secret: Optional[str] = None + token_uri: Optional[str] = None + + dataproc_region: Optional[str] = None + dataproc_cluster_name: Optional[str] = None + gcs_bucket: Optional[str] = None + + dataproc_batch: Optional[DataprocBatchConfig] = field( + metadata={ + "serialization_strategy": pass_through, + }, + default=None, + ) + + scopes: Optional[Tuple[str, ...]] = ( + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/drive", + ) + + _ALIASES = { + # 'legacy_name': 'current_name' + "project": "database", + "dataset": "schema", + "target_project": "target_database", + "target_dataset": "target_schema", + "retries": "job_retries", + "timeout_seconds": "job_execution_timeout_seconds", + } + + def __post_init__(self): + if self.keyfile_json and "private_key" in self.keyfile_json: + self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( + "\\n", "\n" + ) + if not self.method: + raise DbtRuntimeError("Must specify authentication method") + + if not self.schema: + raise DbtRuntimeError("Must specify schema") + + @property + def type(self): + return "bigquery" + + @property + def unique_field(self): + return self.database + + def _connection_keys(self): + return ( + "method", + "database", + "execution_project", + "schema", + "location", + "priority", + "maximum_bytes_billed", + "impersonate_service_account", + "job_retry_deadline_seconds", + "job_retries", + "job_creation_timeout_seconds", + "job_execution_timeout_seconds", + "timeout_seconds", + "client_id", + "token_uri", + "dataproc_region", + "dataproc_cluster_name", + "gcs_bucket", + "dataproc_batch", + ) + + @classmethod + def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: + # We need to inject the correct value of the database (aka project) at + # this stage, ref + # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. + + # `database` is an alias of `project` in BigQuery + if "database" not in d: + _, database = get_bigquery_defaults() + d["database"] = database + # `execution_project` default to dataset/project + if "execution_project" not in d: + d["execution_project"] = d["database"] + return d diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py index e7f13c913..59f40d246 100644 --- a/dbt/adapters/bigquery/dataproc/batch.py +++ b/dbt/adapters/bigquery/dataproc/batch.py @@ -1,16 +1,17 @@ -from typing import Union, Dict - -import time from datetime import datetime +import time +from typing import Dict, Union + from google.cloud.dataproc_v1 import ( - CreateBatchRequest, - BatchControllerClient, Batch, + BatchControllerClient, + CreateBatchRequest, GetBatchRequest, ) from google.protobuf.json_format import ParseDict -from dbt.adapters.bigquery.connections import DataprocBatchConfig +from dbt.adapters.bigquery.credentials import DataprocBatchConfig + _BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING] DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.34.0.jar" diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py index 4ecd6daa5..a4504294a 100644 --- a/dbt/adapters/bigquery/dataset.py +++ b/dbt/adapters/bigquery/dataset.py @@ -1,8 +1,10 @@ from typing import List -from google.cloud.bigquery import Dataset, AccessEntry + +from google.cloud.bigquery import AccessEntry, Dataset from dbt.adapters.events.logging import AdapterLogger + logger = AdapterLogger("BigQuery") diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py deleted file mode 100644 index ea1f644ba..000000000 --- a/dbt/adapters/bigquery/gcloud.py +++ /dev/null @@ -1,29 +0,0 @@ -from dbt_common.exceptions import DbtRuntimeError - -from dbt.adapters.events.logging import AdapterLogger -from dbt_common.clients.system import run_cmd - -NOT_INSTALLED_MSG = """ -dbt requires the gcloud SDK to be installed to authenticate with BigQuery. -Please download and install the SDK, or use a Service Account instead. - -https://cloud.google.com/sdk/ -""" - -logger = AdapterLogger("BigQuery") - - -def gcloud_installed(): - try: - run_cmd(".", ["gcloud", "--version"]) - return True - except OSError as e: - logger.debug(e) - return False - - -def setup_default_credentials(): - if gcloud_installed(): - run_cmd(".", ["gcloud", "auth", "application-default", "login"]) - else: - raise DbtRuntimeError(NOT_INSTALLED_MSG) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 0b49c0373..cf5800fd3 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -1,27 +1,41 @@ from dataclasses import dataclass from datetime import datetime import json -import threading from multiprocessing.context import SpawnContext - +import threading import time from typing import ( Any, Dict, + FrozenSet, + Iterable, List, Optional, + Tuple, + TYPE_CHECKING, Type, Set, Union, - FrozenSet, - Tuple, - Iterable, - TYPE_CHECKING, ) -from dbt.adapters.contracts.relation import RelationConfig +import google.api_core +import google.auth +import google.oauth2 +import google.cloud.bigquery +from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable +import google.cloud.exceptions +import pytz +from dbt_common.contracts.constraints import ( + ColumnLevelConstraint, + ConstraintType, + ModelLevelConstraint, +) +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.events.functions import fire_event +import dbt_common.exceptions import dbt_common.exceptions.base +from dbt_common.utils import filter_null_values from dbt.adapters.base import ( AdapterConfig, BaseAdapter, @@ -37,28 +51,12 @@ from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.contracts.macros import MacroResolverProtocol -from dbt_common.contracts.constraints import ( - ColumnLevelConstraint, - ConstraintType, - ModelLevelConstraint, -) -from dbt_common.dataclass_schema import dbtClassMixin +from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.events.logging import AdapterLogger -from dbt_common.events.functions import fire_event from dbt.adapters.events.types import SchemaCreation, SchemaDrop -import dbt_common.exceptions -from dbt_common.utils import filter_null_values -import google.api_core -import google.auth -import google.oauth2 -import google.cloud.bigquery -from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable -import google.cloud.exceptions -import pytz -from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager -from dbt.adapters.bigquery.column import get_nested_column_data_types -from dbt.adapters.bigquery.connections import BigQueryAdapterResponse +from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types +from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset from dbt.adapters.bigquery.python_submissions import ( ClusterDataprocHelper, @@ -77,6 +75,7 @@ # Used by mypy for earlier type hints. import agate + logger = AdapterLogger("BigQuery") # Write dispositions for bigquery. diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 368ed9d07..93c82ca92 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -1,21 +1,21 @@ import uuid from typing import Dict, Union -from dbt.adapters.events.logging import AdapterLogger - -from dbt.adapters.base import PythonJobHelper -from google.api_core.future.polling import POLLING_PREDICATE - -from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials from google.api_core import retry from google.api_core.client_options import ClientOptions +from google.api_core.future.polling import POLLING_PREDICATE from google.cloud import storage, dataproc_v1 from google.cloud.dataproc_v1.types.batches import Batch +from dbt.adapters.base import PythonJobHelper +from dbt.adapters.events.logging import AdapterLogger + +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials from dbt.adapters.bigquery.dataproc.batch import ( + DEFAULT_JAR_FILE_URI, create_batch_request, poll_batch_job, - DEFAULT_JAR_FILE_URI, update_batch_from_config, ) diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py index 0e2c17670..4edc8d7ac 100644 --- a/dbt/adapters/bigquery/relation.py +++ b/dbt/adapters/bigquery/relation.py @@ -1,9 +1,13 @@ from dataclasses import dataclass, field +from itertools import chain, islice from typing import FrozenSet, Optional, TypeVar -from itertools import chain, islice +from dbt_common.exceptions import CompilationError +from dbt_common.utils.dict import filter_null_values from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema +from dbt.adapters.contracts.relation import RelationConfig, RelationType from dbt.adapters.relation_configs import RelationConfigChangeAction + from dbt.adapters.bigquery.relation_configs import ( BigQueryClusterConfigChange, BigQueryMaterializedViewConfig, @@ -11,9 +15,6 @@ BigQueryOptionsConfigChange, BigQueryPartitionConfigChange, ) -from dbt.adapters.contracts.relation import RelationType, RelationConfig -from dbt_common.exceptions import CompilationError -from dbt_common.utils.dict import filter_null_values Self = TypeVar("Self", bound="BigQueryRelation") diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index da499d266..ca3bfc24c 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -203,7 +203,7 @@ def get_adapter(self, target) -> BigQueryAdapter: class TestBigQueryAdapterAcquire(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) @@ -244,7 +244,7 @@ def test_acquire_connection_oauth_validations(self, mock_open_connection): mock_open_connection.assert_called_once() @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py index 94cb28efb..f56aee129 100644 --- a/tests/unit/test_configure_dataproc_batch.py +++ b/tests/unit/test_configure_dataproc_batch.py @@ -12,7 +12,7 @@ # parsed credentials class TestConfigureDataprocBatch(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults): @@ -64,7 +64,7 @@ def to_str_values(d): ) @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_default_dataproc_serverless_batch(self, mock_get_bigquery_defaults):