Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for virtual env directory flag #611

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,20 @@ class ExecutionConfig:

:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
available on the path.
:param virtualenv_dir: Directory path to locate the (cached) virtual env that
should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV`
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None
virtualenv_dir: str | Path | None = None
LennartKloppenburg marked this conversation as resolved.
Show resolved Hide resolved

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
Expand Down
29 changes: 28 additions & 1 deletion cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cosmos.constants import ExecutionMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.selector import retrieve_by_label
from cosmos.constants import ExecutionMode
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
Expand Down Expand Up @@ -217,12 +218,36 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

validate_adapted_user_config(execution_config, project_config, render_config)

env_vars = project_config.env_vars or operator_args.pop("env", None)
dbt_vars = project_config.dbt_vars or operator_args.pop("vars", None)
# We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
# We require render_config.project_path when we dont have a manifest
if not project_config.manifest_path and not render_config.project_path:
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
"`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
)
LennartKloppenburg marked this conversation as resolved.
Show resolved Hide resolved

profile_args = {}
if profile_config.profile_mapping:
profile_args = profile_config.profile_mapping.profile_args

if not operator_args:
operator_args = {}

# Previously, we were creating a cosmos.dbt.project.DbtProject
# DbtProject has now been replaced with ProjectConfig directly
Expand Down Expand Up @@ -261,6 +286,8 @@ def __init__(
task_args,
execution_mode=execution_config.execution_mode,
)
if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
Expand Down
32 changes: 22 additions & 10 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import itertools
import json
import os
import shutil
import tempfile
import yaml
from dataclasses import dataclass, field
Expand All @@ -21,6 +22,7 @@
ExecutionMode,
LoadMode,
)
from cosmos.dbt.executable import get_system_dbt
from cosmos.dbt.parser.project import LegacyDbtProject
from cosmos.dbt.project import create_symlinks, environ
from cosmos.dbt.selector import select_nodes
Expand Down Expand Up @@ -124,6 +126,15 @@ class DbtGraph:
Supports different ways of loading the `dbt` project into this representation.

Different loading methods can result in different `nodes` and `filtered_nodes`.

Example of how to use:

dbt_graph = DbtGraph(
project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
render_config=RenderConfig(exclude=["*orders*"], select=[]),
dbt_cmd="/usr/local/bin/dbt"
)
dbt_graph.load(method=LoadMode.DBT_LS, execution_mode=ExecutionMode.LOCAL)
"""

nodes: dict[str, DbtNode] = dict()
Expand All @@ -137,12 +148,16 @@ def __init__(
profile_config: ProfileConfig | None = None,
# dbt_vars only supported for LegacyDbtProject
dbt_vars: dict[str, str] | None = None,
dbt_cmd: str = get_system_dbt(),
operator_args: dict[str, Any] | None = None,
):
self.project = project
self.render_config = render_config
self.profile_config = profile_config
self.execution_config = execution_config
self.dbt_vars = dbt_vars or {}
self.operator_args = operator_args or {}
self.dbt_cmd = dbt_cmd

def load(
self,
Expand Down Expand Up @@ -181,11 +196,9 @@ def load(
else:
load_method[method]()

def run_dbt_ls(
self, dbt_cmd: str, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]
) -> dict[str, DbtNode]:
def run_dbt_ls(self, project_path: Path, tmp_dir: Path, env_vars: dict[str, str]) -> dict[str, DbtNode]:
"""Runs dbt ls command and returns the parsed nodes."""
ls_command = [dbt_cmd, "ls", "--output", "json"]
ls_command = [self.dbt_cmd, "ls", "--output", "json"]

if self.render_config.exclude:
ls_command.extend(["--exclude", *self.render_config.exclude])
Expand Down Expand Up @@ -223,10 +236,6 @@ def load_via_dbt_ls(self) -> None:
* self.nodes
* self.filtered_nodes
"""
self.render_config.validate_dbt_command(fallback_cmd=self.execution_config.dbt_executable_path)
dbt_cmd = self.render_config.dbt_executable_path
dbt_cmd = dbt_cmd.as_posix() if isinstance(dbt_cmd, Path) else dbt_cmd

