diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index a4858bd64..479e37996 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -369,7 +369,7 @@ def is_restateable_snapshot(snapshot: Snapshot) -> bool: # we need to ensure the whole affected day in Model B is restated floored_snapshot_start = snapshot.node.interval_unit.cron_floor(snapshot_start) floored_snapshot_end = snapshot.node.interval_unit.cron_floor(snapshot_end) - if floored_snapshot_end <= floored_snapshot_start: + if to_timestamp(floored_snapshot_end) < snapshot_end: snapshot_start = to_timestamp(floored_snapshot_start) snapshot_end = to_timestamp( snapshot.node.interval_unit.cron_next(floored_snapshot_end) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index b58d988dc..e566f2524 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -1745,6 +1745,39 @@ def _dates_in_table(table_name: str) -> t.List[str]: "2024-01-02 00:30:00", ], f"Table {tbl} wasnt cleared" + # Put some data + df = pd.DataFrame( + { + "account_id": [1001, 1002, 1003, 1004], + "ts": [ + "2024-01-01 01:30:00", + "2024-01-01 23:30:00", + "2024-01-02 03:30:00", + "2024-01-03 12:30:00", + ], + } + ) + engine_adapter.replace_query( + table_name=external_table, query_or_df=df, columns_to_types=columns_to_types + ) + + # Restate A across a day boundary with the expectation that two day intervals in B are affected + ctx.plan( + restate_models=["test.a"], + start="2024-01-01 02:00:00", + end="2024-01-02 04:00:00", + auto_apply=True, + no_prompts=True, + ) + + for tbl in ["test.a", "test.b"]: + assert _dates_in_table(tbl) == [ + "2024-01-01 00:30:00", # present already + # "2024-01-01 02:30:00", #removed in last restatement + "2024-01-01 23:30:00", # added in last restatement + "2024-01-02 03:30:00", # added in last restatement + ], f"Table {tbl} wasnt cleared" + def test_restatement_plan_clears_correct_intervals_across_environments(tmp_path: Path): model1 = """