Skip to content

Commit

Permalink
Add support for virtual env directory flag
Browse files Browse the repository at this point in the history
  • Loading branch information
LennartKloppenburg committed Oct 18, 2023
1 parent e09ac6d commit 0364ea3
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 11 deletions.
5 changes: 4 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,10 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
available on the path.
:param virtualenv_dir: Directory path to locate the (cached) virtual env that
should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV`
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
dbt_executable_path: str | Path = get_system_dbt()
virtualenv_dir: str | Path | None = None

10 changes: 9 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.project import DbtProject
from cosmos.dbt.selector import retrieve_by_label
from cosmos.constants import ExecutionMode
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -104,7 +105,6 @@ def __init__(
**kwargs: Any,
) -> None:
project_config.validate_project()

emit_datasets = render_config.emit_datasets
dbt_root_path = project_config.dbt_project_path.parent
dbt_project_name = project_config.dbt_project_path.name
Expand All @@ -121,6 +121,10 @@ def __init__(
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

if (execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None):
logger.warning("`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV.")

profile_args = {}
if profile_config.profile_mapping:
profile_args = profile_config.profile_mapping.profile_args
Expand Down Expand Up @@ -155,6 +159,10 @@ def __init__(
"profile_config": profile_config,
"emit_datasets": emit_datasets,
}

if (execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None):
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path

Expand Down
40 changes: 31 additions & 9 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ def __init__(
self,
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
virtualenv_dir: Path | str | None = None,
**kwargs: Any,
) -> None:
self.py_requirements = py_requirements or []
self.py_system_site_packages = py_system_site_packages
super().__init__(**kwargs)
self._venv_dir = virtualenv_dir
self._venv_tmp_dir: None | TemporaryDirectory[str] = None

@cached_property
Expand All @@ -59,19 +61,14 @@ def venv_dbt_path(
"""
Path to the dbt binary within a Python virtualenv.
The first time this property is called, it creates a virtualenv and installs the dependencies based on the
self.py_requirements and self.py_system_site_packages. This value is cached for future calls.
The first time this property is called, it creates a new/temporary virtualenv and installs the dependencies
based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv.
This value is cached for future calls.
"""
# We are reusing the virtualenv directory for all subprocess calls within this task/operator.
# For this reason, we are not using contexts at this point.
# The deletion of this directory is done explicitly at the end of the `execute` method.
self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")
py_interpreter = prepare_virtualenv(
venv_directory=self._venv_tmp_dir.name,
python_bin=PY_INTERPRETER,
system_site_packages=self.py_system_site_packages,
requirements=self.py_requirements,
)
py_interpreter = self._get_or_create_venv_py_interpreter()
dbt_binary = Path(py_interpreter).parent / "dbt"
cmd_output = self.subprocess_hook.run_command(
[
Expand All @@ -97,6 +94,31 @@ def execute(self, context: Context) -> None:
self._venv_tmp_dir.cleanup()
logger.info(output)

def _get_or_create_venv_py_interpreter(self) -> str:
    """
    Return the path of the Python interpreter of the virtualenv used by this operator.

    Three cases are handled:

    * ``self._venv_dir`` is set and ``<venv_dir>/bin/python`` already exists as a file:
      that interpreter path is returned as-is; nothing is created or installed.
    * ``self._venv_dir`` is set but no interpreter is found there: a virtualenv is
      created in that directory via ``prepare_virtualenv``.
    * ``self._venv_dir`` is ``None``: a ``TemporaryDirectory`` (prefix ``cosmos-venv``)
      is created, stored on ``self._venv_tmp_dir`` and used as the virtualenv directory.

    NOTE(review): when an existing interpreter is found, ``self.py_requirements`` is not
    consulted, so a cached virtualenv is reused even if the requirements differ from the
    ones it was built with -- confirm this is the intended caching contract.

    :returns: Path to the virtualenv's Python interpreter, as a string.
    """
    if self._venv_dir is None:
        self.log.info("Creating temporary virtualenv")
        # Kept on the instance so the directory outlives this call; it is not cleaned up here.
        self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")
        venv_directory = self._venv_tmp_dir.name
    else:
        # Join with pathlib rather than string interpolation so that an empty directory
        # value cannot accidentally resolve to the system-wide `/bin/python`.
        py_interpreter_path = Path(self._venv_dir) / "bin" / "python"
        interpreter_exists = py_interpreter_path.is_file()

        self.log.info("Checking for venv interpreter: %s : %s", py_interpreter_path, interpreter_exists)
        if interpreter_exists:
            self.log.info("Found Python interpreter in cached virtualenv: `%s`", py_interpreter_path)
            return str(py_interpreter_path)

        self.log.info("Creating virtualenv at `%s`", self._venv_dir)
        venv_directory = str(self._venv_dir)

    return prepare_virtualenv(
        venv_directory=venv_directory,
        python_bin=PY_INTERPRETER,
        system_site_packages=self.py_system_site_packages,
        requirements=self.py_requirements,
    )


class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator):
"""
Expand Down
4 changes: 4 additions & 0 deletions dev/dags/example_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
from datetime import datetime
from pathlib import Path
from airflow.configuration import get_airflow_home

from cosmos import DbtDag, ExecutionMode, ExecutionConfig, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
Expand All @@ -30,6 +31,9 @@
profile_config=profile_config,
execution_config=ExecutionConfig(
execution_mode=ExecutionMode.VIRTUALENV,
# We can enable this flag if we want Airflow to create one virtualenv
# and reuse that within the whole DAG.
# virtualenv_dir=f"{get_airflow_home()}/persistent-venv",
),
operator_args={
"py_system_site_packages": False,
Expand Down

0 comments on commit 0364ea3

Please sign in to comment.