Skip to content

Commit

Permalink
implement clustering, partitioning, and auto refresh for materialized…
Browse files Browse the repository at this point in the history
… views
  • Loading branch information
mikealfare committed Oct 5, 2023
1 parent 37b2ccd commit 0696752
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 128 deletions.
14 changes: 9 additions & 5 deletions dbt/adapters/bigquery/relation_configs/auto_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Dict, Optional

import agate
from dbt.adapters.relation_configs import RelationConfigChange
from dbt.adapters.relation_configs import RelationConfigChange, RelationResults
from dbt.contracts.graph.nodes import ModelNode

from dbt.adapters.bigquery.relation_configs._base import BigQueryRelationConfigBase
Expand Down Expand Up @@ -60,11 +60,15 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]: # type: ignore
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
options_config: agate.Table = relation_results.get("options") # type: ignore
options = {
option.get("option_name"): option.get("option_value") for option in options_config
}
config_dict = {
"enable_refresh": bool_setting(relation_results_entry.get("enable_refresh")),
"refresh_interval_minutes": relation_results_entry.get("refresh_interval_minutes"),
"max_staleness": relation_results_entry.get("max_staleness"),
"enable_refresh": bool_setting(options.get("enable_refresh")),
"refresh_interval_minutes": options.get("refresh_interval_minutes"),
"max_staleness": options.get("max_staleness"),
}
return config_dict

Expand Down
5 changes: 2 additions & 3 deletions dbt/adapters/bigquery/relation_configs/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
return config_dict

@classmethod
def parse_relation_results(cls, relation_results_entry: agate.Row) -> Dict[str, Any]: # type: ignore
field_list = relation_results_entry.get("cluster_by", "")
config_dict = {"fields": frozenset(field_list.split(","))}
def parse_relation_results(cls, relation_results: agate.Table) -> Dict[str, Any]: # type: ignore
config_dict = {"fields": frozenset(row.get("column_name") for row in relation_results)}
return config_dict


Expand Down
108 changes: 55 additions & 53 deletions dbt/adapters/bigquery/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase):
- materialized_view_name: name of the materialized view
- schema: dataset name of the materialized view
- database: project name of the database
- auto_refresh: object containing refresh scheduling information
- partition: object containing partition information
- cluster: object containing cluster information
- auto_refresh: object containing refresh scheduling information
- hours_to_expiration: The time when table expires
- expiration_timestamp: the time when table expires
- kms_key_name: user defined Cloud KMS encryption key
- labels: used to organized and group objects
- description: user description for materialized view
Expand All @@ -45,9 +45,9 @@ class BigQueryMaterializedViewConfig(BigQueryRelationConfigBase):
materialized_view_name: str
schema_name: str
database_name: str
auto_refresh: BigQueryAutoRefreshConfig
partition: Optional[PartitionConfig] = None
cluster: Optional[BigQueryClusterConfig] = None
auto_refresh: Optional[BigQueryAutoRefreshConfig] = None
expiration_timestamp: Optional[datetime] = None
kms_key_name: Optional[str] = None
labels: Optional[Dict[str, str]] = None
Expand All @@ -58,15 +58,22 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConf
# required
kwargs_dict: Dict[str, Any] = {
"materialized_view_name": cls._render_part(
ComponentName.Identifier, config_dict.get("materialized_view_name")
ComponentName.Identifier, config_dict["materialized_view_name"]
),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict["schema_name"]),
"database_name": cls._render_part(
ComponentName.Database, config_dict.get("database_name")
ComponentName.Database, config_dict["database_name"]
),
"auto_refresh": BigQueryAutoRefreshConfig.from_dict(config_dict["auto_refresh"]),
}

# optional
if partition := config_dict.get("partition"):
kwargs_dict.update({"partition": PartitionConfig.parse(partition)})

if cluster := config_dict.get("cluster"):
kwargs_dict.update({"cluster": BigQueryClusterConfig.from_dict(cluster)})

optional_attributes = [
"expiration_timestamp",
"kms_key_name",
Expand All @@ -78,15 +85,6 @@ def from_dict(cls, config_dict: Dict[str, Any]) -> "BigQueryMaterializedViewConf
}
kwargs_dict.update(optional_attributes_set_by_user)

if partition := config_dict.get("partition"):
kwargs_dict.update({"partition": PartitionConfig.parse(partition)})

if cluster := config_dict.get("cluster"):
kwargs_dict.update({"cluster": BigQueryClusterConfig.from_dict(cluster)})

if auto_refresh := config_dict.get("auto_refresh"):
kwargs_dict.update({"auto_refresh": BigQueryAutoRefreshConfig.from_dict(auto_refresh)})

