diff --git a/.changes/unreleased/Breaking Changes-20241016-185117.yaml b/.changes/unreleased/Breaking Changes-20241016-185117.yaml new file mode 100644 index 000000000..55bb37461 --- /dev/null +++ b/.changes/unreleased/Breaking Changes-20241016-185117.yaml @@ -0,0 +1,6 @@ +kind: Breaking Changes +body: Drop support for Python 3.8 +time: 2024-10-16T18:51:17.581547-04:00 +custom: + Author: mikealfare + Issue: "1373" diff --git a/.changes/unreleased/Features-20240505-011838.yaml b/.changes/unreleased/Features-20240505-011838.yaml new file mode 100644 index 000000000..66411853f --- /dev/null +++ b/.changes/unreleased/Features-20240505-011838.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add is_retryable test case when raise ServiceUnavailable +time: 2024-05-05T01:18:38.737882+09:00 +custom: + Author: jx2lee + Issue: "682" diff --git a/.changes/unreleased/Features-20240911-234859.yaml b/.changes/unreleased/Features-20240911-234859.yaml new file mode 100644 index 000000000..5351c3315 --- /dev/null +++ b/.changes/unreleased/Features-20240911-234859.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Adds the ability to set optional `quota_project` in profile +time: 2024-09-11T23:48:59.767649+01:00 +custom: + Author: jcarpenter12 + Issue: 1343 1344 diff --git a/.changes/unreleased/Fixes-20241028-172719.yaml b/.changes/unreleased/Fixes-20241028-172719.yaml new file mode 100644 index 000000000..87ee2c25d --- /dev/null +++ b/.changes/unreleased/Fixes-20241028-172719.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: use "direct" write for non-partitioned python model materializations +time: 2024-10-28T17:27:19.306348-07:00 +custom: + Author: colin-rogers-dbt + Issue: "1318" diff --git a/.changes/unreleased/Under the Hood-20240910-212052.yaml b/.changes/unreleased/Under the Hood-20240910-212052.yaml new file mode 100644 index 000000000..3e4885dcd --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240910-212052.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Isolating distribution testing +time: 2024-09-10T21:20:52.574204-04:00 +custom: + Author: leahwicz + Issue: "1290" diff --git a/.changes/unreleased/Under the Hood-20241104-173815.yaml b/.changes/unreleased/Under the Hood-20241104-173815.yaml new file mode 100644 index 000000000..e3e81dec1 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20241104-173815.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Separate credentials functionality into its own module for reuse in retry and + python submissions +time: 2024-11-04T17:38:15.940962-05:00 +custom: + Author: mikealfare + Issue: "1391" diff --git a/.github/scripts/integration-test-matrix.js b/.github/scripts/integration-test-matrix.js index 8e4b351fc..bebe08569 100644 --- a/.github/scripts/integration-test-matrix.js +++ b/.github/scripts/integration-test-matrix.js @@ -1,6 +1,6 @@ module.exports = ({ context }) => { - const defaultPythonVersion = "3.8"; - const supportedPythonVersions = ["3.8", "3.9", "3.10", "3.11", "3.12"]; + const defaultPythonVersion = "3.9"; + const supportedPythonVersions = ["3.9", "3.10", "3.11", "3.12"]; const supportedAdapters = ["bigquery"]; // if PR, generate matrix based on files changed and PR labels @@ -44,7 +44,7 @@ module.exports = ({ context }) => { if (labels.includes("test macos") || testAllLabel) { include.push({ - os: "macos-12", + os: "macos-14", adapter, "python-version": pythonVersion, }); @@ -78,7 +78,7 @@ module.exports = ({ context }) => { // additionally include runs for all adapters, on macos and windows, // but only for the default python version for (const 
adapter of supportedAdapters) { - for (const operatingSystem of ["windows-latest", "macos-12"]) { + for (const operatingSystem of ["windows-latest", "macos-14"]) { include.push({ os: operatingSystem, adapter: adapter, diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 43ac4ecb3..a9179f9ce 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -33,6 +33,7 @@ on: # all PRs, important to note that `pull_request_target` workflows # will run in the context of the target branch of a PR pull_request_target: + types: [opened, reopened, synchronize, labeled] # manual trigger workflow_dispatch: inputs: @@ -280,10 +281,10 @@ jobs: persist-credentials: false ref: ${{ github.event.pull_request.head.sha }} - - name: Set up Python 3.8 + - name: Set up Python 3.9 uses: actions/setup-python@v5 with: - python-version: "3.8" + python-version: "3.9" - name: Install python dependencies run: | diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d04b4307f..7b82f3e0f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -50,7 +50,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.8' + python-version: '3.9' - name: Install python dependencies run: | @@ -70,7 +70,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + python-version: ['3.9', '3.10', '3.11', '3.12'] env: TOXENV: "unit" @@ -127,7 +127,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: '3.8' + python-version: '3.9' - name: Install python dependencies run: | @@ -163,7 +163,7 @@ jobs: overwrite: true test-build: - name: verify packages / python ${{ matrix.python-version }} / ${{ matrix.os }} + name: verify packages / python ${{ matrix.python-version }} / ${{ matrix.os }} / ${{ matrix.dist-type }} if: needs.build.outputs.is_alpha == 0 @@ -174,19 +174,22 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, macos-12, windows-latest] - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] + os: [ubuntu-latest, macos-14, windows-latest] + python-version: ['3.9', '3.10', '3.11', '3.12'] + dist-type: ["whl", "gz"] steps: - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Install python dependencies run: | python -m pip install --user --upgrade pip - python -m pip install --upgrade wheel setuptools twine check-wheel-contents + python -m pip install --upgrade wheel python -m pip --version + - uses: actions/download-artifact@v4 with: name: dist @@ -195,15 +198,10 @@ jobs: - name: Show distributions run: ls -lh dist/ - - name: Install wheel distributions - run: | - find ./dist/*.whl -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/ - - name: Check wheel distributions + - name: Install ${{ matrix.dist-type }} distributions run: | - python -c "import dbt.adapters.bigquery" - - name: Install source distributions - run: | - find ./dist/*.gz -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/ - - name: Check source distributions + find ./dist/*.${{ matrix.dist-type }} -maxdepth 1 -type f | xargs python -m pip install --force-reinstall --find-links=dist/ + + - name: Check ${{ matrix.dist-type }} distributions run: | python -c "import dbt.adapters.bigquery" diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3d0b98d45..16760bf07 100644 --- 
a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -24,7 +24,6 @@ repos: - id: black args: - --line-length=99 - - --target-version=py38 - --target-version=py39 - --target-version=py310 - --target-version=py311 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1af648741..f915af713 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -54,7 +54,7 @@ To confirm you have the correct version of `dbt-core` installed please run `dbt ### Initial Setup -`dbt-bigquery` contains [unit](https://github.com/dbt-labs/dbt-bigquery/tree/main/tests/unit) and [integration](https://github.com/dbt-labs/dbt-bigquery/tree/main/tests/integration) tests. Integration tests require testing against an actual BigQuery warehouse. We have CI set up to test against a BigQuery warehouse. In order to run integration tests locally, you will need a `test.env` file in the root of the repository that contains credentials for BigQuery. +`dbt-bigquery` contains [unit](https://github.com/dbt-labs/dbt-bigquery/tree/main/tests/unit) and [functional](https://github.com/dbt-labs/dbt-bigquery/tree/main/tests/functional) tests. Functional tests require testing against an actual BigQuery warehouse. We have CI set up to test against a BigQuery warehouse. In order to run functional tests locally, you will need a `test.env` file in the root of the repository that contains credentials for BigQuery. Note: This `test.env` file is git-ignored, but please be _extra_ careful to never check in credentials or other sensitive information when developing. To create your `test.env` file, copy the provided example file, then supply your relevant credentials. @@ -67,7 +67,7 @@ $EDITOR test.env There are a few methods for running tests locally. #### `tox` -`tox` takes care of managing Python virtualenvs and installing dependencies in order to run tests. You can also run tests in parallel, for example you can run unit tests for Python 3.8, Python 3.9, Python 3.10, and Python 3.11 in parallel with `tox -p`. Also, you can run unit tests for specific python versions with `tox -e py38`. The configuration of these tests are located in `tox.ini`. +`tox` takes care of managing Python virtualenvs and installing dependencies in order to run tests. You can also run tests in parallel, for example you can run unit tests for Python 3.9, Python 3.10, Python 3.11, and Python 3.12 in parallel with `tox -p`. Also, you can run unit tests for specific python versions with `tox -e py39`. The configuration of these tests is located in `tox.ini`. #### `pytest` Finally, you can also run a specific test or group of tests using `pytest` directly. With a Python virtualenv active and dev dependencies installed you can do things like: @@ -104,6 +104,6 @@ You don't need to worry about which `dbt-bigquery` version your change will go i dbt Labs provides a CI environment to test changes to the `dbt-bigquery` adapter and periodic checks against the development version of `dbt-core` through Github Actions. -A `dbt-bigquery` maintainer will review your PR. They may suggest code revision for style or clarity, or request that you add unit or integration test(s). These are good things! We believe that, with a little bit of help, anyone can contribute high-quality code. +A `dbt-bigquery` maintainer will review your PR. They may suggest code revision for style or clarity, or request that you add unit or functional test(s). These are good things! We believe that, with a little bit of help, anyone can contribute high-quality code.
Once all tests are passing, you have updated the changelog to reflect and tag your issue/pr for reference with a small description of the change, and your PR has been approved, a `dbt-bigquery` maintainer will merge your changes into the active development branch. And that's it! Happy developing :tada: diff --git a/dbt/adapters/bigquery/__init__.py b/dbt/adapters/bigquery/__init__.py index 5fe68e786..74fa17cda 100644 --- a/dbt/adapters/bigquery/__init__.py +++ b/dbt/adapters/bigquery/__init__.py @@ -1,8 +1,8 @@ -from dbt.adapters.bigquery.connections import BigQueryConnectionManager # noqa -from dbt.adapters.bigquery.connections import BigQueryCredentials -from dbt.adapters.bigquery.relation import BigQueryRelation # noqa -from dbt.adapters.bigquery.column import BigQueryColumn # noqa -from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig # noqa +from dbt.adapters.bigquery.column import BigQueryColumn +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials +from dbt.adapters.bigquery.impl import BigQueryAdapter, GrantTarget, PartitionConfig +from dbt.adapters.bigquery.relation import BigQueryRelation from dbt.adapters.base import AdapterPlugin from dbt.include import bigquery diff --git a/dbt/adapters/bigquery/column.py b/dbt/adapters/bigquery/column.py index 4a12e211f..a676fef4b 100644 --- a/dbt/adapters/bigquery/column.py +++ b/dbt/adapters/bigquery/column.py @@ -1,9 +1,10 @@ from dataclasses import dataclass -from typing import Optional, List, TypeVar, Iterable, Type, Any, Dict, Union +from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union + +from google.cloud.bigquery import SchemaField from dbt.adapters.base.column import Column -from google.cloud.bigquery import SchemaField _PARENT_DATA_TYPE_KEY = "__parent_data_type" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index d3eee3ef3..bda54080b 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -1,58 +1,54 @@ from collections import defaultdict from concurrent.futures import TimeoutError +from contextlib import contextmanager +from dataclasses import dataclass import json +from multiprocessing.context import SpawnContext import re -from contextlib import contextmanager -from dataclasses import dataclass, field +from typing import Dict, Hashable, List, Optional, Tuple, TYPE_CHECKING import uuid -from mashumaro.helper import pass_through - -from functools import lru_cache -from requests.exceptions import ConnectionError - -from multiprocessing.context import SpawnContext -from typing import Optional, Any, Dict, Tuple, Hashable, List, TYPE_CHECKING +from google.api_core import client_info, client_options, retry import google.auth +from google.auth import impersonated_credentials import google.auth.exceptions import google.cloud.bigquery import google.cloud.exceptions -from google.api_core import retry, client_info -from google.auth import impersonated_credentials from google.oauth2 import ( credentials as GoogleCredentials, service_account as GoogleServiceAccountCredentials, ) +from requests.exceptions import ConnectionError from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event -from dbt_common.exceptions import ( - DbtRuntimeError, - DbtConfigError, - DbtDatabaseError, -) +from dbt_common.exceptions import DbtDatabaseError, DbtRuntimeError from dbt_common.invocation 
import get_invocation_id -from dbt.adapters.bigquery import gcloud +from dbt.adapters.base import BaseConnectionManager from dbt.adapters.contracts.connection import ( - ConnectionState, - AdapterResponse, - Credentials, AdapterRequiredConfig, + AdapterResponse, + ConnectionState, ) -from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt.adapters.base import BaseConnectionManager from dbt.adapters.events.logging import AdapterLogger from dbt.adapters.events.types import SQLQuery -from dbt.adapters.bigquery import __version__ as dbt_version -from dbt.adapters.bigquery.utility import is_base64, base64_to_string +from dbt.adapters.exceptions.connection import FailedToConnectError -from dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +import dbt.adapters.bigquery.__version__ as dbt_version +from dbt.adapters.bigquery.credentials import ( + BigQueryConnectionMethod, + Priority, + get_bigquery_defaults, + setup_default_credentials, +) +from dbt.adapters.bigquery.utility import is_base64, base64_to_string if TYPE_CHECKING: # Indirectly imported via agate_helper, which is lazy loaded further downfile. # Used by mypy for earlier type hints. import agate + logger = AdapterLogger("BigQuery") BQ_QUERY_JOB_SPLIT = "-----Query Job SQL Follows-----" @@ -73,33 +69,6 @@ ) -@lru_cache() -def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: - """ - Returns (credentials, project_id) - - project_id is returned available from the environment; otherwise None - """ - # Cached, because the underlying implementation shells out, taking ~1s - try: - credentials, _ = google.auth.default(scopes=scopes) - return credentials, _ - except google.auth.exceptions.DefaultCredentialsError as e: - raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") - - -class Priority(StrEnum): - Interactive = "interactive" - Batch = "batch" - - -class BigQueryConnectionMethod(StrEnum): - OAUTH = "oauth" - SERVICE_ACCOUNT = "service-account" - SERVICE_ACCOUNT_JSON = "service-account-json" - OAUTH_SECRETS = "oauth-secrets" - - @dataclass class BigQueryAdapterResponse(AdapterResponse): bytes_processed: Optional[int] = None @@ -110,127 +79,6 @@ class BigQueryAdapterResponse(AdapterResponse): slot_ms: Optional[int] = None -@dataclass -class DataprocBatchConfig(ExtensibleDbtClassMixin): - def __init__(self, batch_config): - self.batch_config = batch_config - - -@dataclass -class BigQueryCredentials(Credentials): - method: BigQueryConnectionMethod = None # type: ignore - - # BigQuery allows an empty database / project, where it defers to the - # environment for the project - database: Optional[str] = None - schema: Optional[str] = None - execution_project: Optional[str] = None - location: Optional[str] = None - priority: Optional[Priority] = None - maximum_bytes_billed: Optional[int] = None - impersonate_service_account: Optional[str] = None - - job_retry_deadline_seconds: Optional[int] = None - job_retries: Optional[int] = 1 - job_creation_timeout_seconds: Optional[int] = None - job_execution_timeout_seconds: Optional[int] = None - - # Keyfile json creds (unicode or base 64 encoded) - keyfile: Optional[str] = None - keyfile_json: Optional[Dict[str, Any]] = None - - # oauth-secrets - token: Optional[str] = None - refresh_token: Optional[str] = None - client_id: Optional[str] = None - client_secret: Optional[str] = None - token_uri: Optional[str] = None - - dataproc_region: Optional[str] = None - dataproc_cluster_name: Optional[str] = None - gcs_bucket: 
Optional[str] = None - - dataproc_batch: Optional[DataprocBatchConfig] = field( - metadata={ - "serialization_strategy": pass_through, - }, - default=None, - ) - - scopes: Optional[Tuple[str, ...]] = ( - "https://www.googleapis.com/auth/bigquery", - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/drive", - ) - - _ALIASES = { - # 'legacy_name': 'current_name' - "project": "database", - "dataset": "schema", - "target_project": "target_database", - "target_dataset": "target_schema", - "retries": "job_retries", - "timeout_seconds": "job_execution_timeout_seconds", - } - - def __post_init__(self): - if self.keyfile_json and "private_key" in self.keyfile_json: - self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( - "\\n", "\n" - ) - if not self.method: - raise DbtRuntimeError("Must specify authentication method") - - if not self.schema: - raise DbtRuntimeError("Must specify schema") - - @property - def type(self): - return "bigquery" - - @property - def unique_field(self): - return self.database - - def _connection_keys(self): - return ( - "method", - "database", - "execution_project", - "schema", - "location", - "priority", - "maximum_bytes_billed", - "impersonate_service_account", - "job_retry_deadline_seconds", - "job_retries", - "job_creation_timeout_seconds", - "job_execution_timeout_seconds", - "timeout_seconds", - "client_id", - "token_uri", - "dataproc_region", - "dataproc_cluster_name", - "gcs_bucket", - "dataproc_batch", - ) - - @classmethod - def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: - # We need to inject the correct value of the database (aka project) at - # this stage, ref - # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. - - # `database` is an alias of `project` in BigQuery - if "database" not in d: - _, database = get_bigquery_defaults() - d["database"] = database - # `execution_project` default to dataset/project - if "execution_project" not in d: - d["execution_project"] = d["database"] - return d - - class BigQueryConnectionManager(BaseConnectionManager): TYPE = "bigquery" @@ -408,14 +256,17 @@ def get_credentials(cls, profile_credentials): def get_bigquery_client(cls, profile_credentials): creds = cls.get_credentials(profile_credentials) execution_project = profile_credentials.execution_project + quota_project = profile_credentials.quota_project location = getattr(profile_credentials, "location", None) info = client_info.ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}") + options = client_options.ClientOptions(quota_project_id=quota_project) return google.cloud.bigquery.Client( execution_project, creds, location=location, client_info=info, + client_options=options, ) @classmethod @@ -429,7 +280,7 @@ def open(cls, connection): except google.auth.exceptions.DefaultCredentialsError: logger.info("Please log into GCP to continue") - gcloud.setup_default_credentials() + setup_default_credentials() handle = cls.get_bigquery_client(connection.credentials) diff --git a/dbt/adapters/bigquery/credentials.py b/dbt/adapters/bigquery/credentials.py new file mode 100644 index 000000000..32f172dac --- /dev/null +++ b/dbt/adapters/bigquery/credentials.py @@ -0,0 +1,187 @@ +from dataclasses import dataclass, field +from functools import lru_cache +from typing import Any, Dict, Optional, Tuple + +import google.auth +from google.auth.exceptions import DefaultCredentialsError +from mashumaro import pass_through + +from dbt_common.clients.system import run_cmd +from 
dbt_common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum +from dbt_common.exceptions import DbtConfigError, DbtRuntimeError +from dbt.adapters.contracts.connection import Credentials +from dbt.adapters.events.logging import AdapterLogger + + +_logger = AdapterLogger("BigQuery") + + +class Priority(StrEnum): + Interactive = "interactive" + Batch = "batch" + + +class BigQueryConnectionMethod(StrEnum): + OAUTH = "oauth" + SERVICE_ACCOUNT = "service-account" + SERVICE_ACCOUNT_JSON = "service-account-json" + OAUTH_SECRETS = "oauth-secrets" + + +@dataclass +class DataprocBatchConfig(ExtensibleDbtClassMixin): + def __init__(self, batch_config): + self.batch_config = batch_config + + +@lru_cache() +def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]: + """ + Returns (credentials, project_id) + + project_id is returned available from the environment; otherwise None + """ + # Cached, because the underlying implementation shells out, taking ~1s + try: + credentials, _ = google.auth.default(scopes=scopes) + return credentials, _ + except DefaultCredentialsError as e: + raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}") + + +def setup_default_credentials(): + if _gcloud_installed(): + run_cmd(".", ["gcloud", "auth", "application-default", "login"]) + else: + msg = """ + dbt requires the gcloud SDK to be installed to authenticate with BigQuery. + Please download and install the SDK, or use a Service Account instead. + + https://cloud.google.com/sdk/ + """ + raise DbtRuntimeError(msg) + + +def _gcloud_installed(): + try: + run_cmd(".", ["gcloud", "--version"]) + return True + except OSError as e: + _logger.debug(e) + return False + + +@dataclass +class BigQueryCredentials(Credentials): + method: BigQueryConnectionMethod = None # type: ignore + + # BigQuery allows an empty database / project, where it defers to the + # environment for the project + database: Optional[str] = None + schema: Optional[str] = None + execution_project: Optional[str] = None + quota_project: Optional[str] = None + location: Optional[str] = None + priority: Optional[Priority] = None + maximum_bytes_billed: Optional[int] = None + impersonate_service_account: Optional[str] = None + + job_retry_deadline_seconds: Optional[int] = None + job_retries: Optional[int] = 1 + job_creation_timeout_seconds: Optional[int] = None + job_execution_timeout_seconds: Optional[int] = None + + # Keyfile json creds (unicode or base 64 encoded) + keyfile: Optional[str] = None + keyfile_json: Optional[Dict[str, Any]] = None + + # oauth-secrets + token: Optional[str] = None + refresh_token: Optional[str] = None + client_id: Optional[str] = None + client_secret: Optional[str] = None + token_uri: Optional[str] = None + + dataproc_region: Optional[str] = None + dataproc_cluster_name: Optional[str] = None + gcs_bucket: Optional[str] = None + + dataproc_batch: Optional[DataprocBatchConfig] = field( + metadata={ + "serialization_strategy": pass_through, + }, + default=None, + ) + + scopes: Optional[Tuple[str, ...]] = ( + "https://www.googleapis.com/auth/bigquery", + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/drive", + ) + + _ALIASES = { + # 'legacy_name': 'current_name' + "project": "database", + "dataset": "schema", + "target_project": "target_database", + "target_dataset": "target_schema", + "retries": "job_retries", + "timeout_seconds": "job_execution_timeout_seconds", + } + + def __post_init__(self): + if self.keyfile_json and "private_key" in 
self.keyfile_json: + self.keyfile_json["private_key"] = self.keyfile_json["private_key"].replace( + "\\n", "\n" + ) + if not self.method: + raise DbtRuntimeError("Must specify authentication method") + + if not self.schema: + raise DbtRuntimeError("Must specify schema") + + @property + def type(self): + return "bigquery" + + @property + def unique_field(self): + return self.database + + def _connection_keys(self): + return ( + "method", + "database", + "execution_project", + "schema", + "location", + "priority", + "maximum_bytes_billed", + "impersonate_service_account", + "job_retry_deadline_seconds", + "job_retries", + "job_creation_timeout_seconds", + "job_execution_timeout_seconds", + "timeout_seconds", + "client_id", + "token_uri", + "dataproc_region", + "dataproc_cluster_name", + "gcs_bucket", + "dataproc_batch", + ) + + @classmethod + def __pre_deserialize__(cls, d: Dict[Any, Any]) -> Dict[Any, Any]: + # We need to inject the correct value of the database (aka project) at + # this stage, ref + # https://github.com/dbt-labs/dbt/pull/2908#discussion_r532927436. + + # `database` is an alias of `project` in BigQuery + if "database" not in d: + _, database = get_bigquery_defaults() + d["database"] = database + # `execution_project` default to dataset/project + if "execution_project" not in d: + d["execution_project"] = d["database"] + return d diff --git a/dbt/adapters/bigquery/dataproc/batch.py b/dbt/adapters/bigquery/dataproc/batch.py index e7f13c913..59f40d246 100644 --- a/dbt/adapters/bigquery/dataproc/batch.py +++ b/dbt/adapters/bigquery/dataproc/batch.py @@ -1,16 +1,17 @@ -from typing import Union, Dict - -import time from datetime import datetime +import time +from typing import Dict, Union + from google.cloud.dataproc_v1 import ( - CreateBatchRequest, - BatchControllerClient, Batch, + BatchControllerClient, + CreateBatchRequest, GetBatchRequest, ) from google.protobuf.json_format import ParseDict -from dbt.adapters.bigquery.connections import DataprocBatchConfig +from dbt.adapters.bigquery.credentials import DataprocBatchConfig + _BATCH_RUNNING_STATES = [Batch.State.PENDING, Batch.State.RUNNING] DEFAULT_JAR_FILE_URI = "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.34.0.jar" diff --git a/dbt/adapters/bigquery/dataset.py b/dbt/adapters/bigquery/dataset.py index 4ecd6daa5..a4504294a 100644 --- a/dbt/adapters/bigquery/dataset.py +++ b/dbt/adapters/bigquery/dataset.py @@ -1,8 +1,10 @@ from typing import List -from google.cloud.bigquery import Dataset, AccessEntry + +from google.cloud.bigquery import AccessEntry, Dataset from dbt.adapters.events.logging import AdapterLogger + logger = AdapterLogger("BigQuery") diff --git a/dbt/adapters/bigquery/gcloud.py b/dbt/adapters/bigquery/gcloud.py deleted file mode 100644 index ea1f644ba..000000000 --- a/dbt/adapters/bigquery/gcloud.py +++ /dev/null @@ -1,29 +0,0 @@ -from dbt_common.exceptions import DbtRuntimeError - -from dbt.adapters.events.logging import AdapterLogger -from dbt_common.clients.system import run_cmd - -NOT_INSTALLED_MSG = """ -dbt requires the gcloud SDK to be installed to authenticate with BigQuery. -Please download and install the SDK, or use a Service Account instead. 
- -https://cloud.google.com/sdk/ -""" - -logger = AdapterLogger("BigQuery") - - -def gcloud_installed(): - try: - run_cmd(".", ["gcloud", "--version"]) - return True - except OSError as e: - logger.debug(e) - return False - - -def setup_default_credentials(): - if gcloud_installed(): - run_cmd(".", ["gcloud", "auth", "application-default", "login"]) - else: - raise DbtRuntimeError(NOT_INSTALLED_MSG) diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index 0b49c0373..cf5800fd3 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -1,27 +1,41 @@ from dataclasses import dataclass from datetime import datetime import json -import threading from multiprocessing.context import SpawnContext - +import threading import time from typing import ( Any, Dict, + FrozenSet, + Iterable, List, Optional, + Tuple, + TYPE_CHECKING, Type, Set, Union, - FrozenSet, - Tuple, - Iterable, - TYPE_CHECKING, ) -from dbt.adapters.contracts.relation import RelationConfig +import google.api_core +import google.auth +import google.oauth2 +import google.cloud.bigquery +from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable +import google.cloud.exceptions +import pytz +from dbt_common.contracts.constraints import ( + ColumnLevelConstraint, + ConstraintType, + ModelLevelConstraint, +) +from dbt_common.dataclass_schema import dbtClassMixin +from dbt_common.events.functions import fire_event +import dbt_common.exceptions import dbt_common.exceptions.base +from dbt_common.utils import filter_null_values from dbt.adapters.base import ( AdapterConfig, BaseAdapter, @@ -37,28 +51,12 @@ from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.contracts.macros import MacroResolverProtocol -from dbt_common.contracts.constraints import ( - ColumnLevelConstraint, - ConstraintType, - ModelLevelConstraint, -) -from dbt_common.dataclass_schema import dbtClassMixin +from dbt.adapters.contracts.relation import RelationConfig from dbt.adapters.events.logging import AdapterLogger -from dbt_common.events.functions import fire_event from dbt.adapters.events.types import SchemaCreation, SchemaDrop -import dbt_common.exceptions -from dbt_common.utils import filter_null_values -import google.api_core -import google.auth -import google.oauth2 -import google.cloud.bigquery -from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable -import google.cloud.exceptions -import pytz -from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager -from dbt.adapters.bigquery.column import get_nested_column_data_types -from dbt.adapters.bigquery.connections import BigQueryAdapterResponse +from dbt.adapters.bigquery.column import BigQueryColumn, get_nested_column_data_types +from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset from dbt.adapters.bigquery.python_submissions import ( ClusterDataprocHelper, @@ -77,6 +75,7 @@ # Used by mypy for earlier type hints. import agate + logger = AdapterLogger("BigQuery") # Write dispositions for bigquery. 
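For orientation on the refactor above: the credential-related pieces removed from connections.py and gcloud.py now live together in dbt/adapters/bigquery/credentials.py. Below is a minimal sketch (not part of this patch) of how a call site consumes the relocated helpers; the project and dataset names are made up, and the try/except only exists so the snippet runs even without application-default credentials configured.

from dbt_common.exceptions import DbtConfigError

from dbt.adapters.bigquery.credentials import (
    BigQueryConnectionMethod,
    BigQueryCredentials,
    get_bigquery_defaults,
)

try:
    # Cached with lru_cache because resolving default credentials can take ~1s;
    # returns (credentials, project_id or None), raising DbtConfigError on failure.
    _, default_project = get_bigquery_defaults()
except DbtConfigError:
    default_project = None

creds = BigQueryCredentials(
    method=BigQueryConnectionMethod.OAUTH,
    database=default_project or "my-gcp-project",  # hypothetical project
    schema="dbt_dev",  # hypothetical dataset
)
print(creds.type, creds.unique_field)  # "bigquery", then the resolved project

connections.py, python_submissions.py, and dataproc/batch.py all switch their imports to this module, which is the reuse called out in the Under the Hood changelog entry above.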
diff --git a/dbt/adapters/bigquery/python_submissions.py b/dbt/adapters/bigquery/python_submissions.py index 368ed9d07..93c82ca92 100644 --- a/dbt/adapters/bigquery/python_submissions.py +++ b/dbt/adapters/bigquery/python_submissions.py @@ -1,21 +1,21 @@ import uuid from typing import Dict, Union -from dbt.adapters.events.logging import AdapterLogger - -from dbt.adapters.base import PythonJobHelper -from google.api_core.future.polling import POLLING_PREDICATE - -from dbt.adapters.bigquery import BigQueryConnectionManager, BigQueryCredentials from google.api_core import retry from google.api_core.client_options import ClientOptions +from google.api_core.future.polling import POLLING_PREDICATE from google.cloud import storage, dataproc_v1 from google.cloud.dataproc_v1.types.batches import Batch +from dbt.adapters.base import PythonJobHelper +from dbt.adapters.events.logging import AdapterLogger + +from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.credentials import BigQueryCredentials from dbt.adapters.bigquery.dataproc.batch import ( + DEFAULT_JAR_FILE_URI, create_batch_request, poll_batch_job, - DEFAULT_JAR_FILE_URI, update_batch_from_config, ) diff --git a/dbt/adapters/bigquery/relation.py b/dbt/adapters/bigquery/relation.py index 0e2c17670..4edc8d7ac 100644 --- a/dbt/adapters/bigquery/relation.py +++ b/dbt/adapters/bigquery/relation.py @@ -1,9 +1,13 @@ from dataclasses import dataclass, field +from itertools import chain, islice from typing import FrozenSet, Optional, TypeVar -from itertools import chain, islice +from dbt_common.exceptions import CompilationError +from dbt_common.utils.dict import filter_null_values from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema +from dbt.adapters.contracts.relation import RelationConfig, RelationType from dbt.adapters.relation_configs import RelationConfigChangeAction + from dbt.adapters.bigquery.relation_configs import ( BigQueryClusterConfigChange, BigQueryMaterializedViewConfig, @@ -11,9 +15,6 @@ BigQueryOptionsConfigChange, BigQueryPartitionConfigChange, ) -from dbt.adapters.contracts.relation import RelationType, RelationConfig -from dbt_common.exceptions import CompilationError -from dbt_common.utils.dict import filter_null_values Self = TypeVar("Self", bound="BigQueryRelation") diff --git a/dbt/include/bigquery/macros/materializations/table.sql b/dbt/include/bigquery/macros/materializations/table.sql index e3c5b3598..41bb69770 100644 --- a/dbt/include/bigquery/macros/materializations/table.sql +++ b/dbt/include/bigquery/macros/materializations/table.sql @@ -113,10 +113,19 @@ else: msg = f"{type(df)} is not a supported type for dbt Python materialization" raise Exception(msg) +# For writeMethod we need to use "indirect" if materializing a partitioned table +# otherwise we can use "direct". Note that indirect will fail if the GCS bucket has a retention policy set on it. 
+{%- if partition_config %} + {%- set write_method = 'indirect' -%} +{%- else %} + {% set write_method = 'direct' -%} +{%- endif %} + df.write \ .mode("overwrite") \ .format("bigquery") \ - .option("writeMethod", "indirect").option("writeDisposition", 'WRITE_TRUNCATE') \ + .option("writeMethod", "{{ write_method }}") \ + .option("writeDisposition", 'WRITE_TRUNCATE') \ {%- if partition_config is not none %} {%- if partition_config.data_type | lower in ('date','timestamp','datetime') %} .option("partitionField", "{{- partition_config.field -}}") \ diff --git a/dev-requirements.txt b/dev-requirements.txt index 34169172a..2c0134110 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -6,8 +6,7 @@ git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core # dev ddtrace==2.3.0 -pre-commit~=3.7.0;python_version>="3.9" -pre-commit~=3.5.0;python_version<"3.9" +pre-commit~=3.7.0 pytest~=7.4 pytest-csv~=3.0 pytest-dotenv~=0.5.2 diff --git a/docker/Dockerfile b/docker/Dockerfile index 3b9431fd1..bda507dc5 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,7 +1,7 @@ # this image gets published to GHCR for production use ARG py_version=3.11.2 -FROM python:$py_version-slim-bullseye as base +FROM python:$py_version-slim-bullseye AS base RUN apt-get update \ && apt-get dist-upgrade -y \ @@ -25,7 +25,7 @@ ENV LANG=C.UTF-8 RUN python -m pip install --upgrade "pip==24.0" "setuptools==69.2.0" "wheel==0.43.0" --no-cache-dir -FROM base as dbt-bigquery +FROM base AS dbt-bigquery ARG commit_ref=main diff --git a/docker/dev.Dockerfile b/docker/dev.Dockerfile index 2afad0a95..f122f5343 100644 --- a/docker/dev.Dockerfile +++ b/docker/dev.Dockerfile @@ -1,43 +1,43 @@ # this image does not get published, it is intended for local development only, see `Makefile` for usage -FROM ubuntu:24.04 as base +FROM ubuntu:24.04 AS base # prevent python installation from asking for time zone region ARG DEBIAN_FRONTEND=noninteractive # add python repository RUN apt-get update \ - && apt-get install -y software-properties-common=0.99.22.9 \ - && add-apt-repository -y ppa:deadsnakes/ppa \ - && apt-get clean \ - && rm -rf \ - /var/lib/apt/lists/* \ - /tmp/* \ - /var/tmp/* + && apt-get install -y software-properties-common=0.99.48 \ + && add-apt-repository -y ppa:deadsnakes/ppa \ + && apt-get clean \ + && rm -rf \ + /var/lib/apt/lists/* \ + /tmp/* \ + /var/tmp/* # install python RUN apt-get update \ - && apt-get install -y --no-install-recommends \ - build-essential=12.9ubuntu3 \ - git-all=1:2.34.1-1ubuntu1.10 \ - python3.8=3.8.19-1+jammy1 \ - python3.8-dev=3.8.19-1+jammy1 \ - python3.8-distutils=3.8.19-1+jammy1 \ - python3.8-venv=3.8.19-1+jammy1 \ - python3-pip=22.0.2+dfsg-1ubuntu0.4 \ - python3-wheel=0.37.1-2ubuntu0.22.04.1 \ - && apt-get clean \ - && rm -rf \ - /var/lib/apt/lists/* \ - /tmp/* \ - /var/tmp/* + && apt-get install -y --no-install-recommends \ + build-essential=12.10ubuntu1 \ + git-all=1:2.43.0-1ubuntu7.1 \ + python3.9=3.9.20-1+noble1 \ + python3.9-dev=3.9.20-1+noble1 \ + python3.9-distutils=3.9.20-1+noble1 \ + python3.9-venv=3.9.20-1+noble1 \ + python3-pip=24.0+dfsg-1ubuntu1 \ + python3-wheel=0.42.0-2 \ + && apt-get clean \ + && rm -rf \ + /var/lib/apt/lists/* \ + /tmp/* \ + /var/tmp/* # update the default system interpreter to the newly installed version -RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.8 1 +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3.9 1 -FROM base as dbt-bigquery-dev +FROM base AS dbt-bigquery-dev 
-HEALTHCHECK CMD python3 --version || exit 1 +HEALTHCHECK CMD python --version || exit 1 # send stdout/stderr to terminal ENV PYTHONUNBUFFERED=1 diff --git a/setup.py b/setup.py index ab89a3c39..79f6025ea 100644 --- a/setup.py +++ b/setup.py @@ -2,9 +2,9 @@ import sys # require a supported version of Python -if sys.version_info < (3, 8): +if sys.version_info < (3, 9): print("Error: dbt does not support this version of Python.") - print("Please upgrade to Python 3.8 or higher.") + print("Please upgrade to Python 3.9 or higher.") sys.exit(1) try: @@ -69,11 +69,10 @@ def _dbt_bigquery_version() -> str: "Operating System :: Microsoft :: Windows", "Operating System :: MacOS :: MacOS X", "Operating System :: POSIX :: Linux", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ], - python_requires=">=3.8", + python_requires=">=3.9", ) diff --git a/tests/functional/test_quota_project.py b/tests/functional/test_quota_project.py new file mode 100644 index 000000000..0b4bb90c4 --- /dev/null +++ b/tests/functional/test_quota_project.py @@ -0,0 +1,27 @@ +import os + +import pytest + +from dbt.tests.util import run_dbt + +_QUOTA_PROJECT = os.getenv("BIGQUERY_TEST_ALT_DATABASE") + + +class TestNoQuotaProject: + def test_no_quota_project(self, project): + results = run_dbt() + for result in results: + assert None == result.adapter_response["quota_project"] + + +class TestQuotaProjectOption: + @pytest.fixture(scope="class") + def profiles_config_update(self, dbt_profile_target): + outputs = {"default": dbt_profile_target} + outputs["default"]["quota_project"] = _QUOTA_PROJECT + yield + + def test_quota_project_option(self, project): + results = run_dbt() + for result in results: + assert _QUOTA_PROJECT == result.adapter_response["quota_project"] diff --git a/tests/unit/test_bigquery_adapter.py b/tests/unit/test_bigquery_adapter.py index a922525fd..ca3bfc24c 100644 --- a/tests/unit/test_bigquery_adapter.py +++ b/tests/unit/test_bigquery_adapter.py @@ -203,7 +203,7 @@ def get_adapter(self, target) -> BigQueryAdapter: class TestBigQueryAdapterAcquire(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) @@ -244,7 +244,7 @@ def test_acquire_connection_oauth_validations(self, mock_open_connection): mock_open_connection.assert_called_once() @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) @patch("dbt.adapters.bigquery.BigQueryConnectionManager.open", return_value=_bq_conn()) @@ -386,15 +386,17 @@ def test_cancel_open_connections_single(self): adapter.connections.thread_connections.update({key: master, 1: model}) self.assertEqual(len(list(adapter.cancel_open_connections())), 1) + @patch("dbt.adapters.bigquery.impl.google.api_core.client_options.ClientOptions") @patch("dbt.adapters.bigquery.impl.google.auth.default") @patch("dbt.adapters.bigquery.impl.google.cloud.bigquery") - def test_location_user_agent(self, mock_bq, mock_auth_default): + def test_location_user_agent(self, mock_bq, mock_auth_default, MockClientOptions): creds = MagicMock() mock_auth_default.return_value = (creds, MagicMock()) adapter 
= self.get_adapter("loc") connection = adapter.acquire_connection("dummy") mock_client = mock_bq.Client + mock_client_options = MockClientOptions.return_value mock_client.assert_not_called() connection.handle @@ -403,6 +405,7 @@ def test_location_user_agent(self, mock_bq, mock_auth_default): creds, location="Luna Station", client_info=HasUserAgent(), + client_options=mock_client_options, ) diff --git a/tests/unit/test_bigquery_connection_manager.py b/tests/unit/test_bigquery_connection_manager.py index d09cb1635..1c14100f6 100644 --- a/tests/unit/test_bigquery_connection_manager.py +++ b/tests/unit/test_bigquery_connection_manager.py @@ -84,12 +84,14 @@ def test_is_retryable(self): rate_limit_error = exceptions.Forbidden( "code broke", errors=[{"reason": "rateLimitExceeded"}] ) + service_unavailable_error = exceptions.ServiceUnavailable("service is unavailable") self.assertTrue(_is_retryable(internal_server_error)) self.assertTrue(_is_retryable(bad_request_error)) self.assertTrue(_is_retryable(connection_error)) self.assertFalse(_is_retryable(client_error)) self.assertTrue(_is_retryable(rate_limit_error)) + self.assertTrue(_is_retryable(service_unavailable_error)) def test_drop_dataset(self): mock_table = Mock() diff --git a/tests/unit/test_configure_dataproc_batch.py b/tests/unit/test_configure_dataproc_batch.py index 94cb28efb..f56aee129 100644 --- a/tests/unit/test_configure_dataproc_batch.py +++ b/tests/unit/test_configure_dataproc_batch.py @@ -12,7 +12,7 @@ # parsed credentials class TestConfigureDataprocBatch(BaseTestBigQueryAdapter): @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_update_dataproc_serverless_batch(self, mock_get_bigquery_defaults): @@ -64,7 +64,7 @@ def to_str_values(d): ) @patch( - "dbt.adapters.bigquery.connections.get_bigquery_defaults", + "dbt.adapters.bigquery.credentials.get_bigquery_defaults", return_value=("credentials", "project_id"), ) def test_default_dataproc_serverless_batch(self, mock_get_bigquery_defaults): diff --git a/tox.ini b/tox.ini index b388dc5b3..240d85e34 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,8 @@ [tox] skipsdist = True -envlist = py38,py39,py310,py311,py312 +envlist = py39,py310,py311,py312 -[testenv:{unit,py38,py39,py310,py311,py312,py}] +[testenv:{unit,py39,py310,py311,py312,py}] description = unit testing skip_install = true passenv = @@ -13,7 +13,7 @@ deps = -rdev-requirements.txt -e. -[testenv:{integration,py38,py39,py310,py311,py312,py}-{bigquery}] +[testenv:{integration,py39,py310,py311,py312,py}-{bigquery}] description = adapter plugin integration testing skip_install = true passenv = @@ -33,7 +33,7 @@ deps = -rdev-requirements.txt . -[testenv:{python-tests,py38,py39,py310,py311,py312,py}] +[testenv:{python-tests,py39,py310,py311,py312,py}] description = python integration testing skip_install = true passenv =
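Tying the quota_project changes together: connections.py now forwards the new optional profile field to the BigQuery client via ClientOptions, and tests/functional/test_quota_project.py checks the adapter response both with and without the option set. Below is a self-contained sketch of that construction pattern, assuming only the google-cloud-bigquery client library; the helper name and placeholder values are illustrative, not part of the adapter.

from google.api_core import client_info, client_options
from google.cloud import bigquery


def make_client(creds, execution_project, location=None, quota_project=None):
    # Mirrors BigQueryConnectionManager.get_bigquery_client after this change:
    # quota_project lands on ClientOptions.quota_project_id; leaving it None
    # matches the previous behavior (no quota-project override).
    info = client_info.ClientInfo(user_agent="dbt-bigquery-<version>")  # placeholder user agent
    options = client_options.ClientOptions(quota_project_id=quota_project)
    return bigquery.Client(
        execution_project,
        creds,
        location=location,
        client_info=info,
        client_options=options,
    )


# Hypothetical usage:
#   client = make_client(creds, "my-exec-project", quota_project="my-billing-project")

Under these assumptions, the two functional test classes correspond to quota_project=None and quota_project set from BIGQUERY_TEST_ALT_DATABASE, respectively.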