From 771792c7480341a3dff9c56e4e66461b0de91b98 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 13 Nov 2024 16:23:51 -0500 Subject: [PATCH 1/3] fire GenericExceptionOnRun for batch-level exception --- core/dbt/task/run.py | 10 ++++++++-- tests/functional/microbatch/test_microbatch.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d59e9c6cf72..9c65a7d8438 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -27,11 +27,11 @@ from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode from dbt.events.types import ( + GenericExceptionOnRun, LogHookEndLine, LogHookStartLine, LogModelResult, LogStartLine, - RunningOperationCaughtError, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError from dbt.graph import ResourceTypeSelector @@ -275,7 +275,13 @@ def print_batch_result_line( level=level, ) if exception: - fire_event(RunningOperationCaughtError(exc=str(exception))) + fire_event( + GenericExceptionOnRun( + unique_id=self.node.unique_id, + exc=f"Exception on worker thread. {str(exception)}", + node_info=self.node.node_info, + ) + ) def print_batch_start_line( self, batch_start: Optional[datetime], batch_idx: int, batch_total: int diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 3de48225f44..c0f687f0d18 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -3,6 +3,7 @@ import pytest from dbt.events.types import ( + GenericExceptionOnRun, LogModelResult, MicrobatchMacroOutsideOfBatchesDeprecation, MicrobatchModelNoEventTimeInputs, @@ -526,9 +527,15 @@ def models(self): } def test_run_with_event_time(self, project): - # run all partitions from start - 2 expected rows in output, one failed + event_catcher = EventCatcher( + GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None + ) + with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"], expect_pass=False) + run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False) + + assert len(event_catcher.caught_events) == 1 + # run all partitions from start - 2 expected rows in output, one failed self.assert_row_count(project, "microbatch_model", 2) run_results = get_artifact(project.project_root, "target", "run_results.json") From 9677e21b598ff03b88e02f4f3b51f297836c31aa Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 14 Nov 2024 11:25:40 -0500 Subject: [PATCH 2/3] changelog entry --- .changes/unreleased/Fixes-20241114-112535.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20241114-112535.yaml diff --git a/.changes/unreleased/Fixes-20241114-112535.yaml b/.changes/unreleased/Fixes-20241114-112535.yaml new file mode 100644 index 00000000000..d82095deb9c --- /dev/null +++ b/.changes/unreleased/Fixes-20241114-112535.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Emit batch-level exception with node_info on microbatch batch run failure +time: 2024-11-14T11:25:35.050914-05:00 +custom: + Author: michelleark + Issue: "10840" From a759c26656f74609b19149980ca762eaeaaaaf48 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 14 Nov 2024 11:37:04 -0500 Subject: [PATCH 3/3] more testing --- tests/functional/microbatch/test_microbatch.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c0f687f0d18..0ef12367a07 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -517,7 +517,7 @@ def test_run_with_event_time_logs(self, project): """ -class TestMicrobatchIncrementalPartitionFailure(BaseMicrobatchTest): +class TestMicrobatchIncrementalBatchFailure(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self): return { @@ -531,11 +531,11 @@ def test_run_with_event_time(self, project): GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None ) + # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False) assert len(event_catcher.caught_events) == 1 - # run all partitions from start - 2 expected rows in output, one failed self.assert_row_count(project, "microbatch_model", 2) run_results = get_artifact(project.project_root, "target", "run_results.json") @@ -633,7 +633,7 @@ def test_run_with_event_time(self, project): """ -class TestMicrobatchInitialPartitionFailure(BaseMicrobatchTest): +class TestMicrobatchInitialBatchFailure(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self): return { @@ -642,9 +642,14 @@ def models(self): } def test_run_with_event_time(self, project): + event_catcher = EventCatcher( + GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None + ) + # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"]) + run_dbt(["run"], callbacks=[event_catcher.catch]) + assert len(event_catcher.caught_events) == 1 self.assert_row_count(project, "microbatch_model", 2)