From 50df27ce92b0a6c403cabdca1654e12b50c2aca5 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 13 Nov 2023 22:14:07 +0000 Subject: [PATCH] Fix 'Unable to find the dbt executable: dbt' error (#666) Since Cosmos 1.2.2, users who used `LoadMode.DBT_LS` (directly or via `LoadMode.AUTOMATIC`) and set `ExecutionConfig.dbt_executable_path` (most, if not all, Astro CLI users), like: ``` execution_config = ExecutionConfig( dbt_executable_path = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt", ) ``` started facing the issue: ``` Broken DAG: [/usr/local/airflow/dags/my_example.py] Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py", line 178, in load self.load_via_dbt_ls() File "/usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py", line 233, in load_via_dbt_ls raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") cosmos.dbt.graph.CosmosLoadDbtException: Unable to find the dbt executable: dbt ``` This issue was initially reported in the Airflow #airflow-astronomer Slack channel: https://apache-airflow.slack.com/archives/C03T0AVNA6A/p1699584315506629 The workaround to avoid this error in Cosmos 1.2.2 and 1.2.3 is to set the `dbt_executable_path` in the `RenderConfig`: ``` render_config=RenderConfig(dbt_executable_path = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",), ``` This PR solves the bug from Cosmos 1.2.4 onwards. 
(cherry picked from commit 2b6136486db92440a29c22024a7862861bc2b7c9) --- cosmos/config.py | 30 +++++++++++++++++ cosmos/converter.py | 1 - cosmos/dbt/graph.py | 30 ++++++----------- tests/dbt/test_graph.py | 22 ++++++------ tests/test_config.py | 38 ++++++++++++++++++++- tests/test_converter.py | 74 +++++++++++++++++++++++++++++++++++++++-- 6 files changed, 161 insertions(+), 34 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 87baba864..57d5200b1 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -3,6 +3,7 @@ from __future__ import annotations import contextlib +import shutil import tempfile from dataclasses import InitVar, dataclass, field from pathlib import Path @@ -19,6 +20,14 @@ DEFAULT_PROFILES_FILE_NAME = "profiles.yml" +class CosmosConfigException(Exception): + """ + Exceptions related to user misconfiguration. + """ + + pass + + @dataclass class RenderConfig: """ @@ -51,6 +60,27 @@ class RenderConfig: def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None + def validate_dbt_command(self, fallback_cmd: str | Path = "") -> None: + """ + When using LoadMode.DBT_LS, the dbt executable path is necessary for rendering. + + Validates that the original dbt command works, if not, attempt to use the fallback_dbt_cmd. + If neither works, raise an exception. + + The fallback behaviour is necessary for Cosmos < 1.2.2 backwards compatibility. + """ + if not shutil.which(self.dbt_executable_path): + if isinstance(fallback_cmd, Path): + fallback_cmd = fallback_cmd.as_posix() + + if fallback_cmd and shutil.which(fallback_cmd): + self.dbt_executable_path = fallback_cmd + else: + raise CosmosConfigException( + "Unable to find the dbt executable, attempted: " + f"<{self.dbt_executable_path}>" + (f" and <{fallback_cmd}>." 
if fallback_cmd else ".") + ) + class ProjectConfig: """ diff --git a/cosmos/converter.py b/cosmos/converter.py index 45d98a4cf..559b7ea69 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -160,7 +160,6 @@ def __init__( project=project_config, render_config=render_config, execution_config=execution_config, - dbt_cmd=render_config.dbt_executable_path, profile_config=profile_config, operator_args=operator_args, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 0322c8ac4..3b61ef57d 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -3,7 +3,6 @@ import itertools import json import os -import shutil import tempfile from dataclasses import dataclass, field from pathlib import Path @@ -21,7 +20,6 @@ ExecutionMode, LoadMode, ) -from cosmos.dbt.executable import get_system_dbt from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -117,15 +115,6 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. 
- - Example of how to use: - - dbt_graph = DbtGraph( - project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), - render_config=RenderConfig(exclude=["*orders*"], select=[]), - dbt_cmd="/usr/local/bin/dbt" - ) - dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -137,7 +126,6 @@ def __init__( render_config: RenderConfig = RenderConfig(), execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, - dbt_cmd: str = get_system_dbt(), operator_args: dict[str, Any] | None = None, ): self.project = project @@ -145,7 +133,6 @@ def __init__( self.profile_config = profile_config self.execution_config = execution_config self.operator_args = operator_args or {} - self.dbt_cmd = dbt_cmd def load( self, @@ -183,9 +170,11 @@ def load( else: load_method[method]() - def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: + def run_dbt_ls( + self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] + ) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [self.dbt_cmd, "ls", "--output", "json"] + ls_command = [dbt_cmd, "ls", "--output", "json"] if self.render_config.exclude: ls_command.extend(["--exclude", *self.render_config.exclude]) @@ -220,6 +209,10 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ + self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) + dbt_cmd = self.render_config.dbt_executable_path + dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd + logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") if not self.render_config.project_path or not self.execution_config.project_path: raise CosmosLoadDbtException( @@ -229,9 +222,6 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise 
CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") - if not shutil.which(self.dbt_cmd): - raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") - with tempfile.TemporaryDirectory() as tmpdir: logger.info( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" @@ -260,12 +250,12 @@ def load_via_dbt_ls(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps: - deps_command = [self.dbt_cmd, "deps"] + deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index b108878fc..6ae6ab200 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,7 +5,7 @@ import pytest -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -312,9 +312,8 @@ def test_load_via_dbt_ls_without_exclude(project_name): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig() + render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -326,12 +325,14 @@ def 
test_load_via_custom_without_project_path(): assert err_info.value.args[0] == expected -def test_load_via_dbt_ls_without_profile(): +@patch("cosmos.config.RenderConfig.validate_dbt_command", return_value=None) +def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME + ) dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -346,10 +347,11 @@ def test_load_via_dbt_ls_without_profile(): def test_load_via_dbt_ls_with_invalid_dbt_path(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + render_config = RenderConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" + ) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -359,10 +361,10 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): profiles_yml_filepath=Path(__file__).parent.parent / "sample/profiles.yml", ), ) - with pytest.raises(CosmosLoadDbtException) as err_info: + with pytest.raises(CosmosConfigException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to find the dbt executable: /inexistent/dbt" + expected = "Unable to find the dbt executable, attempted: and ." 
assert err_info.value.args[0] == expected diff --git a/tests/test_config.py b/tests/test_config.py index 9eec48055..cc0711043 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,8 +1,9 @@ from pathlib import Path +from unittest.mock import patch import pytest -from cosmos.config import ProfileConfig, ProjectConfig +from cosmos.config import ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException from cosmos.exceptions import CosmosValueError @@ -121,3 +122,38 @@ def test_profile_config_validate(): profile_config = ProfileConfig(profile_name="test", target_name="test") assert profile_config.validate_profile() is None assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" + + +@patch("cosmos.config.shutil.which", return_value=None) +def test_render_config_without_dbt_cmd(mock_which): + render_config = RenderConfig() + with pytest.raises(CosmosConfigException) as err_info: + render_config.validate_dbt_command("inexistent-dbt") + + error_msg = err_info.value.args[0] + assert error_msg.startswith("Unable to find the dbt executable, attempted: <") + assert error_msg.endswith("dbt> and .") + + +@patch("cosmos.config.shutil.which", return_value=None) +def test_render_config_with_invalid_dbt_commands(mock_which): + render_config = RenderConfig(dbt_executable_path="invalid-dbt") + with pytest.raises(CosmosConfigException) as err_info: + render_config.validate_dbt_command() + + error_msg = err_info.value.args[0] + assert error_msg == "Unable to find the dbt executable, attempted: ." 
+ + +@patch("cosmos.config.shutil.which", side_effect=(None, "fallback-dbt-path")) +def test_render_config_uses_fallback_if_default_not_found(mock_which): + render_config = RenderConfig() + render_config.validate_dbt_command(Path("/tmp/fallback-dbt-path")) + assert render_config.dbt_executable_path == "/tmp/fallback-dbt-path" + + +@patch("cosmos.config.shutil.which", side_effect=("user-dbt", "fallback-dbt-path")) +def test_render_config_uses_default_if_exists(mock_which): + render_config = RenderConfig(dbt_executable_path="user-dbt") + render_config.validate_dbt_command("fallback-dbt-path") + assert render_config.dbt_executable_path == "user-dbt" diff --git a/tests/test_converter.py b/tests/test_converter.py index 5d89513b3..4210b24d6 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -1,11 +1,13 @@ +from datetime import datetime from pathlib import Path - from unittest.mock import patch + import pytest +from airflow.models import DAG from cosmos.converter import DbtToAirflowConverter, validate_arguments from cosmos.constants import DbtResourceType, ExecutionMode -from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig +from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, CosmosConfigException from cosmos.dbt.graph import DbtNode from cosmos.exceptions import CosmosValueError @@ -141,6 +143,74 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex ) +def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls(): + """ + Validate that a dbt project fails to be rendered to Airflow with DBT_LS if + the dbt command is invalid. 
+ """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample") + execution_config = ExecutionConfig( + execution_mode=ExecutionMode.LOCAL, + dbt_executable_path="invalid-execution-dbt", + ) + render_config = RenderConfig( + emit_datasets=True, + dbt_executable_path="invalid-render-dbt", + ) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosConfigException) as err_info: + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + ) + assert ( + err_info.value.args[0] + == "Unable to find the dbt executable, attempted: and ." + ) + + +def test_converter_fails_render_config_invalid_dbt_path_with_manifest(): + """ + Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when + the dbt command is invalid. + """ + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") + + execution_config = ExecutionConfig( + execution_mode=ExecutionMode.LOCAL, + dbt_executable_path="invalid-execution-dbt", + dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), + ) + render_config = RenderConfig( + emit_datasets=True, + dbt_executable_path="invalid-render-dbt", + ) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: + converter = DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + ) + assert converter + + @pytest.mark.parametrize( "execution_mode,operator_args", [