materialized_view: "BigQueryMaterializedViewConfig" = super().from_dict(kwargs_dict) # type: ignore
return materialized_view

Expand All @@ -108,79 +106,83 @@ def parse_model_node(cls, model_node: ModelNode) -> Dict[str, Any]:
"materialized_view_name": model_node.identifier,
"schema_name": model_node.schema,
"database_name": model_node.database,
"kms_key_name": model_node.config.extra.get("kms_key_name"),
"labels": model_node.config.extra.get("labels"),
"auto_refresh": BigQueryAutoRefreshConfig.parse_model_node(model_node),
}

if description := model_node.config.extra.get("description"):
if model_node.config.persist_docs:
config_dict.update({"description": description})

if hours_to_expiration := model_node.config.extra.get("hours_to_expiration"):
config_dict.update(
{"expiration_timestamp": datetime.now() + timedelta(hours=hours_to_expiration)}
)

# optional
if "partition_by" in model_node.config:
config_dict.update({"partition": PartitionConfig.parse_model_node(model_node)})

if "cluster_by" in model_node.config:
config_dict.update({"cluster": BigQueryClusterConfig.parse_model_node(model_node)})

if "enable_refresh" in model_node.config:
if hours_to_expiration := model_node.config.extra.get("hours_to_expiration"):
config_dict.update(
{"auto_refresh": BigQueryAutoRefreshConfig.parse_model_node(model_node)}
{"expiration_timestamp": datetime.now() + timedelta(hours=hours_to_expiration)}
)

if kms_key_name := model_node.config.extra.get("kms_key_name"):
config_dict.update({"kms_key_name": kms_key_name})

if labels := model_node.config.extra.get("labels"):
config_dict.update({"labels": labels})

if description := model_node.config.extra.get("description"):
if model_node.config.persist_docs:
config_dict.update({"description": description})

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
materialized_view_config = relation_results.get("materialized_view")
if isinstance(materialized_view_config, agate.Table):
materialized_view = cls._get_first_row(materialized_view_config)
else:
raise DbtRuntimeError("Unsupported type returned ex. None")
materialized_view_config: agate.Table = relation_results.get("materialized_view") # type: ignore
materialized_view: agate.Row = cls._get_first_row(materialized_view_config)
options_config: agate.Table = relation_results.get("options") # type: ignore
options = {
option.get("option_name"): option.get("option_value") for option in options_config.rows
}

config_dict = {
"materialized_view_name": materialized_view.get("materialized_view_name"),
"schema_name": materialized_view.get("schema"),
"database_name": materialized_view.get("database"),
"expiration_timestamp": materialized_view.get("expiration_timestamp"),
"kms_key_name": materialized_view.get("kms_key_name"),
"labels": materialized_view.get("labels"),
"description": materialized_view.get("description"),
"materialized_view_name": materialized_view.get("table_name"),
"schema_name": materialized_view.get("table_schema"),
"database_name": materialized_view.get("table_catalog"),
"auto_refresh": BigQueryAutoRefreshConfig.parse_relation_results(relation_results),
}

if materialized_view.get("partition_field"):
# optional
partition_by: agate.Table = relation_results.get("partition_by") # type: ignore
if len(partition_by) > 0:
config_dict.update(
{"partition": PartitionConfig.parse_relation_results(materialized_view)}
{"partition": PartitionConfig.parse_relation_results(partition_by[0])}
)

if materialized_view.get("cluster_by"):
cluster_by: agate.Table = relation_results.get("cluster_by") # type: ignore
if len(cluster_by) > 0:
config_dict.update(
{"cluster": BigQueryClusterConfig.parse_relation_results(materialized_view)}
{"cluster": BigQueryClusterConfig.parse_relation_results(cluster_by)}
)

if materialized_view.get("enable_refresh"):
config_dict.update(
{
"auto_refresh": BigQueryAutoRefreshConfig.parse_relation_results(
materialized_view
)
}
)
config_dict.update(
{
"expiration_timestamp": options.get("expiration_timestamp"),
"kms_key_name": options.get("kms_key_name"),
"labels": options.get("labels"),
"description": options.get("description"),
}
)

return config_dict


