Skip to content

Commit

Permalink
Support no profile_config for K8s & Docker (#721)
Browse files Browse the repository at this point in the history
Make `profile_config` optional for users using ExecutionMode.KUBERTES
and DOCKER.

Solve the issue raised by @david-mag in the #airflow-dbt slack channel:
https://apache-airflow.slack.com/archives/C059CC42E9W/p1700814078752829
  • Loading branch information
tatiana authored Dec 5, 2023
1 parent e1f34ea commit 32ccc6b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 21 deletions.
76 changes: 56 additions & 20 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,59 @@ def validate_arguments(
profile_config.validate_profiles_yml()


def validate_initial_user_config(
execution_config: ExecutionConfig | None,
profile_config: ProfileConfig | None,
project_config: ProjectConfig,
render_config: RenderConfig | None,
):
"""
Validates if the user set the fields as expected.
:param execution_config: Configuration related to how to run dbt in Airflow tasks
:param profile_config: Configuration related to dbt database configuration (profile)
:param project_config: Configuration related to the overall dbt project
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
"""
if profile_config is None and execution_config.execution_mode not in (
ExecutionMode.KUBERNETES,
ExecutionMode.DOCKER,
):
raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

# Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path
# We need to ensure that only one interface is being used.
if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path):
raise CosmosValueError(
"ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path."
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)


def validate_adapted_user_config(
execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None
):
"""
Validates if all the necessary fields required by Cosmos to render the DAG are set.
:param execution_config: Configuration related to how to run dbt in Airflow tasks
:param project_config: Configuration related to the overall dbt project
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
"""
# At this point, execution_config.project_path should always be non-null
if not execution_config.project_path:
raise CosmosValueError(
"ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
)

# We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
# We require render_config.project_path when we dont have a manifest
if not project_config.manifest_path and not render_config.project_path:
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)


class DbtToAirflowConverter:
"""
Logic common to build an Airflow DbtDag and DbtTaskGroup from a DBT project.
Expand All @@ -101,7 +154,7 @@ class DbtToAirflowConverter:
def __init__(
self,
project_config: ProjectConfig,
profile_config: ProfileConfig,
profile_config: ProfileConfig | None = None,
execution_config: ExecutionConfig | None = None,
render_config: RenderConfig | None = None,
dag: DAG | None = None,
Expand All @@ -118,13 +171,7 @@ def __init__(
if not render_config:
render_config = RenderConfig()

# Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path
# We need to ensure that only one interface is being used.
if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path):
raise CosmosValueError(
"ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path."
+ "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None"
)
validate_initial_user_config(execution_config, profile_config, project_config, render_config)

# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
Expand All @@ -136,18 +183,7 @@ def __init__(
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

# At this point, execution_config.project_path should always be non-null
if not execution_config.project_path:
raise CosmosValueError(
"ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes."
)

# We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path
# We require render_config.project_path when we dont have a manifest
if not project_config.manifest_path and not render_config.project_path:
raise CosmosValueError(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)
validate_adapted_user_config(execution_config, project_config, render_config)

if not operator_args:
operator_args = {}
Expand Down
27 changes: 26 additions & 1 deletion tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest
from airflow.models import DAG

from cosmos.converter import DbtToAirflowConverter, validate_arguments
from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config
from cosmos.constants import DbtResourceType, ExecutionMode
from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, CosmosConfigException
from cosmos.dbt.graph import DbtNode
Expand Down Expand Up @@ -35,6 +35,31 @@ def test_validate_arguments_tags(argument_key):
assert err.value.args[0] == expected


@pytest.mark.parametrize(
"execution_mode",
(ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV),
)
def test_validate_initial_user_config_no_profile(execution_mode):
execution_config = ExecutionConfig(execution_mode=execution_mode)
profile_config = None
project_config = ProjectConfig()
with pytest.raises(CosmosValueError) as err_info:
validate_initial_user_config(execution_config, profile_config, project_config, None)
err_msg = f"The profile_config is mandatory when using {execution_mode}"
assert err_info.value.args[0] == err_msg


@pytest.mark.parametrize(
"execution_mode",
(ExecutionMode.DOCKER, ExecutionMode.KUBERNETES),
)
def test_validate_initial_user_config_expects_profile(execution_mode):
execution_config = ExecutionConfig(execution_mode=execution_mode)
profile_config = None
project_config = ProjectConfig()
assert validate_initial_user_config(execution_config, profile_config, project_config, None) is None


def test_validate_arguments_schema_in_task_args():
profile_config = ProfileConfig(
profile_name="test",
Expand Down

0 comments on commit 32ccc6b

Please sign in to comment.