diff --git a/.changes/unreleased/Fixes-20231105-143145.yaml b/.changes/unreleased/Fixes-20231105-143145.yaml new file mode 100644 index 000000000..fd404e395 --- /dev/null +++ b/.changes/unreleased/Fixes-20231105-143145.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: In incremental models, add dummy merge condition on destination partition column when partition filter is required +time: 2023-11-05T14:31:45.869783+09:00 +custom: + Author: tnk-ysk + Issue: "792" diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql index 9d71ba7c0..1c02f4912 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/common.sql @@ -11,3 +11,23 @@ {%- endif -%} {% endmacro %} + +{% macro predicate_for_avoid_require_partition_filter(target='DBT_INTERNAL_DEST') %} + + {%- set raw_partition_by = config.get('partition_by', none) -%} + {%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%} + {%- set predicate = none -%} + + {% if partition_config and config.get('require_partition_filter') -%} + {%- set partition_field = partition_config.time_partitioning_field() if partition_config.time_ingestion_partitioning else partition_config.field -%} + {% set predicate %} + ( + `{{ target }}`.`{{ partition_field }}` is null + or `{{ target }}`.`{{ partition_field }}` is not null + ) + {% endset %} + {%- endif -%} + + {{ return(predicate) }} + +{% endmacro %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql index 90af66f52..a204caed9 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/merge.sql @@ -21,7 +21,13 @@ {%- endif -%} {%- endset -%} - {% set build_sql = 
get_merge_sql(target_relation, source_sql, unique_key, dest_columns, incremental_predicates) %} + {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%} + {%- set avoid_require_partition_filter = predicate_for_avoid_require_partition_filter() -%} + {%- if avoid_require_partition_filter is not none -%} + {% do predicates.append(avoid_require_partition_filter) %} + {%- endif -%} + + {% set build_sql = get_merge_sql(target_relation, source_sql, unique_key, dest_columns, predicates) %} {{ return(build_sql) }} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index a8f0004c5..17391b48d 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -90,6 +90,63 @@ {% endif %} """.lstrip() +merge_time_with_require_partition_sql = """ +{{ + config( + materialized="incremental", + unique_key="id", + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "dateTime" + }, + post_hook=" + create or replace view `{{ schema }}.incremental_merge_time_with_require_partition_view` + as select * from {{ this }} where date_time is null or date_time is not null + ", + require_partition_filter=true + ) +}} + + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-02' as datetime) as date_time 
union all + select 5 as id, cast('2020-01-02' as datetime) as date_time union all + select 6 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data + +{% if is_incremental() %} +where date_time > ( + select max(date_time) + from {{ this }} + where ( + date_time is null + or date_time is not null + ) +) + +{% endif %} +""".lstrip() + overwrite_date_sql = """ {{ config( diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index b3a51ad09..1a339d601 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -17,6 +17,7 @@ from tests.functional.adapter.incremental.incremental_strategy_fixtures import ( merge_range_sql, merge_time_sql, + merge_time_with_require_partition_sql, overwrite_date_sql, overwrite_day_sql, overwrite_day_with_copy_partitions_sql, @@ -39,6 +40,7 @@ def models(self): return { "incremental_merge_range.sql": merge_range_sql, "incremental_merge_time.sql": merge_time_sql, + "incremental_merge_time_with_require_partition.sql": merge_time_with_require_partition_sql, "incremental_overwrite_date.sql": overwrite_date_sql, "incremental_overwrite_day.sql": overwrite_day_sql, "incremental_overwrite_day_with_copy_partitions.sql": overwrite_day_with_copy_partitions_sql, @@ -65,13 +67,14 @@ def seeds(self): def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(self, project): run_dbt(["seed"]) results = run_dbt() - assert len(results) == 11 + assert len(results) == 12 results = run_dbt() - assert len(results) == 11 + assert len(results) == 12 incremental_strategies = [ ("incremental_merge_range", "merge_expected"), ("incremental_merge_time", "merge_expected"), + ("incremental_merge_time_with_require_partition_view", "merge_expected"), ("incremental_overwrite_time", "incremental_overwrite_time_expected"), 
("incremental_overwrite_date", "incremental_overwrite_date_expected"), ("incremental_overwrite_partitions", "incremental_overwrite_date_expected"),