From 0c17b472d4063ee6a66d43b200d276bda63f5e61 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sat, 28 Oct 2023 02:08:48 +0900 Subject: [PATCH 1/8] Fix merge fails when partition is required --- dbt/include/bigquery/macros/adapters.sql | 55 +++++++++++++++++++ .../incremental_strategy_fixtures.py | 3 +- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index f166e5d05..bb56dfaa4 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -193,3 +193,58 @@ having count(*) > 1 {% do adapter.upload_file(local_file_path, database, table_schema, table_name, kwargs=kwargs) %} {% endmacro %} + +{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%} + {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set merge_update_columns = config.get('merge_update_columns') -%} + {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} + {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + + {% if unique_key %} + {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} + {% for key in unique_key %} + {% set this_key_match %} + DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} + {% endset %} + {% do predicates.append(this_key_match) %} + {% endfor %} + {% else %} + {% set unique_key_match %} + DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% endset %} + {% do predicates.append(unique_key_match) %} + {% endif %} + {% if partition_config %} + {% set avoid_require_partition %} + DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL + {% endset %} + {% do predicates.append(avoid_require_partition) %} + {% endif %} + {% else %} + {% do predicates.append('FALSE') %} + {% endif %} + + {{ sql_header if sql_header is not none }} + + MERGE into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on {{"(" ~ predicates | join(") and (") ~ ")"}} + + {% if unique_key %} + when matched then update set + {% for column_name in update_columns -%} + {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} + {%- if not loop.last %}, {%- endif %} + {%- endfor %} + {% endif %} + + when not matched then insert + ({{ dest_cols_csv }}) + values + ({{ dest_cols_csv }}) + +{% endmacro %} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index 8dd470ffb..d0c2a83c7 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -55,7 +55,8 @@ partition_by={ "field": "date_time", "data_type": "dateTime" - } + }, + require_partition_filter=true ) }} From fd7cbb0c0e4af585fa051ed618ea216eda835167 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sat, 28 Oct 2023 21:49:27 +0900 Subject: [PATCH 2/8] Fix condition for avoid_require_partition --- dbt/include/bigquery/macros/adapters.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index bb56dfaa4..147fd814d 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -218,12 +218,12 @@ having count(*) > 1 {% endset %} {% do predicates.append(unique_key_match) %} {% endif %} - {% if partition_config %} + {% if partition_config and config.get('require_partition_filter') -%} {% set avoid_require_partition %} DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL {% endset %} {% do predicates.append(avoid_require_partition) %} - {% endif %} + {%- endif -%} {% else %} {% do predicates.append('FALSE') %} {% endif %} From 005372ba069b1b9ee5f1765d5b3c1672b70969a2 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sun, 5 Nov 2023 15:18:39 +0900 Subject: [PATCH 3/8] Add changes --- .changes/unreleased/Fixes-20231105-143145.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20231105-143145.yaml diff --git a/.changes/unreleased/Fixes-20231105-143145.yaml b/.changes/unreleased/Fixes-20231105-143145.yaml new file mode 100644 index 000000000..715fb0f7f --- /dev/null +++ b/.changes/unreleased/Fixes-20231105-143145.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Merge fails when partition is required +time: 2023-11-05T14:31:45.869783+09:00 +custom: + Author: tnk-ysk + Issue: "792" From f3c1bf29ce52eb781b9544d669310a8059d614ca Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sun, 5 Nov 2023 15:19:30 +0900 Subject: [PATCH 4/8] Add test for require_partition_filter --- .../incremental_strategy_fixtures.py | 58 ++++++++++++++++++- .../test_incremental_strategies.py | 7 ++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index d0c2a83c7..d1f2d5c29 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -47,6 +47,50 @@ """.lstrip() merge_time_sql = """ +{{ + config( + materialized="incremental", + unique_key="id", + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "dateTime" + } + ) +}} + + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-02' as datetime) as date_time union all + select 5 as id, cast('2020-01-02' as datetime) as date_time union all + select 6 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data + +{% if is_incremental() %} +where date_time > (select max(date_time) from {{ this }}) +{% endif %} +""".lstrip() + +merge_time_with_require_partition_sql = """ {{ config( materialized="incremental", @@ -56,6 +100,10 @@ "field": "date_time", "data_type": "dateTime" }, + post_hook=" + create or replace view `{{ schema }}.incremental_merge_time_with_require_partition_view` + as select * from {{ this }} where date_time is null or date_time is not null + ", require_partition_filter=true ) }} @@ -87,7 +135,15 @@ select * from data {% if is_incremental() %} -where date_time > (select max(date_time) from {{ this }}) +where date_time > ( + select max(date_time) + from {{ this }} + where ( + date_time is null + or date_time is not null + ) +) + {% endif %} """.lstrip() diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index b3a51ad09..1a339d601 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -17,6 +17,7 @@ from tests.functional.adapter.incremental.incremental_strategy_fixtures import ( merge_range_sql, merge_time_sql, + merge_time_with_require_partition_sql, overwrite_date_sql, overwrite_day_sql, overwrite_day_with_copy_partitions_sql, @@ -39,6 +40,7 @@ def models(self): return { "incremental_merge_range.sql": merge_range_sql, "incremental_merge_time.sql": merge_time_sql, + "incremental_merge_time_with_require_partition.sql": merge_time_with_require_partition_sql, "incremental_overwrite_date.sql": overwrite_date_sql, "incremental_overwrite_day.sql": overwrite_day_sql, "incremental_overwrite_day_with_copy_partitions.sql": overwrite_day_with_copy_partitions_sql, @@ -65,13 +67,14 @@ def seeds(self): def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(self, project): run_dbt(["seed"]) results = run_dbt() - assert len(results) == 11 + assert len(results) == 12 results = run_dbt() - assert len(results) == 11 + assert len(results) == 12 incremental_strategies = [ ("incremental_merge_range", "merge_expected"), ("incremental_merge_time", "merge_expected"), + ("incremental_merge_time_with_require_partition_view", "merge_expected"), ("incremental_overwrite_time", "incremental_overwrite_time_expected"), ("incremental_overwrite_date", "incremental_overwrite_date_expected"), ("incremental_overwrite_partitions", "incremental_overwrite_date_expected"), From e3c243572f57b7f05460a5e940054760d20ed629 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sun, 5 Nov 2023 15:19:52 +0900 Subject: [PATCH 5/8] Refactoring avoid require_partition_filter --- dbt/include/bigquery/macros/adapters.sql | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index 147fd814d..af0477e0d 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -194,6 +194,17 @@ having count(*) > 1 {% endmacro %} +{% macro avoid_require_partition_filter(target, predicates) %} + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + {% if partition_config and config.get('require_partition_filter') -%} + {% set predicate %} + {{ target }}.{{ partition_config.field }} is null or {{ target }}.{{ partition_config.field }} is not null + {% endset %} + {% do predicates.append(predicate) %} + {%- endif -%} +{% endmacro %} + {% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%} {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} @@ -201,8 +212,6 @@ having count(*) > 1 {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} {%- set sql_header = config.get('sql_header', none) -%} - {%- set raw_partition_by = config.get('partition_by', none) -%} - {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} {% if unique_key %} {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} @@ -218,12 +227,7 @@ having count(*) > 1 {% endset %} {% do predicates.append(unique_key_match) %} {% endif %} - {% if partition_config and config.get('require_partition_filter') -%} - {% set avoid_require_partition %} - DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL - {% endset %} - {% do predicates.append(avoid_require_partition) %} - {%- endif -%} + {{ avoid_require_partition_filter('DBT_INTERNAL_DEST', predicates) }} {% else %} {% do predicates.append('FALSE') %} {% endif %} From a28a5db317e67fad86ab1513582ffbe58fe070b9 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Sun, 5 Nov 2023 18:31:59 +0900 Subject: [PATCH 6/8] Fix time_ingestion_partitioning --- dbt/include/bigquery/macros/adapters.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index af0477e0d..329064f6d 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -198,8 +198,9 @@ having count(*) > 1 {%- set raw_partition_by = config.get('partition_by', none) -%} {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} {% if partition_config and config.get('require_partition_filter') -%} + {%- set partition_field = partition_config.time_partitioning_field() if partition_config.time_ingestion_partitioning else partition_config.field -%} {% set predicate %} - {{ target }}.{{ partition_config.field }} is null or {{ target }}.{{ partition_config.field }} is not null + {{ target }}.`{{ partition_field }}` is null or {{ target }}.`{{ partition_field }}` is not null {% endset %} {% do predicates.append(predicate) %} {%- endif -%} From 4468dbb372e10774a7fa6eb72952a3d770c38b36 Mon Sep 17 00:00:00 2001 From: tnk-ysk Date: Mon, 6 Nov 2023 11:47:26 +0900 Subject: [PATCH 7/8] Refactoring avoid require_partition_filter --- dbt/include/bigquery/macros/adapters.sql | 60 ------------------- .../incremental_strategy/common.sql | 20 +++++++ .../incremental_strategy/merge.sql | 8 ++- 3 files changed, 27 insertions(+), 61 deletions(-) diff --git a/dbt/include/bigquery/macros/adapters.sql b/dbt/include/bigquery/macros/adapters.sql index 329064f6d..f166e5d05 100644 --- a/dbt/include/bigquery/macros/adapters.sql +++ b/dbt/include/bigquery/macros/adapters.sql @@ -193,63 +193,3 @@ having count(*) > 1 {% do adapter.upload_file(local_file_path, database, table_schema, table_name, kwargs=kwargs) %} {% endmacro %} - -{% macro avoid_require_partition_filter(target, predicates) %} - {%- set raw_partition_by = config.get('partition_by', none) -%} - {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} - {% if partition_config and config.get('require_partition_filter') -%} - {%- set partition_field = partition_config.time_partitioning_field() if partition_config.time_ingestion_partitioning else partition_config.field -%} - {% set predicate %} - {{ target }}.`{{ partition_field }}` is null or {{ target }}.`{{ partition_field }}` is not null - {% endset %} - {% do predicates.append(predicate) %} - {%- endif -%} -{% endmacro %} - -{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%} - {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} - {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} - {%- set merge_update_columns = config.get('merge_update_columns') -%} - {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%} - {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%} - {%- set sql_header = config.get('sql_header', none) -%} - - {% if unique_key %} - {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %} - {% for key in unique_key %} - {% set this_key_match %} - DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }} - {% endset %} - {% do predicates.append(this_key_match) %} - {% endfor %} - {% else %} - {% set unique_key_match %} - DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} - {% endset %} - {% do predicates.append(unique_key_match) %} - {% endif %} - {{ avoid_require_partition_filter('DBT_INTERNAL_DEST', predicates) }} - {% else %} - {% do predicates.append('FALSE') %} - {% endif %} - - {{ sql_header if sql_header is not none }} - - MERGE into {{ target }} as DBT_INTERNAL_DEST - using {{ source }} as DBT_INTERNAL_SOURCE - on {{"(" ~ predicates | join(") and (") ~ ")"}} - - {% if unique_key %} - when matched then update set - {% for column_name in update_columns -%} - {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }} - {%- if not loop.last %}, {%- endif %} - {%- endfor %} - {% endif %} - - when not matched then insert - ({{ dest_cols_csv }}) - values - ({{ dest_cols_csv }}) - -{% endmacro %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql index 9d71ba7c0..1c02f4912 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql @@ -11,3 +11,23 @@ {%- endif -%} {% endmacro %} + +{% macro predicate_for_avoid_require_partition_filter(target='DBT_INTERNAL_DEST') %} + + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + {%- set predicate = none -%} + + {% if partition_config and config.get('require_partition_filter') -%} + {%- set partition_field = partition_config.time_partitioning_field() if partition_config.time_ingestion_partitioning else partition_config.field -%} + {% set predicate %} + ( + `{{ target }}`.`{{ partition_field }}` is null + or `{{ target }}`.`{{ partition_field }}` is not null + ) + {% endset %} + {%- endif -%} + + {{ return(predicate) }} + +{% endmacro %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql index 90af66f52..a204caed9 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql @@ -21,7 +21,13 @@ {%- endif -%} {%- endset -%} - {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, incremental_predicates) %} + {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} + {%- set avoid_require_partition_filter = predicate_for_avoid_require_partition_filter() -%} + {%- if avoid_require_partition_filter is not none -%} + {% do predicates.append(avoid_require_partition_filter) %} + {%- endif -%} + + {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %} {{ return(build_sql) }} From 8ee823e59762727a3d481a6bfee7eb6f1f2cac6e Mon Sep 17 00:00:00 2001 From: colin-rogers-dbt <111200756+colin-rogers-dbt@users.noreply.github.com> Date: Tue, 2 Jan 2024 09:41:12 -0800 Subject: [PATCH 8/8] Update .changes/unreleased/Fixes-20231105-143145.yaml --- .changes/unreleased/Fixes-20231105-143145.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/unreleased/Fixes-20231105-143145.yaml b/.changes/unreleased/Fixes-20231105-143145.yaml index 715fb0f7f..fd404e395 100644 --- a/.changes/unreleased/Fixes-20231105-143145.yaml +++ b/.changes/unreleased/Fixes-20231105-143145.yaml @@ -1,5 +1,5 @@ kind: Fixes -body: Merge fails when partition is required +body: In incremental models, add dummy merge condition on source partition column when partition is required time: 2023-11-05T14:31:45.869783+09:00 custom: Author: tnk-ysk