Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Iceberg Table Materialization #1170

Merged
merged 29 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5acb32c
Add materializations of table and dynamic table.
VersusFacit Aug 22, 2024
c2b8d78
Add the method to tell something is iceberg format and pipe that thro…
VersusFacit Aug 26, 2024
5afb551
Finish create macro and fix alters.
VersusFacit Aug 29, 2024
53eb5b9
Finish todo items and begin cleaning code.
VersusFacit Aug 29, 2024
a3b13b8
revert dynamic table changes.
VersusFacit Aug 29, 2024
37006ae
Fix the drop by fixing snowflake__show_iceberg_relations
VersusFacit Aug 29, 2024
1887208
Transient needs sophisticated handling based on what user specifies for
VersusFacit Aug 29, 2024
8150261
Try to figure out what the right None semantics are.
VersusFacit Aug 29, 2024
74ec1a3
Revert to original statement.
VersusFacit Aug 29, 2024
2297226
Fix the transient behavior by passing table_type again.
VersusFacit Aug 29, 2024
1c26ee3
Rename object_format config param to table_format
VersusFacit Sep 10, 2024
491a76a
Migrate Jinja macros to Python.
VersusFacit Sep 11, 2024
c7192d3
All classes are frozen
VersusFacit Sep 11, 2024
6d77f69
Clean up the metadata queries that power is_iceberg column generation
VersusFacit Sep 11, 2024
4bf934c
Fix Python models generation argument
VersusFacit Sep 11, 2024
493c6ae
Add changelog.
VersusFacit Sep 11, 2024
66c2e5a
Try to fix duplication of join record issues.
VersusFacit Sep 11, 2024
e913f28
Use the RelationConfig protocol for type checking.
VersusFacit Sep 11, 2024
ebcc728
Fix transient semantics.
VersusFacit Sep 11, 2024
1cf5b74
Add functional tests.
VersusFacit Sep 11, 2024
f198177
Fix test.
VersusFacit Sep 11, 2024
8a26754
Fix test.
VersusFacit Sep 11, 2024
e4d98e5
Fix test and remove strip calls
VersusFacit Sep 12, 2024
92d7bc0
Add view test case.
VersusFacit Sep 12, 2024
e961bb0
Code review comments.
VersusFacit Sep 12, 2024
4a6046f
I'm using too new a version of mypy for Self.
VersusFacit Sep 12, 2024
d0c39f3
Add a behavior flag for iceberg table materialization.
VersusFacit Sep 12, 2024
17cd094
Flip order of flag.
VersusFacit Sep 12, 2024
a623bb5
Adjust test.
VersusFacit Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from dbt_common.exceptions import CompilationError, DbtDatabaseError, DbtRuntimeError
from dbt_common.utils import filter_null_values

from dbt.adapters.snowflake.relation_configs import SnowflakeRelationType
from dbt.adapters.snowflake.relation_configs import (
SnowflakeRelationType,
SnowflakeObjectFormat,
)
from dbt.adapters.snowflake import SnowflakeColumn
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
Expand All @@ -29,6 +32,7 @@
import agate

SHOW_OBJECT_METADATA_MACRO_NAME = "snowflake__show_object_metadata"
LIST_ICEBERG_RELATIONS_MACRO_NAME = "snowflake__show_iceberg_relations"
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
Expand All @@ -44,6 +48,11 @@ class SnowflakeConfig(AdapterConfig):
merge_update_columns: Optional[str] = None
target_lag: Optional[str] = None

# extended formats
object_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Relation = SnowflakeRelation
Expand Down Expand Up @@ -223,8 +232,35 @@ def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
kwargs = {"schema_relation": schema_relation}

def check_is_iceberg(row, table2):
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
for match_row in table2.rows:
if (
row["name"] == match_row["name"]
and row["database_name"] == match_row["database_name"]
and row["schema_name"] == match_row["schema_name"]
):
return "Y"
return "N"

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
schema_objects = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
iceberg_table_results = self.execute_macro(
LIST_ICEBERG_RELATIONS_MACRO_NAME, kwargs=kwargs
)
import agate

