Skip to content

Commit

Permalink
Only run dbt deps when there are dependencies (astronomer#1030)
Browse files Browse the repository at this point in the history
As of Cosmos 1.4, Cosmos will attempt to run dbt deps even if there are
no `dependencies.yml` or `packages.yml` in the dbt project directory.
This causes an unnecessary overhead in creating subprocesses without any
benefit.

This problem was initially spotted by @AlgirdasDubickas, who created a
pull request proposing a solution to the problem:
astronomer#893

Despite the original PR becoming stale, the problem it addresses remains
relevant.

This PR proposes a different implementation to solve the same problem.
It addresses the issue from a rendering perspective (converting a dbt
project into an Airflow DAG using `LoadMode.DBT_LS`) and an execution
perspective (when Airflow worker nodes run/trigger dbt commands to be
run when using `ExecutionMode.LOCAL` or `ExecutionMode.VIRTUALENV`).

Co-authored-by: AlgirdasDubickas <[email protected]>
  • Loading branch information
2 people authored and arojasb3 committed Jul 14, 2024
1 parent 59b574e commit 7b5dff8
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DBT_TARGET_DIR_NAME = "target"
DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"}
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"

Expand Down
6 changes: 4 additions & 2 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
LoadMode,
)
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path
from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger

Expand Down Expand Up @@ -320,7 +320,9 @@ def load_via_dbt_ls(self) -> None:
env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir)
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps:
if self.render_config.dbt_deps and has_non_empty_dependencies_file(
Path(self.render_config.project_path)
):
deps_command = [dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
Expand Down
30 changes: 29 additions & 1 deletion cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,35 @@
from pathlib import Path
from typing import Generator

from cosmos.constants import DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.constants import (
DBT_DEPENDENCIES_FILE_NAMES,
DBT_LOG_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
DBT_TARGET_DIR_NAME,
)
from cosmos.log import get_logger

logger = get_logger()


def has_non_empty_dependencies_file(project_path: Path) -> bool:
"""
Check if the dbt project has dependencies.yml or packages.yml.
:param project_path: Path to the project
:returns: True or False
"""
project_dir = Path(project_path)
has_deps = False
for filename in DBT_DEPENDENCIES_FILE_NAMES:
filepath = project_dir / filename
if filepath.exists() and filepath.stat().st_size > 0:
has_deps = True
break

if not has_deps:
logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}")
return has_deps


def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
Expand Down
6 changes: 4 additions & 2 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from cosmos import cache
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path
from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.exceptions import AirflowCompatibilityError
from cosmos.settings import LINEAGE_NAMESPACE

Expand Down Expand Up @@ -126,7 +126,6 @@ def __init__(
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.install_deps = install_deps
self.callback = callback
self.compiled_sql = ""
self.should_store_compiled_sql = should_store_compiled_sql
Expand All @@ -146,6 +145,9 @@ def __init__(
# as it can break existing DAGs.
self.append_env = append_env

# We should not spend time trying to install deps if the project doesn't have any dependencies
self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir))

@cached_property
def subprocess_hook(self) -> FullOutputSubprocessHook:
"""Returns hook for running the bash command."""
Expand Down
22 changes: 21 additions & 1 deletion tests/dbt/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
from pathlib import Path
from unittest.mock import patch

from cosmos.dbt.project import change_working_directory, create_symlinks, environ
import pytest

from cosmos.dbt.project import change_working_directory, create_symlinks, environ, has_non_empty_dependencies_file

DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"

Expand Down Expand Up @@ -49,3 +51,21 @@ def test_change_working_directory(mock_chdir):

# Check if os.chdir is called with the previous working directory
mock_chdir.assert_called_with(os.getcwd())


@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"])
def test_has_non_empty_dependencies_file_is_true(tmpdir, filename):
filepath = Path(tmpdir) / filename
filepath.write_text("content")
assert has_non_empty_dependencies_file(tmpdir)


@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"])
def test_has_non_empty_dependencies_file_is_false(tmpdir, filename):
filepath = Path(tmpdir) / filename
filepath.touch()
assert not has_non_empty_dependencies_file(tmpdir)


def test_has_non_empty_dependencies_file_is_false_in_empty_dir(tmpdir):
assert not has_non_empty_dependencies_file(tmpdir)
7 changes: 7 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class ConcreteDbtLocalBaseOperator(DbtLocalBaseOperator):
base_cmd = ["cmd"]


def test_install_deps_in_empty_dir_becomes_false(tmpdir):
dbt_base_operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config, task_id="my-task", project_dir=tmpdir, install_deps=True
)
assert not dbt_base_operator.install_deps


def test_dbt_base_operator_add_global_flags() -> None:
dbt_base_operator = ConcreteDbtLocalBaseOperator(
profile_config=profile_config,
Expand Down

0 comments on commit 7b5dff8

Please sign in to comment.