Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.7.latest] Dynamic tables with Snowflake change bundle 2024_03 results in dynamic table to issue #1057

Merged
merged 3 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20240516-174337.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Update relation caching to correctly identify dynamic tables, accounting for Snowflake's `2024_03` bundle
time: 2024-05-16T17:43:37.336858-04:00
custom:
Author: mikealfare
Issue: "1016"
47 changes: 29 additions & 18 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,37 @@ def list_relations_without_caching(self, schema_relation: SnowflakeRelation) ->
return []
raise

relations = []
quote_policy = {"database": True, "schema": True, "identifier": True}

# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
columns = ["database_name", "schema_name", "name", "kind"]
for _database, _schema, _identifier, _type in results.select(columns): # type: ignore
try:
_type = self.Relation.get_relation_type(_type.lower())
except ValueError:
_type = self.Relation.External
relations.append(
self.Relation.create(
database=_database,
schema=_schema,
identifier=_identifier,
quote_policy=quote_policy,
type=_type,
)
)
if "is_dynamic" in results.column_names: # type: ignore
columns.append("is_dynamic")

return [self._parse_list_relations_result(result) for result in results.select(columns)] # type: ignore

def _parse_list_relations_result(self, result: agate.Row) -> SnowflakeRelation:
# this can be reduced to always including `is_dynamic` once bundle `2024_03` is mandatory
try:
database, schema, identifier, relation_type, is_dynamic = result
except ValueError:
database, schema, identifier, relation_type = result
is_dynamic = "N"

return relations
try:
relation_type = self.Relation.get_relation_type(relation_type.lower())
except ValueError:
relation_type = self.Relation.External

if relation_type == self.Relation.Table and is_dynamic == "Y":
relation_type = self.Relation.DynamicTable

quote_policy = {"database": True, "schema": True, "identifier": True}
return self.Relation.create(
database=database,
schema=schema,
identifier=identifier,
type=relation_type,
quote_policy=quote_policy,
)

def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
quote_columns: bool = False
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
{% for _ in range(0, max_iter) %}

{%- set paginated_sql -%}
show terse objects in {{ schema_relation }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }} from '{{ watermark.table_name }}'
{%- endset -%}

{%- set paginated_result = run_query(paginated_sql) %}
Expand Down Expand Up @@ -124,7 +124,7 @@
{%- set max_total_results = max_results_per_iter * max_iter -%}

{%- set sql -%}
show terse objects in {{ schema_relation }} limit {{ max_results_per_iter }}
show objects in {{ schema_relation.database }}.{{ schema_relation.schema }} limit {{ max_results_per_iter }}
{%- endset -%}

{%- set result = run_query(sql) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import os

import pytest

import json
from dbt.tests.util import run_dbt, run_dbt_and_capture

# Testing rationale:
# - snowflake SHOW TERSE OBJECTS command returns at max 10K objects in a single call
# - when dbt attempts to write into a scehma with more than 10K objects, compilation will fail
# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of views and test the looping iteration logic in
# smaller chunks

NUM_VIEWS = 100
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS
NUM_VIEWS = 90
NUM_DYNAMIC_TABLES = 10
# the total number should be between the numbers referenced in the "passing" and "failing" macros below
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING (11 iter * 10 results per iter -> 110 objects)
# - MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR (33 iter * 3 results per iter -> 99 objects)
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS + NUM_DYNAMIC_TABLES

TABLE_BASE_SQL = """
{{ config(materialized='table') }}
Expand All @@ -25,6 +31,20 @@
select id from {{ ref('my_model_base') }}
""".lstrip()

DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 hour',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}

select id from {{ ref('my_model_base') }}
"""
)

MACROS__VALIDATE__SNOWFLAKE__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = snowflake__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
Expand Down Expand Up @@ -81,7 +101,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down Expand Up @@ -126,7 +147,8 @@ def models(self):
my_models = {"my_model_base.sql": TABLE_BASE_SQL}
for view in range(0, NUM_VIEWS):
my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

for dynamic_table in range(0, NUM_DYNAMIC_TABLES):
my_models.update({f"my_dynamic_table_{dynamic_table}.sql": DYNAMIC_TABLE})
return my_models

@pytest.fixture(scope="class")
Expand Down
89 changes: 89 additions & 0 deletions tests/functional/adapter/list_relations_tests/test_show_objects.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
from typing import List

import pytest

from dbt.adapters.factory import get_adapter_by_type
from dbt.adapters.snowflake import SnowflakeRelation

from dbt.tests.util import run_dbt, get_connection


SEED = """
id,value
0,red
1,yellow
2,blue
""".strip()


VIEW = """
select * from {{ ref('my_seed') }}
"""


TABLE = """
{{ config(materialized='table') }}
select * from {{ ref('my_seed') }}
"""


DYNAMIC_TABLE = (
"""
{{ config(
materialized='dynamic_table',
target_lag='1 day',
snowflake_warehouse='"""
+ os.getenv("SNOWFLAKE_TEST_WAREHOUSE")
+ """',
) }}
select * from {{ ref('my_seed') }}
"""
)


class TestShowObjects:
views: int = 10
tables: int = 10
dynamic_tables: int = 10

@pytest.fixture(scope="class")
def seeds(self):
yield {"my_seed.csv": SEED}

@pytest.fixture(scope="class")
def models(self):
models = {}
models.update({f"my_view_{i}.sql": VIEW for i in range(self.views)})
models.update({f"my_table_{i}.sql": TABLE for i in range(self.tables)})
models.update(
{f"my_dynamic_table_{i}.sql": DYNAMIC_TABLE for i in range(self.dynamic_tables)}
)
yield models

@pytest.fixture(scope="class", autouse=True)
def setup(self, project):
run_dbt(["seed"])
run_dbt(["run"])

@staticmethod
def list_relations_without_caching(project) -> List[SnowflakeRelation]:
my_adapter = get_adapter_by_type("snowflake")
schema = my_adapter.Relation.create(
database=project.database, schema=project.test_schema, identifier=""
)
with get_connection(my_adapter):
relations = my_adapter.list_relations_without_caching(schema)
return relations

def test_list_relations_without_caching(self, project):
relations = self.list_relations_without_caching(project)
assert len([relation for relation in relations if relation.is_view]) == self.views
assert (
len([relation for relation in relations if relation.is_table])
== self.tables + 1 # add the seed
)
assert (
len([relation for relation in relations if relation.is_dynamic_table])
== self.dynamic_tables
)
Loading