From f3f7479997d031fc53ae46e8c315604a94a9d42a Mon Sep 17 00:00:00 2001 From: jnadal Date: Fri, 9 Aug 2024 18:12:28 +0200 Subject: [PATCH] allow sync all columns --- dbt/include/spark/macros/adapters.sql | 119 +++++++++++++++++++++++--- 1 file changed, 107 insertions(+), 12 deletions(-) diff --git a/dbt/include/spark/macros/adapters.sql b/dbt/include/spark/macros/adapters.sql index a6404a2de..5c1ccd082 100644 --- a/dbt/include/spark/macros/adapters.sql +++ b/dbt/include/spark/macros/adapters.sql @@ -398,35 +398,130 @@ {% endcall %} {% endmacro %} +{% macro check_table_properties(relation, expected_properties) %} + {# Fetching current properties and populate a dict to easily compare #} + {% set properties_table = fetch_tbl_properties(relation) %} + + {% set current_properties = {} %} + + {% for row in properties_table.rows %} + {% set current_properties = current_properties.update({ row['key']: row['value'] }) %} + {% endfor %} + + {# Control variable for monitoring validation status #} + {% set missing_properties = {} %} + + {# Iterated through expected properties #} + + {% for key, expected_value in expected_properties.items() %} + {% set current_value = current_properties.get(key) %} + + {# Check for known numeric values to be >= #} + {% if key in ['delta.minReaderVersion', 'delta.minWriterVersion'] %} + {% if current_value is not none and current_value | int >= expected_value %} + {{ log("Property '" ~ key ~ "' is valid : " ~ current_value ~ " >= " ~ expected_value) }} + {% else %} + {{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }} + {% do missing_properties.update({key : expected_value}) %} + {% endif %} + {% else %} + {# Check for other properties to be = #} + {% if current_value == expected_value %} + {{ log("Property '" ~ key ~ "' valid : " ~ current_value) }} + {% else %} + {{ log("Property '" ~ key ~ "' is not valid. Found : " ~ current_value ~ ", expected : " ~ expected_value) }} + {% do missing_properties.update({key : expected_value}) %} + {% endif %} + {% endif %} + {% endfor %} + + {{ return(missing_properties) }} +{% endmacro %} + {% macro spark__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} - {% if remove_columns %} - {% if relation.is_delta %} - {% set platform_name = 'Delta Lake' %} - {% elif relation.is_iceberg %} + {% if remove_columns and not relation.is_delta %} + {% if relation.is_iceberg %} {% set platform_name = 'Iceberg' %} {% else %} {% set platform_name = 'Apache Spark' %} {% endif %} {{ exceptions.raise_compiler_error(platform_name + ' does not support dropping columns from tables') }} + {% elif remove_columns and relation.is_delta %} + {# Checking Delta table properties to see if we can drop columns #} + {# It must have the following properties #} + + {% set expected_properties = { + 'delta.minReaderVersion': 2, + 'delta.minWriterVersion': 5, + 'delta.columnMapping.mode': 'name' + } %} + + {% set missing_properties = check_table_properties(relation, expected_properties) %} + {% if missing_properties %} + {% set msg %} + Delta table properties do not allow dropping columns. Dropping is available with the following properties: + {{ expected_properties }} + Either run the following command : + + ALTER TABLE {{ relation }} + SET TBLPROPERTIES ( + {% for key, value in missing_properties.items() %} + '{{ key }}' = '{{ value }}'{{ ',' if not loop.last }} + {% endfor %} + ) + + Or add the following to your model condfig and rebuild it : + table_properties={ + 'delta.minReaderVersion': '2', + 'delta.minWriterVersion': '5', + 'delta.columnMapping.mode': 'name' + } + {% endset %} + + {{ exceptions.raise_compiler_error(msg) }} + {% endif %} {% endif %} {% if add_columns is none %} {% set add_columns = [] %} {% endif %} - {% set sql -%} + {% if remove_columns is none %} + {% set remove_columns = [] %} + {% endif %} - alter {{ relation.type }} {{ relation }} + {% if add_columns %} + {% set sql -%} + alter {{ relation.type }} {{ relation }} + add columns + {% for column in add_columns %} + {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} + {% endfor %} + {%- endset -%} + + {% call statement("run_query_statement", fetch_result=false, auto_begin=false) %} + {{ sql }} + {% endcall %} + {% endif %} - {% if add_columns %} add columns {% endif %} - {% for column in add_columns %} - {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }} - {% endfor %} + {# Will run only if using Delta format with appropriated table properties #} + {% if remove_columns %} + {% set sql -%} + alter {{ relation.type }} {{ relation }} + drop columns + {% for column in remove_columns %} + {{ column.name }}{{ ',' if not loop.last }} + {% endfor %} + {%- endset -%} + + {% call statement("run_query_statement", fetch_result=false, auto_begin=false) %} + {{ sql }} + {% endcall %} + {% endif %} - {%- endset -%} - {% do run_query(sql) %} + {% endmacro %}