Skip to content

Commit

Permalink
Expose environment variables and dbt variables in ProjectConfig (#735)
Browse files Browse the repository at this point in the history
## Description

Currently users have to specify environment variables in both
`RenderConfig` and `operator_args` for the dbt dag so that they're used
during rendering and execution. dbt variables cannot currently be used
in rendering, only during execution in `operator_args`. This PR exposes
`env_vars` and `dbt_vars` in `ProjectConfig` and uses the dbt variables
in dbt ls load mode.

Updates in this PR:

- Deprecates `operator_args` "env" and "var", raising warnings that they
will be removed in Cosmos 2.x
- Deprecates `RenderConfig.env_vars` raising warnings that it will be
removed in Cosmos 2.x
- Adds both `dbt_vars` and `env_vars` within `ProjectConfig`
  - dbt variables are used in dbt ls load method
- Raises an exception if **both** operator_args and ProjectConfig
variables are used.
- Updates docs and example DAGs to use ProjectConfig args.

## Related Issue(s)

Closes #712 
Closes #544

## Breaking Change?

None

## Checklist

- [x] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

---------

Co-authored-by: Tatiana Al-Chueyr <[email protected]>
  • Loading branch information
jbandoro and tatiana authored Dec 7, 2023
1 parent b1cdc3c commit 7b3e8ab
Show file tree
Hide file tree
Showing 14 changed files with 281 additions and 37 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ repos:
hooks:
- id: mypy
name: mypy-python
args: [--config-file, "./pyproject.toml"]
additional_dependencies:
[
types-PyYAML,
Expand Down
20 changes: 18 additions & 2 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
from dataclasses import InitVar, dataclass, field
from pathlib import Path
import warnings
from typing import Any, Iterator, Callable

from cosmos.constants import DbtResourceType, TestBehavior, ExecutionMode, LoadMode, TestIndirectSelection
Expand Down Expand Up @@ -42,7 +43,7 @@ class RenderConfig:
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param env_vars: A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
"""

Expand All @@ -54,12 +55,17 @@ class RenderConfig:
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] = field(default_factory=dict)
env_vars: dict[str, str] | None = None
dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
warnings.warn(
"RenderConfig.env_vars is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.",
DeprecationWarning,
)
self.project_path = Path(dbt_project_path) if dbt_project_path else None

def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None:
Expand Down Expand Up @@ -96,6 +102,11 @@ class ProjectConfig:
:param manifest_path: The absolute path to the dbt manifest file. Defaults to None
:param project_name: Allows the user to define the project name.
Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path.
:param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with
env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode.
:param dbt_vars: Dictionary of dbt variables for the project. This argument overrides variables defined in your dbt_project.yml
file. The dictionary is dumped to a yaml string and passed to dbt commands as the --vars argument. Variables are only
supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and ``RenderConfig.LoadMode.CUSTOM`` load mode.
"""

dbt_project_path: Path | None = None
Expand All @@ -113,6 +124,8 @@ def __init__(
snapshots_relative_path: str | Path = "snapshots",
manifest_path: str | Path | None = None,
project_name: str | None = None,
env_vars: dict[str, str] | None = None,
dbt_vars: dict[str, str] | None = None,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
Expand All @@ -136,6 +149,9 @@ def __init__(
if manifest_path:
self.manifest_path = Path(manifest_path)

self.env_vars = env_vars
self.dbt_vars = dbt_vars

def validate_project(self) -> None:
"""
Validates necessary context is present for a project.
Expand Down
44 changes: 38 additions & 6 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import inspect
from typing import Any, Callable
import copy
from warnings import warn

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup
Expand Down Expand Up @@ -96,10 +97,11 @@ def validate_arguments(


def validate_initial_user_config(
execution_config: ExecutionConfig | None,
execution_config: ExecutionConfig,
profile_config: ProfileConfig | None,
project_config: ProjectConfig,
render_config: RenderConfig | None,
render_config: RenderConfig,
operator_args: dict[str, Any],
):
"""
Validates if the user set the fields as expected.
Expand All @@ -108,6 +110,7 @@ def validate_initial_user_config(
:param profile_config: Configuration related to dbt database configuration (profile)
:param project_config: Configuration related to the overall dbt project
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
:param operator_args: Arguments to pass to the underlying operators.
"""
if profile_config is None and execution_config.execution_mode not in (
ExecutionMode.KUBERNETES,
Expand All @@ -123,6 +126,33 @@ def validate_initial_user_config(
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)

# Cosmos 2.0 will remove the ability to pass in operator_args with 'env' and 'vars' in place of ProjectConfig.env_vars and
# ProjectConfig.dbt_vars.
if "env" in operator_args:
warn(
"operator_args with 'env' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.env_vars instead.",
DeprecationWarning,
)
if project_config.env_vars:
raise CosmosValueError(
"ProjectConfig.env_vars and operator_args with 'env' are mutually exclusive and only one can be used."
)
if "vars" in operator_args:
warn(
"operator_args with 'vars' is deprecated since Cosmos 1.3 and will be removed in Cosmos 2.0. Use ProjectConfig.vars instead.",
DeprecationWarning,
)
if project_config.dbt_vars:
raise CosmosValueError(
"ProjectConfig.dbt_vars and operator_args with 'vars' are mutually exclusive and only one can be used."
)
# Cosmos 2.0 will remove the ability to pass RenderConfig.env_vars in place of ProjectConfig.env_vars, check that both are not set.
if project_config.env_vars and render_config.env_vars:
raise CosmosValueError(
"Both ProjectConfig.env_vars and RenderConfig.env_vars were provided. RenderConfig.env_vars is deprecated since Cosmos 1.3, "
"please use ProjectConfig.env_vars instead."
)


def validate_adapted_user_config(
execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None
Expand Down Expand Up @@ -182,7 +212,7 @@ def __init__(
render_config = render_config or RenderConfig()
operator_args = operator_args or {}

validate_initial_user_config(execution_config, profile_config, project_config, render_config)
validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
Expand All @@ -191,8 +221,8 @@ def __init__(

validate_adapted_user_config(execution_config, project_config, render_config)

if not operator_args:
operator_args = {}
env_vars = project_config.env_vars or operator_args.pop("env", None)
dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None)

# Previously, we were creating a cosmos.dbt.project.DbtProject
# DbtProject has now been replaced with ProjectConfig directly
Expand All @@ -209,7 +239,7 @@ def __init__(
render_config=render_config,
execution_config=execution_config,
profile_config=profile_config,
operator_args=operator_args,
dbt_vars=dbt_vars,
)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

Expand All @@ -218,6 +248,8 @@ def __init__(
"project_dir": execution_config.project_path,
"profile_config": profile_config,
"emit_datasets": render_config.emit_datasets,
"env": env_vars,
"vars": dbt_vars,
}
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path
Expand Down
13 changes: 9 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import os
import tempfile
import yaml
from dataclasses import dataclass, field
from pathlib import Path
from subprocess import PIPE, Popen
Expand Down Expand Up @@ -134,13 +135,14 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
operator_args: dict[str, Any] | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.operator_args = operator_args or {}
self.dbt_vars = dbt_vars or {}

def load(
self,
Expand Down Expand Up @@ -190,6 +192,9 @@ def run_dbt_ls(
if self.render_config.select:
ls_command.extend(["--select", *self.render_config.select])

if self.project.dbt_vars:
ls_command.extend(["--vars", yaml.dump(self.project.dbt_vars)])

ls_command.extend(self.local_flags)

stdout = run_command(ls_command, tmp_dir, env_vars)
Expand Down Expand Up @@ -235,7 +240,7 @@ def load_via_dbt_ls(self) -> None:
create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps)

with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
self.render_config.env_vars
self.project.env_vars or self.render_config.env_vars or {}
):
(profile_path, env_vars) = profile_values
env = os.environ.copy()
Expand Down Expand Up @@ -296,7 +301,7 @@ def load_via_custom_parser(self) -> None:
dbt_root_path=self.render_config.project_path.parent.as_posix(),
dbt_models_dir=self.project.models_path.stem if self.project.models_path else "models",
dbt_seeds_dir=self.project.seeds_path.stem if self.project.seeds_path else "seeds",
operator_args=self.operator_args,
dbt_vars=self.dbt_vars,
)
nodes = {}
models = itertools.chain(
Expand Down
15 changes: 7 additions & 8 deletions cosmos/dbt/parser/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class DbtModel:
name: str
type: DbtModelType
path: Path
operator_args: Dict[str, Any] = field(default_factory=dict)
dbt_vars: Dict[str, str] = field(default_factory=dict)
config: DbtModelConfig = field(default_factory=DbtModelConfig)

def __post_init__(self) -> None:
Expand All @@ -141,7 +141,6 @@ def __post_init__(self) -> None:
return

config = DbtModelConfig()
self.var_args: Dict[str, Any] = self.operator_args.get("vars", {})
code = self.path.read_text()

if self.type == DbtModelType.DBT_SNAPSHOT:
Expand Down Expand Up @@ -203,7 +202,7 @@ def _parse_jinja_ref_node(self, base_node: jinja2.nodes.Call) -> str | None:
and isinstance(node.args[0], jinja2.nodes.Const)
and node.node.name == "var"
):
value += self.var_args[node.args[0].value]
value += self.dbt_vars[node.args[0].value] # type: ignore
elif isinstance(first_arg, jinja2.nodes.Const):
# and add it to the config
value = first_arg.value
Expand Down Expand Up @@ -272,7 +271,7 @@ class LegacyDbtProject:
snapshots_dir: Path = field(init=False)
seeds_dir: Path = field(init=False)

operator_args: Dict[str, Any] = field(default_factory=dict)
dbt_vars: Dict[str, str] = field(default_factory=dict)

def __post_init__(self) -> None:
"""
Expand Down Expand Up @@ -321,7 +320,7 @@ def _handle_csv_file(self, path: Path) -> None:
name=model_name,
type=DbtModelType.DBT_SEED,
path=path,
operator_args=self.operator_args,
dbt_vars=self.dbt_vars,
)
# add the model to the project
self.seeds[model_name] = model
Expand All @@ -339,7 +338,7 @@ def _handle_sql_file(self, path: Path) -> None:
name=model_name,
type=DbtModelType.DBT_MODEL,
path=path,
operator_args=self.operator_args,
dbt_vars=self.dbt_vars,
)
# add the model to the project
self.models[model.name] = model
Expand All @@ -349,7 +348,7 @@ def _handle_sql_file(self, path: Path) -> None:
name=model_name,
type=DbtModelType.DBT_SNAPSHOT,
path=path,
operator_args=self.operator_args,
dbt_vars=self.dbt_vars,
)
# add the snapshot to the project
self.snapshots[model.name] = model
Expand Down Expand Up @@ -410,7 +409,7 @@ def _extract_model_tests(
name=f"{test}_{column['name']}_{model_name}",
type=DbtModelType.DBT_TEST,
path=path,
operator_args=self.operator_args,
dbt_vars=self.dbt_vars,
config=DbtModelConfig(upstream_models=set({model_name})),
)
tests[test_model.name] = test_model
Expand Down
6 changes: 5 additions & 1 deletion dev/dags/dbt/simple/models/top_animations.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{{ config(materialized='table') }}
{{ config(
materialized='table',
alias=var('animation_alias', 'top_animations')
)
}}

SELECT Title, Rating
FROM {{ ref('movies_ratings_simplified') }}
Expand Down
15 changes: 10 additions & 5 deletions dev/dags/example_cosmos_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,24 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
node_converters={
DbtResourceType("source"): convert_source, # known dbt node type to Cosmos (part of DbtResourceType)
DbtResourceType("exposure"): convert_exposure, # dbt node type new to Cosmos (will be added to DbtResourceType)
},
}
)

