Skip to content

Commit

Permalink
[Bugfix][Selection] yaml selector do not obey default overwrite (#10009)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx authored Apr 24, 2024
1 parent ddd6506 commit 9a0b714
Show file tree
Hide file tree
Showing 11 changed files with 298 additions and 54 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240422-173532.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix default value for indirect selection in selector cannot overwritten by CLI
flag and env var
time: 2024-04-22T17:35:32.465183-07:00
custom:
Author: ChenyuLInx
Issue: 9976 7673
8 changes: 8 additions & 0 deletions core/dbt/graph/README.md
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
# Graph README

## Graph Selector Creation

### Selector Loading
During dbt execution, the `@requires.project` decorator creates the final selector objects used in the graph. The `SelectorConfig` class loads selectors from the project configuration, while the `selector_config_from_data` function parses these selectors.

#### Indirect Selection Default Value
In `@requires.preflight`, dbt reads CLI flags, environment variables, and the parameter's default value. It resolves these inputs based on their precedence order and stores the resolved value in global flags. When loading selectors, the [`selection_criteria_from_dict`](https://github.com/dbt-labs/dbt-core/blob/b316c5f18021fef3d7fd6ec255427054b7d2205e/core/dbt/graph/selector_spec.py#L111) function resolves the indirect selection value to the global flags value if not set. This ensures correct resolution of the indirect selection value.
33 changes: 8 additions & 25 deletions core/dbt/graph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
def parse_union(
components: List[str],
expect_exists: bool,
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> SelectionUnion:
# turn ['a b', 'c'] -> ['a', 'b', 'c']
raw_specs = itertools.chain.from_iterable(r.split(" ") for r in components)
Expand All @@ -37,7 +36,7 @@ def parse_union(
# ['a', 'b', 'c,d'] -> union('a', 'b', intersection('c', 'd'))
for raw_spec in raw_specs:
intersection_components: List[SelectionSpec] = [
SelectionCriteria.from_single_spec(part, indirect_selection=indirect_selection)
SelectionCriteria.from_single_spec(part)
for part in raw_spec.split(INTERSECTION_DELIMITER)
]
union_components.append(
Expand All @@ -56,41 +55,25 @@ def parse_union(
)


def parse_union_from_default(
raw: Optional[List[str]],
default: List[str],
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> SelectionUnion:
def parse_union_from_default(raw: Optional[List[str]], default: List[str]) -> SelectionUnion:
components: List[str]
expect_exists: bool
if raw is None:
return parse_union(
components=default, expect_exists=False, indirect_selection=indirect_selection
)
return parse_union(components=default, expect_exists=False)
else:
return parse_union(
components=raw, expect_exists=True, indirect_selection=indirect_selection
)
return parse_union(components=raw, expect_exists=True)


def parse_difference(
include: Optional[List[str]], exclude: Optional[List[str]], indirect_selection: Any
include: Optional[List[str]], exclude: Optional[List[str]]
) -> SelectionDifference:

if include == ():
include = None

included = parse_union_from_default(
include, DEFAULT_INCLUDES, indirect_selection=IndirectSelection(indirect_selection)
)
flags = get_flags()
excluded = parse_union_from_default(
exclude, DEFAULT_EXCLUDES, indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION)
)
return SelectionDifference(
components=[included, excluded],
indirect_selection=IndirectSelection(flags.INDIRECT_SELECTION),
)
included = parse_union_from_default(include, DEFAULT_INCLUDES)
excluded = parse_union_from_default(exclude, DEFAULT_EXCLUDES)
return SelectionDifference(components=[included, excluded])


RawDefinition = Union[str, Dict[str, Any]]
Expand Down
13 changes: 5 additions & 8 deletions core/dbt/graph/selector_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from dataclasses import dataclass
from dbt_common.dataclass_schema import StrEnum, dbtClassMixin


from typing import Set, Iterator, List, Optional, Dict, Union, Any, Iterable, Tuple
from .graph import UniqueId
from .selector_methods import MethodName
from dbt_common.exceptions import DbtRuntimeError
from dbt.exceptions import InvalidSelectorError
from dbt.flags import get_flags


RAW_SELECTOR_PATTERN = re.compile(
Expand Down Expand Up @@ -110,7 +112,6 @@ def selection_criteria_from_dict(
cls,
raw: Any,
dct: Dict[str, Any],
indirect_selection: IndirectSelection = IndirectSelection.Eager,
) -> "SelectionCriteria":
if "value" not in dct:
raise DbtRuntimeError(f'Invalid node spec "{raw}" - no search value!')
Expand All @@ -121,7 +122,7 @@ def selection_criteria_from_dict(

# If defined field in selector, override CLI flag
indirect_selection = IndirectSelection(
dct.get("indirect_selection", None) or indirect_selection
dct.get("indirect_selection", get_flags().INDIRECT_SELECTION)
)

return cls(
Expand Down Expand Up @@ -158,17 +159,13 @@ def dict_from_single_spec(cls, raw: str):
return dct

@classmethod
def from_single_spec(
cls, raw: str, indirect_selection: IndirectSelection = IndirectSelection.Eager
) -> "SelectionCriteria":
def from_single_spec(cls, raw: str) -> "SelectionCriteria":
result = RAW_SELECTOR_PATTERN.match(raw)
if result is None:
# bad spec!
raise DbtRuntimeError(f'Invalid selector spec "{raw}"')

return cls.selection_criteria_from_dict(
raw, result.groupdict(), indirect_selection=indirect_selection
)
return cls.selection_criteria_from_dict(raw, result.groupdict())


class BaseSelectionGroup(dbtClassMixin, Iterable[SelectionSpec], metaclass=ABCMeta):
Expand Down
9 changes: 1 addition & 8 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ def exclusion_arg(self):

def get_selection_spec(self) -> SelectionSpec:
default_selector_name = self.config.get_default_selector_name()
# TODO: The "eager" string below needs to be replaced with programatic access
# to the default value for the indirect selection parameter in
# dbt.cli.params.indirect_selection
#
# Doing that is actually a little tricky, so I'm punting it to a new ticket GH #6397
indirect_selection = getattr(self.args, "INDIRECT_SELECTION", "eager")

if self.args.selector:
# use pre-defined selector (--selector)
spec = self.config.get_selector(self.args.selector)
Expand All @@ -125,7 +118,7 @@ def get_selection_spec(self) -> SelectionSpec:
else:
# This is what's used with no default selector and no selection
# use --select and --exclude args
spec = parse_difference(self.selection_arg, self.exclusion_arg, indirect_selection)
spec = parse_difference(self.selection_arg, self.exclusion_arg)
# mypy complains because the return values of get_selector and parse_difference
# are different
return spec # type: ignore
Expand Down
Empty file added tests/unit/graph/__init__.py
Empty file.
214 changes: 212 additions & 2 deletions tests/unit/test_graph.py → tests/unit/graph/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,218 @@
from dbt.events.logging import setup_event_logger
from dbt.mp_context import get_mp_context
from queue import Empty
from .utils import config_from_parts_or_dicts, generate_name_macros, inject_plugin
from tests.unit.utils import config_from_parts_or_dicts, generate_name_macros, inject_plugin

from argparse import Namespace


import pytest

import string
import dbt_common.exceptions
import dbt.graph.selector as graph_selector
import dbt.graph.cli as graph_cli
from dbt.node_types import NodeType

import networkx as nx


set_from_args(Namespace(WARN_ERROR=False), None)


def _get_graph():
integer_graph = nx.balanced_tree(2, 2, nx.DiGraph())

package_mapping = {
i: "m." + ("X" if i % 2 == 0 else "Y") + "." + letter
for (i, letter) in enumerate(string.ascii_lowercase)
}

# Edges: [(X.a, Y.b), (X.a, X.c), (Y.b, Y.d), (Y.b, X.e), (X.c, Y.f), (X.c, X.g)]
return graph_selector.Graph(nx.relabel_nodes(integer_graph, package_mapping))


def _get_manifest(graph):
nodes = {}
for unique_id in graph:
fqn = unique_id.split(".")
node = MagicMock(
unique_id=unique_id,
fqn=fqn,
package_name=fqn[0],
tags=[],
resource_type=NodeType.Model,
empty=False,
config=MagicMock(enabled=True),
is_versioned=False,
)
nodes[unique_id] = node

nodes["m.X.a"].tags = ["abc"]
nodes["m.Y.b"].tags = ["abc", "bcef"]
nodes["m.X.c"].tags = ["abc", "bcef"]
nodes["m.Y.d"].tags = []
nodes["m.X.e"].tags = ["efg", "bcef"]
nodes["m.Y.f"].tags = ["efg", "bcef"]
nodes["m.X.g"].tags = ["efg"]
return MagicMock(nodes=nodes)


@pytest.fixture
def graph():
return _get_graph()


@pytest.fixture
def manifest(graph):
return _get_manifest(graph)


def id_macro(arg):
if isinstance(arg, str):
return arg
try:
return "_".join(arg)
except TypeError:
return arg


run_specs = [
# include by fqn
(["X.a"], [], {"m.X.a"}),
# include by tag
(["tag:abc"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
# exclude by tag
(["*"], ["tag:abc"], {"m.Y.d", "m.X.e", "m.Y.f", "m.X.g"}),
# tag + fqn
(["tag:abc", "a"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
(["tag:abc", "d"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.d"}),
# multiple node selection across packages
(["X.a", "b"], [], {"m.X.a", "m.Y.b"}),
(["X.a+"], ["b"], {"m.X.a", "m.X.c", "m.Y.d", "m.X.e", "m.Y.f", "m.X.g"}),
# children
(["X.c+"], [], {"m.X.c", "m.Y.f", "m.X.g"}),
(["X.a+1"], [], {"m.X.a", "m.Y.b", "m.X.c"}),
(["X.a+"], ["tag:efg"], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.d"}),
# parents
(["+Y.f"], [], {"m.X.c", "m.Y.f", "m.X.a"}),
(["1+Y.f"], [], {"m.X.c", "m.Y.f"}),
# childrens parents
(["@X.c"], [], {"m.X.a", "m.X.c", "m.Y.f", "m.X.g"}),
# multiple selection/exclusion
(["tag:abc", "tag:bcef"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.X.e", "m.Y.f"}),
(["tag:abc", "tag:bcef"], ["tag:efg"], {"m.X.a", "m.Y.b", "m.X.c"}),
(["tag:abc", "tag:bcef"], ["tag:efg", "a"], {"m.Y.b", "m.X.c"}),
# intersections
(["a,a"], [], {"m.X.a"}),
(["+c,c+"], [], {"m.X.c"}),
(["a,b"], [], set()),
(["tag:abc,tag:bcef"], [], {"m.Y.b", "m.X.c"}),
(["*,tag:abc,a"], [], {"m.X.a"}),
(["a,tag:abc,*"], [], {"m.X.a"}),
(["tag:abc,tag:bcef"], ["c"], {"m.Y.b"}),
(["tag:bcef,tag:efg"], ["tag:bcef,@b"], {"m.Y.f"}),
(["tag:bcef,tag:efg"], ["tag:bcef,@a"], set()),
(["*,@a,+b"], ["*,tag:abc,tag:bcef"], {"m.X.a"}),
(["tag:bcef,tag:efg", "*,tag:abc"], [], {"m.X.a", "m.Y.b", "m.X.c", "m.X.e", "m.Y.f"}),
(["tag:bcef,tag:efg", "*,tag:abc"], ["e"], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.f"}),
(["tag:bcef,tag:efg", "*,tag:abc"], ["e"], {"m.X.a", "m.Y.b", "m.X.c", "m.Y.f"}),
(["tag:bcef,tag:efg", "*,tag:abc"], ["e", "f"], {"m.X.a", "m.Y.b", "m.X.c"}),
(["tag:bcef,tag:efg", "*,tag:abc"], ["tag:abc,tag:bcef"], {"m.X.a", "m.X.e", "m.Y.f"}),
(["tag:bcef,tag:efg", "*,tag:abc"], ["tag:abc,tag:bcef", "tag:abc,a"], {"m.X.e", "m.Y.f"}),
]


@pytest.mark.parametrize("include,exclude,expected", run_specs, ids=id_macro)
def test_run_specs(include, exclude, expected, graph, manifest):
selector = graph_selector.NodeSelector(graph, manifest)
spec = graph_cli.parse_difference(include, exclude)
selected, _ = selector.select_nodes(spec)

assert selected == expected


param_specs = [
("a", False, None, False, None, "fqn", "a", False),
("+a", True, None, False, None, "fqn", "a", False),
("256+a", True, 256, False, None, "fqn", "a", False),
("a+", False, None, True, None, "fqn", "a", False),
("a+256", False, None, True, 256, "fqn", "a", False),
("+a+", True, None, True, None, "fqn", "a", False),
("16+a+32", True, 16, True, 32, "fqn", "a", False),
("@a", False, None, False, None, "fqn", "a", True),
("a.b", False, None, False, None, "fqn", "a.b", False),
("+a.b", True, None, False, None, "fqn", "a.b", False),
("256+a.b", True, 256, False, None, "fqn", "a.b", False),
("a.b+", False, None, True, None, "fqn", "a.b", False),
("a.b+256", False, None, True, 256, "fqn", "a.b", False),
("+a.b+", True, None, True, None, "fqn", "a.b", False),
("16+a.b+32", True, 16, True, 32, "fqn", "a.b", False),
("@a.b", False, None, False, None, "fqn", "a.b", True),
("a.b.*", False, None, False, None, "fqn", "a.b.*", False),
("+a.b.*", True, None, False, None, "fqn", "a.b.*", False),
("256+a.b.*", True, 256, False, None, "fqn", "a.b.*", False),
("a.b.*+", False, None, True, None, "fqn", "a.b.*", False),
("a.b.*+256", False, None, True, 256, "fqn", "a.b.*", False),
("+a.b.*+", True, None, True, None, "fqn", "a.b.*", False),
("16+a.b.*+32", True, 16, True, 32, "fqn", "a.b.*", False),
("@a.b.*", False, None, False, None, "fqn", "a.b.*", True),
("tag:a", False, None, False, None, "tag", "a", False),
("+tag:a", True, None, False, None, "tag", "a", False),
("256+tag:a", True, 256, False, None, "tag", "a", False),
("tag:a+", False, None, True, None, "tag", "a", False),
("tag:a+256", False, None, True, 256, "tag", "a", False),
("+tag:a+", True, None, True, None, "tag", "a", False),
("16+tag:a+32", True, 16, True, 32, "tag", "a", False),
("@tag:a", False, None, False, None, "tag", "a", True),
("source:a", False, None, False, None, "source", "a", False),
("source:a+", False, None, True, None, "source", "a", False),
("source:a+1", False, None, True, 1, "source", "a", False),
("source:a+32", False, None, True, 32, "source", "a", False),
("@source:a", False, None, False, None, "source", "a", True),
]


@pytest.mark.parametrize(
"spec,parents,parents_depth,children,children_depth,filter_type,filter_value,childrens_parents",
param_specs,
ids=id_macro,
)
def test_parse_specs(
spec,
parents,
parents_depth,
children,
children_depth,
filter_type,
filter_value,
childrens_parents,
):
parsed = graph_selector.SelectionCriteria.from_single_spec(spec)
assert parsed.parents == parents
assert parsed.parents_depth == parents_depth
assert parsed.children == children
assert parsed.children_depth == children_depth
assert parsed.method == filter_type
assert parsed.value == filter_value
assert parsed.childrens_parents == childrens_parents


invalid_specs = [
"@a+",
"@a.b+",
"@a.b*+",
"@tag:a+",
"@source:a+",
]


@pytest.mark.parametrize("invalid", invalid_specs, ids=lambda k: str(k))
def test_invalid_specs(invalid):
with pytest.raises(dbt_common.exceptions.DbtRuntimeError):
graph_selector.SelectionCriteria.from_single_spec(invalid)


class GraphTest(unittest.TestCase):
def tearDown(self):
self.mock_filesystem_search.stop()
Expand Down Expand Up @@ -342,7 +547,12 @@ def test__dependency_list(self):
# dbt.cli.params.indirect_selection
#
# Doing that is actually a little tricky, so I'm punting it to a new ticket GH #6397
queue = selector.get_graph_queue(parse_difference(None, None, "eager"))
queue = selector.get_graph_queue(
parse_difference(
None,
None,
)
)

for model_id in model_ids:
self.assertFalse(queue.empty())
Expand Down
Loading

0 comments on commit 9a0b714

Please sign in to comment.