Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure filtering with manifest loading works with single model #576

Merged
merged 7 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# these files get autogenerated
docs/profiles/*

# dbt_packages is a directory that gets created when you run dbt deps
dbt_packages/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
63 changes: 46 additions & 17 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import TYPE_CHECKING

from cosmos.constants import DbtResourceType
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger

Expand Down Expand Up @@ -76,6 +77,9 @@ def load_from_statement(self, statement: str) -> None:
self.other.append(item)
logger.warning("Unsupported select statement: %s", item)

def __repr__(self) -> str:
    """Return a debug string listing the paths, tags, config and other selector parts."""
    paths, tags, config, other = self.paths, self.tags, self.config, self.other
    return f"SelectorConfig(paths={paths}, tags={tags}, config={config}, other={other})"


def select_nodes_ids_by_intersection(nodes: dict[str, DbtNode], config: SelectorConfig) -> set[str]:
    """
    Return the ids of the nodes that satisfy every filter defined in ``config``.

    A node is selected when all of the following hold:

    * its tags are equal (as a set) to ``config.tags``, when tags are given;
    * its supported config entries contain every entry of ``config.config``, when given;
    * it is located at, or underneath, one of ``config.paths``, when paths are given.

    A test declared in a ``schema.yml`` file that depends on exactly one node follows
    the decision taken for that node when the test's own path does not match.

    :param nodes: Dictionary mapping dbt node ids to nodes
    :param config: Selection criteria to apply
    :returns: Set with the ids of the selected nodes (every id when ``config`` is empty)

    References:
    https://docs.getdbt.com/reference/node-selection/syntax
    https://docs.getdbt.com/reference/node-selection/yaml-selectors
    """
    if config.is_empty:
        return set(nodes.keys())

    # node id -> final decision. Storing the decision itself (instead of a "visited"
    # marker plus the result set) keeps the answer correct when a node is first
    # evaluated indirectly, through a test that depends on it.
    decisions: dict[str, bool] = {}

    def should_include_node(node_id: str, node: DbtNode) -> bool:
        """Decide whether a single node is selected; evaluated once per node id."""
        if node_id not in decisions:
            # Provisional answer so that a dependency cycle terminates instead of recursing forever.
            decisions[node_id] = False
            decisions[node_id] = evaluate_node(node)
        return decisions[node_id]

    def evaluate_node(node: DbtNode) -> bool:
        """Apply the tag, config and path filters to ``node`` without consulting the cache."""
        if config.tags and set(config.tags) != set(node.tags):
            return False

        node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG}
        config_tags = config.config.get("tags")
        if config_tags and config_tags not in node_config.get("tags", []):
            return False

        # 'tags' has already been filtered for, so compare only the remaining entries.
        expected_config = {key: value for key, value in config.config.items() if key != "tags"}
        node_config.pop("tags", None)
        if not (expected_config.items() <= node_config.items()):
            return False

        if not config.paths:
            return True

        node_path = node.file_path
        if any(filter_path == node_path or filter_path in node_path.parents for filter_path in config.paths):
            return True

        # A test coming from a schema.yml file does not live next to its model, so
        # fall back to the decision taken for the single node it depends on.
        if node.resource_type == DbtResourceType.TEST and node_path.name == "schema.yml":
            if len(node.depends_on) == 1:
                model_id = node.depends_on[0]
                model_node = nodes.get(model_id)
                if model_node:
                    return should_include_node(model_id, model_node)

        return False

    return {node_id for node_id, node in nodes.items() if should_include_node(node_id, node)}
Expand Down
40 changes: 40 additions & 0 deletions dev/dags/cosmos_manifest_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
An example DAG that uses Cosmos to render a dbt project from its dbt manifest file, selecting a single model.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, LoadMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Default location of the dbt projects: the "dbt" directory next to this file.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
# The DBT_ROOT_PATH environment variable, when set, overrides the default location.
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

# Profile "default" / target "dev", using a Postgres user/password profile mapping
# bound to the "airflow_db" connection id with the "public" schema.
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_db",
        profile_args=dict(schema="public"),
    ),
)

# [START local_example]
cosmos_manifest_example = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH.joinpath("jaffle_shop"),
        # The manifest is read from the project's "target" directory.
        manifest_path=DBT_ROOT_PATH.joinpath("jaffle_shop", "target", "manifest.json"),
    ),
    profile_config=profile_config,
    # Load the project from the manifest and keep only what the path selector matches.
    render_config=RenderConfig(
        load_method=LoadMode.DBT_MANIFEST,
        select=["path:models/customers.sql"],
    ),
    operator_args={"install_deps": True},
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(year=2023, month=1, day=1),
    catchup=False,
    dag_id="cosmos_manifest_example",
)
# [END local_example]
Loading
Loading