Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Iceberg table format in Dynamic Tables #1183

Merged
merged 36 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
4f7a40e
changelog
mikealfare Sep 17, 2024
7126592
changelog
mikealfare Sep 17, 2024
7d05939
first draft for iceberg dynamic tables
mikealfare Sep 17, 2024
01000a5
fix comparison operator
mikealfare Sep 17, 2024
1b9c00b
fix validation rules on catalog
mikealfare Sep 17, 2024
60094ed
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 18, 2024
cdc90b5
consider the entire catalog for a config change
mikealfare Sep 18, 2024
e65bba2
remove is_dynamic-related guards as that is ga now
mikealfare Sep 18, 2024
2ffe6ab
formatting
mikealfare Sep 18, 2024
f9f1d6a
formatting
mikealfare Sep 18, 2024
24626f9
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 18, 2024
3d6a76a
move .select back from agate.Row to agate.Table
mikealfare Sep 18, 2024
7444dff
formatting
mikealfare Sep 18, 2024
b1fa0ad
formatting
mikealfare Sep 18, 2024
0aee10b
fix catalog data marshalling
mikealfare Sep 19, 2024
c74283e
flip the flag for testing
mikealfare Sep 19, 2024
fb9aee1
simplify dynamic table testing
mikealfare Sep 20, 2024
50d75ae
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 20, 2024
04ec380
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 20, 2024
4373bdd
undo formatting changes to make function changes easier to see
mikealfare Sep 20, 2024
951a2ab
add iceberg dynamic tables to existing dynamic table tests
mikealfare Sep 20, 2024
610d379
update docstrings
mikealfare Sep 23, 2024
d9b8a6b
revert default iceberg to false, update tests to test both scenarios
mikealfare Sep 23, 2024
69d483b
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 24, 2024
24f7bee
use direct assignment instead of dict.update
mikealfare Sep 26, 2024
23419dd
update how the catalog gets built in the default scenario
mikealfare Sep 26, 2024
cf2f7be
Merge branch 'main' into dynamic-tables/iceberg-format
mikealfare Sep 26, 2024
8265e5e
remove string building from Model.name
mikealfare Sep 26, 2024
2c91dd4
stop testing odd target lag inputs which we don't want to support anyway
mikealfare Sep 26, 2024
529b875
comments and formatting
mikealfare Sep 26, 2024
f0684bf
comments and formatting
mikealfare Sep 26, 2024
7534895
Merge branch 'main' into dynamic-tables/iceberg-format
VersusFacit Sep 27, 2024
1d89f11
add standard incremental tables into the relation swap scenarios
mikealfare Sep 27, 2024
989c072
account for the fact that snowflake does not support renaming iceberg…
mikealfare Sep 27, 2024
5881986
account for all scenarios when swapping relation types, including tho…
mikealfare Sep 27, 2024
a42adda
make it clearer which scenarios are included in each run and why by p…
mikealfare Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240917-100505.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table format in Dynamic Tables
time: 2024-09-17T10:05:05.609859-04:00
custom:
Author: mikealfare
Issue: "1183"
15 changes: 15 additions & 0 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeCatalogConfigChange,
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
Expand Down Expand Up @@ -114,6 +115,12 @@ def dynamic_table_config_changeset(
context=new_dynamic_table.refresh_mode,
)

if new_dynamic_table.catalog != existing_dynamic_table.catalog:
config_change_collection.catalog = SnowflakeCatalogConfigChange(
action=RelationConfigChangeAction.create,
context=new_dynamic_table.catalog,
)

if config_change_collection.has_changes:
return config_change_collection
return None
Expand All @@ -132,6 +139,14 @@ def as_case_sensitive(self) -> "SnowflakeRelation":

return self.replace_path(**path_part_map)

@property
def can_be_renamed(self) -> bool:
"""
Standard tables and dynamic tables can be renamed, but Snowflake does not support renaming iceberg relations.
The iceberg standard does support renaming, so this may change in the future.
"""
return self.type in self.renameable_relations and not self.is_iceberg_format

