
Commit

Merge branch 'main' into doc/update-contributing-guide
tatiana authored Oct 19, 2023
2 parents dcfa80a + c82e6bc commit e6a1a6a
Showing 65 changed files with 8,176 additions and 12,908 deletions.
62 changes: 62 additions & 0 deletions .github/workflows/test.yml
@@ -220,6 +220,68 @@ jobs:
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}

Run-Integration-Tests-Sqlite:
needs: Authorize
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10"]
airflow-version: ["2.7"]

steps:
- uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}
- uses: actions/cache@v3
with:
path: |
~/.cache/pip
.nox
key: integration-sqlite-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('cosmos/__init__.py') }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install packages and dependencies
run: |
python -m pip install hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
- name: Test Cosmos against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
run: |
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite-setup
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration-sqlite
env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
AIRFLOW_CONN_DATABRICKS_DEFAULT: ${{ secrets.AIRFLOW_CONN_DATABRICKS_DEFAULT }}
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_WAREHOUSE_ID: ${{ secrets.DATABRICKS_WAREHOUSE_ID }}
DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
POSTGRES_HOST: localhost
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
with:
name: coverage-integration-sqlite-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
AIRFLOW_CONN_AIRFLOW_DB: postgres://postgres:[email protected]:5432/postgres
PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH


Code-Coverage:
if: github.event.action != 'labeled'
needs:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -70,7 +70,7 @@ repos:
alias: black
additional_dependencies: [black>=22.10.0]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.5.1'
rev: 'v1.6.0'
hooks:
- id: mypy
name: mypy-python
24 changes: 24 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,30 @@
Changelog
=========

1.2.0 (2023-10-13)
------------------

Features

* Add support for model versioning (available since dbt 1.6) by @binhnq94 in #516
* Add AWS Athena profile mapping by @benjamin-awd in #578
* Support customizing how dbt nodes are converted to Airflow by @tatiana in #503
* Make the arg ``dbt_project_path`` in the ``ProjectConfig`` optional by @MrBones757 in #581

Bug fixes

* Fix Cosmos custom selector to support filtering a single model by @jlaneve and @harels in #576
* Fix using ``GoogleCloudServiceAccountDictProfileMapping`` together with ``LoadMethod.DBT_LS`` by @joppevos in #587
* Fix using the ``full_refresh`` argument in projects that contain tests by @EgorSemenov and @tatiana in #590
* Stop creating symbolic links for ``dbt_packages`` (solves ``LocalExecutor`` concurrency issue) by @tatiana in #600

Others

* Docs: add reference to original Jaffle Shop project by @erdos2n in #583
* Docs: retries & note about DagBag error by @TJaniF in #592
* pre-commit updates in #575 and #585


1.1.3 (2023-09-28)
------------------

2 changes: 1 addition & 1 deletion README.rst
@@ -61,7 +61,7 @@ You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an
with DAG(
dag_id="extract_dag",
start_date=datetime(2022, 11, 27),
schedule="@daily",
schedule_interval="@daily",
):
e1 = EmptyOperator(task_id="pre_dbt")
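Note: the README hunk above is only an excerpt of the Task Group example it touches. For readers without the full file, a minimal sketch of the kind of DAG that example builds is shown below, assembled from the names exported by ``cosmos/__init__.py`` (``DbtTaskGroup``) plus ``ProjectConfig``/``ProfileConfig``; the profile mapping, connection id, project path and the closing ``post_dbt`` task are illustrative assumptions, not text taken from this commit.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Map an Airflow connection to a dbt profile (connection id and schema are assumptions).
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_db",
        profile_args={"schema": "public"},
    ),
)

with DAG(
    dag_id="extract_dag",
    start_date=datetime(2022, 11, 27),
    schedule_interval="@daily",
):
    e1 = EmptyOperator(task_id="pre_dbt")

    dbt_tg = DbtTaskGroup(
        group_id="dbt_tg",
        project_config=ProjectConfig("/usr/local/airflow/dags/jaffle_shop"),  # assumed path
        profile_config=profile_config,
    )

    e2 = EmptyOperator(task_id="post_dbt")  # assumed closing task

    e1 >> dbt_tg >> e2

The ``e1`` task mirrors the ``pre_dbt`` operator visible in the diff; everything after it is a hedged reconstruction.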
2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.2.0a1"
__version__ = "1.2.0"

from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
120 changes: 84 additions & 36 deletions cosmos/airflow/graph.py
@@ -6,12 +6,13 @@
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos.constants import DbtResourceType, ExecutionMode, TestBehavior
from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, TESTABLE_DBT_RESOURCES, DEFAULT_DBT_RESOURCES
from cosmos.core.airflow import get_airflow_task as create_airflow_task
from cosmos.core.graph.entities import Task as TaskMetadata
from cosmos.dbt.graph import DbtNode
from cosmos.log import get_logger


logger = get_logger(__name__)


@@ -55,7 +56,7 @@ def create_test_task_metadata(
execution_mode: ExecutionMode,
task_args: dict[str, Any],
on_warning_callback: Callable[..., Any] | None = None,
model_name: str | None = None,
node: DbtNode | None = None,
) -> TaskMetadata:
"""
Create the metadata that will be used to instantiate the Airflow Task that will be used to run the Dbt test node.
@@ -65,13 +66,18 @@
:param task_args: Arguments to be used to instantiate an Airflow Task
:param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
and “test_results” of type List.
:param model_name: If the test relates to a specific model, the name of the model it relates to
:param node: If the test relates to a specific node, the node reference
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
task_args = dict(task_args)
task_args["on_warning_callback"] = on_warning_callback
if model_name is not None:
task_args["models"] = model_name
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.name
elif node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
task_args["select"] = node.name
return TaskMetadata(
id=test_task_name,
operator_class=calculate_operator_class(
@@ -104,14 +110,15 @@ def create_task_metadata(
}
args = {**args, **{"models": node.name}}

if hasattr(node.resource_type, "value") and node.resource_type in dbt_resource_to_class:
if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
task_id = f"{node.name}_run"

if use_task_group is True:
task_id = "run"
else:
task_id = f"{node.name}_{node.resource_type.value}"
if use_task_group is True:
task_id = node.resource_type.value

task_metadata = TaskMetadata(
id=task_id,
@@ -122,10 +129,58 @@
)
return task_metadata
else:
logger.error(f"Unsupported resource type {node.resource_type} (node {node.unique_id}).")
msg = (
f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). "
"Define a converter function using render_config.node_converters."
)
logger.warning(msg)
return None


def generate_task_or_group(
dag: DAG,
task_group: TaskGroup | None,
node: DbtNode,
execution_mode: ExecutionMode,
task_args: dict[str, Any],
test_behavior: TestBehavior,
on_warning_callback: Callable[..., Any] | None,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None

use_task_group = (
node.resource_type in TESTABLE_DBT_RESOURCES
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)

task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
)

# In most cases, we'll map one DBT node to one Airflow task
# The exception is test nodes, since it would be too slow to run test tasks individually.
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
if task_meta and node.resource_type != DbtResourceType.TEST:
if use_task_group:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
task = create_airflow_task(task_meta, dag, task_group=model_task_group)
test_meta = create_test_task_metadata(
"test",
execution_mode,
task_args=task_args,
node=node,
on_warning_callback=on_warning_callback,
)
test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
task >> test_task
task_or_group = model_task_group
else:
task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
return task_or_group


def build_airflow_graph(
nodes: dict[str, DbtNode],
dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
@@ -135,6 +190,7 @@
dbt_project_name: str, # DBT / Cosmos - used to name test task if mode is after_all,
task_group: TaskGroup | None = None,
on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None,
) -> None:
"""
Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).
Expand All @@ -160,41 +216,33 @@ def build_airflow_graph(
:param on_warning_callback: A callback function called on warnings with additional Context variables “test_names”
and “test_results” of type List.
"""
node_converters = node_converters or {}
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

# In most cases, we'll map one DBT node to one Airflow task
# The exception is test nodes, since it would be too slow to run test tasks individually.
# If test_behaviour=="after_each", each model task will be bundled with a test task, using TaskGroup
for node_id, node in nodes.items():
use_task_group = (
node.resource_type == DbtResourceType.MODEL
and test_behavior == TestBehavior.AFTER_EACH
and node.has_test is True
)
task_meta = create_task_metadata(
node=node, execution_mode=execution_mode, args=task_args, use_task_group=use_task_group
conversion_function = node_converters.get(node.resource_type, generate_task_or_group)
if conversion_function != generate_task_or_group:
logger.warning(
"The `node_converters` attribute is an experimental feature. "
"Its syntax and behavior can be changed before a major release."
)
logger.debug(f"Converting <{node.unique_id}> using <{conversion_function.__name__}>")
task_or_group = conversion_function( # type: ignore
dag=dag,
task_group=task_group,
dbt_project_name=dbt_project_name,
execution_mode=execution_mode,
task_args=task_args,
test_behavior=test_behavior,
on_warning_callback=on_warning_callback,
node=node,
)

if task_meta and node.resource_type != DbtResourceType.TEST:
if use_task_group is True:
with TaskGroup(dag=dag, group_id=node.name, parent_group=task_group) as model_task_group:
task = create_airflow_task(task_meta, dag, task_group=model_task_group)
test_meta = create_test_task_metadata(
"test",
execution_mode,
task_args=task_args,
model_name=node.name,
on_warning_callback=on_warning_callback,
)
test_task = create_airflow_task(test_meta, dag, task_group=model_task_group)
task >> test_task
task_or_group = model_task_group
else:
task_or_group = create_airflow_task(task_meta, dag, task_group=task_group)
if task_or_group is not None:
logger.debug(f"Conversion of <{node.unique_id}> was successful!")
tasks_map[node_id] = task_or_group

# If test_behaviour=="after_all", there will be one test task, run "by the end" of the DAG
# If test_behaviour=="after_all", there will be one test task, run by the end of the DAG
# The end of a DAG is defined by the DAG leaf tasks (tasks which do not have downstream tasks)
if test_behavior == TestBehavior.AFTER_ALL:
test_meta = create_test_task_metadata(
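The ``node_converters`` hook wired in above is the mechanism the new warning message points at (``render_config.node_converters``, PR #503 in the changelog). Below is a minimal sketch of how a custom converter might be registered; it is an assumption-laden illustration rather than code from this commit: the converter body, DAG arguments, paths and profile settings are made up, and the diff itself warns that the feature is experimental and may change before a major release.

from airflow.operators.empty import EmptyOperator

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType
from cosmos.dbt.graph import DbtNode


def convert_source(dag, task_group, node: DbtNode, **kwargs):
    # Render each dbt source as a placeholder task instead of dropping the node.
    # Cosmos calls converters with the same keyword arguments build_airflow_graph
    # passes to generate_task_or_group (execution_mode, task_args, test_behavior, ...).
    return EmptyOperator(dag=dag, task_group=task_group, task_id=f"{node.name}_source")


converter_example = DbtDag(
    dag_id="node_converters_example",
    project_config=ProjectConfig("/usr/local/airflow/dags/jaffle_shop"),  # assumed path
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dags/jaffle_shop/profiles.yml",  # assumed path
    ),
    render_config=RenderConfig(
        node_converters={DbtResourceType.SOURCE: convert_source},
    ),
)

Per the call site in ``build_airflow_graph``, a converter may return a task, a ``TaskGroup`` or ``None``; returning ``None`` simply leaves the node out of ``tasks_map``.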
