From 7bdc13de9bfeaf4ef49c9c0635456cdc8a1db16b Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Thu, 19 Dec 2024 02:46:47 +0000
Subject: [PATCH] Populate missing intervals on plans too

---
 sqlmesh/core/plan/builder.py   |  44 ++++++++---
 tests/core/test_integration.py | 135 +++++++++++++++++++++++++++++++++
 2 files changed, 168 insertions(+), 11 deletions(-)

diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py
index 479e37996..ac14ab5c4 100644
--- a/sqlmesh/core/plan/builder.py
+++ b/sqlmesh/core/plan/builder.py
@@ -24,7 +24,7 @@
     SnapshotChangeCategory,
 )
 from sqlmesh.core.snapshot.categorizer import categorize_change
-from sqlmesh.core.snapshot.definition import Interval, SnapshotId
+from sqlmesh.core.snapshot.definition import Interval, SnapshotId, missing_intervals
 from sqlmesh.utils import columns_to_types_all_known, random_id
 from sqlmesh.utils.dag import DAG
 from sqlmesh.utils.date import TimeLike, now, to_datetime, yesterday_ds, to_timestamp
@@ -239,7 +239,7 @@ def build(self) -> Plan:
         restatements = self._build_restatements(
             dag, earliest_interval_start(self._context_diff.snapshots.values())
         )
-        models_to_backfill = self._build_models_to_backfill(dag, restatements)
+        models_to_backfill = self._build_models_to_backfill(dag, restatements, deployability_index)

         interval_end_per_model = self._interval_end_per_model
         if interval_end_per_model and self.override_end:
@@ -412,18 +412,40 @@ def _build_directly_and_indirectly_modified(
         )

     def _build_models_to_backfill(
-        self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId]
+        self,
+        dag: DAG[SnapshotId],
+        restatements: t.Collection[SnapshotId],
+        deployability_index: DeployabilityIndex,
     ) -> t.Optional[t.Set[str]]:
-        backfill_models = (
-            self._backfill_models
-            if self._backfill_models is not None
-            else [r.name for r in restatements]
-            # Only backfill models explicitly marked for restatement.
-            if self._restate_models
-            else None
-        )
+        backfill_models = None
+
+        if self._backfill_models:
+            # If this plan has models set to backfill explicitly, do those
+            backfill_models = self._backfill_models
+        elif self._restate_models:
+            # Restatements are mutually exclusive with backfills, so if they're set then do those
+            backfill_models = [r.name for r in restatements]
+        elif self._is_dev:
+            # Otherwise, check if any models have missing historical intervals and backfill those
+            # this can happen if this environment contains new models downstream from snapshots in prod
+            # and a restatement plan is issued against prod which clears intervals from the models in this environment
+            backfill_models = [
+                snapshot.name
+                for snapshot, missing in missing_intervals(
+                    snapshots=self._context_diff.snapshots.values(),
+                    start=self._start,
+                    end=self._end,
+                    execution_time=self._execution_time,
+                    deployability_index=deployability_index,
+                    interval_end_per_model=self._interval_end_per_model,
+                    end_bounded=self._end_bounded,
+                ).items()
+                if snapshot.is_model and missing
+            ] or None
+
         if backfill_models is None:
             return None
