From 18e8753f3b02026bd3153162262352d3d34d27ff Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 19 Jan 2023 16:44:22 -0800 Subject: [PATCH 1/4] Convert incremental on_schema_change tests. --- .../test_incremental_on_schema_change.py | 88 ++++++++ .../test_incremental_predicates.py | 0 .../test_incremental_unique_id.py | 0 .../models/incremental_append_new_columns.sql | 28 --- .../incremental_append_new_columns_target.sql | 19 -- .../models/incremental_fail.sql | 18 -- .../models/incremental_ignore.sql | 18 -- .../models/incremental_ignore_target.sql | 15 -- .../models/incremental_sync_all_columns.sql | 29 --- .../incremental_sync_all_columns_target.sql | 20 -- .../models/model_a.sql | 22 -- .../test_incremental_on_schema_change.py | 190 ------------------ 12 files changed, 88 insertions(+), 359 deletions(-) create mode 100644 tests/functional/adapter/incremental/test_incremental_on_schema_change.py rename tests/functional/adapter/{ => incremental}/test_incremental_predicates.py (100%) rename tests/functional/adapter/{ => incremental}/test_incremental_unique_id.py (100%) delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_fail.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_ignore.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql delete mode 100644 tests/integration/incremental_on_schema_change/models/model_a.sql delete mode 100644 tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py diff --git a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py new file mode 100644 index 000000000..6a54f42d8 --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -0,0 +1,88 @@ +import pytest + +from dbt.tests.util import run_dbt + +from dbt.tests.adapter.incremental.test_incremental_on_schema_change import ( + BaseIncrementalOnSchemaChangeSetup, +) + + +class IncrementalOnSchemaChangeIgnoreFail(BaseIncrementalOnSchemaChangeSetup): + def test_run_incremental_ignore(self, project): + select = "model_a incremental_ignore incremental_ignore_target" + compare_source = "incremental_ignore" + compare_target = "incremental_ignore_target" + self.run_twice_and_assert(select, compare_source, compare_target, project) + + def test_run_incremental_fail_on_schema_change(self, project): + select = "model_a incremental_fail" + run_dbt(["run", "--models", select, "--full-refresh"]) + results_two = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results_two[1].message + + +@pytest.mark.skip_profile( + "databricks_uc_cluster", "databricks_sql_endpoint", "databricks_uc_sql_endpoint" +) +class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+file_format": "parquet", + "+incremental_strategy": "append", + } + } + + +@pytest.mark.skip_profile( + "databricks_uc_cluster", "databricks_sql_endpoint", "databricks_uc_sql_endpoint" +) +class TestInsertOverwriteOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+file_format": "parquet", + "+partition_by": "id", + "+incremental_strategy": "insert_overwrite", + } + } + + +class TestDeltaOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+unique_key": "id", + } + } + + def run_incremental_sync_all_columns(self, project): + select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" + compare_source = "incremental_sync_all_columns" + compare_target = "incremental_sync_all_columns_target" + run_dbt(["run", "--models", select, "--full-refresh"]) + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results[1].message + + def run_incremental_sync_remove_only(self, project): + select = "model_a incremental_sync_remove_only incremental_sync_remove_only_target" + compare_source = "incremental_sync_remove_only" + compare_target = "incremental_sync_remove_only_target" + run_dbt(["run", "--models", select, "--full-refresh"]) + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results[1].message + + def test_run_incremental_append_new_columns(self, project): + # only adding new columns in supported + self.run_incremental_append_new_columns(project) + # handling columns that have been removed doesn"t work on Delta Lake today + # self.run_incremental_append_new_columns_remove_one(project) + + def test_run_incremental_sync_all_columns(self, project): + self.run_incremental_sync_all_columns(project) + self.run_incremental_sync_remove_only(project) diff --git a/tests/functional/adapter/test_incremental_predicates.py b/tests/functional/adapter/incremental/test_incremental_predicates.py similarity index 100% rename from tests/functional/adapter/test_incremental_predicates.py rename to tests/functional/adapter/incremental/test_incremental_predicates.py diff --git a/tests/functional/adapter/test_incremental_unique_id.py b/tests/functional/adapter/incremental/test_incremental_unique_id.py similarity index 100% rename from tests/functional/adapter/test_incremental_unique_id.py rename to tests/functional/adapter/incremental/test_incremental_unique_id.py diff --git a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql deleted file mode 100644 index c1a95ac42..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns.sql +++ /dev/null @@ -1,28 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='append_new_columns' - ) -}} - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2, - cast(field3 as {{string_type}}) as field3, - cast(field4 as {{string_type}}) as field4 -FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2 -FROM source_data where id <= 3 - -{% endif %} diff --git a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql deleted file mode 100644 index ad97ed5cd..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_append_new_columns_target.sql +++ /dev/null @@ -1,19 +0,0 @@ -{{ - config(materialized='table') -}} - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -select id - ,cast(field1 as {{string_type}}) as field1 - ,cast(field2 as {{string_type}}) as field2 - ,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as {{string_type}}) AS field3 - ,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as {{string_type}}) AS field4 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_fail.sql b/tests/integration/incremental_on_schema_change/models/incremental_fail.sql deleted file mode 100644 index 939fc20c2..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_fail.sql +++ /dev/null @@ -1,18 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='fail' - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, field1, field2 FROM source_data - -{% else %} - -SELECT id, field1, field3 FROm source_data - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql deleted file mode 100644 index 98f0a74a8..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_ignore.sql +++ /dev/null @@ -1,18 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='ignore' - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% if is_incremental() %} - -SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -SELECT id, field1, field2 FROM source_data LIMIT 3 - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql deleted file mode 100644 index 6fea042e3..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_ignore_target.sql +++ /dev/null @@ -1,15 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -select id - ,field1 - ,field2 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql deleted file mode 100644 index 91a2e7384..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns.sql +++ /dev/null @@ -1,29 +0,0 @@ -{{ - config( - materialized='incremental', - on_schema_change='sync_all_columns' - ) -}} - -WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -{% if is_incremental() %} - -SELECT id, - cast(field1 as {{string_type}}) as field1, - cast(field3 as {{string_type}}) as field3, -- to validate new fields - cast(field4 as {{string_type}}) AS field4 -- to validate new fields - -FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) - -{% else %} - -select id, - cast(field1 as {{string_type}}) as field1, - cast(field2 as {{string_type}}) as field2 - -from source_data where id <= 3 - -{% endif %} \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql b/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql deleted file mode 100644 index dcc34a754..000000000 --- a/tests/integration/incremental_on_schema_change/models/incremental_sync_all_columns_target.sql +++ /dev/null @@ -1,20 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select * from {{ ref('model_a') }} - -) - -{% set string_type = 'string' if target.type == 'bigquery' else 'varchar(10)' %} - -select id - ,cast(field1 as {{string_type}}) as field1 - --,field2 - ,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3 - ,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4 - -from source_data -order by id \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/models/model_a.sql b/tests/integration/incremental_on_schema_change/models/model_a.sql deleted file mode 100644 index 0a970b513..000000000 --- a/tests/integration/incremental_on_schema_change/models/model_a.sql +++ /dev/null @@ -1,22 +0,0 @@ -{{ - config(materialized='table') -}} - -with source_data as ( - - select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4 - union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4 - union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4 - union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4 - union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4 - union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4 - -) - -select id - ,field1 - ,field2 - ,field3 - ,field4 - -from source_data \ No newline at end of file diff --git a/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py b/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py deleted file mode 100644 index a3aa9b074..000000000 --- a/tests/integration/incremental_on_schema_change/test_incremental_on_schema_change.py +++ /dev/null @@ -1,190 +0,0 @@ -from tests.integration.base import DBTIntegrationTest, use_profile - - -class TestIncrementalOnSchemaChange(DBTIntegrationTest): - @property - def schema(self): - return "incremental_on_schema_change" - - @property - def models(self): - return "models" - - @property - def project_config(self): - return {"config-version": 2, "test-paths": ["tests"]} - - def run_twice_and_assert(self, include, compare_source, compare_target): - - # dbt run (twice) - run_args = ["run"] - if include: - run_args.extend(("--models", include)) - results_one = self.run_dbt(run_args) - results_two = self.run_dbt(run_args) - - self.assertEqual(len(results_one), 3) - self.assertEqual(len(results_two), 3) - - self.assertTablesEqual(compare_source, compare_target) - - def run_incremental_ignore(self): - select = "model_a incremental_ignore incremental_ignore_target" - compare_source = "incremental_ignore" - compare_target = "incremental_ignore_target" - self.run_twice_and_assert(select, compare_source, compare_target) - - def run_incremental_append_new_columns(self): - select = "model_a incremental_append_new_columns incremental_append_new_columns_target" - compare_source = "incremental_append_new_columns" - compare_target = "incremental_append_new_columns_target" - self.run_twice_and_assert(select, compare_source, compare_target) - - def run_incremental_fail_on_schema_change(self): - select = "model_a incremental_fail" - results_one = self.run_dbt(["run", "--models", select, "--full-refresh"]) # noqa: F841 - results_two = self.run_dbt(["run", "--models", select], expect_pass=False) - self.assertIn("Compilation Error", results_two[1].message) - - def run_incremental_sync_all_columns(self): - # this doesn't work on Delta today - select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" - results_one = self.run_dbt(["run", "--models", select, "--full-refresh"]) # noqa: F841 - results_two = self.run_dbt(["run", "--models", select], expect_pass=False) - self.assertIn("Compilation Error", results_two[1].message) - - -class TestDeltaAppend(TestIncrementalOnSchemaChange): - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"], - "models": { - "+incremental_strategy": "append", - }, - } - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - -class TestDeltaOnSchemaChange(TestIncrementalOnSchemaChange): - @property - def project_config(self): - return { - "config-version": 2, - "test-paths": ["tests"], - "models": { - "+unique_key": "id", - }, - } - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile("databricks_cluster") - def test__databricks_cluster__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile("databricks_uc_cluster") - def test__databricks_uc_cluster__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile("databricks_sql_endpoint") - def test__databricks_sql_endpoint__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_ignore(self): - self.run_incremental_ignore() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_fail_on_schema_change(self): - self.run_incremental_fail_on_schema_change() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_append_new_columns(self): - self.run_incremental_append_new_columns() - - @use_profile("databricks_uc_sql_endpoint") - def test__databricks_uc_sql_endpoint__run_incremental_sync_all_columns(self): - self.run_incremental_sync_all_columns() From bf2c53bafe1171693458be883134917fd1168a9e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 19 Jan 2023 16:59:03 -0800 Subject: [PATCH 2/4] Fix linter. --- .../incremental/test_incremental_on_schema_change.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py index 6a54f42d8..d44592f70 100644 --- a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py +++ b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -61,8 +61,8 @@ def project_config_update(self): def run_incremental_sync_all_columns(self, project): select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" - compare_source = "incremental_sync_all_columns" - compare_target = "incremental_sync_all_columns_target" + compare_source = "incremental_sync_all_columns" # noqa: F841 + compare_target = "incremental_sync_all_columns_target" # noqa: F841 run_dbt(["run", "--models", select, "--full-refresh"]) # Delta Lake doesn"t support removing columns -- show a nice compilation error results = run_dbt(["run", "--models", select], expect_pass=False) @@ -70,8 +70,8 @@ def run_incremental_sync_all_columns(self, project): def run_incremental_sync_remove_only(self, project): select = "model_a incremental_sync_remove_only incremental_sync_remove_only_target" - compare_source = "incremental_sync_remove_only" - compare_target = "incremental_sync_remove_only_target" + compare_source = "incremental_sync_remove_only" # noqa: F841 + compare_target = "incremental_sync_remove_only_target" # noqa: F841 run_dbt(["run", "--models", select, "--full-refresh"]) # Delta Lake doesn"t support removing columns -- show a nice compilation error results = run_dbt(["run", "--models", select], expect_pass=False) From 8be7609f78f5dcba0f69afbd9606a1a4efab3635 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 20 Jan 2023 11:03:19 -0800 Subject: [PATCH 3/4] Fix. --- .../test_incremental_on_schema_change.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py index d44592f70..e0cfe568d 100644 --- a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py +++ b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -21,10 +21,20 @@ def test_run_incremental_fail_on_schema_change(self, project): assert "Compilation Error" in results_two[1].message +class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": { + "+incremental_strategy": "append", + } + } + + @pytest.mark.skip_profile( "databricks_uc_cluster", "databricks_sql_endpoint", "databricks_uc_sql_endpoint" ) -class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): +class TestAppendParquetOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -50,7 +60,7 @@ def project_config_update(self): } -class TestDeltaOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): +class TestMergeOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): @pytest.fixture(scope="class") def project_config_update(self): return { From 4ed7f925c11a961186feaa5f958e74401d03aa8d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 27 Jan 2023 14:39:08 -0800 Subject: [PATCH 4/4] Fix. --- .../test_incremental_on_schema_change.py | 73 ++++++++----------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py index e0cfe568d..0bbe973c2 100644 --- a/tests/functional/adapter/incremental/test_incremental_on_schema_change.py +++ b/tests/functional/adapter/incremental/test_incremental_on_schema_change.py @@ -3,25 +3,42 @@ from dbt.tests.util import run_dbt from dbt.tests.adapter.incremental.test_incremental_on_schema_change import ( + BaseIncrementalOnSchemaChange, BaseIncrementalOnSchemaChangeSetup, ) -class IncrementalOnSchemaChangeIgnoreFail(BaseIncrementalOnSchemaChangeSetup): - def test_run_incremental_ignore(self, project): - select = "model_a incremental_ignore incremental_ignore_target" - compare_source = "incremental_ignore" - compare_target = "incremental_ignore_target" - self.run_twice_and_assert(select, compare_source, compare_target, project) +class BaseIncrementalOnSchemaChangeDatabricksSetup(BaseIncrementalOnSchemaChangeSetup): + def run_incremental_sync_all_columns(self, project): + select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" + compare_source = "incremental_sync_all_columns" # noqa: F841 + compare_target = "incremental_sync_all_columns_target" # noqa: F841 + run_dbt(["run", "--models", select, "--full-refresh"]) + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results[1].message - def test_run_incremental_fail_on_schema_change(self, project): - select = "model_a incremental_fail" + def run_incremental_sync_remove_only(self, project): + select = "model_a incremental_sync_remove_only incremental_sync_remove_only_target" + compare_source = "incremental_sync_remove_only" # noqa: F841 + compare_target = "incremental_sync_remove_only_target" # noqa: F841 run_dbt(["run", "--models", select, "--full-refresh"]) - results_two = run_dbt(["run", "--models", select], expect_pass=False) - assert "Compilation Error" in results_two[1].message + # Delta Lake doesn"t support removing columns -- show a nice compilation error + results = run_dbt(["run", "--models", select], expect_pass=False) + assert "Compilation Error" in results[1].message + + +class BaseIncrementalOnSchemaChangeDatabricks( + BaseIncrementalOnSchemaChange, BaseIncrementalOnSchemaChangeDatabricksSetup +): + def test_run_incremental_append_new_columns(self, project): + # only adding new columns in supported + self.run_incremental_append_new_columns(project) + # handling columns that have been removed doesn"t work on Delta Lake today + # self.run_incremental_append_new_columns_remove_one(project) -class TestAppendOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): +class TestAppendOnSchemaChangeDatabricks(BaseIncrementalOnSchemaChangeDatabricks): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -34,7 +51,7 @@ def project_config_update(self): @pytest.mark.skip_profile( "databricks_uc_cluster", "databricks_sql_endpoint", "databricks_uc_sql_endpoint" ) -class TestAppendParquetOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): +class TestAppendParquetOnSchemaChangeDatabricks(BaseIncrementalOnSchemaChangeDatabricks): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -48,7 +65,7 @@ def project_config_update(self): @pytest.mark.skip_profile( "databricks_uc_cluster", "databricks_sql_endpoint", "databricks_uc_sql_endpoint" ) -class TestInsertOverwriteOnSchemaChange(IncrementalOnSchemaChangeIgnoreFail): +class TestInsertOverwriteOnSchemaChangeDatabricks(BaseIncrementalOnSchemaChangeDatabricks): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -60,7 +77,7 @@ def project_config_update(self): } -class TestMergeOnSchemaChange(BaseIncrementalOnSchemaChangeSetup): +class TestMergeOnSchemaChangeDatabricks(BaseIncrementalOnSchemaChangeDatabricks): @pytest.fixture(scope="class") def project_config_update(self): return { @@ -68,31 +85,3 @@ def project_config_update(self): "+unique_key": "id", } } - - def run_incremental_sync_all_columns(self, project): - select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target" - compare_source = "incremental_sync_all_columns" # noqa: F841 - compare_target = "incremental_sync_all_columns_target" # noqa: F841 - run_dbt(["run", "--models", select, "--full-refresh"]) - # Delta Lake doesn"t support removing columns -- show a nice compilation error - results = run_dbt(["run", "--models", select], expect_pass=False) - assert "Compilation Error" in results[1].message - - def run_incremental_sync_remove_only(self, project): - select = "model_a incremental_sync_remove_only incremental_sync_remove_only_target" - compare_source = "incremental_sync_remove_only" # noqa: F841 - compare_target = "incremental_sync_remove_only_target" # noqa: F841 - run_dbt(["run", "--models", select, "--full-refresh"]) - # Delta Lake doesn"t support removing columns -- show a nice compilation error - results = run_dbt(["run", "--models", select], expect_pass=False) - assert "Compilation Error" in results[1].message - - def test_run_incremental_append_new_columns(self, project): - # only adding new columns in supported - self.run_incremental_append_new_columns(project) - # handling columns that have been removed doesn"t work on Delta Lake today - # self.run_incremental_append_new_columns_remove_one(project) - - def test_run_incremental_sync_all_columns(self, project): - self.run_incremental_sync_all_columns(project) - self.run_incremental_sync_remove_only(project)