Skip to content

Commit

Permalink
V1.9.0 - Added micro batching, snapshot improvements - Enabled dbt_va…
Browse files Browse the repository at this point in the history
…lid_to, hard_deletes, and unique_key as list snapshot improvements
  • Loading branch information
prdpsvs committed Dec 26, 2024
1 parent 4bd7ea9 commit 2ac5e23
Show file tree
Hide file tree
Showing 10 changed files with 1,142 additions and 111 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge", "insert_overwrite"]
return ["append", "delete+insert", "microbatch"]

# This is for use in the test suite
def run_sql_for_tests(self, sql, fetch, conn):
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro fabric__get_empty_subquery_sql(select_sql, select_sql_header=none) %}
{% if sql.strip().lower().startswith('with') %}
{% if select_sql.strip().lower().startswith('with') %}
{{ select_sql }}
{% else -%}
select * from (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% macro fabric__get_incremental_default_sql(arg_dict) %}

{% if arg_dict["unique_key"] %}
-- Delete + Insert Strategy, calls get_delete_insert_merge_sql
{% do return(get_incremental_delete_insert_sql(arg_dict)) %}
{% else %}
-- Incremental Append will insert data into target table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,35 @@
from {{ source }}
){{ query_label }}
{% endmacro %}

{% macro fabric__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{{ log("incremenal append event start time > DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") }}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{{ log("incremenal append event end time < DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") }}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
189 changes: 101 additions & 88 deletions dbt/include/fabric/macros/materializations/snapshots/helpers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@


{% macro fabric__build_snapshot_table(strategy, relation) %}

{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
select *,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }}
{%- if strategy.hard_deletes == 'new_record' -%}
, 'False' as {{ columns.dbt_is_deleted }}
{% endif -%}
from (
select * from {{ relation }}
) sbq
Expand All @@ -31,115 +34,125 @@

{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%}

with snapshot_query as (
{% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}

with snapshot_query as (
select * from {{ temp_snapshot_relation }}

),

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

{{ unique_key_fields(strategy.unique_key) }}
from {{ target_relation }}
where dbt_valid_to is null

where
{% if config.get('dbt_valid_to_current') %}
{# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #}
( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null)
{% else %}
{{ columns.dbt_valid_to }} is null
{% endif %}
),

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id

select *,
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ get_dbt_valid_to_current(strategy, columns) }},
{{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
from snapshot_query
),

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
select *,
{{ unique_key_fields(strategy.unique_key) }},
{{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
{{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
from snapshot_query
),
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
deletes_source_data as (
select *, {{ unique_key_fields(strategy.unique_key) }}
from snapshot_query
),
{% endif %}

insertions as (

select
'insert' as dbt_change_type,
source_data.*

select 'insert' as dbt_change_type, source_data.*
{%- if strategy.hard_deletes == 'new_record' -%}
,'False' as {{ columns.dbt_is_deleted }}
{%- endif %}
from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
)

left outer join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }}))
),

updates as (

select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id

select 'update' as dbt_change_type, source_data.*,
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where (
{{ strategy.row_changed }}
)
join snapshotted_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where ({{ strategy.row_changed }})
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
,
deletes as (
select 'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
)
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
{{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }}
{%- if strategy.hard_deletes == 'new_record' -%}
, snapshotted_data.{{ columns.dbt_is_deleted }}
{%- endif %}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
{%set source_query = "select * from "~temp_snapshot_relation%}
{% set source_sql_cols = get_column_schema_from_query(source_query) %}
,
deletion_records as (

select
'insert' as dbt_change_type,
{%- for col in source_sql_cols -%}
snapshotted_data.{{ adapter.quote(col.column) }},
{% endfor -%}
{%- if strategy.unique_key | is_list -%}
{%- for key in strategy.unique_key -%}
snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
{% endfor -%}
{%- else -%}
snapshotted_data.dbt_unique_key as dbt_unique_key,
{% endif -%}
{{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
{{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
snapshotted_data.{{ columns.dbt_scd_id }},
'True' as {{ columns.dbt_is_deleted }}
from snapshotted_data
left join deletes_source_data as source_data
on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
)
{%- endif %}
select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
union all
select * from deletes
{%- endif %}
{%- if strategy.hard_deletes == 'new_record' %}
union all
select * from deletion_records
{%- endif %}

{%- endmacro %}
Expand Down
26 changes: 15 additions & 11 deletions dbt/include/fabric/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %}
{% set build_or_select_sql = build_sql %}

-- naming a temp relation
{% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%}
Expand All @@ -51,40 +52,43 @@

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_meta_column_names") or get_snapshot_table_column_names() %}
{{ adapter.valid_snapshot_target(target_relation, columns) }}
{% set build_or_select_sql = snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}
{% if missing_columns|length > 0 %}
{{log("Missing columns length is: "~ missing_columns|length)}}
{% do create_columns(target_relation, missing_columns) %}
{% endif %}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}
{% endif %}

{{ check_time_data_types(build_or_select_sql) }}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{% macro fabric__snapshot_merge_sql(target, source, insert_cols) %}

{%- set insert_cols_csv = insert_cols | join(', ') -%}
{%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%}
{%- set target_table = target.include(database=False) -%}
{%- set source_table = source.include(database=False) -%}
{% set target_columns_list = [] %}
Expand All @@ -9,17 +10,21 @@
{% endfor %}
{%- set target_columns = target_columns_list | join(', ') -%}

UPDATE DBT_INTERNAL_DEST
SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
FROM {{ target_table }} as DBT_INTERNAL_DEST
INNER JOIN {{ source_table }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
WHERE DBT_INTERNAL_DEST.dbt_valid_to is null
AND DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
update DBT_INTERNAL_DEST
set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }}
from {{ target_table }} as DBT_INTERNAL_DEST
inner join {{ source_table }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }}
where DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
{% if config.get("dbt_valid_to_current") %}
and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null)
{% else %}
and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null
{% endif %}
{{ apply_label() }}

INSERT INTO {{ target_table }} ({{ insert_cols_csv }})
SELECT {{target_columns}} FROM {{ source_table }} as DBT_INTERNAL_SOURCE
WHERE DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
insert into {{ target_table }} ({{ insert_cols_csv }})
select {{target_columns}} from {{ source_table }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
{{ apply_label() }}
{% endmacro %}
Loading

0 comments on commit 2ac5e23

Please sign in to comment.