diff --git a/.github/ISSUE_TEMPLATE/01-bug.yml b/.github/ISSUE_TEMPLATE/01-bug.yml index 658d0b9cb..4a5517338 100644 --- a/.github/ISSUE_TEMPLATE/01-bug.yml +++ b/.github/ISSUE_TEMPLATE/01-bug.yml @@ -1,7 +1,7 @@ --- name: Bug Report description: File a bug report. -title: "[Bug]: " +title: "[Bug] " labels: ["bug", "triage-needed"] body: - type: markdown diff --git a/.github/ISSUE_TEMPLATE/02-feature.yml b/.github/ISSUE_TEMPLATE/02-feature.yml index e179d357d..f8cd9e24d 100644 --- a/.github/ISSUE_TEMPLATE/02-feature.yml +++ b/.github/ISSUE_TEMPLATE/02-feature.yml @@ -1,7 +1,8 @@ --- name: Feature request description: Suggest an idea for this project -labels: ["enhancement", "needs-triage"] +title: "[Feature] " +labels: ["enhancement", "triage-needed"] body: - type: markdown attributes: diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2e6331eda..7757d5fb5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,22 @@ Changelog ========= +1.4.3 (2024-06-07) +------------------ + +Bug fixes + +* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033 + +Enhancements + +* Only run ``dbt deps`` when there are dependencies by @tatiana in #1030 + +Docs + +* Fix docs so it does not reference non-existing ``get_dbt_dataset`` by @tatiana in #1034 + + 1.4.2 (2024-06-06) ------------------ diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 7a73e722e..100649bfb 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. 
""" -__version__ = "1.4.2" +__version__ = "1.4.3" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/constants.py b/cosmos/constants.py index b356d5542..92bf883b2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -15,6 +15,7 @@ DBT_TARGET_DIR_NAME = "target" DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" DBT_MANIFEST_FILE_NAME = "manifest.json" +DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"} DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index c37932caf..b38469b08 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -24,7 +24,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path +from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -285,7 +285,9 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.render_config.dbt_deps: + if self.render_config.dbt_deps and has_non_empty_dependencies_file( + Path(self.render_config.project_path) + ): deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index c1c7aa080..d8750cd44 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -5,7 +5,35 @@ from pathlib import Path from typing import Generator -from cosmos.constants import DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME +from cosmos.constants import ( + DBT_DEPENDENCIES_FILE_NAMES, + DBT_LOG_DIR_NAME, + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, +) +from cosmos.log import get_logger + +logger = get_logger() + + +def has_non_empty_dependencies_file(project_path: Path) -> 
bool: + """ + Check if the dbt project has dependencies.yml or packages.yml. + + :param project_path: Path to the project + :returns: True or False + """ + project_dir = Path(project_path) + has_deps = False + for filename in DBT_DEPENDENCIES_FILE_NAMES: + filepath = project_dir / filename + if filepath.exists() and filepath.stat().st_size > 0: + has_deps = True + break + + if not has_deps: + logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") + return has_deps def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c62f708e8..27d377916 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -18,7 +18,7 @@ from cosmos import cache from cosmos.constants import InvocationMode -from cosmos.dbt.project import get_partial_parse_path +from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError from cosmos.settings import LINEAGE_NAMESPACE @@ -126,7 +126,6 @@ def __init__( **kwargs: Any, ) -> None: self.profile_config = profile_config - self.install_deps = install_deps self.callback = callback self.compiled_sql = "" self.should_store_compiled_sql = should_store_compiled_sql @@ -146,6 +145,9 @@ def __init__( # as it can break existing DAGs. 
self.append_env = append_env + # We should not spend time trying to install deps if the project doesn't have any dependencies + self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir)) + @cached_property def subprocess_hook(self) -> FullOutputSubprocessHook: """Returns hook for running the bash command.""" diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py index 17858d7bb..480db669b 100644 --- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -20,8 +20,11 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping): dbt_profile_type: str = "bigquery" dbt_profile_method: str = "service-account-json" + # Do not remove dataset as a required field form the below list. Although it's observed that it's not a required + # field for some databases like Postgres, it's required for BigQuery. required_fields = [ "project", + "dataset", "keyfile_json", ] diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index a1275ee19..d96930395 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -24,11 +24,17 @@ To schedule a dbt project on a time-based schedule, you can use Airflow's schedu Data-Aware Scheduling --------------------- -By default, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets in the following format: +Apache Airflow 2.4 introduced the concept of `scheduling based on Datasets `_. + +By default, if Airflow 2.4 or higher is used, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. 
Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention `_. + +Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common `_. + +This block illustrates a Cosmos-generated dataset for Postgres: .. code-block:: python - Dataset("DBT://{connection_id}/{project_name}/{model_name}") + Dataset("postgres://host:5432/database.schema.table") For example, let's say you have: @@ -36,11 +42,13 @@ For example, let's say you have: - A dbt project (``project_one``) with a model called ``my_model`` that runs daily - A second dbt project (``project_two``) with a model called ``my_other_model`` that you want to run immediately after ``my_model`` +We are assuming that the Database used is Postgres, the host is ``host``, the database is ``database`` and the schema is ``schema``. + Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_other_model`` to run after ``my_model``. For example, you can use the following DAGs: .. code-block:: python - from cosmos import DbtDag, get_dbt_dataset + from cosmos import DbtDag project_one = DbtDag( # ... 
@@ -49,10 +57,7 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_ ) project_two = DbtDag( - # for airflow <=2.3 - # schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")], - # for airflow > 2.3 - schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")], + schedule=[Dataset("postgres://host:5432/database.schema.my_model")], dbt_project_name="project_two", ) diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index f55525a43..df625182f 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -2,7 +2,9 @@ from pathlib import Path from unittest.mock import patch -from cosmos.dbt.project import change_working_directory, create_symlinks, environ +import pytest + +from cosmos.dbt.project import change_working_directory, create_symlinks, environ, has_non_empty_dependencies_file DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -49,3 +51,21 @@ def test_change_working_directory(mock_chdir): # Check if os.chdir is called with the previous working directory mock_chdir.assert_called_with(os.getcwd()) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_true(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.write_text("content") + assert has_non_empty_dependencies_file(tmpdir) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_false(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.touch() + assert not has_non_empty_dependencies_file(tmpdir) + + +def test_has_non_empty_dependencies_file_is_false_in_empty_dir(tmpdir): + assert not has_non_empty_dependencies_file(tmpdir) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 5513b1c4b..f90237082 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -80,6 +80,13 @@ class 
ConcreteDbtLocalBaseOperator(DbtLocalBaseOperator): base_cmd = ["cmd"] +def test_install_deps_in_empty_dir_becomes_false(tmpdir): + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, task_id="my-task", project_dir=tmpdir, install_deps=True + ) + assert not dbt_base_operator.install_deps + + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( profile_config=profile_config, diff --git a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py index 6f0d60b8d..d30c90021 100755 --- a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py +++ b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py @@ -96,6 +96,7 @@ def test_mock_profile(mock_bigquery_conn_with_dict: Connection): "type": "bigquery", "method": "service-account-json", "project": "mock_value", + "dataset": "mock_value", "threads": 1, "keyfile_json": None, }