diff --git a/cosmos/config.py b/cosmos/config.py index fa68b44ab..fbd0cb9f7 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -207,7 +207,10 @@ class ExecutionConfig: :param execution_mode: The execution mode for dbt. Defaults to local :param dbt_executable_path: The path to the dbt executable. Defaults to dbt if available on the path. + :param virtualenv_dir: Directory path to locate the (cached) virtual env that + should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV` """ - execution_mode: ExecutionMode = ExecutionMode.LOCAL dbt_executable_path: str | Path = get_system_dbt() + virtualenv_dir: str | Path | None = None + diff --git a/cosmos/converter.py b/cosmos/converter.py index 8137da3ec..7a4b3e3f0 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -14,6 +14,7 @@ from cosmos.dbt.graph import DbtGraph from cosmos.dbt.project import DbtProject from cosmos.dbt.selector import retrieve_by_label +from cosmos.constants import ExecutionMode from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig from cosmos.exceptions import CosmosValueError from cosmos.log import get_logger @@ -104,7 +105,6 @@ def __init__( **kwargs: Any, ) -> None: project_config.validate_project() - emit_datasets = render_config.emit_datasets dbt_root_path = project_config.dbt_project_path.parent dbt_project_name = project_config.dbt_project_path.name @@ -121,6 +121,10 @@ def __init__( dbt_executable_path = execution_config.dbt_executable_path node_converters = render_config.node_converters + if (execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + logger.warning("`ExecutionConfig.virtualenv_dir` is only supported when \ + ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV.") + profile_args = {} if profile_config.profile_mapping: profile_args = profile_config.profile_mapping.profile_args @@ -155,6 +159,10 @@ def __init__( "profile_config": profile_config, "emit_datasets": emit_datasets, } + + if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None): + task_args["virtualenv_dir"] = execution_config.virtualenv_dir + if dbt_executable_path: task_args["dbt_executable_path"] = dbt_executable_path diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 4d6338e09..5fff03b9f 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -45,11 +45,13 @@ def __init__( self, py_requirements: list[str] | None = None, py_system_site_packages: bool = False, + virtualenv_dir: Path | str | None = None, **kwargs: Any, ) -> None: self.py_requirements = py_requirements or [] self.py_system_site_packages = py_system_site_packages super().__init__(**kwargs) + self._venv_dir = virtualenv_dir self._venv_tmp_dir: None | TemporaryDirectory[str] = None @cached_property @@ -59,19 +61,14 @@ def venv_dbt_path( """ Path to the dbt binary within a Python virtualenv. - The first time this property is called, it creates a virtualenv and installs the dependencies based on the - self.py_requirements and self.py_system_site_packages. This value is cached for future calls. + The first time this property is called, it creates a new/temporary and installs the dependencies + based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv. + This value is cached for future calls. """ # We are reusing the virtualenv directory for all subprocess calls within this task/operator. # For this reason, we are not using contexts at this point. # The deletion of this directory is done explicitly at the end of the `execute` method. - self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") - py_interpreter = prepare_virtualenv( - venv_directory=self._venv_tmp_dir.name, - python_bin=PY_INTERPRETER, - system_site_packages=self.py_system_site_packages, - requirements=self.py_requirements, - ) + py_interpreter = self._get_or_create_venv_py_interpreter() dbt_binary = Path(py_interpreter).parent / "dbt" cmd_output = self.subprocess_hook.run_command( [ @@ -97,6 +94,31 @@ def execute(self, context: Context) -> None: self._venv_tmp_dir.cleanup() logger.info(output) + def _get_or_create_venv_py_interpreter(self) -> str: + if self._venv_dir is not None: + py_interpreter_path = Path(f"{self._venv_dir}/bin/python") + + self.log.info(f"Checking for venv interpreter: {py_interpreter_path} : {py_interpreter_path.is_file()}") + if py_interpreter_path.is_file(): + + self.log.info(f"Found Python interpreter in cached virtualenv: `{str(py_interpreter_path)}`") + return str(py_interpreter_path) + + self.log.info(f"Creating virtualenv at `{self._venv_dir}") + venv_directory = str(self._venv_dir) + + else: + self.log.info(f"Creating temporary virtualenv") + self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv") + venv_directory = self._venv_tmp_dir.name + + return prepare_virtualenv( + venv_directory=venv_directory, + python_bin=PY_INTERPRETER, + system_site_packages=self.py_system_site_packages, + requirements=self.py_requirements, + ) + class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator): """ diff --git a/dev/dags/example_virtualenv.py b/dev/dags/example_virtualenv.py index 7b1368f8c..7ec4b0a16 100644 --- a/dev/dags/example_virtualenv.py +++ b/dev/dags/example_virtualenv.py @@ -4,6 +4,7 @@ import os from datetime import datetime from pathlib import Path +from airflow.configuration import get_airflow_home from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -30,6 +31,9 @@ profile_config=profile_config, execution_config=ExecutionConfig( execution_mode=ExecutionMode.VIRTUALENV, + # We can enable this flag if we want Airflow to create one virtualenv + # and reuse that within the whole DAG. + # virtualenv_dir=f"{get_airflow_home()}/persistent-venv", ), operator_args={ "py_system_site_packages": False,