diff --git a/cosmos/config.py b/cosmos/config.py index f2c07bb98..2cebbf3cc 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -359,6 +359,8 @@ class ExecutionConfig: :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection. :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path. :param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path + :param virtualenv_dir: Directory path to locate the (cached) virtual env that + should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV` """ execution_mode: ExecutionMode = ExecutionMode.LOCAL @@ -367,6 +369,8 @@ class ExecutionConfig: dbt_executable_path: str | Path = field(default_factory=get_system_dbt) dbt_project_path: InitVar[str | Path | None] = None + virtualenv_dir: str | Path | None = None + project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: diff --git a/cosmos/converter.py b/cosmos/converter.py index 40929ef55..553b7b49e 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -220,12 +220,26 @@ def __init__( validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) if project_config.dbt_project_path: - execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config) + # We copy the configuration so the change does not affect other DAGs or TaskGroups + # that may reuse the same original configuration + render_config = copy.deepcopy(render_config) + execution_config = copy.deepcopy(execution_config) + render_config.project_path = project_config.dbt_project_path + execution_config.project_path = project_config.dbt_project_path validate_adapted_user_config(execution_config, project_config, render_config) - env_vars = project_config.env_vars or operator_args.get("env") - dbt_vars = project_config.dbt_vars or operator_args.get("vars") + env_vars = copy.deepcopy(project_config.env_vars or operator_args.get("env")) + dbt_vars = copy.deepcopy(project_config.dbt_vars or operator_args.get("vars")) + + if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + logger.warning( + "`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." + ) + + if not operator_args: + operator_args = {} cache_dir = None cache_identifier = None @@ -275,6 +289,8 @@ def __init__( task_args, execution_mode=execution_config.execution_mode, ) + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + task_args["virtualenv_dir"] = execution_config.virtualenv_dir build_airflow_graph( nodes=self.dbt_graph.filtered_nodes, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index ad759d675..1c0237e8f 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -191,6 +191,15 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. + + Example of how to use: + + dbt_graph = DbtGraph( + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" + ) + dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -207,6 +216,7 @@ def __init__( cache_identifier: str = "", dbt_vars: dict[str, str] | None = None, airflow_metadata: dict[str, str] | None = None, + operator_args: dict[str, Any] | None = None, ): self.project = project self.render_config = render_config @@ -219,6 +229,7 @@ def __init__( else: self.dbt_ls_cache_key = "" self.dbt_vars = dbt_vars or {} + self.operator_args = operator_args or {} @cached_property def env_vars(self) -> dict[str, str]: @@ -568,7 +579,6 @@ def load_via_dbt_ls_without_cache(self) -> None: self.run_dbt_deps(dbt_cmd, tmpdir_path, env) nodes = self.run_dbt_ls(dbt_cmd, self.project_path, tmpdir_path, env) - self.nodes = nodes self.filtered_nodes = nodes diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index d12e2afad..60918a0a6 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -1,12 +1,17 @@ from __future__ import annotations -from functools import cached_property +import os +import shutil +import time from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable +import psutil from airflow.utils.python_virtualenv import prepare_virtualenv +from cosmos import settings +from cosmos.exceptions import CosmosValueError from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.log import get_logger from cosmos.operators.local import ( @@ -25,10 +30,19 @@ if TYPE_CHECKING: from airflow.utils.context import Context + +PY_INTERPRETER = "python3" +LOCK_FILENAME = "cosmos_virtualenv.lock" logger = get_logger(__name__) -PY_INTERPRETER = "python3" +def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]: + def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> Any: + if operator.virtualenv_dir is None: + raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") + return method(operator, *args) + + return wrapper class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): @@ -42,71 +56,147 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): :param py_system_site_packages: Whether or not all the Python packages from the Airflow instance will be accessible within the virtual environment (if py_requirements argument is specified). Avoid using unless the dbt job requires it. + :param virtualenv_dir: Directory path where Cosmos will create/update Python virtualenv. If defined, will persist the Python virtualenv in the Airflow worker node. + :param is_virtualenv_dir_temporary: Tells Cosmos if virtualenv should be persisted or not. """ + template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir", "is_virtualenv_dir_temporary") # type: ignore[operator] + def __init__( self, py_requirements: list[str] | None = None, pip_install_options: list[str] | None = None, py_system_site_packages: bool = False, + virtualenv_dir: Path | None = None, + is_virtualenv_dir_temporary: bool = False, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.pip_install_options = pip_install_options or [] self.py_system_site_packages = py_system_site_packages + self.virtualenv_dir = virtualenv_dir + self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary + self.max_retries_lock = settings.virtualenv_max_retries_lock super().__init__(**kwargs) - self._venv_tmp_dir: None | TemporaryDirectory[str] = None - - @cached_property - def venv_dbt_path( - self, - ) -> str: - """ - Path to the dbt binary within a Python virtualenv. - - The first time this property is called, it creates a virtualenv and installs the dependencies based on the - self.py_requirements, self.pip_install_options, and self.py_system_site_packages. This value is cached for future calls. - """ - # We are reusing the virtualenv directory for all subprocess calls within this task/operator. - # For this reason, we are not using contexts at this point. - # The deletion of this directory is done explicitly at the end of the `execute` method. - self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") - py_interpreter = prepare_virtualenv( - venv_directory=self._venv_tmp_dir.name, - python_bin=PY_INTERPRETER, - system_site_packages=self.py_system_site_packages, - requirements=self.py_requirements, - pip_install_options=self.pip_install_options, - ) - dbt_binary = Path(py_interpreter).parent / "dbt" - cmd_output = self.subprocess_hook.run_command( - [ - py_interpreter, - "-c", - "from importlib.metadata import version; print(version('dbt-core'))", - ] - ) - dbt_version = cmd_output.output - self.log.info("Using dbt version %s available at %s", dbt_version, dbt_binary) - return str(dbt_binary) + if not self.py_requirements: + self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter") def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: - if self.py_requirements: - command[0] = self.venv_dbt_path - - subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( + # No virtualenv_dir set, so create a temporary virtualenv + if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: + self.log.info("Creating temporary virtualenv") + with TemporaryDirectory(prefix="cosmos-venv") as tempdir: + self.virtualenv_dir = Path(tempdir) + py_bin = self._prepare_virtualenv() + dbt_bin = str(Path(py_bin).parent / "dbt") + command[0] = dbt_bin # type: ignore + subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( + command=command, + env=env, + cwd=cwd, + output_encoding=self.output_encoding, + ) + return subprocess_result + + # Use a reusable virtualenv + self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") + while not self._is_lock_available() and self.max_retries_lock: + logger.info("Waiting for virtualenv lock to be released") + time.sleep(1) + self.max_retries_lock -= 1 + + self.log.info(f"Acquiring the virtualenv lock") + self._acquire_venv_lock() + py_bin = self._prepare_virtualenv() + dbt_bin = str(Path(py_bin).parent / "dbt") + command[0] = dbt_bin # type: ignore + subprocess_result = self.subprocess_hook.run_command( command=command, env=env, cwd=cwd, output_encoding=self.output_encoding, ) + self.log.info("Releasing virtualenv lock") + self._release_venv_lock() return subprocess_result + def clean_dir_if_temporary(self) -> None: + """ + Delete the virtualenv directory if it is temporary. + """ + if self.is_virtualenv_dir_temporary and self.virtualenv_dir and self.virtualenv_dir.exists(): + self.log.info(f"Deleting the Python virtualenv {self.virtualenv_dir}") + shutil.rmtree(str(self.virtualenv_dir), ignore_errors=True) + def execute(self, context: Context) -> None: - output = super().execute(context) - if self._venv_tmp_dir: - self._venv_tmp_dir.cleanup() - self.log.info(output) + try: + output = super().execute(context) + self.log.info(output) + finally: + self.clean_dir_if_temporary() + + def on_kill(self) -> None: + self.clean_dir_if_temporary() + + def _prepare_virtualenv(self) -> str: + self.log.info(f"Creating or updating the virtualenv at `{self.virtualenv_dir}") + py_bin = prepare_virtualenv( + venv_directory=str(self.virtualenv_dir), + python_bin=PY_INTERPRETER, + system_site_packages=self.py_system_site_packages, + requirements=self.py_requirements, + pip_install_options=self.pip_install_options, + ) + return py_bin + + @property + def _lock_file(self) -> Path: + filepath = Path(f"{self.virtualenv_dir}/{LOCK_FILENAME}") + return filepath + + @property + def _pid(self) -> int: + return os.getpid() + + @depends_on_virtualenv_dir + def _is_lock_available(self) -> bool: + is_available = True + if self._lock_file.is_file(): + with open(self._lock_file) as lf: + pid = int(lf.read()) + self.log.info(f"Checking for running process with PID {pid}") + try: + _process_running = psutil.Process(pid).is_running() + self.log.info(f"Process {pid} running: {_process_running} and has the lock {self._lock_file}.") + except psutil.NoSuchProcess: + self.log.info(f"Process {pid} is not running. Lock {self._lock_file} was outdated.") + is_available = True + else: + is_available = not _process_running + return is_available + + @depends_on_virtualenv_dir + def _acquire_venv_lock(self) -> None: + if not self.virtualenv_dir.is_dir(): # type: ignore + os.mkdir(str(self.virtualenv_dir)) + + with open(self._lock_file, "w") as lf: + logger.info(f"Acquiring lock at {self._lock_file} with pid {str(self._pid)}") + lf.write(str(self._pid)) + + @depends_on_virtualenv_dir + def _release_venv_lock(self) -> None: + if not self._lock_file.is_file(): + logger.warning(f"Lockfile {self._lock_file} not found, perhaps deleted by other concurrent operator?") + return + + with open(self._lock_file) as lf: + lock_file_pid = int(lf.read()) + + if lock_file_pid == self._pid: + return self._lock_file.unlink() + + logger.warning(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}") class DbtBuildVirtualenvOperator(DbtVirtualenvBaseOperator, DbtBuildLocalOperator): # type: ignore[misc] diff --git a/cosmos/settings.py b/cosmos/settings.py index 37fe437c8..43abc8897 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -27,6 +27,7 @@ dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html") enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True) dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile") +virtualenv_max_retries_lock = conf.getint("cosmos", "virtualenv_max_retries_lock", fallback=120) # Experimentally adding `remote_cache_dir` as a separate entity in the Cosmos 1.6 release to gather feedback. # This will be merged with the `cache_dir` config parameter in upcoming releases. diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index cd38cba9e..24f4a8250 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -6,7 +6,10 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig +from airflow.decorators import dag +from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -22,27 +25,64 @@ ), ) + # [START virtualenv_example] -example_virtualenv = DbtDag( - # dbt/cosmos-specific parameters - project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", - ), - profile_config=profile_config, - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.VIRTUALENV, - ), - operator_args={ - "py_system_site_packages": False, - "py_requirements": ["dbt-postgres==1.6.0b1"], - "install_deps": True, - "emit_datasets": False, # Example of how to not set inlets and outlets - }, - # normal dag parameters +@dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, - dag_id="example_virtualenv", - default_args={"retries": 2}, ) +def example_virtualenv() -> None: + start_task = EmptyOperator(task_id="start-venv-examples") + end_task = EmptyOperator(task_id="end-venv-examples") + + # This first task group creates a new Cosmos virtualenv every time a task is run + # and deletes it afterwards + # It is much slower than if the user sets the `virtualenv_dir` + tmp_venv_task_group = DbtTaskGroup( + group_id="tmp-venv-group", + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.VIRTUALENV, + # Without setting virtualenv_dir="/some/path/persistent-venv", + # Cosmos creates a new Python virtualenv for each dbt task being executed + ), + operator_args={ + "py_system_site_packages": False, + "py_requirements": ["dbt-postgres"], + "install_deps": True, + "emit_datasets": False, # Example of how to not set inlets and outlets + }, + ) + + # The following task group reuses the Cosmos-managed Python virtualenv across multiple tasks. + # It runs approximately 70% faster than the previous TaskGroup. + cached_venv_task_group = DbtTaskGroup( + group_id="cached-venv-group", + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.VIRTUALENV, + # We can set the argument `virtualenv_dir` if we want Cosmos to create one Python virtualenv + # and reuse that to run all the dbt tasks within the same worker node + virtualenv_dir=Path("/tmp/persistent-venv2"), + ), + operator_args={ + "py_system_site_packages": False, + "py_requirements": ["dbt-postgres"], + "install_deps": True, + }, + ) + + start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task + + +example_virtualenv() # [END virtualenv_example] diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 6b05ec814..95a4adcad 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -94,6 +94,13 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``profile`` - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME`` +.. `virtualenv_max_retries_lock`_: + When using ``ExecutionMode.VIRTUALENV`` and persisted virtualenv directories (`virtualenv_dir` argument), + users can define how many seconds Cosmos waits for the lock to be released. + + - Default: 120 + - Environment Variable: ``AIRFLOW__COSMOS__VIRTUALENV_MAX_RETRIES_LOCK`` + .. _remote_cache_dir: `remote_cache_dir`_: @@ -119,6 +126,7 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``None`` - Environment Variable: ``AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID`` + [openlineage] ~~~~~~~~~~~~~ diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 879663009..4174c9a2d 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1522,7 +1522,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "b556a0a268e28868971d14c98548c349" + assert hash_dir == "c1e25b0679b5ddcb636bcc30f2f85a06" else: assert hash_dir == "6f63493009733a7be34364a6ea3ffd3c" diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index deb7151e5..0a7128626 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -1,11 +1,18 @@ +from __future__ import annotations + +import logging +import os from datetime import datetime +from pathlib import Path from unittest.mock import MagicMock, patch +import pytest from airflow.models import DAG from airflow.models.connection import Connection from cosmos.config import ProfileConfig from cosmos.constants import InvocationMode +from cosmos.exceptions import CosmosValueError from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -20,7 +27,10 @@ class ConcreteDbtVirtualenvBaseOperator(DbtVirtualenvBaseOperator): - base_cmd = ["cmd"] + + @property + def base_cmd(self) -> list[str]: + return ["cmd"] @patch("airflow.utils.python_virtualenv.execute_in_subprocess") @@ -29,7 +39,7 @@ class ConcreteDbtVirtualenvBaseOperator(DbtVirtualenvBaseOperator): @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.handle_exception_subprocess") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") @patch("airflow.hooks.base.BaseHook.get_connection") -def test_run_command( +def test_run_command_without_virtualenv_dir( mock_get_connection, mock_subprocess_hook, mock_exception_handling, @@ -58,21 +68,62 @@ def test_run_command( emit_datasets=False, invocation_mode=InvocationMode.SUBPROCESS, ) - assert venv_operator._venv_tmp_dir is None # Otherwise we are creating empty directories during DAG parsing time - # and not deleting them + assert venv_operator.virtualenv_dir == None venv_operator.run_command(cmd=["fake-dbt", "do-something"], env={}, context={"task_instance": MagicMock()}) run_command_args = mock_subprocess_hook.run_command.call_args_list - assert len(run_command_args) == 3 - python_cmd = run_command_args[0] - dbt_deps = run_command_args[1].kwargs - dbt_cmd = run_command_args[2].kwargs - assert python_cmd[0][0][0].endswith("/bin/python") - assert python_cmd[0][-1][-1] == "from importlib.metadata import version; print(version('dbt-core'))" - assert dbt_deps["command"][1] == "deps" - assert dbt_deps["command"][0].endswith("/bin/dbt") + assert len(run_command_args) == 2 + dbt_deps = run_command_args[0].kwargs + dbt_cmd = run_command_args[1].kwargs assert dbt_deps["command"][0] == dbt_cmd["command"][0] + assert dbt_deps["command"][1] == "deps" assert dbt_cmd["command"][1] == "do-something" - assert mock_execute.call_count == 2 + assert mock_execute.call_count == 4 + + +@patch("airflow.utils.python_virtualenv.execute_in_subprocess") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.handle_exception_subprocess") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") +@patch("airflow.hooks.base.BaseHook.get_connection") +def test_run_command_with_virtualenv_dir( + mock_get_connection, + mock_subprocess_hook, + mock_exception_handling, + mock_store_compiled_sql, + mock_calculate_openlineage_events_completes, + mock_execute, +): + mock_get_connection.return_value = Connection( + conn_id="fake_conn", + conn_type="postgres", + host="fake_host", + port=5432, + login="fake_login", + password="fake_password", + schema="fake_schema", + ) + venv_operator = ConcreteDbtVirtualenvBaseOperator( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), + profile_config=profile_config, + task_id="fake_task", + install_deps=True, + project_dir="./dev/dags/dbt/jaffle_shop", + py_system_site_packages=False, + pip_install_options=["--test-flag"], + py_requirements=["dbt-postgres==1.6.0b1"], + emit_datasets=False, + invocation_mode=InvocationMode.SUBPROCESS, + virtualenv_dir=Path("mock-venv"), + ) + venv_operator.run_command(cmd=["fake-dbt", "do-something"], env={}, context={"task_instance": MagicMock()}) + assert str(venv_operator.virtualenv_dir) == "mock-venv" + run_command_args = mock_subprocess_hook.run_command.call_args_list + assert len(run_command_args) == 2 + dbt_deps = run_command_args[0].kwargs + dbt_cmd = run_command_args[1].kwargs + assert dbt_deps["command"][0] == "mock-venv/bin/dbt" + assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" def test_virtualenv_operator_append_env_is_true_by_default(): @@ -90,3 +141,183 @@ def test_virtualenv_operator_append_env_is_true_by_default(): ) assert venv_operator.append_env is True + + +def test_depends_on_virtualenv_dir_raises_exeption(): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="buggy_task", + ) + venv_operator.virtualenv_dir = None + with pytest.raises(CosmosValueError) as excepion_info: + venv_operator._is_lock_available() + assert str(excepion_info.value) == "Method relies on value of parameter `virtualenv_dir` which is None." + + +def test_clean_dir_if_temporary(tmpdir): + tmp_filepath = Path(tmpdir / "tmpfile.txt") + tmp_filepath.touch() + assert tmp_filepath.exists() + + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=True, + virtualenv_dir=tmpdir, + ) + venv_operator.clean_dir_if_temporary() + assert not tmp_filepath.exists() + assert not tmpdir.exists() + + +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.clean_dir_if_temporary") +def test_on_kill(mock_clean_dir_if_temporary): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + ) + venv_operator.on_kill() + assert mock_clean_dir_if_temporary.called + + +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True]) +def test_run_subprocess_waits_for_lock( + mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog +): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=tmpdir, + ) + venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop") + assert caplog.text.count("Waiting for virtualenv lock to be released") == 2 + + +@patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError) +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.clean_dir_if_temporary") +def test__execute_cleans_dir(mock_clean_dir_if_temporary, mock_execute, caplog): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + ) + with pytest.raises(ValueError): + venv_operator.execute(None) + assert mock_clean_dir_if_temporary.called + + +def test__is_lock_available_returns_false(tmpdir): + parent_pid = os.getppid() + lockfile = tmpdir / "cosmos_virtualenv.lock" + lockfile.write_text(str(parent_pid), encoding="utf-8") + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=tmpdir, + ) + assert not venv_operator._is_lock_available() + + +def test__is_lock_available_returns_true_pid_no_longer_running(tmpdir): + non_existent_pid = "74717471" + lockfile = tmpdir / "cosmos_virtualenv.lock" + lockfile.write_text(str(non_existent_pid), encoding="utf-8") + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=tmpdir, + ) + assert venv_operator._is_lock_available() + + +def test__is_lock_available_returns_true_pid_no_lockfile(tmpdir): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=tmpdir, + ) + assert venv_operator._is_lock_available() + + +def test__acquire_venv_lock_existing_dir(tmpdir, caplog): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=Path(tmpdir), + ) + assert venv_operator._acquire_venv_lock() is None + assert "Acquiring lock at" in caplog.text + + +def test__acquire_venv_lock_new_subdir(tmpdir, caplog): + subdir = Path(tmpdir / "subdir") + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=subdir, + ) + assert venv_operator._acquire_venv_lock() is None + assert "Acquiring lock at" in caplog.text + + +def test__release_venv_lock_inexistent(tmpdir, caplog): + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=tmpdir, + ) + assert venv_operator._release_venv_lock() is None + assert "not found, perhaps deleted by other concurrent operator?" in caplog.text + + +def test__release_venv_lock_another_process(tmpdir, caplog): + caplog.set_level(logging.WARNING) + non_existent_pid = "747174" + lockfile = tmpdir / "cosmos_virtualenv.lock" + lockfile.write_text(str(non_existent_pid), encoding="utf-8") + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=Path(tmpdir), + ) + assert venv_operator._release_venv_lock() is None + assert lockfile.exists() + assert "Lockfile owned by process of pid 747174, while operator has pid" in caplog.text + + +def test__release_venv_lock_current_process(tmpdir): + parent_pid = os.getpid() + lockfile = tmpdir / "cosmos_virtualenv.lock" + lockfile.write_text(str(parent_pid), encoding="utf-8") + venv_operator = ConcreteDbtVirtualenvBaseOperator( + profile_config=profile_config, + project_dir="./dev/dags/dbt/jaffle_shop", + task_id="okay_task", + is_virtualenv_dir_temporary=False, + virtualenv_dir=Path(tmpdir), + ) + assert venv_operator._release_venv_lock() is None + assert not lockfile.exists() diff --git a/tests/test_converter.py b/tests/test_converter.py index bef2dc06d..573eba138 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -6,7 +6,7 @@ import pytest from airflow.models import DAG -from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode, InvocationMode, LoadMode from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config from cosmos.dbt.graph import DbtGraph, DbtNode @@ -168,7 +168,6 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op "execution_mode,operator_args", [ (ExecutionMode.KUBERNETES, {}), - # (ExecutionMode.DOCKER, {"image": "sample-image"}), ], ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @@ -199,108 +198,80 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut @pytest.mark.parametrize( - "execution_mode,operator_args", + "execution_mode,virtualenv_dir,operator_args", [ - (ExecutionMode.KUBERNETES, {}), + (ExecutionMode.KUBERNETES, Path("/some/virtualenv/dir"), {}), # (ExecutionMode.DOCKER, {"image": "sample-image"}), ], ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") -def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): +def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog): """ - This test validates that a project, given a manifest path and project name, with seeds - is able to successfully generate a converter + This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` + and still pass a defined `virtualenv_dir` """ - project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") - execution_config = ExecutionConfig(execution_mode=execution_mode) + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir) render_config = RenderConfig(emit_datasets=True) profile_config = ProfileConfig( profile_name="my_profile_name", target_name="my_target_name", profiles_yml_filepath=SAMPLE_PROFILE_YML, ) - with pytest.raises(CosmosValueError) as err_info: - DbtToAirflowConverter( - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - operator_args=operator_args, - ) - assert ( - err_info.value.args[0] - == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." - ) - -def test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls(): - """ - Validate that a dbt project fails to be rendered to Airflow with DBT_LS if - the dbt command is invalid. - """ - project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample") - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, + DbtToAirflowConverter( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, ) - with pytest.raises(CosmosConfigException) as err_info: - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) + assert ( - err_info.value.args[0] - == "Unable to find the dbt executable, attempted: and ." + "`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." + in caplog.text ) -def test_converter_fails_render_config_invalid_dbt_path_with_manifest(): +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): """ - Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when - the dbt command is invalid. + This test validates that a project, given a manifest path and project name, with seeds + is able to successfully generate a converter """ project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") - - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) profile_config = ProfileConfig( profile_name="my_profile_name", target_name="my_target_name", profiles_yml_filepath=SAMPLE_PROFILE_YML, ) - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - converter = DbtToAirflowConverter( - dag=dag, + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( nodes=nodes, project_config=project_config, profile_config=profile_config, execution_config=execution_config, render_config=render_config, + operator_args=operator_args, ) - assert converter + assert ( + err_info.value.args[0] + == "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) @pytest.mark.parametrize(