diff --git a/.changes/unreleased/Features-20241104-120653.yaml b/.changes/unreleased/Features-20241104-120653.yaml new file mode 100644 index 00000000..a85e1f7f --- /dev/null +++ b/.changes/unreleased/Features-20241104-120653.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add new hard_deletes="new_record" mode for snapshots. +time: 2024-11-04T12:06:53.225939-05:00 +custom: + Author: peterallenwebb + Issue: "317" diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py new file mode 100644 index 00000000..c50f0ff9 --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -0,0 +1,225 @@ +import pytest + +from dbt.tests.util import check_relations_equal, run_dbt + +_seed_new_record_mode = """ +create table {database}.{schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP WITHOUT TIME ZONE +); + +create table {database}.{schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id TEXT, + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_is_deleted TEXT +); + + +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); + + +-- populate snapshot table +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed; +""" + +_snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' || first_name', + ) + }} + + select * from {{target.database}}.{{target.schema}}.seed + +{% endsnapshot %} +""" + +_snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + hard_deletes: new_record +""" + +_ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + + +_invalidate_sql = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; + +""" + +_update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed +where id >= 10 and id <= 20; +""" + +_delete_sql = """ +delete from {schema}.seed where id = 1 +""" + + +class SnapshotNewRecordMode: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": _snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": _snapshots_yml, + "ref_snapshot.sql": _ref_snapshot_sql, + } + + @pytest.fixture(scope="class") + def seed_new_record_mode(self): + return _seed_new_record_mode + + @pytest.fixture(scope="class") + def invalidate_sql(self): + return _invalidate_sql + + @pytest.fixture(scope="class") + def update_sql(self): + return _update_sql + + @pytest.fixture(scope="class") + def delete_sql(self): + return _delete_sql + + def test_snapshot_new_record_mode( + self, project, seed_new_record_mode, invalidate_sql, update_sql + ): + project.run_sql(seed_new_record_mode) + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + project.run_sql(_delete_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # TODO: Further validate results. diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 44817a18..ae172635 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict): age: float # age in seconds +class SnapshotStrategy(TypedDict): + unique_key: Optional[str] + updated_at: Optional[str] + row_changed: Optional[str] + scd_id: Optional[str] + hard_deletes: Optional[str] + + class BaseAdapter(metaclass=AdapterMeta): """The BaseAdapter provides an abstract base class for adapters. @@ -795,8 +803,8 @@ def valid_snapshot_target( columns = self.get_columns_in_relation(relation) names = set(c.name.lower() for c in columns) missing = [] - # Note: we're not checking dbt_updated_at here because it's not - # always present. + # Note: we're not checking dbt_updated_at or dbt_is_deleted here because they + # aren't always present. for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"): desired = column_names[column] if column_names else column if desired not in names: @@ -805,6 +813,28 @@ def valid_snapshot_target( if missing: raise SnapshotTargetNotSnapshotTableError(missing) + @available.parse_none + def assert_valid_snapshot_target_given_strategy( + self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy + ) -> None: + # Assert everything we can with the legacy function. + self.valid_snapshot_target(relation, column_names) + + # Now do strategy-specific checks. + # TODO: Make these checks more comprehensive. + if strategy.get("hard_deletes", None) == "new_record": + columns = self.get_columns_in_relation(relation) + names = set(c.name.lower() for c in columns) + missing = [] + + for column in ("dbt_is_deleted",): + desired = column_names[column] if column_names else column + if desired not in names: + missing.append(desired) + + if missing: + raise SnapshotTargetNotSnapshotTableError(missing) + @available.parse_none def expand_target_column_types( self, from_relation: BaseRelation, to_relation: BaseRelation @@ -1795,6 +1825,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]: """ return {} + @available.parse_none + @classmethod + def get_hard_deletes_behavior(cls, config): + """Check the hard_deletes config enum, and the legacy invalidate_hard_deletes + config flag in order to determine which behavior should be used for deleted + records in a snapshot. The default is to ignore them.""" + invalidate_hard_deletes = config.get("invalidate_hard_deletes", None) + hard_deletes = config.get("hard_deletes", None) + + if invalidate_hard_deletes is not None and hard_deletes is not None: + raise DbtValidationError( + "You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot." + ) + + if invalidate_hard_deletes or hard_deletes == "invalidate": + return "invalidate" + elif hard_deletes == "new_record": + return "new_record" + elif hard_deletes is None or hard_deletes == "ignore": + return "ignore" + + raise DbtValidationError("Invalid setting for property hard_deletes.") + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index b4cd7c14..33492cc9 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -35,7 +35,7 @@ {% endmacro %} {% macro get_snapshot_table_column_names() %} - {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }} {% endmacro %} {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} @@ -82,7 +82,7 @@ from snapshot_query ), - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} deletes_source_data as ( @@ -96,6 +96,9 @@ select 'insert' as dbt_change_type, source_data.* + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} + {%- endif %} from insertions_source_data as source_data left outer join snapshotted_data @@ -113,6 +116,9 @@ 'update' as dbt_change_type, source_data.*, snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from updates_source_data as source_data join snapshotted_data @@ -122,9 +128,8 @@ ) ) - {%- if strategy.invalidate_hard_deletes -%} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} , - deletes as ( select @@ -134,7 +139,38 @@ {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) + {%- endif %} + + {%- if strategy.hard_deletes == 'new_record' %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} + , + deletion_records as ( + select + 'insert' as dbt_change_type, + {%- for col in source_sql_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.dbt_unique_key as dbt_unique_key, + {% endif -%} + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} from snapshotted_data left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} @@ -145,10 +181,15 @@ select * from insertions union all select * from updates - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} union all select * from deletes {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} + {%- endmacro %} @@ -165,6 +206,9 @@ {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }} + {%- if strategy.hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} + {% endif -%} from ( {{ sql }} ) sbq diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 0c9590b6..683a0c58 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -37,7 +37,7 @@ {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} - {{ adapter.valid_snapshot_target(target_relation, columns) }} + {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index f9f5afbd..49a381e8 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -54,7 +54,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set primary_key = config.get('unique_key') %} {% set updated_at = config.get('updated_at') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* @@ -78,7 +79,8 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} @@ -141,7 +143,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set check_cols_config = config.get('check_cols') %} {% set primary_key = config.get('unique_key') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} @@ -175,6 +178,7 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %}