[ADAP-394] support change monitoring for materialized views (#914)
* init push of ADAP-394

* update lots of framework and update to main

* updating based on feedback

* add changelog

* remove in preview option

* fill out changeset and config change classes for specific options

* change partition_by and cluster_by to FrozenSet, initial attempt at describe.sql

* create utility.py to add bool_setting method, update parse_model_node to use new method

* update describe.sql query

* update describe sql to be able to create list of cluster by field names

* initial attempt at modifying get_alter_materialized_view_as_sql

* update to main and add space

* initial build out of mini classes for bigquery cluster, partition, auto_refresh in materialized_view dict. still need to fix some mypy issues

* remove local package (dbt-bigquery) on `make dev-uninstall`

* update changelog entry to encompass all features in this branch

* remove alteration to setup/teardown for materialized view materialization

* fix spelling error, prepend underscore on base class module to mark as private to its package

* update call to relation to include quote and include policies, update case to match convention

* update create statement to include partition, cluster, and options clauses

* update partition config to align with existing dbt-bigquery table config

* update cluster config to align with existing dbt-bigquery table config

* update auto refresh config to align with other configs

* revert parse results to accept an agate Row

* update how defaults are handled

* add description option to materialized view since it is handled for tables

* add description option to materialized view since it is handled for tables

* fix method call chain in parse_relation_results on cluster, partition, and auto_refresh

* move PartitionConfig into relation_configs to be used by materialized views, update references

* move PartitionConfig into relation_configs to be used by materialized views, update references; add get_materialized_view_options in alignment with get_table_options; fix wild import order; add factory method for materialized views to be used in the jinja template; update expiration timestamp attribute

* update create materialized view to use the relation config

* condition on existence of properties before templating them

* allow for "drop if exists" functionality via the google sdk

* remove unnecessary trailing semicolon

* implement replace based on create

* implement clustering, partitioning, and auto refresh for materialized views

* remove include_policy from BigQueryRelation, it's causing unit tests to fail and is not used at the moment

* partition type cannot be queried for materialized views, adjust the describe query and equality check to account for that

* add describe_relation for materialized views

* break out common utilities into a mixin for materialized view tests

* change refresh_interval_minutes from an int to a float to match the bigquery docs

* make partition optional on relation results since it cannot be queried yet

* initial draft of materialized view change tests

* build changeset for materialized view

* implement change monitoring for autorefresh and clustering on materialized views, add describe_relation method on BigQueryAdapter for utility

* committing to park changes and wrap up other 1.7 items

* update describe to use the sdk instead of sql to pick up partition information

* basic tests pass

* existing change monitoring tests pass

* partition change monitoring tests pass

* ADAP-940: Add change monitoring for partitioning clause (#962)

* committing to park changes and wrap up other 1.7 items

* update describe to use the sdk instead of sql to pick up partition information

* basic tests pass

* existing change monitoring tests pass

* partition change monitoring tests pass

---------

Co-authored-by: colin-rogers-dbt <[email protected]>

* implement PR review feedback

* delete empty file

* add MV tests for cluster and partition alone, update combined tests to perform all checks

---------

Co-authored-by: Mike Alfare <[email protected]>
Co-authored-by: Mike Alfare <[email protected]>
Co-authored-by: colin-rogers-dbt <[email protected]>
4 people authored Oct 11, 2023
1 parent b06414f commit e72cc51
Showing 28 changed files with 1,424 additions and 237 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230913-130445.yaml
@@ -0,0 +1,6 @@
kind: Features
body: "Support change monitoring for materialized views, including: autorefresh, clustering, partitioning"
time: 2023-09-13T13:04:45.761294-05:00
custom:
Author: McKnight-42
Issue: "924"
2 changes: 2 additions & 0 deletions .flake8
@@ -12,3 +12,5 @@ ignore =
E741,
E501,
exclude = tests
per-file-ignores =
*/__init__.py: F401
197 changes: 62 additions & 135 deletions dbt/adapters/bigquery/impl.py
@@ -1,58 +1,56 @@
from dataclasses import dataclass
import json
import threading
from typing import Dict, List, Optional, Any, Set, Union, Type

from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ModelLevelConstraint, ConstraintType # type: ignore
from dbt.dataclass_schema import dbtClassMixin, ValidationError

import dbt.deprecations
import dbt.exceptions
import dbt.clients.agate_helper
import time
from typing import Any, Dict, List, Optional, Type, Set, Union

import agate
from dbt import ui # type: ignore
from dbt.adapters.base import ( # type: ignore
AdapterConfig,
BaseAdapter,
BaseRelation,
ConstraintSupport,
available,
PythonJobHelper,
RelationType,
BaseRelation,
SchemaSearchMap,
AdapterConfig,
PythonJobHelper,
available,
)

from dbt.adapters.cache import _make_ref_key_dict # type: ignore

from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.adapters.bigquery.python_submissions import (
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
import dbt.clients.agate_helper
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.graph.manifest import Manifest
from dbt.events import (
AdapterLogger,
)
from dbt.contracts.graph.nodes import ColumnLevelConstraint, ConstraintType, ModelLevelConstraint # type: ignore
from dbt.dataclass_schema import dbtClassMixin
import dbt.deprecations
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SchemaCreation, SchemaDrop
import dbt.exceptions
from dbt.utils import filter_null_values

import google.auth
import google.api_core
import google.auth
import google.oauth2
import google.cloud.exceptions
import google.cloud.bigquery
from google.cloud.bigquery import AccessEntry, SchemaField, Table as BigQueryTable
import google.cloud.exceptions

from google.cloud.bigquery import AccessEntry, SchemaField
from dbt.adapters.bigquery import BigQueryColumn, BigQueryConnectionManager
from dbt.adapters.bigquery.column import get_nested_column_data_types
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
from dbt.adapters.bigquery.dataset import add_access_entry_to_dataset, is_access_entry_in_dataset
from dbt.adapters.bigquery.python_submissions import (
ClusterDataprocHelper,
ServerlessDataProcHelper,
)
from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery.relation_configs import (
BigQueryBaseRelationConfig,
BigQueryMaterializedViewConfig,
PartitionConfig,
)
from dbt.adapters.bigquery.utility import sql_escape

import time
import agate
import json

logger = AdapterLogger("BigQuery")

@@ -64,105 +62,6 @@
_dataset_lock = threading.Lock()


def sql_escape(string):
if not isinstance(string, str):
raise dbt.exceptions.CompilationError(f"cannot escape a non-string: {string}")
return json.dumps(string)[1:-1]


@dataclass
class PartitionConfig(dbtClassMixin):
field: str
data_type: str = "date"
granularity: str = "day"
range: Optional[Dict[str, Any]] = None
time_ingestion_partitioning: bool = False
copy_partitions: bool = False

PARTITION_DATE = "_PARTITIONDATE"
PARTITION_TIME = "_PARTITIONTIME"

def data_type_for_partition(self):
"""Return the data type of partitions for replacement.
When time_ingestion_partitioning is enabled, the data type supported are date & timestamp.
"""
if not self.time_ingestion_partitioning:
return self.data_type

return "date" if self.data_type == "date" else "timestamp"

def reject_partition_field_column(self, columns: List[Any]) -> List[str]:
return [c for c in columns if not c.name.upper() == self.field.upper()]

def data_type_should_be_truncated(self):
"""Return true if the data type should be truncated instead of cast to the data type."""
return not (
self.data_type == "int64" or (self.data_type == "date" and self.granularity == "day")
)

def time_partitioning_field(self) -> str:
"""Return the time partitioning field name based on the data type.
The default is _PARTITIONTIME, but for date it is _PARTITIONDATE
else it will fail statements for type mismatch."""
if self.data_type == "date":
return self.PARTITION_DATE
else:
return self.PARTITION_TIME

def insertable_time_partitioning_field(self) -> str:
"""Return the insertable time partitioning field name based on the data type.
Practically, only _PARTITIONTIME works so far.
The function is meant to keep the call sites consistent as it might evolve."""
return self.PARTITION_TIME

def render(self, alias: Optional[str] = None):
column: str = (
self.field if not self.time_ingestion_partitioning else self.time_partitioning_field()
)
if alias:
column = f"{alias}.{column}"

if self.data_type_should_be_truncated():
return f"{self.data_type}_trunc({column}, {self.granularity})"
else:
return column

def render_wrapped(self, alias: Optional[str] = None):
"""Wrap the partitioning column when time involved to ensure it is properly cast to matching time."""
# if data type is going to be truncated, no need to wrap
if (
self.data_type in ("date", "timestamp", "datetime")
and not self.data_type_should_be_truncated()
and not (
self.time_ingestion_partitioning and self.data_type == "date"
) # _PARTITIONDATE is already a date
):
return f"{self.data_type}({self.render(alias)})"
else:
return self.render(alias)

@classmethod
def parse(cls, raw_partition_by) -> Optional["PartitionConfig"]:
if raw_partition_by is None:
return None
try:
cls.validate(raw_partition_by)
return cls.from_dict(
{
key: (value.lower() if isinstance(value, str) else value)
for key, value in raw_partition_by.items()
}
)
except ValidationError as exc:
raise dbt.exceptions.DbtValidationError("Could not parse partition config") from exc
except TypeError:
raise dbt.exceptions.CompilationError(
f"Invalid partition_by config:\n"
f" Got: {raw_partition_by}\n"
f' Expected a dictionary with "field" and "data_type" keys'
)


@dataclass
class GrantTarget(dbtClassMixin):
dataset: str
@@ -241,7 +140,9 @@ def drop_relation(self, relation: BigQueryRelation) -> None:
conn = self.connections.get_thread_connection()

table_ref = self.get_table_ref_from_relation(relation)
conn.handle.delete_table(table_ref)

# mimic "drop if exists" functionality that's ubiquitous in most sql implementations
conn.handle.delete_table(table_ref, not_found_ok=True)

def truncate_relation(self, relation: BigQueryRelation) -> None:
raise dbt.exceptions.NotImplementedError("`truncate` is not implemented for this adapter!")
@@ -849,6 +750,32 @@ def get_view_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict
opts = self.get_common_options(config, node)
return opts

@available.parse(lambda *a, **k: True)
def get_bq_table(self, relation: BigQueryRelation) -> Optional[BigQueryTable]:
try:
table = self.connections.get_bq_table(
relation.database, relation.schema, relation.identifier
)
except google.cloud.exceptions.NotFound:
table = None
return table

@available.parse(lambda *a, **k: True)
def describe_relation(
self, relation: BigQueryRelation
) -> Optional[BigQueryBaseRelationConfig]:
if relation.type == RelationType.MaterializedView:
bq_table = self.get_bq_table(relation)
parser = BigQueryMaterializedViewConfig
else:
raise dbt.exceptions.DbtRuntimeError(
f"The method `BigQueryAdapter.describe_relation` is not implemented "
f"for the relation type: {relation.type}"
)
if bq_table:
return parser.from_bq_table(bq_table)
return None

@available.parse_none
def grant_access_to(self, entity, entity_type, role, grant_target_dict):
"""
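
The new get_bq_table and describe_relation methods above return the existing materialized view's configuration, or None when the relation does not exist. A minimal sketch (not part of this diff) of calling them from Python, assuming an already-initialized adapter; the project, dataset, and relation names below are hypothetical:

# BigQueryRelation and RelationType are imported in impl.py as shown above
relation = BigQueryRelation.create(
    database="my-gcp-project",  # hypothetical project
    schema="analytics",  # hypothetical dataset
    identifier="my_materialized_view",  # hypothetical relation name
    type=RelationType.MaterializedView,
)
existing = adapter.describe_relation(relation)
if existing:
    # options, partition, and cluster mirror the relation_configs classes added in this PR
    print(existing.options, existing.partition, existing.cluster)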
58 changes: 52 additions & 6 deletions dbt/adapters/bigquery/relation.py
@@ -1,13 +1,21 @@
from dataclasses import dataclass
from typing import Optional
from typing import FrozenSet, Optional, TypeVar

from itertools import chain, islice

from dbt.context.providers import RuntimeConfigObject
from dbt.adapters.base.relation import BaseRelation, ComponentName, InformationSchema
from dbt.adapters.relation_configs import RelationConfigChangeAction
from dbt.adapters.bigquery.relation_configs import (
BigQueryClusterConfigChange,
BigQueryMaterializedViewConfig,
BigQueryMaterializedViewConfigChangeset,
BigQueryOptionsConfigChange,
BigQueryPartitionConfigChange,
)
from dbt.contracts.graph.nodes import ModelNode
from dbt.contracts.relation import RelationType
from dbt.exceptions import CompilationError
from dbt.utils import filter_null_values
from typing import TypeVar


Self = TypeVar("Self", bound="BigQueryRelation")
@@ -17,9 +25,10 @@
class BigQueryRelation(BaseRelation):
quote_character: str = "`"
location: Optional[str] = None
# why do we need to use default_factory here but we can assign it directly in dbt-postgres?
renameable_relations = frozenset({RelationType.Table})
replaceable_relations = frozenset({RelationType.Table, RelationType.View})
renameable_relations: FrozenSet[RelationType] = frozenset({RelationType.Table})
replaceable_relations: FrozenSet[RelationType] = frozenset(
{RelationType.Table, RelationType.View}
)

def matches(
self,
@@ -53,6 +62,43 @@ def project(self):
def dataset(self):
return self.schema

@classmethod
def materialized_view_from_model_node(
cls, model_node: ModelNode
) -> BigQueryMaterializedViewConfig:
return BigQueryMaterializedViewConfig.from_model_node(model_node) # type: ignore

@classmethod
def materialized_view_config_changeset(
cls,
existing_materialized_view: BigQueryMaterializedViewConfig,
runtime_config: RuntimeConfigObject,
) -> Optional[BigQueryMaterializedViewConfigChangeset]:
config_change_collection = BigQueryMaterializedViewConfigChangeset()
new_materialized_view = cls.materialized_view_from_model_node(runtime_config.model)

if new_materialized_view.options != existing_materialized_view.options:
config_change_collection.options = BigQueryOptionsConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.options,
)

if new_materialized_view.partition != existing_materialized_view.partition:
config_change_collection.partition = BigQueryPartitionConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.partition,
)

if new_materialized_view.cluster != existing_materialized_view.cluster:
config_change_collection.cluster = BigQueryClusterConfigChange(
action=RelationConfigChangeAction.alter,
context=new_materialized_view.cluster,
)

if config_change_collection:
return config_change_collection
return None

def information_schema(self, identifier: Optional[str] = None) -> "BigQueryInformationSchema":
return BigQueryInformationSchema.from_relation(self, identifier)

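
A hedged sketch, not part of this diff, of how materialized_view_config_changeset might be consumed by the materialization logic; existing is assumed to come from describe_relation and runtime_config from the model context:

changeset = BigQueryRelation.materialized_view_config_changeset(
    existing_materialized_view=existing,
    runtime_config=runtime_config,
)
if changeset is None:
    pass  # configurations already match; nothing to do
else:
    if changeset.options:
        pass  # option changes (e.g. auto_refresh, description) can be applied in place
    if changeset.partition or changeset.cluster:
        pass  # partitioning/clustering changes generally mean replacing the materialized view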
21 changes: 21 additions & 0 deletions dbt/adapters/bigquery/relation_configs/__init__.py
@@ -0,0 +1,21 @@
from dbt.adapters.bigquery.relation_configs._base import BigQueryBaseRelationConfig
from dbt.adapters.bigquery.relation_configs._cluster import (
BigQueryClusterConfig,
BigQueryClusterConfigChange,
)
from dbt.adapters.bigquery.relation_configs._materialized_view import (
BigQueryMaterializedViewConfig,
BigQueryMaterializedViewConfigChangeset,
)
from dbt.adapters.bigquery.relation_configs._options import (
BigQueryOptionsConfig,
BigQueryOptionsConfigChange,
)
from dbt.adapters.bigquery.relation_configs._partition import (
PartitionConfig,
BigQueryPartitionConfigChange,
)
from dbt.adapters.bigquery.relation_configs._policies import (
BigQueryIncludePolicy,
BigQueryQuotePolicy,
)
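
These package-level re-exports are what the new per-file F401 ("imported but unused") ignore in .flake8 accounts for; they exist so that callers can import from the package root, as impl.py now does, for example:

from dbt.adapters.bigquery.relation_configs import (
    BigQueryBaseRelationConfig,
    BigQueryMaterializedViewConfig,
    PartitionConfig,
)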
(Diff for the remaining changed files not shown.)
