diff --git a/.changes/unreleased/Fixes-20240516-174337.yaml b/.changes/unreleased/Fixes-20240516-174337.yaml new file mode 100644 index 000000000..955d90ed3 --- /dev/null +++ b/.changes/unreleased/Fixes-20240516-174337.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Update relation caching to correctly identify dynamic tables, accounting for Snowflake's `2024_03` bundle +time: 2024-05-16T17:43:37.336858-04:00 +custom: + Author: mikealfare + Issue: "1016" diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py index 11765533b..ea7d4ac52 100644 --- a/dbt/adapters/snowflake/impl.py +++ b/dbt/adapters/snowflake/impl.py @@ -141,26 +141,37 @@ def list_relations_without_caching(self, schema_relation: SnowflakeRelation) -> return [] raise - relations = [] - quote_policy = {"database": True, "schema": True, "identifier": True} - + # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory columns = ["database_name", "schema_name", "name", "kind"] - for _database, _schema, _identifier, _type in results.select(columns): # type: ignore - try: - _type = self.Relation.get_relation_type(_type.lower()) - except ValueError: - _type = self.Relation.External - relations.append( - self.Relation.create( - database=_database, - schema=_schema, - identifier=_identifier, - quote_policy=quote_policy, - type=_type, - ) - ) + if "is_dynamic" in results.column_names: # type: ignore + columns.append("is_dynamic") + + return [self._parse_list_relations_result(result) for result in results.select(columns)] # type: ignore + + def _parse_list_relations_result(self, result: agate.Row) -> SnowflakeRelation: + # this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory + try: + database, schema, identifier, relation_type, is_dynamic = result + except ValueError: + database, schema, identifier, relation_type = result + is_dynamic = "N" - return relations + try: + relation_type = self.Relation.get_relation_type(relation_type.lower()) + except ValueError: + relation_type = self.Relation.External + + if relation_type == self.Relation.Table and is_dynamic == "Y": + relation_type = self.Relation.DynamicTable + + quote_policy = {"database": True, "schema": True, "identifier": True} + return self.Relation.create( + database=database, + schema=schema, + identifier=identifier, + type=relation_type, + quote_policy=quote_policy, + ) def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str: quote_columns: bool = False diff --git a/dbt/include/snowflake/macros/adapters.sql b/dbt/include/snowflake/macros/adapters.sql index 157738187..0bf7b7d1b 100644 --- a/dbt/include/snowflake/macros/adapters.sql +++ b/dbt/include/snowflake/macros/adapters.sql @@ -73,7 +73,7 @@ {% for _ in range(0, max_iter) %} {%- set paginated_sql -%} - show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}' + show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}' {%- endset -%} {%- set paginated_result = run_query(paginated_sql) %} @@ -124,7 +124,7 @@ {%- set max_total_results = max_results_per_iter * max_iter -%} {%- set sql -%} - show terse objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} + show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} {%- endset -%} {%- set result = run_query(sql) -%} diff --git a/tests/functional/adapter/test_list_relations_without_caching.py b/tests/functional/adapter/list_relations_tests/test_pagination.py similarity index 86% rename from tests/functional/adapter/test_list_relations_without_caching.py rename to tests/functional/adapter/list_relations_tests/test_pagination.py index b126984a3..8f14a0012 100644 --- a/tests/functional/adapter/test_list_relations_without_caching.py +++ b/tests/functional/adapter/list_relations_tests/test_pagination.py @@ -1,3 +1,5 @@ +import os + import pytest import json @@ -5,15 +7,19 @@ # Testing rationale: # - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call -# - when dbt attempts to write into a scehma with more than 10K objects, compilation will fail +# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail # unless we paginate the result # - however, testing this process is difficult at a full scale of 10K actual objects populated # into a fresh testing schema # - accordingly, we create a smaller set of views and test the looping iteration logic in # smaller chunks -NUM_VIEWS = 100 -NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS +NUM_VIEWS = 90 +NUM_DYNAMIC_TABLES = 10 +# the total number should be between the numbers referenced in the "passing" and "failing" macros below +# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects) +# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects) +NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES TABLE_BASE_SQL = """ {{ config(materialized='table') }} @@ -25,6 +31,20 @@ select id from {{ ref('my_model_base') }} """.lstrip() +DYNAMIC_TABLE = ( + """ +{{ config( + materialized='dynamic_table', + target_lag='1 hour', + snowflake_warehouse='""" + + os.getenv("SNOWFLAKE_TEST_WAREHOUSE") + + """', +) }} + +select id from {{ ref('my_model_base') }} +""" +) + MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """ {% macro validate_list_relations_without_caching(schema_relation) %} {% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %} @@ -81,7 +101,8 @@ def models(self): my_models = {"my_model_base.sql": TABLE_BASE_SQL} for view in range(0, NUM_VIEWS): my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) - + for dynamic_table in range(0, NUM_DYNAMIC_TABLES): + my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE}) return my_models @pytest.fixture(scope="class") @@ -126,7 +147,8 @@ def models(self): my_models = {"my_model_base.sql": TABLE_BASE_SQL} for view in range(0, NUM_VIEWS): my_models.update({f"my_model_{view}.sql": VIEW_X_SQL}) - + for dynamic_table in range(0, NUM_DYNAMIC_TABLES): + my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE}) return my_models @pytest.fixture(scope="class") diff --git a/tests/functional/adapter/list_relations_tests/test_show_objects.py b/tests/functional/adapter/list_relations_tests/test_show_objects.py new file mode 100644 index 000000000..e5eee39d9 --- /dev/null +++ b/tests/functional/adapter/list_relations_tests/test_show_objects.py @@ -0,0 +1,89 @@ +import os +from typing import List + +import pytest + +from dbt.adapters.factory import get_adapter_by_type +from dbt.adapters.snowflake import SnowflakeRelation + +from dbt.tests.util import run_dbt, get_connection + + +SEED = """ +id,value +0,red +1,yellow +2,blue +""".strip() + + +VIEW = """ +select * from {{ ref('my_seed') }} +""" + + +TABLE = """ +{{ config(materialized='table') }} +select * from {{ ref('my_seed') }} +""" + + +DYNAMIC_TABLE = ( + """ +{{ config( + materialized='dynamic_table', + target_lag='1 day', + snowflake_warehouse='""" + + os.getenv("SNOWFLAKE_TEST_WAREHOUSE") + + """', +) }} +select * from {{ ref('my_seed') }} +""" +) + + +class TestShowObjects: + views: int = 10 + tables: int = 10 + dynamic_tables: int = 10 + + @pytest.fixture(scope="class") + def seeds(self): + yield {"my_seed.csv": SEED} + + @pytest.fixture(scope="class") + def models(self): + models = {} + models.update({f"my_view_{i}.sql": VIEW for i in range(self.views)}) + models.update({f"my_table_{i}.sql": TABLE for i in range(self.tables)}) + models.update( + {f"my_dynamic_table_{i}.sql": DYNAMIC_TABLE for i in range(self.dynamic_tables)} + ) + yield models + + @pytest.fixture(scope="class", autouse=True) + def setup(self, project): + run_dbt(["seed"]) + run_dbt(["run"]) + + @staticmethod + def list_relations_without_caching(project) -> List[SnowflakeRelation]: + my_adapter = get_adapter_by_type("snowflake") + schema = my_adapter.Relation.create( + database=project.database, schema=project.test_schema, identifier="" + ) + with get_connection(my_adapter): + relations = my_adapter.list_relations_without_caching(schema) + return relations + + def test_list_relations_without_caching(self, project): + relations = self.list_relations_without_caching(project) + assert len([relation for relation in relations if relation.is_view]) == self.views + assert ( + len([relation for relation in relations if relation.is_table]) + == self.tables + 1 # add the seed + ) + assert ( + len([relation for relation in relations if relation.is_dynamic_table]) + == self.dynamic_tables + )