From 1828a0c0c8abf663e0b44ddfa2c333c69bfbb35f Mon Sep 17 00:00:00 2001
From: Lennart Kloppenburg
Date: Wed, 18 Oct 2023 15:05:37 +0200
Subject: [PATCH 01/18] Add support for virtual env directory flag

---
 cosmos/config.py               |  8 +++++--
 cosmos/converter.py            | 35 +++++++++++++++++++++++++++++
 cosmos/operators/virtualenv.py | 40 ++++++++++++++++++++++++++--
 dev/dags/example_virtualenv.py |  4 ++++
 4 files changed, 76 insertions(+), 11 deletions(-)

diff --git a/cosmos/config.py b/cosmos/config.py
index 46e3f1915..57b55f95e 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -288,16 +288,20 @@ class ExecutionConfig:
     :param execution_mode: The execution mode for dbt. Defaults to local
     :param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
-    :param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
     :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path
         in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
+    :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
+        available on the path.
+    :param virtualenv_dir: Directory path to locate the (cached) virtual env that
+        should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV`
     """
-
     execution_mode: ExecutionMode = ExecutionMode.LOCAL
     test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
     dbt_executable_path: str | Path = field(default_factory=get_system_dbt)
     dbt_project_path: InitVar[str | Path | None] = None
+    virtualenv_dir: str | Path | None = None
 
     project_path: Path | None = field(init=False)
 
     def __post_init__(self, dbt_project_path: str | Path | None) -> None:
         self.project_path = Path(dbt_project_path) if dbt_project_path else None
+
diff --git a/cosmos/converter.py b/cosmos/converter.py
index c2b31700b..553b879ad 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -15,6 +15,7 @@
 from cosmos.constants import ExecutionMode
 from cosmos.dbt.graph import DbtGraph
 from cosmos.dbt.selector import retrieve_by_label
+from cosmos.constants import ExecutionMode
 from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig
 from cosmos.exceptions import CosmosValueError
 from cosmos.log import get_logger
@@ -223,6 +224,38 @@ def __init__(
         env_vars = project_config.env_vars or operator_args.pop("env", None)
         dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None)
 
+        # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
+        # We require render_config.project_path when we don't have a manifest
+        if not project_config.manifest_path and not render_config.project_path:
+            raise CosmosValueError(
+                "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
+ ) + emit_datasets = render_config.emit_datasets + dbt_root_path = project_config.dbt_project_path.parent + dbt_project_name = project_config.dbt_project_path.name + dbt_models_dir = project_config.models_relative_path + dbt_seeds_dir = project_config.seeds_relative_path + dbt_snapshots_dir = project_config.snapshots_relative_path + test_behavior = render_config.test_behavior + select = render_config.select + exclude = render_config.exclude + dbt_deps = render_config.dbt_deps + execution_mode = execution_config.execution_mode + load_mode = render_config.load_method + manifest_path = project_config.parsed_manifest_path + dbt_executable_path = execution_config.dbt_executable_path + node_converters = render_config.node_converters + + if (execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + logger.warning("`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV.") + + profile_args = {} + if profile_config.profile_mapping: + profile_args = profile_config.profile_mapping.profile_args + + if not operator_args: + operator_args = {} # Previously, we were creating a cosmos.dbt.project.DbtProject # DbtProject has now been replaced with ProjectConfig directly @@ -261,6 +294,8 @@ def __init__( task_args, execution_mode=execution_config.execution_mode, ) + if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + task_args["virtualenv_dir"] = execution_config.virtualenv_dir build_airflow_graph( nodes=dbt_graph.filtered_nodes, diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 4d6338e09..5fff03b9f 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -45,11 +45,13 @@ def __init__( self, py_requirements: list[str] | None = None, py_system_site_packages: bool = False, + virtualenv_dir: Path | str | None = None, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.py_system_site_packages = py_system_site_packages super().__init__(**kwargs) + self._venv_dir = virtualenv_dir self._venv_tmp_dir: None | TemporaryDirectory[str] = None @cached_property @@ -59,19 +61,14 @@ def venv_dbt_path( """ Path to the dbt binary within a Python virtualenv. - The first time this property is called, it creates a virtualenv and installs the dependencies based on the - self.py_requirements and self.py_system_site_packages. This value is cached for future calls. + The first time this property is called, it creates a new/temporary and installs the dependencies + based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv. + This value is cached for future calls. """ # We are reusing the virtualenv directory for all subprocess calls within this task/operator. # For this reason, we are not using contexts at this point. # The deletion of this directory is done explicitly at the end of the `execute` method. 
- self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") - py_interpreter = prepare_virtualenv( - venv_directory=self._venv_tmp_dir.name, - python_bin=PY_INTERPRETER, - system_site_packages=self.py_system_site_packages, - requirements=self.py_requirements, - ) + py_interpreter = self._get_or_create_venv_py_interpreter() dbt_binary = Path(py_interpreter).parent / "dbt" cmd_output = self.subprocess_hook.run_command( [ @@ -97,6 +94,31 @@ def execute(self, context: Context) -> None: self._venv_tmp_dir.cleanup() logger.info(output) + def _get_or_create_venv_py_interpreter(self) -> str: + if self._venv_dir is not None: + py_interpreter_path = Path(f"{self._venv_dir}/bin/python") + + self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") + if py_interpreter_path.is_file(): + + self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") + return str(py_interpreter_path) + + self.log.info(f"Creating virtualenv at `{self._venv_dir}") + venv_directory = str(self._venv_dir) + + else: + self.log.info(f"Creating temporary virtualenv") + self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") + venv_directory = self._venv_tmp_dir.name + + return prepare_virtualenv( + venv_directory=venv_directory, + python_bin=PY_INTERPRETER, + system_site_packages=self.py_system_site_packages, + requirements=self.py_requirements, + ) + class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator): """ diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 7b1368f8c..7ec4b0a16 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -4,6 +4,7 @@ import os from datetime import datetime from pathlib import Path +from airflow.configuration import get_airflow_home from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -30,6 +31,9 @@ profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.VIRTUALENV, + # We can enable this flag if we want Airflow to create one virtualenv + # and reuse that within the whole DAG. 
+ # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", ), operator_args={ "py_system_site_packages": False, From ab5e54105818a4b920f58d0fb88fbbc68d300f62 Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Wed, 18 Oct 2023 15:58:00 +0200 Subject: [PATCH 02/18] Add basic test --- tests/operators/test_virtualenv.py | 35 ++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 13dba8f94..0e86cf7c4 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -65,3 +65,38 @@ def test_run_command( assert dbt_deps[0][0][0] == dbt_cmd[0][0][0] assert dbt_cmd[0][0][1] == "do-something" assert mock_execute.call_count == 2 + +@patch("airflow.utils.python_virtualenv.execute_in_subprocess") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.exception_handling") +@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") +@patch("airflow.hooks.base.BaseHook.get_connection") +def test_supply_virtualenv_dir_flag( + mock_get_connection, + mock_subprocess_hook, + mock_exception_handling, + mock_store_compiled_sql, + mock_calculate_openlineage_events_completes, + mock_execute, +): + mock_get_connection.return_value = Connection( + conn_id="fake_conn", + conn_type="postgres", + host="fake_host", + port=5432, + login="fake_login", + password="fake_password", + schema="fake_schema", + ) + venv_operator = DbtVirtualenvBaseOperator( + profile_config=profile_config, + task_id="fake_task", + install_deps=True, + project_dir="./dev/dags/dbt/jaffle_shop", + py_system_site_packages=False, + py_requirements=["dbt-postgres==1.6.0b1"], + emit_datasets=False, + virtualenv_dir="mock-venv", + ) + assert venv_operator.venv_dbt_path == "mock-venv/bin/dbt" \ No newline at end of file From 7e8f82d78dfa4b5f3e09c55e54e25d263f943f04 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Oct 2023 14:10:06 +0000 Subject: [PATCH 03/18] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/config.py | 3 ++- cosmos/converter.py | 8 +++++--- cosmos/operators/virtualenv.py | 9 ++++----- dev/dags/example_virtualenv.py | 5 ++--- tests/operators/test_virtualenv.py | 5 +++-- 5 files changed, 16 insertions(+), 14 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 57b55f95e..72b1f2eb9 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -291,9 +291,10 @@ class ExecutionConfig: :param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if available on the path. 
- :param virtualenv_dir: Directory path to locate the (cached) virtual env that + :param virtualenv_dir: Directory path to locate the (cached) virtual env that should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV` """ + execution_mode: ExecutionMode = ExecutionMode.LOCAL test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER dbt_executable_path: str | Path = field(default_factory=get_system_dbt) diff --git a/cosmos/converter.py b/cosmos/converter.py index 553b879ad..6c87a416a 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -246,9 +246,11 @@ def __init__( dbt_executable_path = execution_config.dbt_executable_path node_converters = render_config.node_converters - if (execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): - logger.warning("`ExecutionConfig.virtualenv_dir` is only supported when \ - ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV.") + if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + logger.warning( + "`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." + ) profile_args = {} if profile_config.profile_mapping: diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 5fff03b9f..6a8c43c25 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -61,7 +61,7 @@ def venv_dbt_path( """ Path to the dbt binary within a Python virtualenv. - The first time this property is called, it creates a new/temporary and installs the dependencies + The first time this property is called, it creates a new/temporary and installs the dependencies based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv. This value is cached for future calls. """ @@ -100,15 +100,14 @@ def _get_or_create_venv_py_interpreter(self) -> str: self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") if py_interpreter_path.is_file(): - self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") return str(py_interpreter_path) - + self.log.info(f"Creating virtualenv at `{self._venv_dir}") venv_directory = str(self._venv_dir) - + else: - self.log.info(f"Creating temporary virtualenv") + self.log.info("Creating temporary virtualenv") self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") venv_directory = self._venv_tmp_dir.name diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 7ec4b0a16..475e4e66c 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -4,7 +4,6 @@ import os from datetime import datetime from pathlib import Path -from airflow.configuration import get_airflow_home from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -31,9 +30,9 @@ profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.VIRTUALENV, - # We can enable this flag if we want Airflow to create one virtualenv + # We can enable this flag if we want Airflow to create one virtualenv # and reuse that within the whole DAG. 
- # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", + # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", ), operator_args={ "py_system_site_packages": False, diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 0e86cf7c4..9b70524a3 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -66,13 +66,14 @@ def test_run_command( assert dbt_cmd[0][0][1] == "do-something" assert mock_execute.call_count == 2 + @patch("airflow.utils.python_virtualenv.execute_in_subprocess") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.exception_handling") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") @patch("airflow.hooks.base.BaseHook.get_connection") -def test_supply_virtualenv_dir_flag( +def test_supply_virtualenv_dir_flag( mock_get_connection, mock_subprocess_hook, mock_exception_handling, @@ -99,4 +100,4 @@ def test_supply_virtualenv_dir_flag( emit_datasets=False, virtualenv_dir="mock-venv", ) - assert venv_operator.venv_dbt_path == "mock-venv/bin/dbt" \ No newline at end of file + assert venv_operator.venv_dbt_path == "mock-venv/bin/dbt" From 877291ba7939880c07922ca0be9e2f5cb5b58efe Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Mon, 23 Oct 2023 15:23:26 +0200 Subject: [PATCH 04/18] Only accept Path(...) --- tests/operators/test_virtualenv.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 9b70524a3..e13228f57 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -1,5 +1,5 @@ from unittest.mock import patch, MagicMock - +from pathlib import Path from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator from airflow.models.connection import Connection @@ -98,6 +98,6 @@ def test_supply_virtualenv_dir_flag( py_system_site_packages=False, py_requirements=["dbt-postgres==1.6.0b1"], emit_datasets=False, - virtualenv_dir="mock-venv", + virtualenv_dir=Path("mock-venv"), ) assert venv_operator.venv_dbt_path == "mock-venv/bin/dbt" From c9613b58254fd943c92129d27c7f4924c952cd45 Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Mon, 23 Oct 2023 15:23:51 +0200 Subject: [PATCH 05/18] Check if venv_dir needs to be created + document helper --- cosmos/operators/virtualenv.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 6a8c43c25..b9ae9dd64 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os from pathlib import Path from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Any @@ -45,7 +46,7 @@ def __init__( self, py_requirements: list[str] | None = None, py_system_site_packages: bool = False, - virtualenv_dir: Path | str | None = None, + virtualenv_dir: Path | None = None, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] @@ -95,13 +96,27 @@ def execute(self, context: Context) -> None: logger.info(output) def _get_or_create_venv_py_interpreter(self) -> str: + """ + Helper method that parses virtual env configuration and returns a DBT binary within the resulting virtualenv: + Do we have a persistent virtual env dir set 
in `self._venv_dir`? + 1. Yes: Does a directory at that path exist? + 1. No: Create it, and create a virtual env inside it + 2. Yes: Does the directory have a virtual env inside it? + 1. No: Create one in this directory and return it + 2. Yes: Return this virtual env + 2. No: Create a temporary virtual env and return it + + """ if self._venv_dir is not None: - py_interpreter_path = Path(f"{self._venv_dir}/bin/python") - - self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") - if py_interpreter_path.is_file(): - self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") - return str(py_interpreter_path) + if self._venv_dir.is_dir(): + py_interpreter_path = Path(f"{self._venv_dir}/bin/python") + + self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") + if py_interpreter_path.is_file(): + self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") + return str(py_interpreter_path) + else: + os.mkdir(self._venv_dir) self.log.info(f"Creating virtualenv at `{self._venv_dir}") venv_directory = str(self._venv_dir) From 6e030dcf64d32cf4a7f065a6ed7c46499d274eef Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Mon, 23 Oct 2023 15:24:06 +0200 Subject: [PATCH 06/18] Assert warning --- tests/test_converter.py | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/test_converter.py b/tests/test_converter.py index d84249aae..a09f29684 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -2,6 +2,7 @@ from pathlib import Path from unittest.mock import patch, MagicMock from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping +import logging import pytest from airflow.models import DAG @@ -16,6 +17,7 @@ SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" +LOGGER = logging.getLogger(__name__) @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -167,8 +169,7 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op "execution_mode,operator_args", [ (ExecutionMode.KUBERNETES, {}), - # (ExecutionMode.DOCKER, {"image": "sample-image"}), - ], + ] ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") @@ -195,6 +196,41 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) assert converter +@pytest.mark.parametrize( + "execution_mode,virtualenv_dir,operator_args", + [ + (ExecutionMode.KUBERNETES, Path("/some/virtualenv/dir"), {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog): + """ + This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` andm still pass a defined `virtualenv_dir` + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + 
profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + + assert converter + + assert "`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." in caplog.text + + @pytest.mark.parametrize( "execution_mode,operator_args", From 43dc58f6ce16284f726b1590140b59304f6922cf Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Mon, 23 Oct 2023 15:24:38 +0200 Subject: [PATCH 07/18] Support both virtualenv use-cases in two task groups in the example DAG --- dev/dags/example_virtualenv.py | 76 ++++++++++++++++++++++++---------- tests/test_converter.py | 3 +- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 475e4e66c..a46473c3c 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -5,7 +5,11 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig +from airflow.decorators import dag +from airflow.configuration import get_airflow_home +from airflow.operators.empty import EmptyOperator + +from cosmos import DbtTaskGroup, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig from cosmos.profiles import PostgresUserPasswordProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" @@ -21,29 +25,57 @@ ), ) -# [START virtualenv_example] -example_virtualenv = DbtDag( - # dbt/cosmos-specific parameters - project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", - ), - profile_config=profile_config, - execution_config=ExecutionConfig( - execution_mode=ExecutionMode.VIRTUALENV, - # We can enable this flag if we want Airflow to create one virtualenv - # and reuse that within the whole DAG. - # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", - ), - operator_args={ - "py_system_site_packages": False, - "py_requirements": ["dbt-postgres==1.6.0b1"], - "install_deps": True, - }, - # normal dag parameters +@dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, - dag_id="example_virtualenv", - default_args={"retries": 2}, ) +def example_virtualenv() -> None: + start_task = EmptyOperator(task_id='start-venv-examples') + end_task = EmptyOperator(task_id='end-venv-examples') + + tmp_venv_task_group = DbtTaskGroup( + group_id='tmp-venv-group', + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.VIRTUALENV, + # We can enable this flag if we want Airflow to create one virtualenv + # and reuse that within the whole DAG. + # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", + ), + operator_args={ + "py_system_site_packages": False, + "py_requirements": ["dbt-postgres==1.6.0b1"], + "install_deps": True, + }, + ) + + cached_venv_task_group = DbtTaskGroup( + group_id='cached-venv-group', + # dbt/cosmos-specific parameters + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + execution_config=ExecutionConfig( + execution_mode=ExecutionMode.VIRTUALENV, + # We can enable this flag if we want Airflow to create one virtualenv + # and reuse that within the whole DAG. 
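+            # Each dbt task in this group will then reuse the interpreter installed in
+            # that cached directory instead of building a temporary virtualenv of its
+            # own. The location below is only an illustration; any persistent, writable
+            # path on the worker should work.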
+ virtualenv_dir=Path(f"{get_airflow_home()}/persistent-venv"), + ), + operator_args={ + "py_system_site_packages": False, + "py_requirements": ["dbt-postgres==1.6.0b1"], + "install_deps": True, + }, + ) + + start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task + +example_virtualenv() # [END virtualenv_example] + diff --git a/tests/test_converter.py b/tests/test_converter.py index a09f29684..bf215ad7e 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -205,7 +205,8 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog): """ - This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` andm still pass a defined `virtualenv_dir` + This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` + and still pass a defined `virtualenv_dir` """ project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir) From 78748b07f84fda8cdaf71088238c77a193109193 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 23 Oct 2023 13:26:42 +0000 Subject: [PATCH 08/18] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/operators/virtualenv.py | 2 +- dev/dags/example_virtualenv.py | 11 ++++++----- tests/test_converter.py | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index b9ae9dd64..97a63f45b 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -105,7 +105,7 @@ def _get_or_create_venv_py_interpreter(self) -> str: 1. No: Create one in this directory and return it 2. Yes: Return this virtual env 2. 
No: Create a temporary virtual env and return it - + """ if self._venv_dir is not None: if self._venv_dir.is_dir(): diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index a46473c3c..45af6e0a8 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -25,17 +25,18 @@ ), ) + @dag( schedule_interval="@daily", start_date=datetime(2023, 1, 1), catchup=False, ) def example_virtualenv() -> None: - start_task = EmptyOperator(task_id='start-venv-examples') - end_task = EmptyOperator(task_id='end-venv-examples') + start_task = EmptyOperator(task_id="start-venv-examples") + end_task = EmptyOperator(task_id="end-venv-examples") tmp_venv_task_group = DbtTaskGroup( - group_id='tmp-venv-group', + group_id="tmp-venv-group", # dbt/cosmos-specific parameters project_config=ProjectConfig( DBT_ROOT_PATH / "jaffle_shop", @@ -55,7 +56,7 @@ def example_virtualenv() -> None: ) cached_venv_task_group = DbtTaskGroup( - group_id='cached-venv-group', + group_id="cached-venv-group", # dbt/cosmos-specific parameters project_config=ProjectConfig( DBT_ROOT_PATH / "jaffle_shop", @@ -76,6 +77,6 @@ def example_virtualenv() -> None: start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task + example_virtualenv() # [END virtualenv_example] - diff --git a/tests/test_converter.py b/tests/test_converter.py index bf215ad7e..f0777bb02 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -205,7 +205,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog): """ - This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` + This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` and still pass a defined `virtualenv_dir` """ project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) From 5a3a0bab7900aead52e0f7c3c4e40ab33e916050 Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Mon, 6 Nov 2023 13:28:58 +0100 Subject: [PATCH 09/18] Implement locking mechanism [WIP] --- cosmos/operators/virtualenv.py | 89 +++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 97a63f45b..6885e4161 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -1,6 +1,8 @@ from __future__ import annotations import os +import psutil +import time from pathlib import Path from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, Any @@ -29,7 +31,6 @@ PY_INTERPRETER = "python3" - class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): """ Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command @@ -41,6 +42,7 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): within the virtual environment (if py_requirements argument is specified). Avoid using unless the dbt job requires it. 
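    Illustrative usage (the project, requirement and directory values below are
    placeholders taken from the example DAG and tests in this change set, not a
    prescribed setup):

        DbtVirtualenvBaseOperator(
            task_id="run_dbt_in_cached_venv",
            project_dir="dev/dags/dbt/jaffle_shop",
            profile_config=profile_config,
            py_requirements=["dbt-postgres==1.6.0b1"],
            py_system_site_packages=False,
            virtualenv_dir=Path("/usr/local/airflow/persistent-venv"),
        )

    Omitting virtualenv_dir keeps the previous behavior: a temporary virtualenv is
    created for the task and removed again after execution.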
""" + template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) def __init__( self, @@ -52,7 +54,7 @@ def __init__( self.py_requirements = py_requirements or [] self.py_system_site_packages = py_system_site_packages super().__init__(**kwargs) - self._venv_dir = virtualenv_dir + self.virtualenv_dir = virtualenv_dir self._venv_tmp_dir: None | TemporaryDirectory[str] = None @cached_property @@ -96,43 +98,71 @@ def execute(self, context: Context) -> None: logger.info(output) def _get_or_create_venv_py_interpreter(self) -> str: - """ - Helper method that parses virtual env configuration and returns a DBT binary within the resulting virtualenv: - Do we have a persistent virtual env dir set in `self._venv_dir`? - 1. Yes: Does a directory at that path exist? - 1. No: Create it, and create a virtual env inside it - 2. Yes: Does the directory have a virtual env inside it? - 1. No: Create one in this directory and return it - 2. Yes: Return this virtual env - 2. No: Create a temporary virtual env and return it + """Helper method that parses virtual env configuration + and returns a DBT binary within the resulting virtualenv""" - """ - if self._venv_dir is not None: - if self._venv_dir.is_dir(): - py_interpreter_path = Path(f"{self._venv_dir}/bin/python") + # No virtualenv_dir set, so revert to making a temporary virtualenv + if self.virtualenv_dir is None: + self.log.info("Creating temporary virtualenv") + self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") - self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") - if py_interpreter_path.is_file(): - self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") - return str(py_interpreter_path) - else: - os.mkdir(self._venv_dir) + return prepare_virtualenv( + venv_directory=self._venv_tmp_dir.name, + python_bin=PY_INTERPRETER, + system_site_packages=self.py_system_site_packages, + requirements=self.py_requirements, + ) - self.log.info(f"Creating virtualenv at `{self._venv_dir}") - venv_directory = str(self._venv_dir) + self.log.info(f"Checking if {str(self._lock_file)} exists") + while not self._is_lock_available(): + self.log.info("Waiting for lock to release") + time.sleep(1) - else: - self.log.info("Creating temporary virtualenv") - self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") - venv_directory = self._venv_tmp_dir.name + self.log.info(f"Creating virtualenv at `{self.virtualenv_dir}") + self.log.info(f"Acquiring available lock") + self._acquire_venv_lock() - return prepare_virtualenv( - venv_directory=venv_directory, + py_bin = prepare_virtualenv( + venv_directory=str(self.virtualenv_dir), python_bin=PY_INTERPRETER, system_site_packages=self.py_system_site_packages, requirements=self.py_requirements, ) + self.log.info("Releasing lock") + self._release_venv_lock() + + return py_bin + + @property + def _lock_file(self) -> Path: + return Path(f"{self.virtualenv_dir}/LOCK") + + def _is_lock_available(self) -> bool: + if self._lock_file.is_file(): + with open(self._lock_file, "r") as lf: + pid = int(lf.read()) + + self.log.info(f"Checking for running process with PID {pid}") + _process_running = psutil.Process(pid).is_running() + + self.log.info(f"Process {pid} running: {_process_running}") + return not _process_running + + return True + + def _acquire_venv_lock(self) -> None: + if not self.virtualenv_dir.is_dir(): + os.mkdir(str(self.virtualenv_dir)) + + with open(self._lock_file, "w") as lf: + pid = str(os.getpid()) + 
self.log.info(f"Acquiring lock at {self._lock_file} with pid {pid}") + lf.write(pid) + + def _release_venv_lock(self) -> None: + self._lock_file.unlink() + class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator): """ @@ -181,3 +211,4 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator) Executes `dbt docs generate` command within a Python Virtual Environment, that is created before running the dbt command and deleted just after. """ + From 21838d3b1fe5ce9d0644ff366d746b613199da1f Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Wed, 8 Nov 2023 09:27:13 +0100 Subject: [PATCH 10/18] Return types and private methods+attrs --- cosmos/operators/virtualenv.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 6885e4161..773775ed2 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -42,7 +42,7 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): within the virtual environment (if py_requirements argument is specified). Avoid using unless the dbt job requires it. """ - template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) + template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator] def __init__( self, @@ -113,14 +113,14 @@ def _get_or_create_venv_py_interpreter(self) -> str: requirements=self.py_requirements, ) - self.log.info(f"Checking if {str(self._lock_file)} exists") + self.log.info(f"Checking if {str(self.__lock_file)} exists") while not self._is_lock_available(): self.log.info("Waiting for lock to release") time.sleep(1) self.log.info(f"Creating virtualenv at `{self.virtualenv_dir}") self.log.info(f"Acquiring available lock") - self._acquire_venv_lock() + self.__acquire_venv_lock() py_bin = prepare_virtualenv( venv_directory=str(self.virtualenv_dir), @@ -130,17 +130,17 @@ def _get_or_create_venv_py_interpreter(self) -> str: ) self.log.info("Releasing lock") - self._release_venv_lock() + self.__release_venv_lock() return py_bin @property - def _lock_file(self) -> Path: + def __lock_file(self) -> Path: return Path(f"{self.virtualenv_dir}/LOCK") def _is_lock_available(self) -> bool: - if self._lock_file.is_file(): - with open(self._lock_file, "r") as lf: + if self.__lock_file.is_file(): + with open(self.__lock_file, "r") as lf: pid = int(lf.read()) self.log.info(f"Checking for running process with PID {pid}") @@ -151,17 +151,17 @@ def _is_lock_available(self) -> bool: return True - def _acquire_venv_lock(self) -> None: + def __acquire_venv_lock(self) -> None: if not self.virtualenv_dir.is_dir(): os.mkdir(str(self.virtualenv_dir)) - with open(self._lock_file, "w") as lf: + with open(self.__lock_file, "w") as lf: pid = str(os.getpid()) - self.log.info(f"Acquiring lock at {self._lock_file} with pid {pid}") + self.log.info(f"Acquiring lock at {self.__lock_file} with pid {pid}") lf.write(pid) - def _release_venv_lock(self) -> None: - self._lock_file.unlink() + def __release_venv_lock(self) -> None: + self.__lock_file.unlink() class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator): From b2805f0cc6b84ecab0a5fdb93fa8d62fef8b2e3e Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Thu, 9 Nov 2023 19:43:13 +0100 Subject: [PATCH 11/18] Add better checks around releasing lock file --- cosmos/operators/virtualenv.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git 
a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 773775ed2..b6e0f3f1c 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -114,7 +114,7 @@ def _get_or_create_venv_py_interpreter(self) -> str: ) self.log.info(f"Checking if {str(self.__lock_file)} exists") - while not self._is_lock_available(): + while not self.__is_lock_available(): self.log.info("Waiting for lock to release") time.sleep(1) @@ -138,7 +138,11 @@ def _get_or_create_venv_py_interpreter(self) -> str: def __lock_file(self) -> Path: return Path(f"{self.virtualenv_dir}/LOCK") - def _is_lock_available(self) -> bool: + @property + def _pid(self) -> int: + return os.getpid() + + def __is_lock_available(self) -> bool: if self.__lock_file.is_file(): with open(self.__lock_file, "r") as lf: pid = int(lf.read()) @@ -156,12 +160,20 @@ def __acquire_venv_lock(self) -> None: os.mkdir(str(self.virtualenv_dir)) with open(self.__lock_file, "w") as lf: - pid = str(os.getpid()) - self.log.info(f"Acquiring lock at {self.__lock_file} with pid {pid}") - lf.write(pid) + self.log.info(f"Acquiring lock at {self.__lock_file} with pid {str(self._pid)}") + lf.write(str(self._pid)) def __release_venv_lock(self) -> None: - self.__lock_file.unlink() + if not self.__lock_file.is_file(): + raise FileNotFoundError(f"Lockfile {self.__lock_file} not found") + + with open(self.__lock_file, "r") as lf: + lock_file_pid = int(lf.read()) + + if lock_file_pid == self._pid: + return self.__lock_file.unlink() + + self.log.warn(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}") class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator): From 4af7c31388493356c42a54111fb914f428a1f8a5 Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Thu, 9 Nov 2023 20:49:35 +0100 Subject: [PATCH 12/18] Add a validation decorator --- cosmos/operators/virtualenv.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index b6e0f3f1c..babc431b8 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -10,6 +10,7 @@ from airflow.compat.functools import cached_property from airflow.utils.python_virtualenv import prepare_virtualenv from cosmos.hooks.subprocess import FullOutputSubprocessResult +from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger from cosmos.operators.local import ( @@ -31,6 +32,16 @@ PY_INTERPRETER = "python3" +def depends_on_virtualenv_dir(method): + def wrapper(operator: DbtVirtualenvBaseOperator, *args): + if operator.virtualenv_dir is None: + raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") + + logger.info(f"Operator: {operator}") + logger.info(f"Args: {args}") + method(operator, *args) + return wrapper + class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): """ Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command @@ -142,6 +153,7 @@ def __lock_file(self) -> Path: def _pid(self) -> int: return os.getpid() + @depends_on_virtualenv_dir def __is_lock_available(self) -> bool: if self.__lock_file.is_file(): with open(self.__lock_file, "r") as lf: @@ -155,6 +167,7 @@ def __is_lock_available(self) -> bool: return True + @depends_on_virtualenv_dir def __acquire_venv_lock(self) -> None: if not self.virtualenv_dir.is_dir(): os.mkdir(str(self.virtualenv_dir)) @@ -163,6 +176,7 @@ def __acquire_venv_lock(self) -> None: 
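            # The PID recorded in the lock file lets other task instances use psutil to
            # check whether the lock owner is still alive (a lock left behind by a
            # process that has exited is treated as released), and lets the release
            # step verify that the lock still belongs to this process before deleting it.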
self.log.info(f"Acquiring lock at {self.__lock_file} with pid {str(self._pid)}") lf.write(str(self._pid)) + @depends_on_virtualenv_dir def __release_venv_lock(self) -> None: if not self.__lock_file.is_file(): raise FileNotFoundError(f"Lockfile {self.__lock_file} not found") From e5ccdce8db9e79fe34ce0ed5c49e4a9a012b7718 Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Thu, 16 Nov 2023 11:51:02 +0100 Subject: [PATCH 13/18] Mock + rebase --- cosmos/operators/virtualenv.py | 4 ++-- tests/operators/test_virtualenv.py | 3 +++ tests/test_converter.py | 6 +++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index babc431b8..5036064f2 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -125,7 +125,7 @@ def _get_or_create_venv_py_interpreter(self) -> str: ) self.log.info(f"Checking if {str(self.__lock_file)} exists") - while not self.__is_lock_available(): + while not self._is_lock_available(): self.log.info("Waiting for lock to release") time.sleep(1) @@ -154,7 +154,7 @@ def _pid(self) -> int: return os.getpid() @depends_on_virtualenv_dir - def __is_lock_available(self) -> bool: + def _is_lock_available(self) -> bool: if self.__lock_file.is_file(): with open(self.__lock_file, "r") as lf: pid = int(lf.read()) diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index e13228f57..3997fa75e 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -72,15 +72,18 @@ def test_run_command( @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.exception_handling") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.subprocess_hook") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available") @patch("airflow.hooks.base.BaseHook.get_connection") def test_supply_virtualenv_dir_flag( mock_get_connection, + mock_lock_available, mock_subprocess_hook, mock_exception_handling, mock_store_compiled_sql, mock_calculate_openlineage_events_completes, mock_execute, ): + mock_lock_available.return_value = True mock_get_connection.return_value = Connection( conn_id="fake_conn", conn_type="postgres", diff --git a/tests/test_converter.py b/tests/test_converter.py index f0777bb02..7bd84a046 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -203,12 +203,14 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut # (ExecutionMode.DOCKER, {"image": "sample-image"}), ], ) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualenv_dir, operator_args, caplog): """ This test will raise a warning if we are trying to pass ExecutionMode != `VirtualEnv` and still pass a defined `virtualenv_dir` """ - project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) execution_config = ExecutionConfig(execution_mode=execution_mode, virtualenv_dir=virtualenv_dir) render_config = RenderConfig(emit_datasets=True) profile_config = ProfileConfig( @@ -226,8 +228,6 @@ def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualen operator_args=operator_args, ) - assert converter - assert "`ExecutionConfig.virtualenv_dir` is only supported when \ ExecutionConfig.execution_mode is set to 
ExecutionMode.VIRTUALENV." in caplog.text From a0f955070495715a923b11a7ab7671e296ec8c76 Mon Sep 17 00:00:00 2001 From: MrBones757 Date: Sat, 4 Nov 2023 00:32:11 +0800 Subject: [PATCH 14/18] Support `ProjectConfig.dbt_project_path = None` & different paths for Rendering and Execution (#634) This MR finishes the work that was started in #605 to add full support for ProjectConfig.dbt_project_path = None, and implements #568. Within this PR, several things have been updated: 1 - Added project_path fields to RenderConfig and ExecutionConfig 2 - Simplified the consumption of RenderConfig in the dbtGraph class 3 - added option to configure different dbt executables for Rendering vs Execution. Closes: #568 --- cosmos/config.py | 2 +- cosmos/converter.py | 21 ++----------- cosmos/dbt/graph.py | 37 ++++++++++++++++------ tests/dbt/test_graph.py | 14 ++++----- tests/test_converter.py | 68 ----------------------------------------- 5 files changed, 38 insertions(+), 104 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 72b1f2eb9..e4985ed1b 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -301,8 +301,8 @@ class ExecutionConfig: dbt_project_path: InitVar[str | Path | None] = None virtualenv_dir: str | Path | None = None + project_path: Path | None = field(init=False) def __post_init__(self, dbt_project_path: str | Path | None) -> None: self.project_path = Path(dbt_project_path) if dbt_project_path else None - diff --git a/cosmos/converter.py b/cosmos/converter.py index 6c87a416a..eebb30de8 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -230,23 +230,8 @@ def __init__( raise CosmosValueError( "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." ) - emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path - test_behavior = render_config.test_behavior - select = render_config.select - exclude = render_config.exclude - dbt_deps = render_config.dbt_deps - execution_mode = execution_config.execution_mode - load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path - dbt_executable_path = execution_config.dbt_executable_path - node_converters = render_config.node_converters - - if execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: + + if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( "`ExecutionConfig.virtualenv_dir` is only supported when \ ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." 
@@ -296,7 +281,7 @@ def __init__( task_args, execution_mode=execution_config.execution_mode, ) - if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: task_args["virtualenv_dir"] = execution_config.virtualenv_dir build_airflow_graph( diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index b41c15a49..df2390e49 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -21,6 +21,7 @@ ExecutionMode, LoadMode, ) +from cosmos.dbt.executable import get_system_dbt from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.project import create_symlinks, environ from cosmos.dbt.selector import select_nodes @@ -69,6 +70,14 @@ def name(self) -> str: return self.resource_name.replace(".", "_") +def create_symlinks(project_path: Path, tmp_dir: Path) -> None: + """Helper function to create symlinks to the dbt project files.""" + ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") + for child_name in os.listdir(project_path): + if child_name not in ignore_paths: + os.symlink(project_path / child_name, tmp_dir / child_name) + + def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -124,6 +133,15 @@ class DbtGraph: Supports different ways of loading the `dbt` project into this representation. Different loading methods can result in different `nodes` and `filtered_nodes`. + + Example of how to use: + + dbt_graph = DbtGraph( + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), + render_config=RenderConfig(exclude=["*orders*"], select=[]), + dbt_cmd="/usr/local/bin/dbt" + ) + dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL) """ nodes: dict[str, DbtNode] = dict() @@ -137,12 +155,16 @@ def __init__( profile_config: ProfileConfig | None = None, # dbt_vars only supported for LegacyDbtProject dbt_vars: dict[str, str] | None = None, + dbt_cmd: str = get_system_dbt(), + operator_args: dict[str, Any] | None = None, ): self.project = project self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config self.dbt_vars = dbt_vars or {} + self.operator_args = operator_args or {} + self.dbt_cmd = dbt_cmd def load( self, @@ -181,9 +203,7 @@ def load( else: load_method[method]() - def run_dbt_ls( - self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str] - ) -> dict[str, DbtNode]: + def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" ls_command = [dbt_cmd, "ls", "--output", "json"] @@ -223,10 +243,6 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path) - dbt_cmd = self.render_config.dbt_executable_path - dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd - logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...") if not self.render_config.project_path or not self.execution_config.project_path: raise CosmosLoadDbtException( @@ -236,6 +252,9 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without 
a profile config.") + if not self.profile_config: + raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + with tempfile.TemporaryDirectory() as tmpdir: logger.info( f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" @@ -266,12 +285,12 @@ def load_via_dbt_ls(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps: - deps_command = [dbt_cmd, "deps"] + deps_command = [self.dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) logger.debug("dbt deps output: %s", stdout) - nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env) + nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env) self.nodes = nodes self.filtered_nodes = nodes diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7e941cb49..6d092d623 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -6,7 +6,7 @@ import pytest -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -374,8 +374,9 @@ def test_load_via_dbt_ls_without_exclude(project_name, postgres_profile_config): def test_load_via_custom_without_project_path(): project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") execution_config = ExecutionConfig() - render_config = RenderConfig(dbt_executable_path="/inexistent/dbt") + render_config = RenderConfig() dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -391,10 +392,9 @@ def test_load_via_custom_without_project_path(): def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_executable_path="existing-dbt-cmd", dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -410,9 +410,7 @@ def test_load_via_dbt_ls_without_profile(mock_validate_dbt_command): def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) execution_config = ExecutionConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) - render_config = RenderConfig( - dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, dbt_executable_path="/inexistent/dbt" - ) + render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( project=project_config, diff --git a/tests/test_converter.py b/tests/test_converter.py index 7bd84a046..2fd43cd7f 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -270,74 +270,6 @@ def test_converter_fails_execution_config_no_project_dir(mock_load_dbt_graph, ex ) -def 
test_converter_fails_render_config_invalid_dbt_path_with_dbt_ls(): - """ - Validate that a dbt project fails to be rendered to Airflow with DBT_LS if - the dbt command is invalid. - """ - project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), project_name="sample") - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with pytest.raises(CosmosConfigException) as err_info: - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert ( - err_info.value.args[0] - == "Unable to find the dbt executable, attempted: and ." - ) - - -def test_converter_fails_render_config_invalid_dbt_path_with_manifest(): - """ - Validate that a dbt project succeeds to be rendered to Airflow with DBT_MANIFEST even when - the dbt command is invalid. - """ - project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") - - execution_config = ExecutionConfig( - execution_mode=ExecutionMode.LOCAL, - dbt_executable_path="invalid-execution-dbt", - dbt_project_path=SAMPLE_DBT_PROJECT.as_posix(), - ) - render_config = RenderConfig( - emit_datasets=True, - dbt_executable_path="invalid-render-dbt", - ) - profile_config = ProfileConfig( - profile_name="my_profile_name", - target_name="my_target_name", - profiles_yml_filepath=SAMPLE_PROFILE_YML, - ) - with DAG("test-id", start_date=datetime(2022, 1, 1)) as dag: - converter = DbtToAirflowConverter( - dag=dag, - nodes=nodes, - project_config=project_config, - profile_config=profile_config, - execution_config=execution_config, - render_config=render_config, - ) - assert converter - - @pytest.mark.parametrize( "execution_mode,operator_args", [ From 73c5fe02ddb9057ed6c410379f900c08dc7870bb Mon Sep 17 00:00:00 2001 From: Lennart Kloppenburg Date: Fri, 17 Nov 2023 10:43:56 +0100 Subject: [PATCH 15/18] Fix rebase conflicts --- cosmos/dbt/graph.py | 15 ++++----------- cosmos/operators/virtualenv.py | 8 ++++---- tests/dbt/test_graph.py | 7 ++++--- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index df2390e49..7fc19b024 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -3,6 +3,7 @@ import itertools import json import os +import shutil import tempfile import yaml from dataclasses import dataclass, field @@ -70,14 +71,6 @@ def name(self) -> str: return self.resource_name.replace(".", "_") -def create_symlinks(project_path: Path, tmp_dir: Path) -> None: - """Helper function to create symlinks to the dbt project files.""" - ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(project_path): - if child_name not in ignore_paths: - os.symlink(project_path / child_name, tmp_dir / child_name) - - def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) @@ -205,7 +198,7 @@ def load( def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: 
dict[str, str]) -> dict[str, DbtNode]: """Runs dbt ls command and returns the parsed nodes.""" - ls_command = [dbt_cmd, "ls", "--output", "json"] + ls_command = [self.dbt_cmd, "ls", "--output", "json"] if self.render_config.exclude: ls_command.extend(["--exclude", *self.render_config.exclude]) @@ -252,8 +245,8 @@ def load_via_dbt_ls(self) -> None: if not self.profile_config: raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") - if not self.profile_config: - raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.") + if not shutil.which(self.dbt_cmd): + raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") with tempfile.TemporaryDirectory() as tmpdir: logger.info( diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 5036064f2..5d31f4ef8 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -5,7 +5,7 @@ import time from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Callable from airflow.compat.functools import cached_property from airflow.utils.python_virtualenv import prepare_virtualenv @@ -32,8 +32,8 @@ PY_INTERPRETER = "python3" -def depends_on_virtualenv_dir(method): - def wrapper(operator: DbtVirtualenvBaseOperator, *args): +def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]: + def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None: if operator.virtualenv_dir is None: raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") @@ -169,7 +169,7 @@ def _is_lock_available(self) -> bool: @depends_on_virtualenv_dir def __acquire_venv_lock(self) -> None: - if not self.virtualenv_dir.is_dir(): + if not self.virtualenv_dir.is_dir(): # type: ignore os.mkdir(str(self.virtualenv_dir)) with open(self.__lock_file, "w") as lf: diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 6d092d623..995a8be16 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -6,7 +6,7 @@ import pytest -from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig, CosmosConfigException from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, @@ -413,6 +413,7 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): render_config = RenderConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( + dbt_cmd="/inexistent/dbt", project=project_config, execution_config=execution_config, render_config=render_config, @@ -422,10 +423,10 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(mock_which): profiles_yml_filepath=Path(__file__).parent.parent / "sample/profiles.yml", ), ) - with pytest.raises(CosmosConfigException) as err_info: + with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to find the dbt executable, attempted: and ." 
+    expected = "Unable to find the dbt executable: /inexistent/dbt"
     assert err_info.value.args[0] == expected


From 3a84aa7288af071693bae9661fb1247e97676ace Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr 
Date: Thu, 9 Nov 2023 16:09:54 +0000
Subject: [PATCH 16/18] Fix reusing config across TaskGroups/DAGs (#664)

If execution_config was reused, Cosmos 1.2.2 would raise:

```
astronomer-cosmos/dags/basic_cosmos_task_group.py
Traceback (most recent call last):
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "", line 848, in exec_module
  File "", line 219, in _call_with_frames_removed
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 74, in
    basic_cosmos_task_group()
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dag.py", line 3817, in factory
    f(**f_kwargs)
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 54, in basic_cosmos_task_group
    orders = DbtTaskGroup(
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/airflow/task_group.py", line 26, in __init__
    DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs))
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/converter.py", line 113, in __init__
    raise CosmosValueError(
cosmos.exceptions.CosmosValueError: ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path.If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None
```

This has been raised by an Astro customer and our field engineer, who tried to run:
https://github.com/astronomer/cosmos-demo
---
 cosmos/converter.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/cosmos/converter.py b/cosmos/converter.py
index eebb30de8..015a6f12b 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -218,7 +218,12 @@ def __init__(
         # If we are using the old interface, we should migrate it to the new interface
         # This is safe to do now since we have validated which config interface we're using
         if project_config.dbt_project_path:
-            execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)
+            # We copy the configuration so the change does not affect other DAGs or TaskGroups
+            # that may reuse the same original configuration
+            render_config = copy.deepcopy(render_config)
+            execution_config = copy.deepcopy(execution_config)
+            render_config.project_path = project_config.dbt_project_path
+            execution_config.project_path = project_config.dbt_project_path

         validate_adapted_user_config(execution_config, project_config, render_config)


From 547b0aff7c5a5e47c8f17f47abeab374f245c8bf Mon Sep 17 00:00:00 2001
From: Lennart Kloppenburg 
Date: Wed, 20 Dec 2023 11:46:28 +0100
Subject: [PATCH 17/18] Iron out locking flow

---
 cosmos/operators/virtualenv.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py
index 5d31f4ef8..34e02a745 100644
--- a/cosmos/operators/virtualenv.py
+++ b/cosmos/operators/virtualenv.py
@@ -36,9 +36,7 @@ def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], A
     def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None:
         if operator.virtualenv_dir is None:
             raise 
CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") - - logger.info(f"Operator: {operator}") - logger.info(f"Args: {args}") + method(operator, *args) return wrapper @@ -153,14 +151,17 @@ def __lock_file(self) -> Path: def _pid(self) -> int: return os.getpid() - @depends_on_virtualenv_dir + #@depends_on_virtualenv_dir def _is_lock_available(self) -> bool: if self.__lock_file.is_file(): with open(self.__lock_file, "r") as lf: pid = int(lf.read()) self.log.info(f"Checking for running process with PID {pid}") - _process_running = psutil.Process(pid).is_running() + try: + _process_running = psutil.Process(pid).is_running() + except psutil.NoSuchProcess: + _process_running = False self.log.info(f"Process {pid} running: {_process_running}") return not _process_running @@ -179,7 +180,9 @@ def __acquire_venv_lock(self) -> None: @depends_on_virtualenv_dir def __release_venv_lock(self) -> None: if not self.__lock_file.is_file(): - raise FileNotFoundError(f"Lockfile {self.__lock_file} not found") + self.log.warn(f"Lockfile {self.__lock_file} not found, perhaps deleted by other concurrent operator?") + + return with open(self.__lock_file, "r") as lf: lock_file_pid = int(lf.read()) From be0de1a1b6675a05ff008fd09ac05a01c75fcd2d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 20 Dec 2023 10:46:48 +0000 Subject: [PATCH 18/18] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/converter.py | 2 +- cosmos/operators/virtualenv.py | 25 ++++++++++++++----------- tests/test_converter.py | 11 +++++++---- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 015a6f12b..7e284aacb 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -235,7 +235,7 @@ def __init__( raise CosmosValueError( "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." ) - + if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None: logger.warning( "`ExecutionConfig.virtualenv_dir` is only supported when \ diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 34e02a745..e57a7808c 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -32,14 +32,17 @@ PY_INTERPRETER = "python3" + def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]: def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None: if operator.virtualenv_dir is None: raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.") method(operator, *args) + return wrapper + class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): """ Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command @@ -51,7 +54,8 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator): within the virtual environment (if py_requirements argument is specified). Avoid using unless the dbt job requires it. 
""" - template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator] + + template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator] def __init__( self, @@ -107,7 +111,7 @@ def execute(self, context: Context) -> None: logger.info(output) def _get_or_create_venv_py_interpreter(self) -> str: - """Helper method that parses virtual env configuration + """Helper method that parses virtual env configuration and returns a DBT binary within the resulting virtualenv""" # No virtualenv_dir set, so revert to making a temporary virtualenv @@ -142,19 +146,19 @@ def _get_or_create_venv_py_interpreter(self) -> str: self.__release_venv_lock() return py_bin - + @property def __lock_file(self) -> Path: return Path(f"{self.virtualenv_dir}/LOCK") - + @property def _pid(self) -> int: return os.getpid() - - #@depends_on_virtualenv_dir + + # @depends_on_virtualenv_dir def _is_lock_available(self) -> bool: if self.__lock_file.is_file(): - with open(self.__lock_file, "r") as lf: + with open(self.__lock_file) as lf: pid = int(lf.read()) self.log.info(f"Checking for running process with PID {pid}") @@ -170,13 +174,13 @@ def _is_lock_available(self) -> bool: @depends_on_virtualenv_dir def __acquire_venv_lock(self) -> None: - if not self.virtualenv_dir.is_dir(): # type: ignore + if not self.virtualenv_dir.is_dir(): # type: ignore os.mkdir(str(self.virtualenv_dir)) with open(self.__lock_file, "w") as lf: self.log.info(f"Acquiring lock at {self.__lock_file} with pid {str(self._pid)}") lf.write(str(self._pid)) - + @depends_on_virtualenv_dir def __release_venv_lock(self) -> None: if not self.__lock_file.is_file(): @@ -184,7 +188,7 @@ def __release_venv_lock(self) -> None: return - with open(self.__lock_file, "r") as lf: + with open(self.__lock_file) as lf: lock_file_pid = int(lf.read()) if lock_file_pid == self._pid: @@ -240,4 +244,3 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator) Executes `dbt docs generate` command within a Python Virtual Environment, that is created before running the dbt command and deleted just after. """ - diff --git a/tests/test_converter.py b/tests/test_converter.py index 2fd43cd7f..1d9632ef7 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -169,7 +169,7 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op "execution_mode,operator_args", [ (ExecutionMode.KUBERNETES, {}), - ] + ], ) @patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) @patch("cosmos.converter.DbtGraph.load") @@ -196,6 +196,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut ) assert converter + @pytest.mark.parametrize( "execution_mode,virtualenv_dir,operator_args", [ @@ -228,9 +229,11 @@ def test_converter_raises_warning(mock_load_dbt_graph, execution_mode, virtualen operator_args=operator_args, ) - assert "`ExecutionConfig.virtualenv_dir` is only supported when \ - ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." in caplog.text - + assert ( + "`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV." + in caplog.text + ) @pytest.mark.parametrize(