def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This macro renders the appropriate DDL prefix during the create_table_as
Expand Down
6 changes: 5 additions & 1 deletion dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)
from dbt.adapters.snowflake.relation_configs.dynamic_table import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
from dbt.adapters.snowflake.relation_configs.policies import (
SnowflakeIncludePolicy,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
123 changes: 123 additions & 0 deletions dbt/adapters/snowflake/relation_configs/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional, TYPE_CHECKING, Set

if TYPE_CHECKING:
import agate

from dbt.adapters.relation_configs import (
RelationConfigChange,
RelationResults,
RelationConfigValidationMixin,
RelationConfigValidationRule,
)
from dbt.adapters.contracts.relation import RelationConfig
from dbt_common.exceptions import DbtConfigError
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.formats import TableFormat


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfig(SnowflakeRelationConfigBase, RelationConfigValidationMixin):
"""
This config follow the specs found here:
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table

The following parameters are configurable by dbt:
- table_format: format for interfacing with the table, e.g. default, iceberg
- external_volume: name of the external volume in Snowflake
- base_location: the directory within the external volume that contains the data
*Note*: This directory can’t be changed after you create a table.

The following parameters are not currently configurable by dbt:
- name: snowflake
"""

table_format: Optional[TableFormat] = TableFormat.default()
name: Optional[str] = "SNOWFLAKE"
external_volume: Optional[str] = None
base_location: Optional[str] = None

@property
def validation_rules(self) -> Set[RelationConfigValidationRule]:
return {
RelationConfigValidationRule(
(self.table_format == "default")
or (self.table_format == "iceberg" and self.base_location is not None),
DbtConfigError("Please provide a `base_location` when using iceberg"),
),
RelationConfigValidationRule(
(self.table_format == "default")
or (self.table_format == "iceberg" and self.name == "SNOWFLAKE"),
DbtConfigError(
"Only Snowflake catalogs are currently supported when using iceberg"
),
),
}

@classmethod
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
kwargs_dict = {
"name": config_dict.get("name"),
"external_volume": config_dict.get("external_volume"),
"base_location": config_dict.get("base_location"),
}
if table_format := config_dict.get("table_format"):
kwargs_dict["table_format"] = TableFormat(table_format)
return super().from_dict(kwargs_dict)

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:

if relation_config.config.extra.get("table_format") is None:
return {}

config_dict = {
"table_format": relation_config.config.extra.get("table_format"),
"name": "SNOWFLAKE", # this is not currently configurable
}

if external_volume := relation_config.config.extra.get("external_volume"):
config_dict["external_volume"] = external_volume

if base_location := relation_config.config.extra.get("base_location_subpath"):
config_dict["base_location"] = base_location

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
# this try block can be removed once enable_iceberg_materializations is retired
try:
catalog_results: "agate.Table" = relation_results["catalog"]
except KeyError:
# this happens when `enable_iceberg_materializations` is turned off
return {}

if len(catalog_results) == 0:
# this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
return {}

# for now, if we get catalog results, it's because this is an iceberg table
# this is because we only run `show iceberg tables` to get catalog metadata
# this will need to be updated once this is in `show objects`
catalog: "agate.Row" = catalog_results.rows[0]
config_dict = {
"table_format": "iceberg",
"name": catalog.get("catalog_name"),
"external_volume": catalog.get("external_volume_name"),
"base_location": catalog.get("base_location"),
}

return config_dict


@dataclass(frozen=True, eq=True, unsafe_hash=True)
class SnowflakeCatalogConfigChange(RelationConfigChange):
context: Optional[SnowflakeCatalogConfig] = None

@property
def requires_full_refresh(self) -> bool:
return True
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
24 changes: 17 additions & 7 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from typing_extensions import Self

from dbt.adapters.snowflake.relation_configs.base import SnowflakeRelationConfigBase
from dbt.adapters.snowflake.relation_configs.catalog import (
SnowflakeCatalogConfig,
SnowflakeCatalogConfigChange,
)


if TYPE_CHECKING:
import agate
Expand Down Expand Up @@ -55,11 +60,12 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: SnowflakeCatalogConfig
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

@classmethod
def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
def from_dict(cls, config_dict: Dict[str, Any]) -> Self:
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
kwargs_dict = {
"name": cls._render_part(ComponentName.Identifier, config_dict.get("name")),
"schema_name": cls._render_part(ComponentName.Schema, config_dict.get("schema_name")),
Expand All @@ -69,12 +75,12 @@ def from_dict(cls, config_dict) -> "SnowflakeDynamicTableConfig":
"query": config_dict.get("query"),
"target_lag": config_dict.get("target_lag"),
"snowflake_warehouse": config_dict.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.from_dict(config_dict["catalog"]),
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
"refresh_mode": config_dict.get("refresh_mode"),
"initialize": config_dict.get("initialize"),
}

dynamic_table: "SnowflakeDynamicTableConfig" = super().from_dict(kwargs_dict)
return dynamic_table
return super().from_dict(kwargs_dict)

@classmethod
def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any]:
Expand All @@ -85,18 +91,19 @@ def parse_relation_config(cls, relation_config: RelationConfig) -> Dict[str, Any
"query": relation_config.compiled_code,
"target_lag": relation_config.config.extra.get("target_lag"),
"snowflake_warehouse": relation_config.config.extra.get("snowflake_warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_config(relation_config),
}

if refresh_mode := relation_config.config.extra.get("refresh_mode"):
config_dict.update(refresh_mode=refresh_mode.upper())
config_dict["refresh_mode"] = refresh_mode.upper()

if initialize := relation_config.config.extra.get("initialize"):
config_dict.update(initialize=initialize.upper())
config_dict["initialize"] = initialize.upper()

return config_dict

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
dynamic_table: "agate.Row" = relation_results["dynamic_table"].rows[0]

config_dict = {
Expand All @@ -106,6 +113,7 @@ def parse_relation_results(cls, relation_results: RelationResults) -> Dict:
"query": dynamic_table.get("text"),
"target_lag": dynamic_table.get("target_lag"),
"snowflake_warehouse": dynamic_table.get("warehouse"),
"catalog": SnowflakeCatalogConfig.parse_relation_results(relation_results),
"refresh_mode": dynamic_table.get("refresh_mode"),
# we don't get initialize since that's a one-time scheduler attribute, not a DT attribute
}
Expand Down Expand Up @@ -145,6 +153,7 @@ class SnowflakeDynamicTableConfigChangeset:
target_lag: Optional[SnowflakeDynamicTableTargetLagConfigChange] = None
snowflake_warehouse: Optional[SnowflakeDynamicTableWarehouseConfigChange] = None
refresh_mode: Optional[SnowflakeDynamicTableRefreshModeConfigChange] = None
catalog: Optional[SnowflakeCatalogConfigChange] = None

@property
def requires_full_refresh(self) -> bool:
Expand All @@ -157,9 +166,10 @@ def requires_full_refresh(self) -> bool:
else False
),
self.refresh_mode.requires_full_refresh if self.refresh_mode else False,
self.catalog.requires_full_refresh if self.catalog else False,
]
)

