Skip to content

Commit

Permalink
feature/decouple adapters from core (#1026)
Browse files Browse the repository at this point in the history
* use dynamic schema in test_grant_access_to.py

* use dynamic schema in test_grant_access_to.py

* revert setup

* init

* init

* use dynamic schema in test_grant_access_to.py

* use dynamic schema in test_grant_access_to.py

* revert setup

* update bq feature branch to work against core feature

* add changie

* remove --force-reinstall from make

* update dev-requirements.txt to point to dbt-core main

* fix imports in _materialized_view.py
  • Loading branch information
colin-rogers-dbt authored Jan 10, 2024
1 parent 0685167 commit f2804c0
Show file tree
Hide file tree
Showing 23 changed files with 202 additions and 162 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240102-152030.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Update base adapter references as part of decoupling migration
time: 2024-01-02T15:20:30.038221-08:00
custom:
Author: colin-rogers-dbt
Issue: "1067"
35 changes: 17 additions & 18 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from contextlib import contextmanager
from dataclasses import dataclass, field

from dbt.events.contextvars import get_node_info
from dbt.common.invocation import get_invocation_id

from dbt.common.events.contextvars import get_node_info
from mashumaro.helper import pass_through

from functools import lru_cache
Expand All @@ -25,23 +27,21 @@
)

