Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: insert_overwrite incremental strategy combined with a required partition_filter fails when there are zero rows to insert #994

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20231105-174940.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Insert_overwrite incremental strategy in combination with required partition_filter
fails when there are zero rows to insert
time: 2023-11-05T17:49:40.356397+09:00
custom:
Author: tnk-ysk
Issue: "319"
34 changes: 34 additions & 0 deletions dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,37 @@ having count(*) > 1
{% do adapter.upload_file(local_file_path, database, table_schema, table_name, kwargs=kwargs) %}

{% endmacro %}

{% macro bigquery__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{#-- The only time include_sql_header is True: --#}
{#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
{#-- We should consider including the sql header at the materialization level instead --#}

{#-- Copy the caller-supplied predicate list so the append below never mutates the caller's list --#}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

{{ sql_header if sql_header is not none and include_sql_header }}
{#-- With require_partition_filter set on the target table, BigQuery rejects a MERGE --#}
{#-- whose predicates do not reference the destination's partition column. Append a --#}
{#-- tautology on the partition field (IS NULL OR IS NOT NULL) so the requirement is --#}
{#-- satisfied without excluding any destination rows -- this keeps the delete branch --#}
{#-- valid even when the source contributes zero rows. --#}
{% if partition_config and config.get('require_partition_filter') -%}
{% set avoid_require_partition %}
DBT_INTERNAL_DEST.{{ partition_config.field }} IS NULL OR DBT_INTERNAL_DEST.{{ partition_config.field }} IS NOT NULL
{% endset %}
{% do predicates.append(avoid_require_partition) %}
{%- endif -%}

{#-- "on FALSE" guarantees no row ever matches, so the merge degenerates into: --#}
{#-- delete the destination rows selected by the predicates, then insert every --#}
{#-- source row -- i.e. the insert_overwrite replacement semantics. --#}
merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
{% if predicates %} and {{"(" ~ predicates | join(") and (") ~ ")"}} {% endif %}
then delete

when not matched then insert
({{ dest_cols_csv }})
values
({{ dest_cols_csv }})

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,38 @@

select * from data
""".lstrip()

# Model fixture: incremental insert_overwrite with require_partition_filter=true
# whose incremental run selects zero rows ("where false"), reproducing the failure
# where the generated MERGE is rejected for lacking a partition filter.
# NOTE(review): the post_hook view applies the "is null or is not null" tautology
# so downstream assertions can read the table without an explicit partition filter.
overwrite_zero_rows_sql = """
{{
config(
materialized="incremental",
incremental_strategy='insert_overwrite',
cluster_by="id",
partition_by={
"field": "date_day",
"data_type": "date"
},
post_hook="
create or replace view `{{ schema }}.incremental_overwrite_zero_rows_view`
as select * from {{ this }} where date_day is null or date_day is not null
",
require_partition_filter = true
)
}}


with data as (

select 10 as id, cast('2020-01-01' as date) as date_day union all
select 20 as id, cast('2020-01-01' as date) as date_day union all
select 30 as id, cast('2020-01-02' as date) as date_day union all
select 40 as id, cast('2020-01-02' as date) as date_day

)

select * from data

{% if is_incremental() %}
where false
{% endif %}
""".lstrip()
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
overwrite_day_with_time_ingestion_sql,
overwrite_day_with_time_partition_datetime_sql,
overwrite_static_day_sql,
overwrite_zero_rows_sql,
)


Expand All @@ -50,6 +51,7 @@ def models(self):
"incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql,
"incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql,
"incremental_overwrite_static_day.sql": overwrite_static_day_sql,
"incremental_overwrite_zero_rows.sql": overwrite_zero_rows_sql,
}

@pytest.fixture(scope="class")
Expand Down Expand Up @@ -85,6 +87,7 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se
"incremental_overwrite_day_with_time_partition_expected",
),
("incremental_overwrite_static_day", "incremental_overwrite_day_expected"),
("incremental_overwrite_zero_rows_view", "incremental_overwrite_date_expected"),
]
db_with_schema = f"{project.database}.{project.test_schema}"
for incremental_strategy in incremental_strategies:
Expand Down
Loading