# this only seems to only inflate runtime 16%; TODO: stress test
results = schema_objects.compute(
[
(
"is_iceberg",
agate.Formula(
agate.Text(), lambda row: check_is_iceberg(row, iceberg_table_results)
),
)
]
)
except DbtDatabaseError as exc:
# if the schema doesn't exist, we just want to return.
# Alternatively, we could query the list of schemas before we start
Expand All @@ -237,16 +273,19 @@ def list_relations_without_caching(
columns = ["database_name", "schema_name", "name", "kind"]
if "is_dynamic" in results.column_names:
columns.append("is_dynamic")
if "is_iceberg" in results.column_names:
columns.append("is_iceberg")

return [self._parse_list_relations_result(result) for result in results.select(columns)]

def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
database, schema, identifier, relation_type, is_dynamic = result
database, schema, identifier, relation_type, is_dynamic, is_iceberg = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"
is_iceberg = "N"

try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
Expand All @@ -256,12 +295,16 @@ def _parse_list_relations_result(self, result: "agate.Row") -> SnowflakeRelation
if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

object_format: str = (
SnowflakeObjectFormat.ICEBERG if is_iceberg == "Y" else SnowflakeObjectFormat.DEFAULT
)
quote_policy = {"database": True, "schema": True, "identifier": True}
return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
object_format=object_format,
quote_policy=quote_policy,
)

Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SnowflakeDynamicTableRefreshModeConfigChange,
SnowflakeDynamicTableTargetLagConfigChange,
SnowflakeDynamicTableWarehouseConfigChange,
SnowflakeObjectFormat,
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
Expand All @@ -25,6 +26,7 @@
@dataclass(frozen=True, eq=False, repr=False)
class SnowflakeRelation(BaseRelation):
type: Optional[SnowflakeRelationType] = None
object_format: str = SnowflakeObjectFormat.DEFAULT
quote_policy: SnowflakeQuotePolicy = field(default_factory=lambda: SnowflakeQuotePolicy())
require_alias: bool = False
relation_configs = {
Expand Down Expand Up @@ -53,6 +55,10 @@ class SnowflakeRelation(BaseRelation):
def is_dynamic_table(self) -> bool:
return self.type == SnowflakeRelationType.DynamicTable

@property
def is_iceberg_format(self) -> bool:
return self.object_format == SnowflakeObjectFormat.ICEBERG

@classproperty
def DynamicTable(cls) -> str:
return str(SnowflakeRelationType.DynamicTable)
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/relation_configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
SnowflakeQuotePolicy,
SnowflakeRelationType,
)
from dbt.adapters.snowflake.relation_configs.formats import SnowflakeObjectFormat
9 changes: 9 additions & 0 deletions dbt/adapters/snowflake/relation_configs/formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dbt_common.dataclass_schema import StrEnum # doesn't exist in standard library until py3.11


class SnowflakeObjectFormat(StrEnum):
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
DEFAULT = "default"
ICEBERG = "iceberg"

def __str__(self):
return self.value
9 changes: 9 additions & 0 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,12 @@
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}


{% macro snowflake__show_iceberg_relations(schema_relation) %}
{%- set sql -%}
show iceberg tables in {{ schema_relation }}
{%- endset -%}
{%- set result = run_query(sql) -%}
{%- do return(result) -%}
{% endmacro %}
48 changes: 39 additions & 9 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@
{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table',
object_format=config.get('object_format', 'default')
) -%}

{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}
{{ drop_old_relation_as_needed(old_relation, target_relation) }}

