From d5ba070de291f4724fdb1144e752f9387be10b8c Mon Sep 17 00:00:00 2001 From: Cliff Lau Date: Wed, 27 Sep 2023 04:49:43 +0800 Subject: [PATCH] Use `returncode` instead of `stderr` to determine dbt graph loading errors (#547) Previously, `DbtGraph.load_via_dbt_ls` raised `CosmosLoadDbtException` whenever `stderr` was not empty. This behavior does not seem consistent, as non-blocking warnings written to `stderr` could still make `cosmos` refuse to continue. A specific case that I encountered is an OpenBLAS warning that is produced by `podman` and [`composer-local-dev`](https://github.com/GoogleCloudPlatform/composer-local-dev): ``` OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k ``` This warning does not affect how `dbt` works, but `cosmos` still raised an exception. This PR fixes that by checking the process `returncode` instead of `stderr`. --- cosmos/dbt/graph.py | 20 +++++++++------- tests/dbt/test_graph.py | 53 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index f83f490e2..1ad6c1737 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -194,9 +194,10 @@ def load_via_dbt_ls(self) -> None: env=env, ) stdout, stderr = process.communicate() + returncode = process.returncode logger.debug("dbt deps output: %s", stdout) - if stderr or "Error" in stdout: + if returncode or "Error" in stdout: details = stderr or stdout raise CosmosLoadDbtException(f"Unable to run dbt deps command due to the error:\n{details}") @@ -223,6 +224,7 @@ def load_via_dbt_ls(self) -> None: ) stdout, stderr = process.communicate() + returncode = process.returncode logger.debug("dbt output: %s", stdout) log_filepath = log_dir / DBT_LOG_FILENAME @@ -232,14 +234,14 @@ def load_via_dbt_ls(self) -> None: for line in logfile: logger.debug(line.strip()) - if stderr or "Error" in stdout: - if 'Run "dbt deps" to install package dependencies' in stdout: - raise CosmosLoadDbtException( - "Unable to run dbt ls command due to missing 
dbt_packages. Set render_config.dbt_deps=True." - ) - else: - details = stderr or stdout - raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") + if 'Run "dbt deps" to install package dependencies' in stdout: + raise CosmosLoadDbtException( + "Unable to run dbt ls command due to missing dbt_packages. Set render_config.dbt_deps=True." + ) + + if returncode or "Error" in stdout: + details = stderr or stdout + raise CosmosLoadDbtException(f"Unable to run dbt ls command due to the error:\n{details}") nodes = {} for line in stdout.split("\n"): diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 4dbdeb411..317dce0bb 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -6,8 +6,8 @@ import pytest from cosmos.config import ProfileConfig -from cosmos.constants import ExecutionMode, DbtResourceType -from cosmos.dbt.graph import DbtGraph, LoadMode, CosmosLoadDbtException +from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode from cosmos.dbt.project import DbtProject from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -132,6 +132,7 @@ def test_load( @patch("cosmos.dbt.graph.Popen") def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_popen, tmp_dbt_project_dir): mock_popen().communicate.return_value = ("", "") + mock_popen().returncode = 0 assert not (tmp_dbt_project_dir / "target").exists() assert not (tmp_dbt_project_dir / "logs").exists() @@ -276,6 +277,53 @@ def test_load_via_dbt_ls_without_dbt_deps(): assert err_info.value.args[0] == expected +@pytest.mark.integration +@patch("cosmos.dbt.graph.Popen") +def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, tmp_dbt_project_dir): + mock_popen().communicate.return_value = ("", "Some stderr warnings") + mock_popen().returncode = 0 + + dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + 
dbt_graph = DbtGraph( + project=dbt_project, + profile_config=ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="airflow_db", + profile_args={"schema": "public"}, + ), + ), + ) + + dbt_graph.load_via_dbt_ls() # does not raise exception + + +@pytest.mark.integration +@patch("cosmos.dbt.graph.Popen") +def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): + mock_popen().communicate.return_value = ("", "Some stderr message") + mock_popen().returncode = 1 + + dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + dbt_graph = DbtGraph( + project=dbt_project, + profile_config=ProfileConfig( + profile_name="default", + target_name="default", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="airflow_db", + profile_args={"schema": "public"}, + ), + ), + ) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load_via_dbt_ls() + + expected = "Unable to run dbt deps command due to the error:\nSome stderr message" + assert err_info.value.args[0] == expected + + @pytest.mark.integration @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): @@ -294,6 +342,7 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): ) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() + expected = "Unable to run dbt deps command due to the error:\nSome Runtime Error" assert err_info.value.args[0] == expected mock_popen_communicate.assert_called_once()