Skip to content

Commit

Permalink
Gate microbatch functionality by use_microbatch_batches manifest fu…
Browse files Browse the repository at this point in the history
…nction
  • Loading branch information
QMalcolm committed Nov 5, 2024
1 parent a620919 commit e77a13c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
4 changes: 3 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ def resolve_limit(self) -> Optional[int]:
def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]:
event_time_filter = None
if (
get_flags().require_builtin_microbatch_strategy
self.manifest.use_microbatch_batches(
project_name=self.config.project_name, adapter_type=self.config.credentials.type
)
and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig))
and target.config.event_time
and self.model.config.materialized == "incremental"
Expand Down
5 changes: 4 additions & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,10 @@ def check_valid_snapshot_config(self):
node.config.final_validate()

def check_valid_microbatch_config(self):
if get_flags().require_builtin_microbatch_strategy:
if self.manifest.use_microbatch_batches(
project_name=self.root_project.project_name,
adapter_type=self.root_project.credentials.type,
):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
RunningOperationCaughtError,
)
from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.flags import get_flags
from dbt.graph import ResourceTypeSelector
from dbt.hooks import get_hook_dict
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
Expand Down Expand Up @@ -483,7 +482,9 @@ def execute(self, model, manifest):

hook_ctx = self.adapter.pre_model_hook(context_config)
if (
get_flags().require_builtin_microbatch_strategy
manifest.use_microbatch_batches(
project_name=self.config.project_name, adapter_type=self.config.credentials.type
)
and model.config.materialized == "incremental"
and model.config.incremental_strategy == "microbatch"
):
Expand Down

0 comments on commit e77a13c

Please sign in to comment.