Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make microbatch models skippable #11020

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241121-112638.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Make microbatch models skippable
time: 2024-11-21T11:26:38.192345-05:00
custom:
Author: michelleark
Issue: "11021"
16 changes: 11 additions & 5 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import threading
import time
from copy import deepcopy
from dataclasses import asdict, field
from dataclasses import asdict
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
Expand Down Expand Up @@ -334,9 +334,12 @@


class MicrobatchModelRunner(ModelRunner):
batch_idx: Optional[int] = None
batches: Dict[int, BatchType] = field(default_factory=dict)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage of field(default_factory=...) was invalid outside of a dataclass

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we verified that having it set as batches: Dict[int, BatchType] = {} doesn't make it so two instances of MicrobatchModelRunner share the same underlying dict?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a test. We do indeed need to do something different

>>> class MyClass:
...   batches = {}
... 
>>> instance1 = MyClass()
>>> instance2 = MyClass()
>>> instance1.batches
{}
>>> instance2.batches
{}
>>> instance1.batches['catch_phrase'] = "I like cats!"
>>> instance1.batches
{'catch_phrase': 'I like cats!'}
>>> instance2.batches
{'catch_phrase': 'I like cats!'}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Fixed with init override:

>>> class MyClass:
...   def __init__(self):
...     self.batches = {}
...
>>> instance1 = MyClass()
>>> instance2 = MyClass()
>>> instance1.batches
{}
>>> instance2.batches
{}
>>> instance1.batches['catch_phrase'] = "I like cats!"
>>> instance1.batches
{'catch_phrase': 'I like cats!'}
>>> instance2.batches
{}
>>>

relation_exists: bool = False
def __init__(self, config, adapter, node, node_index: int, num_nodes: int):
super().__init__(config, adapter, node, node_index, num_nodes)

self.batch_idx: Optional[int] = None
self.batches: Dict[int, BatchType] = {}
self.relation_exists: bool = False

def set_batch_idx(self, batch_idx: int) -> None:
self.batch_idx = batch_idx
Expand Down Expand Up @@ -704,8 +707,11 @@
runner: MicrobatchModelRunner,
pool: ThreadPool,
) -> RunResult:
# Initial run computes batch metadata
# Initial run computes batch metadata, unless model is skipped
result = self.call_runner(runner)
if result.status == RunStatus.Skipped:
return result

Check warning on line 713 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L712-L713

Added lines #L712 - L713 were not covered by tests

batch_results: List[RunResult] = []

# Execute batches serially until a relation exists, at which point future batches are run in parallel
Expand Down
24 changes: 24 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time
"""

input_model_invalid_sql = """
{{ config(materialized='table', event_time='event_time') }}

select invalid as event_time
"""

input_model_without_event_time_sql = """
{{ config(materialized='table') }}

Expand Down Expand Up @@ -835,6 +841,24 @@ def test_microbatch(
assert len(catch_aw.caught_events) == 1


class TestMicrobatchModelSkipped(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_invalid_sql,
"microbatch_model.sql": microbatch_model_sql,
}

def test_microbatch_model_skipped(self, project) -> None:
run_dbt(["run"], expect_pass=False)

run_results = get_artifact(project.project_root, "target", "run_results.json")

microbatch_result = run_results["results"][1]
assert microbatch_result["status"] == "skipped"
assert microbatch_result["batch_results"] is None


class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest):
@pytest.fixture
def batch_exc_catcher(self) -> EventCatcher:
Expand Down
Loading