Merge branch 'main' into custom-selector-upstream-model
tatiana authored Oct 13, 2023
2 parents 127f5cd + a85f2f1 commit 25f50b4
Showing 54 changed files with 7,655 additions and 12,860 deletions.
62 changes: 62 additions & 0 deletions .github/workflows/test.yml
@@ -220,6 +220,68 @@ jobs:
          DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}

  Run-Integration-Tests-Sqlite:
    needs: Authorize
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10"]
        airflow-version: ["2.7"]

    steps:
      - uses: actions/checkout@v3
        with:
          ref: ${{ github.event.pull_request.head.sha || github.ref }}
      - uses: actions/cache@v3
        with:
          path: |
            ~/.cache/pip
            .nox
          key: integration-sqlite-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }}

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install packages and dependencies
        run: |
          python -m pip install hatch
          hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
      - name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
        run: |
          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite-setup
          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite
        env:
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres
          PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
          AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
          DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
          DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
          DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
          DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
          COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
          POSTGRES_HOST: localhost
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: postgres
          POSTGRES_SCHEMA: public
          POSTGRES_PORT: 5432

      - name: Upload coverage to Github
        uses: actions/upload-artifact@v2
        with:
          name: coverage-integration-sqlite-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
          path: .coverage

    env:
      AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
      AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:postgres@0.0.0.0:5432/postgres
      PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH

  Code-Coverage:
    if: github.event.action != 'labeled'
    needs:
2 changes: 1 addition & 1 deletion README.rst
@@ -61,7 +61,7 @@ You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an
    with DAG(
        dag_id="extract_dag",
        start_date=datetime(2022, 11, 27),
        schedule="@daily",
        schedule_interval="@daily",
    ):
        e1 = EmptyOperator(task_id="pre_dbt")
105 changes: 73 additions & 32 deletions cosmos/airflow/graph.py
@@ -6,12 +6,13 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.constants import DbtResourceType, ExecutionMode, TestBehavior
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, TESTABLE_DBT_RESOURCES, DEFAULT_DBT_RESOURCES
from cosmos.core.airflow import get_airflow_task as create_airflow_task
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger


logger = get_logger(__name__)


@@ -104,10 +105,9 @@ def create_task_metadata(
        }
        args = {**args, **{"models": node.name}}

    if hasattr(node.resource_type, "value") and node.resource_type in dbt_resource_to_class:
    if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
        if node.resource_type == DbtResourceType.MODEL:
            task_id = f"{node.name}_run"

            if use_task_group is True:
                task_id = "run"
        else:
@@ -122,10 +122,58 @@
        )
        return task_metadata
    else:
        logger.error(f"Unsupported resource type {node.resource_type} (node {node.unique_id}).")
        msg = (
            f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). "
            "Define a converter function using render_config.node_converters."
        )
        logger.warning(msg)
        return None


def generate_task_or_group(
    dag: DAG,
    task_group: TaskGroup | None,
    node: DbtNode,
    execution_mode: ExecutionMode,
    task_args: dict[str, Any],
    test_behavior: TestBehavior,
    on_warning_callback: Callable[..., Any] | None,
    **kwargs: Any,
) -> BaseOperator | TaskGroup | None:
    task_or_group: BaseOperator | TaskGroup | None = None

    use_task_group = (
        node.resource_type in TESTABLE_DBT_RESOURCES
        and test_behavior == TestBehavior.AFTER_EACH
        and node.has_test is True
    )

    task_meta = create_task_metadata(
        node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
    )

    # In most cases, we'll map one DBT node to one Airflow task
    # The exceptions are the test nodes, since it would be too slow to run test tasks individually.
    # If test_behavior=="after_each", each model task will be bundled with a test task, using TaskGroup
    if task_meta and node.resource_type != DbtResourceType.TEST:
        if use_task_group:
            with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
                task = create_airflow_task(task_meta, dag, task_group=model_task_group)
                test_meta = create_test_task_metadata(
                    "test",
                    execution_mode,
                    task_args=task_args,
                    model_name=node.name,
                    on_warning_callback=on_warning_callback,
                )
                test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
                task >> test_task
            task_or_group = model_task_group
        else:
            task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
    return task_or_group


def build_airflow_graph(
    nodes: dict[str, DbtNode],
    dag: DAG,  # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
@@ -135,6 +183,7 @@ def build_airflow_graph(
    dbt_project_name: str,  # DBT / Cosmos - used to name test task if mode is after_all,
    task_group: TaskGroup | None = None,
    on_warning_callback: Callable[..., Any] | None = None,  # argument specific to the DBT test command
    node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None,
) -> None:
    """
    Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).
@@ -160,41 +209,33 @@
    :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
        and "test_results" of type List.
    """
    node_converters = node_converters or {}
    tasks_map = {}
    task_or_group: TaskGroup | BaseOperator

    # In most cases, we'll map one DBT node to one Airflow task
    # The exceptions are the test nodes, since it would be too slow to run test tasks individually.
    # If test_behavior=="after_each", each model task will be bundled with a test task, using TaskGroup
    for node_id, node in nodes.items():
        use_task_group = (
            node.resource_type == DbtResourceType.MODEL
            and test_behavior == TestBehavior.AFTER_EACH
            and node.has_test is True
        )
        task_meta = create_task_metadata(
            node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
        conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
        if conversion_function != generate_task_or_group:
            logger.warning(
                "The `node_converters` attribute is an experimental feature. "
                "Its syntax and behavior can be changed before a major release."
            )
        logger.debug(f"Converting <{node.unique_id}> using <{conversion_function.__name__}>")
        task_or_group = conversion_function(  # type: ignore
            dag=dag,
            task_group=task_group,
            dbt_project_name=dbt_project_name,
            execution_mode=execution_mode,
            task_args=task_args,
            test_behavior=test_behavior,
            on_warning_callback=on_warning_callback,
            node=node,
        )

        if task_meta and node.resource_type != DbtResourceType.TEST:
            if use_task_group is True:
                with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
                    task = create_airflow_task(task_meta, dag, task_group=model_task_group)
                    test_meta = create_test_task_metadata(
                        "test",
                        execution_mode,
                        task_args=task_args,
                        model_name=node.name,
                        on_warning_callback=on_warning_callback,
                    )
                    test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
                    task >> test_task
                task_or_group = model_task_group
            else:
                task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
        if task_or_group is not None:
            logger.debug(f"Conversion of <{node.unique_id}> was successful!")
            tasks_map[node_id] = task_or_group

    # If test_behaviour=="after_all", there will be one test task, run "by the end" of the DAG
    # If test_behaviour=="after_all", there will be one test task, run by the end of the DAG
    # The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
    if test_behavior == TestBehavior.AFTER_ALL:
        test_meta = create_test_task_metadata(
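Reviewer note: a minimal sketch of a custom converter compatible with the keyword arguments `build_airflow_graph` now passes (`dag`, `task_group`, `node`, with the rest collected via `**kwargs`). The function name, operator choice, and task-id pattern here are hypothetical illustrations, not part of this PR; returning an operator or TaskGroup registers it in `tasks_map`, while returning None skips the node.

from __future__ import annotations

from typing import Any

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

from cosmos.dbt.graph import DbtNode


def convert_source(dag: DAG, task_group: TaskGroup | None, node: DbtNode, **kwargs: Any) -> EmptyOperator:
    """Hypothetical converter: render a dbt source node as a placeholder task."""
    # Any remaining arguments (execution_mode, task_args, test_behavior, ...) arrive via **kwargs.
    return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")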
5 changes: 3 additions & 2 deletions cosmos/config.py
@@ -6,9 +6,9 @@
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator
from typing import Any, Iterator, Callable

from cosmos.constants import TestBehavior, ExecutionMode, LoadMode
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode
from cosmos.dbt.executable import get_system_dbt
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
@@ -39,6 +39,7 @@ class RenderConfig:
    select: list[str] = field(default_factory=list)
    exclude: list[str] = field(default_factory=list)
    dbt_deps: bool = True
    node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None


@dataclass
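A sketch of how the new field might be wired up, assuming the hypothetical `convert_source` from the graph.py note above; `RenderConfig` and `DbtResourceType` come from the modules changed in this PR:

from cosmos.config import RenderConfig
from cosmos.constants import DbtResourceType

render_config = RenderConfig(
    node_converters={DbtResourceType.SOURCE: convert_source},
)

Passing this `render_config` to a DbtDag/DbtTaskGroup would route every source node through `convert_source` instead of the default `generate_task_or_group`.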
18 changes: 16 additions & 2 deletions cosmos/constants.py
@@ -2,6 +2,8 @@
from enum import Enum
from pathlib import Path

import aenum


DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml")
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
@@ -49,8 +51,7 @@ class ExecutionMode(Enum):
    VIRTUALENV = "virtualenv"


# Rename to DbtResourceType
class DbtResourceType(Enum):
class DbtResourceType(aenum.Enum):  # type: ignore
    """
    Type of dbt node.
    """
@@ -60,3 +61,16 @@
    SEED = "seed"
    TEST = "test"
    SOURCE = "source"

    @classmethod
    def _missing_value_(cls, value):  # type: ignore
        aenum.extend_enum(cls, value.upper(), value.lower())
        return getattr(DbtResourceType, value.upper())


DEFAULT_DBT_RESOURCES = DbtResourceType.__members__.values()


TESTABLE_DBT_RESOURCES = {
    DbtResourceType.MODEL
}  # TODO: extend with DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED
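Given the `_missing_value_` hook above, unknown resource types should now resolve to dynamically added members instead of raising ValueError. A sketch of the expected behavior (the "exposure" value is just an example of a type Cosmos does not ship with):

from cosmos.constants import DbtResourceType

assert DbtResourceType("model") is DbtResourceType.MODEL  # known values resolve as before

# An unknown value is appended to the enum on first lookup via aenum.extend_enum,
# so manifests containing extra resource types still parse.
exposure = DbtResourceType("exposure")
assert exposure.value == "exposure"
assert DbtResourceType("exposure") is exposure  # later lookups hit the new member directly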
2 changes: 2 additions & 0 deletions cosmos/converter.py
@@ -119,6 +119,7 @@ def __init__(
        load_mode = render_config.load_method
        manifest_path = project_config.parsed_manifest_path
        dbt_executable_path = execution_config.dbt_executable_path
        node_converters = render_config.node_converters

        profile_args = {}
        if profile_config.profile_mapping:
@@ -168,4 +169,5 @@ def __init__(
            test_behavior=test_behavior,
            dbt_project_name=dbt_project.name,
            on_warning_callback=on_warning_callback,
            node_converters=node_converters,
        )
5 changes: 3 additions & 2 deletions cosmos/dbt/graph.py
@@ -342,12 +342,13 @@ def load_from_dbt_manifest(self) -> None:
        with open(self.project.manifest_path) as fp:  # type: ignore[arg-type]
            manifest = json.load(fp)

        for unique_id, node_dict in manifest.get("nodes", {}).items():
        resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
        for unique_id, node_dict in resources.items():
            node = DbtNode(
                name=node_dict.get("alias", node_dict["name"]),
                unique_id=unique_id,
                resource_type=DbtResourceType(node_dict["resource_type"]),
                depends_on=node_dict["depends_on"].get("nodes", []),
                depends_on=node_dict.get("depends_on", {}).get("nodes", []),
                file_path=self.project.dir / node_dict["original_file_path"],
                tags=node_dict["tags"],
                config=node_dict["config"],
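The switch to `node_dict.get("depends_on", {})` matters because `sources` entries in a dbt manifest.json typically carry no `depends_on` key. A trimmed, hypothetical entry to illustrate (field names assumed from dbt's manifest schema, limited to the keys read above):

source_entry = {
    "name": "raw_customers",
    "resource_type": "source",
    "original_file_path": "models/staging/sources.yml",
    "tags": [],
    "config": {},
}
depends_on = source_entry.get("depends_on", {}).get("nodes", [])  # no KeyError for sources
assert depends_on == []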
1 change: 1 addition & 0 deletions cosmos/operators/local.py
@@ -109,6 +109,7 @@ def __init__(
        self.compiled_sql = ""
        self.should_store_compiled_sql = should_store_compiled_sql
        self.openlineage_events_completes: list[RunEvent] = []
        kwargs.pop("full_refresh", None)  # usage of this param should be implemented in child classes
        super().__init__(**kwargs)

    @cached_property
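Why the `pop` is needed, sketched with a toy stand-in (not the real operator): `operator_args` are forwarded to every generated operator, so a flag like `full_refresh` that only run/seed-style subclasses accept must be swallowed by the base constructor rather than reach operators that do not define it.

from typing import Any


class DbtLocalBaseOperatorSketch:  # simplified stand-in for the class changed above
    def __init__(self, **kwargs: Any) -> None:
        kwargs.pop("full_refresh", None)  # discard a flag only some subclasses implement
        self.extra_kwargs = kwargs


op = DbtLocalBaseOperatorSketch(task_id="ls", full_refresh=True)  # no TypeError
assert "full_refresh" not in op.extra_kwargs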
6 changes: 5 additions & 1 deletion dev/dags/basic_cosmos_dag.py
@@ -28,11 +28,15 @@
        DBT_ROOT_PATH / "jaffle_shop",
    ),
    profile_config=profile_config,
    operator_args={"install_deps": True},
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={"retries": 2},
)
# [END local_example]
1 change: 1 addition & 0 deletions dev/dags/basic_cosmos_task_group.py
@@ -42,6 +42,7 @@ def basic_cosmos_task_group() -> None:
        ),
        operator_args={"install_deps": True},
        profile_config=profile_config,
        default_args={"retries": 2},
    )

    post_dbt = EmptyOperator(task_id="post_dbt")
1 change: 1 addition & 0 deletions dev/dags/cosmos_manifest_example.py
@@ -36,5 +36,6 @@
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="cosmos_manifest_example",
    default_args={"retries": 2},
)
# [END local_example]
1 change: 1 addition & 0 deletions dev/dags/cosmos_profile_mapping.py
@@ -41,6 +41,7 @@ def cosmos_profile_mapping() -> None:
            ),
        ),
        operator_args={"install_deps": True},
        default_args={"retries": 2},
    )

    post_dbt = EmptyOperator(task_id="post_dbt")
4 changes: 2 additions & 2 deletions dev/dags/cosmos_seed_dag.py
@@ -36,11 +36,11 @@
with DAG(
    dag_id="extract_dag",
    start_date=datetime(2022, 11, 27),
    schedule="@daily",
    schedule_interval="@daily",
    doc_md=__doc__,
    catchup=False,
    max_active_runs=1,
    default_args={"owner": "01-EXTRACT"},
    default_args={"owner": "01-EXTRACT", "retries": 2},
) as dag:
    with TaskGroup(group_id="drop_seeds_if_exist") as drop_seeds:
        for seed in ["raw_customers", "raw_payments", "raw_orders"]:
4 changes: 4 additions & 0 deletions dev/dags/dbt/jaffle_shop/.gitignore
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/