diff --git a/.changes/unreleased/Fixes-20241114-112535.yaml b/.changes/unreleased/Fixes-20241114-112535.yaml
new file mode 100644
index 00000000000..d82095deb9c
--- /dev/null
+++ b/.changes/unreleased/Fixes-20241114-112535.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Emit batch-level exception with node_info on microbatch batch run failure
+time: 2024-11-14T11:25:35.050914-05:00
+custom:
+  Author: michelleark
+  Issue: "10840"
diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py
index d59e9c6cf72..9c65a7d8438 100644
--- a/core/dbt/task/run.py
+++ b/core/dbt/task/run.py
@@ -27,11 +27,11 @@ from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
 from dbt.events.types import (
+    GenericExceptionOnRun,
     LogHookEndLine,
     LogHookStartLine,
     LogModelResult,
     LogStartLine,
-    RunningOperationCaughtError,
 )
 from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
 from dbt.graph import ResourceTypeSelector
@@ -275,7 +275,13 @@ def print_batch_result_line(
             level=level,
         )
         if exception:
-            fire_event(RunningOperationCaughtError(exc=str(exception)))
+            fire_event(
+                GenericExceptionOnRun(
+                    unique_id=self.node.unique_id,
+                    exc=f"Exception on worker thread. {str(exception)}",
+                    node_info=self.node.node_info,
+                )
+            )
 
     def print_batch_start_line(
         self, batch_start: Optional[datetime], batch_idx: int, batch_total: int
diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py
index 3de48225f44..0ef12367a07 100644
--- a/tests/functional/microbatch/test_microbatch.py
+++ b/tests/functional/microbatch/test_microbatch.py
@@ -3,6 +3,7 @@
 import pytest
 
 from dbt.events.types import (
+    GenericExceptionOnRun,
     LogModelResult,
     MicrobatchMacroOutsideOfBatchesDeprecation,
     MicrobatchModelNoEventTimeInputs,
@@ -516,7 +517,7 @@ def test_run_with_event_time_logs(self, project):
     """
 
 
-class TestMicrobatchIncrementalPartitionFailure(BaseMicrobatchTest):
+class TestMicrobatchIncrementalBatchFailure(BaseMicrobatchTest):
     @pytest.fixture(scope="class")
     def models(self):
         return {
@@ -526,9 +527,15 @@ def models(self):
         }
 
     def test_run_with_event_time(self, project):
+        event_catcher = EventCatcher(
+            GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None
+        )
+
         # run all partitions from start - 2 expected rows in output, one failed
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
-            run_dbt(["run"], expect_pass=False)
+            run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False)
+
+        assert len(event_catcher.caught_events) == 1
         self.assert_row_count(project, "microbatch_model", 2)
 
         run_results = get_artifact(project.project_root, "target", "run_results.json")
@@ -626,7 +633,7 @@ def test_run_with_event_time(self, project):
     """
 
 
-class TestMicrobatchInitialPartitionFailure(BaseMicrobatchTest):
+class TestMicrobatchInitialBatchFailure(BaseMicrobatchTest):
     @pytest.fixture(scope="class")
     def models(self):
         return {
@@ -635,9 +642,14 @@ def models(self):
         }
 
     def test_run_with_event_time(self, project):
+        event_catcher = EventCatcher(
+            GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None
+        )
+
         # run all partitions from start - 2 expected rows in output, one failed
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
-            run_dbt(["run"])
+            run_dbt(["run"], callbacks=[event_catcher.catch])
+        assert len(event_catcher.caught_events) == 1
 
         self.assert_row_count(project, "microbatch_model", 2)