Skip to content

Commit

Permalink
allow sync all columns
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremynadal33 committed Aug 9, 2024
1 parent 2580ac5 commit f3f7479
Showing 1 changed file with 107 additions and 12 deletions.
119 changes: 107 additions & 12 deletions dbt/include/spark/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -398,35 +398,130 @@
{% endcall %}
{% endmacro %}

{% macro check_table_properties(relation, expected_properties) %}
  {#
    Compare the table properties currently set on `relation` with `expected_properties`.

    Args:
      relation: relation whose properties are read through `fetch_tbl_properties`.
      expected_properties: dict mapping a property name to its required value.

    Returns:
      A dict holding the subset of `expected_properties` that is NOT satisfied
      (empty when every expectation is met).
      - 'delta.minReaderVersion' / 'delta.minWriterVersion' are satisfied when the
        current value, cast to int, is >= the expected value cast to int.
      - Every other property must be equal to the expected value.
  #}

  {# Fetch the current properties and index them by key for easy lookup. #}
  {# NOTE(review): assumes each row exposes 'key' and 'value' columns - confirm against fetch_tbl_properties. #}
  {% set properties_table = fetch_tbl_properties(relation) %}
  {% set current_properties = {} %}

  {% for row in properties_table.rows %}
    {# `do` mutates the dict in place; `set x = x.update(...)` would bind the loop-local name to None. #}
    {% do current_properties.update({ row['key']: row['value'] }) %}
  {% endfor %}

  {# Accumulates every expected property that is absent or has an unsuitable value. #}
  {% set missing_properties = {} %}

  {# Protocol versions are minimums, so a higher current value is acceptable. #}
  {% set minimum_version_keys = ['delta.minReaderVersion', 'delta.minWriterVersion'] %}

  {% for key, expected_value in expected_properties.items() %}
    {% set current_value = current_properties.get(key) %}

    {% if key in minimum_version_keys %}
      {# Cast both sides so expectations given as 2 or '2' compare the same way. #}
      {% if current_value is not none and (current_value | int) >= (expected_value | int) %}
        {{ log("Property '" ~ key ~ "' is valid : " ~ current_value ~ " >= " ~ expected_value) }}
      {% else %}
        {{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
        {% do missing_properties.update({key : expected_value}) %}
      {% endif %}
    {% else %}
      {# Any other property must match exactly. #}
      {% if current_value == expected_value %}
        {{ log("Property '" ~ key ~ "' valid : " ~ current_value) }}
      {% else %}
        {{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }}
        {% do missing_properties.update({key : expected_value}) %}
      {% endif %}
    {% endif %}
  {% endfor %}

  {{ return(missing_properties) }}
{% endmacro %}


{% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
  {#
    Add and/or drop columns on `relation`.

    Args:
      relation: target relation; `is_delta` / `is_iceberg` select the drop behaviour.
      add_columns: list of columns (each with `name` and `data_type`) to add, or none.
      remove_columns: list of columns (each with `name`) to drop, or none.

    Raises:
      A compiler error when columns must be dropped and either the relation is not
      a Delta table, or it is a Delta table whose properties do not satisfy the
      requirements checked below.
  #}

  {% if remove_columns and not relation.is_delta %}
    {% if relation.is_iceberg %}
      {% set platform_name = 'Iceberg' %}
    {% else %}
      {% set platform_name = 'Apache Spark' %}
    {% endif %}
    {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }}
  {% elif remove_columns and relation.is_delta %}
    {# Dropping columns on a Delta table is only attempted when these properties are satisfied. #}
    {% set expected_properties = {
      'delta.minReaderVersion': 2,
      'delta.minWriterVersion': 5,
      'delta.columnMapping.mode': 'name'
    } %}

    {% set missing_properties = check_table_properties(relation, expected_properties) %}
    {% if missing_properties %}
      {% set msg %}
        Delta table properties do not allow dropping columns. Dropping is available with the following properties:
        {{ expected_properties }}
        Either run the following command :

        ALTER TABLE {{ relation }}
        SET TBLPROPERTIES (
        {% for key, value in missing_properties.items() %}
          '{{ key }}' = '{{ value }}'{{ ',' if not loop.last }}
        {% endfor %}
        )

        Or add the following to your model config and rebuild it :
        table_properties={
          'delta.minReaderVersion': '2',
          'delta.minWriterVersion': '5',
          'delta.columnMapping.mode': 'name'
        }
      {% endset %}

      {{ exceptions.raise_compiler_error(msg) }}
    {% endif %}
  {% endif %}

  {# Normalise absent lists so the blocks below can test and iterate them safely. #}
  {% if add_columns is none %}
    {% set add_columns = [] %}
  {% endif %}

  {% if remove_columns is none %}
    {% set remove_columns = [] %}
  {% endif %}

  {# Adding and dropping are issued as two separate ALTER statements. #}
  {% if add_columns %}
    {% set sql -%}
      alter {{ relation.type }} {{ relation }}
      add columns
      {% for column in add_columns %}
        {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
      {% endfor %}
    {%- endset -%}

    {% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
      {{ sql }}
    {% endcall %}
  {% endif %}

  {# Will run only if using Delta format with appropriate table properties #}
  {% if remove_columns %}
    {% set sql -%}
      alter {{ relation.type }} {{ relation }}
      drop columns
      {% for column in remove_columns %}
        {{ column.name }}{{ ',' if not loop.last }}
      {% endfor %}
    {%- endset -%}

    {% call statement("run_query_statement", fetch_result=false, auto_begin=false) %}
      {{ sql }}
    {% endcall %}
  {% endif %}

{% endmacro %}

0 comments on commit f3f7479

Please sign in to comment.