Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ProjectConfig.dbt_project_path = None & different paths for Rendering and Execution #634

Merged
merged 10 commits into from
Nov 3, 2023
Merged
31 changes: 25 additions & 6 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import contextlib
import tempfile
from dataclasses import dataclass, field
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Iterator, Callable

Expand All @@ -31,6 +31,9 @@ class RenderConfig:
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
"""

emit_datasets: bool = True
Expand All @@ -40,6 +43,13 @@ class RenderConfig:
exclude: list[str] = field(default_factory=list)
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)
tatiana marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None


class ProjectConfig:
Expand Down Expand Up @@ -72,11 +82,13 @@ def __init__(
manifest_path: str | Path | None = None,
project_name: str | None = None,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
# We do, however, still require that both manifest_path and project_name be defined, or neither be defined.
if not dbt_project_path:
if not manifest_path or not project_name:
if manifest_path and not project_name or project_name and not manifest_path:
raise CosmosValueError(
"ProjectConfig requires dbt_project_path and/or manifest_path to be defined."
" If only manifest_path is defined, project_name must also be defined."
"If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined."
)
if project_name:
self.project_name = project_name
Expand Down Expand Up @@ -210,10 +222,17 @@ class ExecutionConfig:

:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
available on the path.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = get_system_dbt()

dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None
77 changes: 46 additions & 31 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger


logger = get_logger(__name__)


Expand Down Expand Up @@ -92,8 +91,8 @@ def __init__(
self,
project_config: ProjectConfig,
profile_config: ProfileConfig,
execution_config: ExecutionConfig = ExecutionConfig(),
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig | None = None,
render_config: RenderConfig | None = None,
dag: DAG | None = None,
task_group: TaskGroup | None = None,
operator_args: dict[str, Any] | None = None,
Expand All @@ -103,19 +102,37 @@ def __init__(
) -> None:
project_config.validate_project()

emit_datasets = render_config.emit_datasets
test_behavior = render_config.test_behavior
select = render_config.select
exclude = render_config.exclude
dbt_deps = render_config.dbt_deps
execution_mode = execution_config.execution_mode
test_indirect_selection = execution_config.test_indirect_selection
load_mode = render_config.load_method
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

if not project_config.dbt_project_path:
raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.")
if not execution_config:
execution_config = ExecutionConfig()
if not render_config:
render_config = RenderConfig()

# Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path
# We need to ensure that only one interface is being used.
if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path):
raise CosmosValueError(
"ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path."
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

# At this point, execution_config.project_path should always be non-null
if not execution_config.project_path:
raise CosmosValueError(
"ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
)

# We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
# We require render_config.project_path when we dont have a manifest
if not project_config.manifest_path and not render_config.project_path:
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)

profile_args = {}
if profile_config.profile_mapping:
Expand All @@ -136,36 +153,34 @@ def __init__(
# We may want to consider defaulting this value in our actual ProjceConfig class?
dbt_graph = DbtGraph(
project=project_config,
exclude=exclude,
select=select,
dbt_cmd=dbt_executable_path,
render_config=render_config,
execution_config=execution_config,
dbt_cmd=render_config.dbt_executable_path,
profile_config=profile_config,
operator_args=operator_args,
dbt_deps=dbt_deps,
)
dbt_graph.load(method=load_mode, execution_mode=execution_mode)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

task_args = {
**operator_args,
# the following args may be only needed for local / venv:
"project_dir": project_config.dbt_project_path,
"project_dir": execution_config.project_path,
"profile_config": profile_config,
"emit_datasets": emit_datasets,
"emit_datasets": render_config.emit_datasets,
}
if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path

validate_arguments(select, exclude, profile_args, task_args)
validate_arguments(render_config.select, render_config.exclude, profile_args, task_args)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
dag=dag or (task_group and task_group.dag),
task_group=task_group,
execution_mode=execution_mode,
execution_mode=execution_config.execution_mode,
task_args=task_args,
test_behavior=test_behavior,
test_indirect_selection=test_indirect_selection,
test_behavior=render_config.test_behavior,
test_indirect_selection=execution_config.test_indirect_selection,
dbt_project_name=project_config.project_name,
on_warning_callback=on_warning_callback,
node_converters=node_converters,
node_converters=render_config.node_converters,
)
Loading
Loading