Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix][Selection] yaml selector do not obey default overwrite #10009

Merged
merged 5 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/dbt/graph/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
# Graph README

## Graph Selector Creation

### Selector Loading
During dbt execution, the `@requires.project` decorator creates the final selector objects used in the graph. The `SelectorConfig` class loads selectors from the project configuration, while the `selector_config_from_data` function parses these selectors.

#### Indirect Selection Default Value
In `@requires.preflight`, dbt reads CLI flags, environment variables, and the parameter's default value. It resolves these inputs based on their precedence order and stores the resolved value in global flags. When loading selectors, the [`selection_criteria_from_dict`](https://github.com/dbt-labs/dbt-core/blob/b316c5f18021fef3d7fd6ec255427054b7d2205e/core/dbt/graph/selector_spec.py#L111) function resolves the indirect selection value to the global flags value if not set. This ensures correct resolution of the indirect selection value.
33 changes: 8 additions & 25 deletions core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
def parse_union(
components: List[str],
expect_exists: bool,
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> SelectionUnion:
# turn ['a b', 'c'] -> ['a', 'b', 'c']
raw_specs = itertools.chain.from_iterable(r.split(" ") for r in components)
Expand All @@ -37,7 +36,7 @@ def parse_union(
# ['a', 'b', 'c,d'] -> union('a', 'b', intersection('c', 'd'))
for raw_spec in raw_specs:
intersection_components: List[SelectionSpec] = [
SelectionCriteria.from_single_spec(part, indirect_selection=indirect_selection)
SelectionCriteria.from_single_spec(part)
for part in raw_spec.split(INTERSECTION_DELIMITER)
]
union_components.append(
Expand All @@ -56,41 +55,25 @@ def parse_union(
)


def parse_union_from_default(
raw: Optional[List[str]],
default: List[str],
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> SelectionUnion:
def parse_union_from_default(raw: Optional[List[str]], default: List[str]) -> SelectionUnion:
components: List[str]
expect_exists: bool
if raw is None:
return parse_union(
components=default, expect_exists=False, indirect_selection=indirect_selection
)
return parse_union(components=default, expect_exists=False)
else:
return parse_union(
components=raw, expect_exists=True, indirect_selection=indirect_selection
)
return parse_union(components=raw, expect_exists=True)


def parse_difference(
include: Optional[List[str]], exclude: Optional[List[str]], indirect_selection: Any
include: Optional[List[str]], exclude: Optional[List[str]]
) -> SelectionDifference:

if include == ():
include = None

included = parse_union_from_default(
include, DEFAULT_INCLUDES, indirect_selection=IndirectSelection(indirect_selection)
)
flags = get_flags()
excluded = parse_union_from_default(
exclude, DEFAULT_EXCLUDES, indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION)
)
return SelectionDifference(
components=[included, excluded],
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
)
included = parse_union_from_default(include, DEFAULT_INCLUDES)
excluded = parse_union_from_default(exclude, DEFAULT_EXCLUDES)
return SelectionDifference(components=[included, excluded])


RawDefinition = Union[str, Dict[str, Any]]
Expand Down
13 changes: 5 additions & 8 deletions core/dbt/graph/selector_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from dataclasses import dataclass
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin


from typing import Set, Iterator, List, Optional, Dict, Union, Any, Iterable, Tuple
from .graph import UniqueId
from .selector_methods import MethodName
from dbt_common.exceptions import DbtRuntimeError
from dbt.exceptions import InvalidSelectorError
from dbt.flags import get_flags


RAW_SELECTOR_PATTERN = re.compile(
Expand Down Expand Up @@ -110,7 +112,6 @@ def selection_criteria_from_dict(
cls,
raw: Any,
dct: Dict[str, Any],
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> "SelectionCriteria":
if "value" not in dct:
raise DbtRuntimeError(f'Invalid node spec "{raw}" - no search value!')
Expand All @@ -121,7 +122,7 @@ def selection_criteria_from_dict(

# If defined field in selector, override CLI flag
indirect_selection = IndirectSelection(
dct.get("indirect_selection", None) or indirect_selection
dct.get("indirect_selection", get_flags().INDIRECT_SELECTION)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most important change that leads to resolving the bug and also removes all of the variable pass-through.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation for removing the variable pass-through here? I think it could be preferable to call get_flags().INDIRECT_SELECTION in runnable.py to avoid lower-level methods accessing global state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked up all call paths that use this variable, and all of them refer to flags.INDIRECT_SELECTION. We can either pass it through everywhere or read it once in this lower-level function (since this is where the selector is created).
I think reading it once at the lower level is the cleaner implementation overall, since dbt is built on the assumption that flags are the first thing constructed during an invocation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that's fair. I recall from our 'de-globalization' conversations in the past that flags was more acceptable to access because it's never mutated and is initialized at the very beginning of an invocation, as you mentioned.

)

return cls(
Expand Down Expand Up @@ -158,17 +159,13 @@ def dict_from_single_spec(cls, raw: str):
return dct

@classmethod
def from_single_spec(
cls, raw: str, indirect_selection: IndirectSelection = IndirectSelection.Eager
) -> "SelectionCriteria":
def from_single_spec(cls, raw: str) -> "SelectionCriteria":
result = RAW_SELECTOR_PATTERN.match(raw)
if result is None:
# bad spec!
raise DbtRuntimeError(f'Invalid selector spec "{raw}"')

return cls.selection_criteria_from_dict(
raw, result.groupdict(), indirect_selection=indirect_selection
)
return cls.selection_criteria_from_dict(raw, result.groupdict())


class BaseSelectionGroup(dbtClassMixin, Iterable[SelectionSpec], metaclass=ABCMeta):
Expand Down
9 changes: 1 addition & 8 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ def exclusion_arg(self):

def get_selection_spec(self) -> SelectionSpec:
default_selector_name = self.config.get_default_selector_name()
# TODO: The "eager" string below needs to be replaced with programatic access
# to the default value for the indirect selection parameter in
# dbt.cli.params.indirect_selection
#
# Doing that is actually a little tricky, so I'm punting it to a new ticket GH #6397
indirect_selection = getattr(self.args, "INDIRECT_SELECTION", "eager")

if self.args.selector:
# use pre-defined selector (--selector)
spec = self.config.get_selector(self.args.selector)
Expand All @@ -125,7 +118,7 @@ def get_selection_spec(self) -> SelectionSpec:
else:
# This is what's used with no default selector and no selection
# use --select and --exclude args
spec = parse_difference(self.selection_arg, self.exclusion_arg, indirect_selection)
spec = parse_difference(self.selection_arg, self.exclusion_arg)
# mypy complains because the return values of get_selector and parse_difference
# are different
return spec # type: ignore
Expand Down
Empty file added tests/unit/graph/__init__.py
Empty file.
214 changes: 212 additions & 2 deletions tests/unit/test_graph.py → tests/unit/graph/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,218 @@
from dbt.events.logging import setup_event_logger
from dbt.mp_context import get_mp_context
from queue import Empty
from .utils import config_from_parts_or_dicts, generate_name_macros, inject_plugin
from tests.unit.utils import config_from_parts_or_dicts, generate_name_macros, inject_plugin

from argparse import Namespace


import pytest

import string
import dbt_common.exceptions
import dbt.graph.selector as graph_selector
import dbt.graph.cli as graph_cli
from dbt.node_types import NodeType

import networkx as nx


# Runs once at import time, before any test in this module executes.
# NOTE(review): presumably this initialises dbt's global flags so that selector
# parsing can read get_flags().INDIRECT_SELECTION; it looks like set_from_args
# fills in defaults for flags not named here - confirm.
set_from_args(Namespace(WARN_ERROR=False), None)


def _get_graph():
    """Build the seven-node sample graph used by the selector tests.

    A balanced binary tree of height 2 is generated with integer node ids and
    then relabelled so node ``i`` becomes ``m.<package>.<letter>``: even
    indices go to package ``X``, odd indices to package ``Y``, and letters are
    assigned in index order starting from ``a``.
    """
    tree = nx.balanced_tree(2, 2, nx.DiGraph())

    names_by_index = {}
    for index, letter in enumerate(string.ascii_lowercase):
        package = "X" if index % 2 == 0 else "Y"
        names_by_index[index] = "m." + package + "." + letter

    # Edges: [(X.a, Y.b), (X.a, X.c), (Y.b, Y.d), (Y.b, X.e), (X.c, Y.f), (X.c, X.g)]
    relabelled = nx.relabel_nodes(tree, names_by_index)
    return graph_selector.Graph(relabelled)


def _get_manifest(graph):
    """Create a mock manifest holding one mock model node per id in ``graph``.

    Every node starts out as an enabled, unversioned, non-empty model with no
    tags; the tags the selector tests rely on are then assigned per node.
    """
    tags_by_node = {
        "m.X.a": ["abc"],
        "m.Y.b": ["abc", "bcef"],
        "m.X.c": ["abc", "bcef"],
        "m.Y.d": [],
        "m.X.e": ["efg", "bcef"],
        "m.Y.f": ["efg", "bcef"],
        "m.X.g": ["efg"],
    }

    nodes = {}
    for unique_id in graph:
        fqn = unique_id.split(".")
        nodes[unique_id] = MagicMock(
            unique_id=unique_id,
            fqn=fqn,
            package_name=fqn[0],
            tags=[],
            resource_type=NodeType.Model,
            empty=False,
            config=MagicMock(enabled=True),
            is_versioned=False,
        )

    # Indexing (rather than .get) keeps the KeyError if an expected node is missing.
    for unique_id, tags in tags_by_node.items():
        nodes[unique_id].tags = tags

    return MagicMock(nodes=nodes)


@pytest.fixture
def graph():
    """Provide the sample graph built by ``_get_graph`` to a test."""
    sample_graph = _get_graph()
    return sample_graph


@pytest.fixture
def manifest(graph):
    """Provide a mock manifest whose nodes mirror the ``graph`` fixture."""
    sample_manifest = _get_manifest(graph)
    return sample_manifest


def id_macro(arg):
    """Turn a parametrized argument into a readable, stable pytest id.

    Strings are returned unchanged. Other iterables of strings are joined with
    underscores. Sets are sorted before joining: their iteration order depends
    on string hash randomization, so joining them directly would produce ids
    that differ between interpreter runs (and between parallel test workers).

    Anything that cannot be joined (a non-iterable, or an iterable containing
    non-string items) is returned as-is so pytest falls back to its own id.
    """
    if isinstance(arg, str):
        return arg
    try:
        if isinstance(arg, (set, frozenset)):
            # sorted() on mixed types raises TypeError, handled below like join().
            parts = sorted(arg)
        else:
            # Lists and tuples keep their given order.
            parts = arg
        return "_".join(parts)
    except TypeError:
        return arg


# Cases for test_run_specs. Each entry is (include, exclude, expected):
# the --select style specs, the --exclude style specs, and the set of node
# unique ids expected to be selected from the sample graph.
run_specs = [
    # include by fqn
    (["X.a"], [], {"m.X.a"}),
    # include by tag
    (["tag:abc"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
    # exclude by tag
    (["*"], ["tag:abc"], {"m.Y.d", "m.X.e", "m.Y.f", "m.X.g"}),
    # tag + fqn
    (["tag:abc", "a"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
    (["tag:abc", "d"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.d"}),
    # multiple node selection across packages
    (["X.a", "b"], [], {"m.X.a", "m.Y.b"}),
    (["X.a+"], ["b"], {"m.X.a", "m.X.c", "m.Y.d", "m.X.e", "m.Y.f", "m.X.g"}),
    # children
    (["X.c+"], [], {"m.X.c", "m.Y.f", "m.X.g"}),
    (["X.a+1"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
    (["X.a+"], ["tag:efg"], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.d"}),
    # parents
    (["+Y.f"], [], {"m.X.c", "m.Y.f", "m.X.a"}),
    (["1+Y.f"], [], {"m.X.c", "m.Y.f"}),
    # childrens parents
    (["@X.c"], [], {"m.X.a", "m.X.c", "m.Y.f", "m.X.g"}),
    # multiple selection/exclusion
    (["tag:abc", "tag:bcef"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.X.e", "m.Y.f"}),
    (["tag:abc", "tag:bcef"], ["tag:efg"], {"m.X.a", "m.Y.b", "m.X.c"}),
    (["tag:abc", "tag:bcef"], ["tag:efg", "a"], {"m.Y.b", "m.X.c"}),
    # intersections
    (["a,a"], [], {"m.X.a"}),
    (["+c,c+"], [], {"m.X.c"}),
    (["a,b"], [], set()),
    (["tag:abc,tag:bcef"], [], {"m.Y.b", "m.X.c"}),
    (["*,tag:abc,a"], [], {"m.X.a"}),
    (["a,tag:abc,*"], [], {"m.X.a"}),
    (["tag:abc,tag:bcef"], ["c"], {"m.Y.b"}),
    (["tag:bcef,tag:efg"], ["tag:bcef,@b"], {"m.Y.f"}),
    (["tag:bcef,tag:efg"], ["tag:bcef,@a"], set()),
    (["*,@a,+b"], ["*,tag:abc,tag:bcef"], {"m.X.a"}),
    (["tag:bcef,tag:efg", "*,tag:abc"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.X.e", "m.Y.f"}),
    (["tag:bcef,tag:efg", "*,tag:abc"], ["e"], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.f"}),
    (["tag:bcef,tag:efg", "*,tag:abc"], ["e", "f"], {"m.X.a", "m.Y.b", "m.X.c"}),
    (["tag:bcef,tag:efg", "*,tag:abc"], ["tag:abc,tag:bcef"], {"m.X.a", "m.X.e", "m.Y.f"}),
    (["tag:bcef,tag:efg", "*,tag:abc"], ["tag:abc,tag:bcef", "tag:abc,a"], {"m.X.e", "m.Y.f"}),
]


@pytest.mark.parametrize("include,exclude,expected", run_specs, ids=id_macro)
def test_run_specs(include, exclude, expected, graph, manifest):
    """Selecting ``include`` minus ``exclude`` yields exactly ``expected``."""
    node_selector = graph_selector.NodeSelector(graph, manifest)
    difference_spec = graph_cli.parse_difference(include, exclude)

    # select_nodes returns a pair; only its first element is checked here.
    selection = node_selector.select_nodes(difference_spec)
    selected_ids, _ = selection

    assert selected_ids == expected


# Cases for test_parse_specs. Columns, in order:
#   spec, parents, parents_depth, children, children_depth,
#   filter_type, filter_value, childrens_parents
# i.e. the raw selector string followed by the attribute values expected on the
# SelectionCriteria parsed from it (filter_type/filter_value are compared
# against the parsed ``method`` and ``value``).
param_specs = [
    ("a", False, None, False, None, "fqn", "a", False),
    ("+a", True, None, False, None, "fqn", "a", False),
    ("256+a", True, 256, False, None, "fqn", "a", False),
    ("a+", False, None, True, None, "fqn", "a", False),
    ("a+256", False, None, True, 256, "fqn", "a", False),
    ("+a+", True, None, True, None, "fqn", "a", False),
    ("16+a+32", True, 16, True, 32, "fqn", "a", False),
    ("@a", False, None, False, None, "fqn", "a", True),
    ("a.b", False, None, False, None, "fqn", "a.b", False),
    ("+a.b", True, None, False, None, "fqn", "a.b", False),
    ("256+a.b", True, 256, False, None, "fqn", "a.b", False),
    ("a.b+", False, None, True, None, "fqn", "a.b", False),
    ("a.b+256", False, None, True, 256, "fqn", "a.b", False),
    ("+a.b+", True, None, True, None, "fqn", "a.b", False),
    ("16+a.b+32", True, 16, True, 32, "fqn", "a.b", False),
    ("@a.b", False, None, False, None, "fqn", "a.b", True),
    ("a.b.*", False, None, False, None, "fqn", "a.b.*", False),
    ("+a.b.*", True, None, False, None, "fqn", "a.b.*", False),
    ("256+a.b.*", True, 256, False, None, "fqn", "a.b.*", False),
    ("a.b.*+", False, None, True, None, "fqn", "a.b.*", False),
    ("a.b.*+256", False, None, True, 256, "fqn", "a.b.*", False),
    ("+a.b.*+", True, None, True, None, "fqn", "a.b.*", False),
    ("16+a.b.*+32", True, 16, True, 32, "fqn", "a.b.*", False),
    ("@a.b.*", False, None, False, None, "fqn", "a.b.*", True),
    ("tag:a", False, None, False, None, "tag", "a", False),
    ("+tag:a", True, None, False, None, "tag", "a", False),
    ("256+tag:a", True, 256, False, None, "tag", "a", False),
    ("tag:a+", False, None, True, None, "tag", "a", False),
    ("tag:a+256", False, None, True, 256, "tag", "a", False),
    ("+tag:a+", True, None, True, None, "tag", "a", False),
    ("16+tag:a+32", True, 16, True, 32, "tag", "a", False),
    ("@tag:a", False, None, False, None, "tag", "a", True),
    ("source:a", False, None, False, None, "source", "a", False),
    ("source:a+", False, None, True, None, "source", "a", False),
    ("source:a+1", False, None, True, 1, "source", "a", False),
    ("source:a+32", False, None, True, 32, "source", "a", False),
    ("@source:a", False, None, False, None, "source", "a", True),
]


@pytest.mark.parametrize(
    "spec,parents,parents_depth,children,children_depth,filter_type,filter_value,childrens_parents",
    param_specs,
    ids=id_macro,
)
def test_parse_specs(
    spec,
    parents,
    parents_depth,
    children,
    children_depth,
    filter_type,
    filter_value,
    childrens_parents,
):
    """Parsing ``spec`` yields criteria carrying the expected attribute values."""
    criteria = graph_selector.SelectionCriteria.from_single_spec(spec)

    # Checked in this order so the first mismatch reported matches the
    # attribute order of the parametrized columns.
    expected_by_attribute = (
        ("parents", parents),
        ("parents_depth", parents_depth),
        ("children", children),
        ("children_depth", children_depth),
        ("method", filter_type),
        ("value", filter_value),
        ("childrens_parents", childrens_parents),
    )
    for attribute, expected_value in expected_by_attribute:
        assert getattr(criteria, attribute) == expected_value


# Selector strings that test_invalid_specs expects from_single_spec to reject
# with DbtRuntimeError. Each one combines the "@" prefix with a trailing "+".
invalid_specs = [
    "@a+",
    "@a.b+",
    "@a.b*+",
    "@tag:a+",
    "@source:a+",
]


@pytest.mark.parametrize("invalid", invalid_specs, ids=str)
def test_invalid_specs(invalid):
    """Parsing a malformed selector string raises ``DbtRuntimeError``."""
    parse = graph_selector.SelectionCriteria.from_single_spec
    with pytest.raises(dbt_common.exceptions.DbtRuntimeError):
        parse(invalid)


class GraphTest(unittest.TestCase):
def tearDown(self):
self.mock_filesystem_search.stop()
Expand Down Expand Up @@ -342,7 +547,12 @@ def test__dependency_list(self):
# dbt.cli.params.indirect_selection
#
# Doing that is actually a little tricky, so I'm punting it to a new ticket GH #6397
queue = selector.get_graph_queue(parse_difference(None, None, "eager"))
queue = selector.get_graph_queue(
parse_difference(
None,
None,
)
)

for model_id in model_ids:
self.assertFalse(queue.empty())
Expand Down
Loading
Loading