@property
def has_changes(self) -> bool:
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode])
return any([self.target_lag, self.snowflake_warehouse, self.refresh_mode, self.catalog])
5 changes: 5 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11
from typing_extensions import Self


class TableFormat(StrEnum):
Expand All @@ -10,5 +11,9 @@ class TableFormat(StrEnum):
DEFAULT = "default"
ICEBERG = "iceberg"

@classmethod
def default(cls) -> Self:
return cls("default")

def __str__(self):
return self.value
79 changes: 73 additions & 6 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
{% macro snowflake__get_create_dynamic_table_as_sql(relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic table
--
-- Args:
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Globals:
-- - config: NodeConfig - contains the attribution required to produce a SnowflakeDynamicTableConfig
-- Returns:
-- A valid DDL statement which will result in a new dynamic table.
-#}

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog.table_format == 'iceberg' -%}
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
{%- endif -%}

{%- endmacro %}


{% macro _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a standard dynamic table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#syntax
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic standard table.
-#}

create dynamic table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{% if dynamic_table.refresh_mode %}
refresh_mode = {{ dynamic_table.refresh_mode }}
{% endif %}
{% if dynamic_table.initialize %}
initialize = {{ dynamic_table.initialize }}
{% endif %}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)

{%- endmacro %}


{% macro _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) -%}
{#-
-- Produce DDL that creates a dynamic iceberg table
--
-- This follows the syntax outlined here:
-- https://docs.snowflake.com/en/sql-reference/sql/create-dynamic-table#create-dynamic-iceberg-table
--
-- Args:
-- - dynamic_table: SnowflakeDynamicTableConfig - contains all of the configuration for the dynamic table
-- - relation: Union[SnowflakeRelation, str]
-- - SnowflakeRelation - required for relation.render()
-- - str - is already the rendered relation name
-- - sql: str - the code defining the model
-- Returns:
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}

create dynamic iceberg table {{ relation }}
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = {{ dynamic_table.catalog.base_location }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
{{ sql }}
)
Expand Down
Loading