Skip to content

Commit

Permalink
Fix bug where backfill_models was set to empty set instead of None in…
Browse files Browse the repository at this point in the history
… dev environments, causing missing intervals to not be filled
  • Loading branch information
erindru committed Dec 19, 2024
1 parent 75c6058 commit c1e0ccf
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 1 deletion.
2 changes: 1 addition & 1 deletion sqlmesh/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ def plan_builder(
backfill_models = {
*context_diff.modified_snapshots,
*[s.name for s in context_diff.added],
}
} or None

# If no end date is specified, use the max interval end from prod
# to prevent unintended evaluation of the entire DAG.
Expand Down
135 changes: 135 additions & 0 deletions tests/core/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,141 @@ def _dates_in_table(table_name: str) -> t.List[str]:
assert _dates_in_table("test__dev.b") == ["2024-01-02 00:30:00"]


def test_prod_restatement_plan_causes_dev_intervals_to_be_processed_in_next_dev_plan(
    tmp_path: Path,
):
    """
    Check that a prod restatement is picked up by the next regular dev plan.

    Scenario:
        Model A[hourly] exists in prod.
        A dev environment is created and model B[daily], which selects from A, is added.
        In prod, *one hour* of A is restated.
        In dev, a normal plan is run instead of a cadence run.
    Outcome:
        The whole day for B should be restated as part of the normal dev plan.
    """

    # The model definitions are kept flush-left so the SQL text handed to the
    # loader is exactly the text shown here.
    hourly_model_sql = """
MODEL (
name test.a,
kind INCREMENTAL_BY_TIME_RANGE (
time_column "ts"
),
start '2024-01-01 00:00:00',
cron '@hourly'
);
select account_id, ts from test.external_table;
"""

    daily_model_sql = """
MODEL (
name test.b,
kind INCREMENTAL_BY_TIME_RANGE (
time_column ts
),
cron '@daily'
);
select account_id, ts from test.a where ts between @start_dt and @end_dt;
"""

    # Timestamps expected before and after the 01:00-02:00 hour is restated.
    initial_timestamps = [
        "2024-01-01 00:30:00",
        "2024-01-01 01:30:00",
        "2024-01-01 02:30:00",
        "2024-01-02 00:30:00",
    ]
    restated_timestamps = [
        "2024-01-01 00:30:00",
        "2024-01-01 02:30:00",
        "2024-01-02 00:30:00",
    ]

    # Only model A exists on disk when the context is first created.
    models_dir = tmp_path / "models"
    models_dir.mkdir()
    (models_dir / "a.sql").write_text(hourly_model_sql)

    context = Context(
        paths=[tmp_path],
        config=Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")),
    )

    adapter = context.engine_adapter
    adapter.create_schema("test")

    def _timestamps_in(table_name: str) -> t.List[str]:
        """Return the `ts` values of `table_name` as strings, ordered by `ts`."""
        rows = adapter.fetchall(f"select ts from {table_name} order by ts")
        return [str(row[0]) for row in rows]

    # Seed the external source table that model A reads from.
    source_rows = pd.DataFrame(
        {
            "account_id": [1001, 1002, 1003, 1004],
            "ts": list(initial_timestamps),
        }
    )
    source_schema = {
        "account_id": exp.DataType.build("int"),
        "ts": exp.DataType.build("timestamp"),
    }
    source_table = exp.table_(table="external_table", db="test", quoted=True)
    adapter.create_table(table_name=source_table, columns_to_types=source_schema)
    adapter.insert_append(
        table_name=source_table, query_or_df=source_rows, columns_to_types=source_schema
    )

    # Plan + apply A[hourly] in prod.
    context.plan(auto_apply=True, no_prompts=True)

    # Add B[daily], then plan + apply it in dev only.
    (models_dir / "b.sql").write_text(daily_model_sql)
    context.load()
    context.plan(environment="dev", auto_apply=True, no_prompts=True)

    # Initial state: both tables hold every source row.
    for table in ("test.a", "test__dev.b"):
        assert _timestamps_in(table) == initial_timestamps

    # Drop one source row and restate just the hour containing it in prod.
    adapter.execute("delete from test.external_table where ts = '2024-01-01 01:30:00'")
    context.plan(
        restate_models=["test.a"],
        start="2024-01-01 01:00:00",
        end="2024-01-01 02:00:00",
        auto_apply=True,
        no_prompts=True,
    )

    # Prod reflects the restatement right away...
    assert _timestamps_in("test.a") == restated_timestamps

    # ...while dev shouldn't have been affected yet.
    assert _timestamps_in("test__dev.b") == initial_timestamps

    # A plain dev plan should trigger the missing intervals to get repopulated.
    context.plan(environment="dev", auto_apply=True, no_prompts=True)

    # Dev should now have the restated data as well.
    for table in ("test.a", "test__dev.b"):
        assert _timestamps_in(table) == restated_timestamps


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_plan_against_expired_environment(init_and_plan_context: t.Callable):
context, plan = init_and_plan_context("examples/sushi")
Expand Down

0 comments on commit c1e0ccf

Please sign in to comment.