Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Iceberg Table Materialization #1170

Merged
merged 29 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5acb32c
Add materializations of table and dynamic table.
VersusFacit Aug 22, 2024
c2b8d78
Add the method to tell something is iceberg format and pipe that thro…
VersusFacit Aug 26, 2024
5afb551
Finish create macro and fix alters.
VersusFacit Aug 29, 2024
53eb5b9
Finish todo items and begin cleaning code.
VersusFacit Aug 29, 2024
a3b13b8
revert dynamic table changes.
VersusFacit Aug 29, 2024
37006ae
Fix the drop by fixing snowflake__show_iceberg_relations
VersusFacit Aug 29, 2024
1887208
Transient needs sophisticated handling based on what user specifies for
VersusFacit Aug 29, 2024
8150261
Try to figure out what the right None semantics are.
VersusFacit Aug 29, 2024
74ec1a3
Revert to original statement.
VersusFacit Aug 29, 2024
2297226
Fix the transient behavior by passing table_type again.
VersusFacit Aug 29, 2024
1c26ee3
Rename object_format config param to table_format
VersusFacit Sep 10, 2024
491a76a
Migrate Jinja macros to Python.
VersusFacit Sep 11, 2024
c7192d3
All classes are frozen
VersusFacit Sep 11, 2024
6d77f69
Clean up the metadata queries that power is_iceberg column generation
VersusFacit Sep 11, 2024
4bf934c
Fix Python models generation argument
VersusFacit Sep 11, 2024
493c6ae
Add changelog.
VersusFacit Sep 11, 2024
66c2e5a
Try to fix duplication of join record issues.
VersusFacit Sep 11, 2024
e913f28
Use the RelationConfig protocol for type checking.
VersusFacit Sep 11, 2024
ebcc728
Fix transient semantics.
VersusFacit Sep 11, 2024
1cf5b74
Add functional tests.
VersusFacit Sep 11, 2024
f198177
Fix test.
VersusFacit Sep 11, 2024
8a26754
Fix test.
VersusFacit Sep 11, 2024
e4d98e5
Fix test and remove strip calls
VersusFacit Sep 12, 2024
92d7bc0
Add view test case.
VersusFacit Sep 12, 2024
e961bb0
Code review comments.
VersusFacit Sep 12, 2024
4a6046f
I'm using too new a version of mypy for Self.
VersusFacit Sep 12, 2024
d0c39f3
Add a behavior flag for iceberg table materialization.
VersusFacit Sep 12, 2024
17cd094
Flip order of flag.
VersusFacit Sep 12, 2024
a623bb5
Adjust test.
VersusFacit Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240911-001806.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add support for Iceberg table materializations.
time: 2024-09-11T00:18:06.780586-07:00
custom:
Author: versusfacit
Issue: "321"
41 changes: 36 additions & 5 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
LIST_SCHEMAS_MACRO_NAME,
LIST_RELATIONS_MACRO_NAME,
)
from dbt_common.behavior_flags import BehaviorFlag
from dbt_common.contracts.constraints import ConstraintType
from dbt_common.contracts.metadata import (
TableMetadata,
Expand All @@ -20,7 +21,10 @@
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
from dbt_common.utils import filter_null_values

from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
from dbt.adapters.snowflake.relation_configs import (
SnowflakeRelationType,
TableFormat,
)
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
Expand All @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig):
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None

# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
Expand All @@ -69,6 +78,10 @@ class SnowflakeAdapter(SQLAdapter):
}
)

@property
def _behavior_flags(self) -> List[BehaviorFlag]:
return [{"name": "enable_iceberg_materializations", "default": False}]

@classmethod
def date_function(cls):
return "CURRENT_TIMESTAMP()"
Expand Down Expand Up @@ -223,8 +236,9 @@ def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {"schema_relation": schema_relation}

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except DbtDatabaseError as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
Expand All @@ -235,18 +249,26 @@ def list_relations_without_caching(

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in results.column_names:
if "is_dynamic" in schema_objects.column_names:
columns.append("is_dynamic")
if "is_iceberg" in schema_objects.column_names:
columns.append("is_iceberg")

return [self._parse_list_relations_result(result) for result in results.select(columns)]
return [self._parse_list_relations_result(obj) for obj in schema_objects.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
# this can be reduced to always including `is_iceberg` once Snowflake adds it to show objects
try:
database, schema, identifier, relation_type, is_dynamic = result
if self.behavior.enable_iceberg_materializations.no_warn:
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
else:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
if self.behavior.enable_iceberg_materializations.no_warn:
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
Expand All @@ -256,12 +278,21 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

# This line is the main gate on supporting Iceberg materializations. Pass forward a default
# table format, and no downstream table macros can build iceberg relations.
table_format: str = (
TableFormat.ICEBERG
if self.behavior.enable_iceberg_materializations.no_warn and is_iceberg in ("Y", "YES")
else TableFormat.DEFAULT
)
quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
table_format=table_format,
quote_policy=quote_policy,
)

Expand Down
132 changes: 131 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
TableFormat,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
Expand All @@ -25,6 +31,7 @@
@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
table_format: str = TableFormat.DEFAULT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
Expand Down Expand Up @@ -53,6 +60,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_iceberg_format(self) -> bool:
return self.table_format == TableFormat.ICEBERG

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
Expand Down Expand Up @@ -120,3 +131,122 @@ def as_case_sensitive(self) -> "SnowflakeRelation":
path_part_map[path] = part.upper()

return self.replace_path(**path_part_map)

def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) -> str:
"""
This macro renders the appropriate DDL prefix during the create_table_as
macro. It decides based on mutually exclusive table configuration options:

- TEMPORARY: Indicates a table that exists only for the duration of the session.
- ICEBERG: A specific storage format that requires a distinct DDL layout.
- TRANSIENT: A table similar to a permanent table but without fail-safe.

Additional Caveats for Iceberg models:
- transient=true throws a warning because Iceberg does not support transient tables
- A temporary relation is never an Iceberg relation because Iceberg does not
support temporary relations.
"""

transient_explicitly_set_true: bool = config.get("transient", False)

# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
return "temporary"
elif self.is_iceberg_format:
# Log a warning that transient=true on an Iceberg relation is ignored.
if transient_explicitly_set_true:
warn_or_error(
AdapterEventWarning(
base_msg=(
"Iceberg format relations cannot be transient. Please "
"remove either the transient or iceberg config options "
f"from {self.path.database}.{self.path.schema}."
f"{self.path.identifier}. If left unmodified, dbt will "
"ignore 'transient'."
)
)
)

return "iceberg"

# Always supply transient on table create DDL unless user specifically sets
# transient to false or unset. Might as well update the object attribute too!
elif transient_explicitly_set_true or config.get("transient", True):
return "transient"
else:
return ""

def get_ddl_prefix_for_alter(self) -> str:
"""All ALTER statements on Iceberg tables require an ICEBERG prefix"""
if self.is_iceberg_format:
return "iceberg"
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
f"Dropping relation {old_relation} because it is a view and target relation {self} "
f"is of type {self.type}."
)

drop_table_for_iceberg_message: str = (
f"Dropping relation {old_relation} because it is a default format table "
f"and target relation {self} is an Iceberg format table."
)

drop_iceberg_for_table_message: str = (
f"Dropping relation {old_relation} because it is an Iceberg format table "
f"and target relation {self} is a default format table."
)

# An existing view must be dropped for model to build into a table".
yield (not old_relation.is_table, drop_view_message)
# An existing table must be dropped for model to build into an Iceberg table.
yield (
old_relation.is_table
and not old_relation.is_iceberg_format
and self.is_iceberg_format,
drop_table_for_iceberg_message,
)
# existing Iceberg table must be dropped for model to build into a table.
yield (
old_relation.is_table
and old_relation.is_iceberg_format
and not self.is_iceberg_format,
drop_iceberg_for_table_message,
)

def needs_to_drop(self, old_relation: Optional["SnowflakeRelation"]) -> bool:
"""
To convert between Iceberg and non-Iceberg relations, a preemptive drop is
required.

drops cause latency, but it should be a relatively infrequent occurrence.

Some Boolean expression below are logically redundant, but this is done for easier
readability.
"""

if old_relation is None:
return False

for condition, message in self.__drop_conditions(old_relation):
if condition:
fire_event(AdapterEventDebug(base_msg=message))
return True

return False
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import TableFormat
14 changes: 14 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11


class TableFormat(StrEnum):
"""
Snowflake docs refers to this an 'Object Format.'
Data practitioners and interfaces refer to this as 'Table Format's, hence the term's use here.
"""

DEFAULT = "default"
ICEBERG = "iceberg"

def __str__(self):
return self.value
27 changes: 18 additions & 9 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,24 @@
{% macro snowflake__list_relations_without_caching(schema_relation, max_iter=10, max_results_per_iter=10000) %}

{%- set max_total_results = max_results_per_iter * max_iter -%}
{% if schema_relation is string %}
{%- set sql -%}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }}
{%- endset -%}
{% else %}
{%- set sql -%}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }}
{%- endset -%}
{% endif -%}
{%- set sql -%}
{% if schema_relation is string %}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }};
{% else %}
show objects in {{ schema_relation.include(identifier=False) }} limit {{ max_results_per_iter }};
{% endif -%}

{# -- Gated for performance reason. If you don't want Iceberg, you shouldn't pay the
-- latency penalty. #}
{% if adapter.behavior.enable_iceberg_materializations.no_warn %}
select all_objects.*, is_iceberg as "is_iceberg"
from table(result_scan(last_query_id(-1))) all_objects
left join INFORMATION_SCHEMA.tables as all_tables
on all_tables.table_name = all_objects."name"
and all_tables.table_schema = all_objects."schema_name"
and all_tables.table_catalog = all_objects."database_name"
{% endif -%}
{%- endset -%}

{%- set result = run_query(sql) -%}

Expand Down
15 changes: 8 additions & 7 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table',
table_format=config.get('table_format', 'default')
) -%}

{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{% if target_relation.needs_to_drop(old_relation) %}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

Expand Down
Loading