Skip to content

Commit

Permalink
Make profiles_yml_path optional for ExecutionMode.DOCKER and `KUB…
Browse files Browse the repository at this point in the history
…ERNETES` (#681)

This PR moves validation of the `profiles_yml_path` to later in the dag
generation process such that additional context can be gathered from
`ExecutionConfig` to avoid failing unnecessarily when the file does not
exist.

Closes: #680
Closes: #656
  • Loading branch information
MrBones757 authored Nov 18, 2023
1 parent e23a445 commit d28d3df
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 23 deletions.
18 changes: 12 additions & 6 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class RenderConfig:
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param dbt_deps: Configure to run dbt deps when using dbt ls for dag parsing
:param node_converters: a dictionary mapping a ``DbtResourceType`` into a callable. Users can control how to render dbt nodes in Airflow. Only supported when using ``load_method=LoadMode.DBT_MANIFEST`` or ``LoadMode.DBT_LS``.
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path. Mutually Exclusive with ProjectConfig.dbt_project_path
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``
:param dbt_executable_path: The path to the dbt executable for dag generation. Defaults to dbt if available on the path.
:param dbt_project_path Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
"""

emit_datasets: bool = True
Expand Down Expand Up @@ -195,15 +195,21 @@ class ProfileConfig:
profile_mapping: BaseProfileMapping | None = None

def __post_init__(self) -> None:
"Validates that we have enough information to render a profile."
# if using a user-supplied profiles.yml, validate that it exists
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")
self.validate_profile()

def validate_profile(self) -> None:
"Validates that we have enough information to render a profile."
if not self.profiles_yml_filepath and not self.profile_mapping:
raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile")
if self.profiles_yml_filepath and self.profile_mapping:
raise CosmosValueError(
"Both profiles_yml_filepath and profile_mapping are defined and are mutually exclusive. Ensure only one of these is defined."
)

def validate_profiles_yml(self) -> None:
"Validates a user-supplied profiles.yml is present"
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")

@contextlib.contextmanager
def ensure_profile(
Expand Down
28 changes: 20 additions & 8 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from airflow.utils.task_group import TaskGroup

from cosmos.airflow.graph import build_airflow_graph
from cosmos.constants import ExecutionMode
from cosmos.dbt.graph import DbtGraph
from cosmos.dbt.selector import retrieve_by_label
from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig
Expand Down Expand Up @@ -49,16 +50,21 @@ def airflow_kwargs(**kwargs: dict[str, Any]) -> dict[str, Any]:


def validate_arguments(
select: list[str], exclude: list[str], profile_args: dict[str, Any], task_args: dict[str, Any]
select: list[str],
exclude: list[str],
profile_config: ProfileConfig,
task_args: dict[str, Any],
execution_mode: ExecutionMode,
) -> None:
"""
Validate that mutually exclusive selectors filters have not been given.
Validate deprecated arguments.
:param select: A list of dbt select arguments (e.g. 'config.materialized:incremental')
:param exclude: A list of dbt exclude arguments (e.g. 'tag:nightly')
:param profile_args: Arguments to pass to the dbt profile
:param profile_config: ProfileConfig Object
:param task_args: Arguments to be used to instantiate an Airflow Task
:param execution_mode: the current execution mode
"""
for field in ("tags", "paths"):
select_items = retrieve_by_label(select, field)
Expand All @@ -69,8 +75,12 @@ def validate_arguments(

# if task_args has a schema, add it to the profile args and add a deprecated warning
if "schema" in task_args:
profile_args["schema"] = task_args["schema"]
logger.warning("Specifying a schema in the `task_args` is deprecated. Please use the `profile_args` instead.")
if profile_config.profile_mapping:
profile_config.profile_mapping.profile_args["schema"] = task_args["schema"]

if execution_mode in [ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV]:
profile_config.validate_profiles_yml()


class DbtToAirflowConverter:
Expand Down Expand Up @@ -139,10 +149,6 @@ def __init__(
"RenderConfig.dbt_project_path is required for rendering an airflow DAG from a DBT Graph if no manifest is provided."
)

profile_args = {}
if profile_config.profile_mapping:
profile_args = profile_config.profile_mapping.profile_args

if not operator_args:
operator_args = {}

Expand Down Expand Up @@ -174,7 +180,13 @@ def __init__(
if execution_config.dbt_executable_path:
task_args["dbt_executable_path"] = execution_config.dbt_executable_path

validate_arguments(render_config.select, render_config.exclude, profile_args, task_args)
validate_arguments(
render_config.select,
render_config.exclude,
profile_config,
task_args,
execution_mode=execution_config.execution_mode,
)

build_airflow_graph(
nodes=dbt_graph.filtered_nodes,
Expand Down
31 changes: 24 additions & 7 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
from unittest.mock import patch
from cosmos.profiles.postgres.user_pass import PostgresUserPasswordProfileMapping

import pytest

Expand All @@ -8,6 +9,7 @@


DBT_PROJECTS_ROOT_DIR = Path(__file__).parent / "sample/"
SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
PIPELINE_FOLDER = "jaffle_shop"


Expand Down Expand Up @@ -111,17 +113,32 @@ def test_project_name():
assert dbt_project.project_name == "sample"


def test_profile_config_post_init():
def test_profile_config_validate_none():
with pytest.raises(CosmosValueError) as err_info:
ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test")
assert err_info.value.args[0] == "The file /tmp/some-profile does not exist."
ProfileConfig(profile_name="test", target_name="test")
assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile"


def test_profile_config_validate():
def test_profile_config_validate_both():
with pytest.raises(CosmosValueError) as err_info:
profile_config = ProfileConfig(profile_name="test", target_name="test")
assert profile_config.validate_profile() is None
assert err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile"
ProfileConfig(
profile_name="test",
target_name="test",
profiles_yml_filepath=SAMPLE_PROFILE_YML,
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
assert (
err_info.value.args[0]
== "Both profiles_yml_filepath and profile_mapping are defined and are mutually exclusive. Ensure only one of these is defined."
)


def test_profile_config_validate_profiles_yml():
profile_config = ProfileConfig(profile_name="test", target_name="test", profiles_yml_filepath="/tmp/no-exists")
with pytest.raises(CosmosValueError) as err_info:
profile_config.validate_profiles_yml()

assert err_info.value.args[0] == "The file /tmp/no-exists does not exist."


@patch("cosmos.config.shutil.which", return_value=None)
Expand Down
22 changes: 20 additions & 2 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
from cosmos.profiles.postgres import PostgresUserPasswordProfileMapping

import pytest
from airflow.models import DAG
Expand All @@ -22,14 +23,31 @@ def test_validate_arguments_tags(argument_key):
selector_name = argument_key[:-1]
select = [f"{selector_name}:a,{selector_name}:b"]
exclude = [f"{selector_name}:b,{selector_name}:c"]
profile_args = {}
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
task_args = {}
with pytest.raises(CosmosValueError) as err:
validate_arguments(select, exclude, profile_args, task_args)
validate_arguments(select, exclude, profile_config, task_args, execution_mode=ExecutionMode.LOCAL)
expected = f"Can't specify the same {selector_name} in `select` and `exclude`: {{'b'}}"
assert err.value.args[0] == expected


def test_validate_arguments_schema_in_task_args():
profile_config = ProfileConfig(
profile_name="test",
target_name="test",
profile_mapping=PostgresUserPasswordProfileMapping(conn_id="test", profile_args={}),
)
task_args = {"schema": "abcd"}
validate_arguments(
select=[], exclude=[], profile_config=profile_config, task_args=task_args, execution_mode=ExecutionMode.LOCAL
)
assert profile_config.profile_mapping.profile_args["schema"] == "abcd"


parent_seed = DbtNode(
unique_id=f"{DbtResourceType.SEED}.{SAMPLE_DBT_PROJECT.stem}.seed_parent",
resource_type=DbtResourceType.SEED,
Expand Down

0 comments on commit d28d3df

Please sign in to comment.