Skip to content

Commit

Permalink
Support ProjectConfig.dbt_project_path = None & different paths for…
Browse files Browse the repository at this point in the history
… Rendering and Execution (astronomer#634)

This PR finishes the work that was started in astronomer#605 to add full support
for ProjectConfig.dbt_project_path = None, and implements astronomer#568.

Within this PR, several things have been updated:
1 - Added project_path fields to RenderConfig and ExecutionConfig
2 - Simplified the consumption of RenderConfig in the DbtGraph class
3 - Added an option to configure different dbt executables for Rendering vs
Execution.

Closes: astronomer#568
  • Loading branch information
MrBones757 authored and LennartKloppenburg committed Dec 17, 2023
1 parent e5ccdce commit a0f9550
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ class ExecutionConfig:

dbt_project_path: InitVar[str | Path | None] = None
virtualenv_dir: str | Path | None = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
    """Derive ``project_path`` from the init-only ``dbt_project_path`` argument.

    A truthy value is wrapped in ``Path``; a falsy one (``None`` or an empty
    string) results in ``project_path`` being ``None``.
    """
    if not dbt_project_path:
        self.project_path = None
        return
    self.project_path = Path(dbt_project_path)

21 changes: 3 additions & 18 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,8 @@ def __init__(
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)
emit_datasets = render_config.emit_datasets
dbt_root_path = project_config.dbt_project_path.parent
dbt_project_name = project_config.dbt_project_path.name
dbt_models_dir = project_config.models_relative_path
dbt_seeds_dir = project_config.seeds_relative_path
dbt_snapshots_dir = project_config.snapshots_relative_path
test_behavior = render_config.test_behavior
select = render_config.select
exclude = render_config.exclude
dbt_deps = render_config.dbt_deps
execution_mode = execution_config.execution_mode
load_mode = render_config.load_method
manifest_path = project_config.parsed_manifest_path
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
"`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
Expand Down Expand Up @@ -296,7 +281,7 @@ def __init__(
task_args,
execution_mode=execution_config.execution_mode,
)
if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None):
if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

build_airflow_graph(
Expand Down
37 changes: 28 additions & 9 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.selector import select_nodes
Expand Down Expand Up @@ -69,6 +70,14 @@ def name(self) -> str:
return self.resource_name.replace(".", "_")


def create_symlinks(project_path: Path, tmp_dir: Path) -> None:
    """Symlink the top-level entries of a dbt project into ``tmp_dir``.

    Entries named like the dbt log directory, the dbt target directory,
    ``dbt_packages`` or ``profiles.yml`` are left out.
    """
    skipped_names = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml")
    linkable = [entry for entry in os.listdir(project_path) if entry not in skipped_names]
    for entry in linkable:
        os.symlink(project_path / entry, tmp_dir / entry)


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
logger.info("Running command: `%s`", " ".join(command))
Expand Down Expand Up @@ -124,6 +133,15 @@ class DbtGraph:
Supports different ways of loading the `dbt` project into this representation.
Different loading methods can result in different `nodes` and `filtered_nodes`.
Example of how to use:
dbt_graph = DbtGraph(
project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
render_config=RenderConfig(exclude=["*orders*"], select=[]),
dbt_cmd="/usr/local/bin/dbt"
)
dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL)
"""

nodes: dict[str, DbtNode] = dict()
Expand All @@ -137,12 +155,16 @@ def __init__(
profile_config: ProfileConfig | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
dbt_cmd: str = get_system_dbt(),
operator_args: dict[str, Any] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.dbt_vars = dbt_vars or {}
self.operator_args = operator_args or {}
self.dbt_cmd = dbt_cmd

def load(
self,
Expand Down Expand Up @@ -181,9 +203,7 @@ def load(
else:
load_method[method]()

def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [dbt_cmd, "ls", "--output", "json"]

Expand Down Expand Up @@ -223,10 +243,6 @@ def load_via_dbt_ls(self) -> None:
* self.nodes
* self.filtered_nodes
"""
self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path)
dbt_cmd = self.render_config.dbt_executable_path
dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd

logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...")
if not self.render_config.project_path or not self.execution_config.project_path:
raise CosmosLoadDbtException(
Expand All @@ -236,6 +252,9 @@ def load_via_dbt_ls(self) -> None:
if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
Expand Down Expand Up @@ -266,12 +285,12 @@ def load_via_dbt_ls(self) -> None:
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps:
deps_command = [dbt_cmd, "deps"]
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env)
nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
14 changes: 6 additions & 8 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import pytest

from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.dbt.graph import (
CosmosLoadDbtException,
Expand Down Expand Up @@ -374,8 +374,9 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config):
def test_load_via_custom_without_project_path():
project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test")
execution_config = ExecutionConfig()
render_config = RenderConfig(dbt_executable_path="/inexistent/dbt")
render_config = RenderConfig()
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -391,10 +392,9 @@ def test_load_via_custom_without_project_path():
def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME
)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
dbt_graph = DbtGraph(
dbt_cmd="/inexistent/dbt",
project=project_config,
execution_config=execution_config,
render_config=render_config,
Expand All @@ -410,9 +410,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command):
def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which):
project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt"
)
render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
with patch("pathlib.Path.exists", return_value=True):
dbt_graph = DbtGraph(
project=project_config,
Expand Down
68 changes: 0 additions & 68 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,74 +270,6 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex
)


def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls():
"""
Validate that a dbt project fails to be rendered to Airflow with DBT_LS if
the dbt command is invalid.
"""
# No manifest is passed here, only a dbt project path.
project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample")
# Execution and render configs each name a different dbt executable, so the
# expected error message below can be checked for both names.
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
# Building the converter is expected to raise CosmosConfigException.
# NOTE(review): `nodes` is not defined inside this function — presumably a
# module-level fixture in the test file; verify.
with pytest.raises(CosmosConfigException) as err_info:
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
# The expected message lists the render executable first, then the execution one.
# NOTE(review): indentation is not visible in this view; this assert only runs
# if it sits outside the `pytest.raises` block — confirm against the real file.
assert (
err_info.value.args[0]
== "Unable to find the dbt executable, attempted: <invalid-render-dbt> and <invalid-execution-dbt>."
)


# NOTE(review): the function name says "fails", but the docstring and the final
# assert describe a conversion that succeeds — consider renaming.
def test_converter_fails_render_config_invalid_dbt_path_with_manifest():
"""
Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when
the dbt command is invalid.
"""
# The project is described by a manifest file instead of a dbt project path.
project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")

# The project path is supplied on the execution config only; both configs
# name differently-labelled dbt executables.
execution_config = ExecutionConfig(
execution_mode=ExecutionMode.LOCAL,
dbt_executable_path="invalid-execution-dbt",
dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(),
)
render_config = RenderConfig(
emit_datasets=True,
dbt_executable_path="invalid-render-dbt",
)
profile_config = ProfileConfig(
profile_name="my_profile_name",
target_name="my_target_name",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
)
# No exception is expected here: the converter is built and then checked for truthiness.
# NOTE(review): `nodes` is not defined inside this function — presumably a
# module-level fixture in the test file; verify.
with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
converter = DbtToAirflowConverter(
dag=dag,
nodes=nodes,
project_config=project_config,
profile_config=profile_config,
execution_config=execution_config,
render_config=render_config,
)
assert converter


@pytest.mark.parametrize(
"execution_mode,operator_args",
[
Expand Down

0 comments on commit a0f9550

Please sign in to comment.