From 57acd236afa510d5a082e83d1426c9cfead1bf4f Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 27 Nov 2024 15:26:24 -0500 Subject: [PATCH 1/2] move microbatch compilation to .compile method --- core/dbt/task/run.py | 47 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 55b73be3d80..fb4ee145429 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -342,10 +342,30 @@ def __init__(self, config, adapter, node, node_index: int, num_nodes: int): self.relation_exists: bool = False def compile(self, manifest: Manifest): - # The default compile function is _always_ called. However, we do our - # compilation _later_ in `_execute_microbatch_materialization`. This - # meant the node was being compiled _twice_ for each batch. To get around - # this, we've overriden the default compile method to do nothing + if self.batch_idx is not None: + batch = self.batches[self.batch_idx] + + # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) + # TODO: REMOVE before 1.10 GA + self.node.config["__dbt_internal_microbatch_event_time_start"] = batch[0] + self.node.config["__dbt_internal_microbatch_event_time_end"] = batch[1] + # Create batch context on model node prior to re-compiling + self.node.batch = BatchContext( + id=MicrobatchBuilder.batch_id(batch[0], self.node.config.batch_size), + event_time_start=batch[0], + event_time_end=batch[1], + ) + # Recompile node to re-resolve refs with event time filters rendered, update context + self.compiler.compile_node( + self.node, + manifest, + {}, + split_suffix=MicrobatchBuilder.format_batch_start( + batch[0], self.node.config.batch_size + ), + ) + + # Skips compilation for non-batch runs return self.node def set_batch_idx(self, batch_idx: int) -> None: @@ -537,25 +557,6 @@ def _execute_microbatch_materialization( # call materialization_macro to get a batch-level run result start_time = time.perf_counter() try: - # LEGACY: Set start/end in context prior to re-compiling (Will be removed for 1.10+) - # TODO: REMOVE before 1.10 GA - model.config["__dbt_internal_microbatch_event_time_start"] = batch[0] - model.config["__dbt_internal_microbatch_event_time_end"] = batch[1] - # Create batch context on model node prior to re-compiling - model.batch = BatchContext( - id=MicrobatchBuilder.batch_id(batch[0], model.config.batch_size), - event_time_start=batch[0], - event_time_end=batch[1], - ) - # Recompile node to re-resolve refs with event time filters rendered, update context - self.compiler.compile_node( - model, - manifest, - {}, - split_suffix=MicrobatchBuilder.format_batch_start( - batch[0], model.config.batch_size - ), - ) # Update jinja context with batch context members jinja_context = microbatch_builder.build_jinja_context_for_batch( incremental_batch=self.relation_exists From af79cfc04cd03e368a7f99fca1237061d8faee06 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 27 Nov 2024 18:04:40 -0500 Subject: [PATCH 2/2] remove manifest parameter from ModelRunner._execute_model --- core/dbt/task/run.py | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index fb4ee145429..b9cf7e22a08 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -283,7 +283,6 @@ def _execute_model( hook_ctx: Any, context_config: Any, model: ModelNode, - manifest: Manifest, context: Dict[str, Any], materialization_macro: MacroProtocol, ) -> RunResult: @@ -328,9 +327,7 @@ def execute(self, model, manifest): hook_ctx = self.adapter.pre_model_hook(context_config) - return self._execute_model( - hook_ctx, context_config, model, manifest, context, materialization_macro - ) + return self._execute_model(hook_ctx, context_config, model, context, materialization_macro) class MicrobatchModelRunner(ModelRunner): @@ -522,7 +519,6 @@ def _build_run_microbatch_model_result(self, model: ModelNode) -> RunResult: def _execute_microbatch_materialization( self, model: ModelNode, - manifest: Manifest, context: Dict[str, Any], materialization_macro: MacroProtocol, ) -> RunResult: @@ -644,37 +640,23 @@ def _is_incremental(self, model) -> bool: else: return False - def _execute_microbatch_model( + def _execute_model( self, hook_ctx: Any, context_config: Any, model: ModelNode, - manifest: Manifest, context: Dict[str, Any], materialization_macro: MacroProtocol, ) -> RunResult: try: batch_result = self._execute_microbatch_materialization( - model, manifest, context, materialization_macro + model, context, materialization_macro ) finally: self.adapter.post_model_hook(context_config, hook_ctx) return batch_result - def _execute_model( - self, - hook_ctx: Any, - context_config: Any, - model: ModelNode, - manifest: Manifest, - context: Dict[str, Any], - materialization_macro: MacroProtocol, - ) -> RunResult: - return self._execute_microbatch_model( - hook_ctx, context_config, model, manifest, context, materialization_macro - ) - class RunTask(CompileTask): def __init__(