Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow sync all columns for Delta incremental models #1088

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 107 additions & 13 deletions dbt/include/spark/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -391,42 +391,136 @@
{% do return(tmp_relation) %}
{% endmacro %}


{% macro spark__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} alter column {{ column_name }} type {{ new_column_type }};
{% endcall %}
{% endmacro %}

{% macro check_table_properties(relation, expected_properties) %}
{# Fetching current properties and populate a dict to easily compare #}
{% set properties_table = fetch_tbl_properties(relation) %}

{% set current_properties = {} %}

{% for row in properties_table.rows %}
{% set current_properties = current_properties.update({ row['key']: row['value'] }) %}
{% endfor %}

{# Control variable for monitoring validation status #}
{% set missing_properties = {} %}

{# Iterated through expected properties #}

{% for key, expected_value in expected_properties.items() %}
{% set current_value = current_properties.get(key) %}

{# Check for known numeric values to be >= #}
{% if key in ['delta.minReaderVersion', 'delta.minWriterVersion'] %}
{% if current_value is not none and current_value | int >= expected_value %}
{{ log("Property '" ~ key ~ "' is valid : " ~ current_value ~ " >= " ~ expected_value) }}
{% else %}
{{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
{% do missing_properties.update({key : expected_value}) %}
{% endif %}
{% else %}
{# Check for other properties to be = #}
{% if current_value == expected_value %}
{{ log("Property '" ~ key ~ "' valid : " ~ current_value) }}
{% else %}
{{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
{% do missing_properties.update({key : expected_value}) %}
{% endif %}
{% endif %}
{% endfor %}

{{ return(missing_properties) }}
{% endmacro %}


{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if remove_columns %}
{% if relation.is_delta %}
{% set platform_name = 'Delta Lake' %}
{% elif relation.is_iceberg %}
{% if remove_columns and not relation.is_delta %}
{% if relation.is_iceberg %}
{% set platform_name = 'Iceberg' %}
{% else %}
{% set platform_name = 'Apache Spark' %}
{% endif %}
{{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
{% elif remove_columns and relation.is_delta %}
{# Checking Delta table properties to see if we can drop columns #}
{# It must have the following properties #}

{% set expected_properties = {
'delta.minReaderVersion': 2,
'delta.minWriterVersion': 5,
'delta.columnMapping.mode': 'name'
} %}

{% set missing_properties = check_table_properties(relation, expected_properties) %}
{% if missing_properties %}
{% set msg %}
Delta table properties do not allow dropping columns. Dropping is available with the following properties:
{{ expected_properties }}
Either run the following command :

ALTER TABLE {{ relation }}
SET TBLPROPERTIES (
{% for key, value in missing_properties.items() %}
'{{ key }}' = '{{ value }}'{{ ',' if not loop.last }}
{% endfor %}
)

Or add the following to your model condfig and rebuild it :
table_properties={
'delta.minReaderVersion': '2',
'delta.minWriterVersion': '5',
'delta.columnMapping.mode': 'name'
}
{% endset %}

{{ exceptions.raise_compiler_error(msg) }}
{% endif %}
{% endif %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}

{% set sql -%}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

alter {{ relation.type }} {{ relation }}
{% if add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }}
add columns
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}

{% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
{{ sql }}
{% endcall %}
{% endif %}

{% if add_columns %} add columns {% endif %}
{% for column in add_columns %}
{{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}
{# Will run only if using Delta format with appropriated table properties #}
{% if remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }}
drop columns
{% for column in remove_columns %}
{{ column.name }}{{ ',' if not loop.last }}
{% endfor %}
{%- endset -%}

{% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
{{ sql }}
{% endcall %}
{% endif %}

{%- endset -%}

{% do run_query(sql) %}


{% endmacro %}
Loading