Skip to content

Commit

Permalink
Fix dag randering for taskflow + DbtTaskGroup combo (#1360)
Browse files Browse the repository at this point in the history
## Description
I believe the deepcopy results in two separate DAG objects—the original
object and its deepcopy and the task is referencing to both objects
causing failure
```
airflow.exceptions.AirflowException: Tried to set relationships between tasks in more than one DAG: {<DAG: DAG_NAME>, <DAG: DAG_NAME>}
``` 

<img width="1681" alt="Screenshot 2024-12-03 at 2 30 23 AM"
src="https://github.com/user-attachments/assets/93859f4b-84d1-4e60-87a7-25a1b750a8e3">

DAG Code

```python
from datetime import datetime

from airflow.decorators import task, dag
from cosmos import DbtTaskGroup, ProjectConfig
from include.constants import jaffle_shop_path, venv_execution_config, manifest_path
from include.profiles import airflow_db

@task(task_id="build_partial_dbt_env_vars_operator")
def build_partial_dbt_env():
    # some code
    # This return is for demonstration purposes only
    return {
        "ENV_VAR_NAME": "value",
        "ENV_VAR_NAME_2": False
    }


# partial_dbt_env = build_partial_dbt_env()



@dag(
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["simple"],
)
def simple_task_group1() -> None:
    DbtTaskGroup(
        group_id="transform_task_group",
        project_config=ProjectConfig(
        dbt_project_path=jaffle_shop_path,
        manifest_path=manifest_path,
        env_vars=build_partial_dbt_env()
    ),
        profile_config=airflow_db,
        execution_config=venv_execution_config,
    )

simple_task_group1()
#partial_dbt_env >> transform_task_group
``` 

<!-- Add a brief but complete description of the change. -->

## Related Issue(s)

closes: #1218

## Breaking Change?

<!-- If this introduces a breaking change, specify that here. -->

## Checklist

- [ ] I have made corresponding changes to the documentation (if
required)
- [ ] I have added tests that prove my fix is effective or that my
feature works
  • Loading branch information
pankajastro authored Dec 19, 2024
1 parent ea67123 commit c21f8c9
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ def __init__(

validate_changed_config_paths(execution_config, project_config, render_config)

env_vars = copy.deepcopy(project_config.env_vars or operator_args.get("env"))
dbt_vars = copy.deepcopy(project_config.dbt_vars or operator_args.get("vars"))
env_vars = project_config.env_vars or operator_args.get("env")
dbt_vars = project_config.dbt_vars or operator_args.get("vars")

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
Expand Down
48 changes: 48 additions & 0 deletions dev/dags/example_taskflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))


profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)


@task(task_id="build_partial_dbt_env_vars_operator")
def build_partial_dbt_env():
return {"ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False}


@dag(
schedule_interval="@daily",
start_date=datetime(2024, 1, 1),
catchup=False,
)
def example_taskflow() -> None:
DbtTaskGroup(
group_id="transform_task_group",
project_config=ProjectConfig(
dbt_project_path=DBT_ROOT_PATH / "jaffle_shop",
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
env_vars=build_partial_dbt_env(),
),
profile_config=profile_config,
operator_args={"install_deps": True},
)


example_taskflow()

0 comments on commit c21f8c9

Please sign in to comment.