Skip to content

Commit

Permalink
[Backport 1.7.latest] Dynamic tables with Snowflake change bundle 202…
Browse files Browse the repository at this point in the history
…4_03 results in dynamic table to issue (#1057)

* backport #1049
  • Loading branch information
mikealfare authored May 21, 2024
1 parent ca79e00 commit c502788
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 25 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240516-174337.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Update relation caching to correctly identify dynamic tables, accounting for Snowflake's `2024_03` bundle
time: 2024-05-16T17:43:37.336858-04:00
custom:
Author: mikealfare
Issue: "1016"
47 changes: 29 additions & 18 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,37 @@ def list_relations_without_caching(self, schema_relation: SnowflakeRelation) ->
return []
raise

relations = []
quote_policy = {"database": True, "schema": True, "identifier": True}

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
for _database, _schema, _identifier, _type in results.select(columns): # type: ignore
try:
_type = self.Relation.get_relation_type(_type.lower())
except ValueError:
_type = self.Relation.External
relations.append(
self.Relation.create(
database=_database,
schema=_schema,
identifier=_identifier,
quote_policy=quote_policy,
type=_type,
)
)
if "is_dynamic" in results.column_names: # type: ignore
columns.append("is_dynamic")

return [self._parse_list_relations_result(result) for result in results.select(columns)] # type: ignore

def _parse_list_relations_result(self, result: agate.Row) -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"

return relations
try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
except ValueError:
relation_type = self.Relation.External

if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

quote_policy = {"database": True, "schema": True, "identifier": True}
return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
quote_policy=quote_policy,
)

def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
quote_columns: bool = False
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
{% for _ in range(0, max_iter) %}

{%- set paginated_sql -%}
show terse objects in {{ schema_relation }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
show objects in {{ schema_relation }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
{%- endset -%}

{%- set paginated_result = run_query(paginated_sql) %}
Expand Down Expand Up @@ -124,7 +124,7 @@
{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
show terse objects in {{ schema_relation }} limit {{ max_results_per_iter }}
show objects in {{ schema_relation }} limit {{ max_results_per_iter }}
{%- endset -%}

{%- set result = run_query(sql) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import os

import pytest

import json
from dbt.tests.util import run_dbt, run_dbt_and_capture

# Testing rationale:
# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
# - when dbt attempts to write into a scehma with more than 10K objects, compilation will fail
# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of views and test the looping iteration logic in
# smaller chunks

NUM_VIEWS = 100
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS
NUM_VIEWS = 90
NUM_DYNAMIC_TABLES = 10
# the total number should be between the numbers referenced in the "passing" and "failing" macros below
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES

TABLE_BASE_SQL = """
{{ config(materialized='table') }}
Expand All @@ -25,6 +31,20 @@
select id from {{ ref('my_model_base') }}
""".lstrip()

DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 hour',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}
select id from {{ ref('my_model_base') }}
"""
)

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
Expand Down Expand Up @@ -81,7 +101,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down Expand Up @@ -126,7 +147,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down
89 changes: 89 additions & 0 deletions tests/functional/adapter/list_relations_tests/test_show_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
from typing import List

import pytest

from dbt.adapters.factory import get_adapter_by_type
from dbt.adapters.snowflake import SnowflakeRelation

from dbt.tests.util import run_dbt, get_connection


SEED = """
id,value
0,red
1,yellow
2,blue
""".strip()


VIEW = """
select * from {{ ref('my_seed') }}
"""


TABLE = """
{{ config(materialized='table') }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 day',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}
select * from {{ ref('my_seed') }}
"""
)


class TestShowObjects:
views: int = 10
tables: int = 10
dynamic_tables: int = 10

@pytest.fixture(scope="class")
def seeds(self):
yield {"my_seed.csv": SEED}

@pytest.fixture(scope="class")
def models(self):
models = {}
models.update({f"my_view_{i}.sql": VIEW for i in range(self.views)})
models.update({f"my_table_{i}.sql": TABLE for i in range(self.tables)})
models.update(
{f"my_dynamic_table_{i}.sql": DYNAMIC_TABLE for i in range(self.dynamic_tables)}
)
yield models

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])

@staticmethod
def list_relations_without_caching(project) -> List[SnowflakeRelation]:
my_adapter = get_adapter_by_type("snowflake")
schema = my_adapter.Relation.create(
database=project.database, schema=project.test_schema, identifier=""
)
with get_connection(my_adapter):
relations = my_adapter.list_relations_without_caching(schema.without_identifier())
return relations

def test_list_relations_without_caching(self, project):
relations = self.list_relations_without_caching(project)
assert len([relation for relation in relations if relation.is_view]) == self.views
assert (
len([relation for relation in relations if relation.is_table])
== self.tables + 1 # add the seed
)
assert (
len([relation for relation in relations if relation.is_dynamic_table])
== self.dynamic_tables
)

0 comments on commit c502788

Please sign in to comment.