Skip to content

Commit

Permalink
Merge branch 'main' into issue_639/propagate_logs_config
Browse files Browse the repository at this point in the history
  • Loading branch information
agreenburg authored Nov 3, 2023
2 parents b9f3ed0 + b64eb9a commit 416f4fd
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 131 deletions.
31 changes: 25 additions & 6 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import contextlib
import tempfile
from dataclasses import dataclass, field
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Iterator, Callable

Expand All @@ -31,6 +31,9 @@ class RenderConfig:
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
"""

emit_datasets: bool = True
Expand All @@ -40,6 +43,13 @@ class RenderConfig:
exclude: list[str] = field(default_factory=list)
dbt_deps: bool = True
node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None
dbt_executable_path: str | Path = get_system_dbt()
dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None


class ProjectConfig:
Expand Down Expand Up @@ -72,11 +82,13 @@ def __init__(
manifest_path: str | Path | None = None,
project_name: str | None = None,
):
# Since we allow dbt_project_path to be defined in ExecutionConfig and RenderConfig
# dbt_project_path may not always be defined here.
# We do, however, still require that both manifest_path and project_name be defined, or neither be defined.
if not dbt_project_path:
if not manifest_path or not project_name:
if manifest_path and not project_name or project_name and not manifest_path:
raise CosmosValueError(
"ProjectConfig requires dbt_project_path and/or manifest_path to be defined."
" If only manifest_path is defined, project_name must also be defined."
"If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined."
)
if project_name:
self.project_name = project_name
Expand Down Expand Up @@ -210,10 +222,17 @@ class ExecutionConfig:
:param execution_mode: The execution mode for dbt. Defaults to local
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable. Defaults to dbt if
available on the path.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
test_indirect_selection: TestIndirectSelection = TestIndirectSelection.EAGER
dbt_executable_path: str | Path = get_system_dbt()

dbt_project_path: InitVar[str | Path | None] = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
self.project_path = Path(dbt_project_path) if dbt_project_path else None
77 changes: 46 additions & 31 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger


logger = get_logger(__name__)


Expand Down Expand Up @@ -92,8 +91,8 @@ def __init__(
self,
project_config: ProjectConfig,
profile_config: ProfileConfig,
execution_config: ExecutionConfig = ExecutionConfig(),
render_config: RenderConfig = RenderConfig(),
execution_config: ExecutionConfig | None = None,
render_config: RenderConfig | None = None,
dag: DAG | None = None,
task_group: TaskGroup | None = None,
operator_args: dict[str, Any] | None = None,
Expand All @@ -103,19 +102,37 @@ def __init__(
) -> None:
project_config.validate_project()

emit_datasets = render_config.emit_datasets
test_behavior = render_config.test_behavior
select = render_config.select
exclude = render_config.exclude
dbt_deps = render_config.dbt_deps
execution_mode = execution_config.execution_mode
test_indirect_selection = execution_config.test_indirect_selection
load_mode = render_config.load_method
dbt_executable_path = execution_config.dbt_executable_path
node_converters = render_config.node_converters

if not project_config.dbt_project_path:
raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.")
if not execution_config:
execution_config = ExecutionConfig()
if not render_config:
render_config = RenderConfig()

# Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path
# We need to ensure that only one interface is being used.
if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path):
raise CosmosValueError(
"ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path."
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

# At this point, execution_config.project_path should always be non-null
if not execution_config.project_path:
raise CosmosValueError(
"ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
)

# We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
# We require render_config.project_path when we dont have a manifest
if not project_config.manifest_path and not render_config.project_path:
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)

profile_args = {}
if profile_config.profile_mapping:
Expand All @@ -136,36 +153,34 @@ def __init__(
# We may want to consider defaulting this value in our actual ProjceConfig class?
dbt_graph = DbtGraph(
project=project_config,
exclude=exclude,
select=select,
dbt_cmd=dbt_executable_path,
render_config=render_config,
execution_config=execution_config,
dbt_cmd=render_config.dbt_executable_path,
profile_config=profile_config,
operator_args=operator_args,
dbt_deps=dbt_deps,
)
dbt_graph.load(method=load_mode, execution_mode=execution_mode)
dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode)

task_args = {
**operator_args,
# the following args may be only needed for local / venv:
"project_dir": project_config.dbt_project_path,
"project_dir": execution_config.project_path,
"profile_config": profile_config,
"emit_datasets": emit_datasets,
"emit_datasets": render_config.emit_datasets,
}
if dbt_executable_path:
task_args["dbt_executable_path"] = dbt_executable_path
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path

validate_arguments(select, exclude, profile_args, task_args)
validate_arguments(render_config.select, render_config.exclude, profile_args, task_args)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
dag=dag or (task_group and task_group.dag),
task_group=task_group,
execution_mode=execution_mode,
execution_mode=execution_config.execution_mode,
task_args=task_args,
test_behavior=test_behavior,
test_indirect_selection=test_indirect_selection,
test_behavior=render_config.test_behavior,
test_indirect_selection=execution_config.test_indirect_selection,
dbt_project_name=project_config.project_name,
on_warning_callback=on_warning_callback,
node_converters=node_converters,
node_converters=render_config.node_converters,
)
Loading

0 comments on commit 416f4fd

Please sign in to comment.