Skip to content

Commit

Permalink
Ensure KeyboardInterrupt halts microbatch model execution (#10879)
Browse files Browse the repository at this point in the history
  • Loading branch information
QMalcolm authored Oct 31, 2024
1 parent 8a17a0d commit 289d2dd
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241017-153022.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure KeyboardInterrupt/SystemExit halts microbatch model execution
time: 2024-10-17T15:30:22.781854-07:00
custom:
Author: QMalcolm
Issue: "10862"
3 changes: 3 additions & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ def _execute_microbatch_materialization(
# At least one batch has been inserted successfully!
incremental_batch = True

except (KeyboardInterrupt, SystemExit):
# reraise it for GraphRunnableTask.execute_nodes to handle
raise
except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(
Expand Down
30 changes: 29 additions & 1 deletion tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.artifacts.resources.types import NodeType, RunHookType
from dbt.artifacts.resources.types import BatchSize, NodeType, RunHookType
from dbt.artifacts.resources.v1.components import DependsOn
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.resources.v1.model import ModelConfig
Expand All @@ -27,6 +27,7 @@
from dbt.events.types import LogModelResult
from dbt.exceptions import DbtRuntimeError
from dbt.flags import get_flags, set_from_args
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
Expand Down Expand Up @@ -265,6 +266,33 @@ class Relation:
# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation

def test_keyboard_breaks__execute_microbatch_materialization(
self,
table_model: ModelNode,
manifest: Manifest,
model_runner: ModelRunner,
) -> None:
def mock_build_batch_context(*args, **kwargs):
raise KeyboardInterrupt("Test exception")

def mock_is_incremental(*args, **kwargs):
return True

table_model.config.materialized = "incremental"
table_model.config.incremental_strategy = "microbatch"
table_model.config.batch_size = BatchSize.day

with patch.object(
MicrobatchBuilder, "build_batch_context", mock_build_batch_context
), patch.object(ModelRunner, "_is_incremental", mock_is_incremental):
try:
model_runner._execute_microbatch_materialization(
table_model, manifest, {}, MagicMock()
)
assert False, "KeybaordInterrupt failed to escape"
except KeyboardInterrupt:
assert True


class TestRunTask:
@pytest.fixture
Expand Down

0 comments on commit 289d2dd

Please sign in to comment.