Skip to content

Commit

Permalink
Release 1.2.3 (#665)
Browse files Browse the repository at this point in the history
Bug fix

* Fix reusing config accross TaskGroups/DAGs by @tatiana in #664
  • Loading branch information
tatiana authored Nov 9, 2023
1 parent 3e12377 commit 6fa970f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
Changelog
=========

1.2.3 (2023-11-09)
------------------

Bug fixes

* Fix reusing config across TaskGroups/DAGs by @tatiana in #664


1.2.2 (2023-11-06)
------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.2.2"
__version__ = "1.2.3"

from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
Expand Down
5 changes: 5 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import copy
import inspect
from typing import Any, Callable

Expand Down Expand Up @@ -118,6 +119,10 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

Expand Down
26 changes: 22 additions & 4 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
An example DAG that uses Cosmos to render a dbt project as a TaskGroup.
"""
import os

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -23,6 +24,8 @@
),
)

shared_execution_config = ExecutionConfig()


@dag(
schedule_interval="@daily",
Expand All @@ -35,19 +38,34 @@ def basic_cosmos_task_group() -> None:
"""
pre_dbt = EmptyOperator(task_id="pre_dbt")

jaffle_shop = DbtTaskGroup(
group_id="test_123",
customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

orders = DbtTaskGroup(
group_id="orders",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

post_dbt = EmptyOperator(task_id="post_dbt")

pre_dbt >> jaffle_shop >> post_dbt
pre_dbt >> customers >> post_dbt
pre_dbt >> orders >> post_dbt


basic_cosmos_task_group()

0 comments on commit 6fa970f

Please sign in to comment.