diff --git a/cosmos/config.py b/cosmos/config.py index 72b1f2eb9..e4985ed1b 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -301,8 +301,8 @@ class ExecutionConfig: dbt_project_path: InitVar[str | Path | None] = None virtualenv_dir: str | Path | None = None + project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None - diff --git a/cosmos/converter.py b/cosmos/converter.py index 6c87a416a..eebb30de8 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -230,23 +230,8 @@ def __init__( raise CosmosValueError( "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." ) - emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path - test_behavior = render_config.test_behavior - select = render_config.select - exclude = render_config.exclude - dbt_deps = render_config.dbt_deps - execution_mode = execution_config.execution_mode - load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path - dbt_executable_path = execution_config.dbt_executable_path - node_converters = render_config.node_converters - - if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + + if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( "`ExecutionConfig.virtualenv_dir` is only supported when \ ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." @@ -296,7 +281,7 @@ def __init__( task_args, execution_mode=execution_config.execution_mode, ) - if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: task_args["virtualenv_dir"] = execution_config.virtualenv_dir build_airflow_graph( diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b41c15a49..df2390e49 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -21,6 +21,7 @@ ExecutionMode, LoadMode, ) +from cosmos.dbt.executable import get_system_dbt from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks, environ from cosmos.dbt.selector import select_nodes @@ -69,6 +70,14 @@ def name(self) -> str: return self.resource_name.replace(".", "_") +def create_symlinks(project_path: Path, tmp_dir: Path) -> None: + """Helper function to create symlinks to the dbt project files.""" + ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") + for child_name in os.listdir(project_path): + if child_name not in ignore_paths: + os.symlink(project_path / child_name, tmp_dir / child_name) + + def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -124,6 +133,15 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. + + Example of how to use: + + dbt_graph = DbtGraph( + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" + ) + dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -137,12 +155,16 @@ def __init__( profile_config: ProfileConfig | None = None, # dbt_vars only supported for LegacyDbtProject dbt_vars: dict[str, str] | None = None, + dbt_cmd: str = get_system_dbt(), + operator_args: dict[str, Any] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config self.dbt_vars = dbt_vars or {} + self.operator_args = operator_args or {} + self.dbt_cmd = dbt_cmd def load( self, @@ -181,9 +203,7 @@ def load( else: load_method[method]() - def run_dbt_ls( - self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] - ) -> dict[str, DbtNode]: + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" ls_command = [dbt_cmd, "ls", "--output", "json"] @@ -223,10 +243,6 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) - dbt_cmd = self.render_config.dbt_executable_path - dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd - logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") if not self.render_config.project_path or not self.execution_config.project_path: raise CosmosLoadDbtException( @@ -236,6 +252,9 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + if not self.profile_config: + raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + with tempfile.TemporaryDirectory() as tmpdir: logger.info( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" @@ -266,12 +285,12 @@ def load_via_dbt_ls(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps: - deps_command = [dbt_cmd, "deps"] + deps_command = [self.dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7e941cb49..6d092d623 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -6,7 +6,7 @@ import pytest -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -374,8 +374,9 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig() dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -391,10 +392,9 @@ def test_load_via_custom_without_project_path(): def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -410,9 +410,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( project=project_config, diff --git a/tests/test_converter.py b/tests/test_converter.py index 7bd84a046..2fd43cd7f 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -270,74 +270,6 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex ) -def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls(): - """ - Validate that a dbt project fails to be rendered to Airflow with DBT_LS if - the dbt command is invalid. - """ - project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample") - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with pytest.raises(CosmosConfigException) as err_info: - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert ( - err_info.value.args[0] - == "Unable to find the dbt executable, attempted: and ." - ) - - -def test_converter_fails_render_config_invalid_dbt_path_with_manifest(): - """ - Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when - the dbt command is invalid. - """ - project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") - - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - converter = DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert converter - - @pytest.mark.parametrize( "execution_mode,operator_args", [