Commit

When retrying microbatch models, propagate prior successful state (#10802)

* When retrying microbatch models, propagate prior successful state

* Changie doc for microbatch dbt retry fixes

* Fix test_manifest unit tests for batch_info key

* Add functional test for when a microbatch model has multiple retries

* Add comment about when batch_info will be something other than None
QMalcolm authored Oct 1, 2024
1 parent a86e2b4 commit 25a68a9
Showing 6 changed files with 73 additions and 12 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240930-153158.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure dbt retry of microbatch models doesn't lose prior successful state
time: 2024-09-30T15:31:58.541656-05:00
custom:
Author: QMalcolm
Issue: "10800"
4 changes: 2 additions & 2 deletions core/dbt/contracts/graph/nodes.py
@@ -60,7 +60,7 @@
from dbt.artifacts.resources import SqlOperation as SqlOperationResource
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.schemas.batch_results import BatchType
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
@@ -454,7 +454,7 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batches: Optional[List[BatchType]] = None
batch_info: Optional[BatchResults] = None

@classmethod
def resource_class(cls) -> Type[ModelResource]:
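For orientation: `ModelNode.batch_info` now carries a `BatchResults` object rather than a bare list of `BatchType` windows. Below is a minimal sketch of what that object plausibly provides, inferred only from how this commit uses it (`.successful`, `.failed`, and `+` for merging); the field layout and the tuple shape of `BatchType` are assumptions, not the actual dbt-core definitions.

# Hedged sketch of the BatchResults interface assumed by the rest of this diff.
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Tuple

# Assumed: a batch is an (event_time_start, event_time_end) window.
BatchType = Tuple[datetime, Optional[datetime]]

@dataclass
class BatchResults:
    successful: List[BatchType] = field(default_factory=list)
    failed: List[BatchType] = field(default_factory=list)

    def __add__(self, other: "BatchResults") -> "BatchResults":
        # Merging keeps batches from both sides, so prior successes survive a retry.
        return BatchResults(
            successful=self.successful + other.successful,
            failed=self.failed + other.failed,
        )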
12 changes: 9 additions & 3 deletions core/dbt/task/retry.py
@@ -131,11 +131,17 @@ def run(self):
)
}

# We need this so that re-running of a microbatch model will only rerun
# batches that previously failed. Note we _explicitly_ do not pass the
# batch info if there were _no_ successful batches previously. This is
# because passing the batch info _forces_ the microbatch process into
# _incremental_ mode, and it may be that we need to be in full refresh
# mode, which is only handled if batch_info _isn't_ passed for a node
batch_map = {
result.unique_id: result.batch_results.failed
result.unique_id: result.batch_results
for result in self.previous_results.results
if result.status == NodeStatus.PartialSuccess
and result.batch_results is not None
if result.batch_results is not None
and len(result.batch_results.successful) != 0
and len(result.batch_results.failed) > 0
and not (
self.previous_command_name != "run-operation"
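Read on its own, the new filter amounts to the following — a simplified, hedged sketch; the guard on `self.previous_command_name != "run-operation"` from the real code is left out.

def build_batch_map(previous_results):
    # Simplified sketch of the dictionary comprehension above.
    batch_map = {}
    for result in previous_results.results:
        if result.batch_results is None:
            continue
        # Carry batch info forward only when the prior run had at least one
        # successful batch and at least one failed batch. Passing batch info
        # forces incremental mode, so a run with no successes should instead
        # take the normal (possibly full-refresh) path.
        if result.batch_results.successful and result.batch_results.failed:
            batch_map[result.unique_id] = result.batch_results
    return batch_map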
23 changes: 17 additions & 6 deletions core/dbt/task/run.py
@@ -1,6 +1,7 @@
import functools
import os
import threading
from copy import deepcopy
from datetime import datetime
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

@@ -327,6 +328,13 @@ def _build_run_microbatch_model_result(
status = RunStatus.PartialSuccess
msg = f"PARTIAL SUCCESS ({num_successes}/{num_successes + num_failures})"

if model.batch_info is not None:
new_batch_results = deepcopy(model.batch_info)
new_batch_results.failed = []
new_batch_results = new_batch_results + batch_results
else:
new_batch_results = batch_results

return RunResult(
node=model,
status=status,
@@ -337,7 +345,7 @@
message=msg,
adapter_response={},
failures=num_failures,
batch_results=batch_results,
batch_results=new_batch_results,
)

def _build_succesful_run_batch_result(
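The net effect of the block above: prior successes are preserved, prior failures are dropped (they were just re-attempted), and the fresh results are appended. As a standalone sketch, using the assumed BatchResults shape from earlier:

from copy import deepcopy

def merge_batch_results(prior, current):
    # `prior` is the batch_info carried over from the run being retried (may be None);
    # `current` is what this invocation just produced.
    if prior is None:
        return current
    merged = deepcopy(prior)
    merged.failed = []       # the old failures were re-attempted in this run
    return merged + current  # keep old successes, append the new outcomes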
@@ -470,7 +478,10 @@ def _execute_microbatch_materialization(
) -> List[RunResult]:
batch_results: List[RunResult] = []

if model.batches is None:
# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
microbatch_builder = MicrobatchBuilder(
model=model,
is_incremental=self._is_incremental(model),
@@ -481,8 +492,8 @@
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batches
# if there are batches, then don't run as full_refresh and do force is_incremental
batches = model.batch_info.failed
# if there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
context["is_incremental"] = lambda: True
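Putting the branch above into words: a fresh run derives its batch windows from the model's configuration, while a retry re-runs only the previously failed batches and forces incremental mode so already-materialized batches are not rebuilt. A hedged sketch of that decision — helper names are taken from the diff, and the end-time computation elided above is passed in as a parameter:

def select_batches(model, microbatch_builder, end):
    # Returns (batches_to_run, force_incremental). Sketch only.
    if model.batch_info is None:
        # Fresh run: build every batch window from the model's configuration.
        start = microbatch_builder.build_start_time(end)
        return microbatch_builder.build_batches(start, end), False
    # Retry: only the batches that failed last time, in incremental mode,
    # so the work already done for successful batches isn't blown away.
    return model.batch_info.failed, True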
@@ -567,7 +578,7 @@ def __init__(
args: Flags,
config: RuntimeConfig,
manifest: Manifest,
batch_map: Optional[Dict[str, List[BatchType]]] = None,
batch_map: Optional[Dict[str, BatchResults]] = None,
) -> None:
super().__init__(args, config, manifest)
self.batch_map = batch_map
@@ -709,7 +720,7 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]):
if uid in self.batch_map:
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
if isinstance(node, ModelNode):
node.batches = self.batch_map[uid]
node.batch_info = self.batch_map[uid]

def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
38 changes: 38 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
@@ -485,6 +485,44 @@ def test_run_with_event_time(self, project):
self.assert_row_count(project, "microbatch_model", 3)


class TestMicrobatchMultipleRetries(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_failing_incremental_partition_sql,
}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"])

assert "PARTIAL SUCCESS (2/3)" in console_output
assert "Completed with 1 partial success" in console_output

self.assert_row_count(project, "microbatch_model", 2)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["retry"], expect_pass=False)

assert "PARTIAL SUCCESS" not in console_output
assert "ERROR" in console_output
assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output

self.assert_row_count(project, "microbatch_model", 2)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, console_output = run_dbt_and_capture(["retry"], expect_pass=False)

assert "PARTIAL SUCCESS" not in console_output
assert "ERROR" in console_output
assert "Completed with 1 error, 0 partial successs, and 0 warnings" in console_output

self.assert_row_count(project, "microbatch_model", 2)


microbatch_model_first_partition_failing_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
2 changes: 1 addition & 1 deletion tests/unit/contracts/graph/test_manifest.py
@@ -96,7 +96,7 @@
"deprecation_date",
"defer_relation",
"time_spine",
"batches",
"batch_info",
"vars",
}
)
