From 2ac5e237e4b5bdf55f46544544e3e41bc59ba526 Mon Sep 17 00:00:00 2001 From: Pradeep Srikakolapu Date: Wed, 25 Dec 2024 22:48:48 -0800 Subject: [PATCH] V1.9.0 - Added micro batching, snapshot improvements - Enabled dbt_valid_to, hard_deletes, and unique_key as list snapshot improvements --- dbt/adapters/fabric/fabric_adapter.py | 2 +- .../fabric/macros/adapters/columns.sql | 2 +- .../incremental/incremental_strategies.sql | 1 + .../models/incremental/merge.sql | 32 + .../materializations/snapshots/helpers.sql | 189 ++--- .../materializations/snapshots/snapshot.sql | 26 +- .../snapshots/snapshot_merge.sql | 25 +- .../adapter/test_incremental_microbatch.py | 38 + .../adapter/test_snapshot_new_record_mode.py | 225 ++++++ .../adapter/test_snpashot_configs.py | 713 ++++++++++++++++++ 10 files changed, 1142 insertions(+), 111 deletions(-) create mode 100644 tests/functional/adapter/test_incremental_microbatch.py create mode 100644 tests/functional/adapter/test_snapshot_new_record_mode.py create mode 100644 tests/functional/adapter/test_snpashot_configs.py diff --git a/dbt/adapters/fabric/fabric_adapter.py b/dbt/adapters/fabric/fabric_adapter.py index 92ccb24..57112dd 100644 --- a/dbt/adapters/fabric/fabric_adapter.py +++ b/dbt/adapters/fabric/fabric_adapter.py @@ -166,7 +166,7 @@ def valid_incremental_strategies(self): """The set of standard builtin strategies which this adapter supports out-of-the-box. Not used to validate custom strategies defined by end users. 
""" - return ["append", "delete+insert", "merge", "insert_overwrite"] + return ["append", "delete+insert", "microbatch"] # This is for use in the test suite def run_sql_for_tests(self, sql, fetch, conn): diff --git a/dbt/include/fabric/macros/adapters/columns.sql b/dbt/include/fabric/macros/adapters/columns.sql index 26c8fc2..a3d3eb4 100644 --- a/dbt/include/fabric/macros/adapters/columns.sql +++ b/dbt/include/fabric/macros/adapters/columns.sql @@ -1,5 +1,5 @@ {% macro fabric__get_empty_subquery_sql(select_sql, select_sql_header=none) %} - {% if sql.strip().lower().startswith('with') %} + {% if select_sql.strip().lower().startswith('with') %} {{ select_sql }} {% else -%} select * from ( diff --git a/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql b/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql index c95ce81..66078e5 100644 --- a/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql +++ b/dbt/include/fabric/macros/materializations/models/incremental/incremental_strategies.sql @@ -1,6 +1,7 @@ {% macro fabric__get_incremental_default_sql(arg_dict) %} {% if arg_dict["unique_key"] %} + -- Delete + Insert Strategy, calls get_delete_insert_merge_sql {% do return(get_incremental_delete_insert_sql(arg_dict)) %} {% else %} -- Incremental Append will insert data into target table. 
diff --git a/dbt/include/fabric/macros/materializations/models/incremental/merge.sql b/dbt/include/fabric/macros/materializations/models/incremental/merge.sql index b5a0903..7b13bc8 100644 --- a/dbt/include/fabric/macros/materializations/models/incremental/merge.sql +++ b/dbt/include/fabric/macros/materializations/models/incremental/merge.sql @@ -58,3 +58,35 @@ from {{ source }} ){{ query_label }} {% endmacro %} + +{% macro fabric__get_incremental_microbatch_sql(arg_dict) %} + {%- set target = arg_dict["target_relation"] -%} + {%- set source = arg_dict["temp_relation"] -%} + {%- set dest_columns = arg_dict["dest_columns"] -%} + {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%} + + {#-- Add additional incremental_predicates to filter for batch --#} + {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%} + {{ log("incremenal append event start time > DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") }} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %} + {% endif %} + {% if model.config.__dbt_internal_microbatch_event_time_end -%} + {{ log("incremenal append event end time < DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") }} + {% do incremental_predicates.append("DBT_INTERNAL_TARGET." 
~ model.config.event_time ~ " < '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %} + {% endif %} + {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %} + + delete DBT_INTERNAL_TARGET from {{ target }} AS DBT_INTERNAL_TARGET + where ( + {% for predicate in incremental_predicates %} + {%- if not loop.first %}and {% endif -%} {{ predicate }} + {% endfor %} + ); + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) +{% endmacro %} diff --git a/dbt/include/fabric/macros/materializations/snapshots/helpers.sql b/dbt/include/fabric/macros/materializations/snapshots/helpers.sql index d5915e4..8b86822 100644 --- a/dbt/include/fabric/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/fabric/macros/materializations/snapshots/helpers.sql @@ -17,12 +17,15 @@ {% macro fabric__build_snapshot_table(strategy, relation) %} - + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} select *, - {{ strategy.scd_id }} as dbt_scd_id, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }} + {%- if strategy.hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} + {% endif -%} from ( select * from {{ relation }} ) sbq @@ -31,115 +34,125 @@ {% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%} - with snapshot_query as ( + {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} + with snapshot_query as ( select * from 
{{ temp_snapshot_relation }} - ), - snapshotted_data as ( - select *, - {{ strategy.unique_key }} as dbt_unique_key - + {{ unique_key_fields(strategy.unique_key) }} from {{ target_relation }} - where dbt_valid_to is null - + where + {% if config.get('dbt_valid_to_current') %} + {# Check for either dbt_valid_to_current OR null, in order to correctly update records with nulls #} + ( {{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or {{ columns.dbt_valid_to }} is null) + {% else %} + {{ columns.dbt_valid_to }} is null + {% endif %} ), - insertions_source_data as ( - - select - *, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to, - {{ strategy.scd_id }} as dbt_scd_id - + select *, + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ get_dbt_valid_to_current(strategy, columns) }}, + {{ strategy.scd_id }} as {{ columns.dbt_scd_id }} from snapshot_query ), - updates_source_data as ( - - select - *, - {{ strategy.unique_key }} as dbt_unique_key, - {{ strategy.updated_at }} as dbt_updated_at, - {{ strategy.updated_at }} as dbt_valid_from, - {{ strategy.updated_at }} as dbt_valid_to - - from snapshot_query - ), - - {%- if strategy.invalidate_hard_deletes %} - - deletes_source_data as ( - - select - *, - {{ strategy.unique_key }} as dbt_unique_key + select *, + {{ unique_key_fields(strategy.unique_key) }}, + {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, + {{ strategy.updated_at }} as {{ columns.dbt_valid_to }} from snapshot_query ), + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + deletes_source_data as ( + select *, {{ 
unique_key_fields(strategy.unique_key) }} + from snapshot_query + ), {% endif %} - insertions as ( - - select - 'insert' as dbt_change_type, - source_data.* - + select 'insert' as dbt_change_type, source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} + {%- endif %} from insertions_source_data as source_data - left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where snapshotted_data.dbt_unique_key is null - or ( - snapshotted_data.dbt_unique_key is not null - and ( - {{ strategy.row_changed }} - ) - ) - + left outer join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }} + or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})) ), - updates as ( - - select - 'update' as dbt_change_type, - source_data.*, - snapshotted_data.dbt_scd_id - + select 'update' as dbt_change_type, source_data.*, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from updates_source_data as source_data - join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where ( - {{ strategy.row_changed }} - ) + join snapshotted_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where ({{ strategy.row_changed }}) ) - - {%- if strategy.invalidate_hard_deletes -%} - , - - deletes as ( - - select - 'delete' as dbt_change_type, + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + , + deletes as ( + select 'delete' as dbt_change_type, source_data.*, - {{ snapshot_get_time() }} as dbt_valid_from, - {{ snapshot_get_time() }} as dbt_updated_at, - {{ snapshot_get_time() }} as dbt_valid_to, - snapshotted_data.dbt_scd_id 
- - from snapshotted_data - left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key - where source_data.dbt_unique_key is null - ) + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + {%set source_query = "select * from "~temp_snapshot_relation%} + {% set source_sql_cols = get_column_schema_from_query(source_query) %} + , + deletion_records as ( + select + 'insert' as dbt_change_type, + {%- for col in source_sql_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) + {%- endif %} select * from insertions union all select * from updates - {%- if 
strategy.invalidate_hard_deletes %} - union all - select * from deletes + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} + union all + select * from deletes + {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records {%- endif %} {%- endmacro %} diff --git a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql index 9ed2f91..48600af 100644 --- a/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/fabric/macros/materializations/snapshots/snapshot.sql @@ -40,6 +40,7 @@ {% if not target_relation_exists %} {% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %} + {% set build_or_select_sql = build_sql %} -- naming a temp relation {% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%} @@ -51,32 +52,35 @@ {% else %} - {{ adapter.valid_snapshot_target(target_relation) }} + {% set columns = config.get("snapshot_meta_column_names") or get_snapshot_table_column_names() %} + {{ adapter.valid_snapshot_target(target_relation, columns) }} + {% set build_or_select_sql = snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} -- this may no-op if the database does not require column expansion {% do adapter.expand_target_column_types(from_relation=staging_table, to_relation=target_relation) %} + + {% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %} + {% if unique_key | is_list %} + {% for key in strategy.unique_key %} + {{ remove_columns.append('dbt_unique_key_' + loop.index|string) }} + {{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }} + {% endfor %} + {% endif %} {% set missing_columns = 
adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% if missing_columns|length > 0 %} {{log("Missing columns length is: "~ missing_columns|length)}} {% do create_columns(target_relation, missing_columns) %} {% endif %} {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | rejectattr('name', 'in', remove_columns) | list %} {% set quoted_source_columns = [] %} {% for column in source_columns %} {% do quoted_source_columns.append(adapter.quote(column.name)) %} {% endfor %} - {% set final_sql = snapshot_merge_sql( target = target_relation, source = staging_table, @@ -84,7 +88,7 @@ ) %} {% endif %} - + {{ check_time_data_types(build_or_select_sql) }} {% call statement('main') %} {{ final_sql }} {% endcall %} diff --git a/dbt/include/fabric/macros/materializations/snapshots/snapshot_merge.sql b/dbt/include/fabric/macros/materializations/snapshots/snapshot_merge.sql index e2d3e99..501d560 100644 --- a/dbt/include/fabric/macros/materializations/snapshots/snapshot_merge.sql +++ b/dbt/include/fabric/macros/materializations/snapshots/snapshot_merge.sql @@ -1,6 +1,7 @@ {% macro fabric__snapshot_merge_sql(target, source, insert_cols) %} {%- set insert_cols_csv = insert_cols | join(', ') -%} + {%- set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() -%} {%- set target_table = target.include(database=False) -%} {%- set source_table = source.include(database=False) -%} {% set target_columns_list = [] %} @@ -9,17 +10,21 @@ {% endfor %} {%- set 
target_columns = target_columns_list | join(', ') -%} - UPDATE DBT_INTERNAL_DEST - SET dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to - FROM {{ target_table }} as DBT_INTERNAL_DEST - INNER JOIN {{ source_table }} as DBT_INTERNAL_SOURCE - on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id - WHERE DBT_INTERNAL_DEST.dbt_valid_to is null - AND DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') + update DBT_INTERNAL_DEST + set {{ columns.dbt_valid_to }} = DBT_INTERNAL_SOURCE.{{ columns.dbt_valid_to }} + from {{ target_table }} as DBT_INTERNAL_DEST + inner join {{ source_table }} as DBT_INTERNAL_SOURCE + on DBT_INTERNAL_SOURCE.{{ columns.dbt_scd_id }} = DBT_INTERNAL_DEST.{{ columns.dbt_scd_id }} + where DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete') + {% if config.get("dbt_valid_to_current") %} + and (DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} = {{ config.get('dbt_valid_to_current') }} or DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null) + {% else %} + and DBT_INTERNAL_DEST.{{ columns.dbt_valid_to }} is null + {% endif %} {{ apply_label() }} - INSERT INTO {{ target_table }} ({{ insert_cols_csv }}) - SELECT {{target_columns}} FROM {{ source_table }} as DBT_INTERNAL_SOURCE - WHERE DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' + insert into {{ target_table }} ({{ insert_cols_csv }}) + select {{target_columns}} from {{ source_table }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' {{ apply_label() }} {% endmacro %} diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py new file mode 100644 index 0000000..eb2831e --- /dev/null +++ b/tests/functional/adapter/test_incremental_microbatch.py @@ -0,0 +1,38 @@ +import pytest +from dbt.tests.adapter.incremental.test_incremental_microbatch import BaseMicrobatch + +# No requirement for a unique_id for Fabric microbatch! 
+_microbatch_model_no_unique_id_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin='2020-01-01 00:00:00.000000') }} +select * from {{ ref('input_model') }} +""" + +_input_model_sql = """ +{{ config(materialized='table', event_time='event_time') }} +select 1 as id, '2020-01-01 00:00:00.000000' as event_time +union all +select 2 as id, '2020-01-02 00:00:00.000000' as event_time +union all +select 3 as id, '2020-01-03 00:00:00.000000' as event_time +""" + + +class TestFabricMicrobatch(BaseMicrobatch): + @pytest.fixture(scope="class") + def microbatch_model_sql(self) -> str: + return _microbatch_model_no_unique_id_sql + + @pytest.fixture(scope="class") + def input_model_sql(self) -> str: + """ + This is the SQL that defines the input model to the microbatch model, including any {{ config(..) }}. + event_time is a required configuration of this input + """ + return _input_model_sql + + @pytest.fixture(scope="class") + def insert_two_rows_sql(self, project) -> str: + test_schema_relation = project.adapter.Relation.create( + database=project.database, schema=project.test_schema + ) + return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00.000000'), (5, '2020-01-05 00:00:00.000000')" diff --git a/tests/functional/adapter/test_snapshot_new_record_mode.py b/tests/functional/adapter/test_snapshot_new_record_mode.py new file mode 100644 index 0000000..49a508b --- /dev/null +++ b/tests/functional/adapter/test_snapshot_new_record_mode.py @@ -0,0 +1,225 @@ +import pytest +from dbt.tests.util import check_relations_equal, run_dbt + +_seed_new_record_mode = """ +create table {database}.{schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at DATETIME2(6) +); + +create table {database}.{schema}.snapshot_expected ( + id INTEGER, + first_name 
VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at DATETIME2(6), + dbt_valid_from DATETIME2(6), + dbt_valid_to DATETIME2(6), + dbt_scd_id VARCHAR(50), + dbt_updated_at DATETIME2(6), + dbt_is_deleted VARCHAR(50) +); + + +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 
'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); + + +-- populate snapshot table +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + cast(null as date) as dbt_valid_to, + updated_at as dbt_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed; +""" + +_snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='cast(id as varchar(8000)) + '~ "'-'" ~ ' + cast(first_name as varchar(8000))', + ) + }} + select * from {{target.database}}.{{target.schema}}.seed + +{% endsnapshot %} +""" + +_snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + hard_deletes: new_record +""" + +_ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + + +_invalidate_sql = """ +-- update records 11 - 21. 
Change email and updated_at field +update {schema}.seed set +updated_at = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)), +email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' + email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set +dbt_valid_to = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)) +where id >= 10 and id <= 20; + +""" + +_update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + cast(null as date) as dbt_valid_to, + updated_at as dbt_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed +where id >= 10 and id <= 20; +""" + +_delete_sql = """ +delete from {schema}.seed where id = 1 +""" + + +class SnapshotNewRecordMode: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _snapshots_yml, + "ref_snapshot.sql": _ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def seed_new_record_mode(self): + return _seed_new_record_mode + + @pytest.fixture(scope="class") + def invalidate_sql(self): + return _invalidate_sql + + @pytest.fixture(scope="class") + def update_sql(self): + return _update_sql + + @pytest.fixture(scope="class") + def delete_sql(self): + return _delete_sql + + def test_snapshot_new_record_mode( + self, project, 
seed_new_record_mode, invalidate_sql, update_sql + ): + project.run_sql(seed_new_record_mode) + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + project.run_sql(_delete_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + +class TestSnapshotNewRecordMode(SnapshotNewRecordMode): + pass diff --git a/tests/functional/adapter/test_snpashot_configs.py b/tests/functional/adapter/test_snpashot_configs.py new file mode 100644 index 0000000..72ceabf --- /dev/null +++ b/tests/functional/adapter/test_snpashot_configs.py @@ -0,0 +1,713 @@ +import datetime + +import pytest +from dbt.tests.util import ( + check_relations_equal, + get_manifest, + run_dbt, + run_dbt_and_capture, + run_sql_with_adapter, + update_config_file, +) + +model_seed_sql = """ +select * from {{target.database}}.{{target.schema}}.seed +""" + +snapshots_multi_key_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + strategy: timestamp + updated_at: updated_at + unique_key: + - id1 + - id2 + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +# multi-key snapshot fixtures + +create_multi_key_seed_sql = """ +create table {schema}.seed ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at DATETIME2(6) +); +""" + +create_multi_key_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id1 INTEGER, + id2 INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at DATETIME2(6), + test_valid_from 
DATETIME2(6), + test_valid_to DATETIME2(6), + test_scd_id VARCHAR(50), + test_updated_at DATETIME2(6) +); +""" + +seed_multi_key_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id1, id2, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 100, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 200, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 300, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 400, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 500, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 600, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 700, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 800, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 900, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 1000, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 1100, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 1200, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 1300, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 1400, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 1500, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 1600, 
'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 1700, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 1800, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 1900, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 2000, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_multi_key_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as Datetime2(6)) as test_valid_to, + updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id1 as varchar(8000)), '') + '|' + coalesce(cast(id2 as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed; +""" + +model_seed_sql = """ +select * from {{target.database}}.{{target.schema}}.seed +""" + +snapshots_multi_key_yml = """ +snapshots: + - name: snapshot_actual + relation: "ref('seed')" + config: + strategy: timestamp + updated_at: updated_at + unique_key: + - id1 + - id2 + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +invalidate_multi_key_sql = """ +-- update records 11 - 21. 
Change email and updated_at field +update {schema}.seed set + updated_at = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)), + email = case when id1 = 20 then 'pfoxj@creativecommons.org' else 'new_' + email end +where id1 >= 10 and id1 <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)) +where id1 >= 10 and id1 <= 20; + +""" + +update_multi_key_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id1, + id2, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as Datetime2(6)) as test_valid_to, + updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id1 as varchar(8000)), '') + '|' + coalesce(cast(id2 as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed +where id1 >= 10 and id1 <= 20; +""" + +snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='cast(id as varchar(8000)) + '~ "'-'" ~ ' + cast(first_name as varchar(8000))', + ) + }} + + select * from {{target.database}}.{{target.schema}}.seed + +{% endsnapshot %} +""" + +snapshots_valid_to_current_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + dbt_valid_to_current: "cast('2099-12-31' as date)" + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + +ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + +create_seed_sql = """ +create table {schema}.seed ( + id INT, + first_name VARCHAR(50), + last_name 
VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at DATETIME2(6) +); +""" + +create_snapshot_expected_sql = """ +create table {schema}.snapshot_expected ( + id INT, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at DATETIME2(6), + test_valid_from DATETIME2(6), + test_valid_to DATETIME2(6), + test_scd_id VARCHAR(50), + test_updated_at DATETIME2(6) +); +""" + +seed_insert_sql = """ +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 
'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); +""" + +populate_snapshot_expected_valid_to_current_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast('2099-12-31' as date) as test_valid_to, + updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed; +""" + +populate_snapshot_expected_sql = """ +-- populate snapshot table +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast(null as date) as test_valid_to, + 
updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed; +""" + +update_with_current_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast('2099-12-31' as date) as test_valid_to, + updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + +invalidate_sql = """ +-- update records 11 - 21. 
Change email and updated_at field +update {schema}.seed set + updated_at = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)), + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' + email end +where id >= 10 and id <= 20; + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + test_valid_to = CAST(DATEADD(HOUR, 1, updated_at) AS datetime2(6)) +where id >= 10 and id <= 20; +""" + +snapshots_no_column_names_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at +""" + +ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + +update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + test_valid_from, + test_valid_to, + test_updated_at, + test_scd_id +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as test_valid_from, + cast (null as date) as test_valid_to, + updated_at as test_updated_at, + convert(varchar(50), hashbytes('md5', coalesce(cast(id as varchar(8000)), '') + '-' + coalesce(cast(first_name as varchar(8000)), '') + '|' + coalesce(cast(updated_at as varchar(8000)), '')), 2) as test_scd_id +from {schema}.seed +where id >= 10 and id <= 20; +""" + +snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + snapshot_meta_column_names: + dbt_valid_to: test_valid_to + dbt_valid_from: test_valid_from + dbt_scd_id: test_scd_id + dbt_updated_at: test_updated_at +""" + + +class BaseSnapshotDbtValidToCurrent: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_valid_to_current_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def 
test_valid_to_current(self, project):
+        """Snapshot the seed twice and check ``dbt_valid_to_current`` handling.
+
+        Current rows are expected to carry 2099-12-31 in ``test_valid_to``
+        (the value configured in ``snapshots_valid_to_current_yml``) instead
+        of NULL; rows superseded by the second run get a real end timestamp.
+        """
+        # Build the seed table and the hand-computed expected snapshot table.
+        project.run_sql(create_seed_sql)
+        project.run_sql(create_snapshot_expected_sql)
+        project.run_sql(seed_insert_sql)
+        project.run_sql(populate_snapshot_expected_valid_to_current_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        original_snapshot = run_sql_with_adapter(
+            project.adapter,
+            "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual",
+            "all",
+        )
+        # Spot-check that current rows hold the configured sentinel date.
+        assert original_snapshot[0][2] == datetime.datetime(2099, 12, 31, 0, 0)
+        assert original_snapshot[9][2] == datetime.datetime(2099, 12, 31, 0, 0)
+
+        # Move updated_at forward by one hour for ids 10-20 in the seed and
+        # mirror the resulting history in the expected table.
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_with_current_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        updated_snapshot = run_sql_with_adapter(
+            project.adapter,
+            "select id, test_scd_id, test_valid_to from {schema}.snapshot_actual",
+            "all",
+        )
+        # 20 original rows plus 11 new versions (ids 10-20).
+        assert len(updated_snapshot) == 31
+
+        updated_snapshot_row_count = run_sql_with_adapter(
+            project.adapter,
+            "select count(*) from {schema}.snapshot_actual where test_valid_to != '2099-12-31 00:00:00.000000'",
+            "all",
+        )
+        # Only the 11 superseded versions have a real end timestamp.
+        assert updated_snapshot_row_count[0][0] == 11
+
+        # Superseded rows end at the seed's original updated_at plus one hour.
+        updated_snapshot_row_17 = run_sql_with_adapter(
+            project.adapter,
+            "select id from {schema}.snapshot_actual where test_valid_to = '2016-08-29 20:35:20.000000'",
+            "all",
+        )
+        assert updated_snapshot_row_17[0][0] == 17
+
+        updated_snapshot_row_16 = run_sql_with_adapter(
+            project.adapter,
+            "select id from {schema}.snapshot_actual where test_valid_to = '2016-10-03 09:16:38.000000'",
+            "all",
+        )
+        assert updated_snapshot_row_16[0][0] == 16
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestSnapshotDbtValidToCurrent(BaseSnapshotDbtValidToCurrent):
+    pass
+
+
+class BaseSnapshotColumnNamesFromDbtProject:
+    """Snapshot whose meta column names come from project-level config.
+
+    ``snapshots.yml`` here (``snapshots_no_column_names_yml``) defines no
+    ``snapshot_meta_column_names``; they are supplied by
+    ``project_config_update`` instead.
+    """
+
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_no_column_names_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        # Rename all four snapshot meta columns to their test_* equivalents.
+        return {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_valid_from": "test_valid_from",
+                        "dbt_scd_id": "test_scd_id",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+
+    def test_snapshot_column_names_from_project(self, project):
+        """Run the snapshot twice and compare it with the expected table."""
+        project.run_sql(create_seed_sql)
+        project.run_sql(create_snapshot_expected_sql)
+        project.run_sql(seed_insert_sql)
+        project.run_sql(populate_snapshot_expected_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # Change ids 10-20 in the seed and add their new versions to the
+        # expected table before the second snapshot run.
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestBaseSnapshotColumnNamesFromDbtProject(BaseSnapshotColumnNamesFromDbtProject):
+    pass
+
+
+class BaseSnapshotColumnNames:
+    """Snapshot whose meta column names are set in ``snapshots.yml``."""
+
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    def test_snapshot_column_names(self, project):
+        """Run the snapshot twice and compare it with the expected table."""
+        project.run_sql(create_seed_sql)
+        project.run_sql(create_snapshot_expected_sql)
+        project.run_sql(seed_insert_sql)
+        project.run_sql(populate_snapshot_expected_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestBaseSnapshotColumnNames(BaseSnapshotColumnNames):
+    pass
+
+
+class BaseSnapshotInvalidColumnNames:
+    """Snapshot run that must fail once the meta column mapping is reduced.
+
+    The first run uses a full four-column mapping; the project config is then
+    rewritten with only two of the columns and the second run is expected to
+    error and mention ``dbt_scd_id`` in its log output.
+    """
+
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot.sql": snapshot_actual_sql}
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "snapshots.yml": snapshots_no_column_names_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_valid_from": "test_valid_from",
+                        "dbt_scd_id": "test_scd_id",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+
+    def test_snapshot_invalid_column_names(self, project):
+        """Snapshot once successfully, then expect an error after the remap."""
+        project.run_sql(create_seed_sql)
+        project.run_sql(create_snapshot_expected_sql)
+        project.run_sql(seed_insert_sql)
+        project.run_sql(populate_snapshot_expected_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+        manifest = get_manifest(project.project_root)
+        snapshot_node = manifest.nodes["snapshot.test.snapshot_actual"]
+        # This used to be a bare `==` expression whose result was discarded,
+        # so the parsed config was never verified. Assert it for real.
+        # NOTE(review): depending on the dbt-core version this config value
+        # may be a plain mapping or an attribute-style object; both are
+        # handled here -- confirm against the pinned dbt-core release.
+        meta_column_names = snapshot_node.config.snapshot_meta_column_names
+        expected_column_names = {
+            "dbt_valid_to": "test_valid_to",
+            "dbt_valid_from": "test_valid_from",
+            "dbt_scd_id": "test_scd_id",
+            "dbt_updated_at": "test_updated_at",
+        }
+        for key, expected in expected_column_names.items():
+            if isinstance(meta_column_names, dict):
+                actual = meta_column_names[key]
+            else:
+                actual = getattr(meta_column_names, key)
+            assert actual == expected
+
+        project.run_sql(invalidate_sql)
+        project.run_sql(update_sql)
+
+        # Change snapshot_meta_columns and look for an error
+        different_columns = {
+            "snapshots": {
+                "test": {
+                    "+snapshot_meta_column_names": {
+                        "dbt_valid_to": "test_valid_to",
+                        "dbt_updated_at": "test_updated_at",
+                    }
+                }
+            }
+        }
+        update_config_file(different_columns, "dbt_project.yml")
+
+        # The existing snapshot table was built with test_scd_id; with the
+        # reduced mapping the run must fail and name the missing column.
+        results, log_output = run_dbt_and_capture(["snapshot"], expect_pass=False)
+        assert len(results) == 1
+        assert "dbt_scd_id" in log_output
+        assert "1 of 1 ERROR snapshotting test" in log_output
+
+
+class TestBaseSnapshotInvalidColumnNames(BaseSnapshotInvalidColumnNames):
+    pass
+
+
+# This uses snapshot_meta_column_names, yaml-only snapshot def,
+# and multiple keys
+class BaseSnapshotMultiUniqueKey:
+    """YAML-only snapshot keyed on the column list ``id1``/``id2``."""
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "seed.sql": model_seed_sql,
+            "snapshots.yml": snapshots_multi_key_yml,
+            "ref_snapshot.sql": ref_snapshot_sql,
+        }
+
+    def test_multi_column_unique_key(self, project):
+        """Run the snapshot twice and compare it with the expected table."""
+        project.run_sql(create_multi_key_seed_sql)
+        project.run_sql(create_multi_key_snapshot_expected_sql)
+        project.run_sql(seed_multi_key_insert_sql)
+        project.run_sql(populate_multi_key_snapshot_expected_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        # Change rows with id1 10-20 in the seed and add their new versions
+        # to the expected table before the second snapshot run.
+        project.run_sql(invalidate_multi_key_sql)
+        project.run_sql(update_multi_key_sql)
+
+        results = run_dbt(["snapshot"])
+        assert len(results) == 1
+
+        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])
+
+
+class TestBaseSnapshotMultiUniqueKey(BaseSnapshotMultiUniqueKey):
+    pass