diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index d341b167ab9..0eaf758ae5a 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -444,13 +444,15 @@ def resource_class(cls) -> Type[HookNodeResource]: @dataclass class ModelNode(ModelResource, CompiledNode): - batch_info: Optional[BatchResults] = None + previous_batch_results: Optional[BatchResults] = None _has_this: Optional[bool] = None def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None): dct = super().__post_serialize__(dct, context) if "_has_this" in dct: del dct["_has_this"] + if "previous_batch_results" in dct: + del dct["previous_batch_results"] return dct @classmethod diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index 9b3c3874718..c808872b8f8 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -136,7 +136,7 @@ def run(self): # batch info if there were _no_ successful batches previously. This is # because passing the batch info _forces_ the microbatch process into # _incremental_ model, and it may be that we need to be in full refresh - # mode which is only handled if batch_info _isn't_ passed for a node + # mode which is only handled if previous_batch_results _isn't_ passed for a node batch_map = { result.unique_id: result.batch_results for result in self.previous_results.results diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1cd41cd6f39..e52dd8d0abd 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -445,8 +445,8 @@ def merge_batch_results(self, result: RunResult, batch_results: List[RunResult]) result.batch_results.failed = sorted(result.batch_results.failed) # # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation - if self.node.batch_info is not None: - result.batch_results.successful += self.node.batch_info.successful + if self.node.previous_batch_results is not None: + result.batch_results.successful += self.node.previous_batch_results.successful def _build_succesful_run_batch_result( self, @@ -508,15 +508,15 @@ def _execute_microbatch_materialization( ) if self.batch_idx is None: - # Note currently (9/30/2024) model.batch_info is only ever _not_ `None` + # Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None` # IFF `dbt retry` is being run and the microbatch model had batches which # failed on the run of the model (which is being retried) - if model.batch_info is None: + if model.previous_batch_results is None: end = microbatch_builder.build_end_time() start = microbatch_builder.build_start_time(end) batches = microbatch_builder.build_batches(start, end) else: - batches = model.batch_info.failed + batches = model.previous_batch_results.failed # If there is batch info, then don't run as full_refresh and do force is_incremental # not doing this risks blowing away the work that has already been done if self._has_relation(model=model): @@ -885,7 +885,7 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]): if uid in self.batch_map: node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest) if isinstance(node, ModelNode): - node.batch_info = self.batch_map[uid] + node.previous_batch_results = self.batch_map[uid] def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus: with adapter.connection_named("master"): diff --git a/tests/unit/contracts/graph/test_manifest.py b/tests/unit/contracts/graph/test_manifest.py index 146a51d5856..0f3a80e5039 100644 --- a/tests/unit/contracts/graph/test_manifest.py +++ b/tests/unit/contracts/graph/test_manifest.py @@ -96,7 +96,6 @@ "deprecation_date", "defer_relation", "time_spine", - "batch_info", } )