From 47a89fe26da74d0062f99e1a175518df8ac69343 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 3 Dec 2024 02:20:07 +0530 Subject: [PATCH 1/6] Fix dag randering for config using from upstream task --- cosmos/converter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 5bf99cac8..fae07e2e8 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -229,8 +229,8 @@ def __init__( validate_changed_config_paths(execution_config, project_config, render_config) - env_vars = copy.deepcopy(project_config.env_vars or operator_args.get("env")) - dbt_vars = copy.deepcopy(project_config.dbt_vars or operator_args.get("vars")) + env_vars = project_config.env_vars or operator_args.get("vars") + dbt_vars = project_config.dbt_vars or operator_args.get("vars") if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( From 3ca5f7b47bd95f9eafaa43b74efb582859f93846 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Tue, 10 Dec 2024 23:26:10 +0530 Subject: [PATCH 2/6] Fix --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index fae07e2e8..524864aa9 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -229,7 +229,7 @@ def __init__( validate_changed_config_paths(execution_config, project_config, render_config) - env_vars = project_config.env_vars or operator_args.get("vars") + env_vars = project_config.env_vars or operator_args.get("env") dbt_vars = project_config.dbt_vars or operator_args.get("vars") if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: From 000547d4a49ea556e6b425e00616a04d20ca0af6 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 21:00:34 +0530 Subject: [PATCH 3/6] Add tests --- dev/dags/example_taskflow.py | 46 ++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 dev/dags/example_taskflow.py diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow.py new file mode 100644 index 000000000..572f3e7f0 --- /dev/null +++ b/dev/dags/example_taskflow.py @@ -0,0 +1,46 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow.decorators import dag, task + +from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) + + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +@task(task_id="build_partial_dbt_env_vars_operator") +def build_partial_dbt_env(): + return {"ENV_VAR_NAME": "value", "ENV_VAR_NAME_2": False} + + +@dag( + schedule_interval="@daily", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["simple"], +) +def example_taskflow() -> None: + DbtTaskGroup( + group_id="transform_task_group", + project_config=ProjectConfig( + dbt_project_path=DBT_ROOT_PATH / "altered_jaffle_shop", env_vars=build_partial_dbt_env() + ), + profile_config=profile_config, + ) + + +example_taskflow() From 5da600e08324f5a9dfe54265adfac76f1be09e48 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 21:01:45 +0530 Subject: [PATCH 4/6] Add tests --- dev/dags/example_taskflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow.py index 572f3e7f0..a81db05f9 100644 --- a/dev/dags/example_taskflow.py +++ b/dev/dags/example_taskflow.py @@ -31,7 +31,6 @@ def build_partial_dbt_env(): schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False, - tags=["simple"], ) def example_taskflow() -> None: DbtTaskGroup( From f7dffc38285db4fb6cbc7700144d92aca7dcfcf7 Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 21:56:25 +0530 Subject: [PATCH 5/6] Fix test --- dev/dags/example_taskflow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow.py index a81db05f9..36020cf51 100644 --- a/dev/dags/example_taskflow.py +++ b/dev/dags/example_taskflow.py @@ -36,7 +36,9 @@ def example_taskflow() -> None: DbtTaskGroup( group_id="transform_task_group", project_config=ProjectConfig( - dbt_project_path=DBT_ROOT_PATH / "altered_jaffle_shop", env_vars=build_partial_dbt_env() + dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", + manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + env_vars=build_partial_dbt_env(), ), profile_config=profile_config, ) From 41e892efe3d621499ba12d0efda91e6599e8b75f Mon Sep 17 00:00:00 2001 From: pankajastro Date: Thu, 19 Dec 2024 22:37:31 +0530 Subject: [PATCH 6/6] Fix example --- dev/dags/example_taskflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/dags/example_taskflow.py b/dev/dags/example_taskflow.py index 36020cf51..e75be2adc 100644 --- a/dev/dags/example_taskflow.py +++ b/dev/dags/example_taskflow.py @@ -41,6 +41,7 @@ def example_taskflow() -> None: env_vars=build_partial_dbt_env(), ), profile_config=profile_config, + operator_args={"install_deps": True}, )