diff --git a/cosmos/constants.py b/cosmos/constants.py index b356d5542..92bf883b2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -15,6 +15,7 @@ DBT_TARGET_DIR_NAME = "target" DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" DBT_MANIFEST_FILE_NAME = "manifest.json" +DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"} DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 40a44d3a6..9ad8caaee 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -24,7 +24,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path +from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -320,7 +320,9 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.render_config.dbt_deps: + if self.render_config.dbt_deps and has_non_empty_dependencies_file( + Path(self.render_config.project_path) + ): deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index c1c7aa080..d8750cd44 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -5,7 +5,35 @@ from pathlib import Path from typing import Generator -from cosmos.constants import DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME +from cosmos.constants import ( + DBT_DEPENDENCIES_FILE_NAMES, + DBT_LOG_DIR_NAME, + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, +) +from cosmos.log import get_logger + +logger = get_logger() + + +def has_non_empty_dependencies_file(project_path: Path) -> bool: + """ + Check if the dbt project has dependencies.yml or packages.yml. + + :param project_path: Path to the project + :returns: True or False + """ + project_dir = Path(project_path) + has_deps = False + for filename in DBT_DEPENDENCIES_FILE_NAMES: + filepath = project_dir / filename + if filepath.exists() and filepath.stat().st_size > 0: + has_deps = True + break + + if not has_deps: + logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") + return has_deps def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 703b1fb1e..232a6680d 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -18,7 +18,7 @@ from cosmos import cache from cosmos.constants import InvocationMode -from cosmos.dbt.project import get_partial_parse_path +from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError from cosmos.settings import LINEAGE_NAMESPACE @@ -126,7 +126,6 @@ def __init__( **kwargs: Any, ) -> None: self.profile_config = profile_config - self.install_deps = install_deps self.callback = callback self.compiled_sql = "" self.should_store_compiled_sql = should_store_compiled_sql @@ -146,6 +145,9 @@ def __init__( # as it can break existing DAGs. self.append_env = append_env + # We should not spend time trying to install deps if the project doesn't have any dependencies + self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir)) + @cached_property def subprocess_hook(self) -> FullOutputSubprocessHook: """Returns hook for running the bash command.""" diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index f55525a43..df625182f 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -2,7 +2,9 @@ from pathlib import Path from unittest.mock import patch -from cosmos.dbt.project import change_working_directory, create_symlinks, environ +import pytest + +from cosmos.dbt.project import change_working_directory, create_symlinks, environ, has_non_empty_dependencies_file DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -49,3 +51,21 @@ def test_change_working_directory(mock_chdir): # Check if os.chdir is called with the previous working directory mock_chdir.assert_called_with(os.getcwd()) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_true(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.write_text("content") + assert has_non_empty_dependencies_file(tmpdir) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_false(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.touch() + assert not has_non_empty_dependencies_file(tmpdir) + + +def test_has_non_empty_dependencies_file_is_false_in_empty_dir(tmpdir): + assert not has_non_empty_dependencies_file(tmpdir) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 5513b1c4b..f90237082 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -80,6 +80,13 @@ class ConcreteDbtLocalBaseOperator(DbtLocalBaseOperator): base_cmd = ["cmd"] +def test_install_deps_in_empty_dir_becomes_false(tmpdir): + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, task_id="my-task", project_dir=tmpdir, install_deps=True + ) + assert not dbt_base_operator.install_deps + + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( profile_config=profile_config,