Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Iceberg Table Materialization #1170

Merged
merged 29 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5acb32c
Add materializations of table and dynamic table.
VersusFacit Aug 22, 2024
c2b8d78
Add the method to tell something is iceberg format and pipe that thro…
VersusFacit Aug 26, 2024
5afb551
Finish create macro and fix alters.
VersusFacit Aug 29, 2024
53eb5b9
Finish todo items and begin cleaning code.
VersusFacit Aug 29, 2024
a3b13b8
revert dynamic table changes.
VersusFacit Aug 29, 2024
37006ae
Fix the drop by fixing snowflake__show_iceberg_relations
VersusFacit Aug 29, 2024
1887208
Transient needs sophisticated handling based on what user specifies for
VersusFacit Aug 29, 2024
8150261
Try to figure out what the right None semantics are.
VersusFacit Aug 29, 2024
74ec1a3
Revert to original statement.
VersusFacit Aug 29, 2024
2297226
Fix the transient behavior by passing table_type again.
VersusFacit Aug 29, 2024
1c26ee3
Rename object_format config param to table_format
VersusFacit Sep 10, 2024
491a76a
Migrate Jinja macros to Python.
VersusFacit Sep 11, 2024
c7192d3
All classes are frozen
VersusFacit Sep 11, 2024
6d77f69
Clean up the metadata queries that power is_iceberg column generation
VersusFacit Sep 11, 2024
4bf934c
Fix Python models generation argument
VersusFacit Sep 11, 2024
493c6ae
Add changelog.
VersusFacit Sep 11, 2024
66c2e5a
Try to fix duplication of join record issues.
VersusFacit Sep 11, 2024
e913f28
Use the RelationConfig protocol for type checking.
VersusFacit Sep 11, 2024
ebcc728
Fix transient semantics.
VersusFacit Sep 11, 2024
1cf5b74
Add functional tests.
VersusFacit Sep 11, 2024
f198177
Fix test.
VersusFacit Sep 11, 2024
8a26754
Fix test.
VersusFacit Sep 11, 2024
e4d98e5
Fix test and remove strip calls
VersusFacit Sep 12, 2024
92d7bc0
Add view test case.
VersusFacit Sep 12, 2024
e961bb0
Code review comments.
VersusFacit Sep 12, 2024
4a6046f
I'm using too new a version of mypy for Self.
VersusFacit Sep 12, 2024
d0c39f3
Add a behavior flag for iceberg table materialization.
VersusFacit Sep 12, 2024
17cd094
Flip order of flag.
VersusFacit Sep 12, 2024
a623bb5
Adjust test.
VersusFacit Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
from dbt_common.utils import filter_null_values

from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
from dbt.adapters.snowflake.relation_configs import (
SnowflakeRelationType,
SnowflakeObjectFormat,
)
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
Expand All @@ -29,6 +32,7 @@
import agate

SHOW_OBJECT_METADATA_MACRO_NAME = "snowflake__show_object_metadata"
LIST_ICEBERG_RELATIONS_MACRO_NAME = "snowflake__show_iceberg_relations"
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
Expand All @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig):
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None

# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
Expand Down Expand Up @@ -223,8 +232,35 @@ def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {"schema_relation": schema_relation}

def check_is_iceberg(row, table2):
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
for match_row in table2.rows:
if (
row["name"] == match_row["name"]
and row["database_name"] == match_row["database_name"]
and row["schema_name"] == match_row["schema_name"]
):
return "Y"
return "N"

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
iceberg_table_results = self.execute_macro(
LIST_ICEBERG_RELATIONS_MACRO_NAME, kwargs=kwargs
)
import agate

