Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 1.6.latest] Partial parsing issue when adding groups and updating models at the same time #8839

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20231010-202148.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Partial parsing fix for adding groups and updating models at the same time
time: 2023-10-10T20:21:48.154666-04:00
custom:
Author: gshank
Issue: "8697"
40 changes: 25 additions & 15 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,10 @@ def remove_node_in_saved(self, source_file, unique_id):
file_id = node.patch_path
# it might be changed... then what?
if file_id not in self.file_diff["deleted"] and file_id in self.saved_files:
# schema_files should already be updated
# Schema files should already be updated if this comes from a node,
# but this code is also called when updating groups and exposures.
# This might save the old schema file element, so when the schema file
# is processed, it should overwrite it by passing True to "merge_patch"
schema_file = self.saved_files[file_id]
dict_key = parse_file_type_to_key[source_file.parse_file_type]
# look for a matching list dictionary
Expand Down Expand Up @@ -613,13 +616,13 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
if key_diff["changed"]:
for elem in key_diff["changed"]:
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)
if key_diff["deleted"]:
for elem in key_diff["deleted"]:
self.delete_schema_mssa_links(schema_file, dict_key, elem)
if key_diff["added"]:
for elem in key_diff["added"]:
self.merge_patch(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
Expand All @@ -628,7 +631,7 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_mssa_links(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)

# sources
dict_key = "sources"
Expand All @@ -638,7 +641,7 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
if "overrides" in source: # This is a source patch; need to re-parse orig source
self.remove_source_override_target(source)
self.delete_schema_source(schema_file, source)
self.merge_patch(schema_file, dict_key, source)
self.merge_patch(schema_file, dict_key, source, True)
if source_diff["deleted"]:
for source in source_diff["deleted"]:
if "overrides" in source: # This is a source patch; need to re-parse orig source
Expand All @@ -648,7 +651,7 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
for source in source_diff["added"]:
if "overrides" in source: # This is a source patch; need to re-parse orig source
self.remove_source_override_target(source)
self.merge_patch(schema_file, dict_key, source)
self.merge_patch(schema_file, dict_key, source, True)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
Expand All @@ -659,7 +662,7 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
if "overrides" in source:
self.remove_source_override_target(source)
self.delete_schema_source(schema_file, source)
self.merge_patch(schema_file, dict_key, source)
self.merge_patch(schema_file, dict_key, source, True)

def handle_change(key: str, delete: Callable):
self._handle_element_change(
Expand All @@ -679,13 +682,13 @@ def _handle_element_change(
if element_diff["changed"]:
for element in element_diff["changed"]:
delete(schema_file, element)
self.merge_patch(schema_file, dict_key, element)
self.merge_patch(schema_file, dict_key, element, True)
if element_diff["deleted"]:
for element in element_diff["deleted"]:
delete(schema_file, element)
if element_diff["added"]:
for element in element_diff["added"]:
self.merge_patch(schema_file, dict_key, element)
self.merge_patch(schema_file, dict_key, element, True)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
Expand All @@ -694,7 +697,7 @@ def _handle_element_change(
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
delete(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)
self.merge_patch(schema_file, dict_key, elem, True)

# Take a "section" of the schema file yaml dictionary from saved and new schema files
# and determine which parts have changed
Expand Down Expand Up @@ -737,21 +740,28 @@ def get_diff_for(self, key, saved_yaml_dict, new_yaml_dict):
}
return diff

# Merge a patch file into the pp_dict in a schema file. The "new_patch"
# flag indicates that we're processing a schema file, so if a matching
# patch has already been scheduled, replace it.
def merge_patch(self, schema_file, key, patch, new_patch=False):
    """Schedule a schema-file element for re-parsing.

    Stores ``patch`` (one dict element of a schema file section, identified
    by its "name") in ``schema_file.pp_dict`` under ``key``, records that the
    schema file needs reprocessing, and clears any stale env-var tracking for
    that element.

    :param schema_file: the saved schema file object holding ``pp_dict``
    :param key: the schema-file section, e.g. "sources", "groups", "models"
    :param patch: the yaml dict for a single element; must have a "name"
    :param new_patch: when True (set while processing the schema file itself),
        a previously scheduled patch with the same name is replaced instead of
        being kept — the schema file's version is authoritative.
    """
    if schema_file.pp_dict is None:
        schema_file.pp_dict = {}
    pp_dict = schema_file.pp_dict
    if key not in pp_dict:
        pp_dict[key] = [patch]
    else:
        # check that this patch hasn't already been saved
        found_elem = None
        for elem in pp_dict[key]:
            if elem["name"] == patch["name"]:
                found_elem = elem
        if not found_elem:
            pp_dict[key].append(patch)
        elif found_elem and new_patch:
            # remove patch and replace with new one
            pp_dict[key].remove(found_elem)
            pp_dict[key].append(patch)

    schema_file.delete_from_env_vars(key, patch["name"])
    self.add_to_pp_files(schema_file)

Expand Down
96 changes: 1 addition & 95 deletions tests/functional/partial_parsing/test_partial_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,15 @@
gsm_override_sql,
gsm_override2_sql,
orders_sql,
orders_downstream_sql,
snapshot_sql,
snapshot2_sql,
generic_schema_yml,
generic_test_sql,
generic_test_schema_yml,
generic_test_edited_sql,
groups_schema_yml_one_group,
groups_schema_yml_two_groups,
groups_schema_yml_two_groups_edited,
groups_schema_yml_one_group_model_in_group2,
groups_schema_yml_two_groups_private_orders_valid_access,
groups_schema_yml_two_groups_private_orders_invalid_access,
)

from dbt.exceptions import CompilationError, ParsingError
from dbt.exceptions import CompilationError
from dbt.contracts.files import ParseFileType
from dbt.contracts.results import TestStatus
from dbt.plugins.manifest import PluginNodes, ModelNodeArgs
Expand Down Expand Up @@ -663,93 +656,6 @@ def test_pp_generic_tests(self, project):
assert expected_nodes == list(manifest.nodes.keys())


class TestGroups:
    """Partial-parsing tests for ``group`` definitions in schema files.

    Exercises adding, renaming, and deleting groups across incremental
    (``--partial-parse``) runs, including the error cases where a model still
    references a deleted group or makes a private-access ref across groups.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # Two models plus a schema file that initially declares one group.
        return {
            "orders.sql": orders_sql,
            "orders_downstream.sql": orders_downstream_sql,
            "schema.yml": groups_schema_yml_one_group,
        }

    def test_pp_groups(self, project):

        # initial run
        results = run_dbt()
        assert len(results) == 2
        manifest = get_manifest(project.project_root)
        expected_nodes = ["model.test.orders", "model.test.orders_downstream"]
        expected_groups = ["group.test.test_group"]
        assert expected_nodes == sorted(list(manifest.nodes.keys()))
        assert expected_groups == sorted(list(manifest.groups.keys()))

        # add group to schema
        write_file(groups_schema_yml_two_groups, project.project_root, "models", "schema.yml")
        results = run_dbt(["--partial-parse", "run"])
        assert len(results) == 2
        manifest = get_manifest(project.project_root)
        expected_nodes = ["model.test.orders", "model.test.orders_downstream"]
        expected_groups = ["group.test.test_group", "group.test.test_group2"]
        assert expected_nodes == sorted(list(manifest.nodes.keys()))
        assert expected_groups == sorted(list(manifest.groups.keys()))

        # edit group in schema
        write_file(
            groups_schema_yml_two_groups_edited, project.project_root, "models", "schema.yml"
        )
        results = run_dbt(["--partial-parse", "run"])
        assert len(results) == 2
        manifest = get_manifest(project.project_root)
        expected_nodes = ["model.test.orders", "model.test.orders_downstream"]
        # renaming a group should drop the old group id and add the new one
        expected_groups = ["group.test.test_group", "group.test.test_group2_edited"]
        assert expected_nodes == sorted(list(manifest.nodes.keys()))
        assert expected_groups == sorted(list(manifest.groups.keys()))

        # delete group in schema
        write_file(groups_schema_yml_one_group, project.project_root, "models", "schema.yml")
        results = run_dbt(["--partial-parse", "run"])
        assert len(results) == 2
        manifest = get_manifest(project.project_root)
        expected_nodes = ["model.test.orders", "model.test.orders_downstream"]
        expected_groups = ["group.test.test_group"]
        assert expected_nodes == sorted(list(manifest.nodes.keys()))
        assert expected_groups == sorted(list(manifest.groups.keys()))

        # add back second group
        write_file(groups_schema_yml_two_groups, project.project_root, "models", "schema.yml")
        results = run_dbt(["--partial-parse", "run"])
        assert len(results) == 2

        # remove second group with model still configured to second group
        # — referencing a now-missing group must fail the parse
        write_file(
            groups_schema_yml_one_group_model_in_group2,
            project.project_root,
            "models",
            "schema.yml",
        )
        with pytest.raises(ParsingError):
            results = run_dbt(["--partial-parse", "run"])

        # add back second group, make orders private with valid ref
        write_file(
            groups_schema_yml_two_groups_private_orders_valid_access,
            project.project_root,
            "models",
            "schema.yml",
        )
        results = run_dbt(["--partial-parse", "run"])
        assert len(results) == 2

        # a private model referenced from outside its group must fail the parse
        write_file(
            groups_schema_yml_two_groups_private_orders_invalid_access,
            project.project_root,
            "models",
            "schema.yml",
        )
        with pytest.raises(ParsingError):
            results = run_dbt(["--partial-parse", "run"])


class TestExternalModels:
@pytest.fixture(scope="class")
def external_model_node(self):
Expand Down
Loading
Loading