diff --git a/.changes/unreleased/Fixes-20241017-153022.yaml b/.changes/unreleased/Fixes-20241017-153022.yaml new file mode 100644 index 00000000000..905a40db501 --- /dev/null +++ b/.changes/unreleased/Fixes-20241017-153022.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Ensure KeyboardInterrupt/SystemExit halts microbatch model execution +time: 2024-10-17T15:30:22.781854-07:00 +custom: + Author: QMalcolm + Issue: "10862" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index ce613c0e44d..0b6fae4fc4c 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -568,6 +568,9 @@ def _execute_microbatch_materialization( # At least one batch has been inserted successfully! incremental_batch = True + except (KeyboardInterrupt, SystemExit): + # reraise it for GraphRunnableTask.execute_nodes to handle + raise except Exception as e: exception = e batch_run_result = self._build_failed_run_batch_result( diff --git a/tests/unit/task/test_run.py b/tests/unit/task/test_run.py index 7063ca4200d..8f239ccfc3a 100644 --- a/tests/unit/task/test_run.py +++ b/tests/unit/task/test_run.py @@ -14,7 +14,7 @@ from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.postgres import PostgresAdapter from dbt.artifacts.resources.base import FileHash -from dbt.artifacts.resources.types import NodeType, RunHookType +from dbt.artifacts.resources.types import BatchSize, NodeType, RunHookType from dbt.artifacts.resources.v1.components import DependsOn from dbt.artifacts.resources.v1.config import NodeConfig from dbt.artifacts.resources.v1.model import ModelConfig @@ -27,6 +27,7 @@ from dbt.events.types import LogModelResult from dbt.exceptions import DbtRuntimeError from dbt.flags import get_flags, set_from_args +from dbt.materializations.incremental.microbatch import MicrobatchBuilder from dbt.task.run import ModelRunner, RunTask, _get_adapter_info from dbt.tests.util import safe_set_invocation_context from dbt_common.events.base_types import EventLevel @@ -265,6 +266,33 @@ class Relation: # Assert result of _is_incremental assert model_runner._is_incremental(model) == expectation + def test_keyboard_breaks__execute_microbatch_materialization( + self, + table_model: ModelNode, + manifest: Manifest, + model_runner: ModelRunner, + ) -> None: + def mock_build_batch_context(*args, **kwargs): + raise KeyboardInterrupt("Test exception") + + def mock_is_incremental(*args, **kwargs): + return True + + table_model.config.materialized = "incremental" + table_model.config.incremental_strategy = "microbatch" + table_model.config.batch_size = BatchSize.day + + with patch.object( + MicrobatchBuilder, "build_batch_context", mock_build_batch_context + ), patch.object(ModelRunner, "_is_incremental", mock_is_incremental): + try: + model_runner._execute_microbatch_materialization( + table_model, manifest, {}, MagicMock() + ) + assert False, "KeybaordInterrupt failed to escape" + except KeyboardInterrupt: + assert True + class TestRunTask: @pytest.fixture