logger.info(f"Trying to parse the dbt project in `{self.render_config.project_path}` using dbt ls...")
if not self.render_config.project_path or not self.execution_config.project_path:
raise CosmosLoadDbtException(
Expand All @@ -236,6 +245,9 @@ def load_via_dbt_ls(self) -> None:
if not self.profile_config:
raise CosmosLoadDbtException("Unable to load project via dbt ls without a profile config.")

if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")

with tempfile.TemporaryDirectory() as tmpdir:
logger.info(
f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`"
Expand Down Expand Up @@ -266,12 +278,12 @@ def load_via_dbt_ls(self) -> None:
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps:
deps_command = [dbt_cmd, "deps"]
deps_command = [self.dbt_cmd, "deps"]
deps_command.extend(self.local_flags)
stdout = run_command(deps_command, tmpdir_path, env)
logger.debug("dbt deps output: %s", stdout)

nodes = self.run_dbt_ls(dbt_cmd, self.execution_config.project_path, tmpdir_path, env)
nodes = self.run_dbt_ls(self.execution_config.project_path, tmpdir_path, env)

self.nodes = nodes
self.filtered_nodes = nodes
Expand Down
119 changes: 109 additions & 10 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from __future__ import annotations

import os
import psutil
import time
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Callable

from airflow.compat.functools import cached_property
from airflow.utils.python_virtualenv import prepare_virtualenv
from cosmos.hooks.subprocess import FullOutputSubprocessResult
from cosmos.exceptions import CosmosValueError

from cosmos.log import get_logger
from cosmos.operators.local import (
Expand All @@ -29,6 +33,16 @@
PY_INTERPRETER = "python3"


def depends_on_virtualenv_dir(method: Callable[[Any], Any]) -> Callable[[Any], Any]:
def wrapper(operator: DbtVirtualenvBaseOperator, *args: Any) -> None:
if operator.virtualenv_dir is None:
raise CosmosValueError(f"Method relies on value of parameter `virtualenv_dir` which is None.")

method(operator, *args)

return wrapper


class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
"""
Executes a dbt core cli command within a Python Virtual Environment, that is created before running the dbt command
Expand All @@ -41,15 +55,19 @@ class DbtVirtualenvBaseOperator(DbtLocalBaseOperator):
Avoid using unless the dbt job requires it.
"""

template_fields = DbtLocalBaseOperator.template_fields + ("virtualenv_dir",) # type: ignore[operator]

def __init__(
self,
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
virtualenv_dir: Path | None = None,
**kwargs: Any,
) -> None:
self.py_requirements = py_requirements or []
self.py_system_site_packages = py_system_site_packages
super().__init__(**kwargs)
self.virtualenv_dir = virtualenv_dir
self._venv_tmp_dir: None | TemporaryDirectory[str] = None

@cached_property
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general thought: do we still want to cache this property? Is there any risk that we could end up caching the incorrect path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this property cached? If people are debugging or want to pass in more dynamically configured directories, I don't know how this decorator behaves :) Is it per task_id per dagrun_id or is it more persistent?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The property is cached while the Python process is alive.

Expand All @@ -59,19 +77,14 @@ def venv_dbt_path(
"""
Path to the dbt binary within a Python virtualenv.

The first time this property is called, it creates a virtualenv and installs the dependencies based on the
self.py_requirements and self.py_system_site_packages. This value is cached for future calls.
The first time this property is called, it creates a new/temporary and installs the dependencies
based on the self.py_requirements and self.py_system_site_packages, or retrieves an existing virtualenv.
This value is cached for future calls.
"""
# We are reusing the virtualenv directory for all subprocess calls within this task/operator.
# For this reason, we are not using contexts at this point.
# The deletion of this directory is done explicitly at the end of the `execute` method.
self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")
py_interpreter = prepare_virtualenv(
venv_directory=self._venv_tmp_dir.name,
python_bin=PY_INTERPRETER,
system_site_packages=self.py_system_site_packages,
requirements=self.py_requirements,
)
py_interpreter = self._get_or_create_venv_py_interpreter()
dbt_binary = Path(py_interpreter).parent / "dbt"
cmd_output = self.subprocess_hook.run_command(
[
Expand All @@ -97,6 +110,92 @@ def execute(self, context: Context) -> None:
self._venv_tmp_dir.cleanup()
logger.info(output)

def _get_or_create_venv_py_interpreter(self) -> str:
"""Helper method that parses virtual env configuration
and returns a DBT binary within the resulting virtualenv"""

# No virtualenv_dir set, so revert to making a temporary virtualenv
if self.virtualenv_dir is None:
self.log.info("Creating temporary virtualenv")
self._venv_tmp_dir = TemporaryDirectory(prefix="cosmos-venv")

return prepare_virtualenv(
venv_directory=self._venv_tmp_dir.name,
python_bin=PY_INTERPRETER,
system_site_packages=self.py_system_site_packages,
requirements=self.py_requirements,
)

self.log.info(f"Checking if {str(self.__lock_file)} exists")
while not self._is_lock_available():
self.log.info("Waiting for lock to release")
time.sleep(1)

self.log.info(f"Creating virtualenv at `{self.virtualenv_dir}")
self.log.info(f"Acquiring available lock")
self.__acquire_venv_lock()

py_bin = prepare_virtualenv(
venv_directory=str(self.virtualenv_dir),
python_bin=PY_INTERPRETER,
system_site_packages=self.py_system_site_packages,
requirements=self.py_requirements,
)

self.log.info("Releasing lock")
self.__release_venv_lock()

return py_bin

@property
def __lock_file(self) -> Path:
return Path(f"{self.virtualenv_dir}/LOCK")

@property
def _pid(self) -> int:
return os.getpid()

# @depends_on_virtualenv_dir
def _is_lock_available(self) -> bool:
if self.__lock_file.is_file():
with open(self.__lock_file) as lf:
pid = int(lf.read())

self.log.info(f"Checking for running process with PID {pid}")
try:
_process_running = psutil.Process(pid).is_running()
except psutil.NoSuchProcess:
_process_running = False

self.log.info(f"Process {pid} running: {_process_running}")
return not _process_running

return True

@depends_on_virtualenv_dir
def __acquire_venv_lock(self) -> None:
if not self.virtualenv_dir.is_dir(): # type: ignore
os.mkdir(str(self.virtualenv_dir))

with open(self.__lock_file, "w") as lf:
self.log.info(f"Acquiring lock at {self.__lock_file} with pid {str(self._pid)}")
lf.write(str(self._pid))

@depends_on_virtualenv_dir
def __release_venv_lock(self) -> None:
if not self.__lock_file.is_file():
self.log.warn(f"Lockfile {self.__lock_file} not found, perhaps deleted by other concurrent operator?")

return

with open(self.__lock_file) as lf:
lock_file_pid = int(lf.read())

if lock_file_pid == self._pid:
return self.__lock_file.unlink()

self.log.warn(f"Lockfile owned by process of pid {lock_file_pid}, while operator has pid {self._pid}")


class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator):
"""
Expand Down
Loading
Loading