diff --git a/cosmos/config.py b/cosmos/config.py index cd95d6da1..b1062b6a3 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -265,8 +265,8 @@ class ExecutionConfig: dbt_project_path: InitVar[str | Path | None] = None virtualenv_dir: str | Path | None = None + project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None - diff --git a/cosmos/converter.py b/cosmos/converter.py index 833b99a30..910c21f34 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -120,10 +120,6 @@ def __init__( # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using if project_config.dbt_project_path: - # We copy the configuration so the change does not affect other DAGs or TaskGroups - # that may reuse the same original configuration - render_config = copy.deepcopy(render_config) - execution_config = copy.deepcopy(execution_config) render_config.project_path = project_config.dbt_project_path execution_config.project_path = project_config.dbt_project_path @@ -139,23 +135,8 @@ def __init__( raise CosmosValueError( "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." ) - emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path - test_behavior = render_config.test_behavior - select = render_config.select - exclude = render_config.exclude - dbt_deps = render_config.dbt_deps - execution_mode = execution_config.execution_mode - load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path - dbt_executable_path = execution_config.dbt_executable_path - node_converters = render_config.node_converters - - if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + + if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( "`ExecutionConfig.virtualenv_dir` is only supported when \ ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." @@ -196,7 +177,7 @@ def __init__( if execution_config.dbt_executable_path: task_args["dbt_executable_path"] = execution_config.dbt_executable_path - if execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: task_args["virtualenv_dir"] = execution_config.virtualenv_dir validate_arguments(render_config.select, render_config.exclude, profile_args, task_args) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 242bca6f9..c2551677e 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -20,6 +20,7 @@ ExecutionMode, LoadMode, ) +from cosmos.dbt.executable import get_system_dbt from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks from cosmos.dbt.selector import select_nodes @@ -52,6 +53,14 @@ class DbtNode: has_test: bool = False +def create_symlinks(project_path: Path, tmp_dir: Path) -> None: + """Helper function to create symlinks to the dbt project files.""" + ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") + for child_name in os.listdir(project_path): + if child_name not in ignore_paths: + os.symlink(project_path / child_name, tmp_dir / child_name) + + def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -108,6 +117,15 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. + + Example of how to use: + + dbt_graph = DbtGraph( + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" + ) + dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -119,6 +137,7 @@ def __init__( render_config: RenderConfig = RenderConfig(), execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, + dbt_cmd: str = get_system_dbt(), operator_args: dict[str, Any] | None = None, ): self.project = project @@ -126,6 +145,7 @@ def __init__( self.profile_config = profile_config self.execution_config = execution_config self.operator_args = operator_args or {} + self.dbt_cmd = dbt_cmd def load( self, @@ -163,9 +183,7 @@ def load( else: load_method[method]() - def run_dbt_ls( - self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] - ) -> dict[str, DbtNode]: + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" ls_command = [dbt_cmd, "ls", "--output", "json"] @@ -202,10 +220,6 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) - dbt_cmd = self.render_config.dbt_executable_path - dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd - logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") if not self.render_config.project_path or not self.execution_config.project_path: raise CosmosLoadDbtException( @@ -215,6 +229,9 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + if not self.profile_config: + raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + with tempfile.TemporaryDirectory() as tmpdir: logger.info( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" @@ -243,12 +260,12 @@ def load_via_dbt_ls(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps: - deps_command = [dbt_cmd, "deps"] + deps_command = [self.dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 607e56641..de6e97c22 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,7 +5,7 @@ import pytest -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -311,8 +311,9 @@ def test_load_via_dbt_ls_without_exclude(project_name): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig() dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -328,10 +329,9 @@ def test_load_via_custom_without_project_path(): def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -346,9 +346,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): def test_load_via_dbt_ls_with_invalid_dbt_path(): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( project=project_config, diff --git a/tests/test_config.py b/tests/test_config.py index cc0711043..56422646e 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -122,38 +122,3 @@ def test_profile_config_validate(): profile_config = ProfileConfig(profile_name="test", target_name="test") assert profile_config.validate_profile() is None assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" - - -@patch("cosmos.config.shutil.which", return_value=None) -def test_render_config_without_dbt_cmd(mock_which): - render_config = RenderConfig() - with pytest.raises(CosmosConfigException) as err_info: - render_config.validate_dbt_command("inexistent-dbt") - - error_msg = err_info.value.args[0] - assert error_msg.startswith("Unable to find the dbt executable, attempted: <") - assert error_msg.endswith("dbt> and .") - - -@patch("cosmos.config.shutil.which", return_value=None) -def test_render_config_with_invalid_dbt_commands(mock_which): - render_config = RenderConfig(dbt_executable_path="invalid-dbt") - with pytest.raises(CosmosConfigException) as err_info: - render_config.validate_dbt_command() - - error_msg = err_info.value.args[0] - assert error_msg == "Unable to find the dbt executable, attempted: ." - - -@patch("cosmos.config.shutil.which", side_effect=(None, "fallback-dbt-path")) -def test_render_config_uses_fallback_if_default_not_found(mock_which): - render_config = RenderConfig() - render_config.validate_dbt_command(Path("/tmp/fallback-dbt-path")) - assert render_config.dbt_executable_path == "/tmp/fallback-dbt-path" - - -@patch("cosmos.config.shutil.which", side_effect=("user-dbt", "fallback-dbt-path")) -def test_render_config_uses_default_if_exists(mock_which): - render_config = RenderConfig(dbt_executable_path="user-dbt") - render_config.validate_dbt_command("fallback-dbt-path") - assert render_config.dbt_executable_path == "user-dbt" diff --git a/tests/test_converter.py b/tests/test_converter.py index 3b409256f..a848f1734 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -181,74 +181,6 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex ) -def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls(): - """ - Validate that a dbt project fails to be rendered to Airflow with DBT_LS if - the dbt command is invalid. - """ - project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample") - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with pytest.raises(CosmosConfigException) as err_info: - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert ( - err_info.value.args[0] - == "Unable to find the dbt executable, attempted: and ." - ) - - -def test_converter_fails_render_config_invalid_dbt_path_with_manifest(): - """ - Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when - the dbt command is invalid. - """ - project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") - - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - converter = DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert converter - - @pytest.mark.parametrize( "execution_mode,operator_args", [