From bdbe8694fd85aa5f59dbc005a29d5e72ba0d081a Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Thu, 26 Sep 2024 13:48:10 -0400 Subject: [PATCH 01/12] Add hard_deletes config and new_record option (not yet respected) for snapshots. --- .../materializations/snapshots/helpers.sql | 34 ++++++++++++++++--- .../materializations/snapshots/strategies.sql | 12 ++++--- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 8d982855..35f1f092 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -35,7 +35,29 @@ {% endmacro %} {% macro get_snapshot_table_column_names() %} - {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at'}) }} + {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }} +{% endmacro %} + +{# Check the hard_deletes config enum, and the legacy invalidate_hard_deletes + config flag in order to determine which behavior should be used for deleted + records in the current snapshot. The default is to ignore them. #} +{% macro get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') %} + {% set hard_deletes = config.get('hard_deletes') %} + + {% if invalidate_hard_deletes is not none and hard_deletes is not none %} + {% do exceptions.raise_compiler_error("You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot.") %} + {% endif %} + + {% if invalidate_hard_deletes or hard_deletes == 'invalidate' %} + {{ return('invalidate') }} + {% elif hard_deletes == 'new_record' %} + {{ return('new_record') }} + {% elif hard_deletes is none or hard_deletes == 'ignore' %} + {{ return('ignore') }} + {% else %} + {% do exceptions.raise_compiler_error("Invalid setting for property hard_deletes.") %} + {% endif %} {% endmacro %} {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} @@ -82,7 +104,7 @@ from snapshot_query ), - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' %} deletes_source_data as ( @@ -125,7 +147,7 @@ ) ) - {%- if strategy.invalidate_hard_deletes -%} + {%- if strategy.hard_deletes == 'invalidate' %} , deletes as ( @@ -147,7 +169,7 @@ select * from insertions union all select * from updates - {%- if strategy.invalidate_hard_deletes %} + {%- if strategy.hard_deletes == 'invalidate' %} union all select * from deletes {%- endif %} @@ -167,6 +189,10 @@ {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as {{ columns.dbt_valid_to }} + {% if strategy.hard_deletes == 'new_record' -%} + , + {{ strategy.dbt_is_deleted }} as {{ columns.dbt_is_deleted }} + {%- endif -%} from ( {{ sql }} ) sbq diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index 8c086182..de8cf653 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -54,7 +54,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set primary_key = config.get('unique_key') %} {% set updated_at = config.get('updated_at') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} {#/* @@ -77,7 +78,8 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} @@ -140,7 +142,8 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set check_cols_config = config.get('check_cols') %} {% set primary_key = config.get('unique_key') %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') or false %} + {% set hard_deletes = get_hard_delete_behavior() %} + {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set updated_at = config.get('updated_at') or snapshot_get_time() %} {% set column_added = false %} @@ -173,6 +176,7 @@ "updated_at": updated_at, "row_changed": row_changed_expr, "scd_id": scd_id_expr, - "invalidate_hard_deletes": invalidate_hard_deletes + "invalidate_hard_deletes": invalidate_hard_deletes, + "hard_deletes": hard_deletes }) %} {% endmacro %} From e426185743e82228f6570d48173bef9c82856f7d Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 1 Nov 2024 19:00:34 -0400 Subject: [PATCH 02/12] More implementation of hard_deletes='new_record' snapshot mode --- dbt/adapters/base/impl.py | 4 +- .../materializations/snapshots/helpers.sql | 46 +++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 41481535..405f1668 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -788,8 +788,8 @@ def valid_snapshot_target( columns = self.get_columns_in_relation(relation) names = set(c.name.lower() for c in columns) missing = [] - # Note: we're not checking dbt_updated_at here because it's not - # always present. + # Note: we're not checking dbt_updated_at or dbt_is_deleted here because they + # aren't always present. for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"): desired = column_names[column] if column_names else column if desired not in names: diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 17566358..dbd1207e 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -104,7 +104,7 @@ from snapshot_query ), - {%- if strategy.hard_deletes == 'invalidate' %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} deletes_source_data as ( @@ -118,6 +118,9 @@ select 'insert' as dbt_change_type, source_data.* + {%- if strategy.hard_deletes == 'new_record' -%}, + 'False' as {{ columns.dbt_is_deleted }} {# Implies I, II, or III #} + {%- endif %} from insertions_source_data as source_data left outer join snapshotted_data @@ -135,6 +138,9 @@ 'update' as dbt_change_type, source_data.*, snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%}, + snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} from updates_source_data as source_data join snapshotted_data @@ -144,9 +150,8 @@ ) ) - {%- if strategy.hard_deletes == 'invalidate' %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} , - deletes as ( select @@ -156,7 +161,28 @@ {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, snapshotted_data.{{ columns.dbt_scd_id }} + {%- if strategy.hard_deletes == 'new_record' -%}, + snapshotted_data.{{ columns.dbt_is_deleted }} + {%- endif %} + from snapshotted_data + left join deletes_source_data as source_data + on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) + {%- endif %} + + {%- if strategy.hard_deletes == 'new_record' %} + , + deletion_records as ( + select + 'insert' as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, + {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, + snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, + snapshotted_data.{{ columns.dbt_scd_id }}, + 'True' as {{ columns.dbt_is_deleted }} from snapshotted_data left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} @@ -167,10 +193,15 @@ select * from insertions union all select * from updates - {%- if strategy.hard_deletes == 'invalidate' %} + {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %} union all select * from deletes {%- endif %} + {%- if strategy.hard_deletes == 'new_record' %} + union all + select * from deletion_records + {%- endif %} + {%- endmacro %} @@ -187,10 +218,9 @@ {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }} - {% if strategy.hard_deletes == 'new_record' -%} - , - {{ strategy.dbt_is_deleted }} as {{ columns.dbt_is_deleted }} - {%- endif -%} + {%- if strategy.hard_deletes == 'new_record' -%}, + 'False' as {{ columns.dbt_is_deleted }} + {% endif -%} from ( {{ sql }} ) sbq From b0e91e2f37e19521068a47fec60685db03169ec6 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Sat, 2 Nov 2024 14:02:50 -0400 Subject: [PATCH 03/12] Complete implementation of new hard_delete mode. --- .../macros/materializations/snapshots/helpers.sql | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index dbd1207e..273a1582 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -172,12 +172,22 @@ {%- endif %} {%- if strategy.hard_deletes == 'new_record' %} + {% set source_sql_cols = get_column_schema_from_query(source_sql) %} , deletion_records as ( select 'insert' as dbt_change_type, - source_data.*, + {%- for col in source_sql_cols -%} + snapshotted_data.{{ adapter.quote(col.column) }}, + {% endfor -%} + {%- if strategy.unique_key | is_list -%} + {%- for key in strategy.unique_key -%} + snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, + {% endfor -%} + {%- else -%} + snapshotted_data.{{ strategy.unique_key }} as dbt_unique_key, + {% endif -%} {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, From daba363e5722f972abcf2bdb80c4b31665d19397 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Mon, 4 Nov 2024 11:54:13 -0500 Subject: [PATCH 04/12] Raise an exception if the user changes the snapshot mode to hard_deletes="new_record" from a different mode. --- .../materializations/snapshots/helpers.sql | 2 +- .../materializations/snapshots/snapshot.sql | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 273a1582..b41a1986 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -119,7 +119,7 @@ 'insert' as dbt_change_type, source_data.* {%- if strategy.hard_deletes == 'new_record' -%}, - 'False' as {{ columns.dbt_is_deleted }} {# Implies I, II, or III #} + 'False' as {{ columns.dbt_is_deleted }} {%- endif %} from insertions_source_data as source_data diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 0c9590b6..0c54ede6 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -39,6 +39,22 @@ {{ adapter.valid_snapshot_target(target_relation, columns) }} + {# Raise an exception if the user has selected the new_record mode for + hard deletes, but there is no dbt_is_deleted column in the target, + which would indicate it was created in a different mode. #} + {% if strategy.hard_deletes == 'new_record' %} + {% set target_cols = adapter.get_columns_in_relation(target_relation) %} + {% set ns = namespace(found_is_deleted_col = false) %} + {% for col in target_cols %} + {% if col.column == columns['dbt_is_deleted'] %} + {% set ns.found_is_deleted_col = true %} + {% endif %} + {% endfor %} + {% if not ns.found_is_deleted_col %} + {% do exceptions.raise_compiler_error('Did not find a dbt_is_deleted column in snapshot target. Changing the snapshot hard_deletes mode is not supported after a snapshot has been created.') %} + {% endif %} + {% endif %} + {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} From 9f8c130cd652e2a2ec21f3b260ba9680a17c3d6a Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Mon, 4 Nov 2024 12:07:03 -0500 Subject: [PATCH 05/12] Add changelog entry. --- .changes/unreleased/Features-20241104-120653.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20241104-120653.yaml diff --git a/.changes/unreleased/Features-20241104-120653.yaml b/.changes/unreleased/Features-20241104-120653.yaml new file mode 100644 index 00000000..a85e1f7f --- /dev/null +++ b/.changes/unreleased/Features-20241104-120653.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add new hard_deletes="new_record" mode for snapshots. +time: 2024-11-04T12:06:53.225939-05:00 +custom: + Author: peterallenwebb + Issue: "317" From 72a04978ad39b57eac4d8190f5cf3033cb0923b7 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 15 Nov 2024 11:52:51 -0500 Subject: [PATCH 06/12] Move pure logic into Python. --- dbt/adapters/base/impl.py | 23 +++++++++++++++++++ .../materializations/snapshots/helpers.sql | 22 ------------------ .../materializations/snapshots/strategies.sql | 4 ++-- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index f23bb5e4..becce4b2 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -1787,6 +1787,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]: """ return {} + @available.parse_none + @classmethod + def get_hard_deletes_behavior(cls, config): + """Check the hard_deletes config enum, and the legacy invalidate_hard_deletes + config flag in order to determine which behavior should be used for deleted + records in a snapshot. The default is to ignore them.""" + invalidate_hard_deletes = config.get("invalidate_hard_deletes", None) + hard_deletes = config.get("hard_deletes", None) + + if invalidate_hard_deletes is not None and hard_deletes is not None: + raise DbtValidationError( + "You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot." + ) + + if invalidate_hard_deletes or hard_deletes == "invalidate": + return "invalidate" + elif hard_deletes == "new_record": + return "new_record" + elif hard_deletes is None or hard_deletes == "ignore": + return "ignore" + + raise DbtValidationError("Invalid setting for property hard_deletes.") + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index b41a1986..0edff4ed 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -38,28 +38,6 @@ {{ return({'dbt_valid_to': 'dbt_valid_to', 'dbt_valid_from': 'dbt_valid_from', 'dbt_scd_id': 'dbt_scd_id', 'dbt_updated_at': 'dbt_updated_at', 'dbt_is_deleted': 'dbt_is_deleted'}) }} {% endmacro %} -{# Check the hard_deletes config enum, and the legacy invalidate_hard_deletes - config flag in order to determine which behavior should be used for deleted - records in the current snapshot. The default is to ignore them. #} -{% macro get_hard_delete_behavior() %} - {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes') %} - {% set hard_deletes = config.get('hard_deletes') %} - - {% if invalidate_hard_deletes is not none and hard_deletes is not none %} - {% do exceptions.raise_compiler_error("You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot.") %} - {% endif %} - - {% if invalidate_hard_deletes or hard_deletes == 'invalidate' %} - {{ return('invalidate') }} - {% elif hard_deletes == 'new_record' %} - {{ return('new_record') }} - {% elif hard_deletes is none or hard_deletes == 'ignore' %} - {{ return('ignore') }} - {% else %} - {% do exceptions.raise_compiler_error("Invalid setting for property hard_deletes.") %} - {% endif %} -{% endmacro %} - {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} diff --git a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql index ab68fdfc..49a381e8 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/strategies.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/strategies.sql @@ -54,7 +54,7 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set primary_key = config.get('unique_key') %} {% set updated_at = config.get('updated_at') %} - {% set hard_deletes = get_hard_delete_behavior() %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} @@ -143,7 +143,7 @@ {# The model_config parameter is no longer used, but is passed in anyway for compatibility. #} {% set check_cols_config = config.get('check_cols') %} {% set primary_key = config.get('unique_key') %} - {% set hard_deletes = get_hard_delete_behavior() %} + {% set hard_deletes = adapter.get_hard_deletes_behavior(config) %} {% set invalidate_hard_deletes = hard_deletes == 'invalidate' %} {% set updated_at = config.get('updated_at') or snapshot_get_time() %} From 9ad9ce56d1f2528c94795bb8e60359b7977c94cf Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Thu, 21 Nov 2024 17:00:19 -0500 Subject: [PATCH 07/12] Incorporate code-review tweaks to whitespace management and validation. --- dbt/adapters/base/impl.py | 30 +++++++++++++++++++ .../materializations/snapshots/helpers.sql | 16 +++++----- .../materializations/snapshots/snapshot.sql | 2 +- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/dbt/adapters/base/impl.py b/dbt/adapters/base/impl.py index 31fc54dd..ae172635 100644 --- a/dbt/adapters/base/impl.py +++ b/dbt/adapters/base/impl.py @@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict): age: float # age in seconds +class SnapshotStrategy(TypedDict): + unique_key: Optional[str] + updated_at: Optional[str] + row_changed: Optional[str] + scd_id: Optional[str] + hard_deletes: Optional[str] + + class BaseAdapter(metaclass=AdapterMeta): """The BaseAdapter provides an abstract base class for adapters. @@ -805,6 +813,28 @@ def valid_snapshot_target( if missing: raise SnapshotTargetNotSnapshotTableError(missing) + @available.parse_none + def assert_valid_snapshot_target_given_strategy( + self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy + ) -> None: + # Assert everything we can with the legacy function. + self.valid_snapshot_target(relation, column_names) + + # Now do strategy-specific checks. + # TODO: Make these checks more comprehensive. + if strategy.get("hard_deletes", None) == "new_record": + columns = self.get_columns_in_relation(relation) + names = set(c.name.lower() for c in columns) + missing = [] + + for column in ("dbt_is_deleted",): + desired = column_names[column] if column_names else column + if desired not in names: + missing.append(desired) + + if missing: + raise SnapshotTargetNotSnapshotTableError(missing) + @available.parse_none def expand_target_column_types( self, from_relation: BaseRelation, to_relation: BaseRelation diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 0edff4ed..4004c65d 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -96,8 +96,8 @@ select 'insert' as dbt_change_type, source_data.* - {%- if strategy.hard_deletes == 'new_record' -%}, - 'False' as {{ columns.dbt_is_deleted }} + {%- if strategy.hard_deletes == 'new_record' -%} + ,'False' as {{ columns.dbt_is_deleted }} {%- endif %} from insertions_source_data as source_data @@ -116,8 +116,8 @@ 'update' as dbt_change_type, source_data.*, snapshotted_data.{{ columns.dbt_scd_id }} - {%- if strategy.hard_deletes == 'new_record' -%}, - snapshotted_data.{{ columns.dbt_is_deleted }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} {%- endif %} from updates_source_data as source_data @@ -139,8 +139,8 @@ {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }}, snapshotted_data.{{ columns.dbt_scd_id }} - {%- if strategy.hard_deletes == 'new_record' -%}, - snapshotted_data.{{ columns.dbt_is_deleted }} + {%- if strategy.hard_deletes == 'new_record' -%} + , snapshotted_data.{{ columns.dbt_is_deleted }} {%- endif %} from snapshotted_data left join deletes_source_data as source_data @@ -206,8 +206,8 @@ {{ strategy.updated_at }} as {{ columns.dbt_updated_at }}, {{ strategy.updated_at }} as {{ columns.dbt_valid_from }}, {{ get_dbt_valid_to_current(strategy, columns) }} - {%- if strategy.hard_deletes == 'new_record' -%}, - 'False' as {{ columns.dbt_is_deleted }} + {%- if strategy.hard_deletes == 'new_record' -%} + , 'False' as {{ columns.dbt_is_deleted }} {% endif -%} from ( {{ sql }} diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 0c54ede6..498fd8f0 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -37,7 +37,7 @@ {% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %} - {{ adapter.valid_snapshot_target(target_relation, columns) }} + {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} {# Raise an exception if the user has selected the new_record mode for hard deletes, but there is no dbt_is_deleted column in the target, From a0afc9dd285023c627e685aeac9f776d88302375 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Thu, 21 Nov 2024 17:07:53 -0500 Subject: [PATCH 08/12] Remove now-redundant check. --- .../materializations/snapshots/snapshot.sql | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql index 498fd8f0..683a0c58 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql @@ -39,22 +39,6 @@ {{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }} - {# Raise an exception if the user has selected the new_record mode for - hard deletes, but there is no dbt_is_deleted column in the target, - which would indicate it was created in a different mode. #} - {% if strategy.hard_deletes == 'new_record' %} - {% set target_cols = adapter.get_columns_in_relation(target_relation) %} - {% set ns = namespace(found_is_deleted_col = false) %} - {% for col in target_cols %} - {% if col.column == columns['dbt_is_deleted'] %} - {% set ns.found_is_deleted_col = true %} - {% endif %} - {% endfor %} - {% if not ns.found_is_deleted_col %} - {% do exceptions.raise_compiler_error('Did not find a dbt_is_deleted column in snapshot target. Changing the snapshot hard_deletes mode is not supported after a snapshot has been created.') %} - {% endif %} - {% endif %} - {% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %} {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} From 8157a5a16050ff118738863363cb0dfdfa6780c7 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 22 Nov 2024 11:34:20 -0500 Subject: [PATCH 09/12] Tweak column name. --- .../macros/materializations/snapshots/helpers.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 4004c65d..33492cc9 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -164,7 +164,7 @@ snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }}, {% endfor -%} {%- else -%} - snapshotted_data.{{ strategy.unique_key }} as dbt_unique_key, + snapshotted_data.dbt_unique_key as dbt_unique_key, {% endif -%} {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, From 2318519855bb1783816d0c12157e923036022924 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 22 Nov 2024 14:09:38 -0500 Subject: [PATCH 10/12] Move test from core to dbt-adapters, where it can be more universally useful. --- .../simple_snapshot/new_record_mode.py | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) create mode 100644 dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py new file mode 100644 index 00000000..30806fce --- /dev/null +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -0,0 +1,210 @@ +import os + +import pytest + +from dbt.tests.util import check_relations_equal, run_dbt + +seed_new_record_mode = """ +create table {database}.{schema}.seed ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP WITHOUT TIME ZONE +); + +create table {database}.{schema}.snapshot_expected ( + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id TEXT, + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_is_deleted TEXT +); + + +-- seed inserts +-- use the same email for two users to verify that duplicated check_cols values +-- are handled appropriately +insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values +(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'), +(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'), +(3, 'Rachel', 'Moreno', 'rmoreno2@msu.edu', 'Female', '31.222.249.23', '2016-04-05 02:05:30'), +(4, 'Ralph', 'Turner', 'rturner3@hp.com', 'Male', '157.83.76.114', '2016-08-08 00:06:51'), +(5, 'Laura', 'Gonzales', 'lgonzales4@howstuffworks.com', 'Female', '30.54.105.168', '2016-09-01 08:25:38'), +(6, 'Katherine', 'Lopez', 'klopez5@yahoo.co.jp', 'Female', '169.138.46.89', '2016-08-30 18:52:11'), +(7, 'Jeremy', 'Hamilton', 'jhamilton6@mozilla.org', 'Male', '231.189.13.133', '2016-07-17 02:09:46'), +(8, 'Heather', 'Rose', 'hrose7@goodreads.com', 'Female', '87.165.201.65', '2015-12-29 22:03:56'), +(9, 'Gregory', 'Kelly', 'gkelly8@trellian.com', 'Male', '154.209.99.7', '2016-03-24 21:18:16'), +(10, 'Rachel', 'Lopez', 'rlopez9@themeforest.net', 'Female', '237.165.82.71', '2016-08-20 15:44:49'), +(11, 'Donna', 'Welch', 'dwelcha@shutterfly.com', 'Female', '103.33.110.138', '2016-02-27 01:41:48'), +(12, 'Russell', 'Lawrence', 'rlawrenceb@qq.com', 'Male', '189.115.73.4', '2016-06-11 03:07:09'), +(13, 'Michelle', 'Montgomery', 'mmontgomeryc@scientificamerican.com', 'Female', '243.220.95.82', '2016-06-18 16:27:19'), +(14, 'Walter', 'Castillo', 'wcastillod@pagesperso-orange.fr', 'Male', '71.159.238.196', '2016-10-06 01:55:44'), +(15, 'Robin', 'Mills', 'rmillse@vkontakte.ru', 'Female', '172.190.5.50', '2016-10-31 11:41:21'), +(16, 'Raymond', 'Holmes', 'rholmesf@usgs.gov', 'Male', '148.153.166.95', '2016-10-03 08:16:38'), +(17, 'Gary', 'Bishop', 'gbishopg@plala.or.jp', 'Male', '161.108.182.13', '2016-08-29 19:35:20'), +(18, 'Anna', 'Riley', 'arileyh@nasa.gov', 'Female', '253.31.108.22', '2015-12-11 04:34:27'), +(19, 'Sarah', 'Knight', 'sknighti@foxnews.com', 'Female', '222.220.3.177', '2016-09-26 00:49:06'), +(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19'); + + +-- populate snapshot table +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed; +""" + +snapshot_actual_sql = """ +{% snapshot snapshot_actual %} + + {{ + config( + unique_key='id || ' ~ "'-'" ~ ' || first_name', + ) + }} + + select * from {{target.database}}.{{target.schema}}.seed + +{% endsnapshot %} +""" + +snapshots_yml = """ +snapshots: + - name: snapshot_actual + config: + strategy: timestamp + updated_at: updated_at + hard_deletes: new_record +""" + +ref_snapshot_sql = """ +select * from {{ ref('snapshot_actual') }} +""" + + +invalidate_sql = """ +-- update records 11 - 21. Change email and updated_at field +update {schema}.seed set + updated_at = updated_at + interval '1 hour', + email = case when id = 20 then 'pfoxj@creativecommons.org' else 'new_' || email end +where id >= 10 and id <= 20; + + +-- invalidate records 11 - 21 +update {schema}.snapshot_expected set + dbt_valid_to = updated_at + interval '1 hour' +where id >= 10 and id <= 20; + +""" + +update_sql = """ +-- insert v2 of the 11 - 21 records + +insert into {database}.{schema}.snapshot_expected ( + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + dbt_valid_from, + dbt_valid_to, + dbt_updated_at, + dbt_scd_id, + dbt_is_deleted +) + +select + id, + first_name, + last_name, + email, + gender, + ip_address, + updated_at, + -- fields added by snapshotting + updated_at as dbt_valid_from, + null::timestamp as dbt_valid_to, + updated_at as dbt_updated_at, + md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id, + 'False' as dbt_is_deleted +from {database}.{schema}.seed +where id >= 10 and id <= 20; +""" + +delete_sql = """ +delete from {schema}.seed where id = 1 +""" + + +class SnapshotNewRecordMode: + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot.sql": snapshot_actual_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "snapshots.yml": snapshots_yml, + "ref_snapshot.sql": ref_snapshot_sql, + } + + def test_snapshot_new_record_mode(self, project): + path = os.path.join(project.test_data_dir, "seed_new_record_mode.sql") + project.run_sql(seed_new_record_mode) + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + project.run_sql(invalidate_sql) + project.run_sql(update_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) + + project.run_sql(delete_sql) + + results = run_dbt(["snapshot"]) + assert len(results) == 1 + + # TODO: Further validate results. From 84062ae1ecd3fce6241f576758cc07113d039342 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 22 Nov 2024 14:43:58 -0500 Subject: [PATCH 11/12] Make test more flexible for downstream implementors. --- .../simple_snapshot/new_record_mode.py | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py index 30806fce..8b4048cf 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -1,10 +1,8 @@ -import os - import pytest from dbt.tests.util import check_relations_equal, run_dbt -seed_new_record_mode = """ +_seed_new_record_mode = """ create table {database}.{schema}.seed ( id INTEGER, first_name VARCHAR(50), @@ -92,7 +90,7 @@ from {database}.{schema}.seed; """ -snapshot_actual_sql = """ +_snapshot_actual_sql = """ {% snapshot snapshot_actual %} {{ @@ -106,7 +104,7 @@ {% endsnapshot %} """ -snapshots_yml = """ +_snapshots_yml = """ snapshots: - name: snapshot_actual config: @@ -115,12 +113,12 @@ hard_deletes: new_record """ -ref_snapshot_sql = """ +_ref_snapshot_sql = """ select * from {{ ref('snapshot_actual') }} """ -invalidate_sql = """ +_invalidate_sql = """ -- update records 11 - 21. Change email and updated_at field update {schema}.seed set updated_at = updated_at + interval '1 hour', @@ -135,7 +133,7 @@ """ -update_sql = """ +_update_sql = """ -- insert v2 of the 11 - 21 records insert into {database}.{schema}.snapshot_expected ( @@ -171,7 +169,7 @@ where id >= 10 and id <= 20; """ -delete_sql = """ +_delete_sql = """ delete from {schema}.seed where id = 1 """ @@ -179,17 +177,32 @@ class SnapshotNewRecordMode: @pytest.fixture(scope="class") def snapshots(self): - return {"snapshot.sql": snapshot_actual_sql} + return {"snapshot.sql": _snapshot_actual_sql} @pytest.fixture(scope="class") def models(self): return { - "snapshots.yml": snapshots_yml, - "ref_snapshot.sql": ref_snapshot_sql, + "snapshots.yml": _snapshots_yml, + "ref_snapshot.sql": _ref_snapshot_sql, } - def test_snapshot_new_record_mode(self, project): - path = os.path.join(project.test_data_dir, "seed_new_record_mode.sql") + @pytest.fixture(scope="class") + def seed_new_record_mode(self): + return _seed_new_record_mode + + @pytest.fixture(scope="class") + def invalidate_sql(self): + return _invalidate_sql + + @pytest.fixture(scope="class") + def update_sql(self): + return _update_sql + + @pytest.fixture(scope="class") + def delete_sql(self): + return _delete_sql + + def test_snapshot_new_record_mode(self, project, seed_new_record_mode, invalidate_sql, update_sql): project.run_sql(seed_new_record_mode) results = run_dbt(["snapshot"]) assert len(results) == 1 @@ -202,7 +215,7 @@ def test_snapshot_new_record_mode(self, project): check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"]) - project.run_sql(delete_sql) + project.run_sql(_delete_sql) results = run_dbt(["snapshot"]) assert len(results) == 1 From 5ea46b51ed45ae08159d0d666f79e3bfdd840827 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Fri, 22 Nov 2024 14:52:36 -0500 Subject: [PATCH 12/12] Fix whitespace. --- .../simple_snapshot/new_record_mode.py | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py index 8b4048cf..c50f0ff9 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -4,29 +4,29 @@ _seed_new_record_mode = """ create table {database}.{schema}.seed ( - id INTEGER, - first_name VARCHAR(50), - last_name VARCHAR(50), - email VARCHAR(50), - gender VARCHAR(50), - ip_address VARCHAR(20), - updated_at TIMESTAMP WITHOUT TIME ZONE + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + updated_at TIMESTAMP WITHOUT TIME ZONE ); create table {database}.{schema}.snapshot_expected ( - id INTEGER, - first_name VARCHAR(50), - last_name VARCHAR(50), - email VARCHAR(50), - gender VARCHAR(50), - ip_address VARCHAR(20), - - -- snapshotting fields - updated_at TIMESTAMP WITHOUT TIME ZONE, - dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, - dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, - dbt_scd_id TEXT, - dbt_updated_at TIMESTAMP WITHOUT TIME ZONE, + id INTEGER, + first_name VARCHAR(50), + last_name VARCHAR(50), + email VARCHAR(50), + gender VARCHAR(50), + ip_address VARCHAR(20), + + -- snapshotting fields + updated_at TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_from TIMESTAMP WITHOUT TIME ZONE, + dbt_valid_to TIMESTAMP WITHOUT TIME ZONE, + dbt_scd_id TEXT, + dbt_updated_at TIMESTAMP WITHOUT TIME ZONE, dbt_is_deleted TEXT ); @@ -202,7 +202,9 @@ def update_sql(self): def delete_sql(self): return _delete_sql - def test_snapshot_new_record_mode(self, project, seed_new_record_mode, invalidate_sql, update_sql): + def test_snapshot_new_record_mode( + self, project, seed_new_record_mode, invalidate_sql, update_sql + ): project.run_sql(seed_new_record_mode) results = run_dbt(["snapshot"]) assert len(results) == 1