# this only seems to only inflate runtime 16%; TODO: stress test
results = schema_objects.compute(
[
(
"is_iceberg",
agate.Formula(
agate.Text(), lambda row: check_is_iceberg(row, iceberg_table_results)
),
)
]
)
except DbtDatabaseError as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
Expand All @@ -237,16 +273,19 @@ def list_relations_without_caching(
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in results.column_names:
columns.append("is_dynamic")
if "is_iceberg" in results.column_names:
columns.append("is_iceberg")

return [self._parse_list_relations_result(result) for result in results.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
database, schema, identifier, relation_type, is_dynamic = result
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
Expand All @@ -256,12 +295,17 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

table_format: str = (
SnowflakeObjectFormat.ICEBERG if is_iceberg == "Y" else SnowflakeObjectFormat.DEFAULT
)
quote_policy = {"database": True, "schema": True, "identifier": True}

return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
table_format=table_format,
quote_policy=quote_policy,
)

Expand Down
82 changes: 81 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type
from typing import FrozenSet, Optional, Type, TYPE_CHECKING


from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import warn_or_error

from dbt.adapters.snowflake.relation_configs import (
SnowflakeDynamicTableConfig,
SnowflakeDynamicTableConfigChangeset,
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeObjectFormat,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)

if TYPE_CHECKING:
from dbt.artifacts.resources.v1.model import ModelConfig
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved


@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
transient: Optional[bool] = None
type: Optional[SnowflakeRelationType] = None
table_format: str = SnowflakeObjectFormat.DEFAULT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
Expand Down Expand Up @@ -53,6 +64,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_iceberg_format(self) -> bool:
return self.table_format == SnowflakeObjectFormat.ICEBERG

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
Expand Down Expand Up @@ -120,3 +135,68 @@ def as_case_sensitive(self) -> "SnowflakeRelation":
path_part_map[path] = part.upper()

return self.replace_path(**path_part_map)

def get_ddl_prefix_for_create(self, config: "ModelConfig", temporary: bool):
"""
This macro renders the appropriate DDL prefix during the create_table_as
macro. It decides based on mutually exclusive table configuration options:

- TEMPORARY: Indicates a table that exists only for the duration of the session.
- ICEBERG: A specific storage format that requires a distinct DDL layout.
- TRANSIENT: A table similar to a permanent table but without fail-safe.

Additional Caveats for Iceberg models:
- transient=true throws a warning because Iceberg does not support transient tables
- A temporary relation is never an Iceberg relation because Iceberg does not
support temporary relations.
"""

transient_explicitly_set_true: bool = config.get("transient", False)

# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
return "temporary"
elif self.is_iceberg_format:
# Log a warning that transient=true on an Iceberg relation is ignored.
if transient_explicitly_set_true:
warn_or_error(
AdapterEventWarning(
base_msg=(
"Iceberg format relations cannot be transient. Please "
"remove either the transient or iceberg config options "
f"from {self.path.database}.{self.path.schema}."
f"{self.path.identifier}. If left unmodified, dbt will "
"ignore 'transient'."
)
)
)

return "iceberg"

# Always supply transient on table create DDL unless user specifically sets
# transient to false or unset.
elif transient_explicitly_set_true or config.get("transient") is None:
return "transient"
else:
return ""

def get_ddl_prefix_for_alter(self):
"""All ALTER statements on Iceberg tables require an ICEBERG prefix"""
if self.is_iceberg_format:
return "iceberg"
else:
return ""

def render_iceberg_ddl(self, config: "ModelConfig"):
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import SnowflakeObjectFormat
9 changes: 9 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11


class SnowflakeObjectFormat(StrEnum):
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
DEFAULT = "default"
ICEBERG = "iceberg"

def __str__(self):
return self.value
9 changes: 9 additions & 0 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,12 @@
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}


{% macro snowflake__show_iceberg_relations(schema_relation) %}
{%- set sql -%}
show iceberg tables in {{ schema_relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{%- do return(result) -%}
{% endmacro %}
48 changes: 39 additions & 9 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@
{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table',
table_format=config.get('table_format', 'default')
) -%}

{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}
{{ drop_old_relation_as_needed(old_relation, target_relation) }}

{% call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
Expand Down Expand Up @@ -85,3 +84,34 @@ def main(session):
# dbt = dbtObj(session.table)
# df = model(dbt, session)
{%endmacro%}


{% macro drop_old_relation_as_needed(old_relation, target_relation) %}
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
{% if old_relation is none %}
{{ return('') }}
{% endif %}

{#
-- Each of these will cause some latency, but it shoudl be a relatively infrequent occurrence.

-- An existing view must be dropped for model to "convert" into a table"
#}
{% if not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}

{#
-- An existing Iceberg table must be dropped for model to "convert" into a table.
#}
{% elif old_relation.is_iceberg_format and not target_relation.is_iceberg_format %}
{{ log("Dropping relation " ~ old_relation ~ " because it is an Iceberg format table and target relation " ~ target_relation ~ " is a default format table.") }}
{{ drop_relation_if_exists(old_relation) }}

{#
-- An existing table must be dropped for model to "convert" into an Iceberg table.
#}
{% elif old_relation.is_table and not old_relation.is_iceberg_format and target_relation.is_iceberg_format %}
{{ log("Dropping relation " ~ old_relation ~ " because it is a default format table and target relation is an Iceberg format table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}
{% endmacro %}
34 changes: 20 additions & 14 deletions dbt/include/snowflake/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- set transient = config.get('transient', default=true) -%}

{% if temporary -%}
{%- set table_type = "temporary" -%}
{%- elif transient -%}
{%- set table_type = "transient" -%}
{%- else -%}
{%- set table_type = "" -%}
{%- endif %}
{%- set materialization_prefix = relation.get_ddl_prefix_for_create(config.model.config, temporary) -%}
{%- set alter_prefix = relation.get_ddl_prefix_for_alter() -%}

{# Generate DDL/DML #}
{%- if language == 'sql' -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
Expand All @@ -26,7 +20,16 @@

{{ sql_header if sql_header is not none }}

create or replace {{ table_type }} table {{ relation }}
create or replace {{ materialization_prefix }} table {{ relation }}
{%- if relation.is_iceberg_format %}
{#
Valid DDL in CTAS statements. Plain create statements have a different order.
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
#}
{{ relation.render_iceberg_ddl(config.model.config) }}
{% else %}
{%- endif -%}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
Expand All @@ -44,14 +47,17 @@
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
alter {{ alter_prefix }} table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary %}
alter {{ alter_prefix }} table {{relation}} resume recluster;
{%- endif -%}

{%- elif language == 'python' -%}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=table_type) }}
{%- if iceberg -%}
{% do exceptions.raise_compiler_error('Iceberg is incompatible with Python models. Please use a SQL model for the iceberg format.') %}
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
{%- endif %}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=get_create_ddl_prefix(temporary)) }}
{%- else -%}
{% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %}
{%- endif -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def list_relations_without_caching(project) -> List[SnowflakeRelation]:
database=project.database, schema=project.test_schema, identifier=""
)
with get_connection(my_adapter):
relations = my_adapter.list_relations_without_caching(schema)
relations = my_adapter.list_relations_without_caching(schema.path.schema)
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
return relations

def test_list_relations_without_caching(self, project):
Expand Down
Loading