# `ProjectConfig` can pass dbt variables and environment variables to dbt commands. Below is an example of
# passing a required env var for the profiles.yml file and a dbt variable that is used for rendering and
# executing dbt models.
project_config = ProjectConfig(
DBT_ROOT_PATH / "simple",
env_vars={"DBT_SQLITE_PATH": DBT_SQLITE_PATH},
dbt_vars={"animation_alias": "top_5_animated_movies"},
)


example_cosmos_sources = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "simple",
),
project_config=project_config,
profile_config=profile_config,
render_config=render_config,
operator_args={"env": {"DBT_SQLITE_PATH": DBT_SQLITE_PATH}},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/operator-args.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ dbt-related
- ``dbt_cmd_flags``: List of command flags to pass to ``dbt`` command, added after dbt subcommand
- ``dbt_cmd_global_flags``: List of ``dbt`` `global flags <https://docs.getdbt.com/reference/global-configs/about-global-configs>`_ to be passed to the ``dbt`` command, before the subcommand
- ``dbt_executable_path``: Path to dbt executable.
- ``env``: Declare, using a Python dictionary, values to be set as environment variables when running ``dbt`` commands.
- ``env``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.env_vars`` instead) Declare, using a Python dictionary, values to be set as environment variables when running ``dbt`` commands.
- ``fail_fast``: ``dbt`` exits immediately if ``dbt`` fails to process a resource.
- ``models``: Specifies which nodes to include.
- ``no_version_check``: If set, skip ensuring ``dbt``'s version matches the one specified in the ``dbt_project.yml``.
- ``quiet``: run ``dbt`` in silent mode, only displaying its error logs.
- ``vars``: Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``.
- ``vars``: (Deprecated since Cosmos 1.3 use ``ProjectConfig.dbt_vars`` instead) Supply variables to the project. This argument overrides variables defined in the ``dbt_project.yml``.
- ``warn_error``: convert ``dbt`` warnings into errors.

