diff --git a/cosmos/config.py b/cosmos/config.py
index 46e3f1915..e4985ed1b 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -288,8 +288,11 @@ class ExecutionConfig:
     :param execution_mode: The execution mode for dbt. Defaults to local
     :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
-    :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
     :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
+    :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
+        available on the path.
+    :param virtualenv_dir: Directory path of the (cached) virtualenv that should be
+        used for execution when the execution mode is set to `ExecutionMode.VIRTUALENV`.
     """

     execution_mode: ExecutionMode = ExecutionMode.LOCAL
@@ -297,6 +300,8 @@ class ExecutionConfig:
     dbt_executable_path: str | Path = field(default_factory=get_system_dbt)
     dbt_project_path: InitVar[str | Path | None] = None

+    virtualenv_dir: str | Path | None = None
+
     project_path: Path | None = field(init=False)

     def __post_init__(self, dbt_project_path: str | Path | None) -> None:
diff --git a/cosmos/converter.py b/cosmos/converter.py
index c2b31700b..7e284aacb 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -217,12 +217,36 @@ def __init__(
         # If we are using the old interface, we should migrate it to the new interface
         # This is safe to do now since we have validated which config interface we're using
         if project_config.dbt_project_path:
-            execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)
+            # We copy the configuration so the change does not affect other DAGs or TaskGroups
+            # that may reuse the same original configuration
+            render_config = copy.deepcopy(render_config)
+            execution_config = copy.deepcopy(execution_config)
+            render_config.project_path = project_config.dbt_project_path
+            execution_config.project_path = project_config.dbt_project_path

         validate_adapted_user_config(execution_config, project_config, render_config)

         env_vars = project_config.env_vars or operator_args.pop("env", None)
         dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None)

+        # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
+        # We require render_config.project_path when we don't have a manifest
+        if not project_config.manifest_path and not render_config.project_path:
+            raise CosmosValueError(
+                "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
+            )
+
+        if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
+            logger.warning(
+                "`ExecutionConfig.virtualenv_dir` is only supported when "
+                "ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
+ ) + + profile_args = {} + if profile_config.profile_mapping: + profile_args = profile_config.profile_mapping.profile_args + + if not operator_args: + operator_args = {} # Previously, we were creating a cosmos.dbt.project.DbtProject # DbtProject has now been replaced with ProjectConfig directly @@ -261,6 +286,8 @@ def __init__( task_args, execution_mode=execution_config.execution_mode, ) + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + task_args["virtualenv_dir"] = execution_config.virtualenv_dir build_airflow_graph( nodes=dbt_graph.filtered_nodes, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b41c15a49..7fc19b024 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -3,6 +3,7 @@ import itertools import json import os +import shutil import tempfile import yaml from dataclasses import dataclass, field @@ -21,6 +22,7 @@ ExecutionMode, LoadMode, ) +from cosmos.dbt.executable import get_system_dbt from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks, environ from cosmos.dbt.selector import select_nodes @@ -124,6 +126,15 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. + + Example of how to use: + + dbt_graph = DbtGraph( + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" + ) + dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -137,12 +148,16 @@ def __init__( profile_config: ProfileConfig | None = None, # dbt_vars only supported for LegacyDbtProject dbt_vars: dict[str, str] | None = None, + dbt_cmd: str = get_system_dbt(), + operator_args: dict[str, Any] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config self.dbt_vars = dbt_vars or {} + self.operator_args = operator_args or {} + self.dbt_cmd = dbt_cmd def load( self, @@ -181,11 +196,9 @@ def load( else: load_method[method]() - def run_dbt_ls( - self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] - ) -> dict[str, DbtNode]: + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [dbt_cmd, "ls", "--output", "json"] + ls_command = [self.dbt_cmd, "ls", "--output", "json"] if self.render_config.exclude: ls_command.extend(["--exclude", *self.render_config.exclude]) @@ -223,10 +236,6 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) - dbt_cmd = self.render_config.dbt_executable_path - dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd - logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") if not self.render_config.project_path or not self.execution_config.project_path: raise CosmosLoadDbtException( @@ -236,6 +245,9 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + if not shutil.which(self.dbt_cmd): + raise CosmosLoadDbtException(f"Unable to find the dbt executable: 
{self.dbt_cmd}") + with tempfile.TemporaryDirectory() as tmpdir: logger.info( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" @@ -266,12 +278,12 @@ def load_via_dbt_ls(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps: - deps_command = [dbt_cmd, "deps"] + deps_command = [self.dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 4d6338e09..e57a7808c 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -1,12 +1,16 @@ from __future__ import annotations +import os +import psutil +import time from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable from airflow.compat.functools import cached_property from airflow.utils.python_virtualenv import prepare_virtualenv from cosmos.hooks.subprocess import FullOutputSubprocessResult +from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.operators.local import ( @@ -29,6 +33,16 @@ PY_INTERPRETER = "python3" +def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]: + def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None: + if operator.virtualenv_dir is None: + raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") + + method(operator, *args) + + return wrapper + + class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): """ Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command @@ -41,15 +55,19 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): Avoid using unless the dbt job requires it. """ + template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator] + def __init__( self, py_requirements: list[str] | None = None, py_system_site_packages: bool = False, + virtualenv_dir: Path | None = None, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.py_system_site_packages = py_system_site_packages super().__init__(**kwargs) + self.virtualenv_dir = virtualenv_dir self._venv_tmp_dir: None | TemporaryDirectory[str] = None @cached_property @@ -59,19 +77,14 @@ def venv_dbt_path( """ Path to the dbt binary within a Python virtualenv. - The first time this property is called, it creates a virtualenv and installs the dependencies based on the - self.py_requirements and self.py_system_site_packages. This value is cached for future calls. + The first time this property is called, it creates a new/temporary and installs the dependencies + based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv. + This value is cached for future calls. """ # We are reusing the virtualenv directory for all subprocess calls within this task/operator. # For this reason, we are not using contexts at this point. # The deletion of this directory is done explicitly at the end of the `execute` method. 
-        self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")
-        py_interpreter = prepare_virtualenv(
-            venv_directory=self._venv_tmp_dir.name,
-            python_bin=PY_INTERPRETER,
-            system_site_packages=self.py_system_site_packages,
-            requirements=self.py_requirements,
-        )
+        py_interpreter = self._get_or_create_venv_py_interpreter()
         dbt_binary = Path(py_interpreter).parent / "dbt"
         cmd_output = self.subprocess_hook.run_command(
             [
@@ -97,6 +110,92 @@ def execute(self, context: Context) -> None:
             self._venv_tmp_dir.cleanup()
         logger.info(output)

+    def _get_or_create_venv_py_interpreter(self) -> str:
+        """Helper method that parses the virtualenv configuration and returns the path
+        to a Python interpreter within the resulting (new or cached) virtualenv."""
+
+        # No virtualenv_dir set, so revert to making a temporary virtualenv
+        if self.virtualenv_dir is None:
+            self.log.info("Creating temporary virtualenv")
+            self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")
+
+            return prepare_virtualenv(
+                venv_directory=self._venv_tmp_dir.name,
+                python_bin=PY_INTERPRETER,
+                system_site_packages=self.py_system_site_packages,
+                requirements=self.py_requirements,
+            )
+
+        self.log.info(f"Checking if {self.__lock_file} exists")
+        while not self._is_lock_available():
+            self.log.info("Waiting for lock to release")
+            time.sleep(1)
+
+        self.log.info(f"Creating virtualenv at `{self.virtualenv_dir}`")
+        self.log.info("Acquiring lock")
+        self.__acquire_venv_lock()
+
+        py_bin = prepare_virtualenv(
+            venv_directory=str(self.virtualenv_dir),
+            python_bin=PY_INTERPRETER,
+            system_site_packages=self.py_system_site_packages,
+            requirements=self.py_requirements,
+        )
+
+        self.log.info("Releasing lock")
+        self.__release_venv_lock()
+
+        return py_bin
+
+    @property
+    def __lock_file(self) -> Path:
+        return Path(f"{self.virtualenv_dir}/LOCK")
+
+    @property
+    def _pid(self) -> int:
+        return os.getpid()
+
+    @depends_on_virtualenv_dir
+    def _is_lock_available(self) -> bool:
+        if self.__lock_file.is_file():
+            with open(self.__lock_file) as lf:
+                pid = int(lf.read())
+
+                self.log.info(f"Checking for running process with PID {pid}")
+                try:
+                    _process_running = psutil.Process(pid).is_running()
+                except psutil.NoSuchProcess:
+                    _process_running = False
+
+                self.log.info(f"Process {pid} running: {_process_running}")
+                return not _process_running
+
+        return True
+
+    @depends_on_virtualenv_dir
+    def __acquire_venv_lock(self) -> None:
+        if not self.virtualenv_dir.is_dir():  # type: ignore
+            os.mkdir(str(self.virtualenv_dir))
+
+        with open(self.__lock_file, "w") as lf:
+            self.log.info(f"Acquiring lock at {self.__lock_file} with pid {self._pid}")
+            lf.write(str(self._pid))
+
+    @depends_on_virtualenv_dir
+    def __release_venv_lock(self) -> None:
+        if not self.__lock_file.is_file():
+            self.log.warning(f"Lockfile {self.__lock_file} not found, perhaps deleted by another concurrent operator?")
+
+            return
+
+        with open(self.__lock_file) as lf:
+            lock_file_pid = int(lf.read())
+
+            if lock_file_pid == self._pid:
+                self.__lock_file.unlink()
+                return
+
+            self.log.warning(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}")
+

 class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator):
     """
diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py
index 7b1368f8c..45af6e0a8 100644
--- a/dev/dags/example_virtualenv.py
+++ b/dev/dags/example_virtualenv.py
@@ -5,7 +5,11 @@
 from datetime import datetime
 from pathlib import Path

-from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
+from airflow.decorators import dag
+from airflow.configuration import get_airflow_home
+from airflow.operators.empty import EmptyOperator
+
+from cosmos import DbtTaskGroup, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
 from cosmos.profiles import PostgresUserPasswordProfileMapping

 DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
@@ -21,26 +25,58 @@
     ),
 )

-# [START virtualenv_example]
-example_virtualenv = DbtDag(
-    # dbt/cosmos-specific parameters
-    project_config=ProjectConfig(
-        DBT_ROOT_PATH / "jaffle_shop",
-    ),
-    profile_config=profile_config,
-    execution_config=ExecutionConfig(
-        execution_mode=ExecutionMode.VIRTUALENV,
-    ),
-    operator_args={
-        "py_system_site_packages": False,
-        "py_requirements": ["dbt-postgres==1.6.0b1"],
-        "install_deps": True,
-    },
-    # normal dag parameters
+
+# [START virtualenv_example]
+@dag(
     schedule_interval="@daily",
     start_date=datetime(2023, 1, 1),
     catchup=False,
-    dag_id="example_virtualenv",
-    default_args={"retries": 2},
 )
+def example_virtualenv() -> None:
+    start_task = EmptyOperator(task_id="start-venv-examples")
+    end_task = EmptyOperator(task_id="end-venv-examples")
+
+    tmp_venv_task_group = DbtTaskGroup(
+        group_id="tmp-venv-group",
+        # dbt/cosmos-specific parameters
+        project_config=ProjectConfig(
+            DBT_ROOT_PATH / "jaffle_shop",
+        ),
+        profile_config=profile_config,
+        execution_config=ExecutionConfig(
+            execution_mode=ExecutionMode.VIRTUALENV,
+            # We could also set `virtualenv_dir` here if we wanted Airflow to create
+            # a single virtualenv and reuse it across all dbt tasks in this group:
+            # virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
+        ),
+        operator_args={
+            "py_system_site_packages": False,
+            "py_requirements": ["dbt-postgres==1.6.0b1"],
+            "install_deps": True,
+        },
+    )
+
+    cached_venv_task_group = DbtTaskGroup(
+        group_id="cached-venv-group",
+        # dbt/cosmos-specific parameters
+        project_config=ProjectConfig(
+            DBT_ROOT_PATH / "jaffle_shop",
+        ),
+        profile_config=profile_config,
+        execution_config=ExecutionConfig(
+            execution_mode=ExecutionMode.VIRTUALENV,
+            # Here we set `virtualenv_dir`, so Airflow creates a single virtualenv
+            # and reuses it across all dbt tasks in this group.
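+            # A LOCK file inside this directory stops concurrent operators from
+            # re-creating the virtualenv at the same time (see DbtVirtualenvBaseOperator).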
+ virtualenv_dir=Path(f"{get_airflow_home()}/persistent-venv"), + ), + operator_args={ + "py_system_site_packages": False, + "py_requirements": ["dbt-postgres==1.6.0b1"], + "install_deps": True, + }, + ) + + start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task + + +example_virtualenv() # [END virtualenv_example] diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7e941cb49..995a8be16 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -374,8 +374,9 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig() dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -391,10 +392,9 @@ def test_load_via_custom_without_project_path(): def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -410,11 +410,10 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -424,10 +423,10 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): profiles_yml_filepath=Path(__file__).parent.parent / "sample/profiles.yml", ), ) - with pytest.raises(CosmosConfigException) as err_info: + with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to find the dbt executable, attempted: and ." 
+ expected = "Unable to find the dbt executable: /inexistent/dbt" assert err_info.value.args[0] == expected diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 13dba8f94..3997fa75e 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -1,5 +1,5 @@ from unittest.mock import patch, MagicMock - +from pathlib import Path from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator from airflow.models.connection import Connection @@ -65,3 +65,42 @@ def test_run_command( assert dbt_deps[0][0][0] == dbt_cmd[0][0][0] assert dbt_cmd[0][0][1] == "do-something" assert mock_execute.call_count == 2 + + +@patch("airflow.utils.python_virtualenv.execute_in_subprocess") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.exception_handling") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available") +@patch("airflow.hooks.base.BaseHook.get_connection") +def test_supply_virtualenv_dir_flag( + mock_get_connection, + mock_lock_available, + mock_subprocess_hook, + mock_exception_handling, + mock_store_compiled_sql, + mock_calculate_openlineage_events_completes, + mock_execute, +): + mock_lock_available.return_value = True + mock_get_connection.return_value = Connection( + conn_id="fake_conn", + conn_type="postgres", + host="fake_host", + port=5432, + login="fake_login", + password="fake_password", + schema="fake_schema", + ) + venv_operator = DbtVirtualenvBaseOperator( + profile_config=profile_config, + task_id="fake_task", + install_deps=True, + project_dir="./dev/dags/dbt/jaffle_shop", + py_system_site_packages=False, + py_requirements=["dbt-postgres==1.6.0b1"], + emit_datasets=False, + virtualenv_dir=Path("mock-venv"), + ) + assert venv_operator.venv_dbt_path == "mock-venv/bin/dbt" diff --git a/tests/test_converter.py b/tests/test_converter.py index d84249aae..1d9632ef7 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -2,6 +2,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping +import logging import pytest from airflow.models import DAG @@ -16,6 +17,7 @@ SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" +LOGGER = logging.getLogger(__name__) @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -167,7 +169,6 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op "execution_mode,operator_args", [ (ExecutionMode.KUBERNETES, {}), - # (ExecutionMode.DOCKER, {"image": "sample-image"}), ], ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @@ -197,108 +198,79 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut @pytest.mark.parametrize( - "execution_mode,operator_args", + "execution_mode,virtualenv_dir,operator_args", [ - (ExecutionMode.KUBERNETES, {}), + (ExecutionMode.KUBERNETES, Path("/some/virtualenv/dir"), {}), # (ExecutionMode.DOCKER, {"image": "sample-image"}), ], ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") -def 
test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args):
+def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog):
     """
-    This test validates that a project, given a manifest path and project name, with seeds
-    is able to successfully generate a converter
+    Validate that a warning is logged when a `virtualenv_dir` is set while the
+    execution mode is not ExecutionMode.VIRTUALENV.
     """
-    project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")
-    execution_config = ExecutionConfig(execution_mode=execution_mode)
+    project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix())
+    execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir)
     render_config = RenderConfig(emit_datasets=True)
     profile_config = ProfileConfig(
         profile_name="my_profile_name",
         target_name="my_target_name",
         profiles_yml_filepath=SAMPLE_PROFILE_YML,
     )
-    with pytest.raises(CosmosValueError) as err_info:
-        DbtToAirflowConverter(
-            nodes=nodes,
-            project_config=project_config,
-            profile_config=profile_config,
-            execution_config=execution_config,
-            render_config=render_config,
-            operator_args=operator_args,
-        )
-    assert (
-        err_info.value.args[0]
-        == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
-    )
-
-
-def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls():
-    """
-    Validate that a dbt project fails to be rendered to Airflow with DBT_LS if
-    the dbt command is invalid.
-    """
-    project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample")
-    execution_config = ExecutionConfig(
-        execution_mode=ExecutionMode.LOCAL,
-        dbt_executable_path="invalid-execution-dbt",
-    )
-    render_config = RenderConfig(
-        emit_datasets=True,
-        dbt_executable_path="invalid-render-dbt",
-    )
-    profile_config = ProfileConfig(
-        profile_name="my_profile_name",
-        target_name="my_target_name",
-        profiles_yml_filepath=SAMPLE_PROFILE_YML,
+    converter = DbtToAirflowConverter(
+        nodes=nodes,
+        project_config=project_config,
+        profile_config=profile_config,
+        execution_config=execution_config,
+        render_config=render_config,
+        operator_args=operator_args,
     )
-    with pytest.raises(CosmosConfigException) as err_info:
-        with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
-            DbtToAirflowConverter(
-                dag=dag,
-                nodes=nodes,
-                project_config=project_config,
-                profile_config=profile_config,
-                execution_config=execution_config,
-                render_config=render_config,
-            )
+
     assert (
-        err_info.value.args[0]
-        == "Unable to find the dbt executable, attempted: and ."
+        "`ExecutionConfig.virtualenv_dir` is only supported when "
+        "ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
+        in caplog.text
     )


-def test_converter_fails_render_config_invalid_dbt_path_with_manifest():
+@pytest.mark.parametrize(
+    "execution_mode,operator_args",
+    [
+        (ExecutionMode.KUBERNETES, {}),
+        # (ExecutionMode.DOCKER, {"image": "sample-image"}),
+    ],
+)
+@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
+@patch("cosmos.converter.DbtGraph.load")
+def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args):
     """
-    Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when
-    the dbt command is invalid.
+    Validate that, given only a manifest path and project name (and no project
+    directory), the conversion fails because ExecutionConfig.dbt_project_path is not set.
     """
     project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")
-
-    execution_config = ExecutionConfig(
-        execution_mode=ExecutionMode.LOCAL,
-        dbt_executable_path="invalid-execution-dbt",
-        dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(),
-    )
-    render_config = RenderConfig(
-        emit_datasets=True,
-        dbt_executable_path="invalid-render-dbt",
-    )
+    execution_config = ExecutionConfig(execution_mode=execution_mode)
+    render_config = RenderConfig(emit_datasets=True)
     profile_config = ProfileConfig(
         profile_name="my_profile_name",
         target_name="my_target_name",
         profiles_yml_filepath=SAMPLE_PROFILE_YML,
     )
-    with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag:
-        converter = DbtToAirflowConverter(
-            dag=dag,
+    with pytest.raises(CosmosValueError) as err_info:
+        DbtToAirflowConverter(
             nodes=nodes,
             project_config=project_config,
             profile_config=profile_config,
             execution_config=execution_config,
             render_config=render_config,
+            operator_args=operator_args,
         )
-    assert converter
+    assert (
+        err_info.value.args[0]
+        == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
+    )


 @pytest.mark.parametrize(
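Note: the cached-virtualenv coordination in this patch reduces to a PID-based LOCK file. Below is a minimal standalone sketch of that protocol (helper names and the venv location are hypothetical, for illustration only; the operator's real methods also prepare the virtualenv and log each step):

# Sketch of the LOCK-file protocol used by DbtVirtualenvBaseOperator (assumed names).
import os
import time
from pathlib import Path

import psutil


def lock_is_available(lock_file: Path) -> bool:
    """A lock is free if no LOCK file exists or the recorded PID is no longer running."""
    if not lock_file.is_file():
        return True
    pid = int(lock_file.read_text())
    try:
        return not psutil.Process(pid).is_running()
    except psutil.NoSuchProcess:
        return True


def acquire_lock(venv_dir: Path) -> None:
    """Record our PID in the LOCK file inside the (possibly new) venv directory."""
    venv_dir.mkdir(parents=True, exist_ok=True)
    (venv_dir / "LOCK").write_text(str(os.getpid()))


def release_lock(venv_dir: Path) -> None:
    """Remove the LOCK file, but only if this process owns it."""
    lock_file = venv_dir / "LOCK"
    if lock_file.is_file() and int(lock_file.read_text()) == os.getpid():
        lock_file.unlink()


if __name__ == "__main__":
    venv_dir = Path("/tmp/persistent-venv")  # hypothetical location
    while not lock_is_available(venv_dir / "LOCK"):
        time.sleep(1)
    acquire_lock(venv_dir)
    try:
        pass  # prepare_virtualenv(...) would run here in the operator
    finally:
        release_lock(venv_dir)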