Merge branch 'main' into replace-flake-for-ruff
joppevos authored Dec 4, 2023
2 parents 734b886 + e1f34ea commit bb4d1bd
Showing 57 changed files with 1,481 additions and 326 deletions.
4 changes: 3 additions & 1 deletion .pre-commit-config.yaml
@@ -34,6 +34,7 @@ repos:
args:
- --exclude-file=tests/sample/manifest_model_version.json
- --skip=**/manifest.json
- -L connexion
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
@@ -53,7 +54,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.4
rev: v0.1.6
hooks:
- id: ruff
args:
@@ -71,6 +72,7 @@ repos:
additional_dependencies: [black>=22.10.0]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.7.1"

hooks:
- id: mypy
name: mypy-python
47 changes: 46 additions & 1 deletion CHANGELOG.rst
@@ -1,11 +1,56 @@
Changelog
=========

1.3.0a1 (2023-10-26)
1.3.0a2 (2023-11-23)
--------------------

Features

* Add ``ProfileMapping`` for Vertica by @perttus in #540 and #688
* Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608
* Add support for Snowflake encrypted private key environment variable by @DanMawdsleyBA in #649
* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616


1.2.5 (2023-11-23)
------------------

Bug fixes

* Fix running models that use alias while supporting dbt versions by @binhnq94 in #662
* Make ``profiles_yml_path`` optional for ``ExecutionMode.DOCKER`` and ``KUBERNETES`` by @MrBones757 in #681
* Prevent overriding dbt profile fields with profile args of "type" or "method" by @jbandoro in #702
* Fix ``LoadMode.DBT_LS`` fail when dbt outputs ``WarnErrorOptions`` by @adammarples in #692
* Add support for env vars in ``RenderConfig`` for dbt ls parsing by @jbandoro in #690
* Add support for Kubernetes ``on_warning_callback`` by @david-mag in #673
* Fix ``ExecutionConfig.dbt_executable_path`` to use ``default_factory`` by @jbandoro in #678

Others

* Docs fix: example DAG in the README and docs/index by @tatiana in #705
* Docs improvement: highlight DAG examples in README by @iancmoritz and @jlaneve in #695


1.2.4 (2023-11-14)
------------------

Bug fixes

* Store ``compiled_sql`` even when task fails by @agreenburg in #671
* Refactor ``LoadMethod.LOCAL`` to use symlinks instead of copying directory by @jbandoro in #660
* Fix 'Unable to find the dbt executable: dbt' error by @tatiana in #666
* Fix installing deps when using ``profile_mapping`` & ``ExecutionMode.LOCAL`` by @joppevos in #659

Others

* Docs: add execution config to MWAA code example by @ugmuka in #674
* Docs: highlight DAG examples in docs by @iancmoritz and @jlaneve in #695

1.2.3 (2023-11-09)
------------------

Features

* Add ``ProfileMapping`` for Vertica by @perttus in #540
* Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608
* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616
45 changes: 9 additions & 36 deletions README.rst
@@ -31,64 +31,37 @@ Run your dbt Core projects as `Apache Airflow <https://airflow.apache.org/>`_ DA
Quickstart
__________

Check out the Quickstart guide on our `docs <https://astronomer.github.io/astronomer-cosmos/#quickstart>`_.
Check out the Quickstart guide on our `docs <https://astronomer.github.io/astronomer-cosmos/#quickstart>`_. See more examples at `/dev/dags <https://github.com/astronomer/astronomer-cosmos/tree/main/dev/dags>`_ and at the `cosmos-demo repo <https://github.com/astronomer/cosmos-demo>`_.


Example Usage
___________________

You can render an Airflow Task Group using the ``DbtTaskGroup`` class. Here's an example with the `jaffle_shop project <https://github.com/dbt-labs/jaffle_shop>`_:
You can render a Cosmos Airflow DAG using the ``DbtDag`` class. Here's an example with the `jaffle_shop project <https://github.com/dbt-labs/jaffle_shop>`_:

..
This renders on Github but not Sphinx:
.. code-block:: python
https://github.com/astronomer/astronomer-cosmos/blob/24aa38e528e299ef51ca6baf32f5a6185887d432/dev/dags/basic_cosmos_dag.py#L1-L42

from pendulum import datetime
This will generate an Airflow DAG that looks like this:

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
.. figure:: /docs/_static/jaffle_shop_dag.png

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
)
with DAG(
dag_id="extract_dag",
start_date=datetime(2022, 11, 27),
schedule_interval="@daily",
):
e1 = EmptyOperator(task_id="pre_dbt")
dbt_tg = DbtTaskGroup(
project_config=ProjectConfig("jaffle_shop"),
profile_config=profile_config,
)
e2 = EmptyOperator(task_id="post_dbt")
e1 >> dbt_tg >> e2
This will generate an Airflow Task Group that looks like this:

.. figure:: /docs/_static/jaffle_shop_task_group.png

Community
_________
- Join us on the Airflow `Slack <https://join.slack.com/t/apache-airflow/shared_invite/zt-1zy8e8h85-es~fn19iMzUmkhPwnyRT6Q>`_ at #airflow-dbt


Changelog
_________

We follow `Semantic Versioning <https://semver.org/>`_ for releases.
Check `CHANGELOG.rst <https://github.com/astronomer/astronomer-cosmos/blob/main/CHANGELOG.rst>`_
for the latest changes.


Contributing Guide
__________________

30 changes: 29 additions & 1 deletion cosmos/__init__.py
@@ -5,7 +5,8 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.3.0a1"
__version__ = "1.3.0a2"


from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
@@ -116,3 +117,30 @@
"LoadMode",
"TestBehavior",
]

"""
Required provider info for using Airflow config for configuration
"""


def get_provider_info():
return {
"package-name": "astronomer-cosmos", # Required
"name": "Astronomer Cosmos", # Required
"description": "Astronomer Cosmos is a library for rendering dbt workflows in Airflow. Contains dags, task groups, and operators.", # Required
"versions": [__version__], # Required
"config": {
"cosmos": {
"description": None,
"options": {
"propagate_logs": {
"description": "Enable log propagation from Cosmos custom logger\n",
"version_added": "1.3.0a1",
"type": "boolean",
"example": None,
"default": "True",
},
},
},
},
}
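The provider info above declares a boolean ``propagate_logs`` option in a ``cosmos`` config section, with a default of ``"True"``. The commit excerpt does not show how Cosmos reads that option, so the following is only a minimal sketch of the idea, using the standard library's ``configparser`` as a stand-in for Airflow's configuration object. The function names ``read_propagate_logs`` and ``configure_logger`` and the logger name ``cosmos_demo`` are hypothetical.

```python
import configparser
import logging


def read_propagate_logs(config_text: str) -> bool:
    """Return the [cosmos] propagate_logs flag, defaulting to True when it is unset."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # fallback=True mirrors the "default": "True" entry in the provider info.
    return parser.getboolean("cosmos", "propagate_logs", fallback=True)


def configure_logger(name: str, propagate: bool) -> logging.Logger:
    """Apply the flag to a named logger (hypothetical helper, not Cosmos code)."""
    logger = logging.getLogger(name)
    logger.propagate = propagate
    return logger


# Example: a config that turns propagation off.
logger = configure_logger(
    "cosmos_demo",
    read_propagate_logs("[cosmos]\npropagate_logs = False\n"),
)
```

With an empty config the helper falls back to ``True``, which matches the declared default.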
12 changes: 6 additions & 6 deletions cosmos/airflow/graph.py
@@ -83,11 +83,11 @@ def create_test_task_metadata(
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.name
task_args["models"] = node.resource_name
elif node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
task_args["select"] = f"source:{node.resource_name}"
else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
task_args["select"] = node.name
task_args["select"] = node.resource_name
return TaskMetadata(
id=test_task_name,
operator_class=calculate_operator_class(
@@ -108,8 +108,8 @@ def create_task_metadata(
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
Default is ExecutionMode.LOCAL.
:param args: Arguments to be used to instantiate an Airflow Task
:param use_name_as_task_id_prefix: If resource_type is DbtResourceType.MODEL, it determines whether
using name as task id prefix or not. If it is True task_id = <node.name>_run, else task_id=run.
:param use_task_group: It determines whether to use the name as a prefix for the task id or not.
If it is False, then use the name as a prefix for the task id, otherwise do not.
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
dbt_resource_to_class = {
@@ -118,7 +118,7 @@ def create_task_metadata(
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
}
args = {**args, **{"models": node.name}}
args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
52 changes: 45 additions & 7 deletions cosmos/config.py
@@ -3,6 +3,7 @@
from __future__ import annotations

import contextlib
import shutil
import tempfile
from dataclasses import InitVar, dataclass, field
from pathlib import Path
@@ -19,6 +20,14 @@
DEFAULT_PROFILES_FILE_NAME = "profiles.yml"


class CosmosConfigException(Exception):
"""
Exceptions related to user misconfiguration.
"""

pass


@dataclass
class RenderConfig:
"""
@@ -32,8 +41,9 @@ class RenderConfig:
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param env_vars: A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
"""

emit_datasets: bool = True
@@ -44,13 +54,35 @@ class RenderConfig:
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
env_vars: dict[str, str] = field(default_factory=dict)
dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None

def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None:
"""
When using LoadMode.DBT_LS, the dbt executable path is necessary for rendering.
Validates that the original dbt command works, if not, attempt to use the fallback_dbt_cmd.
If neither works, raise an exception.
The fallback behaviour is necessary for Cosmos < 1.2.2 backwards compatibility.
"""
if not shutil.which(self.dbt_executable_path):
if isinstance(fallback_cmd, Path):
fallback_cmd = fallback_cmd.as_posix()

if fallback_cmd and shutil.which(fallback_cmd):
self.dbt_executable_path = fallback_cmd
else:
raise CosmosConfigException(
"Unable to find the dbt executable, attempted: "
f"<{self.dbt_executable_path}>" + (f" and <{fallback_cmd}>." if fallback_cmd else ".")
)

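The fallback behaviour of ``validate_dbt_command`` can be tried in isolation. The sketch below is a standalone approximation, not the Cosmos implementation: ``resolve_executable`` and ``ExecutableNotFoundError`` are hypothetical names, and the function returns the chosen command instead of mutating a config object. It relies only on ``shutil.which``, which returns ``None`` when a command cannot be found.

```python
import shutil
import sys
from pathlib import Path
from typing import Union


class ExecutableNotFoundError(Exception):
    """Hypothetical stand-in for CosmosConfigException."""


def resolve_executable(primary: Union[str, Path], fallback: Union[str, Path] = "") -> str:
    """Return the primary command if it resolves, otherwise the fallback, otherwise raise."""
    if shutil.which(str(primary)):
        return str(primary)

    # Normalise a Path fallback to a string, as the method in the diff does.
    if isinstance(fallback, Path):
        fallback = fallback.as_posix()

    if fallback and shutil.which(fallback):
        return fallback

    raise ExecutableNotFoundError(
        "Unable to find the executable, attempted: "
        f"<{primary}>" + (f" and <{fallback}>." if fallback else ".")
    )


# Example: the primary name does not exist, so the running interpreter is used instead.
chosen = resolve_executable("no-such-binary-cosmos-demo", sys.executable)
```

An empty fallback is treated as "no fallback", so the error message lists only the command that was actually attempted.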

class ProjectConfig:
"""
@@ -165,15 +197,21 @@ class ProfileConfig:
profile_mapping: BaseProfileMapping | None = None

def __post_init__(self) -> None:
"Validates that we have enough information to render a profile."
# if using a user-supplied profiles.yml, validate that it exists
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")
self.validate_profile()

def validate_profile(self) -> None:
"Validates that we have enough information to render a profile."
if not self.profiles_yml_filepath and not self.profile_mapping:
raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile")
if self.profiles_yml_filepath and self.profile_mapping:
raise CosmosValueError(
"Both profiles_yml_filepath and profile_mapping are defined and are mutually exclusive. Ensure only one of these is defined."
)

def validate_profiles_yml(self) -> None:
"Validates a user-supplied profiles.yml is present"
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")

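This hunk splits the validation in two: the mutual-exclusivity check still runs from ``__post_init__``, while the file-existence check becomes a separate method that the caller must invoke. This plausibly relates to the changelog entry that makes ``profiles_yml_path`` optional for ``ExecutionMode.DOCKER`` and ``KUBERNETES``, although the excerpt does not show the call sites. A minimal sketch of the pattern follows, with hypothetical names ``ProfileSettings`` and ``ProfileSettingsError``.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional


class ProfileSettingsError(ValueError):
    """Hypothetical stand-in for CosmosValueError."""


@dataclass
class ProfileSettings:
    profiles_yml_filepath: Optional[str] = None
    profile_mapping: Optional[Any] = None

    def __post_init__(self) -> None:
        # Cheap structural check only; it never touches the filesystem.
        self.validate_profile()

    def validate_profile(self) -> None:
        has_file = bool(self.profiles_yml_filepath)
        has_mapping = bool(self.profile_mapping)
        if not has_file and not has_mapping:
            raise ProfileSettingsError("Either profiles_yml_filepath or profile_mapping must be set")
        if has_file and has_mapping:
            raise ProfileSettingsError("profiles_yml_filepath and profile_mapping are mutually exclusive")

    def validate_profiles_yml(self) -> None:
        # Deferred check: call it only where the file is expected to exist locally.
        if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
            raise ProfileSettingsError(f"The file {self.profiles_yml_filepath} does not exist.")


# Constructing with a path that is absent on this machine succeeds;
# the failure is postponed until validate_profiles_yml() is called.
settings = ProfileSettings(profiles_yml_filepath="/no/such/dir/profiles.yml")
```

Keeping the filesystem check out of ``__post_init__`` means the object can be built on a machine that does not hold the file.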
@contextlib.contextmanager
def ensure_profile(
Expand Down Expand Up @@ -228,7 +266,7 @@ class ExecutionConfig:

execution_mode: ExecutionMode = ExecutionMode.LOCAL
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = get_system_dbt()
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None

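The switch from ``get_system_dbt()`` to ``field(default_factory=get_system_dbt)`` changes when the lookup runs. A plain default is evaluated once, when the class body is executed, whereas a ``default_factory`` is called each time an instance is created without that argument. The sketch below illustrates the difference with a hypothetical ``find_dbt`` function that reads a mutable module-level value; it does not reproduce ``get_system_dbt``.

```python
from dataclasses import dataclass, field

# Hypothetical mutable state standing in for "what is currently on the PATH".
_environment = {"dbt": "dbt"}


def find_dbt() -> str:
    """Hypothetical lookup; returns whatever the environment currently reports."""
    return _environment["dbt"]


@dataclass
class EagerConfig:
    # Evaluated once, at class definition time.
    dbt_executable_path: str = find_dbt()


@dataclass
class LazyConfig:
    # Evaluated on every instantiation.
    dbt_executable_path: str = field(default_factory=find_dbt)


# The environment changes after the classes were defined.
_environment["dbt"] = "/venv/bin/dbt"

eager = EagerConfig()
lazy = LazyConfig()
```

Here ``eager`` still holds the value captured at import time, while ``lazy`` reflects the updated environment.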