Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add edges #10888

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def add_test_edges(self, manifest: Manifest) -> None:
#
# Produce the following graph:
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
# \/ | test2 ----| |
# test1 ----|---------------|
# | /\ | /\
# | | \/ |
# \/ | test2 ----|
# test1 ----|

for node_id in self.graph:
# If node is executable (in manifest.nodes) and does _not_
Expand All @@ -224,11 +224,17 @@ def add_test_edges(self, manifest: Manifest) -> None:
# Get *everything* upstream of the node
all_upstream_nodes = nx.traversal.bfs_tree(self.graph, node_id, reverse=True)
# Get the set of upstream nodes not including the current node.
upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])
all_upstream_nodes = set([n for n in all_upstream_nodes if n != node_id])

# Get all tests that depend on any upstream nodes.
# Get 1 depth upsteam of the node
direct_upsteam_nodes = nx.traversal.bfs_tree(
self.graph, node_id, reverse=True, depth_limit=1
)
direct_upsteam_nodes = set([n for n in direct_upsteam_nodes if n != node_id])

# Get all tests that depend on direct upstream nodes.
upstream_tests = []
for upstream_node in upstream_nodes:
for upstream_node in direct_upsteam_nodes:
# This gets tests with unique_ids starting with "test."
upstream_tests += _get_tests_for_node(manifest, upstream_node)

Expand All @@ -244,7 +250,7 @@ def add_test_edges(self, manifest: Manifest) -> None:
# If the set of nodes that an upstream test depends on
# is a subset of all upstream nodes of the current node,
# add an edge from the upstream test to the current node.
if test_depends_on.issubset(upstream_nodes):
if test_depends_on.issubset(all_upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def get_graph(self, manifest: Manifest) -> Graph:
Expand Down
Loading