Skip to content

Commit

Permalink
Fix merge fails when partition is required
Browse files Browse the repository at this point in the history
  • Loading branch information
tnk-ysk committed Oct 27, 2023
1 parent 0fcb2c8 commit 8651295
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
55 changes: 55 additions & 0 deletions dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,58 @@ having count(*) > 1
{% do adapter.upload_file(local_file_path, database, table_schema, table_name, kwargs=kwargs) %}

{% endmacro %}

{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% if partition_config %}
{% set avoid_require_partition %}
DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL
{% endset %}
{% do predicates.append(avoid_require_partition) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

{{ sql_header if sql_header is not none }}

MERGE into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}

{% if unique_key %}
when matched then update set
{% for column_name in update_columns -%}
{{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{% endif %}

when not matched then insert
({{ dest_cols_csv }})
values
({{ dest_cols_csv }})

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
partition_by={
"field": "date_time",
"data_type": "dateTime"
}
},
require_partition_filter=true
)
}}
Expand Down

0 comments on commit 8651295

Please sign in to comment.