Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: extract multiple columns from one prejoined object #290

98 changes: 97 additions & 1 deletion macros/internal/helpers/stage_processing_macros.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,100 @@
{%- endif %}
{%- endfor -%}

{%- endmacro -%}
{%- endmacro -%}


{%- macro process_prejoined_columns(prejoined_columns=none) -%}
    {# Converts the list syntax for prejoined columns into the dictionary syntax.

       Parameters:
         prejoined_columns: Either a dictionary (dictionary syntax, returned unchanged) or a
           list of dictionaries (list syntax). Each list entry is read for:
             - extract_columns: one column name or a list of column names to extract
             - aliases (optional): one alias or a list of aliases, mapping 1:1 to extract_columns
             - ref_model, or src_name together with src_table: the object to prejoin
             - this_column_name and ref_column_name: passed through unchanged

       Returns:
         Anything that is not a list is returned as-is. A list is turned into one dictionary
         keyed by alias (or by the extracted column name when no aliases are given). Each value
         holds ref_model or src_name/src_table, plus bk, this_column_name and ref_column_name.

       Raises a compiler error when:
         - extract_columns is missing for an entry
         - aliases are given but their count differs from the count of extract_columns
         - a resulting column name or alias occurs twice (compared case-insensitively)
         - neither ref_model nor src_name and src_table are defined for an entry #}

    {% if not datavault4dbt.is_list(prejoined_columns) %}
        {% do return(prejoined_columns) %}
    {% else %}

        {# Initialize empty dict which will be filled by each entry #}
        {% set return_dict = {} %}

        {# Iterate over each dictionary in the prejoined_columns list #}
        {% for dict_item in prejoined_columns %}

            {% if dict_item.extract_columns is not defined or dict_item.extract_columns is none %}
                {{ exceptions.raise_compiler_error("extract_columns has to be defined for each prejoin") }}
            {% endif %}

            {# Normalize extract_columns to a list so that a single column name and a list of
               names share one code path. Indexing a plain string by a loop index would only
               return a single character of the column name. #}
            {% if datavault4dbt.is_list(dict_item.extract_columns) %}
                {% set extract_columns = dict_item.extract_columns %}
            {% else %}
                {% set extract_columns = [dict_item.extract_columns] %}
            {% endif %}

            {# Normalize the optional aliases the same way #}
            {% set has_aliases = datavault4dbt.is_something(dict_item.aliases) %}
            {% if not has_aliases %}
                {% set aliases = [] %}
            {% elif datavault4dbt.is_list(dict_item.aliases) %}
                {% set aliases = dict_item.aliases %}
            {% else %}
                {% set aliases = [dict_item.aliases] %}
            {% endif %}

            {# If column aliases are present they have to map 1:1 to the extract_columns #}
            {% if has_aliases and aliases|length != extract_columns|length %}
                {{ exceptions.raise_compiler_error("Prejoin aliases must have the same length as extract_columns. Got "
                    ~ extract_columns|length ~ " extract_columns and " ~ aliases|length ~ " aliases.") }}
            {% endif %}

            {# Each extracted column becomes its own entry in the returned dictionary #}
            {% for column in extract_columns %}

                {# If aliases are defined they are used as dict keys.
                   These will be used as new column names. #}
                {% if has_aliases %}
                    {% set dict_key = aliases[loop.index0] %}
                {% else %}
                    {% set dict_key = column %}
                {% endif %}

                {# To make sure each column or alias is present only once #}
                {% if dict_key|lower in return_dict.keys()|map('lower') %}
                    {{ exceptions.raise_compiler_error("Prejoined Column name or alias '" ~ dict_key ~ "' is defined twice.") }}
                {% endif %}

                {# The entry is rendered as YAML text and parsed with fromyaml.
                   The indentation inside this block is significant. #}
                {% set tmp_dict %}
                {{ dict_key }}:
                    {%- if 'ref_model' in dict_item.keys()|map('lower') %}
                    ref_model: {{ dict_item.ref_model }}
                    {%- elif 'src_name' in dict_item.keys()|map('lower') and 'src_table' in dict_item.keys()|map('lower') %}
                    src_name: {{ dict_item.src_name }}
                    src_table: {{ dict_item.src_table }}
                    {%- else %}
                    {{ exceptions.raise_compiler_error("Either ref_model or src_name and src_table have to be defined for each prejoin") }}
                    {%- endif %}
                    bk: {{ column }}
                    this_column_name: {{ dict_item.this_column_name }}
                    ref_column_name: {{ dict_item.ref_column_name }}
                {% endset %}
                {% do return_dict.update(fromyaml(tmp_dict)) %}

            {% endfor %}

        {% endfor %}

        {%- do return(return_dict) -%}

    {% endif %}

{%- endmacro -%}
37 changes: 25 additions & 12 deletions macros/staging/bigquery/stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,20 @@ missing_columns AS (
),
{%- endif -%}


{%- if datavault4dbt.is_something(prejoined_columns) %}
{# Prejoining Business Keys of other source objects for Link purposes #}
prejoined_columns AS (

SELECT
{% if final_columns_to_select | length > 0 -%}
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
{% endif %}
{%- for col, vals in prejoined_columns.items() -%}
,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte
{%- endif -%}

{% for col, vals in prejoined_columns.items() %}
{#- prepare join statements -#}
{%- set prejoin_statements_list = [] -%}
{%- set processed_prejoin_hashes = [] -%}
{%- for col, vals in prejoined_columns.items() -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
Expand Down Expand Up @@ -309,15 +308,29 @@ prejoined_columns AS (
{%- set operator = vals['operator'] -%}
{%- endif -%}

{%- set prejoin_alias = 'pj_' + loop.index|string -%}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}
{%- set prejoin_hash = '`' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '`' -%}

{% endfor %}
{%- if not prejoin_hash in processed_prejoin_hashes %}
{%- do processed_prejoin_hashes.append(prejoin_hash) %}
{%- set prejoin_join_statement_tmp -%}
left join {{ relation }} as {{ prejoin_hash }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}

{% endset -%}
{%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%}
{%- endif -%}

{# select the prejoined columns #}
,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte

{{ prejoin_statements_list|join(' ')}}

{% set last_cte = "prejoined_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
),
{%- endif -%}

Expand Down
38 changes: 25 additions & 13 deletions macros/staging/databricks/stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -253,21 +253,20 @@ missing_columns AS (
),
{%- endif -%}


{%- if datavault4dbt.is_something(prejoined_columns) %}
{# Prejoining Business Keys of other source objects for Link purposes #}
prejoined_columns AS (

SELECT
{% if final_columns_to_select | length > 0 -%}
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
{% endif %}
{%- for col, vals in prejoined_columns.items() -%}
,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte
{%- endif -%}

{% for col, vals in prejoined_columns.items() %}
{#- prepare join statements -#}
{%- set prejoin_statements_list = [] -%}
{%- set processed_prejoin_hashes = [] -%}
{%- for col, vals in prejoined_columns.items() -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
Expand Down Expand Up @@ -306,19 +305,32 @@ prejoined_columns AS (
{%- set operator = vals['operator'] -%}
{%- endif -%}

{%- set prejoin_alias = 'pj_' + loop.index|string -%}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}
{%- set prejoin_hash = '`' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '`' -%}

{% endfor %}
{%- if not prejoin_hash in processed_prejoin_hashes %}
{%- do processed_prejoin_hashes.append(prejoin_hash) %}
{%- set prejoin_join_statement_tmp -%}
left join {{ relation }} as {{ prejoin_hash }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}

{% endset -%}
{%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%}
{%- endif -%}

{# select the prejoined columns #}
,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte

{{ prejoin_statements_list|join(' ')}}

{% set last_cte = "prejoined_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
),
{%- endif -%}


{%- if datavault4dbt.is_something(derived_columns) %}
{# Adding derived columns to the selection #}
derived_columns AS (
Expand Down
37 changes: 25 additions & 12 deletions macros/staging/exasol/stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,20 @@ missing_columns AS (
),
{%- endif -%}


{%- if datavault4dbt.is_something(prejoined_columns) %}
{# Prejoining Business Keys of other source objects for Link purposes #}
prejoined_columns AS (

SELECT
{% if final_columns_to_select | length > 0 -%}
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
{% endif %}
{%- for col, vals in prejoined_columns.items() -%}
,pj_{{loop.index}}.{{ vals['bk'] }} AS "{{ col | upper }}"
{% endfor -%}

FROM {{ last_cte }} lcte
{%- endif -%}

{% for col, vals in prejoined_columns.items() %}
{#- prepare join statements -#}
{%- set prejoin_statements_list = [] -%}
{%- set processed_prejoin_hashes = [] -%}
{%- for col, vals in prejoined_columns.items() -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
Expand Down Expand Up @@ -297,15 +296,29 @@ prejoined_columns AS (
{%- set operator = vals['operator'] -%}
{%- endif -%}

{%- set prejoin_alias = 'pj_' + loop.index|string -%}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}
{%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%}

{% endfor %}
{%- if not prejoin_hash in processed_prejoin_hashes %}
{%- do processed_prejoin_hashes.append(prejoin_hash) %}
{%- set prejoin_join_statement_tmp -%}
left join {{ relation }} as {{ prejoin_hash }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}

{% endset -%}
{%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%}
{%- endif -%}

{# select the prejoined columns #}
,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte

{{ prejoin_statements_list|join(' ')}}

{% set last_cte = "prejoined_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
),
{%- endif -%}

Expand Down
40 changes: 26 additions & 14 deletions macros/staging/fabric/stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -253,23 +253,21 @@ missing_columns AS (
),
{%- endif -%}

{%- if datavault4dbt.is_something(prejoined_columns) %}

{%- set final_columns_to_select = (final_columns_to_select + derived_input_columns) | unique | list -%}
{%- if datavault4dbt.is_something(prejoined_columns) %}
{# Prejoining Business Keys of other source objects for Link purposes #}

prejoined_columns AS (

SELECT
{% if final_columns_to_select | length > 0 -%}
{{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }}
{% endif %}
{%- for col, vals in prejoined_columns.items() -%}
,pj_{{loop.index}}.{{datavault4dbt.escape_column_names(vals['bk'])}} AS {{datavault4dbt.escape_column_names(col)}}
{% endfor -%}

FROM {{ last_cte }} lcte
{%- endif -%}

{% for col, vals in prejoined_columns.items() %}
{#- prepare join statements -#}
{%- set prejoin_statements_list = [] -%}
{%- set processed_prejoin_hashes = [] -%}
{%- for col, vals in prejoined_columns.items() -%}

{%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%}
{%- set relation = source(vals['src_name']|string, vals['src_table']) -%}
Expand Down Expand Up @@ -308,15 +306,29 @@ prejoined_columns AS (
{%- set operator = vals['operator'] -%}
{%- endif -%}

{%- set prejoin_alias = 'pj_' + loop.index|string -%}

left join {{ relation }} as {{ prejoin_alias }}
on {{ datavault4dbt.multikey(columns=datavault4dbt.escape_column_names(vals['this_column_name']), prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=datavault4dbt.escape_column_names(vals['ref_column_name'])) }}
{%- set prejoin_hash = '"' ~ local_md5(relation~vals['this_column_name']~operator~vals['ref_column_name']) ~ '"' -%}

{% endfor %}
{%- if not prejoin_hash in processed_prejoin_hashes %}
{%- do processed_prejoin_hashes.append(prejoin_hash) %}
{%- set prejoin_join_statement_tmp -%}
left join {{ relation }} as {{ prejoin_hash }}
on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_hash], condition='=', operator=operator, right_columns=vals['ref_column_name']) }}

{% endset -%}
{%- do prejoin_statements_list.append(prejoin_join_statement_tmp) -%}
{%- endif -%}

{# select the prejoined columns #}
,{{prejoin_hash}}.{{ vals['bk'] }} AS {{ col }}
{% endfor -%}

FROM {{ last_cte }} lcte

{{ prejoin_statements_list|join(' ')}}

{% set last_cte = "prejoined_columns" -%}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %}
{%- set final_columns_to_select = final_columns_to_select + prejoined_column_names -%}
),
{%- endif -%}

Expand Down
Loading