From 98d001d363b041e784f6ae8d41ea5e9d0b687dea Mon Sep 17 00:00:00 2001 From: Theo Kiehn <162969167+tkiehn@users.noreply.github.com> Date: Tue, 19 Nov 2024 08:21:53 +0100 Subject: [PATCH 1/6] add macro to process new prejoin list syntax --- .../helpers/stage_processing_macros.sql | 73 ++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/macros/internal/helpers/stage_processing_macros.sql b/macros/internal/helpers/stage_processing_macros.sql index 6ecf2676..75a90978 100644 --- a/macros/internal/helpers/stage_processing_macros.sql +++ b/macros/internal/helpers/stage_processing_macros.sql @@ -123,4 +123,75 @@ {%- endif %} {%- endfor -%} -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + + +{%- macro process_prejoined_columns(prejoined_columns=none) -%} +{# Check if the new list syntax is used for prejoined columns + If so parse it to dictionaries #} + +{% if not datavault4dbt.is_list(prejoined_columns) %} + {% do return(prejoined_columns) %} +{% else %} + {# if the (new) list syntax for prejoins is used + it needs to be converted to the old syntax #} + + {# Initialize emtpy dict which will be filled by each entry #} + {% set return_dict = {} %} + + {# Iterate over each dictionary in the prejoined_colums-list #} + {% for dict_item in prejoined_columns %} + + {# If column aliases are present they they have to map 1:1 to the extract_columns #} + {% if datavault4dbt.is_something(dict_item.aliases) + and not dict_item.aliases|length == dict_item.extract_columns|length%} + {{ exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns") }} + {% endif %} + + {# If multiple columns from the same source should be extracted each column has to be processed once #} + {% if datavault4dbt.is_list(dict_item.extract_columns) %} + {% for column in dict_item.extract_columns %} + {# If aliases are defined they should be used as dict keys + These will be used as new column names #} + {% if datavault4dbt.is_something(dict_item.aliases) %} + {% set dict_key = dict_item.aliases[loop.index-1] %} + {% else %} + {% set dict_key = dict_item.extract_columns[loop.index-1] %} + {% endif %} + + {% set tmp_dict %} + {{dict_key}}: + ref_model: {{dict_item.ref_model}} + bk: {{dict_item.extract_columns[loop.index-1]}} + this_column_name: {{dict_item.this_column_name}} + ref_column_name: {{dict_item.ref_column_name}} + {% endset %} + {% do return_dict.update(fromyaml(tmp_dict)) %} + {% endfor %} + + {% else %} + + {# If aliases are defined they should be used as dict keys + These will be used as new column names #} + {% if datavault4dbt.is_something(dict_item.aliases) %} + {% set dict_key = dict_item.aliases[loop.index-1] %} + {% else %} + {% set dict_key = dict_item.extract_columns[loop.index-1] %} + {% endif %} + + {% set tmp_dict %} + {{dict_key}}: + ref_model: {{dict_item.ref_model}} + bk: {{dict_item.extract_columns[loop.index-1]}} + this_column_name: {{dict_item.this_column_name}} + ref_column_name: {{dict_item.ref_column_name}} + {% endset %} + {% do return_dict.update(fromyaml(tmp_dict)) %} + {% endif %} + {% endfor %} + + {%- do return(return_dict) -%} + +{% endif %} + +{%- endmacro -%} From 4b0e02d5c27b82decb0c396318b26aae08000229 Mon Sep 17 00:00:00 2001 From: Theo Kiehn <162969167+tkiehn@users.noreply.github.com> Date: Tue, 19 Nov 2024 08:38:16 +0100 Subject: [PATCH 2/6] add process_prejoined_columns macro to top-level stage macro --- macros/staging/stage.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/macros/staging/stage.sql b/macros/staging/stage.sql index 76b17ed0..403df72c 100644 --- a/macros/staging/stage.sql +++ b/macros/staging/stage.sql @@ -120,6 +120,11 @@ {%- if datavault4dbt.is_nothing(ldts) -%} {%- set ldts = datavault4dbt.current_timestamp() -%} {%- endif -%} + + {# To parse the list syntax of prejoined columns #} + {%- if datavault4dbt.is_something(prejoined_columns) -%} + {%- set prejoined_columns = datavault4dbt.process_prejoined_columns(prejoined_columns) -%} + {%- endif -%} {{- adapter.dispatch('stage', 'datavault4dbt')(include_source_columns=include_source_columns, ldts=ldts, From 07ec2dec22339ed4c54a70ce0f38b53ddb0eb13d Mon Sep 17 00:00:00 2001 From: tkiehn <162969167+tkiehn@users.noreply.github.com> Date: Wed, 20 Nov 2024 18:29:51 +0100 Subject: [PATCH 3/6] change prejoin-logic to perform less joins --- macros/staging/bigquery/stage.sql | 37 +++++++++++++++++--------- macros/staging/databricks/stage.sql | 38 +++++++++++++++++---------- macros/staging/exasol/stage.sql | 37 +++++++++++++++++--------- macros/staging/fabric/stage.sql | 40 +++++++++++++++++++---------- macros/staging/oracle/stage.sql | 37 +++++++++++++++++--------- macros/staging/postgres/stage.sql | 37 +++++++++++++++++--------- macros/staging/redshift/stage.sql | 38 +++++++++++++++++---------- macros/staging/snowflake/stage.sql | 37 +++++++++++++++++--------- macros/staging/synapse/stage.sql | 39 ++++++++++++++++++---------- 9 files changed, 226 insertions(+), 114 deletions(-) diff --git a/macros/staging/bigquery/stage.sql b/macros/staging/bigquery/stage.sql index 8c94c387..0486c5f2 100644 --- a/macros/staging/bigquery/stage.sql +++ b/macros/staging/bigquery/stage.sql @@ -256,6 +256,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -263,14 +264,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -309,15 +308,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '`' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '`' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/databricks/stage.sql b/macros/staging/databricks/stage.sql index fc76044b..a3ff3b28 100644 --- a/macros/staging/databricks/stage.sql +++ b/macros/staging/databricks/stage.sql @@ -253,6 +253,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -260,14 +261,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -306,19 +305,32 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '`' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '`' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} - {%- if datavault4dbt.is_something(derived_columns) %} {# Adding derived columns to the selection #} derived_columns AS ( diff --git a/macros/staging/exasol/stage.sql b/macros/staging/exasol/stage.sql index 50dd35d3..9812f462 100644 --- a/macros/staging/exasol/stage.sql +++ b/macros/staging/exasol/stage.sql @@ -244,6 +244,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -251,14 +252,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS "{{ col | upper }}" - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -297,15 +296,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/fabric/stage.sql b/macros/staging/fabric/stage.sql index 605f6861..d042d7a9 100644 --- a/macros/staging/fabric/stage.sql +++ b/macros/staging/fabric/stage.sql @@ -253,23 +253,21 @@ missing_columns AS ( ), {%- endif -%} -{%- if datavault4dbt.is_something(prejoined_columns) %} -{%- set final_columns_to_select = (final_columns_to_select + derived_input_columns) | unique | list -%} +{%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} + prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{datavault4dbt.escape_column_names(vals['bk'])}} AS {{datavault4dbt.escape_column_names(col)}} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -308,15 +306,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=datavault4dbt.escape_column_names(vals['this_column_name']), prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=datavault4dbt.escape_column_names(vals['ref_column_name'])) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/oracle/stage.sql b/macros/staging/oracle/stage.sql index c2be1409..40afc202 100644 --- a/macros/staging/oracle/stage.sql +++ b/macros/staging/oracle/stage.sql @@ -263,6 +263,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -270,14 +271,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -316,15 +315,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/postgres/stage.sql b/macros/staging/postgres/stage.sql index 9edd3a38..1f4ef548 100644 --- a/macros/staging/postgres/stage.sql +++ b/macros/staging/postgres/stage.sql @@ -256,6 +256,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -263,14 +264,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -309,15 +308,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/redshift/stage.sql b/macros/staging/redshift/stage.sql index a7704c03..6c9238b0 100644 --- a/macros/staging/redshift/stage.sql +++ b/macros/staging/redshift/stage.sql @@ -260,16 +260,14 @@ missing_columns AS ( prejoined_columns AS ( SELECT - {% if final_columns_to_select | length > 0 -%} + {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -308,15 +306,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/snowflake/stage.sql b/macros/staging/snowflake/stage.sql index 956c632e..ed4c9d22 100644 --- a/macros/staging/snowflake/stage.sql +++ b/macros/staging/snowflake/stage.sql @@ -263,6 +263,7 @@ missing_columns AS ( ), {%- endif -%} + {%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( @@ -270,14 +271,12 @@ prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -316,15 +315,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} diff --git a/macros/staging/synapse/stage.sql b/macros/staging/synapse/stage.sql index 84edee88..88356508 100644 --- a/macros/staging/synapse/stage.sql +++ b/macros/staging/synapse/stage.sql @@ -255,23 +255,20 @@ missing_columns AS ( ), {%- endif -%} -{%- if datavault4dbt.is_something(prejoined_columns) %} -{%- set final_columns_to_select = (final_columns_to_select + derived_input_columns) | unique | list -%} +{%- if datavault4dbt.is_something(prejoined_columns) %} {# Prejoining Business Keys of other source objects for Link purposes #} prejoined_columns AS ( SELECT {% if final_columns_to_select | length > 0 -%} {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} - {% endif %} - {%- for col, vals in prejoined_columns.items() -%} - ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} - {% endfor -%} - - FROM {{ last_cte }} lcte + {%- endif -%} - {% for col, vals in prejoined_columns.items() %} + {#- prepare join statements -#} + {%- set prejoin_statements_list = [] -%} + {%- set processed_prejoin_hashes = [] -%} + {%- for col, vals in prejoined_columns.items() -%} {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} @@ -310,15 +307,29 @@ prejoined_columns AS ( {%- set operator = vals['operator'] -%} {%- endif -%} - {%- set prejoin_alias = 'pj_' + loop.index|string -%} - left join {{ relation }} as {{ prejoin_alias }} - on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + {%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%} - {% endfor %} + {%- if not prejoin_hash in processed_prejoin_hashes %} + {%- do processed_prejoin_hashes.append(prejoin_hash) %} + {%- set prejoin_join_statement_tmp -%} + left join {{ relation }} as {{ prejoin_hash }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endset -%} + {%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%} + {%- endif -%} + +{# select the prejoined columns #} + ,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {{ prejoin_statements_list|join(' ')}} {% set last_cte = "prejoined_columns" -%} - {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%} ), {%- endif -%} From fa4087e5aad952b5038305f972dbb9448f0982b3 Mon Sep 17 00:00:00 2001 From: Theo Kiehn <162969167+tkiehn@users.noreply.github.com> Date: Wed, 27 Nov 2024 15:43:22 +0100 Subject: [PATCH 4/6] add check and compilation error if a prejoined column is defined twice --- .../helpers/stage_processing_macros.sql | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/macros/internal/helpers/stage_processing_macros.sql b/macros/internal/helpers/stage_processing_macros.sql index 75a90978..71b0c718 100644 --- a/macros/internal/helpers/stage_processing_macros.sql +++ b/macros/internal/helpers/stage_processing_macros.sql @@ -144,7 +144,7 @@ {# If column aliases are present they they have to map 1:1 to the extract_columns #} {% if datavault4dbt.is_something(dict_item.aliases) - and not dict_item.aliases|length == dict_item.extract_columns|length%} + and not dict_item.aliases|length == dict_item.extract_columns|length %} {{ exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns") }} {% endif %} @@ -154,15 +154,20 @@ {# If aliases are defined they should be used as dict keys These will be used as new column names #} {% if datavault4dbt.is_something(dict_item.aliases) %} - {% set dict_key = dict_item.aliases[loop.index-1] %} + {% set dict_key = dict_item.aliases[loop.index0] %} {% else %} - {% set dict_key = dict_item.extract_columns[loop.index-1] %} + {% set dict_key = dict_item.extract_columns[loop.index0] %} + {% endif %} + + {# To make sure each column or alias is present only once #} + {% if dict_key|lower in return_dict.keys()|map('lower') %} + {{ exceptions.raise_compiler_error("Prejoined Column name or alias '" ~ dict_key ~ "' is defined twice.") }} {% endif %} {% set tmp_dict %} {{dict_key}}: ref_model: {{dict_item.ref_model}} - bk: {{dict_item.extract_columns[loop.index-1]}} + bk: {{dict_item.extract_columns[loop.index0]}} this_column_name: {{dict_item.this_column_name}} ref_column_name: {{dict_item.ref_column_name}} {% endset %} @@ -174,15 +179,20 @@ {# If aliases are defined they should be used as dict keys These will be used as new column names #} {% if datavault4dbt.is_something(dict_item.aliases) %} - {% set dict_key = dict_item.aliases[loop.index-1] %} + {% set dict_key = dict_item.aliases[loop.index0] %} {% else %} - {% set dict_key = dict_item.extract_columns[loop.index-1] %} + {% set dict_key = dict_item.extract_columns[loop.index0] %} + {% endif %} + + {# To make sure each column or alias is present only once #} + {% if dict_key|lower in return_dict.keys()|map('lower') %} + {{ exceptions.raise_compiler_error("Prejoined Column name or alias '" ~ dict_key ~ "' is defined twice.") }} {% endif %} {% set tmp_dict %} {{dict_key}}: ref_model: {{dict_item.ref_model}} - bk: {{dict_item.extract_columns[loop.index-1]}} + bk: {{dict_item.extract_columns[loop.index0]}} this_column_name: {{dict_item.this_column_name}} ref_column_name: {{dict_item.ref_column_name}} {% endset %} From 54b8720e9ccdb23e0f0fc0e282c6ec9c61ff5eb7 Mon Sep 17 00:00:00 2001 From: Theo Kiehn <162969167+tkiehn@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:17:06 +0100 Subject: [PATCH 5/6] add amount of extract_columns and aliases to amount-mismatch compilation error message --- macros/internal/helpers/stage_processing_macros.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/macros/internal/helpers/stage_processing_macros.sql b/macros/internal/helpers/stage_processing_macros.sql index 71b0c718..ad84a86a 100644 --- a/macros/internal/helpers/stage_processing_macros.sql +++ b/macros/internal/helpers/stage_processing_macros.sql @@ -145,7 +145,8 @@ {# If column aliases are present they they have to map 1:1 to the extract_columns #} {% if datavault4dbt.is_something(dict_item.aliases) and not dict_item.aliases|length == dict_item.extract_columns|length %} - {{ exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns") }} + {{ exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns. Got " + ~ dict_item.extract_columns|length ~ " extract_columns and " ~ dict_item.aliases|length ~ " aliases.") }} {% endif %} {# If multiple columns from the same source should be extracted each column has to be processed once #} From 60b2a985ef13f85f0ce80dbdc3dd93791a563c48 Mon Sep 17 00:00:00 2001 From: Theo Kiehn <162969167+tkiehn@users.noreply.github.com> Date: Wed, 27 Nov 2024 16:50:45 +0100 Subject: [PATCH 6/6] add prejoin with source to processing-macro --- .../internal/helpers/stage_processing_macros.sql | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/macros/internal/helpers/stage_processing_macros.sql b/macros/internal/helpers/stage_processing_macros.sql index ad84a86a..5881fc6a 100644 --- a/macros/internal/helpers/stage_processing_macros.sql +++ b/macros/internal/helpers/stage_processing_macros.sql @@ -167,7 +167,14 @@ {% set tmp_dict %} {{dict_key}}: + {%- if 'ref_model' in dict_item.keys()|map('lower') %} ref_model: {{dict_item.ref_model}} + {%- elif 'src_name' in dict_item.keys()|map('lower') and 'src_table' in dict_item.keys()|map('lower') %} + src_name: {{dict_item.src_name}} + src_table: {{dict_item.src_table}} + {%- else %} + {{ exceptions.raise_compiler_error("Either ref_model or src_name and src_table have to be defined for each prejoin") }} + {%- endif %} bk: {{dict_item.extract_columns[loop.index0]}} this_column_name: {{dict_item.this_column_name}} ref_column_name: {{dict_item.ref_column_name}} @@ -192,7 +199,14 @@ {% set tmp_dict %} {{dict_key}}: + {%- if 'ref_model' in dict_item.keys()|map('lower') %} ref_model: {{dict_item.ref_model}} + {%- elif 'src_name' in dict_item.keys()|map('lower') and 'src_table' in dict_item.keys()|map('lower') %} + src_name: {{dict_item.src_name}} + src_table: {{dict_item.src_table}} + {%- else %} + {{ exceptions.raise_compiler_error("Either ref_model or src_name and src_table have to be defined for each prejoin") }} + {%- endif %} bk: {{dict_item.extract_columns[loop.index0]}} this_column_name: {{dict_item.this_column_name}} ref_column_name: {{dict_item.ref_column_name}}