Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hard_deletes config and new_record Option for Snapshots #317

Merged
merged 15 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241104-120653.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add new hard_deletes="new_record" mode for snapshots.
time: 2024-11-04T12:06:53.225939-05:00
custom:
Author: peterallenwebb
Issue: "317"
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import pytest

from dbt.tests.util import check_relations_equal, run_dbt

_seed_new_record_mode = """
create table {database}.{schema}.seed (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
updated_at TIMESTAMP WITHOUT TIME ZONE
);
create table {database}.{schema}.snapshot_expected (
id INTEGER,
first_name VARCHAR(50),
last_name VARCHAR(50),
email VARCHAR(50),
gender VARCHAR(50),
ip_address VARCHAR(20),
-- snapshotting fields
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id TEXT,
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_is_deleted TEXT
);
-- seed inserts
-- use the same email for two users to verify that duplicated check_cols values
-- are handled appropriately
insert into {database}.{schema}.seed (id, first_name, last_name, email, gender, ip_address, updated_at) values
(1, 'Judith', 'Kennedy', '(not provided)', 'Female', '54.60.24.128', '2015-12-24 12:19:28'),
(2, 'Arthur', 'Kelly', '(not provided)', 'Male', '62.56.24.215', '2015-10-28 16:22:15'),
(3, 'Rachel', 'Moreno', '[email protected]', 'Female', '31.222.249.23', '2016-04-05 02:05:30'),
(4, 'Ralph', 'Turner', '[email protected]', 'Male', '157.83.76.114', '2016-08-08 00:06:51'),
(5, 'Laura', 'Gonzales', '[email protected]', 'Female', '30.54.105.168', '2016-09-01 08:25:38'),
(6, 'Katherine', 'Lopez', '[email protected]', 'Female', '169.138.46.89', '2016-08-30 18:52:11'),
(7, 'Jeremy', 'Hamilton', '[email protected]', 'Male', '231.189.13.133', '2016-07-17 02:09:46'),
(8, 'Heather', 'Rose', '[email protected]', 'Female', '87.165.201.65', '2015-12-29 22:03:56'),
(9, 'Gregory', 'Kelly', '[email protected]', 'Male', '154.209.99.7', '2016-03-24 21:18:16'),
(10, 'Rachel', 'Lopez', '[email protected]', 'Female', '237.165.82.71', '2016-08-20 15:44:49'),
(11, 'Donna', 'Welch', '[email protected]', 'Female', '103.33.110.138', '2016-02-27 01:41:48'),
(12, 'Russell', 'Lawrence', '[email protected]', 'Male', '189.115.73.4', '2016-06-11 03:07:09'),
(13, 'Michelle', 'Montgomery', '[email protected]', 'Female', '243.220.95.82', '2016-06-18 16:27:19'),
(14, 'Walter', 'Castillo', '[email protected]', 'Male', '71.159.238.196', '2016-10-06 01:55:44'),
(15, 'Robin', 'Mills', '[email protected]', 'Female', '172.190.5.50', '2016-10-31 11:41:21'),
(16, 'Raymond', 'Holmes', '[email protected]', 'Male', '148.153.166.95', '2016-10-03 08:16:38'),
(17, 'Gary', 'Bishop', '[email protected]', 'Male', '161.108.182.13', '2016-08-29 19:35:20'),
(18, 'Anna', 'Riley', '[email protected]', 'Female', '253.31.108.22', '2015-12-11 04:34:27'),
(19, 'Sarah', 'Knight', '[email protected]', 'Female', '222.220.3.177', '2016-09-26 00:49:06'),
(20, 'Phyllis', 'Fox', null, 'Female', '163.191.232.95', '2016-08-21 10:35:19');
-- populate snapshot table
insert into {database}.{schema}.snapshot_expected (
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
dbt_valid_from,
dbt_valid_to,
dbt_updated_at,
dbt_scd_id,
dbt_is_deleted
)
select
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
-- fields added by snapshotting
updated_at as dbt_valid_from,
null::timestamp as dbt_valid_to,
updated_at as dbt_updated_at,
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed;
"""

_snapshot_actual_sql = """
{% snapshot snapshot_actual %}
{{
config(
unique_key='id || ' ~ "'-'" ~ ' || first_name',
)
}}
select * from {{target.database}}.{{target.schema}}.seed
{% endsnapshot %}
"""

_snapshots_yml = """
snapshots:
- name: snapshot_actual
config:
strategy: timestamp
updated_at: updated_at
hard_deletes: new_record
"""

_ref_snapshot_sql = """
select * from {{ ref('snapshot_actual') }}
"""


