From 101aad24eddd87eecb2b02674fd382d4579d98ae Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Wed, 25 Sep 2024 22:28:55 +0100
Subject: [PATCH] Microbatch Strategy (#1108)

---
 .changes/unreleased/Features-20240925-125242.yaml |  6 ++++++
 CONTRIBUTING.md                                   |  1 +
 .../incremental/incremental.sql                   |  2 +-
 .../incremental/strategies.sql                    | 11 ++++++++++
 .../materializations/incremental/validate.sql     |  6 +++---
 .../incremental_strategies/test_microbatch.py     | 21 +++++++++++++++++++
 6 files changed, 43 insertions(+), 4 deletions(-)
 create mode 100644 .changes/unreleased/Features-20240925-125242.yaml
 create mode 100644 tests/functional/adapter/incremental_strategies/test_microbatch.py

diff --git a/.changes/unreleased/Features-20240925-125242.yaml b/.changes/unreleased/Features-20240925-125242.yaml
new file mode 100644
index 000000000..1cb51c004
--- /dev/null
+++ b/.changes/unreleased/Features-20240925-125242.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add Microbatch Strategy to dbt-spark
+time: 2024-09-25T12:52:42.872017+01:00
+custom:
+  Author: michelleark
+  Issue: "1109"
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 6fcaacea8..903507b7a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -78,6 +78,7 @@ python dagger/run_dbt_spark_tests.py --profile databricks_sql_endpoint --test-pa
 _options_:
   - "apache_spark"
   - "spark_session"
+  - "spark_http_odbc"
   - "databricks_sql_endpoint"
   - "databricks_cluster"
   - "databricks_http_cluster"
diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql
index 9a66bab51..77bfc59c9 100644
--- a/dbt/include/spark/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -24,7 +24,7 @@
   {%- endif -%}

   {#-- Set Overwrite Mode --#}
-  {%- if strategy == 'insert_overwrite' and partition_by -%}
+  {%- if strategy in ['insert_overwrite', 'microbatch'] and partition_by -%}
     {%- call statement() -%}
       set spark.sql.sources.partitionOverwriteMode = DYNAMIC
     {%- endcall -%}
diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql
index eeb920493..4ffead6a0 100644
--- a/dbt/include/spark/macros/materializations/incremental/strategies.sql
+++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -75,6 +75,17 @@
   {%- elif strategy == 'insert_overwrite' -%}
     {#-- insert statements don't like CTEs, so support them via a temp view #}
     {{ get_insert_overwrite_sql(source, target, existing) }}
+  {%- elif strategy == 'microbatch' -%}
+    {#-- microbatch wraps insert_overwrite, and requires a partition_by config #}
+    {% set missing_partition_key_microbatch_msg -%}
+      dbt-spark 'microbatch' incremental strategy requires a `partition_by` config.
+      Ensure you are using a `partition_by` column that is of grain {{ config.get('batch_size') }}.
+    {%- endset %}
+
+    {%- if not config.get('partition_by') -%}
+      {{ exceptions.raise_compiler_error(missing_partition_key_microbatch_msg) }}
+    {%- endif -%}
+    {{ get_insert_overwrite_sql(source, target, existing) }}
   {%- elif strategy == 'merge' -%}
   {#-- merge all columns for datasources which implement MERGE INTO (e.g. databricks, iceberg) - schema changes are handled for us #}
   {{ get_merge_sql(target, source, unique_key, dest_columns=none, incremental_predicates=incremental_predicates) }}
diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql
index 0d4c4d8b6..4a1ac9943 100644
--- a/dbt/include/spark/macros/materializations/incremental/validate.sql
+++ b/dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -21,7 +21,7 @@

   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ raw_strategy }}
-    Expected one of: 'append', 'merge', 'insert_overwrite'
+    Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
   {%- endset %}

   {% set invalid_merge_msg -%}
@@ -35,13 +35,13 @@
     Use the 'append' or 'merge' strategy instead
   {%- endset %}

-  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
+  {% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
     {% if raw_strategy == 'merge' and file_format not in ['delta', 'iceberg', 'hudi'] %}
       {% do exceptions.raise_compiler_error(invalid_merge_msg) %}
     {% endif %}
-    {% if raw_strategy == 'insert_overwrite' and target.endpoint %}
+    {% if raw_strategy in ['insert_overwrite', 'microbatch'] and target.endpoint %}
       {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
     {% endif %}
   {% endif %}
diff --git a/tests/functional/adapter/incremental_strategies/test_microbatch.py b/tests/functional/adapter/incremental_strategies/test_microbatch.py
new file mode 100644
index 000000000..088b35baf
--- /dev/null
+++ b/tests/functional/adapter/incremental_strategies/test_microbatch.py
@@ -0,0 +1,21 @@
+import pytest
+
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+
+# No requirement for a unique_id for spark microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), partition_by=['date_day'], file_format='parquet') }}
+select *, cast(event_time as date) as date_day
+from {{ ref('input_model') }}
+"""
+
+
+@pytest.mark.skip_profile(
+    "databricks_http_cluster", "databricks_sql_endpoint", "spark_session", "spark_http_odbc"
+)
+class TestMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql
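
For context beyond the patch itself, a user-facing microbatch model on this adapter combines the new strategy with the existing `partition_by` and `file_format` configs. Below is a sketch of such a model: the model and column names are hypothetical, and the config keys mirror the test model above, just spread over multiple lines for readability:

    {{
        config(
            materialized='incremental',
            incremental_strategy='microbatch',
            event_time='event_time',
            batch_size='day',
            begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0),
            partition_by=['date_day'],
            file_format='parquet'
        )
    }}
    -- derive a partition column at the batch grain ('day' here)
    select *, cast(event_time as date) as date_day
    from {{ ref('events') }}

If `partition_by` is omitted, the new branch in strategies.sql raises the compiler error defined above at compile time.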
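On mechanics: the strategy delegates to the existing `get_insert_overwrite_sql` macro after incremental.sql enables dynamic partition overwrite, so each batch replaces only the partitions present in that batch's filtered rows. A rough sketch of the statements one day-grain batch might run, assuming the hypothetical model above (the target table, temp-view name, and column list are illustrative):

    -- Executed as a separate statement by incremental.sql for 'microbatch',
    -- exactly as it already is for 'insert_overwrite'
    set spark.sql.sources.partitionOverwriteMode = DYNAMIC;

    -- With partition_by=['date_day'] and batch_size='day', only the day
    -- partition(s) present in the batch's filtered source rows are replaced
    insert overwrite table my_schema.events_daily
    partition (date_day)
    select `id`, `event_time`, `date_day`  -- the target's columns, per get_insert_overwrite_sql
    from events_daily__dbt_tmp;

Because rerunning a batch overwrites its whole partition, batches are idempotent without a `unique_key`, which is why the test model above omits one.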