Release 1.4.3 #1035

Merged 4 commits on Jun 7, 2024
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/01-bug.yml
@@ -1,7 +1,7 @@
---
name: Bug Report
description: File a bug report.
title: "[Bug]: "
title: "[Bug] "
labels: ["bug", "triage-needed"]
body:
  - type: markdown
3 changes: 2 additions & 1 deletion .github/ISSUE_TEMPLATE/02-feature.yml
@@ -1,7 +1,8 @@
---
name: Feature request
description: Suggest an idea for this project
labels: ["enhancement", "needs-triage"]
title: "[Feature] "
labels: ["enhancement", "triage-needed"]
body:
  - type: markdown
    attributes:
16 changes: 16 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,22 @@
Changelog
=========

1.4.3 (2024-06-07)
------------------

Bug fixes

* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033

Enhancements

* Only run ``dbt deps`` when there are dependencies by @tatiana in #1030

Docs

* Fix docs so they do not reference the non-existent ``get_dbt_dataset`` by @tatiana in #1034


1.4.2 (2024-06-06)
------------------

2 changes: 1 addition & 1 deletion cosmos/__init__.py
@@ -5,7 +5,7 @@

Contains dags, task groups, and operators.
"""
__version__ = "1.4.2"
__version__ = "1.4.3"


from cosmos.airflow.dag import DbtDag
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -15,6 +15,7 @@
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"}
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

6 changes: 4 additions & 2 deletions cosmos/dbt/graph.py
@@ -24,7 +24,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

@@ -285,7 +285,9 @@ def load_via_dbt_ls(self) -> None:
        env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir)
        env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

        if self.render_config.dbt_deps:
        if self.render_config.dbt_deps and has_non_empty_dependencies_file(
            Path(self.render_config.project_path)
        ):
            deps_command = [dbt_cmd, "deps"]
            deps_command.extend(self.local_flags)
            stdout = run_command(deps_command, tmpdir_path, env)
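In effect, rendering via ``dbt ls`` now skips the ``dbt deps`` subprocess whenever the project has no non-empty ``packages.yml`` or ``dependencies.yml``. A minimal sketch of the gating logic, with an illustrative project path (not the exact values Cosmos passes):

.. code-block:: python

    from pathlib import Path

    from cosmos.dbt.project import has_non_empty_dependencies_file

    project_path = Path("/usr/local/airflow/dags/dbt/jaffle_shop")  # hypothetical project
    dbt_deps = True  # mirrors RenderConfig.dbt_deps

    # "dbt deps" is only spawned when both conditions hold
    if dbt_deps and has_non_empty_dependencies_file(project_path):
        deps_command = ["dbt", "deps"]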
30 changes: 29 additions & 1 deletion cosmos/dbt/project.py
@@ -5,7 +5,35 @@
from pathlib import Path
from typing import Generator

from cosmos.constants import DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.constants import (
    DBT_DEPENDENCIES_FILE_NAMES,
    DBT_LOG_DIR_NAME,
    DBT_PARTIAL_PARSE_FILE_NAME,
    DBT_TARGET_DIR_NAME,
)
from cosmos.log import get_logger

logger = get_logger()


def has_non_empty_dependencies_file(project_path: Path) -> bool:
    """
    Check if the dbt project has a non-empty ``dependencies.yml`` or ``packages.yml``.

    :param project_path: Path to the project
    :returns: True if the project has a non-empty dependencies file, False otherwise
    """
    project_dir = Path(project_path)
    has_deps = False
    for filename in DBT_DEPENDENCIES_FILE_NAMES:
        filepath = project_dir / filename
        if filepath.exists() and filepath.stat().st_size > 0:
            has_deps = True
            break

    if not has_deps:
        logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
    return has_deps


def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
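A quick usage sketch of the new helper in a temporary directory; the expected results follow directly from the implementation above:

.. code-block:: python

    import tempfile
    from pathlib import Path

    from cosmos.dbt.project import has_non_empty_dependencies_file

    with tempfile.TemporaryDirectory() as tmp:
        project = Path(tmp)
        assert not has_non_empty_dependencies_file(project)  # no dependencies file at all

        (project / "packages.yml").touch()
        assert not has_non_empty_dependencies_file(project)  # empty files do not count

        (project / "packages.yml").write_text("packages: []")
        assert has_non_empty_dependencies_file(project)  # non-empty file counts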
6 changes: 4 additions & 2 deletions cosmos/operators/local.py
@@ -18,7 +18,7 @@

from cosmos import cache
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.settings import LINEAGE_NAMESPACE

@@ -126,7 +126,6 @@ def __init__(
        **kwargs: Any,
    ) -> None:
        self.profile_config = profile_config
        self.install_deps = install_deps
        self.callback = callback
        self.compiled_sql = ""
        self.should_store_compiled_sql = should_store_compiled_sql
@@ -146,6 +145,9 @@ def __init__(
        # as it can break existing DAGs.
        self.append_env = append_env

        # We should not spend time trying to install deps if the project doesn't have any dependencies
        self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir))

    @cached_property
    def subprocess_hook(self) -> FullOutputSubprocessHook:
        """Returns hook for running the bash command."""
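The practical consequence, mirrored by the new test in ``tests/operators/test_local.py`` below: ``install_deps=True`` is downgraded at operator initialization when ``project_dir`` has no non-empty dependencies file. A sketch, assuming a ``profile_config`` defined elsewhere and a hypothetical project directory:

.. code-block:: python

    from cosmos.operators.local import DbtRunLocalOperator

    op = DbtRunLocalOperator(
        task_id="run",
        project_dir="/usr/local/airflow/dags/dbt/no_deps_project",  # hypothetical, has no packages.yml
        profile_config=profile_config,  # assumed to be defined elsewhere
        install_deps=True,
    )
    assert op.install_deps is False  # downgraded because no dependencies file exists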
3 changes: 3 additions & 0 deletions cosmos/profiles/bigquery/service_account_keyfile_dict.py
@@ -20,8 +20,11 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping):
    dbt_profile_type: str = "bigquery"
    dbt_profile_method: str = "service-account-json"

    # Do not remove dataset as a required field from the list below. Although dataset is not a
    # required field for some databases, such as Postgres, it is required for BigQuery.
    required_fields = [
        "project",
        "dataset",
        "keyfile_json",
    ]

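With ``dataset`` required again, BigQuery profile mappings must supply a dataset, for example via ``profile_args``. A sketch with an illustrative connection id and values:

.. code-block:: python

    from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

    profile_mapping = GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_conn",  # hypothetical Airflow connection holding the keyfile JSON
        profile_args={
            "project": "my-gcp-project",  # illustrative
            "dataset": "my_dataset",  # required for BigQuery as of 1.4.3
        },
    )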
19 changes: 12 additions & 7 deletions docs/configuration/scheduling.rst
@@ -24,23 +24,31 @@ To schedule a dbt project on a time-based schedule, you can use Airflow's schedu
Data-Aware Scheduling
---------------------

By default, Cosmos emits `Airflow Datasets <https://airflow.apache.org/docs/apache-airflow/stable/concepts/datasets.html>`_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets in the following format:
Apache Airflow 2.4 introduced the concept of `scheduling based on Datasets <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/datasets.html>`_.

By default, if Airflow 2.4 or higher is used, Cosmos emits `Airflow Datasets <https://airflow.apache.org/docs/apache-airflow/stable/concepts/datasets.html>`_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention <https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md>`_.

Cosmos calculates these URIs during task execution using the `OpenLineage Integration Common <https://pypi.org/project/openlineage-integration-common/>`_ library.

This block illustrates a Cosmos-generated dataset for Postgres:

.. code-block:: python

Dataset("DBT://{connection_id}/{project_name}/{model_name}")
Dataset("postgres://host:5432/database.schema.table")


For example, let's say you have:

- A dbt project (``project_one``) with a model called ``my_model`` that runs daily
- A second dbt project (``project_two``) with a model called ``my_other_model`` that you want to run immediately after ``my_model``

We assume the database is Postgres, the host is ``host``, the database is ``database``, and the schema is ``schema``.

Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_other_model`` to run after ``my_model``. For example, you can use the following DAGs:

.. code-block:: python

    from cosmos import DbtDag, get_dbt_dataset
    from airflow.datasets import Dataset

    from cosmos import DbtDag

    project_one = DbtDag(
        # ...
@@ -49,10 +57,7 @@
    )

    project_two = DbtDag(
        # for airflow <=2.3
        # schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")],
        # for airflow > 2.3
        schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")],
        schedule=[Dataset("postgres://host:5432/database.schema.my_model")],
        dbt_project_name="project_two",
    )

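If the downstream DAG should wait on more than one upstream model, ``schedule`` accepts a list of datasets; the second URI below is hypothetical:

.. code-block:: python

    from airflow.datasets import Dataset

    schedule = [
        Dataset("postgres://host:5432/database.schema.my_model"),
        Dataset("postgres://host:5432/database.schema.my_other_upstream_model"),  # hypothetical
    ]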
22 changes: 21 additions & 1 deletion tests/dbt/test_project.py
@@ -2,7 +2,9 @@
from pathlib import Path
from unittest.mock import patch

from cosmos.dbt.project import change_working_directory, create_symlinks, environ
import pytest

from cosmos.dbt.project import change_working_directory, create_symlinks, environ, has_non_empty_dependencies_file

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"

@@ -49,3 +51,21 @@ def test_change_working_directory(mock_chdir):

    # Check if os.chdir is called with the previous working directory
    mock_chdir.assert_called_with(os.getcwd())


@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"])
def test_has_non_empty_dependencies_file_is_true(tmpdir, filename):
    filepath = Path(tmpdir) / filename
    filepath.write_text("content")
    assert has_non_empty_dependencies_file(tmpdir)


@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"])
def test_has_non_empty_dependencies_file_is_false(tmpdir, filename):
    filepath = Path(tmpdir) / filename
    filepath.touch()
    assert not has_non_empty_dependencies_file(tmpdir)


def test_has_non_empty_dependencies_file_is_false_in_empty_dir(tmpdir):
    assert not has_non_empty_dependencies_file(tmpdir)
7 changes: 7 additions & 0 deletions tests/operators/test_local.py
@@ -80,6 +80,13 @@ class ConcreteDbtLocalBaseOperator(DbtLocalBaseOperator):
base_cmd = ["cmd"]


def test_install_deps_in_empty_dir_becomes_false(tmpdir):
    dbt_base_operator = ConcreteDbtLocalBaseOperator(
        profile_config=profile_config, task_id="my-task", project_dir=tmpdir, install_deps=True
    )
    assert not dbt_base_operator.install_deps


def test_dbt_base_operator_add_global_flags() -> None:
    dbt_base_operator = ConcreteDbtLocalBaseOperator(
        profile_config=profile_config,
@@ -96,6 +96,7 @@ def test_mock_profile(mock_bigquery_conn_with_dict: Connection):
"type": "bigquery",
"method": "service-account-json",
"project": "mock_value",
"dataset": "mock_value",
"threads": 1,
"keyfile_json": None,
}