Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure KeyboardInterrupt halts microbatch model execution #10879

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241017-153022.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Ensure KeyboardInterrupt/SystemExit halts microbatch model execution
time: 2024-10-17T15:30:22.781854-07:00
custom:
Author: QMalcolm
Issue: "10862"
3 changes: 3 additions & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ def _execute_microbatch_materialization(
# At least one batch has been inserted successfully!
incremental_batch = True

except (KeyboardInterrupt, SystemExit):
# reraise it for GraphRunnableTask.execute_nodes to handle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be writing out run results in this case?

Copy link
Contributor Author

@QMalcolm QMalcolm Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KeyboardInterrupt handler in GraphRunnableTask.execute_nodes should handle it

raise
except Exception as e:
exception = e
batch_run_result = self._build_failed_run_batch_result(
Expand Down
30 changes: 29 additions & 1 deletion tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.artifacts.resources.types import NodeType, RunHookType
from dbt.artifacts.resources.types import BatchSize, NodeType, RunHookType
from dbt.artifacts.resources.v1.components import DependsOn
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.resources.v1.model import ModelConfig
Expand All @@ -27,6 +27,7 @@
from dbt.events.types import LogModelResult
from dbt.exceptions import DbtRuntimeError
from dbt.flags import get_flags, set_from_args
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
Expand Down Expand Up @@ -265,6 +266,33 @@ class Relation:
# Assert result of _is_incremental
assert model_runner._is_incremental(model) == expectation

def test_keyboard_breaks__execute_microbatch_materialization(
self,
table_model: ModelNode,
manifest: Manifest,
model_runner: ModelRunner,
) -> None:
def mock_build_batch_context(*args, **kwargs):
raise KeyboardInterrupt("Test exception")

def mock_is_incremental(*args, **kwargs):
return True

table_model.config.materialized = "incremental"
table_model.config.incremental_strategy = "microbatch"
table_model.config.batch_size = BatchSize.day

with patch.object(
MicrobatchBuilder, "build_batch_context", mock_build_batch_context
), patch.object(ModelRunner, "_is_incremental", mock_is_incremental):
try:
model_runner._execute_microbatch_materialization(
table_model, manifest, {}, MagicMock()
)
assert False, "KeybaordInterrupt failed to escape"
except KeyboardInterrupt:
assert True


class TestRunTask:
@pytest.fixture
Expand Down
Loading