From 7b3e8ab93eda1dfd233ad201a231fb5033615bd7 Mon Sep 17 00:00:00 2001 From: Justin Bandoro <79104794+jbandoro@users.noreply.github.com> Date: Thu, 7 Dec 2023 09:55:06 -0800 Subject: [PATCH] Expose environment variables and dbt variables in `ProjectConfig` (#735) ## Description Currently, users have to specify environment variables in both `RenderConfig` and `operator_args` for the dbt dag so that they're used during rendering and execution. dbt variables cannot currently be used in rendering, only during execution in `operator_args`. This PR exposes `env_vars` and `dbt_vars` in `ProjectConfig` and uses the dbt variables in dbt ls load mode. Updates in this PR: - Deprecates `operator_args` "env" and "vars", raising warnings that they will be removed in Cosmos 2.x - Deprecates `RenderConfig.env_vars`, raising warnings that it will be removed in Cosmos 2.x - Adds both `dbt_vars` and `env_vars` within `ProjectConfig` - dbt variables are used in dbt ls load method - Raises an exception if **both** operator_args and ProjectConfig variables are used. - Updates docs and example DAGs to use ProjectConfig args. ## Related Issue(s) Closes #712 Closes #544 ## Breaking Change?
None ## Checklist - [x] I have made corresponding changes to the documentation (if required) - [x] I have added tests that prove my fix is effective or that my feature works --------- Co-authored-by: Tatiana Al-Chueyr --- .pre-commit-config.yaml | 1 + cosmos/config.py | 20 ++++- cosmos/converter.py | 44 ++++++++-- cosmos/dbt/graph.py | 13 ++- cosmos/dbt/parser/project.py | 15 ++-- dev/dags/dbt/simple/models/top_animations.sql | 6 +- dev/dags/example_cosmos_sources.py | 15 ++-- docs/configuration/operator-args.rst | 4 +- docs/configuration/project-config.rst | 18 +++- docs/configuration/render-config.rst | 2 +- tests/dbt/parser/test_project.py | 2 +- tests/dbt/test_graph.py | 86 +++++++++++++++++++ tests/test_config.py | 6 ++ tests/test_converter.py | 86 ++++++++++++++++++- 14 files changed, 281 insertions(+), 37 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 25bfe1d52..b0c1aac6a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -76,6 +76,7 @@ repos: hooks: - id: mypy name: mypy-python + args: [--config-file, "./pyproject.toml"] additional_dependencies: [ types-PyYAML, diff --git a/cosmos/config.py b/cosmos/config.py index 40756d2bb..c5e7a69a3 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -7,6 +7,7 @@ import tempfile from dataclasses import InitVar, dataclass, field from pathlib import Path +import warnings from typing import Any, Iterator, Callable from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode, TestIndirectSelection @@ -42,7 +43,7 @@ class RenderConfig: :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. :param dbt_executable_path: The path to the dbt executable for dag generation. 
Defaults to dbt if available on the path. - :param env_vars: A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. + :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. """ @@ -54,12 +55,17 @@ class RenderConfig: dbt_deps: bool = True node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None dbt_executable_path: str | Path = get_system_dbt() - env_vars: dict[str, str] = field(default_factory=dict) + env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: + if self.env_vars: + warnings.warn( + "RenderConfig.env_vars is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.", + DeprecationWarning, + ) self.project_path = Path(dbt_project_path) if dbt_project_path else None def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None: @@ -96,6 +102,11 @@ class ProjectConfig: :param manifest_path: The absolute path to the dbt manifest file. Defaults to None :param project_name: Allows the user to define the project name. Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path. + :param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with + env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode. + :param dbt_vars: Dictionary of dbt variables for the project. 
This argument overrides variables defined in your dbt_project.yml + file. The dictionary is dumped to a yaml string and passed to dbt commands as the --vars argument. Variables are only + supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and ``RenderConfig.LoadMode.CUSTOM`` load mode. """ dbt_project_path: Path | None = None @@ -113,6 +124,8 @@ def __init__( snapshots_relative_path: str | Path = "snapshots", manifest_path: str | Path | None = None, project_name: str | None = None, + env_vars: dict[str, str] | None = None, + dbt_vars: dict[str, str] | None = None, ): # Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig # dbt_project_path may not always be defined here. @@ -136,6 +149,9 @@ def __init__( if manifest_path: self.manifest_path = Path(manifest_path) + self.env_vars = env_vars + self.dbt_vars = dbt_vars + def validate_project(self) -> None: """ Validates necessary context is present for a project. diff --git a/cosmos/converter.py b/cosmos/converter.py index 637ef9826..c2b31700b 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -6,6 +6,7 @@ import inspect from typing import Any, Callable import copy +from warnings import warn from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup @@ -96,10 +97,11 @@ def validate_arguments( def validate_initial_user_config( - execution_config: ExecutionConfig | None, + execution_config: ExecutionConfig, profile_config: ProfileConfig | None, project_config: ProjectConfig, - render_config: RenderConfig | None, + render_config: RenderConfig, + operator_args: dict[str, Any], ): """ Validates if the user set the fields as expected. 
@@ -108,6 +110,7 @@ def validate_initial_user_config( :param profile_config: Configuration related to dbt database configuration (profile) :param project_config: Configuration related to the overall dbt project :param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG + :param operator_args: Arguments to pass to the underlying operators. """ if profile_config is None and execution_config.execution_mode not in ( ExecutionMode.KUBERNETES, @@ -123,6 +126,33 @@ def validate_initial_user_config( + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" ) + # Cosmos 2.0 will remove the ability to pass in operator_args with 'env' and 'vars' in place of ProjectConfig.env_vars and + # ProjectConfig.dbt_vars. + if "env" in operator_args: + warn( + "operator_args with 'env' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.", + DeprecationWarning, + ) + if project_config.env_vars: + raise CosmosValueError( + "ProjectConfig.env_vars and operator_args with 'env' are mutually exclusive and only one can be used." + ) + if "vars" in operator_args: + warn( + "operator_args with 'vars' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.vars instead.", + DeprecationWarning, + ) + if project_config.dbt_vars: + raise CosmosValueError( + "ProjectConfig.dbt_vars and operator_args with 'vars' are mutually exclusive and only one can be used." + ) + # Cosmos 2.0 will remove the ability to pass RenderConfig.env_vars in place of ProjectConfig.env_vars, check that both are not set. + if project_config.env_vars and render_config.env_vars: + raise CosmosValueError( + "Both ProjectConfig.env_vars and RenderConfig.env_vars were provided. RenderConfig.env_vars is deprecated since Cosmos 1.3, " + "please use ProjectConfig.env_vars instead." 
+ ) + def validate_adapted_user_config( execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None @@ -182,7 +212,7 @@ def __init__( render_config = render_config or RenderConfig() operator_args = operator_args or {} - validate_initial_user_config(execution_config, profile_config, project_config, render_config) + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using @@ -191,8 +221,8 @@ def __init__( validate_adapted_user_config(execution_config, project_config, render_config) - if not operator_args: - operator_args = {} + env_vars = project_config.env_vars or operator_args.pop("env", None) + dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None) # Previously, we were creating a cosmos.dbt.project.DbtProject # DbtProject has now been replaced with ProjectConfig directly @@ -209,7 +239,7 @@ def __init__( render_config=render_config, execution_config=execution_config, profile_config=profile_config, - operator_args=operator_args, + dbt_vars=dbt_vars, ) dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) @@ -218,6 +248,8 @@ def __init__( "project_dir": execution_config.project_path, "profile_config": profile_config, "emit_datasets": render_config.emit_datasets, + "env": env_vars, + "vars": dbt_vars, } if execution_config.dbt_executable_path: task_args["dbt_executable_path"] = execution_config.dbt_executable_path diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index dab4e7405..92ef5e66f 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -4,6 +4,7 @@ import json import os import tempfile +import yaml from dataclasses import dataclass, field from pathlib import Path from subprocess import PIPE, Popen @@ -134,13 +135,14 @@ def 
__init__( render_config: RenderConfig = RenderConfig(), execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, - operator_args: dict[str, Any] | None = None, + # dbt_vars only supported for LegacyDbtProject + dbt_vars: dict[str, str] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config - self.operator_args = operator_args or {} + self.dbt_vars = dbt_vars or {} def load( self, @@ -190,6 +192,9 @@ def run_dbt_ls( if self.render_config.select: ls_command.extend(["--select", *self.render_config.select]) + if self.project.dbt_vars: + ls_command.extend(["--vars", yaml.dump(self.project.dbt_vars)]) + ls_command.extend(self.local_flags) stdout = run_command(ls_command, tmp_dir, env_vars) @@ -235,7 +240,7 @@ def load_via_dbt_ls(self) -> None: create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps) with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ( - self.render_config.env_vars + self.project.env_vars or self.render_config.env_vars or {} ): (profile_path, env_vars) = profile_values env = os.environ.copy() @@ -296,7 +301,7 @@ def load_via_custom_parser(self) -> None: dbt_root_path=self.render_config.project_path.parent.as_posix(), dbt_models_dir=self.project.models_path.stem if self.project.models_path else "models", dbt_seeds_dir=self.project.seeds_path.stem if self.project.seeds_path else "seeds", - operator_args=self.operator_args, + dbt_vars=self.dbt_vars, ) nodes = {} models = itertools.chain( diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index cadedef6c..de506e02d 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -130,7 +130,7 @@ class DbtModel: name: str type: DbtModelType path: Path - operator_args: Dict[str, Any] = field(default_factory=dict) + dbt_vars: Dict[str, str] = 
field(default_factory=dict) config: DbtModelConfig = field(default_factory=DbtModelConfig) def __post_init__(self) -> None: @@ -141,7 +141,6 @@ def __post_init__(self) -> None: return config = DbtModelConfig() - self.var_args: Dict[str, Any] = self.operator_args.get("vars", {}) code = self.path.read_text() if self.type == DbtModelType.DBT_SNAPSHOT: @@ -203,7 +202,7 @@ def _parse_jinja_ref_node(self, base_node: jinja2.nodes.Call) -> str | None: and isinstance(node.args[0], jinja2.nodes.Const) and node.node.name == "var" ): - value += self.var_args[node.args[0].value] + value += self.dbt_vars[node.args[0].value] # type: ignore elif isinstance(first_arg, jinja2.nodes.Const): # and add it to the config value = first_arg.value @@ -272,7 +271,7 @@ class LegacyDbtProject: snapshots_dir: Path = field(init=False) seeds_dir: Path = field(init=False) - operator_args: Dict[str, Any] = field(default_factory=dict) + dbt_vars: Dict[str, str] = field(default_factory=dict) def __post_init__(self) -> None: """ @@ -321,7 +320,7 @@ def _handle_csv_file(self, path: Path) -> None: name=model_name, type=DbtModelType.DBT_SEED, path=path, - operator_args=self.operator_args, + dbt_vars=self.dbt_vars, ) # add the model to the project self.seeds[model_name] = model @@ -339,7 +338,7 @@ def _handle_sql_file(self, path: Path) -> None: name=model_name, type=DbtModelType.DBT_MODEL, path=path, - operator_args=self.operator_args, + dbt_vars=self.dbt_vars, ) # add the model to the project self.models[model.name] = model @@ -349,7 +348,7 @@ def _handle_sql_file(self, path: Path) -> None: name=model_name, type=DbtModelType.DBT_SNAPSHOT, path=path, - operator_args=self.operator_args, + dbt_vars=self.dbt_vars, ) # add the snapshot to the project self.snapshots[model.name] = model @@ -410,7 +409,7 @@ def _extract_model_tests( name=f"{test}_{column['name']}_{model_name}", type=DbtModelType.DBT_TEST, path=path, - operator_args=self.operator_args, + dbt_vars=self.dbt_vars, 
config=DbtModelConfig(upstream_models=set({model_name})), ) tests[test_model.name] = test_model diff --git a/dev/dags/dbt/simple/models/top_animations.sql b/dev/dags/dbt/simple/models/top_animations.sql index 2b365b09c..cfae1c595 100644 --- a/dev/dags/dbt/simple/models/top_animations.sql +++ b/dev/dags/dbt/simple/models/top_animations.sql @@ -1,4 +1,8 @@ -{{ config(materialized='table') }} +{{ config( + materialized='table', + alias=var('animation_alias', 'top_animations') + ) +}} SELECT Title, Rating FROM {{ ref('movies_ratings_simplified') }} diff --git a/dev/dags/example_cosmos_sources.py b/dev/dags/example_cosmos_sources.py index 157b3adb3..0553b2f10 100644 --- a/dev/dags/example_cosmos_sources.py +++ b/dev/dags/example_cosmos_sources.py @@ -62,19 +62,24 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs): node_converters={ DbtResourceType("source"): convert_source, # known dbt node type to Cosmos (part of DbtResourceType) DbtResourceType("exposure"): convert_exposure, # dbt node type new to Cosmos (will be added to DbtResourceType) - }, + } +) + +# `ProjectConfig` can pass dbt variables and environment variables to dbt commands. Below is an example of +# passing a required env var for the profiles.yml file and a dbt variable that is used for rendering and +# executing dbt models. 
+project_config = ProjectConfig( + DBT_ROOT_PATH / "simple", env_vars={"DBT_SQLITE_PATH": DBT_SQLITE_PATH}, + dbt_vars={"animation_alias": "top_5_animated_movies"}, ) example_cosmos_sources = DbtDag( # dbt/cosmos-specific parameters - project_config=ProjectConfig( - DBT_ROOT_PATH / "simple", - ), + project_config=project_config, profile_config=profile_config, render_config=render_config, - operator_args={"env": {"DBT_SQLITE_PATH": DBT_SQLITE_PATH}}, # normal dag parameters schedule_interval="@daily", start_date=datetime(2023, 1, 1), diff --git a/docs/configuration/operator-args.rst b/docs/configuration/operator-args.rst index 9d533bf13..5ddbe6565 100644 --- a/docs/configuration/operator-args.rst +++ b/docs/configuration/operator-args.rst @@ -47,12 +47,12 @@ dbt-related - ``dbt_cmd_flags``: List of command flags to pass to ``dbt`` command, added after dbt subcommand - ``dbt_cmd_global_flags``: List of ``dbt`` `global flags `_ to be passed to the ``dbt`` command, before the subcommand - ``dbt_executable_path``: Path to dbt executable. -- ``env``: Declare, using a Python dictionary, values to be set as environment variables when running ``dbt`` commands. +- ``env``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.env_vars`` instead) Declare, using a Python dictionary, values to be set as environment variables when running ``dbt`` commands. - ``fail_fast``: ``dbt`` exits immediately if ``dbt`` fails to process a resource. - ``models``: Specifies which nodes to include. - ``no_version_check``: If set, skip ensuring ``dbt``'s version matches the one specified in the ``dbt_project.yml``. - ``quiet``: run ``dbt`` in silent mode, only displaying its error logs. -- ``vars``: Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``. +- ``vars``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.dbt_vars`` instead) Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``. 
- ``warn_error``: convert ``dbt`` warnings into errors. Airflow-related diff --git a/docs/configuration/project-config.rst b/docs/configuration/project-config.rst index c1d952f6e..c062a1de5 100644 --- a/docs/configuration/project-config.rst +++ b/docs/configuration/project-config.rst @@ -1,8 +1,8 @@ Project Config ================ -The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located. It -takes the following arguments: +The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located and project +variables that should be used for rendering and execution. It takes the following arguments: - ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file - ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to @@ -16,7 +16,13 @@ takes the following arguments: - ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided, ``project_name`` is required as the name can not be inferred from ``dbt_project_path`` - +- ``dbt_vars``: (new in v1.3) A dictionary of dbt variables for the project rendering and execution. This argument overrides variables + defined in the dbt_project.yml file. The dictionary of variables is dumped to a yaml string and passed to dbt commands + as the --vars argument. Variables are only supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and + ``RenderConfig.LoadMode.CUSTOM`` load mode. Variables using `Airflow templating <https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html>`_ + will only be rendered at execution time, not at render time. +- ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution.
Rendering with + env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode. Project Config Example ---------------------- @@ -31,4 +37,10 @@ Project Config Example seeds_relative_path="data", snapshots_relative_path="snapshots", manifest_path="/path/to/manifests", + env_vars={"MY_ENV_VAR": "my_env_value"}, + dbt_vars={ + "my_dbt_var": "my_value", + "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", + "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", + }, ) diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 5e1c23824..1028ecf62 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -14,7 +14,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. -- ``env_vars``: A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. +- ``env_vars``: (available in v1.2.5, use ``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` Customizing how nodes are rendered (experimental) diff --git a/tests/dbt/parser/test_project.py b/tests/dbt/parser/test_project.py index 4f13a3eb3..31fe7e18d 100644 --- a/tests/dbt/parser/test_project.py +++ b/tests/dbt/parser/test_project.py @@ -219,6 +219,6 @@ def test_dbtmodelconfig_with_vars(tmp_path): name="some_name", type=DbtModelType.DBT_MODEL, path=path_with_sources, - operator_args={"vars": {"country_code": "us"}}, + dbt_vars={"country_code": "us"}, ) assert "stg_customers_us" in dbt_model.config.upstream_models diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 58a7ef742..3b80424b6 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -2,6 +2,7 @@ import tempfile from pathlib import Path from unittest.mock import patch +import yaml import pytest @@ -767,3 +768,88 @@ def test_parse_dbt_ls_output(): nodes = parse_dbt_ls_output(Path("fake-project"), fake_ls_stdout) assert expected_nodes == nodes + + +@patch("cosmos.dbt.graph.Popen") +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") +@patch("cosmos.config.RenderConfig.validate_dbt_command") +def test_load_via_dbt_ls_project_config_env_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): + """Tests that the dbt ls command in the subprocess has the project config env vars set.""" + mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 + env_vars = {"MY_ENV_VAR": "my_value"} + project_config = ProjectConfig(env_vars=env_vars) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + execution_config = 
ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + project=project_config, + render_config=render_config, + execution_config=execution_config, + profile_config=profile_config, + ) + dbt_graph.load_via_dbt_ls() + + assert "MY_ENV_VAR" in mock_popen.call_args.kwargs["env"] + assert mock_popen.call_args.kwargs["env"]["MY_ENV_VAR"] == "my_value" + + +@patch("cosmos.dbt.graph.Popen") +@patch("cosmos.dbt.graph.DbtGraph.update_node_dependency") +@patch("cosmos.config.RenderConfig.validate_dbt_command") +def test_load_via_dbt_ls_project_config_dbt_vars(mock_validate, mock_update_nodes, mock_popen, tmp_dbt_project_dir): + """Tests that the dbt ls command in the subprocess has "--vars" with the project config dbt_vars.""" + mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 + dbt_vars = {"my_var1": "my_value1", "my_var2": "my_value2"} + project_config = ProjectConfig(dbt_vars=dbt_vars) + render_config = RenderConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) + dbt_graph = DbtGraph( + project=project_config, + render_config=render_config, + execution_config=execution_config, + profile_config=profile_config, + ) + dbt_graph.load_via_dbt_ls() + ls_command = mock_popen.call_args.args[0] + assert "--vars" in ls_command + assert ls_command[ls_command.index("--vars") + 1] == yaml.dump(dbt_vars) + + +@pytest.mark.sqlite +@pytest.mark.integration +def test_load_via_dbt_ls_with_project_config_vars(): + """ + Integration that tests that the dbt ls command is successful and that the node affected by the dbt_vars is + rendered correctly. 
+ """ + project_name = "simple" + dbt_graph = DbtGraph( + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, + env_vars={"DBT_SQLITE_PATH": str(DBT_PROJECTS_ROOT_DIR / "data")}, + dbt_vars={"animation_alias": "top_5_animated_movies"}, + ), + render_config=RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, + dbt_deps=False, + ), + execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), + profile_config=ProfileConfig( + profile_name="simple", + target_name="dev", + profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / project_name / "profiles.yml"), + ), + ) + dbt_graph.load_via_dbt_ls() + assert dbt_graph.nodes["model.simple.top_animations"].config["alias"] == "top_5_animated_movies" diff --git a/tests/test_config.py b/tests/test_config.py index 578a68f76..734303a3e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -174,3 +174,9 @@ def test_render_config_uses_default_if_exists(mock_which): render_config = RenderConfig(dbt_executable_path="user-dbt") render_config.validate_dbt_command("fallback-dbt-path") assert render_config.dbt_executable_path == "user-dbt" + + +def test_render_config_env_vars_deprecated(): + """RenderConfig.env_vars is deprecated since Cosmos 1.3, should warn user.""" + with pytest.deprecated_call(): + RenderConfig(env_vars={"VAR": "value"}) diff --git a/tests/test_converter.py b/tests/test_converter.py index c04da2c3a..d84249aae 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,13 +1,13 @@ from datetime import datetime from pathlib import Path -from unittest.mock import patch +from unittest.mock import patch, MagicMock from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping import pytest from airflow.models import DAG from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config -from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.constants import 
DbtResourceType, ExecutionMode, LoadMode from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, CosmosConfigException from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError @@ -44,7 +44,7 @@ def test_validate_initial_user_config_no_profile(execution_mode): profile_config = None project_config = ProjectConfig() with pytest.raises(CosmosValueError) as err_info: - validate_initial_user_config(execution_config, profile_config, project_config, None) + validate_initial_user_config(execution_config, profile_config, project_config, None, {}) err_msg = f"The profile_config is mandatory when using {execution_mode}" assert err_info.value.args[0] == err_msg @@ -57,7 +57,55 @@ def test_validate_initial_user_config_expects_profile(execution_mode): execution_config = ExecutionConfig(execution_mode=execution_mode) profile_config = None project_config = ProjectConfig() - assert validate_initial_user_config(execution_config, profile_config, project_config, None) is None + assert validate_initial_user_config(execution_config, profile_config, project_config, None, {}) is None + + +@pytest.mark.parametrize("operator_args", [{"env": {"key": "value"}}, {"vars": {"key": "value"}}]) +def test_validate_user_config_operator_args_deprecated(operator_args): + """Deprecating warnings should be raised when using operator_args with "vars" or "env".""" + project_config = ProjectConfig() + execution_config = ExecutionConfig() + render_config = RenderConfig() + profile_config = MagicMock() + + with pytest.deprecated_call(): + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + +@pytest.mark.parametrize("project_config_arg, operator_arg", [("dbt_vars", "vars"), ("env_vars", "env")]) +def test_validate_user_config_fails_project_config_and_operator_args_overlap(project_config_arg, operator_arg): + """ + The validation should fail if a user specifies both a ProjectConfig and 
operator_args with dbt_vars/vars or env_vars/env + that overlap. + """ + project_config = ProjectConfig( + project_name="fake-project", + dbt_project_path="/some/project/path", + **{project_config_arg: {"key": "value"}}, # type: ignore + ) + execution_config = ExecutionConfig() + render_config = RenderConfig() + profile_config = MagicMock() + operator_args = {operator_arg: {"key": "value"}} + + expected_error_msg = f"ProjectConfig.{project_config_arg} and operator_args with '{operator_arg}' are mutually exclusive and only one can be used." + with pytest.raises(CosmosValueError, match=expected_error_msg): + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) + + +def test_validate_user_config_fails_project_config_render_config_env_vars(): + """ + The validation should fail if a user specifies both ProjectConfig.env_vars and RenderConfig.env_vars. + """ + project_config = ProjectConfig(env_vars={"key": "value"}) + execution_config = ExecutionConfig() + render_config = RenderConfig(env_vars={"key": "value"}) + profile_config = MagicMock() + operator_args = {} + + expected_error_match = "Both ProjectConfig.env_vars and RenderConfig.env_vars were provided.*" + with pytest.raises(CosmosValueError, match=expected_error_match): + validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) def test_validate_arguments_schema_in_task_args(): @@ -327,3 +375,33 @@ def test_converter_fails_no_manifest_no_render_config(mock_load_dbt_graph, execu err_info.value.args[0] == "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." 
) + + +@patch("cosmos.config.ProjectConfig.validate_project") +@patch("cosmos.converter.build_airflow_graph") +@patch("cosmos.dbt.graph.LegacyDbtProject") +def test_converter_project_config_dbt_vars_with_custom_load_mode( + mock_legacy_dbt_project, mock_validate_project, mock_build_airflow_graph +): + """Tests that if ProjectConfig.dbt_vars are used with RenderConfig.load_method of "custom" that the + expected dbt_vars are passed to LegacyDbtProject. + """ + project_config = ProjectConfig( + project_name="fake-project", dbt_project_path="/some/project/path", dbt_vars={"key": "value"} + ) + execution_config = ExecutionConfig() + render_config = RenderConfig(load_method=LoadMode.CUSTOM) + profile_config = MagicMock() + + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args={}, + ) + _, kwargs = mock_legacy_dbt_project.call_args + assert kwargs["dbt_vars"] == {"key": "value"}