{% call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
Expand Down Expand Up @@ -85,3 +84,34 @@ def main(session):
# dbt = dbtObj(session.table)
# df = model(dbt, session)
{%endmacro%}


{% macro drop_old_relation_as_needed(old_relation, target_relation) %}
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
{% if old_relation is none %}
{{ return('') }}
{% endif %}

{#
-- Each of these will cause some latency, but it shoudl be a relatively infrequent occurrence.

-- An existing view must be dropped for model to "convert" into a table"
#}
{% if not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}

{#
-- An existing Iceberg table must be dropped for model to "convert" into a table.
#}
{% elif old_relation.is_iceberg_format and not target_relation.is_iceberg_format %}
{{ log("Dropping relation " ~ old_relation ~ " because it is an Iceberg format table and target relation " ~ target_relation ~ " is a default format table.") }}
{{ drop_relation_if_exists(old_relation) }}

{#
-- An existing table must be dropped for model to "convert" into an Iceberg table.
#}
{% elif old_relation.is_table and not old_relation.is_iceberg_format and target_relation.is_iceberg_format %}
{{ log("Dropping relation " ~ old_relation ~ " because it is a default format table and target relation is an Iceberg format table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}
{% endmacro %}
110 changes: 97 additions & 13 deletions dbt/include/snowflake/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- set transient = config.get('transient', default=true) -%}

{% if temporary -%}
{%- set table_type = "temporary" -%}
{%- elif transient -%}
{%- set table_type = "transient" -%}
{%- else -%}
{%- set table_type = "" -%}
{%- endif %}
{%- set materialization_prefix = get_create_ddl_prefix(temporary) -%}
{%- set alter_prefix = get_alter_ddl_prefix() -%}

{# Generate DDL/DML #}
{%- if language == 'sql' -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
Expand All @@ -26,7 +20,16 @@

{{ sql_header if sql_header is not none }}

create or replace {{ table_type }} table {{ relation }}
create or replace {{ materialization_prefix }} table {{ relation }}
{%- if _is_iceberg_relation() %}
{#
Valid DDL in CTAS statements. Plain create statements have a different order.
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
#}
{{ render_iceberg_ddl(relation) }}
{% else %}
{%- endif -%}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
{{ get_assert_columns_equivalent(sql) }}
Expand All @@ -44,16 +47,97 @@
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
alter {{ alter_prefix }} table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary %}
alter {{ alter_prefix }} table {{relation}} resume recluster;
{%- endif -%}

{%- elif language == 'python' -%}
{%- if iceberg -%}
{% do exceptions.raise_compiler_error('Iceberg is incompatible with Python models. Please use a SQL model for the iceberg format.') %}
VersusFacit marked this conversation as resolved.
Show resolved Hide resolved
{%- endif %}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, table_type=table_type) }}
{%- else -%}
{% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %}
{%- endif -%}

{% endmacro %}


{#
# Helper Macros
#}

{% macro get_create_ddl_prefix(temporary) %}
{#
This macro generates the appropriate DDL prefix for creating a table in Snowflake,
considering the mutually exclusive nature of certain table types:

- ICEBERG: A specific storage format that requires a distinct DDL layout.
- TEMPORARY: Indicates a table that exists only for the duration of the session.
- TRANSIENT: A type of table that is similar to a permanent table but without fail-safe.

Note: If ICEBERG is specified, transient=true throws a warning because ICEBERG
does not support transient tables.
#}

{%- set is_iceberg = _is_iceberg_relation() -%}
{%- set is_temporary = temporary -%}

{%- if is_iceberg -%}
{# -- Check if user supplied a transient model config of True. #}
{%- if config.get('transient') == True -%}
{{ exceptions.warn("Iceberg format relations cannot be transient. Please remove either the transient or iceberg parameters from %s. If left unmodified, dbt will ignore 'transient'." % this) }}
{%- endif %}

{# -- Check if runtime is trying to create a Temporary Iceberg table. #}
{%- if is_temporary -%}
{{ exceptions.raise_compiler_error("Iceberg format relations cannot be temporary. Temporary is being inserted into an Iceberg format table while materializing %s." % this) }}
{%- endif %}

{{ return('iceberg') }}

{%- elif is_temporary -%}
{{ return('temporary') }}

{# -- Always supply transient on table create DDL unless user specifically sets transient to false. #}
{%- elif config.get('transient', default=true) -%}
{{ return('transient') }}

{%- else -%}
{{ return('') }}
{%- endif -%}
{% endmacro %}


{% macro get_alter_ddl_prefix() %}
{# All ALTER statements on Iceberg tables require an ICEBERG prefix #}
{%- if _get_relation_object_format() == 'iceberg' -%}
{{ return('iceberg') }}
{%- else -%}
{{ return('') }}
{%- endif -%}
{% endmacro %}


{% macro _get_relation_object_format() %}
{{ return(config.get('object_format', default='')) }}
{% endmacro %}


{% macro _is_iceberg_relation() %}
{{ return(_get_relation_object_format() == 'iceberg') }}
{% endmacro %}


{% macro render_iceberg_ddl(relation) -%}
{%- set external_volume = config.get('external_volume') -%}
{# S3 treats subpaths with or without a trailing '/' as functionally equivalent #}
{%- set subpath = config.get('base_location_subpath') -%}
{%- set base_location = '_dbt/' ~ relation.schema ~ '/' ~ relation.name ~ (('/' ~ subpath) if subpath else '') -%}

external_volume = '{{ external_volume }}'
catalog = 'snowflake'
base_location = '{{ base_location }}'
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def list_relations_without_caching(project) -> List[SnowflakeRelation]:
database=project.database, schema=project.test_schema, identifier=""
)
with get_connection(my_adapter):
relations = my_adapter.list_relations_without_caching(schema)
relations = my_adapter.list_relations_without_caching(schema.path.schema)
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
return relations

def test_list_relations_without_caching(self, project):
Expand Down
Loading