Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tech Debt] Fix dynamic table tests #1085

Merged
merged 6 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240614-170858.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Automate all manual integration tests for Dynamic Tables
time: 2024-06-14T17:08:58.231472-04:00
custom:
Author: mikealfare
Issue: "1084"
2 changes: 1 addition & 1 deletion tests/functional/adapter/dynamic_table_tests/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
{{ config(
materialized='dynamic_table',
snowflake_warehouse='DBT_TESTING',
target_lag='120 seconds',
target_lag='2 minutes',
) }}
select * from {{ ref('my_seed') }}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,35 @@

class SnowflakeDynamicTableChanges:
@staticmethod
def check_start_state(adapter, dynamic_table):
"""
This needs to be done manually for now until we fix the test suite's runner. The test suite's
runner cannot run queries with multiple statements. Snowflake's metadata is all behind `show`
and `describe` calls that require a second call to fetch the results; hence, the results
cannot be fetched.
"""
assert query_target_lag(adapter, dynamic_table) is None == "120 seconds"
assert query_warehouse(adapter, dynamic_table) is None == "DBT_TESTING"
def check_start_state(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "2 minutes"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def change_config_via_alter(project, dynamic_table):
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace(
"target_lag='120 seconds'", "target_lag='5 minutes'"
"target_lag='2 minutes'", "target_lag='5 minutes'"
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def change_config_via_alter_downstream(project, dynamic_table):
initial_model = get_model_file(project, dynamic_table)
new_model = initial_model.replace(
"target_lag='120 seconds'", "target_lag='downstream'"
"target_lag='2 minutes'", "target_lag='DOWNSTREAM'"
)
set_model_file(project, dynamic_table, new_model)

@staticmethod
def check_state_alter_change_is_applied(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "5 minutes"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"
def check_state_alter_change_is_applied(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "5 minutes"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def check_state_alter_change_is_applied_downstream(adapter, dynamic_table):
# see above
assert query_target_lag(adapter, dynamic_table) == "downstream"
assert query_warehouse(adapter, dynamic_table) == "DBT_TESTING"
def check_state_alter_change_is_applied_downstream(project, dynamic_table):
assert query_target_lag(project, dynamic_table) == "DOWNSTREAM"
assert query_warehouse(project, dynamic_table) == "DBT_TESTING"

@staticmethod
def change_config_via_replace(project, dynamic_table):
Expand Down Expand Up @@ -103,6 +95,9 @@ def setup(self, project, my_dynamic_table):
# the tests touch these files, store their contents in memory
initial_model = get_model_file(project, my_dynamic_table)

# verify the initial settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be commented at the beginning of every test. It was moved here and will run automatically. So it looks like the comment was removed from each test, but it as effectively just uncommented.


yield

# and then reset them after the test runs
Expand All @@ -112,12 +107,19 @@ def setup(self, project, my_dynamic_table):
project.run_sql(f"drop schema if exists {project.test_schema} cascade")

def test_full_refresh_occurs_with_changes(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.identifier, "--full-refresh"]
)
assert self.query_relation_type(project, my_dynamic_table) == "dynamic_table"

# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)
self.check_state_replace_change_is_applied(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', ""), False
)
Expand All @@ -131,17 +133,16 @@ class TestSnowflakeDynamicTableChangesApply(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}}

def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this was deleted, but it was just uncommented. See my GH comment on line 99. This is true of all lines like this, but I'm leaving one comment for the sake of brevity.

def test_change_is_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're now checking in the database to verify the settings were applied. Even though that is a stronger check than checking the logs, we still want to check the logs so that we know how the change was made. If a change that was supposed to be applied via an alter is actually applied via a drop/create, that's not passing.


# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -151,17 +152,16 @@ def test_change_is_applied_via_alter(self, project, adapter, my_dynamic_table):
False,
)

def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_applied_via_alter_downstream(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter_downstream(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied_downstream(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied_downstream(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying ALTER to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -174,16 +174,18 @@ def test_change_is_applied_via_alter_downstream(self, project, adapter, my_dynam
@pytest.mark.skip(
"dbt-snowflake does not currently monitor any changes the trigger a full refresh"
)
def test_change_is_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_state_alter_change_is_applied(adapter, my_dynamic_table)
# self.check_state_replace_change_is_applied(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_state_alter_change_is_applied(project, my_dynamic_table)
self.check_state_replace_change_is_applied(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Applying REPLACE to: {my_dynamic_table.render().upper()}", logs.replace('"', "")
)
Expand All @@ -194,17 +196,16 @@ class TestSnowflakeDynamicTableChangesContinue(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Continue.value}}

def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `continue` for `{my_dynamic_table}`",
Expand All @@ -219,15 +220,17 @@ def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_tabl
False,
)

def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(["--debug", "run", "--models", my_dynamic_table.name])

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `continue` for `{my_dynamic_table}`",
Expand All @@ -248,19 +251,18 @@ class TestSnowflakeDynamicTableChangesFailMixin(SnowflakeDynamicTableChanges):
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Fail.value}}

def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_table):
"""
See above about the two commented assertions. In the meantime, these have been validated manually.
"""
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_alter(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False
)

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `fail` for `{my_dynamic_table}`",
Expand All @@ -275,17 +277,19 @@ def test_change_is_not_applied_via_alter(self, project, adapter, my_dynamic_tabl
False,
)

def test_change_is_not_applied_via_replace(self, project, adapter, my_dynamic_table):
# self.check_start_state(adapter, my_dynamic_table)
def test_change_is_not_applied_via_replace(self, project, my_dynamic_table):

# update the settings
self.change_config_via_alter(project, my_dynamic_table)
self.change_config_via_replace(project, my_dynamic_table)
_, logs = run_dbt_and_capture(
["--debug", "run", "--models", my_dynamic_table.name], expect_pass=False
)

# self.check_start_state(adapter, my_dynamic_table)
# verify the updated settings are correct in Snowflake
self.check_start_state(project, my_dynamic_table)

# verify the settings were changed with the correct method
assert_message_in_logs(
f"Configuration changes were identified and `on_configuration_change` was set"
f" to `fail` for `{my_dynamic_table}`",
Expand Down
15 changes: 7 additions & 8 deletions tests/functional/adapter/dynamic_table_tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional

import agate
from dbt.adapters.base import BaseAdapter
from dbt.tests.util import get_connection

from dbt.adapters.snowflake.relation import SnowflakeRelation
Expand Down Expand Up @@ -30,19 +29,19 @@ def query_relation_type(project, relation: SnowflakeRelation) -> Optional[str]:
return results[0].lower()


def query_target_lag(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(adapter, dynamic_table)
def query_target_lag(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(project, dynamic_table)
return config.get("target_lag")


def query_warehouse(adapter, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(adapter, dynamic_table)
def query_warehouse(project, dynamic_table: SnowflakeRelation) -> Optional[str]:
config = describe_dynamic_table(project, dynamic_table)
return config.get("warehouse")


def describe_dynamic_table(adapter: BaseAdapter, dynamic_table: SnowflakeRelation) -> agate.Row:
with get_connection(adapter):
macro_results = adapter.execute_macro(
def describe_dynamic_table(project, dynamic_table: SnowflakeRelation) -> agate.Row:
with get_connection(project.adapter):
macro_results = project.adapter.execute_macro(
"snowflake__describe_dynamic_table", kwargs={"relation": dynamic_table}
)
config = macro_results["dynamic_table"]
Expand Down
Loading