From e1aa98ef0a369bd1f61f5d9ec4b8d4e648f4fdc3 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 24 Oct 2024 19:35:03 -0400 Subject: [PATCH 1/4] store model.to_dict in batch jinja context var of --- core/dbt/task/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 7a321e69d30..416a3daf854 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -532,7 +532,7 @@ def _execute_microbatch_materialization( batch[0], model.config.batch_size ), ) - context["model"] = model + context["model"] = model.to_dict() context["sql"] = model.compiled_code context["compiled_code"] = model.compiled_code From 072fe4411eb5f1fd29692d26ccf928cb02ae5f5c Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 28 Oct 2024 13:25:03 -0400 Subject: [PATCH 2/4] MicrobatchBuilder.build_batch_context + tests --- .../incremental/microbatch.py | 22 +++++++++++- core/dbt/task/run.py | 35 ++++++++++--------- .../incremental/test_microbatch.py | 27 ++++++++++++++ 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/core/dbt/materializations/incremental/microbatch.py b/core/dbt/materializations/incremental/microbatch.py index da8930acb89..690083f3c3e 100644 --- a/core/dbt/materializations/incremental/microbatch.py +++ b/core/dbt/materializations/incremental/microbatch.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -from typing import List, Optional +from typing import Any, Dict, List, Optional import pytz @@ -99,6 +99,26 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]: return batches + def build_batch_context(self, incremental_batch: bool) -> Dict[str, Any]: + """ + Create context with entries that reflect microbatch model + incremental execution state + + Assumes self.model has been (re)-compiled with necessary batch filters applied. + """ + batch_context: Dict[str, Any] = {} + + # Microbatch model properties + batch_context["model"] = self.model.to_dict() + batch_context["sql"] = self.model.compiled_code + batch_context["compiled_code"] = self.model.compiled_code + + # Add incremental context variables for batches running incrementally + if incremental_batch: + batch_context["is_incremental"] = lambda: True + batch_context["should_full_refresh"] = lambda: False + + return batch_context + @staticmethod def offset_timestamp(timestamp: datetime, batch_size: BatchSize, offset: int) -> datetime: """Truncates the passed in timestamp based on the batch_size and then applies the offset by the batch_size. diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 416a3daf854..0700880c617 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -489,28 +489,29 @@ def _execute_microbatch_materialization( materialization_macro: MacroProtocol, ) -> List[RunResult]: batch_results: List[RunResult] = [] + microbatch_builder = MicrobatchBuilder( + model=model, + is_incremental=self._is_incremental(model), + event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), + event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), + default_end_time=self.config.invoked_at, + ) + # Indicates whether current batch should be run incrementally + incremental_batch = False # Note currently (9/30/2024) model.batch_info is only ever _not_ `None` # IFF `dbt retry` is being run and the microbatch model had batches which # failed on the run of the model (which is being retried) if model.batch_info is None: - microbatch_builder = MicrobatchBuilder( - model=model, - is_incremental=self._is_incremental(model), - event_time_start=getattr(self.config.args, "EVENT_TIME_START", None), - event_time_end=getattr(self.config.args, "EVENT_TIME_END", None), - default_end_time=self.config.invoked_at, - ) end = microbatch_builder.build_end_time() start = microbatch_builder.build_start_time(end) batches = microbatch_builder.build_batches(start, end) else: batches = model.batch_info.failed - # if there is batch info, then don't run as full_refresh and do force is_incremental + # If there is batch info, then don't run as full_refresh and do force is_incremental # not doing this risks blowing away the work that has already been done if self._has_relation(model=model): - context["is_incremental"] = lambda: True - context["should_full_refresh"] = lambda: False + incremental_batch = True # iterate over each batch, calling materialization_macro to get a batch-level run result for batch_idx, batch in enumerate(batches): @@ -532,9 +533,11 @@ def _execute_microbatch_materialization( batch[0], model.config.batch_size ), ) - context["model"] = model.to_dict() - context["sql"] = model.compiled_code - context["compiled_code"] = model.compiled_code + # Update jinja context with batch context members + batch_context = microbatch_builder.build_batch_context( + incremental_batch=incremental_batch + ) + context.update(batch_context) # Materialize batch and cache any materialized relations result = MacroGenerator( @@ -547,9 +550,9 @@ def _execute_microbatch_materialization( batch_run_result = self._build_succesful_run_batch_result( model, context, batch, time.perf_counter() - start_time ) - # Update context vars for future batches - context["is_incremental"] = lambda: True - context["should_full_refresh"] = lambda: False + # At least one batch has been inserted successfully! + incremental_batch = True + except Exception as e: exception = e batch_run_result = self._build_failed_run_batch_result( diff --git a/tests/unit/materializations/incremental/test_microbatch.py b/tests/unit/materializations/incremental/test_microbatch.py index 5c368f535a9..8581e074ee7 100644 --- a/tests/unit/materializations/incremental/test_microbatch.py +++ b/tests/unit/materializations/incremental/test_microbatch.py @@ -488,6 +488,33 @@ def test_build_batches(self, microbatch_model, start, end, batch_size, expected_ assert len(actual_batches) == len(expected_batches) assert actual_batches == expected_batches + def test_build_batch_context_incremental_batch(self, microbatch_model): + microbatch_builder = MicrobatchBuilder( + model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None + ) + context = microbatch_builder.build_batch_context(incremental_batch=True) + + assert context["model"] == microbatch_model.to_dict() + assert context["sql"] == microbatch_model.compiled_code + assert context["compiled_code"] == microbatch_model.compiled_code + + assert context["is_incremental"]() is True + assert context["should_full_refresh"]() is False + + def test_build_batch_context_incremental_batch_false(self, microbatch_model): + microbatch_builder = MicrobatchBuilder( + model=microbatch_model, is_incremental=True, event_time_start=None, event_time_end=None + ) + context = microbatch_builder.build_batch_context(incremental_batch=False) + + assert context["model"] == microbatch_model.to_dict() + assert context["sql"] == microbatch_model.compiled_code + assert context["compiled_code"] == microbatch_model.compiled_code + + # Only build is_incremental callables when not first batch + assert "is_incremental" not in context + assert "should_full_refresh" not in context + @pytest.mark.parametrize( "timestamp,batch_size,offset,expected_timestamp", [ From e2913d2b771071cab3ee4786974dfb9f66a9506f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 28 Oct 2024 13:27:56 -0400 Subject: [PATCH 3/4] changelog entry --- .changes/unreleased/Fixes-20241028-132751.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20241028-132751.yaml diff --git a/.changes/unreleased/Fixes-20241028-132751.yaml b/.changes/unreleased/Fixes-20241028-132751.yaml new file mode 100644 index 00000000000..6e1f79b9b9c --- /dev/null +++ b/.changes/unreleased/Fixes-20241028-132751.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: 'Fix ''model'' jinja context variable type to dict ' +time: 2024-10-28T13:27:51.604093-04:00 +custom: + Author: michelleark + Issue: "10927" From d685226e5d42bb03d218281785ed39db4fc001bb Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 28 Oct 2024 15:39:05 -0400 Subject: [PATCH 4/4] add functional test: TestMicrobatchJinjaContext --- .../functional/microbatch/test_microbatch.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 02c6976c848..46cbb2cdbfc 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -42,6 +42,31 @@ select * from {{ ref('input_model') }} """ +invalid_batch_context_macro_sql = """ +{% macro check_invalid_batch_context() %} + +{% if model is not mapping %} + {{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }} +{% elif compiled_code and compiled_code is not string %} + {{ exceptions.raise_compiler_error("`compiled_code` is invalid: expected string type") }} +{% elif sql and sql is not string %} + {{ exceptions.raise_compiler_error("`sql` is invalid: expected string type") }} +{% elif is_incremental is not callable %} + {{ exceptions.raise_compiler_error("`is_incremental()` is invalid: expected callable type") }} +{% elif should_full_refresh is not callable %} + {{ exceptions.raise_compiler_error("`should_full_refresh()` is invalid: expected callable type") }} +{% endif %} + +{% endmacro %} +""" + +microbatch_model_with_context_checks_sql = """ +{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} + +{{ check_invalid_batch_context() }} +select * from {{ ref('input_model') }} +""" + microbatch_model_downstream_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('microbatch_model') }} @@ -324,6 +349,27 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 5) +class TestMicrobatchJinjaContext(BaseMicrobatchTest): + + @pytest.fixture(scope="class") + def macros(self): + return {"check_batch_context.sql": invalid_batch_context_macro_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_with_context_checks_sql, + } + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + class TestMicrobatchWithInputWithoutEventTime(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self):