from dbt.adapters.bigquery import gcloud
from dbt.clients import agate_helper
from dbt.config.profile import INVALID_PROFILE_MESSAGE
from dbt.tracking import active_user
from dbt.contracts.connection import ConnectionState, AdapterResponse
from dbt.exceptions import (
FailedToConnectError,
from dbt.common.clients import agate_helper
from dbt.adapters.contracts.connection import ConnectionState, AdapterResponse
from dbt.common.exceptions import (
DbtRuntimeError,
DbtDatabaseError,
DbtProfileError,
DbtConfigError,
)
from dbt.common.exceptions import DbtDatabaseError
from dbt.adapters.exceptions.connection import FailedToConnectError
from dbt.adapters.base import BaseConnectionManager, Credentials
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SQLQuery
from dbt.version import __version__ as dbt_version
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.events.types import SQLQuery
from dbt.common.events.functions import fire_event
from dbt.adapters.bigquery import __version__ as dbt_version

from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum
from dbt.common.dataclass_schema import ExtensibleDbtClassMixin, StrEnum

logger = AdapterLogger("BigQuery")

Expand Down Expand Up @@ -85,7 +85,7 @@ def get_bigquery_defaults(scopes=None) -> Tuple[Any, Optional[str]]:
credentials, _ = google.auth.default(scopes=scopes)
return credentials, _
except google.auth.exceptions.DefaultCredentialsError as e:
raise DbtProfileError(INVALID_PROFILE_MESSAGE.format(error_string=e))
raise DbtConfigError(f"Failed to authenticate with supplied credentials\nerror:\n{e}")


class Priority(StrEnum):
Expand Down Expand Up @@ -382,7 +382,7 @@ def get_bigquery_client(cls, profile_credentials):
execution_project = profile_credentials.execution_project
location = getattr(profile_credentials, "location", None)

info = client_info.ClientInfo(user_agent=f"dbt-{dbt_version}")
info = client_info.ClientInfo(user_agent=f"dbt-bigquery-{dbt_version.version}")
return google.cloud.bigquery.Client(
execution_project,
creds,
Expand Down Expand Up @@ -470,8 +470,7 @@ def raw_execute(

labels = self.get_labels_from_query_comment()

if active_user:
labels["dbt_invocation_id"] = active_user.invocation_id
labels["dbt_invocation_id"] = get_invocation_id()

job_params = {
"use_legacy_sql": use_legacy_sql,
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/bigquery/dataset.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import List
from google.cloud.bigquery import Dataset, AccessEntry

from dbt.events import AdapterLogger
from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger("BigQuery")

Expand Down
8 changes: 4 additions & 4 deletions dbt/adapters/bigquery/gcloud.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dbt.events import AdapterLogger
import dbt.exceptions
from dbt.clients.system import run_cmd
from dbt.adapters.events.logging import AdapterLogger
import dbt.common.exceptions
from dbt.common.clients.system import run_cmd

NOT_INSTALLED_MSG = """
dbt requires the gcloud SDK to be installed to authenticate with BigQuery.
Expand All @@ -25,4 +25,4 @@ def setup_default_credentials():
if gcloud_installed():
run_cmd(".", ["gcloud", "auth", "application-default", "login"])
else:
raise dbt.exceptions.DbtRuntimeError(NOT_INSTALLED_MSG)
raise dbt.common.exceptions.DbtRuntimeError(NOT_INSTALLED_MSG)
63 changes: 34 additions & 29 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from dataclasses import dataclass
import json
import threading
from multiprocessing.context import SpawnContext

import time
from typing import Any, Dict, List, Optional, Type, Set, Union
from typing import Any, Dict, List, Optional, Type, Set, Union, FrozenSet, Tuple, Iterable

import agate
from dbt import ui # type: ignore
from dbt.adapters.contracts.relation import RelationConfig

import dbt.common.exceptions.base
from dbt.adapters.base import ( # type: ignore
AdapterConfig,
BaseAdapter,
Expand All @@ -17,17 +21,15 @@
available,
)
from dbt.adapters.cache import _make_ref_key_dict # type: ignore
import dbt.clients.agate_helper
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint # type: ignore
from dbt.dataclass_schema import dbtClassMixin
import dbt.deprecations
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SchemaCreation, SchemaDrop
import dbt.exceptions
from dbt.utils import filter_null_values
import dbt.common.clients.agate_helper
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.common.contracts.constraints import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint # type: ignore
from dbt.common.dataclass_schema import dbtClassMixin
from dbt.adapters.events.logging import AdapterLogger
from dbt.common.events.functions import fire_event
from dbt.adapters.events.types import SchemaCreation, SchemaDrop
import dbt.common.exceptions
from dbt.common.utils import filter_null_values
import google.api_core
import google.auth
import google.oauth2
Expand Down Expand Up @@ -116,8 +118,8 @@ class BigQueryAdapter(BaseAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

def __init__(self, config) -> None:
super().__init__(config)
def __init__(self, config, mp_context: SpawnContext) -> None:
super().__init__(config, mp_context)
self.connections: BigQueryConnectionManager = self.connections

###
Expand Down Expand Up @@ -145,7 +147,9 @@ def drop_relation(self, relation: BigQueryRelation) -> None:
conn.handle.delete_table(table_ref, not_found_ok=True)

def truncate_relation(self, relation: BigQueryRelation) -> None:
raise dbt.exceptions.NotImplementedError("`truncate` is not implemented for this adapter!")
raise dbt.common.exceptions.base.NotImplementedError(
"`truncate` is not implemented for this adapter!"
)

def rename_relation(
self, from_relation: BigQueryRelation, to_relation: BigQueryRelation
Expand All @@ -160,7 +164,7 @@ def rename_relation(
or from_relation.type == RelationType.View
or to_relation.type == RelationType.View
):
raise dbt.exceptions.DbtRuntimeError(
raise dbt.common.exceptions.DbtRuntimeError(
"Renaming of views is not currently supported in BigQuery"
)

Expand Down Expand Up @@ -386,7 +390,7 @@ def copy_table(self, source, destination, materialization):
elif materialization == "table":
write_disposition = WRITE_TRUNCATE
else:
raise dbt.exceptions.CompilationError(
raise dbt.common.exceptions.CompilationError(
'Copy table materialization must be "copy" or "table", but '
f"config.get('copy_materialization', 'table') was "
f"{materialization}"
Expand Down Expand Up @@ -433,11 +437,11 @@ def poll_until_job_completes(cls, job, timeout):
job.reload()

if job.state != "DONE":
raise dbt.exceptions.DbtRuntimeError("BigQuery Timeout Exceeded")
raise dbt.common.exceptions.DbtRuntimeError("BigQuery Timeout Exceeded")

elif job.error_result:
message = "\n".join(error["message"].strip() for error in job.errors)
raise dbt.exceptions.DbtRuntimeError(message)
raise dbt.common.exceptions.DbtRuntimeError(message)

def _bq_table_to_relation(self, bq_table) -> Union[BigQueryRelation, None]:
if bq_table is None:
Expand All @@ -454,15 +458,14 @@ def _bq_table_to_relation(self, bq_table) -> Union[BigQueryRelation, None]:
@classmethod
def warning_on_hooks(cls, hook_type):
    """Log that a hook type is unsupported on BigQuery and will be ignored.

    Args:
        hook_type: Name of the hook being skipped; it is substituted into the
            logged message so the reader can tell which hook was ignored.
    """
    # The template used to be logged unformatted, so the output contained a
    # literal "{}" and `hook_type` was never used.
    msg = "{} is not supported in bigquery and will be ignored".format(hook_type)
    logger.info(msg)

@available
def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False):
if self.nice_connection_name() in ["on-run-start", "on-run-end"]:
self.warning_on_hooks(self.nice_connection_name())
else:
raise dbt.exceptions.NotImplementedError(
raise dbt.common.exceptions.base.NotImplementedError(
"`add_query` is not implemented for this adapter!"
)

Expand Down Expand Up @@ -679,14 +682,16 @@ def upload_file(
self.poll_until_job_completes(job, timeout)

@classmethod
def _catalog_filter_table(cls, table: agate.Table, manifest: Manifest) -> agate.Table:
def _catalog_filter_table(
cls, table: agate.Table, used_schemas: FrozenSet[Tuple[str, str]]
) -> agate.Table:
table = table.rename(
column_names={col.name: col.name.replace("__", ":") for col in table.columns}
)
return super()._catalog_filter_table(table, manifest)
return super()._catalog_filter_table(table, used_schemas)

def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
candidates = super()._get_catalog_schemas(manifest)
def _get_catalog_schemas(self, relation_config: Iterable[RelationConfig]) -> SchemaSearchMap:
candidates = super()._get_catalog_schemas(relation_config)
db_schemas: Dict[str, Set[str]] = {}
result = SchemaSearchMap()

Expand Down Expand Up @@ -772,7 +777,7 @@ def describe_relation(
bq_table = self.get_bq_table(relation)
parser = BigQueryMaterializedViewConfig
else:
raise dbt.exceptions.DbtRuntimeError(
raise dbt.common.exceptions.DbtRuntimeError(
f"The method `BigQueryAdapter.describe_relation` is not implemented "
f"for the relation type: {relation.type}"
)
Expand Down Expand Up @@ -838,7 +843,7 @@ def string_add_sql(
elif location == "prepend":
return f"concat('{value}', {add_to})"
else:
raise dbt.exceptions.DbtRuntimeError(
raise dbt.common.exceptions.DbtRuntimeError(
f'Got an unexpected location value of "{location}"'
)

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/bigquery/python_submissions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import Dict, Union

from dbt.events import AdapterLogger
from dbt.adapters.events.logging import AdapterLogger

from dbt.adapters.base import PythonJobHelper
from google.api_core.future.polling import POLLING_PREDICATE
Expand Down
18 changes: 8 additions & 10 deletions dbt/adapters/bigquery/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from typing import FrozenSet, Optional, TypeVar

from itertools import chain, islice
from dbt.context.providers import RuntimeConfigObject
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.relation_configs import RelationConfigChangeAction
from dbt.adapters.bigquery.relation_configs import (
Expand All @@ -12,10 +11,9 @@
BigQueryOptionsConfigChange,
BigQueryPartitionConfigChange,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import RelationType
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
from dbt.adapters.contracts.relation import RelationType, RelationConfig
from dbt.common.exceptions import CompilationError
from dbt.common.utils.dict import filter_null_values


Self = TypeVar("Self", bound="BigQueryRelation")
Expand Down Expand Up @@ -63,19 +61,19 @@ def dataset(self):
return self.schema

@classmethod
def materialized_view_from_model_node(
cls, model_node: ModelNode
def materialized_view_from_relation_config(
cls, relation_config: RelationConfig
) -> BigQueryMaterializedViewConfig:
return BigQueryMaterializedViewConfig.from_model_node(model_node) # type: ignore
return BigQueryMaterializedViewConfig.from_relation_config(relation_config) # type: ignore

@classmethod
def materialized_view_config_changeset(
cls,
existing_materialized_view: BigQueryMaterializedViewConfig,
runtime_config: RuntimeConfigObject,
relation_config: RelationConfig,
) -> Optional[BigQueryMaterializedViewConfigChangeset]:
config_change_collection = BigQueryMaterializedViewConfigChangeset()
new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model)
new_materialized_view = cls.materialized_view_from_relation_config(relation_config)

if new_materialized_view.options != existing_materialized_view.options:
config_change_collection.options = BigQueryOptionsConfigChange(
Expand Down
18 changes: 9 additions & 9 deletions dbt/adapters/bigquery/relation_configs/_base.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from dataclasses import dataclass
from typing import Optional
from typing import Optional, Dict

import agate
from dbt.adapters.base.relation import Policy
from dbt.adapters.relation_configs import RelationConfigBase
from google.cloud.bigquery import Table as BigQueryTable
from typing_extensions import Self

from dbt.adapters.bigquery.relation_configs._policies import (
BigQueryIncludePolicy,
BigQueryQuotePolicy,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import ComponentName
from dbt.adapters.contracts.relation import ComponentName, RelationConfig


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -25,25 +25,25 @@ def quote_policy(cls) -> Policy:
return BigQueryQuotePolicy()

@classmethod
def from_model_node(cls, model_node: ModelNode) -> "BigQueryBaseRelationConfig":
relation_config = cls.parse_model_node(model_node)
relation = cls.from_dict(relation_config)
def from_relation_config(cls, relation_config: RelationConfig) -> Self:
relation_config_dict = cls.parse_relation_config(relation_config)
relation = cls.from_dict(relation_config_dict)
return relation # type: ignore

@classmethod
def parse_model_node(cls, model_node: ModelNode) -> dict:
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict:
    """Convert a relation config into the dict that `from_dict` consumes.

    This base implementation only raises; a concrete relation config class
    has to supply its own version.

    Args:
        relation_config: The relation config to parse.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    # The message names this method; it previously still referred to the
    # pre-rename `parse_model_node()`.
    raise NotImplementedError(
        "`parse_relation_config()` needs to be implemented on this RelationConfigBase instance"
    )

@classmethod
def from_bq_table(cls, table: BigQueryTable) -> "BigQueryBaseRelationConfig":
def from_bq_table(cls, table: BigQueryTable) -> Self:
relation_config = cls.parse_bq_table(table)
relation = cls.from_dict(relation_config)
return relation # type: ignore

@classmethod
def parse_bq_table(cls, table: BigQueryTable) -> dict:
def parse_bq_table(cls, table: BigQueryTable) -> Dict:
raise NotImplementedError("`parse_bq_table()` is not implemented for this relation type")

@classmethod
Expand Down
Loading

0 comments on commit f2804c0

Please sign in to comment.