Skip to content

Commit

Permalink
MM-60600: move delete logic to nightly only (#1654)
Browse files Browse the repository at this point in the history
* MM-60600: move delete logic to nightly only

* Fix tests

* Fix linter issues
  • Loading branch information
ifoukarakis authored Nov 5, 2024
1 parent 2691e11 commit 0beaea4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
25 changes: 21 additions & 4 deletions tests/utils/db/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def test_should_merge_table_with_same_structure(mocker, base_table, delta_table_

# THEN: expect a merge statement according to the spec of the tables
assert (
mock_exec.call_args[0][0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table '
mock_exec.call_args_list[1].args[0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table '
'ON base_schema.base_table.id = delta_schema.delta_table.id '
'AND base_schema.base_table.received_at >= :first_duplicate_date '
'WHEN MATCHED THEN UPDATE SET after = base_table.after '
Expand All @@ -291,7 +291,15 @@ def test_should_merge_table_with_same_structure(mocker, base_table, delta_table_
)

# THEN: expect the merge statement to have the correct parameters
assert mock_exec.call_args[0][0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}
assert mock_exec.call_args_list[1].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}
# THEN: expect a delete statement
assert (
mock_exec.call_args_list[2].args[0].text == 'DELETE FROM delta_schema.delta_table '
'WHERE id IN (SELECT id FROM base_schema.base_table WHERE received_at >= :first_duplicate_date)'
)

# THEN: expect the delete statement to have the correct parameters
assert mock_exec.call_args_list[2].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}


def test_should_not_merge_table_if_delta_empty(mocker, base_table, delta_table_1):
Expand Down Expand Up @@ -354,7 +362,7 @@ def test_should_add_new_columns_and_merge_table(mocker, base_table, delta_table_

# THEN: expect a merge statement according to the spec of the tables
assert (
mock_exec.call_args[0][0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table '
mock_exec.call_args_list[2].args[0].text == 'MERGE INTO base_schema.base_table USING delta_schema.delta_table '
'ON base_schema.base_table.id = delta_schema.delta_table.id '
'AND base_schema.base_table.received_at >= :first_duplicate_date '
'WHEN MATCHED THEN UPDATE SET after = base_table.after '
Expand All @@ -364,7 +372,16 @@ def test_should_add_new_columns_and_merge_table(mocker, base_table, delta_table_
)

# THEN: expect the merge statement to have the correct parameters
assert mock_exec.call_args[0][0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}
assert mock_exec.call_args_list[2].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}

# THEN: expect a delete statement
assert (
mock_exec.call_args_list[3].args[0].text == 'DELETE FROM delta_schema.delta_table '
'WHERE id IN (SELECT id FROM base_schema.base_table WHERE received_at >= :first_duplicate_date)'
)

# THEN: expect the delete statement to have the correct parameters
assert mock_exec.call_args_list[3].args[0].compile().params == {"first_duplicate_date": "2022-12-21T23:55:59"}


def test_load_query(sqlalchemy_memory_engine, test_data):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
'cluster_by': ['to_date(received_at)'],
'on_schema_change': 'append_new_columns',
'snowflake_warehouse': 'transform_l',
'post_hook': 'delete from {{this}} where id in (select id from {{ source(\'rudder_support\', \'base_events\') }} where received_at > dateadd(day, -5, current_date))'
})
}}

Expand Down
8 changes: 8 additions & 0 deletions utils/db/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ def merge_event_delta_table_into(
stmt = text(merge.__repr__()).bindparams(first_duplicate_date=first_duplicate_date)
conn.execute(stmt)

# Delete rows from delta table that were merged
conn.execute(
text(
f'DELETE FROM {delta_schema}.{delta_table} '
f'WHERE id IN (SELECT id FROM {base_schema}.{base_table} WHERE received_at >= :first_duplicate_date)'
).bindparams(first_duplicate_date=first_duplicate_date)
)


def load_query(conn: Connection, query: str) -> pd.DataFrame:
"""
Expand Down

0 comments on commit 0beaea4

Please sign in to comment.