Skip to content

Commit

Permalink
Fix erroneous additional batch execution (#11113)
Browse files Browse the repository at this point in the history
* Update single batch test case to check for generic exceptions

* Explicitly skip last final batch execution when there is only one batch

Previously if there was only one batch, we would try to execute _two_
batches. The first batch, and a "last" non existent batch. This would
result in an unhandled exception.

* Changie doc
  • Loading branch information
QMalcolm authored Dec 10, 2024
1 parent 03fdb4c commit c9582c2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 14 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241209-133317.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix running of extra "last" batch when there is only one batch
time: 2024-12-09T13:33:17.253326-06:00
custom:
Author: QMalcolm
Issue: "11112"
27 changes: 15 additions & 12 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,18 +768,21 @@ def handle_microbatch_model(
# Wait until all submitted batches have completed
while len(batch_results) != batch_idx:
pass
# Final batch runs once all others complete to ensure post_hook runs at the end
self._submit_batch(
node=node,
adapter=runner.adapter,
relation_exists=relation_exists,
batches=batches,
batch_idx=batch_idx,
batch_results=batch_results,
pool=pool,
force_sequential_run=True,
skip=skip_batches,
)

# Only run "last" batch if there is more than one batch
if len(batches) != 1:
# Final batch runs once all others complete to ensure post_hook runs at the end
self._submit_batch(
node=node,
adapter=runner.adapter,
relation_exists=relation_exists,
batches=batches,
batch_idx=batch_idx,
batch_results=batch_results,
pool=pool,
force_sequential_run=True,
skip=skip_batches,
)

# Finalize run: merge results, track model run, and print final result line
runner.merge_batch_results(result, batch_results)
Expand Down
16 changes: 14 additions & 2 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,11 +1057,20 @@ def pre_or_post_hook(event) -> bool:

return EventCatcher(event_to_catch=JinjaLogDebug, predicate=pre_or_post_hook) # type: ignore

@pytest.fixture
def generic_exception_catcher(self) -> EventCatcher:
return EventCatcher(event_to_catch=GenericExceptionOnRun) # type: ignore

def test_microbatch(
self, mocker: MockerFixture, project, batch_log_catcher: EventCatcher
self,
project,
batch_log_catcher: EventCatcher,
generic_exception_catcher: EventCatcher,
) -> None:
with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(["run"], callbacks=[batch_log_catcher.catch])
_ = run_dbt(
["run"], callbacks=[batch_log_catcher.catch, generic_exception_catcher.catch]
)

# There should be two logs as the pre-hook and post-hook should
# both only be run once
Expand All @@ -1071,3 +1080,6 @@ def test_microbatch(
assert "pre-hook" in batch_log_catcher.caught_events[0].data.msg # type: ignore
assert "20200101" in batch_log_catcher.caught_events[1].data.msg # type: ignore
assert "post-hook" in batch_log_catcher.caught_events[1].data.msg # type: ignore

# we had a bug where having only one batch caused a generic exception
assert len(generic_exception_catcher.caught_events) == 0

0 comments on commit c9582c2

Please sign in to comment.