_invalidate_sql = """
-- update records 11 - 21. Change email and updated_at field
update {schema}.seed set
updated_at = updated_at + interval '1 hour',
email = case when id = 20 then '[email protected]' else 'new_' || email end
where id >= 10 and id <= 20;
-- invalidate records 11 - 21
update {schema}.snapshot_expected set
dbt_valid_to = updated_at + interval '1 hour'
where id >= 10 and id <= 20;
"""

_update_sql = """
-- insert v2 of the 11 - 21 records
insert into {database}.{schema}.snapshot_expected (
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
dbt_valid_from,
dbt_valid_to,
dbt_updated_at,
dbt_scd_id,
dbt_is_deleted
)
select
id,
first_name,
last_name,
email,
gender,
ip_address,
updated_at,
-- fields added by snapshotting
updated_at as dbt_valid_from,
null::timestamp as dbt_valid_to,
updated_at as dbt_updated_at,
md5(id || '-' || first_name || '|' || updated_at::text) as dbt_scd_id,
'False' as dbt_is_deleted
from {database}.{schema}.seed
where id >= 10 and id <= 20;
"""

_delete_sql = """
delete from {schema}.seed where id = 1
"""


class SnapshotNewRecordMode:
@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot.sql": _snapshot_actual_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"snapshots.yml": _snapshots_yml,
"ref_snapshot.sql": _ref_snapshot_sql,
}

@pytest.fixture(scope="class")
def seed_new_record_mode(self):
return _seed_new_record_mode

@pytest.fixture(scope="class")
def invalidate_sql(self):
return _invalidate_sql

@pytest.fixture(scope="class")
def update_sql(self):
return _update_sql

@pytest.fixture(scope="class")
def delete_sql(self):
return _delete_sql

def test_snapshot_new_record_mode(
self, project, seed_new_record_mode, invalidate_sql, update_sql
):
project.run_sql(seed_new_record_mode)
results = run_dbt(["snapshot"])
assert len(results) == 1

project.run_sql(invalidate_sql)
project.run_sql(update_sql)

results = run_dbt(["snapshot"])
assert len(results) == 1

check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])

project.run_sql(_delete_sql)

results = run_dbt(["snapshot"])
assert len(results) == 1

# TODO: Further validate results.
57 changes: 55 additions & 2 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ class FreshnessResponse(TypedDict):
age: float # age in seconds


class SnapshotStrategy(TypedDict):
unique_key: Optional[str]
updated_at: Optional[str]
row_changed: Optional[str]
scd_id: Optional[str]
hard_deletes: Optional[str]


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.
Expand Down Expand Up @@ -795,8 +803,8 @@ def valid_snapshot_target(
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []
# Note: we're not checking dbt_updated_at here because it's not
# always present.
# Note: we're not checking dbt_updated_at or dbt_is_deleted here because they
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
# aren't always present.
for column in ("dbt_scd_id", "dbt_valid_from", "dbt_valid_to"):
desired = column_names[column] if column_names else column
if desired not in names:
Expand All @@ -805,6 +813,28 @@ def valid_snapshot_target(
if missing:
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def assert_valid_snapshot_target_given_strategy(
self, relation: BaseRelation, column_names: Dict[str, str], strategy: SnapshotStrategy
) -> None:
# Assert everything we can with the legacy function.
self.valid_snapshot_target(relation, column_names)

# Now do strategy-specific checks.
# TODO: Make these checks more comprehensive.
if strategy.get("hard_deletes", None) == "new_record":
columns = self.get_columns_in_relation(relation)
names = set(c.name.lower() for c in columns)
missing = []

for column in ("dbt_is_deleted",):
desired = column_names[column] if column_names else column
if desired not in names:
missing.append(desired)

if missing:
raise SnapshotTargetNotSnapshotTableError(missing)

@available.parse_none
def expand_target_column_types(
self, from_relation: BaseRelation, to_relation: BaseRelation
Expand Down Expand Up @@ -1795,6 +1825,29 @@ def _get_adapter_specific_run_info(cls, config) -> Dict[str, Any]:
"""
return {}

@available.parse_none
@classmethod
def get_hard_deletes_behavior(cls, config):
"""Check the hard_deletes config enum, and the legacy invalidate_hard_deletes
config flag in order to determine which behavior should be used for deleted
records in a snapshot. The default is to ignore them."""
invalidate_hard_deletes = config.get("invalidate_hard_deletes", None)
hard_deletes = config.get("hard_deletes", None)

if invalidate_hard_deletes is not None and hard_deletes is not None:
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
raise DbtValidationError(
"You cannot set both the invalidate_hard_deletes and hard_deletes config properties on the same snapshot."
)

if invalidate_hard_deletes or hard_deletes == "invalidate":
return "invalidate"
elif hard_deletes == "new_record":
return "new_record"
elif hard_deletes is None or hard_deletes == "ignore":
return "ignore"

raise DbtValidationError("Invalid setting for property hard_deletes.")


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
Loading
Loading