Skip to content

Commit

Permalink
Populate missing intervals on plans too
Browse files Browse the repository at this point in the history
  • Loading branch information
erindru committed Dec 19, 2024
1 parent 263e6be commit 7bdc13d
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 11 deletions.
44 changes: 33 additions & 11 deletions sqlmesh/core/plan/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
SnapshotChangeCategory,
)
from sqlmesh.core.snapshot.categorizer import categorize_change
from sqlmesh.core.snapshot.definition import Interval, SnapshotId
from sqlmesh.core.snapshot.definition import Interval, SnapshotId, missing_intervals
from sqlmesh.utils import columns_to_types_all_known, random_id
from sqlmesh.utils.dag import DAG
from sqlmesh.utils.date import TimeLike, now, to_datetime, yesterday_ds, to_timestamp
Expand Down Expand Up @@ -239,7 +239,7 @@ def build(self) -> Plan:
restatements = self._build_restatements(
dag, earliest_interval_start(self._context_diff.snapshots.values())
)
models_to_backfill = self._build_models_to_backfill(dag, restatements)
models_to_backfill = self._build_models_to_backfill(dag, restatements, deployability_index)

interval_end_per_model = self._interval_end_per_model
if interval_end_per_model and self.override_end:
Expand Down Expand Up @@ -412,18 +412,40 @@ def _build_directly_and_indirectly_modified(
)

def _build_models_to_backfill(
self, dag: DAG[SnapshotId], restatements: t.Collection[SnapshotId]
self,
dag: DAG[SnapshotId],
restatements: t.Collection[SnapshotId],
deployability_index: DeployabilityIndex,
) -> t.Optional[t.Set[str]]:
backfill_models = (
self._backfill_models
if self._backfill_models is not None
else [r.name for r in restatements]
# Only backfill models explicitly marked for restatement.
if self._restate_models
else None
)
backfill_models = None

if self._backfill_models:
# If this plan has models set to backfill explicitly, do those
backfill_models = self._backfill_models
elif self._restate_models:
# Restatements are mutually exclusive with backfills, so if theyre set then do those
backfill_models = [r.name for r in restatements]
elif self._is_dev:
# Otherwise, check if any models have missing historical intervals and backfill those
# this can happen if this environment contains new models downstream from snapshots in prod
# and a restatement plan is issued against prod which clears intervals from the models in this environment
backfill_models = [
snapshot.name
for snapshot, missing in missing_intervals(
snapshots=self._context_diff.snapshots.values(),
start=self._start,
end=self._end,
execution_time=self._execution_time,
deployability_index=deployability_index,
interval_end_per_model=self._interval_end_per_model,
end_bounded=self._end_bounded,
).items()
if snapshot.is_model and missing
] or None

if backfill_models is None:
return None

return {
self._context_diff.snapshots[s_id].name
for s_id in dag.subdag(
Expand Down
135 changes: 135 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,141 @@ def _dates_in_table(table_name: str) -> t.List[str]:
assert _dates_in_table("test__dev.b") == ["2024-01-02 00:30:00"]


def test_prod_restatement_plan_causes_dev_intervals_to_be_processed_in_next_dev_plan(
tmp_path: Path,
):
"""
Scenario:
I have a model A[hourly] in prod
I create dev and add a model B[daily]
I prod, I restate *one hour* of A
In dev, I run a normal plan instead of a cadence run
Outcome:
The whole day for B should be restated as part of a normal plan
"""

model_a = """
MODEL (
name test.a,
kind INCREMENTAL_BY_TIME_RANGE (
time_column "ts"
),
start '2024-01-01 00:00:00',
cron '@hourly'
);
select account_id, ts from test.external_table;
"""

model_b = """
MODEL (
name test.b,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ts
),
cron '@daily'
);
select account_id, ts from test.a where ts between @start_dt and @end_dt;
"""

models_dir = tmp_path / "models"
models_dir.mkdir()

with open(models_dir / "a.sql", "w") as f:
f.write(model_a)

config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
ctx = Context(paths=[tmp_path], config=config)

engine_adapter = ctx.engine_adapter
engine_adapter.create_schema("test")

# source data
df = pd.DataFrame(
{
"account_id": [1001, 1002, 1003, 1004],
"ts": [
"2024-01-01 00:30:00",
"2024-01-01 01:30:00",
"2024-01-01 02:30:00",
"2024-01-02 00:30:00",
],
}
)
columns_to_types = {
"account_id": exp.DataType.build("int"),
"ts": exp.DataType.build("timestamp"),
}
external_table = exp.table_(table="external_table", db="test", quoted=True)
engine_adapter.create_table(table_name=external_table, columns_to_types=columns_to_types)
engine_adapter.insert_append(
table_name=external_table, query_or_df=df, columns_to_types=columns_to_types
)

# plan + apply A[hourly] in prod
ctx.plan(auto_apply=True, no_prompts=True)

# add B[daily] in dev
with open(models_dir / "b.sql", "w") as f:
f.write(model_b)

# plan + apply dev
ctx.load()
ctx.plan(environment="dev", auto_apply=True, no_prompts=True)

def _dates_in_table(table_name: str) -> t.List[str]:
return [
str(r[0]) for r in engine_adapter.fetchall(f"select ts from {table_name} order by ts")
]

# verify initial state
for tbl in ["test.a", "test__dev.b"]:
assert _dates_in_table(tbl) == [
"2024-01-01 00:30:00",
"2024-01-01 01:30:00",
"2024-01-01 02:30:00",
"2024-01-02 00:30:00",
]

# restate A in prod
engine_adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'")
ctx.plan(
restate_models=["test.a"],
start="2024-01-01 01:00:00",
end="2024-01-01 02:00:00",
auto_apply=True,
no_prompts=True,
)

# verify result
assert _dates_in_table("test.a") == [
"2024-01-01 00:30:00",
"2024-01-01 02:30:00",
"2024-01-02 00:30:00",
]

# dev shouldnt have been affected yet
assert _dates_in_table("test__dev.b") == [
"2024-01-01 00:30:00",
"2024-01-01 01:30:00",
"2024-01-01 02:30:00",
"2024-01-02 00:30:00",
]

# plan dev to trigger the processing of the prod restatement
ctx.plan(environment="dev", auto_apply=True, no_prompts=True)

# B should now have had 2024-01-01 refreshed from A
for tbl in ["test.a", "test__dev.b"]:
assert _dates_in_table(tbl) == [
"2024-01-01 00:30:00",
"2024-01-01 02:30:00",
"2024-01-02 00:30:00",
]


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down

0 comments on commit 7bdc13d

Please sign in to comment.