Add additional support for Iceberg v2 tables
Found a way to identify Iceberg tables, given that Spark returns
an error when trying to execute "SHOW TABLE EXTENDED ...". See
https://issues.apache.org/jira/browse/SPARK-33393

Instead of "SHOW TABLE EXTENDED", a "DESCRIBE EXTENDED" is
performed to retrieve the provider information. This allows Iceberg
tables to be identified through an is_iceberg member variable.
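
For illustration, a minimal sketch of the difference in Spark SQL (the schema and table names are hypothetical, not part of this commit):

-- Fails for Iceberg v2 tables (SPARK-33393):
-- SHOW TABLE EXTENDED IN my_schema LIKE 'my_iceberg_table';

-- Works, and reports the provider in its output:
DESCRIBE EXTENDED my_schema.my_iceberg_table;
-- ...
-- # Detailed Table Information
-- Provider       iceberg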

Allow multiple join conditions so that multiple columns can be used
to make a row distinct.
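
As a sketch of the intended result, with a hypothetical unique_key of ['id', 'valid_from'] (relation names are also hypothetical), the merge strategy should render roughly:

merge into analytics.target_table as DBT_INTERNAL_DEST
using target_table__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
   AND DBT_INTERNAL_SOURCE.valid_from = DBT_INTERNAL_DEST.valid_from
when matched then update set *
when not matched then insert *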

Use is_iceberg everywhere the handling of Iceberg tables differs from
other data sources.
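
For example, a model could opt into the new behavior with a config like the following (model and source names are hypothetical, not part of this commit):

{{ config(
    materialized='incremental',
    file_format='iceberg',
    incremental_strategy='merge',
    unique_key=['id', 'valid_from']
) }}

select * from {{ source('my_source', 'my_table') }}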
dparent1 authored and cccs-jc committed Nov 19, 2022
1 parent b759267 commit 9d88e83
Showing 7 changed files with 200 additions and 51 deletions.
66 changes: 59 additions & 7 deletions dbt/adapters/spark/impl.py
@@ -31,6 +31,8 @@
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME = "get_columns_in_relation_raw"
LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
LIST_RELATIONS_NO_EXTENDED_MACRO_NAME = "list_relations_without_caching_no_extended"
DESCRIBE_TABLE_EXTENDED_MACRO_NAME = "describe_table_extended_without_caching"
DROP_RELATION_MACRO_NAME = "drop_relation"
FETCH_TBL_PROPERTIES_MACRO_NAME = "fetch_tbl_properties"

@@ -126,38 +128,79 @@ def add_schema_to_cache(self, schema) -> str:
# so jinja doesn't render things
return ""

def use_show_tables(self, table_name):
information = ""
kwargs = {"table_name": table_name}
try:
table_results = self.execute_macro(DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
logger.debug(f"Error while retrieving information about {table_name}: {e.msg}")
return []
for info_row in table_results:
info_type, info_value, _ = info_row
if info_type.startswith("#") is False:
information += f"{info_type}: {info_value}\n"
return information

def list_relations_without_caching(
self, schema_relation: SparkRelation
) -> List[SparkRelation]:
kwargs = {"schema_relation": schema_relation}
try_show_tables = False
expected_result_rows = 4
try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
return []
elif "SHOW TABLE EXTENDED is not supported for v2 tables" in errmsg:
# this happens with spark-iceberg with v2 iceberg tables
# https://issues.apache.org/jira/browse/SPARK-33393
try_show_tables = True
else:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

if try_show_tables:
expected_result_rows = 3
try:
results = self.execute_macro(LIST_RELATIONS_NO_EXTENDED_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

relations = []
for row in results:
if len(row) != 4:
if len(row) != expected_result_rows:
if try_show_tables:
description = 'Invalid value from "show tables ...", '
else:
description = 'Invalid value from "show table extended ...", '
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
f"{description} got {len(row)} values, expected {expected_result_rows}"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table

if try_show_tables:
_, name, _ = row
information = self.use_show_tables(name)
else:
_schema, name, _, information = row
is_delta = "Provider: delta" in information
is_hudi = "Provider: hudi" in information
is_iceberg = "Provider: iceberg" in information
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table

relation = self.Relation.create(
schema=_schema,
schema=schema_relation.schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_iceberg=is_iceberg,
is_hudi=is_hudi,
)
relations.append(relation)
@@ -254,7 +297,16 @@ def parse_columns_from_information(self, relation: SparkRelation) -> List[SparkColumn]:
return columns

def _get_columns_for_catalog(self, relation: SparkRelation) -> Iterable[Dict[str, Any]]:
columns = self.parse_columns_from_information(relation)
columns = []
if relation and relation.information:
columns = self.parse_columns_from_information(relation)
else:
# in open source delta 'show table extended' query output doesn't
# return relation's schema. if columns are empty from cache,
# use get_columns_in_relation spark macro
# which would execute 'describe extended tablename' query
rows: List[agate.Row] = super().get_columns_in_relation(relation)
columns = self.parse_describe_extended(relation, rows)

for column in columns:
# convert SparkColumns into catalog dicts
37 changes: 34 additions & 3 deletions dbt/adapters/spark/relation.py
@@ -1,9 +1,15 @@
from typing import Optional

from typing import Optional, TypeVar, Any, Type, Dict
from dbt.contracts.graph.parsed import ParsedSourceDefinition
from dbt.utils import deep_merge
from dataclasses import dataclass

from dbt.adapters.base.relation import BaseRelation, Policy
from dbt.exceptions import RuntimeException
from dbt.events import AdapterLogger

logger = AdapterLogger("Spark")

Self = TypeVar("Self", bound="BaseRelation")


@dataclass
@@ -27,12 +33,37 @@ class SparkRelation(BaseRelation):
quote_character: str = "`"
is_delta: Optional[bool] = None
is_hudi: Optional[bool] = None
is_iceberg: Optional[bool] = None
information: Optional[str] = None
loader: Optional[str] = None
source_meta: Optional[Dict[str, Any]] = None
meta: Optional[Dict[str, Any]] = None

def __post_init__(self):
if self.database != self.schema and self.database:
if self.is_iceberg is not True and self.database != self.schema and self.database:
raise RuntimeException("Cannot set database in spark!")

@classmethod
def create_from_source(cls: Type[Self], source: ParsedSourceDefinition, **kwargs: Any) -> Self:
source_quoting = source.quoting.to_dict(omit_none=True)
source_quoting.pop("column", None)
quote_policy = deep_merge(
cls.get_default_quote_policy().to_dict(omit_none=True),
source_quoting,
kwargs.get("quote_policy", {}),
)

return cls.create(
database=source.database,
schema=source.schema,
identifier=source.identifier,
quote_policy=quote_policy,
loader=source.loader,
source_meta=source.source_meta,
meta=source.meta,
**kwargs,
)

def render(self):
if self.include_policy.database and self.include_policy.schema:
raise RuntimeException(
61 changes: 56 additions & 5 deletions dbt/include/spark/macros/adapters.sql
@@ -1,3 +1,14 @@
{% macro dbt_spark_tblproperties_clause() -%}
{%- set tblproperties = config.get('tblproperties') -%}
{%- if tblproperties is not none %}
tblproperties (
{%- for prop in tblproperties -%}
'{{ prop }}' = '{{ tblproperties[prop] }}' {% if not loop.last %}, {% endif %}
{%- endfor %}
)
{%- endif %}
{%- endmacro -%}

{% macro file_format_clause() %}
{{ return(adapter.dispatch('file_format_clause', 'dbt')()) }}
{%- endmacro -%}
@@ -135,6 +146,8 @@
{%- else -%}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
create or replace table {{ relation }}
{% elif config.get('file_format', validator=validation.any[basestring]) == 'iceberg' %}
create or replace table {{ relation }}
{% else %}
create table {{ relation }}
{% endif %}
@@ -191,7 +204,10 @@
{% endmacro %}

{% macro spark__get_columns_in_relation(relation) -%}
{{ return(adapter.get_columns_in_relation(relation)) }}
{% call statement('get_columns_in_relation', fetch_result=True) %}
describe extended {{ relation.include(schema=(schema is not none)) }}
{% endcall %}
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}

{% macro spark__list_relations_without_caching(relation) %}
@@ -202,6 +218,29 @@
{% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

{% macro list_relations_without_caching_no_extended(schema_relation) %}
{#-- We can't use temporary tables with `create ... as ()` syntax #}
{#-- Spark with iceberg tables don't work with show table extended for #}
{#-- V2 iceberg tables #}
{#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
{% call statement('list_relations_without_caching_no_extended', fetch_result=True) -%}
show tables in {{ schema_relation }} like '*'
{% endcall %}

{% do return(load_result('list_relations_without_caching_no_extended').table) %}
{% endmacro %}

{% macro describe_table_extended_without_caching(table_name) %}
{#-- We can't use temporary tables with `create ... as ()` syntax #}
{#-- Spark with iceberg tables don't work with show table extended for #}
{#-- V2 iceberg tables #}
{#-- https://issues.apache.org/jira/browse/SPARK-33393 #}
{% call statement('describe_table_extended_without_caching', fetch_result=True) -%}
describe extended {{ table_name }}
{% endcall %}
{% do return(load_result('describe_table_extended_without_caching').table) %}
{% endmacro %}

{% macro spark__list_schemas(database) -%}
{% call statement('list_schemas', fetch_result=True, auto_begin=False) %}
show databases
@@ -246,9 +285,15 @@
{% set comment = column_dict[column_name]['description'] %}
{% set escaped_comment = comment | replace('\'', '\\\'') %}
{% set comment_query %}
alter table {{ relation }} change column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% if relation.is_iceberg %}
alter table {{ relation }} alter column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% else %}
alter table {{ relation }} change column
{{ adapter.quote(column_name) if column_dict[column_name]['quote'] else column_name }}
comment '{{ escaped_comment }}';
{% endif %}
{% endset %}
{% do run_query(comment_query) %}
{% endfor %}
@@ -276,7 +321,13 @@
{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% set platform_name = 'Delta Lake' if relation.is_delta else 'Apache Spark' %}
{% if relation.is_delta %}
{% set platform_name = 'Delta Lake' %}
{% elif relation.is_iceberg %}
{% set platform_name = 'Iceberg' %}
{% else %}
{% set platform_name = 'Apache Spark' %}
{% endif %}
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
{% endif %}

@@ -5,6 +5,7 @@
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
select {{dest_cols_csv}} from {{ source_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}

@@ -27,29 +28,26 @@
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
{% set merge_condition %}
{% if unique_key %}
{# added support for multiple join condition, multiple unique_key #}
on {% if unique_key is string %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}
{% else %}
{%- for k in unique_key %}
DBT_INTERNAL_SOURCE.{{ k }} = DBT_INTERNAL_DEST.{{ k }}
{%- if not loop.last %} AND {%- endif %}
{%- endfor %}
{% endif %}
{% else %}
on false
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE

{{ merge_condition }}

when matched then update set
{% if update_columns -%}{%- for column_name in update_columns %}
@@ -70,7 +68,7 @@
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{#-- merge all columns with databricks delta or iceberg - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
@@ -1,7 +1,7 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'hudi'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'iceberg', 'libsvm', 'hudi'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
@@ -26,12 +26,12 @@

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta' or 'hudi'
You can only choose this strategy when file_format is set to 'delta' or 'iceberg' or 'hudi'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when file_format is set to 'delta'
You cannot use this strategy when file_format is set to 'delta' or 'iceberg'
Use the 'append' or 'merge' strategy instead
{%- endset %}

@@ -44,7 +44,7 @@
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'hudi'] %}
{% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
