Skip to content

Commit

Permalink
Fix 'Unable to find the dbt executable: dbt' error (#666)
Browse files Browse the repository at this point in the history
Since Cosmos 1.2.2 users who used `ExecutionMode.DBT_LS` (directly or
via `ExecutionMode.AUTOMATIC`) and set
`ExecutionConfig.dbt_executable_path` (most, if not all, Astro CLI
users), like:

```
execution_config = ExecutionConfig(
    dbt_executable_path = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",
)
```

Started facing the issue:
```
Broken DAG: [/usr/local/airflow/dags/my_example.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py", line 178, in load
    self.load_via_dbt_ls()
  File "/usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py", line 233, in load_via_dbt_ls
    raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")
cosmos.dbt.graph.CosmosLoadDbtException: Unable to find the dbt executable: dbt
```

This issue was initially reported in the Airflow #airflow-astronomer
Slack channel:
https://apache-airflow.slack.com/archives/C03T0AVNA6A/p1699584315506629

The workaround to avoid this error in Cosmos 1.2.2 and 1.2.3 is to set
the `dbt_executable_path` in the `RenderConfig`:
```
render_config=RenderConfig(dbt_executable_path = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",),
```

This PR solves the bug from Cosmos 1.2.4 onwards.

(cherry picked from commit 2b61364)
  • Loading branch information
tatiana committed Nov 15, 2023
1 parent 1eb931e commit 50df27c
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 34 deletions.
30 changes: 30 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import contextlib
import shutil
import tempfile
from dataclasses import InitVar, dataclass, field
from pathlib import Path
Expand All @@ -19,6 +20,14 @@
DEFAULT_PROFILES_FILE_NAME = "profiles.yml"


class CosmosConfigException(Exception):
"""
Exceptions related to user misconfiguration.
"""

pass


@dataclass
class RenderConfig:
"""
Expand Down Expand Up @@ -51,6 +60,27 @@ class RenderConfig:
def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None

def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None:
"""
When using LoadMode.DBT_LS, the dbt executable path is necessary for rendering.
Validates that the original dbt command works, if not, attempt to use the fallback_dbt_cmd.
If neither works, raise an exception.
The fallback behaviour is necessary for Cosmos < 1.2.2 backwards compatibility.
"""
if not shutil.which(self.dbt_executable_path):
if isinstance(fallback_cmd, Path):
fallback_cmd = fallback_cmd.as_posix()

if fallback_cmd and shutil.which(fallback_cmd):
self.dbt_executable_path = fallback_cmd
else:
raise CosmosConfigException(
"Unable to find the dbt executable, attempted: "
f"<{self.dbt_executable_path}>" + (f" and <{fallback_cmd}>." if fallback_cmd else ".")
)


class ProjectConfig:
"""
Expand Down
1 change: 0 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ def __init__(
project=project_config,
render_config=render_config,
execution_config=execution_config,
dbt_cmd=render_config.dbt_executable_path,
profile_config=profile_config,
operator_args=operator_args,
)
Expand Down
30 changes: 10 additions & 20 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import itertools
import json
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
Expand All @@ -21,7 +20,6 @@
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.selector import select_nodes
from cosmos.log import get_logger
Expand Down Expand Up @@ -117,15 +115,6 @@ class DbtGraph:
Supports different ways of loading the `dbt` project into this representation.
Different loading methods can result in different `nodes` and `filtered_nodes`.
Example of how to use:
dbt_graph = DbtGraph(
project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
render_config=RenderConfig(exclude=["*orders*"], select=[]),
dbt_cmd="/usr/local/bin/dbt"
)
dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL)
"""

nodes: dict[str, DbtNode] = dict()
Expand All @@ -137,15 +126,13 @@ def __init__(
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig = ExecutionConfig(),
profile_config: ProfileConfig | None = None,
dbt_cmd: str = get_system_dbt(),
operator_args: dict[str, Any] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.operator_args = operator_args or {}
self.dbt_cmd = dbt_cmd

def load(
self,
Expand Down Expand Up @@ -183,9 +170,11 @@ def load(
else:
load_method[method]()

def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [self.dbt_cmd, "ls", "--output", "json"]
ls_command = [dbt_cmd, "ls", "--output", "json"]

if self.render_config.exclude:
ls_command.extend(["--exclude", *self.render_config.exclude])
Expand Down Expand Up @@ -220,6 +209,10 @@ def load_via_dbt_ls(self) -> None:
* self.nodes
* self.filtered_nodes
"""
self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path)
dbt_cmd = self.render_config.dbt_executable_path
dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd

logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...")
if not self.render_config.project_path or not self.execution_config.project_path:
raise CosmosLoadDbtException(
Expand All @@ -229,9 +222,6 @@ def load_via_dbt_ls(self) -> None:
if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
Expand Down Expand Up @@ -260,12 +250,12 @@ def load_via_dbt_ls(self) -> None:
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps:
deps_command = [self.dbt_cmd, "deps"]
deps_command = [dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env)
nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
22 changes: 12 additions & 10 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.dbt.graph import (
CosmosLoadDbtException,
Expand Down Expand Up @@ -312,9 +312,8 @@ def test_load_via_dbt_ls_without_exclude(project_name):
def test_load_via_custom_without_project_path():
project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test")
execution_config = ExecutionConfig()
render_config = RenderConfig()
render_config = RenderConfig(dbt_executable_path="/inexistent/dbt")
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -326,12 +325,14 @@ def test_load_via_custom_without_project_path():
assert err_info.value.args[0] == expected


def test_load_via_dbt_ls_without_profile():
@patch("cosmos.config.RenderConfig.validate_dbt_command", return_value=None)
def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME
)
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -346,10 +347,11 @@ def test_load_via_dbt_ls_without_profile():
def test_load_via_dbt_ls_with_invalid_dbt_path():
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt"
)
with patch("pathlib.Path.exists", return_value=True):
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -359,10 +361,10 @@ def test_load_via_dbt_ls_with_invalid_dbt_path():
profiles_yml_filepath=Path(__file__).parent.parent / "sample/profiles.yml",
),
)
with pytest.raises(CosmosLoadDbtException) as err_info:
with pytest.raises(CosmosConfigException) as err_info:
dbt_graph.load_via_dbt_ls()

expected = "Unable to find the dbt executable: /inexistent/dbt"
expected = "Unable to find the dbt executable, attempted: </inexistent/dbt> and <dbt>."
assert err_info.value.args[0] == expected


Expand Down
38 changes: 37 additions & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
from unittest.mock import patch

import pytest

from cosmos.config import ProfileConfig, ProjectConfig
from cosmos.config import ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException
from cosmos.exceptions import CosmosValueError


Expand Down Expand Up @@ -121,3 +122,38 @@ def test_profile_config_validate():
profile_config = ProfileConfig(profile_name="test", target_name="test")
assert profile_config.validate_profile() is None
assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile"


@patch("cosmos.config.shutil.which", return_value=None)
def test_render_config_without_dbt_cmd(mock_which):
render_config = RenderConfig()
with pytest.raises(CosmosConfigException) as err_info:
render_config.validate_dbt_command("inexistent-dbt")

error_msg = err_info.value.args[0]
assert error_msg.startswith("Unable to find the dbt executable, attempted: <")
assert error_msg.endswith("dbt> and <inexistent-dbt>.")


@patch("cosmos.config.shutil.which", return_value=None)
def test_render_config_with_invalid_dbt_commands(mock_which):
render_config = RenderConfig(dbt_executable_path="invalid-dbt")
with pytest.raises(CosmosConfigException) as err_info:
render_config.validate_dbt_command()

error_msg = err_info.value.args[0]
assert error_msg == "Unable to find the dbt executable, attempted: <invalid-dbt>."


@patch("cosmos.config.shutil.which", side_effect=(None, "fallback-dbt-path"))
def test_render_config_uses_fallback_if_default_not_found(mock_which):
render_config = RenderConfig()
render_config.validate_dbt_command(Path("/tmp/fallback-dbt-path"))
assert render_config.dbt_executable_path == "/tmp/fallback-dbt-path"


@patch("cosmos.config.shutil.which", side_effect=("user-dbt", "fallback-dbt-path"))
def test_render_config_uses_default_if_exists(mock_which):
render_config = RenderConfig(dbt_executable_path="user-dbt")
render_config.validate_dbt_command("fallback-dbt-path")
assert render_config.dbt_executable_path == "user-dbt"
74 changes: 72 additions & 2 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from datetime import datetime
from pathlib import Path

from unittest.mock import patch

import pytest
from airflow.models import DAG

from cosmos.converter import DbtToAirflowConverter, validate_arguments
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, CosmosConfigException
from cosmos.dbt.graph import DbtNode
from cosmos.exceptions import CosmosValueError

Expand Down Expand Up @@ -141,6 +143,74 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex
)


def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls():
"""
Validate that a dbt project fails to be rendered to Airflow with DBT_LS if
the dbt command is invalid.
"""
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample")
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
with pytest.raises(CosmosConfigException) as err_info:
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
assert (
err_info.value.args[0]
== "Unable to find the dbt executable, attempted: <invalid-render-dbt> and <invalid-execution-dbt>."
)


def test_converter_fails_render_config_invalid_dbt_path_with_manifest():
"""
Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when
the dbt command is invalid.
"""
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")

execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(),
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
converter = DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
assert converter


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
Expand Down

0 comments on commit 50df27c

Please sign in to comment.