@dataclass
class BigQueryMaterializedViewConfigChangeset:
auto_refresh: Optional[BigQueryAutoRefreshConfigChange] = None
partition: Optional[BigQueryPartitionConfigChange] = None
cluster: Optional[BigQueryClusterConfigChange] = None
auto_refresh: Optional[BigQueryAutoRefreshConfigChange] = None
expiration_timestamp: Optional[datetime] = None
kms_key_name: Optional[str] = None
labels: Optional[Dict[str, str]] = None
description: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/bigquery/relation_configs/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ def parse_relation_results(cls, describe_relation_results: agate.Row) -> Dict[st
Parse the results of a describe query into a raw config for `PartitionConfig.parse`
"""
config_dict = {
"field": describe_relation_results.get("partition_field"),
"field": describe_relation_results.get("partition_column_name"),
"data_type": describe_relation_results.get("partition_data_type"),
"granularity": describe_relation_results.get("partition_granularity"),
"granularity": describe_relation_results.get("partition_type"),
}

# combine range fields into dictionary, like the model config
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% macro bigquery__get_describe_cluster_sql(relation) %}
select
column_name
from {{ relation.information_schema('COLUMNS') }}
where table_name = '{{ relation.identifier }}'
and table_schema = '{{ relation.schema }}'
and table_catalog = '{{ relation.database }}'
and clustering_ordinal_position is not null
{% endmacro %}


{% macro bigquery__describe_cluster(relation) %}
{%- set _sql = bigquery__get_describe_cluster_sql(relation) -%}
{% do return(run_query(_sql)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% macro bigquery__get_describe_options_sql(relation) %}
select
option_name,
option_value
from {{ relation.information_schema('TABLE_OPTIONS') }}
where table_name = '{{ relation.identifier }}'
and table_schema = '{{ relation.schema }}'
and table_catalog = '{{ relation.database }}'
{% endmacro %}


{% macro bigquery__describe_options(relation) %}
{%- set _sql = bigquery__get_describe_options_sql(relation) -%}
{% do return(run_query(_sql)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{% macro bigquery__get_describe_partition_sql(relation) %}
select
c.column_name as partition_column_name,
c.data_type as partition_data_type,
case
when regexp_contains(p.partition_id, '^[0-9]{4}$') THEN 'year'
when regexp_contains(p.partition_id, '^[0-9]{6}$') THEN 'month'
when regexp_contains(p.partition_id, '^[0-9]{8}$') THEN 'day'
when regexp_contains(p.partition_id, '^[0-9]{10}$') THEN 'hour'
end as partition_type
from {{ relation.information_schema('PARTITIONS') }} p
join {{ relation.information_schema('COLUMNS') }} c
on c.table_name = p.table_name
and c.table_schema = p.table_schema
and c.table_catalog = p.table_catalog
where p.table_name = '{{ relation.identifier }}'
and p.table_schema = '{{ relation.schema }}'
and p.table_catalog = '{{ relation.database }}'
and c.is_partitioning_column = 'YES'
{% endmacro %}


{% macro bigquery__describe_partition(relation) %}
{% set _sql = bigquery__get_describe_partition_sql(relation) %}
{% do return(run_query(_sql)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
relation,
configuration_changes,
sql,
existing_relation
existing_relation,
backup_relation,
intermediate_relation
) %}

{% if configuration_changes.requires_full_refresh %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,24 @@
{% macro bigquery__describe_materialized_view(relation) %}
{%- set _materialized_view_sql -%}
-- checks each column to see if its a cluster_by field then adds it to a new list
with ClusteringColumns as (
select
table_name,
ARRAY_AGG(
case
when clustering_ordinal_position is not null then column_name
else null
end
ignore nulls
) as clustering_fields
from
`{{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.COLUMNS`
where
table_name = '{{ relation.name }}'
GROUP BY
table_name
)
select
mv.table_name as materialized_view,
c.column_name,
c.is_partitioning_column,
c.clustering_ordinal_position,
topt.option_name,
topt.option_value,
topt.option_type
from
`{{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.MATERIALIZED_VIEWS` mv
left join
`{{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.COLUMNS` c
on
mv.table_name = c.table_name
left join
`{{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.TABLE_OPTIONS` topt
on
mv.table_name = topt.table_name
where
mv.table_name = '{{ relation.name }}'
table_name,
table_schema,
table_catalog
from {{ relation.information_schema('MATERIALIZED_VIEWS') }}
where table_name = '{{ relation.identifier }}'
and table_schema = '{{ relation.schema }}'
and table_catalog = '{{ relation.database }}'
{%- endset %}
{% set _materialized_view = run_query(_materialized_view_sql) %}

{% do return({'materialized_view': _materialized_viewy}) %}
{%- set _partition_by = bigquery__describe_partition(relation) -%}
{%- set _cluster_by = bigquery__describe_cluster(relation) -%}
{%- set _options = bigquery__describe_options(relation) -%}

{% do return({
'materialized_view': _materialized_view,
'partition_by': _partition_by,
'cluster_by': _cluster_by,
'options': _options
}) %}
{% endmacro %}
Loading

0 comments on commit 0696752

Please sign in to comment.