diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index f0e3109ee..4035ad7d3 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1278,7 +1278,7 @@ def plan_builder( backfill_models = { *context_diff.modified_snapshots, *[s.name for s in context_diff.added], - } + } or None # If no end date is specified, use the max interval end from prod # to prevent unintended evaluation of the entire DAG. diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index 639b7d757..fc63543ad 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2251,6 +2251,141 @@ def _dates_in_table(table_name: str) -> t.List[str]: assert _dates_in_table("test__dev.b") == ["2024-01-02 00:30:00"] +def test_prod_restatement_plan_causes_dev_intervals_to_be_processed_in_next_dev_plan( + tmp_path: Path, +): + """ + Scenario: + I have a model A[hourly] in prod + I create dev and add a model B[daily] + I prod, I restate *one hour* of A + In dev, I run a normal plan instead of a cadence run + + Outcome: + The whole day for B should be restated as part of a normal plan + """ + + model_a = """ + MODEL ( + name test.a, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column "ts" + ), + start '2024-01-01 00:00:00', + cron '@hourly' + ); + + select account_id, ts from test.external_table; + """ + + model_b = """ + MODEL ( + name test.b, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ts + ), + cron '@daily' + ); + + select account_id, ts from test.a where ts between @start_dt and @end_dt; + """ + + models_dir = tmp_path / "models" + models_dir.mkdir() + + with open(models_dir / "a.sql", "w") as f: + f.write(model_a) + + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + ctx = Context(paths=[tmp_path], config=config) + + engine_adapter = ctx.engine_adapter + engine_adapter.create_schema("test") + + # source data + df = pd.DataFrame( + { + "account_id": [1001, 1002, 1003, 1004], + "ts": [ + "2024-01-01 00:30:00", + "2024-01-01 01:30:00", + "2024-01-01 02:30:00", + "2024-01-02 00:30:00", + ], + } + ) + columns_to_types = { + "account_id": exp.DataType.build("int"), + "ts": exp.DataType.build("timestamp"), + } + external_table = exp.table_(table="external_table", db="test", quoted=True) + engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types) + engine_adapter.insert_append( + table_name=external_table, query_or_df=df, columns_to_types=columns_to_types + ) + + # plan + apply A[hourly] in prod + ctx.plan(auto_apply=True, no_prompts=True) + + # add B[daily] in dev + with open(models_dir / "b.sql", "w") as f: + f.write(model_b) + + # plan + apply dev + ctx.load() + ctx.plan(environment="dev", auto_apply=True, no_prompts=True) + + def _dates_in_table(table_name: str) -> t.List[str]: + return [ + str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts") + ] + + # verify initial state + for tbl in ["test.a", "test__dev.b"]: + assert _dates_in_table(tbl) == [ + "2024-01-01 00:30:00", + "2024-01-01 01:30:00", + "2024-01-01 02:30:00", + "2024-01-02 00:30:00", + ] + + # restate A in prod + engine_adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'") + ctx.plan( + restate_models=["test.a"], + start="2024-01-01 01:00:00", + end="2024-01-01 02:00:00", + auto_apply=True, + no_prompts=True, + ) + + # verify result + assert _dates_in_table("test.a") == [ + "2024-01-01 00:30:00", + "2024-01-01 02:30:00", + "2024-01-02 00:30:00", + ] + + # dev shouldnt have been affected yet + assert _dates_in_table("test__dev.b") == [ + "2024-01-01 00:30:00", + "2024-01-01 01:30:00", + "2024-01-01 02:30:00", + "2024-01-02 00:30:00", + ] + + # plan dev which should trigger the missing intervals to get repopulated + ctx.plan(environment="dev", auto_apply=True, no_prompts=True) + + # dev should have the restated data + for tbl in ["test.a", "test__dev.b"]: + assert _dates_in_table(tbl) == [ + "2024-01-01 00:30:00", + "2024-01-01 02:30:00", + "2024-01-02 00:30:00", + ] + + @time_machine.travel("2023-01-08 15:00:00 UTC") def test_plan_against_expired_environment(init_and_plan_context: t.Callable): context, plan = init_and_plan_context("examples/sushi")