diff --git a/dbt/adapters/spark/impl.py b/dbt/adapters/spark/impl.py
index 6eff652eb..4655d710a 100644
--- a/dbt/adapters/spark/impl.py
+++ b/dbt/adapters/spark/impl.py
@@ -31,6 +31,8 @@
 GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
 LIST_SCHEMAS_MACRO_NAME = "list_schemas"
 LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
+LIST_RELATIONS_NO_EXTENDED_MACRO_NAME = "list_relations_without_caching_no_extended"
+DESCRIBE_TABLE_EXTENDED_MACRO_NAME = "describe_table_extended_without_caching"
 DROP_RELATION_MACRO_NAME = "drop_relation"
 FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

@@ -126,38 +128,79 @@ def add_schema_to_cache(self, schema) -> str:
         # so jinja doesn't render things
         return ""

+    def use_show_tables(self, table_name):
+        information = ""
+        kwargs = {"table_name": table_name}
+        try:
+            table_results = self.execute_macro(DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
+        except dbt.exceptions.RuntimeException as e:
+            logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
+            return []
+        for info_row in table_results:
+            info_type, info_value, _ = info_row
+            if info_type.startswith("#") is False:
+                information += f"{info_type}: {info_value}\n"
+        return information
+
     def list_relations_without_caching(
         self, schema_relation: SparkRelation
     ) -> List[SparkRelation]:
         kwargs = {"schema_relation": schema_relation}
+        try_show_tables = False
+        expected_result_rows = 4
         try:
             results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
         except dbt.exceptions.RuntimeException as e:
             errmsg = getattr(e, "msg", "")
             if f"Database '{schema_relation}' not found" in errmsg:
                 return []
+            elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
+                # this happens with spark-iceberg with v2 iceberg tables
+                # https://issues.apache.org/jira/browse/SPARK-33393
+                try_show_tables = True
             else:
                 description = "Error while retrieving information about"
                 logger.debug(f"{description} {schema_relation}: {e.msg}")
                 return []

+        if try_show_tables:
+            expected_result_rows = 3
+            try:
+                results = self.execute_macro(LIST_RELATIONS_NO_EXTENDED_MACRO_NAME, kwargs=kwargs)
+            except dbt.exceptions.RuntimeException as e:
+                errmsg = getattr(e, "msg", "")
+                description = "Error while retrieving information about"
+                logger.debug(f"{description} {schema_relation}: {e.msg}")
+                return []
+
         relations = []
         for row in results:
-            if len(row) != 4:
+            if len(row) != expected_result_rows:
+                if try_show_tables:
+                    description = 'Invalid value from "show tables ...", '
+                else:
+                    description = 'Invalid value from "show table extended ...", '
                 raise dbt.exceptions.RuntimeException(
-                    f'Invalid value from "show table extended ...", '
-                    f"got {len(row)} values, expected 4"
+                    f"{description} got {len(row)} values, expected {expected_result_rows}"
                 )
-            _schema, name, _, information = row
-            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+
+            if try_show_tables:
+                _, name, _ = row
+                information = self.use_show_tables(name)
+            else:
+                _schema, name, _, information = row
             is_delta = "Provider: delta" in information
             is_hudi = "Provider: hudi" in information
+            is_iceberg = "Provider: iceberg" in information
+            rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
+
             relation = self.Relation.create(
-                schema=_schema,
+                schema=schema_relation.schema,
                 identifier=name,
                 type=rel_type,
                 information=information,
                 is_delta=is_delta,
+                is_iceberg=is_iceberg,
                 is_hudi=is_hudi,
             )
             relations.append(relation)
@@ -254,7 +297,16 @@ def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkC
         return columns

     def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
-        columns = self.parse_columns_from_information(relation)
+        columns = []
+        if relation and relation.information:
+            columns = self.parse_columns_from_information(relation)
+        else:
+            # in open source delta 'show table extended' query output doesn't
+            # return relation's schema. if columns are empty from cache,
+            # use get_columns_in_relation spark macro
+            # which would execute 'describe extended tablename' query
+            rows: List[agate.Row] = super().get_columns_in_relation(relation)
+            columns = self.parse_describe_extended(relation, rows)

         for column in columns:
             # convert SparkColumns into catalog dicts
diff --git a/dbt/adapters/spark/relation.py b/dbt/adapters/spark/relation.py
index 249caf0d7..3aa9acf11 100644
--- a/dbt/adapters/spark/relation.py
+++ b/dbt/adapters/spark/relation.py
@@ -1,9 +1,15 @@
-from typing import Optional
-
+from typing import Optional, TypeVar, Any, Type, Dict
+from dbt.contracts.graph.parsed import ParsedSourceDefinition
+from dbt.utils import deep_merge
 from dataclasses import dataclass

 from dbt.adapters.base.relation import BaseRelation, Policy
 from dbt.exceptions import RuntimeException
+from dbt.events import AdapterLogger
+
+logger = AdapterLogger("Spark")
+
+Self = TypeVar("Self", bound="BaseRelation")


 @dataclass
@@ -27,12 +33,37 @@ class SparkRelation(BaseRelation):
     quote_character: str = "`"
     is_delta: Optional[bool] = None
     is_hudi: Optional[bool] = None
+    is_iceberg: Optional[bool] = None
     information: Optional[str] = None
+    loader: Optional[str] = None
+    source_meta: Optional[Dict[str, Any]] = None
+    meta: Optional[Dict[str, Any]] = None

     def __post_init__(self):
-        if self.database != self.schema and self.database:
+        if self.is_iceberg is not True and self.database != self.schema and self.database:
             raise RuntimeException("Cannot set database in spark!")

+    @classmethod
+    def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
+        source_quoting = source.quoting.to_dict(omit_none=True)
+        source_quoting.pop("column", None)
+        quote_policy = deep_merge(
+            cls.get_default_quote_policy().to_dict(omit_none=True),
+            source_quoting,
+            kwargs.get("quote_policy", {}),
+        )
+
+        return cls.create(
+            database=source.database,
+            schema=source.schema,
+            identifier=source.identifier,
+            quote_policy=quote_policy,
+            loader=source.loader,
+            source_meta=source.source_meta,
+            meta=source.meta,
+            **kwargs,
+        )
+
     def render(self):
         if self.include_policy.database and self.include_policy.schema:
             raise RuntimeException(
diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql
index abc7a0ba3..3d0406dc0 100644
--- a/dbt/include/spark/macros/adapters.sql
+++ b/dbt/include/spark/macros/adapters.sql
@@ -1,3 +1,14 @@
+{% macro dbt_spark_tblproperties_clause() -%}
+  {%- set tblproperties = config.get('tblproperties') -%}
+  {%- if tblproperties is not none %}
+    tblproperties (
+      {%- for prop in tblproperties -%}
+      '{{ prop }}' = '{{ tblproperties[prop] }}' {% if not loop.last %}, {% endif %}
+      {%- endfor %}
+    )
+  {%- endif %}
+{%- endmacro -%}
+
 {% macro file_format_clause() %}
   {{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
 {%- endmacro -%}
@@ -135,6 +146,8 @@
   {%- else -%}
       {% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
         create or replace table {{ relation }}
+      {% elif config.get('file_format', validator=validation.any[basestring]) == 'iceberg' %}
+        create or replace table {{ relation }}
       {% else %}
         create table {{ relation }}
       {% endif %}
@@ -191,7 +204,10 @@
 {% endmacro %}

 {% macro spark__get_columns_in_relation(relation) -%}
-  {{ return(adapter.get_columns_in_relation(relation)) }}
+  {% call statement('get_columns_in_relation', fetch_result=True) %}
+      describe extended {{ relation.include(schema=(schema is not none)) }}
+  {% endcall %}
+  {% do return(load_result('get_columns_in_relation').table) %}
 {% endmacro %}

 {% macro spark__list_relations_without_caching(relation) %}
@@ -202,6 +218,29 @@
   {% do return(load_result('list_relations_without_caching').table) %}
 {% endmacro %}

+{% macro list_relations_without_caching_no_extended(schema_relation) %}
+  {#-- We can't use temporary tables with `create ... as ()` syntax #}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('list_relations_without_caching_no_extended', fetch_result=True) -%}
+    show tables in {{ schema_relation }} like '*'
+  {% endcall %}
+
+  {% do return(load_result('list_relations_without_caching_no_extended').table) %}
+{% endmacro %}
+
+{% macro describe_table_extended_without_caching(table_name) %}
+  {#-- We can't use temporary tables with `create ... as ()` syntax #}
+  {#-- Spark with iceberg tables don't work with show table extended for #}
+  {#-- V2 iceberg tables #}
+  {#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
+  {% call statement('describe_table_extended_without_caching', fetch_result=True) -%}
+    describe extended {{ table_name }}
+  {% endcall %}
+  {% do return(load_result('describe_table_extended_without_caching').table) %}
+{% endmacro %}
+
 {% macro spark__list_schemas(database) -%}
   {% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
     show databases
@@ -246,9 +285,15 @@
       {% set comment = column_dict[column_name]['description'] %}
       {% set escaped_comment = comment | replace('\'', '\\\'') %}
       {% set comment_query %}
-        alter table {{ relation }} change column
-            {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
-            comment '{{ escaped_comment }}';
+        {% if relation.is_iceberg %}
+          alter table {{ relation }} alter column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% else %}
+          alter table {{ relation }} change column
+              {{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
+              comment '{{ escaped_comment }}';
+        {% endif %}
       {% endset %}
       {% do run_query(comment_query) %}
     {% endfor %}
@@ -276,7 +321,13 @@
 {% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

   {% if remove_columns %}
-    {% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
+    {% if relation.is_delta %}
+      {% set platform_name = 'Delta Lake' %}
+    {% elif relation.is_iceberg %}
+      {% set platform_name = 'Iceberg' %}
+    {% else %}
+      {% set platform_name = 'Apache Spark' %}
+    {% endif %}
     {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
   {% endif %}
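Note: the new dbt_spark_tblproperties_clause macro is only defined in this diff and reads config.get('tblproperties'); assuming it is wired into the create-table DDL, and given the new iceberg branch of create or replace table above, a model could opt in roughly as sketched below. This is a minimal, hypothetical example; the model name and property values are illustrative and not part of this change set.

    {# models/events_iceberg.sql -- hypothetical model name #}
    {{
      config(
        materialized='table',
        file_format='iceberg',
        tblproperties={
          'write.format.default': 'parquet',
          'format-version': '2'
        }
      )
    }}

    select 1 as id, current_timestamp() as loaded_at

With this config, config.get('tblproperties') returns the dict above and the macro renders it as a tblproperties ('key' = 'value', ...) clause.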
diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql
index 17196e85d..19dc3e175 100644
--- a/dbt/include/spark/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -5,6 +5,6 @@
     insert overwrite table {{ target_relation }}
     {{ partition_cols(label="partition") }}
-    select {{dest_cols_csv}} from {{ source_relation }}
+    select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

 {% endmacro %}

@@ -27,29 +27,26 @@
   {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
   {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

-  {% if unique_key %}
-    {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
-      {% for key in unique_key %}
-        {% set this_key_match %}
-            DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
-        {% endset %}
-        {% do predicates.append(this_key_match) %}
-      {% endfor %}
-    {% else %}
-      {% set unique_key_match %}
+  {% set merge_condition %}
+    {% if unique_key %}
+      {# support multiple join conditions when unique_key is a list #}
+      on {% if unique_key is string %}
           DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
-      {% endset %}
-      {% do predicates.append(unique_key_match) %}
-    {% endif %}
-  {% else %}
-    {% do predicates.append('FALSE') %}
-  {% endif %}
-
-  {{ sql_header if sql_header is not none }}
+      {% else %}
+        {%- for k in unique_key %}
+          DBT_INTERNAL_SOURCE.{{ k }} = DBT_INTERNAL_DEST.{{ k }}
+          {%- if not loop.last %} AND {%- endif %}
+        {%- endfor %}
+      {% endif %}
+    {% else %}
+      on false
+    {% endif %}
+  {% endset %}

   merge into {{ target }} as DBT_INTERNAL_DEST
-  using {{ source }} as DBT_INTERNAL_SOURCE
-  on {{ predicates | join(' and ') }}
+  using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
+
+  {{ merge_condition }}

   when matched then update set
     {% if update_columns -%}{%- for column_name in update_columns %}
@@ -70,7 +67,7 @@
   {#-- insert statements don't like CTEs, so support them via a temp view #}
     {{ get_insert_overwrite_sql(source, target) }}
   {%- elif strategy == 'merge' -%}
-  {#-- merge all columns with databricks delta - schema changes are handled for us #}
+  {#-- merge all columns with databricks delta or iceberg - schema changes are handled for us #}
     {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
   {%- else -%}
     {% set no_sql_for_strategy_msg -%}
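Note: with the reworked merge_condition block above, unique_key may be a single column name or a list, in which case the keys are joined with AND. A minimal sketch of an incremental model exercising this path is shown below; the model, source, and column names are hypothetical.

    {# models/orders_incremental.sql -- hypothetical model name #}
    {{
      config(
        materialized='incremental',
        incremental_strategy='merge',
        file_format='iceberg',
        unique_key=['order_id', 'order_date']
      )
    }}

    select * from {{ source('sales', 'raw_orders') }}
    {% if is_incremental() %}
      where order_date > (select max(order_date) from {{ this }})
    {% endif %}

For this config the macro renders on DBT_INTERNAL_SOURCE.order_id = DBT_INTERNAL_DEST.order_id AND DBT_INTERNAL_SOURCE.order_date = DBT_INTERNAL_DEST.order_date; with no unique_key it falls back to on false, i.e. insert-only behavior.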
diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql
index ffd56f106..71ec01821 100644
--- a/dbt/include/spark/macros/materializations/incremental/validate.sql
+++ b/dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -1,7 +1,7 @@
 {% macro dbt_spark_validate_get_file_format(raw_file_format) %}
   {#-- Validate the file format #}
-  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
+  {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'iceberg', 'libsvm', 'hudi'] %}

   {% set invalid_file_format_msg -%}
     Invalid file format provided: {{ raw_file_format }}
@@ -26,12 +26,12 @@
   {% set invalid_merge_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
-    You can only choose this strategy when file_format is set to 'delta' or 'hudi'
+    You can only choose this strategy when file_format is set to 'delta', 'iceberg', or 'hudi'
   {%- endset %}

   {% set invalid_insert_overwrite_delta_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
-    You cannot use this strategy when file_format is set to 'delta'
+    You cannot use this strategy when file_format is set to 'delta' or 'iceberg'
     Use the 'append' or 'merge' strategy instead
   {%- endset %}

@@ -44,7 +44,7 @@
   {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
-    {% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
+    {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
       {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
     {% endif %}
     {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
diff --git a/dbt/include/spark/macros/materializations/snapshot.sql b/dbt/include/spark/macros/materializations/snapshot.sql
index 6cf2358fe..a397f84e5 100644
--- a/dbt/include/spark/macros/materializations/snapshot.sql
+++ b/dbt/include/spark/macros/materializations/snapshot.sql
@@ -15,7 +15,12 @@
 {% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}

     merge into {{ target }} as DBT_INTERNAL_DEST
-    using {{ source }} as DBT_INTERNAL_SOURCE
+    {% if target.is_iceberg %}
+      {# create view only supports a bare name (no catalog or schema) #}
+      using {{ source.identifier }} as DBT_INTERNAL_SOURCE
+    {% else %}
+      using {{ source }} as DBT_INTERNAL_SOURCE
+    {% endif %}
     on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

     when matched
      and DBT_INTERNAL_DEST.dbt_valid_to is null
@@ -33,10 +38,18 @@
 {% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
     {% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}

-    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
-                                               schema=target_relation.schema,
-                                               database=none,
-                                               type='view') -%}
+    {% if target_relation.is_iceberg %}
+        {# iceberg catalogs do not support create view, but regular spark does, so we drop the catalog and schema #}
+        {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                   schema=none,
+                                                   database=none,
+                                                   type='view') -%}
+    {% else %}
+        {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
+                                                   schema=target_relation.schema,
+                                                   database=none,
+                                                   type='view') -%}
+    {% endif %}

     {% set select = snapshot_staging_table(strategy, sql, target_relation) %}
@@ -83,25 +96,25 @@
                                               identifier=target_table,
                                               type='table') -%}

-  {%- if file_format not in ['delta', 'hudi'] -%}
+  {%- if file_format not in ['delta', 'iceberg', 'hudi'] -%}
     {% set invalid_format_msg -%}
       Invalid file format: {{ file_format }}
-      Snapshot functionality requires file_format be set to 'delta' or 'hudi'
+      Snapshot functionality requires file_format be set to 'delta', 'iceberg', or 'hudi'
     {%- endset %}
     {% do exceptions.raise_compiler_error(invalid_format_msg) %}
   {% endif %}

   {%- if target_relation_exists -%}
-    {%- if not target_relation.is_delta and not target_relation.is_hudi -%}
+    {%- if not target_relation.is_delta and not target_relation.is_iceberg and not target_relation.is_hudi -%}
       {% set invalid_format_msg -%}
-        The existing table {{ model.schema }}.{{ target_table }} is in another format than 'delta' or 'hudi'
+        The existing table {{ model.schema }}.{{ target_table }} is in a format other than 'delta', 'iceberg', or 'hudi'
       {%- endset %}
       {% do exceptions.raise_compiler_error(invalid_format_msg) %}
     {% endif %}
   {% endif %}

   {% if not adapter.check_schema_exists(model.database, model.schema) %}
-    {% do create_schema(model.database, model.schema) %}
+    {% do create_schema(model.schema) %}
   {% endif %}

   {%- if not target_relation.is_table -%}
diff --git a/dbt/include/spark/macros/materializations/table.sql b/dbt/include/spark/macros/materializations/table.sql
index c82e27e9c..96cda9c1d 100644
--- a/dbt/include/spark/macros/materializations/table.sql
+++ b/dbt/include/spark/macros/materializations/table.sql
@@ -12,12 +12,13 @@
   {{ run_hooks(pre_hooks) }}

   -- setup: if the target relation already exists, drop it
-  -- in case if the existing and future table is delta, we want to do a
+  -- in case the existing and future table is delta or iceberg, we want to do a
   -- create or replace table instead of dropping, so we don't have the table unavailable
-  {% if old_relation and not (old_relation.is_delta and config.get('file_format', validator=validation.any[basestring]) == 'delta') -%}
+  {%- set file_format = config.get('file_format', validator=validation.any[basestring]) -%}
+  {% if old_relation and not (old_relation.is_delta and file_format == 'delta') and not (old_relation.is_iceberg and file_format == 'iceberg') -%}
     {{ adapter.drop_relation(old_relation) }}
   {%- endif %}

   -- build model
   {%- call statement('main', language=language) -%}
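Note: the snapshot changes above accept file_format 'iceberg' alongside 'delta' and 'hudi', and route the staging view through a bare identifier for iceberg targets. A minimal, hypothetical snapshot sketch follows; the snapshot, schema, and column names are illustrative only.

    {# snapshots/orders_snapshot.sql -- hypothetical snapshot #}
    {% snapshot orders_snapshot %}
      {{
        config(
          target_schema='snapshots',
          unique_key='order_id',
          strategy='timestamp',
          updated_at='updated_at',
          file_format='iceberg'
        )
      }}
      select * from {{ ref('stg_orders') }}
    {% endsnapshot %}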