Airflow-related
Expand Down
18 changes: 15 additions & 3 deletions docs/configuration/project-config.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Project Config
================

The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located. It
takes the following arguments:
The ``cosmos.config.ProjectConfig`` allows you to specify information about where your dbt project is located and project
variables that should be used for rendering and execution. It takes the following arguments:

- ``dbt_project_path``: The full path to your dbt project. This directory should have a ``dbt_project.yml`` file
- ``models_relative_path``: The path to your models directory, relative to the ``dbt_project_path``. This defaults to
Expand All @@ -16,7 +16,13 @@ takes the following arguments:
- ``project_name`` : The name of the project. If ``dbt_project_path`` is provided, the ``project_name`` defaults to the
folder name containing ``dbt_project.yml``. If ``dbt_project_path`` is not provided, and ``manifest_path`` is provided,
``project_name`` is required as the name can not be inferred from ``dbt_project_path``

- ``dbt_vars``: (new in v1.3) A dictionary of dbt variables for the project rendering and execution. This argument overrides variables
defined in the dbt_project.yml file. The dictionary of variables is dumped to a yaml string and passed to dbt commands
as the --vars argument. Variables are only supported for rendering when using ``RenderConfig.LoadMode.DBT_LS`` and
``RenderConfig.LoadMode.CUSTOM`` load mode. Variables using `Airflow templating <https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference>`_
will only be rendered at execution time, not at render time.
- ``env_vars``: (new in v1.3) A dictionary of environment variables used for rendering and execution. Rendering with
env vars is only supported when using ``RenderConfig.LoadMode.DBT_LS`` load mode.

Project Config Example
----------------------
Expand All @@ -31,4 +37,10 @@ Project Config Example
seeds_relative_path="data",
snapshots_relative_path="snapshots",
manifest_path="/path/to/manifests",
env_vars={"MY_ENV_VAR": "my_env_value"},
dbt_vars={
"my_dbt_var": "my_value",
"start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
"end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
},
)
Loading

0 comments on commit 7b3e8ab

Please sign in to comment.