From 0c17b472d4063ee6a66d43b200d276bda63f5e61 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sat, 28 Oct 2023 02:08:48 +0900 Subject: [PATCH] Fix merge fails when partition is required --- dbt/include/bigquery/macros/adapters.sql | 55 +++++++++++++++++++ .../incremental_strategy_fixtures.py | 3 +- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index f166e5d05..bb56dfaa4 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -193,3 +193,58 @@ having count(*) > 1 {% do adapter.upload_file(local_file_path, database, table_schema, table_name, kwargs=kwargs) %} {% endmacro %} + +{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%} + {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set merge_update_columns = config.get('merge_update_columns') -%} + {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} + {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in unique_key %} + {% set this_key_match %} + DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} + {% endset %} + {% do predicates.append(this_key_match) %} + {% endfor %} + {% else %} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} + {% endif %} + {% if partition_config %} + {% set avoid_require_partition %} + DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL + {% endset %} + {% do predicates.append(avoid_require_partition) %} + {% endif %} + {% else %} + {% do predicates.append('FALSE') %} + {% endif %} + + {{ sql_header if sql_header is not none }} + + MERGE into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on {{"(" ~ predicates | join(") and (") ~ ")"}} + + {% if unique_key %} + when matched then update set + {% for column_name in update_columns -%} + {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} + {%- if not loop.last %}, {%- endif %} + {%- endfor %} + {% endif %} + + when not matched then insert + ({{ dest_cols_csv }}) + values + ({{ dest_cols_csv }}) + +{% endmacro %} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index 8dd470ffb..d0c2a83c7 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -55,7 +55,8 @@ partition_by={ "field": "date_time", "data_type": "dateTime" - } + }, + require_partition_filter=true ) }}