Support customizing how dbt nodes are converted to Airflow #503

Merged 29 commits on Oct 13, 2023
Commits
989ffb3
Add support to source nodes in dbt ls and manifest load methods
tatiana Aug 31, 2023
eb3813b
Add an example DAG that has a dbt project with sources
tatiana Sep 1, 2023
a6c8940
Allow users to customize how DbtResource should be rendered in Airflow
tatiana Sep 1, 2023
bd05c16
Improve example DAG
tatiana Sep 1, 2023
df1ceeb
Fix CI
tatiana Sep 1, 2023
bf972fb
Fix CI
tatiana Sep 1, 2023
8b431e4
Fix imports
tatiana Sep 5, 2023
4e2c6de
Try to fix CI issues
tatiana Sep 5, 2023
5fe1429
Fix test after rebase
tatiana Oct 4, 2023
99b95d4
Rename render_config.dbt_resource_converter -> render_config.node_con…
tatiana Oct 4, 2023
1204536
Separate Sqlite tests
tatiana Oct 9, 2023
6e9ba02
Removed k8s example that was unintentionally committed
tatiana Oct 9, 2023
f3ebd2a
Fix tests
tatiana Oct 9, 2023
9b8397e
Resolve conflict with latest main
tatiana Oct 9, 2023
0a8e6a3
Add documentation and warning for RenderConfig.node_converters
tatiana Oct 9, 2023
7fe44a6
Try to fix docs build issue
tatiana Oct 9, 2023
bdbc916
Add aenum to docs deps
tatiana Oct 9, 2023
1461aee
Fix codespell checks
tatiana Oct 9, 2023
671eb66
Fix sphinx dep issue
tatiana Oct 9, 2023
2ba56c8
Revert change to test related to dbt version
tatiana Oct 10, 2023
0370917
Unpin dbt to see if models test will pass
tatiana Oct 10, 2023
d39c689
Only run example DAGs compatible with the right version of dbt & git …
tatiana Oct 10, 2023
2d04433
Merge branch 'main' into issue-427-task-generator
tatiana Oct 10, 2023
f1c722e
Merge branch 'main' into issue-427-task-generator
tatiana Oct 10, 2023
a5f1c1e
Document sources example DAG
tatiana Oct 13, 2023
6ad3b29
Change log level for conversion functions
tatiana Oct 13, 2023
f75181f
Revert unnecessary change in pyproject
tatiana Oct 13, 2023
1c426d9
Revert unnecessary change
tatiana Oct 13, 2023
945f62f
Ignore mypy error
tatiana Oct 13, 2023
62 changes: 62 additions & 0 deletions .github/workflows/test.yml
@@ -220,6 +220,68 @@ jobs:
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}

Run-Integration-Tests-Sqlite:
needs: Authorize
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
airflow-version: ["2.7"]

steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}
- uses: actions/cache@v3
with:
path: |
~/.cache/pip
.nox
key: integration-sqlite-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install packages and dependencies
run: |
python -m pip install hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
- name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
run: |
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite-setup
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
with:
name: coverage-integration-sqlite-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH


Code-Coverage:
if: github.event.action != 'labeled'
needs:
105 changes: 73 additions & 32 deletions cosmos/airflow/graph.py
@@ -6,12 +6,13 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.constants import DbtResourceType, ExecutionMode, TestBehavior
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, TESTABLE_DBT_RESOURCES, DEFAULT_DBT_RESOURCES
from cosmos.core.airflow import get_airflow_task as create_airflow_task
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger


logger = get_logger(__name__)


@@ -104,10 +105,9 @@
}
args = {**args, **{"models": node.name}}

if hasattr(node.resource_type, "value") and node.resource_type in dbt_resource_to_class:
if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
task_id = f"{node.name}_run"

if use_task_group is True:
task_id = "run"
else:
@@ -122,10 +122,58 @@
)
return task_metadata
else:
logger.error(f"Unsupported resource type {node.resource_type} (node {node.unique_id}).")
msg = (
f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). "
"Define a converter function using render_config.node_converters."
)
logger.warning(msg)
return None


def generate_task_or_group(
dag: DAG,
task_group: TaskGroup | None,
node: DbtNode,
execution_mode: ExecutionMode,
task_args: dict[str, Any],
test_behavior: TestBehavior,
on_warning_callback: Callable[..., Any] | None,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None

use_task_group = (
node.resource_type in TESTABLE_DBT_RESOURCES
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)

# In most cases, we'll map one DBT node to one Airflow task
# The exceptions are the test nodes, since it would be too slow to run test tasks individually.
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
if task_meta and node.resource_type != DbtResourceType.TEST:
if use_task_group:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
task = create_airflow_task(task_meta, dag, task_group=model_task_group)
test_meta = create_test_task_metadata(
"test",
execution_mode,
task_args=task_args,
model_name=node.name,
on_warning_callback=on_warning_callback,
)
test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
task >> test_task
task_or_group = model_task_group
else:
task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
return task_or_group


def build_airflow_graph(
nodes: dict[str, DbtNode],
dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
@@ -135,6 +183,7 @@
dbt_project_name: str, # DBT / Cosmos - used to name test task if mode is after_all,
task_group: TaskGroup | None = None,
on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None,
) -> None:
"""
Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).
@@ -160,41 +209,33 @@
:param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
and “test_results” of type List.
"""
node_converters = node_converters or {}
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

# In most cases, we'll map one DBT node to one Airflow task
# The exception are the test nodes, since it would be too slow to run test tasks individually.
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
for node_id, node in nodes.items():
use_task_group = (
node.resource_type == DbtResourceType.MODEL
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)
task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
if conversion_function != generate_task_or_group:
logger.warning(
"The `node_converters` attribute is an experimental feature. "
"Its syntax and behavior can be changed before a major release."
)
logger.debug(f"Converting <{node.unique_id}> using <{conversion_function.__name__}>")
task_or_group = conversion_function( # type: ignore
dag=dag,
task_group=task_group,
dbt_project_name=dbt_project_name,
execution_mode=execution_mode,
task_args=task_args,
test_behavior=test_behavior,
on_warning_callback=on_warning_callback,
node=node,
)

if task_meta and node.resource_type != DbtResourceType.TEST:
if use_task_group is True:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
task = create_airflow_task(task_meta, dag, task_group=model_task_group)
test_meta = create_test_task_metadata(
"test",
execution_mode,
task_args=task_args,
model_name=node.name,
on_warning_callback=on_warning_callback,
)
test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
task >> test_task
task_or_group = model_task_group
else:
task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
if task_or_group is not None:
logger.debug(f"Conversion of <{node.unique_id}> was successful!")
tasks_map[node_id] = task_or_group

# If test_behaviour=="after_all", there will be one test task, run "by the end" of the DAG
# If test_behaviour=="after_all", there will be one test task, run by the end of the DAG
# The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
if test_behavior == TestBehavior.AFTER_ALL:
test_meta = create_test_task_metadata(
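With this change, any resource type can be routed to a user-supplied conversion function. As a rough sketch (not part of this diff), a converter registered via `render_config.node_converters` receives the same keyword arguments that `build_airflow_graph` passes to `generate_task_or_group`; returning `None` skips the node. The `convert_source` name and the `EmptyOperator` placeholder below are assumptions for illustration:

from __future__ import annotations

from typing import Any

from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

from cosmos.dbt.graph import DbtNode


def convert_source(dag: DAG, task_group: TaskGroup | None, node: DbtNode, **kwargs: Any) -> BaseOperator | None:
    """Hypothetical converter: render a dbt source as a placeholder task.

    build_airflow_graph only registers non-None results in tasks_map,
    so returning None here would drop the node from the DAG.
    """
    return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")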
5 changes: 3 additions & 2 deletions cosmos/config.py
@@ -6,9 +6,9 @@
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator
from typing import Any, Iterator, Callable

from cosmos.constants import TestBehavior, ExecutionMode, LoadMode
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
@@ -39,6 +39,7 @@ class RenderConfig:
select: list[str] = field(default_factory=list)
exclude: list[str] = field(default_factory=list)
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None


@dataclass
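On the user side, wiring a converter into the new `RenderConfig.node_converters` field might look like the following sketch, reusing the hypothetical `convert_source` from above:

from cosmos.config import RenderConfig
from cosmos.constants import DbtResourceType

# Keys are DbtResourceType members; values are conversion callables.
render_config = RenderConfig(
    node_converters={DbtResourceType("source"): convert_source},
)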
18 changes: 16 additions & 2 deletions cosmos/constants.py
@@ -2,6 +2,8 @@
from enum import Enum
from pathlib import Path

import aenum


DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
@@ -49,8 +51,7 @@ class ExecutionMode(Enum):
VIRTUALENV = "virtualenv"


# Rename to DbtResourceType
class DbtResourceType(Enum):
class DbtResourceType(aenum.Enum): # type: ignore
"""
Type of dbt node.
"""
@@ -60,3 +61,16 @@
SEED = "seed"
TEST = "test"
SOURCE = "source"

@classmethod
def _missing_value_(cls, value): # type: ignore
aenum.extend_enum(cls, value.upper(), value.lower())
return getattr(DbtResourceType, value.upper())


DEFAULT_DBT_RESOURCES = DbtResourceType.__members__.values()


TESTABLE_DBT_RESOURCES = {
DbtResourceType.MODEL
} # TODO: extend with DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED)
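Switching `DbtResourceType` to `aenum.Enum` makes the enum open-ended: `_missing_value_` extends it at lookup time instead of raising `ValueError`, which is what lets Cosmos accept resource types it does not know about natively. A small illustration of the behavior that follows from the code above:

from cosmos.constants import DbtResourceType

# Known values resolve to the predefined members, as before.
assert DbtResourceType("model") is DbtResourceType.MODEL

# An unknown value no longer raises ValueError: _missing_value_ adds a
# new member on the fly (name upper-cased, value lower-cased).
exposure = DbtResourceType("exposure")
assert exposure.name == "EXPOSURE" and exposure.value == "exposure"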
2 changes: 2 additions & 0 deletions cosmos/converter.py
@@ -119,6 +119,7 @@ def __init__(
load_mode = render_config.load_method
manifest_path = project_config.parsed_manifest_path
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

profile_args = {}
if profile_config.profile_mapping:
@@ -168,4 +169,5 @@
test_behavior=test_behavior,
dbt_project_name=dbt_project.name,
on_warning_callback=on_warning_callback,
node_converters=node_converters,
)
5 changes: 3 additions & 2 deletions cosmos/dbt/graph.py
@@ -342,12 +342,13 @@ def load_from_dbt_manifest(self) -> None:
with open(self.project.manifest_path) as fp: # type: ignore[arg-type]
manifest = json.load(fp)

for unique_id, node_dict in manifest.get("nodes", {}).items():
resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict["depends_on"].get("nodes", []),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
file_path=self.project.dir / node_dict["original_file_path"],
tags=node_dict["tags"],
config=node_dict["config"],
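The new `.get("depends_on", {})` guard matters because entries under the manifest's `sources` key do not necessarily carry a `depends_on` field, unlike entries under `nodes`. An abridged, illustrative manifest shape (real manifests carry many more keys) showing why the guarded access is needed:

manifest = {
    "nodes": {
        "model.jaffle_shop.stg_customers": {
            "name": "stg_customers",
            "resource_type": "model",
            "depends_on": {"nodes": ["source.jaffle_shop.raw_customers"]},
        },
    },
    "sources": {
        # Source entries may lack "depends_on" entirely.
        "source.jaffle_shop.raw_customers": {
            "name": "raw_customers",
            "resource_type": "source",
        },
    },
    "exposures": {},
}

resources = {**manifest["nodes"], **manifest["sources"], **manifest["exposures"]}
for unique_id, node_dict in resources.items():
    # Safe for both shapes: yields [] when "depends_on" (or "nodes") is absent.
    depends_on = node_dict.get("depends_on", {}).get("nodes", [])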
4 changes: 4 additions & 0 deletions dev/dags/dbt/jaffle_shop/.gitignore
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/