Skip to content

Commit

Permalink
Adap 321/add iceberg incremental models (#1194)
Browse files Browse the repository at this point in the history
* Update expected rows to reflect what the append strategy actually does

* Empty commit to shift the test schema of the test off the bugged iceberg model

* More robust solution to avoid time-based model conflicts.

* prints on ci to see what's happening.

* Remove the superfluous test that is causing metadata conflicts for append.
  • Loading branch information
VersusFacit authored Sep 27, 2024
1 parent 0ae7479 commit 423111f
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions tests/functional/iceberg/test_incremental_models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import time

from pathlib import Path

Expand Down Expand Up @@ -57,6 +58,8 @@


class TestIcebergIncrementalStrategies:
append: str = f"append_{hash(time.time())}"

@pytest.fixture(scope="class")
def project_config_update(self):
return {"flags": {"enable_iceberg_materializations": True}}
Expand All @@ -76,23 +79,19 @@ def setup_class(self, project):
def models(self):
return {
"upstream_table.sql": _MODEL_BASIC_TABLE_MODEL,
"append.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND,
f"{self.append}.sql": _MODEL_INCREMENTAL_ICEBERG_APPEND,
"merge.sql": _MODEL_INCREMENTAL_ICEBERG_MERGE,
"delete_insert.sql": _MODEL_INCREMENTAL_ICEBERG_DELETE_INSERT,
}

def test_incremental_strategies_build(self, project, setup_class):
run_results = run_dbt()
assert len(run_results) == 4

def __check_correct_operations(self, model_name, /, rows_affected, status="SUCCESS"):
run_results = run_dbt(
["show", "--inline", f"select * from {{{{ ref('{model_name}') }}}} where world_id = 4"]
)
assert run_results[0].adapter_response["rows_affected"] == rows_affected
assert run_results[0].adapter_response["code"] == status

if model_name != "append":
if "append" not in model_name:
run_results, stdout = run_dbt_and_capture(
[
"show",
Expand All @@ -118,9 +117,9 @@ def test_incremental_strategies_with_update(self, project, setup_class):
)
)

run_results = run_dbt(["run", "-s", "append", "merge", "delete_insert"])
run_results = run_dbt(["run", "-s", self.append, "merge", "delete_insert"])
assert len(run_results) == 3

self.__check_correct_operations("append", rows_affected=3)
self.__check_correct_operations(self.append, rows_affected=2)
self.__check_correct_operations("merge", rows_affected=1)
self.__check_correct_operations("delete_insert", rows_affected=1)

0 comments on commit 423111f

Please sign in to comment.