+
         return {
             self._context_diff.snapshots[s_id].name
             for s_id in dag.subdag(
diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py
index e566f2524..f65de1839 100644
--- a/tests/core/test_integration.py
+++ b/tests/core/test_integration.py
@@ -2250,6 +2250,141 @@ def _dates_in_table(table_name: str) -> t.List[str]:
     assert _dates_in_table("test__dev.b") == ["2024-01-02 00:30:00"]


+def test_prod_restatement_plan_causes_dev_intervals_to_be_processed_in_next_dev_plan(
+    tmp_path: Path,
+):
+    """
+    Scenario:
+        I have a model A[hourly] in prod
+        I create dev and add a model B[daily]
+        In prod, I restate *one hour* of A
+        In dev, I run a normal plan instead of a cadence run
+
+    Outcome:
+        The whole day for B should be restated as part of a normal plan
+    """
+
+    model_a = """
+    MODEL (
+        name test.a,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column "ts"
+        ),
+        start '2024-01-01 00:00:00',
+        cron '@hourly'
+    );
+
+    select account_id, ts from test.external_table;
+    """
+
+    model_b = """
+    MODEL (
+        name test.b,
+        kind INCREMENTAL_BY_TIME_RANGE (
+            time_column ts
+        ),
+        cron '@daily'
+    );
+
+    select account_id, ts from test.a where ts between @start_dt and @end_dt;
+    """
+
+    models_dir = tmp_path / "models"
+    models_dir.mkdir()
+
+    with open(models_dir / "a.sql", "w") as f:
+        f.write(model_a)
+
+    config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
+    ctx = Context(paths=[tmp_path], config=config)
+
+    engine_adapter = ctx.engine_adapter
+    engine_adapter.create_schema("test")
+
+    # source data
+    df = pd.DataFrame(
+        {
+            "account_id": [1001, 1002, 1003, 1004],
+            "ts": [
+                "2024-01-01 00:30:00",
+                "2024-01-01 01:30:00",
+                "2024-01-01 02:30:00",
+                "2024-01-02 00:30:00",
+            ],
+        }
+    )
+    columns_to_types = {
+        "account_id": exp.DataType.build("int"),
+        "ts": exp.DataType.build("timestamp"),
+    }
+    external_table = exp.table_(table="external_table", db="test", quoted=True)
+    engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
+    engine_adapter.insert_append(
+        table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
+    )
+
+    # plan + apply A[hourly] in prod
+    ctx.plan(auto_apply=True, no_prompts=True)
+
+    # add B[daily] in dev
+    with open(models_dir / "b.sql", "w") as f:
+        f.write(model_b)
+
+    # plan + apply dev
+    ctx.load()
+    ctx.plan(environment="dev", auto_apply=True, no_prompts=True)
+
+    def _dates_in_table(table_name: str) -> t.List[str]:
+        return [
+            str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts")
+        ]
+
+    # verify initial state
+    for tbl in ["test.a", "test__dev.b"]:
+        assert _dates_in_table(tbl) == [
+            "2024-01-01 00:30:00",
+            "2024-01-01 01:30:00",
+            "2024-01-01 02:30:00",
+            "2024-01-02 00:30:00",
+        ]
+
+    # restate A in prod
+    engine_adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'")
+    ctx.plan(
+        restate_models=["test.a"],
+        start="2024-01-01 01:00:00",
+        end="2024-01-01 02:00:00",
+        auto_apply=True,
+        no_prompts=True,
+    )
+
+    # verify result
+    assert _dates_in_table("test.a") == [
+        "2024-01-01 00:30:00",
+        "2024-01-01 02:30:00",
+        "2024-01-02 00:30:00",
+    ]
+
+    # dev shouldn't have been affected yet
+    assert _dates_in_table("test__dev.b") == [
+        "2024-01-01 00:30:00",
+        "2024-01-01 01:30:00",
+        "2024-01-01 02:30:00",
+        "2024-01-02 00:30:00",
+    ]
+
+    # plan dev to trigger the processing of the prod restatement
+    ctx.plan(environment="dev", auto_apply=True, no_prompts=True)
+
+    # B should now have had 2024-01-01 refreshed from A
+    for tbl in ["test.a", "test__dev.b"]:
+        assert _dates_in_table(tbl) == [
+            "2024-01-01 00:30:00",
+            "2024-01-01 02:30:00",
+            "2024-01-02 00:30:00",
+        ]
+
+
 @time_machine.travel("2023-01-08 15:00:00 UTC")
 def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
     context, plan = init_and_plan_context("examples/sushi")