diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 590b864b0..ecade7112 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,16 +1,36 @@ Changelog ========= -1.3.0a1 (2023-10-26) +1.3.0a2 (2023-11-23) -------------------- Features -* Add ``ProfileMapping`` for Vertica by @perttus in #540 +* Add ``ProfileMapping`` for Vertica by @perttus in #540 and #688 * Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608 +* Add support for Snowflake encrypted private key environment variable by @DanMawdsleyBA in #649 * Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616 +1.2.5 (2023-11-23) +------------------ + +Bug fixes + +* Fix running models that use alias while supporting dbt versions by @binhnq94 in #662 +* Make ``profiles_yml_path`` optional for ``ExecutionMode.DOCKER`` and ``KUBERNETES`` by @MrBones757 in #681 +* Prevent overriding dbt profile fields with profile args of "type" or "method" by @jbandoro in #702 +* Fix ``LoadMode.DBT_LS`` fail when dbt outputs ``WarnErrorOptions`` by @adammarples in #692 +* Add support for env vars in ``RenderConfig`` for dbt ls parsing by @jbandoro in #690 +* Add support for Kubernetes ``on_warning_callback`` by @david-mag in #673 +* Fix ``ExecutionConfig.dbt_executable_path`` to use ``default_factory`` by @jbandoro in #678 + +Others + +* Docs fix: example DAG in the README and docs/index by @tatiana in #705 +* Docs improvement: highlight DAG examples in README by @iancmoritz and @jlaneve in #695 + + 1.2.4 (2023-11-14) ------------------ @@ -23,8 +43,8 @@ Bug fixes Others -* Docs fix: add execution config to MWAA code example by @ugmuka in #674 - +* Docs: add execution config to MWAA code example by @ugmuka in #674 +* Docs: highlight DAG examples in docs by @iancmoritz and @jlaneve in #695 1.2.3 (2023-11-09) ------------------ diff --git a/README.rst b/README.rst index e4f69af63..041c23f3f 100644 --- a/README.rst +++ b/README.rst @@ -31,57 +31,29 @@ Run your dbt Core projects as `Apache Airflow `_ DA Quickstart __________ -Check out the Quickstart guide on our `docs `_. +Check out the Quickstart guide on our `docs `_. See more examples at `/dev/dags `_ and at the `cosmos-demo repo `_. Example Usage ___________________ -You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an example with the `jaffle_shop project `_: +You can render a Cosmos Airflow DAG using the ``DbtDag`` class. Here's an example with the `jaffle_shop project `_: +.. + This renders on Github but not Sphinx: -.. code-block:: python +https://github.com/astronomer/astronomer-cosmos/blob/24aa38e528e299ef51ca6baf32f5a6185887d432/dev/dags/basic_cosmos_dag.py#L1-L42 - from pendulum import datetime +This will generate an Airflow DAG that looks like this: - from airflow import DAG - from airflow.operators.empty import EmptyOperator - from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig - from cosmos.profiles import PostgresUserPasswordProfileMapping +.. figure:: /docs/_static/jaffle_shop_dag.png - profile_config = ProfileConfig( - profile_name="default", - target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="airflow_db", - profile_args={"schema": "public"}, - ), - ) - - with DAG( - dag_id="extract_dag", - start_date=datetime(2022, 11, 27), - schedule_interval="@daily", - ): - e1 = EmptyOperator(task_id="pre_dbt") - - dbt_tg = DbtTaskGroup( - project_config=ProjectConfig("jaffle_shop"), - profile_config=profile_config, - ) - - e2 = EmptyOperator(task_id="post_dbt") - - e1 >> dbt_tg >> e2 - -This will generate an Airflow Task Group that looks like this: - -.. figure:: /docs/_static/jaffle_shop_task_group.png Community _________ - Join us on the Airflow `Slack `_ at #airflow-dbt + Changelog _________ @@ -89,6 +61,7 @@ We follow `Semantic Versioning `_ for releases. Check `CHANGELOG.rst `_ for the latest changes. + Contributing Guide __________________ diff --git a/cosmos/__init__.py b/cosmos/__init__.py index f4af1bd63..f1f204634 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,8 @@ Contains dags, task groups, and operators. """ -__version__ = "1.2.4" +__version__ = "1.3.0a2" + from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup diff --git a/cosmos/config.py b/cosmos/config.py index 5c64193c1..40756d2bb 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -42,6 +42,7 @@ class RenderConfig: :param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing :param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. :param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. + :param env_vars: A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``. :param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``. """ @@ -53,6 +54,7 @@ class RenderConfig: dbt_deps: bool = True node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None dbt_executable_path: str | Path = get_system_dbt() + env_vars: dict[str, str] = field(default_factory=dict) dbt_project_path: InitVar[str | Path | None] = None project_path: Path | None = field(init=False) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 40154308b..a890c137c 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -21,7 +21,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks +from cosmos.dbt.project import create_symlinks, environ from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -88,7 +88,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> "Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True." ) - if returncode or "Error" in stdout: + if returncode or "Error" in stdout.replace("WarnErrorOptions", ""): details = stderr or stdout raise CosmosLoadDbtException(f"Unable to run {command} due to the error:\n{details}") @@ -234,7 +234,9 @@ def load_via_dbt_ls(self) -> None: tmpdir_path = Path(tmpdir) create_symlinks(self.render_config.project_path, tmpdir_path) - with self.profile_config.ensure_profile(use_mock_values=True) as profile_values: + with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ( + self.render_config.env_vars + ): (profile_path, env_vars) = profile_values env = os.environ.copy() env.update(env_vars) diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index 63f4fc007..14b2f5e4b 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -1,9 +1,13 @@ +from __future__ import annotations + from pathlib import Path import os from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, ) +from contextlib import contextmanager +from typing import Generator def create_symlinks(project_path: Path, tmp_dir: Path) -> None: @@ -12,3 +16,20 @@ def create_symlinks(project_path: Path, tmp_dir: Path) -> None: for child_name in os.listdir(project_path): if child_name not in ignore_paths: os.symlink(project_path / child_name, tmp_dir / child_name) + + +@contextmanager +def environ(env_vars: dict[str, str]) -> Generator[None, None, None]: + """Temporarily set environment variables inside the context manager and restore + when exiting. + """ + original_env = {key: os.getenv(key) for key in env_vars} + os.environ.update(env_vars) + try: + yield + finally: + for key, value in original_env.items(): + if value is None: + del os.environ[key] + else: + os.environ[key] = value diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py index 36f2ccbc5..b1cebb38b 100644 --- a/cosmos/profiles/base.py +++ b/cosmos/profiles/base.py @@ -18,6 +18,9 @@ if TYPE_CHECKING: from airflow.models import Connection +DBT_PROFILE_TYPE_FIELD = "type" +DBT_PROFILE_METHOD_FIELD = "method" + logger = get_logger(__name__) @@ -41,6 +44,26 @@ class BaseProfileMapping(ABC): def __init__(self, conn_id: str, profile_args: dict[str, Any] | None = None): self.conn_id = conn_id self.profile_args = profile_args or {} + self._validate_profile_args() + + def _validate_profile_args(self) -> None: + """ + Check if profile_args contains keys that should not be overridden from the + class variables when creating the profile. + """ + for profile_field in [DBT_PROFILE_TYPE_FIELD, DBT_PROFILE_METHOD_FIELD]: + if profile_field in self.profile_args and self.profile_args.get(profile_field) != getattr( + self, f"dbt_profile_{profile_field}" + ): + raise CosmosValueError( + "`profile_args` for {0} has {1}='{2}' that will override the dbt profile required value of '{3}'. " + "To fix this, remove {1} from `profile_args`.".format( + self.__class__.__name__, + profile_field, + self.profile_args.get(profile_field), + getattr(self, f"dbt_profile_{profile_field}"), + ) + ) @property def conn(self) -> Connection: @@ -100,11 +123,11 @@ def mock_profile(self) -> dict[str, Any]: where live connection values don't matter. """ mock_profile = { - "type": self.dbt_profile_type, + DBT_PROFILE_TYPE_FIELD: self.dbt_profile_type, } if self.dbt_profile_method: - mock_profile["method"] = self.dbt_profile_method + mock_profile[DBT_PROFILE_METHOD_FIELD] = self.dbt_profile_method for field in self.required_fields: # if someone has passed in a value for this field, use it @@ -199,11 +222,11 @@ def get_dbt_value(self, name: str) -> Any: def mapped_params(self) -> dict[str, Any]: "Turns the self.airflow_param_mapping into a dictionary of dbt fields and their values." mapped_params = { - "type": self.dbt_profile_type, + DBT_PROFILE_TYPE_FIELD: self.dbt_profile_type, } if self.dbt_profile_method: - mapped_params["method"] = self.dbt_profile_method + mapped_params[DBT_PROFILE_METHOD_FIELD] = self.dbt_profile_method for dbt_field in self.airflow_param_mapping: mapped_params[dbt_field] = self.get_dbt_value(dbt_field) diff --git a/dev/dags/example_cosmos_sources.py b/dev/dags/example_cosmos_sources.py index 29c70db5a..157b3adb3 100644 --- a/dev/dags/example_cosmos_sources.py +++ b/dev/dags/example_cosmos_sources.py @@ -26,7 +26,7 @@ DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) -os.environ["DBT_SQLITE_PATH"] = str(DEFAULT_DBT_ROOT_PATH / "data") +DBT_SQLITE_PATH = str(DEFAULT_DBT_ROOT_PATH / "data") profile_config = ProfileConfig( @@ -62,7 +62,8 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs): node_converters={ DbtResourceType("source"): convert_source, # known dbt node type to Cosmos (part of DbtResourceType) DbtResourceType("exposure"): convert_exposure, # dbt node type new to Cosmos (will be added to DbtResourceType) - } + }, + env_vars={"DBT_SQLITE_PATH": DBT_SQLITE_PATH}, ) @@ -73,7 +74,7 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs): ), profile_config=profile_config, render_config=render_config, - operator_args={"append_env": True}, + operator_args={"env": {"DBT_SQLITE_PATH": DBT_SQLITE_PATH}}, # normal dag parameters schedule_interval="@daily", start_date=datetime(2023, 1, 1), diff --git a/docs/_static/jaffle_shop_dag.png b/docs/_static/jaffle_shop_dag.png new file mode 100644 index 000000000..1a8cdf5cb Binary files /dev/null and b/docs/_static/jaffle_shop_dag.png differ diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index de0a08cdb..5e1c23824 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -14,6 +14,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True - ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below. - ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. +- ``env_vars``: A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` Customizing how nodes are rendered (experimental) diff --git a/docs/index.rst b/docs/index.rst index 3c61b645d..f5cd3673d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,53 +40,25 @@ Run your dbt Core projects as `Apache Airflow `_ DA Example Usage ___________________ -You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an example with the jaffle_shop project: +You can render a Cosmos Airflow DAG using the ``DbtDag`` class. Here's an example with the `jaffle_shop project `_: -.. code-block:: python +.. + The following renders in Sphinx but not Github: - from pendulum import datetime +.. literalinclude:: ./dev/dags/basic_cosmos_dag.py + :language: python + :start-after: [START local_example] + :end-before: [END local_example] - from airflow import DAG - from airflow.operators.empty import EmptyOperator - from cosmos import DbtTaskGroup +This will generate an Airflow DAG that looks like this: - profile_config = ProfileConfig( - profile_name="default", - target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="airflow_db", - profile_args={"schema": "public"}, - ), - ) - - with DAG( - dag_id="extract_dag", - start_date=datetime(2022, 11, 27), - schedule_interval="@daily", - ): - e1 = EmptyOperator(task_id="pre_dbt") - - dbt_tg = DbtTaskGroup( - project_config=ProjectConfig("jaffle_shop"), - profile_config=profile_config, - default_args={"retries": 2}, - ) - - e2 = EmptyOperator(task_id="post_dbt") - - e1 >> dbt_tg >> e2 - - -This will generate an Airflow Task Group that looks like this: - -.. image:: https://raw.githubusercontent.com/astronomer/astronomer-cosmos/main/docs/_static/jaffle_shop_task_group.png - +.. image:: https://raw.githubusercontent.com/astronomer/astronomer-cosmos/main/docs/_static/jaffle_shop_dag.png Getting Started _______________ -To get started now, check out the `Getting Started Guide `_. +Check out the Quickstart guide on our `docs `_. See more examples at `/dev/dags `_ and at the `cosmos-demo repo `_. Changelog diff --git a/docs/requirements.txt b/docs/requirements.txt index 6ead43485..420d62a59 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -4,4 +4,5 @@ pydata-sphinx-theme sphinx-autobuild sphinx-autoapi apache-airflow +apache-airflow-providers-cncf-kubernetes>=5.1.1 openlineage-airflow diff --git a/pyproject.toml b/pyproject.toml index 0041d9488..df304c3b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,8 @@ docs =[ "sphinx", "pydata-sphinx-theme", "sphinx-autobuild", - "sphinx-autoapi" + "sphinx-autoapi", + "apache-airflow-providers-cncf-kubernetes>=5.1.1" ] tests = [ "packaging", @@ -227,6 +228,7 @@ dependencies = [ "sphinx-autobuild", "sphinx-autoapi", "openlineage-airflow", + "apache-airflow-providers-cncf-kubernetes>=5.1.1" ] [tool.hatch.envs.docs.scripts] diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 3e3218259..a424976a1 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -392,7 +392,11 @@ def test_load_via_dbt_ls_with_sources(load_method): dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), - render_config=RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, dbt_deps=False), + render_config=RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, + dbt_deps=False, + env_vars={"DBT_SQLITE_PATH": str(DBT_PROJECTS_ROOT_DIR / "data")}, + ), execution_config=ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name), profile_config=ProfileConfig( profile_name="simple", @@ -679,6 +683,7 @@ def test_load_dbt_ls_and_manifest_with_model_version(load_method): "stdout,returncode", [ ("all good", None), + ("WarnErrorOptions", None), pytest.param("fail", 599, marks=pytest.mark.xfail(raises=CosmosLoadDbtException)), pytest.param("Error", None, marks=pytest.mark.xfail(raises=CosmosLoadDbtException)), ], diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index bd2555c98..ec5612904 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -1,5 +1,7 @@ from pathlib import Path -from cosmos.dbt.project import create_symlinks +from cosmos.dbt.project import create_symlinks, environ +import os +from unittest.mock import patch DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -13,3 +15,21 @@ def test_create_symlinks(tmp_path): for child in tmp_dir.iterdir(): assert child.is_symlink() assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages") + + +@patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"}) +def test_environ_context_manager(): + # Define the expected environment variables + expected_env_vars = {"VAR2": "new_value2", "VAR3": "value3"} + # Use the environ context manager + with environ(expected_env_vars): + # Check if the environment variables are set correctly + for key, value in expected_env_vars.items(): + assert value == os.environ.get(key) + # Check if the original non-overlapping environment variable is still set + assert "value1" == os.environ.get("VAR1") + # Check if the environment variables are unset after exiting the context manager + assert os.environ.get("VAR3") is None + # Check if the original environment variables are still set + assert "value1" == os.environ.get("VAR1") + assert "value2" == os.environ.get("VAR2") diff --git a/tests/profiles/test_base_profile.py b/tests/profiles/test_base_profile.py new file mode 100644 index 000000000..1b1ba3e8a --- /dev/null +++ b/tests/profiles/test_base_profile.py @@ -0,0 +1,31 @@ +import pytest +from cosmos.profiles.base import BaseProfileMapping +from cosmos.exceptions import CosmosValueError + + +class TestProfileMapping(BaseProfileMapping): + dbt_profile_method: str = "fake-method" + dbt_profile_type: str = "fake-type" + + def profile(self): + raise NotImplementedError + + +@pytest.mark.parametrize("profile_arg", ["type", "method"]) +def test_validate_profile_args(profile_arg: str): + """ + An error should be raised if the profile_args contains a key that should not be overridden from the class variables. + """ + profile_args = {profile_arg: "fake-value"} + dbt_profile_value = getattr(TestProfileMapping, f"dbt_profile_{profile_arg}") + + expected_cosmos_error = ( + f"`profile_args` for TestProfileMapping has {profile_arg}='fake-value' that will override the dbt profile required value of " + f"'{dbt_profile_value}'. To fix this, remove {profile_arg} from `profile_args`." + ) + + with pytest.raises(CosmosValueError, match=expected_cosmos_error): + TestProfileMapping( + conn_id="fake_conn_id", + profile_args=profile_args, + )