Microbatch parallelism (#10958)
MichelleArk authored Nov 21, 2024
1 parent ae95759 commit fd6ec71
Showing 20 changed files with 816 additions and 462 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241118-160038.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Allow microbatch batches to run in parallel
time: 2024-11-18T16:00:38.895449-06:00
custom:
Author: QMalcolm MichelleArk
Issue: 10853 10855
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/v1/config.py
@@ -126,6 +126,7 @@ class NodeConfig(NodeAndTestConfig):
metadata=MergeBehavior.Update.meta(),
)
event_time: Any = None
concurrent_batches: Any = None

def __post_init__(self):
# we validate that node_color has a suitable value to prevent dbt-docs from crashing
3 changes: 3 additions & 0 deletions core/dbt/artifacts/schemas/batch_results.py
@@ -19,3 +19,6 @@ def __add__(self, other: BatchResults) -> BatchResults:
successful=self.successful + other.successful,
failed=self.failed + other.failed,
)

def __len__(self):
return len(self.successful) + len(self.failed)
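
The new __len__ makes a BatchResults countable, which pairs with the existing __add__ when results are accumulated across runs. A minimal self-contained sketch of the behavior shown above, assuming both fields are plain lists of batch identifiers (as the list concatenation implies):

from dataclasses import dataclass, field
from typing import Any, List

@dataclass
class BatchResults:
    # Assumed shape: list-typed fields, as implied by the concatenation in __add__.
    successful: List[Any] = field(default_factory=list)
    failed: List[Any] = field(default_factory=list)

    def __add__(self, other: "BatchResults") -> "BatchResults":
        return BatchResults(
            successful=self.successful + other.successful,
            failed=self.failed + other.failed,
        )

    def __len__(self):
        return len(self.successful) + len(self.failed)

combined = BatchResults(successful=["batch_1"]) + BatchResults(failed=["batch_2", "batch_3"])
assert len(combined) == 3  # total batches attempted, regardless of outcome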
12 changes: 12 additions & 0 deletions core/dbt/clients/jinja_static.py
@@ -17,6 +17,18 @@
_TESTING_MACRO_CACHE: Dict[str, Any] = {}


def statically_extract_has_name_this(source: str) -> bool:
"""Checks whether the raw jinja has any references to `this`"""
env = get_environment(None, capture_macros=True)
parsed = env.parse(source)
names = tuple(parsed.find_all(jinja2.nodes.Name))

for name in names:
if hasattr(name, "name") and name.name == "this":
return True
return False


def statically_extract_macro_calls(
source: str, ctx: Dict[str, Any], db_wrapper: Optional["ParseDatabaseWrapper"] = None
) -> List[str]:
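The new helper walks the parsed jinja AST and returns True if any Name node is `this`; ModelNode caches the result as has_this below, presumably so the microbatch machinery can tell whether a model reads its own prior state before letting its batches run in parallel. A standalone sketch of the same static check, using a vanilla jinja2 Environment rather than dbt's get_environment helper (which additionally understands dbt-specific tags):

import jinja2

def has_name_this(source: str) -> bool:
    """Return True if the raw jinja references the `this` variable."""
    parsed = jinja2.Environment().parse(source)
    return any(
        getattr(name, "name", None) == "this"
        for name in parsed.find_all(jinja2.nodes.Name)
    )

assert has_name_this("select * from {{ this }} where id > 10")
assert not has_name_this("select * from {{ ref('orders') }}")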
14 changes: 14 additions & 0 deletions core/dbt/contracts/graph/nodes.py
@@ -61,6 +61,7 @@
from dbt.artifacts.resources import TimeSpine
from dbt.artifacts.resources import UnitTestDefinition as UnitTestDefinitionResource
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.clients.jinja_static import statically_extract_has_name_this
from dbt.contracts.graph.model_config import UnitTestNodeConfig
from dbt.contracts.graph.node_args import ModelNodeArgs
from dbt.contracts.graph.unparsed import (
@@ -444,6 +445,13 @@ def resource_class(cls) -> Type[HookNodeResource]:
@dataclass
class ModelNode(ModelResource, CompiledNode):
batch_info: Optional[BatchResults] = None
_has_this: Optional[bool] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_has_this" in dct:
del dct["_has_this"]
return dct

@classmethod
def resource_class(cls) -> Type[ModelResource]:
@@ -520,6 +528,12 @@ def all_constraints(self) -> List[Union[ModelLevelConstraint, ColumnLevelConstra

return constraints

@property
def has_this(self) -> bool:
if self._has_this is None:
self._has_this = statically_extract_has_name_this(self.raw_code)
return self._has_this

def infer_primary_key(self, data_tests: List["GenericTestNode"]) -> List[str]:
"""
Infers the columns that can be used as primary key of a model in the following order:
10 changes: 10 additions & 0 deletions core/dbt/events/core_types.proto
@@ -1680,6 +1680,16 @@ message SnapshotTimestampWarningMsg {
SnapshotTimestampWarning data = 2;
}

// Q044
message MicrobatchExecutionDebug {
string msg = 1;
}

message MicrobatchExecutionDebugMsg {
CoreEventInfo info = 1;
MicrobatchExecutionDebug data = 2;
}

// W - Node testing

// Skipped W001
346 changes: 175 additions & 171 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions core/dbt/events/types.py
@@ -1702,6 +1702,14 @@ def message(self) -> str:
)


class MicrobatchExecutionDebug(DebugLevel):
def code(self) -> str:
return "Q044"

def message(self) -> str:
return self.msg


# =======================================================
# W - Node testing
# =======================================================
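MicrobatchExecutionDebug is a debug-level event whose message is simply the msg field of the new Q044 proto message. Assuming it is emitted like other core events, usage would look roughly like the sketch below (the message text is an invented example; fire_event is the helper imported in build.py further down):

from dbt.events.types import MicrobatchExecutionDebug
from dbt_common.events.functions import fire_event

# Illustrative only: the batch description string is made up.
fire_event(MicrobatchExecutionDebug(msg="Running batch 2 of 10 for my_model concurrently"))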
7 changes: 7 additions & 0 deletions core/dbt/parser/manifest.py
@@ -1461,6 +1461,13 @@ def check_valid_microbatch_config(self):
f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)})."
)

# optional config: concurrent_batches (bool)
concurrent_batches = node.config.concurrent_batches
if not isinstance(concurrent_batches, bool) and concurrent_batches is not None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' optional 'concurrent_batches' config must be of type `bool` if specified, but got: {type(concurrent_batches)})."
)

# Validate upstream node event_time (if configured)
has_input_with_event_time_config = False
for input_unique_id in node.depends_on.nodes:
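The parser-side check mirrors the neighboring lookback validation: concurrent_batches is optional, but must be a real bool when provided. The same rule in isolation (the function name and ValueError are illustrative; dbt raises ParsingError as shown above):

from typing import Any, Optional

def validate_concurrent_batches(model_name: str, concurrent_batches: Optional[Any]) -> None:
    # Accept None (unset) or a bool; reject anything else.
    if concurrent_batches is not None and not isinstance(concurrent_batches, bool):
        raise ValueError(
            f"Microbatch model '{model_name}' optional 'concurrent_batches' config "
            f"must be of type `bool` if specified, but got: {type(concurrent_batches)}."
        )

validate_concurrent_batches("stg_events", None)   # ok: unset, dbt decides
validate_concurrent_batches("stg_events", True)   # ok: explicitly allow parallel batches
# validate_concurrent_batches("stg_events", "yes")  # would raise: strings are rejected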
24 changes: 18 additions & 6 deletions core/dbt/task/build.py
@@ -11,6 +11,7 @@
from dbt.graph import Graph, GraphQueue, ResourceTypeSelector
from dbt.node_types import NodeType
from dbt.task.base import BaseRunner, resource_types_from_args
from dbt.task.run import MicrobatchModelRunner
from dbt_common.events.functions import fire_event

from .run import ModelRunner as run_model_runner
@@ -141,13 +142,13 @@ def handle_job_queue(self, pool, callback):

def handle_model_with_unit_tests_node(self, node, pool, callback):
self._raise_set_error()
args = [node]
args = [node, pool]
if self.config.args.single_threaded:
callback(self.call_model_and_unit_tests_runner(*args))
else:
pool.apply_async(self.call_model_and_unit_tests_runner, args=args, callback=callback)

def call_model_and_unit_tests_runner(self, node) -> RunResult:
def call_model_and_unit_tests_runner(self, node, pool) -> RunResult:
assert self.manifest
for unit_test_unique_id in self.model_to_unit_test_map[node.unique_id]:
unit_test_node = self.manifest.unit_tests[unit_test_unique_id]
@@ -166,6 +167,10 @@ def call_model_and_unit_tests_runner(self, node) -> RunResult:
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
return self.handle_microbatch_model(runner, pool)

return self.call_runner(runner)

# handle non-model-plus-unit-tests nodes
@@ -177,11 +182,12 @@ def handle_job_queue_node(self, node, pool, callback):
if runner.node.unique_id in self._skipped_children:
cause = self._skipped_children.pop(runner.node.unique_id)
runner.do_skip(cause=cause)
args = [runner]
if self.config.args.single_threaded:
callback(self.call_runner(*args))

if isinstance(runner, MicrobatchModelRunner):
callback(self.handle_microbatch_model(runner, pool))
else:
pool.apply_async(self.call_runner, args=args, callback=callback)
args = [runner]
self._submit(pool, args, callback)

# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.
@@ -210,6 +216,12 @@ def get_node_selector(self, no_unit_tests=False) -> ResourceTypeSelector:
)

def get_runner_type(self, node) -> Optional[Type[BaseRunner]]:
if (
node.resource_type == NodeType.Model
and super().get_runner_type(node) == MicrobatchModelRunner
):
return MicrobatchModelRunner

return self.RUNNER_MAP.get(node.resource_type)

# Special build compile_manifest method to pass add_test_edges to the compiler
(Remaining changed file diffs are not rendered.)