Commit

Merge branch 'astronomer:main' into patch-1
FouziaTariq authored Nov 25, 2023
2 parents 633d5fd + 1aace94 commit 472f14e
Showing 16 changed files with 167 additions and 92 deletions.
28 changes: 24 additions & 4 deletions CHANGELOG.rst
@@ -1,16 +1,36 @@
Changelog
=========

-1.3.0a1 (2023-10-26)
+1.3.0a2 (2023-11-23)
--------------------

Features

-* Add ``ProfileMapping`` for Vertica by @perttus in #540
+* Add ``ProfileMapping`` for Vertica by @perttus in #540 and #688
* Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608
* Add support for Snowflake encrypted private key environment variable by @DanMawdsleyBA in #649
* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616


1.2.5 (2023-11-23)
------------------

Bug fixes

* Fix running models that use alias while supporting dbt versions by @binhnq94 in #662
* Make ``profiles_yml_path`` optional for ``ExecutionMode.DOCKER`` and ``KUBERNETES`` by @MrBones757 in #681
* Prevent overriding dbt profile fields with profile args of "type" or "method" by @jbandoro in #702
* Fix ``LoadMode.DBT_LS`` fail when dbt outputs ``WarnErrorOptions`` by @adammarples in #692
* Add support for env vars in ``RenderConfig`` for dbt ls parsing by @jbandoro in #690
* Add support for Kubernetes ``on_warning_callback`` by @david-mag in #673
* Fix ``ExecutionConfig.dbt_executable_path`` to use ``default_factory`` by @jbandoro in #678

Others

* Docs fix: example DAG in the README and docs/index by @tatiana in #705
* Docs improvement: highlight DAG examples in README by @iancmoritz and @jlaneve in #695


1.2.4 (2023-11-14)
------------------

@@ -23,8 +43,8 @@ Bug fixes

Others

-* Docs fix: add execution config to MWAA code example by @ugmuka in #674
+* Docs: add execution config to MWAA code example by @ugmuka in #674
* Docs: highlight DAG examples in docs by @iancmoritz and @jlaneve in #695

1.2.3 (2023-11-09)
------------------
45 changes: 9 additions & 36 deletions README.rst
@@ -31,64 +31,37 @@ Run your dbt Core projects as `Apache Airflow <https://airflow.apache.org/>`_ DA
Quickstart
__________

-Check out the Quickstart guide on our `docs <https://astronomer.github.io/astronomer-cosmos/#quickstart>`_.
+Check out the Quickstart guide on our `docs <https://astronomer.github.io/astronomer-cosmos/#quickstart>`_. See more examples at `/dev/dags <https://github.com/astronomer/astronomer-cosmos/tree/main/dev/dags>`_ and at the `cosmos-demo repo <https://github.com/astronomer/cosmos-demo>`_.


Example Usage
___________________

-You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an example with the `jaffle_shop project <https://github.com/dbt-labs/jaffle_shop>`_:
+You can render a Cosmos Airflow DAG using the ``DbtDag`` class. Here's an example with the `jaffle_shop project <https://github.com/dbt-labs/jaffle_shop>`_:

-.. code-block:: python
-
-    from pendulum import datetime
-
-    from airflow import DAG
-    from airflow.operators.empty import EmptyOperator
-    from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
-    from cosmos.profiles import PostgresUserPasswordProfileMapping
-
-    profile_config = ProfileConfig(
-        profile_name="default",
-        target_name="dev",
-        profile_mapping=PostgresUserPasswordProfileMapping(
-            conn_id="airflow_db",
-            profile_args={"schema": "public"},
-        ),
-    )
-
-    with DAG(
-        dag_id="extract_dag",
-        start_date=datetime(2022, 11, 27),
-        schedule_interval="@daily",
-    ):
-        e1 = EmptyOperator(task_id="pre_dbt")
-
-        dbt_tg = DbtTaskGroup(
-            project_config=ProjectConfig("jaffle_shop"),
-            profile_config=profile_config,
-        )
-
-        e2 = EmptyOperator(task_id="post_dbt")
-
-        e1 >> dbt_tg >> e2
-
-This will generate an Airflow Task Group that looks like this:
-
-.. figure:: /docs/_static/jaffle_shop_task_group.png
+..
+    This renders on Github but not Sphinx:
+
+https://github.com/astronomer/astronomer-cosmos/blob/24aa38e528e299ef51ca6baf32f5a6185887d432/dev/dags/basic_cosmos_dag.py#L1-L42
+
+This will generate an Airflow DAG that looks like this:
+
+.. figure:: /docs/_static/jaffle_shop_dag.png

Community
_________
- Join us on the Airflow `Slack <https://join.slack.com/t/apache-airflow/shared_invite/zt-1zy8e8h85-es~fn19iMzUmkhPwnyRT6Q>`_ at #airflow-dbt


Changelog
_________

We follow `Semantic Versioning <https://semver.org/>`_ for releases.
Check `CHANGELOG.rst <https://github.com/astronomer/astronomer-cosmos/blob/main/CHANGELOG.rst>`_
for the latest changes.


Contributing Guide
__________________

3 changes: 2 additions & 1 deletion cosmos/__init__.py
@@ -5,7 +5,8 @@
Contains dags, task groups, and operators.
"""
-__version__ = "1.2.4"
+__version__ = "1.3.0a2"


from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
2 changes: 2 additions & 0 deletions cosmos/config.py
@@ -42,6 +42,7 @@ class RenderConfig:
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param env_vars: A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path: Configures the dbt project location accessible on the Airflow controller for DAG rendering. Mutually exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
"""

@@ -53,6 +54,7 @@
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] = field(default_factory=dict)
dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)
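The new ``env_vars`` field above uses ``field(default_factory=dict)``, the same mechanism the companion fix in this release applies to ``ExecutionConfig.dbt_executable_path``: each config instance gets its own fresh dictionary instead of sharing one mutable default. A minimal sketch (the class name here is illustrative, not the real ``RenderConfig``):

```python
from dataclasses import dataclass, field


@dataclass
class RenderConfigSketch:
    # A bare `env_vars: dict = {}` default would be rejected by dataclasses
    # (mutable default); default_factory builds a new dict per instance.
    env_vars: dict = field(default_factory=dict)


a = RenderConfigSketch()
b = RenderConfigSketch()
a.env_vars["DBT_SQLITE_PATH"] = "/tmp/data"
assert b.env_vars == {}  # no state shared between instances
```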
8 changes: 5 additions & 3 deletions cosmos/dbt/graph.py
@@ -21,7 +21,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
-from cosmos.dbt.project import create_symlinks
+from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

@@ -88,7 +88,7 @@ def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) ->
"Unable to run dbt ls command due to missing dbt_packages. Set RenderConfig.dbt_deps=True."
)

-if returncode or "Error" in stdout:
+if returncode or "Error" in stdout.replace("WarnErrorOptions", ""):
details = stderr or stdout
raise CosmosLoadDbtException(f"Unable to run {command} due to the error:\n{details}")

@@ -234,7 +234,9 @@ def load_via_dbt_ls(self) -> None:
tmpdir_path = Path(tmpdir)
create_symlinks(self.render_config.project_path, tmpdir_path)

-with self.profile_config.ensure_profile(use_mock_values=True) as profile_values:
+with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ(
+    self.render_config.env_vars
+):
(profile_path, env_vars) = profile_values
env = os.environ.copy()
env.update(env_vars)
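The ``run_command`` fix above addresses a false positive: dbt can echo the class name ``WarnErrorOptions`` in perfectly healthy output, and the substring ``Error`` inside it used to trip the naive error scan. A standalone sketch of the check (the function name here is hypothetical, not the real Cosmos API):

```python
def looks_like_failure(returncode: int, stdout: str) -> bool:
    # Mask "WarnErrorOptions" before scanning, so the "Error" inside that
    # class name is not mistaken for a genuine dbt error message.
    return bool(returncode) or "Error" in stdout.replace("WarnErrorOptions", "")


assert not looks_like_failure(0, "Flags: WarnErrorOptions(include=[], exclude=[])")
assert looks_like_failure(0, "Compilation Error in model stg_orders")
assert looks_like_failure(2, "")
```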
21 changes: 21 additions & 0 deletions cosmos/dbt/project.py
@@ -1,9 +1,13 @@
from __future__ import annotations

from pathlib import Path
import os
from cosmos.constants import (
DBT_LOG_DIR_NAME,
DBT_TARGET_DIR_NAME,
)
from contextlib import contextmanager
from typing import Generator


def create_symlinks(project_path: Path, tmp_dir: Path) -> None:
@@ -12,3 +16,20 @@ def create_symlinks(project_path: Path, tmp_dir: Path) -> None:
for child_name in os.listdir(project_path):
if child_name not in ignore_paths:
os.symlink(project_path / child_name, tmp_dir / child_name)
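The ``create_symlinks`` helper mirrors a dbt project into a scratch directory without copying files. A self-contained sketch with an illustrative ignore list (the real list comes from ``cosmos.constants`` and is an assumption here):

```python
import os
import tempfile
from pathlib import Path


def create_symlinks(project_path: Path, tmp_dir: Path) -> None:
    # Symlink project contents into tmp_dir, skipping dbt's own artifacts.
    # NOTE: ignore list below is assumed for illustration.
    ignore_paths = ("logs", "target", "profiles.yml")
    for child_name in os.listdir(project_path):
        if child_name not in ignore_paths:
            os.symlink(project_path / child_name, tmp_dir / child_name)


src, dst = Path(tempfile.mkdtemp()), Path(tempfile.mkdtemp())
(src / "models").mkdir()
(src / "target").mkdir()  # dbt build artifact: should be skipped
create_symlinks(src, dst)
assert (dst / "models").is_symlink()
assert not (dst / "target").exists()
```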


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
"""Temporarily set environment variables inside the context manager and restore
when exiting.
"""
original_env = {key: os.getenv(key) for key in env_vars}
os.environ.update(env_vars)
try:
yield
finally:
for key, value in original_env.items():
if value is None:
del os.environ[key]
else:
os.environ[key] = value
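The new ``environ`` context manager above is what lets ``RenderConfig.env_vars`` apply only while ``dbt ls`` parses the project. A self-contained copy, exercised end-to-end (variable names are illustrative):

```python
import os
from contextlib import contextmanager
from typing import Generator


@contextmanager
def environ(env_vars: dict[str, str]) -> Generator[None, None, None]:
    """Temporarily set environment variables; restore originals on exit."""
    original_env = {key: os.getenv(key) for key in env_vars}
    os.environ.update(env_vars)
    try:
        yield
    finally:
        for key, value in original_env.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value


os.environ["DBT_TARGET"] = "prod"
with environ({"DBT_TARGET": "dev", "DBT_SQLITE_PATH": "/tmp/data"}):
    assert os.environ["DBT_TARGET"] == "dev"
    assert os.environ["DBT_SQLITE_PATH"] == "/tmp/data"
assert os.environ["DBT_TARGET"] == "prod"   # pre-existing value restored
assert "DBT_SQLITE_PATH" not in os.environ  # newly added var removed
```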
31 changes: 27 additions & 4 deletions cosmos/profiles/base.py
@@ -18,6 +18,9 @@
if TYPE_CHECKING:
from airflow.models import Connection

DBT_PROFILE_TYPE_FIELD = "type"
DBT_PROFILE_METHOD_FIELD = "method"

logger = get_logger(__name__)


@@ -41,6 +44,26 @@ class BaseProfileMapping(ABC):
def __init__(self, conn_id: str, profile_args: dict[str, Any] | None = None):
self.conn_id = conn_id
self.profile_args = profile_args or {}
self._validate_profile_args()

def _validate_profile_args(self) -> None:
"""
Check if profile_args contains keys that should not be overridden from the
class variables when creating the profile.
"""
for profile_field in [DBT_PROFILE_TYPE_FIELD, DBT_PROFILE_METHOD_FIELD]:
if profile_field in self.profile_args and self.profile_args.get(profile_field) != getattr(
self, f"dbt_profile_{profile_field}"
):
raise CosmosValueError(
"`profile_args` for {0} has {1}='{2}' that will override the dbt profile required value of '{3}'. "
"To fix this, remove {1} from `profile_args`.".format(
self.__class__.__name__,
profile_field,
self.profile_args.get(profile_field),
getattr(self, f"dbt_profile_{profile_field}"),
)
)

@property
def conn(self) -> Connection:
@@ -100,11 +123,11 @@ def mock_profile(self) -> dict[str, Any]:
where live connection values don't matter.
"""
mock_profile = {
-"type": self.dbt_profile_type,
+DBT_PROFILE_TYPE_FIELD: self.dbt_profile_type,
}

if self.dbt_profile_method:
-mock_profile["method"] = self.dbt_profile_method
+mock_profile[DBT_PROFILE_METHOD_FIELD] = self.dbt_profile_method

for field in self.required_fields:
# if someone has passed in a value for this field, use it
@@ -199,11 +222,11 @@ def get_dbt_value(self, name: str) -> Any:
def mapped_params(self) -> dict[str, Any]:
"Turns the self.airflow_param_mapping into a dictionary of dbt fields and their values."
mapped_params = {
-"type": self.dbt_profile_type,
+DBT_PROFILE_TYPE_FIELD: self.dbt_profile_type,
}

if self.dbt_profile_method:
-mapped_params["method"] = self.dbt_profile_method
+mapped_params[DBT_PROFILE_METHOD_FIELD] = self.dbt_profile_method

for dbt_field in self.airflow_param_mapping:
mapped_params[dbt_field] = self.get_dbt_value(dbt_field)
7 changes: 4 additions & 3 deletions dev/dags/example_cosmos_sources.py
@@ -26,7 +26,7 @@
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

-os.environ["DBT_SQLITE_PATH"] = str(DEFAULT_DBT_ROOT_PATH / "data")
+DBT_SQLITE_PATH = str(DEFAULT_DBT_ROOT_PATH / "data")


profile_config = ProfileConfig(
@@ -62,7 +62,8 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
node_converters={
DbtResourceType("source"): convert_source, # known dbt node type to Cosmos (part of DbtResourceType)
DbtResourceType("exposure"): convert_exposure, # dbt node type new to Cosmos (will be added to DbtResourceType)
-}
+},
+env_vars={"DBT_SQLITE_PATH": DBT_SQLITE_PATH},
)


@@ -73,7 +74,7 @@ def convert_exposure(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
),
profile_config=profile_config,
render_config=render_config,
-operator_args={"append_env": True},
+operator_args={"env": {"DBT_SQLITE_PATH": DBT_SQLITE_PATH}},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
Binary file added docs/_static/jaffle_shop_dag.png
1 change: 1 addition & 0 deletions docs/configuration/render-config.rst
@@ -14,6 +14,7 @@ The ``RenderConfig`` class takes the following arguments:
- ``dbt_deps``: A Boolean to run dbt deps when using dbt ls for dag parsing. Default True
- ``node_converters``: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``. Find more information below.
- ``dbt_executable_path``: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
- ``env_vars``: A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``.
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
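A hedged sketch of how the new ``env_vars`` option might be wired up, per the argument list above (the path value and connection details are placeholders, and this fragment assumes Cosmos 1.3+ is installed):

```python
from cosmos import RenderConfig
from cosmos.constants import LoadMode

render_config = RenderConfig(
    load_method=LoadMode.DBT_LS,
    # Set only while `dbt ls` parses the project; restored afterwards.
    env_vars={"DBT_SQLITE_PATH": "/usr/local/airflow/dbt/data"},
)
```

Pass the resulting ``render_config`` to ``DbtDag`` or ``DbtTaskGroup`` as usual.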

Customizing how nodes are rendered (experimental)
48 changes: 10 additions & 38 deletions docs/index.rst
@@ -40,53 +40,25 @@ Run your dbt Core projects as `Apache Airflow <https://airflow.apache.org/>`_ DA
Example Usage
___________________

-You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an example with the jaffle_shop project:
+You can render a Cosmos Airflow DAG using the ``DbtDag`` class. Here's an example with the `jaffle_shop project <https://github.com/dbt-labs/jaffle_shop>`_:

-.. code-block:: python
-
-    from pendulum import datetime
-
-    from airflow import DAG
-    from airflow.operators.empty import EmptyOperator
-    from cosmos import DbtTaskGroup
-
-    profile_config = ProfileConfig(
-        profile_name="default",
-        target_name="dev",
-        profile_mapping=PostgresUserPasswordProfileMapping(
-            conn_id="airflow_db",
-            profile_args={"schema": "public"},
-        ),
-    )
-
-    with DAG(
-        dag_id="extract_dag",
-        start_date=datetime(2022, 11, 27),
-        schedule_interval="@daily",
-    ):
-        e1 = EmptyOperator(task_id="pre_dbt")
-
-        dbt_tg = DbtTaskGroup(
-            project_config=ProjectConfig("jaffle_shop"),
-            profile_config=profile_config,
-            default_args={"retries": 2},
-        )
-
-        e2 = EmptyOperator(task_id="post_dbt")
-
-        e1 >> dbt_tg >> e2
-
-This will generate an Airflow Task Group that looks like this:
-
-.. image:: https://raw.githubusercontent.com/astronomer/astronomer-cosmos/main/docs/_static/jaffle_shop_task_group.png
+..
+    The following renders in Sphinx but not Github:
+
+.. literalinclude:: ./dev/dags/basic_cosmos_dag.py
+    :language: python
+    :start-after: [START local_example]
+    :end-before: [END local_example]
+
+This will generate an Airflow DAG that looks like this:
+
+.. image:: https://raw.githubusercontent.com/astronomer/astronomer-cosmos/main/docs/_static/jaffle_shop_dag.png

Getting Started
_______________

-To get started now, check out the `Getting Started Guide <getting_started/index.html>`_.
+Check out the Quickstart guide on our `docs <https://astronomer.github.io/astronomer-cosmos/#quickstart>`_. See more examples at `/dev/dags <https://github.com/astronomer/astronomer-cosmos/tree/main/dev/dags>`_ and at the `cosmos-demo repo <https://github.com/astronomer/cosmos-demo>`_.


Changelog
1 change: 1 addition & 0 deletions docs/requirements.txt
@@ -4,4 +4,5 @@ pydata-sphinx-theme
sphinx-autobuild
sphinx-autoapi
apache-airflow
apache-airflow-providers-cncf-kubernetes>=5.1.1
openlineage-airflow