diff --git a/cosmos/converter.py b/cosmos/converter.py index 2142cc6e4..45c95c2e7 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -83,6 +83,59 @@ def validate_arguments( profile_config.validate_profiles_yml() +def validate_initial_user_config( + execution_config: ExecutionConfig | None, + profile_config: ProfileConfig | None, + project_config: ProjectConfig, + render_config: RenderConfig | None, +): + """ + Validates if the user set the fields as expected. + + :param execution_config: Configuration related to how to run dbt in Airflow tasks + :param profile_config: Configuration related to dbt database configuration (profile) + :param project_config: Configuration related to the overall dbt project + :param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG + """ + if profile_config is None and execution_config.execution_mode not in ( + ExecutionMode.KUBERNETES, + ExecutionMode.DOCKER, + ): + raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}") + + # Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path + # We need to ensure that only one interface is being used. + if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path): + raise CosmosValueError( + "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path." + + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" + ) + + +def validate_adapted_user_config( + execution_config: ExecutionConfig | None, project_config: ProjectConfig, render_config: RenderConfig | None +): + """ + Validates if all the necessary fields required by Cosmos to render the DAG are set. + + :param execution_config: Configuration related to how to run dbt in Airflow tasks + :param project_config: Configuration related to the overall dbt project + :param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG + """ + # At this point, execution_config.project_path should always be non-null + if not execution_config.project_path: + raise CosmosValueError( + "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." + ) + + # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path + # We require render_config.project_path when we dont have a manifest + if not project_config.manifest_path and not render_config.project_path: + raise CosmosValueError( + "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." + ) + + class DbtToAirflowConverter: """ Logic common to build an Airflow DbtDag and DbtTaskGroup from a DBT project. @@ -101,7 +154,7 @@ class DbtToAirflowConverter: def __init__( self, project_config: ProjectConfig, - profile_config: ProfileConfig, + profile_config: ProfileConfig | None = None, execution_config: ExecutionConfig | None = None, render_config: RenderConfig | None = None, dag: DAG | None = None, @@ -118,13 +171,7 @@ def __init__( if not render_config: render_config = RenderConfig() - # Since we now support both project_config.dbt_project_path, render_config.project_path and execution_config.project_path - # We need to ensure that only one interface is being used. - if project_config.dbt_project_path and (render_config.project_path or execution_config.project_path): - raise CosmosValueError( - "ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path." - + "If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None" - ) + validate_initial_user_config(execution_config, profile_config, project_config, render_config) # If we are using the old interface, we should migrate it to the new interface # This is safe to do now since we have validated which config interface we're using @@ -136,18 +183,7 @@ def __init__( render_config.project_path = project_config.dbt_project_path execution_config.project_path = project_config.dbt_project_path - # At this point, execution_config.project_path should always be non-null - if not execution_config.project_path: - raise CosmosValueError( - "ExecutionConfig.dbt_project_path is required for the execution of dbt tasks in all execution modes." - ) - - # We now have a guaranteed execution_config.project_path, but still need to process render_config.project_path - # We require render_config.project_path when we dont have a manifest - if not project_config.manifest_path and not render_config.project_path: - raise CosmosValueError( - "RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided." - ) + validate_adapted_user_config(execution_config, project_config, render_config) if not operator_args: operator_args = {} diff --git a/tests/test_converter.py b/tests/test_converter.py index 3bb5af163..c04da2c3a 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -6,7 +6,7 @@ import pytest from airflow.models import DAG -from cosmos.converter import DbtToAirflowConverter, validate_arguments +from cosmos.converter import DbtToAirflowConverter, validate_arguments, validate_initial_user_config from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.config import ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig, CosmosConfigException from cosmos.dbt.graph import DbtNode @@ -35,6 +35,31 @@ def test_validate_arguments_tags(argument_key): assert err.value.args[0] == expected +@pytest.mark.parametrize( + "execution_mode", + (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV), +) +def test_validate_initial_user_config_no_profile(execution_mode): + execution_config = ExecutionConfig(execution_mode=execution_mode) + profile_config = None + project_config = ProjectConfig() + with pytest.raises(CosmosValueError) as err_info: + validate_initial_user_config(execution_config, profile_config, project_config, None) + err_msg = f"The profile_config is mandatory when using {execution_mode}" + assert err_info.value.args[0] == err_msg + + +@pytest.mark.parametrize( + "execution_mode", + (ExecutionMode.DOCKER, ExecutionMode.KUBERNETES), +) +def test_validate_initial_user_config_expects_profile(execution_mode): + execution_config = ExecutionConfig(execution_mode=execution_mode) + profile_config = None + project_config = ProjectConfig() + assert validate_initial_user_config(execution_config, profile_config, project_config, None) is None + + def test_validate_arguments_schema_in_task_args(): profile_config = ProfileConfig( profile_name="test",