From 2f8d0e2f787a5792feacba609c0190877062375c Mon Sep 17 00:00:00 2001 From: MrBones757 Date: Wed, 25 Oct 2023 17:48:47 +0800 Subject: [PATCH] Resolve errors occurring when dbt_project_path is str and partial support dbt_project_path=None (#605) As part of the changes made #581, some downstream logic was missed relating to the handling of a None and String-based project dir. This MR attempts to remedy this issue by adding down steam support for the project dir being None (including generation of exceptions and guarding), as well as some property reference changes in the converter. Closes: #601 Co-authored-by: tabmra --- cosmos/config.py | 82 ++++++----- cosmos/converter.py | 36 +++-- cosmos/dbt/graph.py | 95 ++++++++----- cosmos/dbt/parser/project.py | 2 +- cosmos/dbt/project.py | 47 ------- cosmos/dbt/selector.py | 12 +- dev/dags/basic_cosmos_task_group.py | 2 +- dev/dags/cosmos_manifest_example.py | 3 +- tests/dbt/parser/test_project.py | 30 ++-- tests/dbt/test_graph.py | 207 ++++++++++++++++++---------- tests/test_config.py | 89 +++++++----- tests/test_converter.py | 71 +++++++++- 12 files changed, 410 insertions(+), 266 deletions(-) delete mode 100644 cosmos/dbt/project.py diff --git a/cosmos/config.py b/cosmos/config.py index e052cb9db..15f88e037 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -5,7 +5,6 @@ import contextlib import tempfile from dataclasses import dataclass, field -from functools import cached_property from pathlib import Path from typing import Any, Iterator, Callable @@ -43,7 +42,6 @@ class RenderConfig: node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None -@dataclass class ProjectConfig: """ Class for setting project config. @@ -58,29 +56,41 @@ class ProjectConfig: Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path. """ - dbt_project_path: str | Path | None = None - models_relative_path: str | Path = "models" - seeds_relative_path: str | Path = "seeds" - snapshots_relative_path: str | Path = "snapshots" - manifest_path: str | Path | None = None - project_name: str | None = None - - @cached_property - def parsed_dbt_project_path(self) -> Path | None: - return Path(self.dbt_project_path) if self.dbt_project_path else None + dbt_project_path: Path | None = None + manifest_path: Path | None = None + models_path: Path | None = None + seeds_path: Path | None = None + snapshots_path: Path | None = None + project_name: str + + def __init__( + self, + dbt_project_path: str | Path | None = None, + models_relative_path: str | Path = "models", + seeds_relative_path: str | Path = "seeds", + snapshots_relative_path: str | Path = "snapshots", + manifest_path: str | Path | None = None, + project_name: str | None = None, + ): + if not dbt_project_path: + if not manifest_path or not project_name: + raise CosmosValueError( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + " If only manifest_path is defined, project_name must also be defined." + ) + if project_name: + self.project_name = project_name - @cached_property - def parsed_manifest_path(self) -> Path | None: - return Path(self.manifest_path) if self.manifest_path else None + if dbt_project_path: + self.dbt_project_path = Path(dbt_project_path) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem - def __post_init__(self) -> None: - "Converts paths to `Path` objects." - if self.parsed_dbt_project_path: - self.models_relative_path = self.parsed_dbt_project_path / Path(self.models_relative_path) - self.seeds_relative_path = self.parsed_dbt_project_path / Path(self.seeds_relative_path) - self.snapshots_relative_path = self.parsed_dbt_project_path / Path(self.snapshots_relative_path) - if not self.project_name: - self.project_name = self.parsed_dbt_project_path.stem + if manifest_path: + self.manifest_path = Path(manifest_path) def validate_project(self) -> None: """ @@ -94,20 +104,14 @@ def validate_project(self) -> None: mandatory_paths = {} - if self.parsed_dbt_project_path: - project_yml_path = self.parsed_dbt_project_path / "dbt_project.yml" + if self.dbt_project_path: + project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { "dbt_project.yml": project_yml_path, - "models directory ": self.models_relative_path, + "models directory ": self.models_path, } - elif self.parsed_manifest_path: - if not self.project_name: - raise CosmosValueError( - "project_name required when manifest_path is present and dbt_project_path is not." - ) - mandatory_paths = {"manifest file": self.parsed_manifest_path} - else: - raise CosmosValueError("dbt_project_path or manifest_path are required parameters.") + if self.manifest_path: + mandatory_paths["manifest"] = self.manifest_path for name, path in mandatory_paths.items(): if path is None or not Path(path).exists(): @@ -117,10 +121,10 @@ def is_manifest_available(self) -> bool: """ Check if the `dbt` project manifest is set and if the file exists. """ - if not self.parsed_manifest_path: + if not self.manifest_path: return False - return self.parsed_manifest_path.exists() + return self.manifest_path.exists() @dataclass @@ -159,6 +163,12 @@ def validate_profile(self) -> None: if not self.profiles_yml_filepath and not self.profile_mapping: raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile") + def is_profile_yml_available(self) -> bool: + """ + Check if the `dbt` profiles.yml file exists. + """ + return Path(self.profiles_yml_filepath).exists() if self.profiles_yml_filepath else False + @contextlib.contextmanager def ensure_profile( self, desired_profile_path: Path | None = None, use_mock_values: bool = False diff --git a/cosmos/converter.py b/cosmos/converter.py index 43865c56b..b1653ba52 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -5,14 +5,12 @@ import inspect from typing import Any, Callable -from pathlib import Path from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph from cosmos.dbt.graph import DbtGraph -from cosmos.dbt.project import DbtProject from cosmos.dbt.selector import retrieve_by_label from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig from cosmos.exceptions import CosmosValueError @@ -106,11 +104,6 @@ def __init__( project_config.validate_project() emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path test_behavior = render_config.test_behavior select = render_config.select exclude = render_config.exclude @@ -118,10 +111,12 @@ def __init__( execution_mode = execution_config.execution_mode test_indirect_selection = execution_config.test_indirect_selection load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path dbt_executable_path = execution_config.dbt_executable_path node_converters = render_config.node_converters + if not project_config.dbt_project_path: + raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.") + profile_args = {} if profile_config.profile_mapping: profile_args = profile_config.profile_mapping.profile_args @@ -129,17 +124,18 @@ def __init__( if not operator_args: operator_args = {} - dbt_project = DbtProject( - name=dbt_project_name, - root_dir=Path(dbt_root_path), - models_dir=Path(dbt_models_dir) if dbt_models_dir else None, - seeds_dir=Path(dbt_seeds_dir) if dbt_seeds_dir else None, - snapshots_dir=Path(dbt_snapshots_dir) if dbt_snapshots_dir else None, - manifest_path=manifest_path, - ) - + # Previously, we were creating a cosmos.dbt.project.DbtProject + # DbtProject has now been replaced with ProjectConfig directly + # since the interface of the two classes were effectively the same + # Under this previous implementation, we were passing: + # - name, root dir, models dir, snapshots dir and manifest path + # Internally in the dbtProject class, we were defaulting the profile_path + # To be root dir/profiles.yml + # To keep this logic working, if converter is given no ProfileConfig, + # we can create a default retaining this value to preserve this functionality. + # We may want to consider defaulting this value in our actual ProjceConfig class? dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, exclude=exclude, select=select, dbt_cmd=dbt_executable_path, @@ -152,7 +148,7 @@ def __init__( task_args = { **operator_args, # the following args may be only needed for local / venv: - "project_dir": dbt_project.dir, + "project_dir": project_config.dbt_project_path, "profile_config": profile_config, "emit_datasets": emit_datasets, } @@ -169,7 +165,7 @@ def __init__( task_args=task_args, test_behavior=test_behavior, test_indirect_selection=test_indirect_selection, - dbt_project_name=dbt_project.name, + dbt_project_name=project_config.project_name, on_warning_callback=on_warning_callback, node_converters=node_converters, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 889948fdd..ac92c46f5 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -10,7 +10,7 @@ from subprocess import PIPE, Popen from typing import Any -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, @@ -22,8 +22,7 @@ LoadMode, ) from cosmos.dbt.executable import get_system_dbt -from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject -from cosmos.dbt.project import DbtProject +from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -64,7 +63,7 @@ class DbtGraph: Example of how to use: dbt_graph = DbtGraph( - project=DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR), + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), exclude=["*orders*"], select=[], dbt_cmd="/usr/local/bin/dbt", @@ -77,11 +76,11 @@ class DbtGraph: def __init__( self, - project: DbtProject, + project: ProjectConfig, + profile_config: ProfileConfig | None = None, exclude: list[str] | None = None, select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), - profile_config: ProfileConfig | None = None, operator_args: dict[str, Any] | None = None, dbt_deps: bool | None = True, ): @@ -107,31 +106,34 @@ def load( :param method: How to load `nodes` from a `dbt` project (automatically, using custom parser, using dbt manifest or dbt ls) :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.KUBERNETES) + + Fundamentally, there are two different execution paths + There is automatic, and manual. """ + load_method = { LoadMode.CUSTOM: self.load_via_custom_parser, LoadMode.DBT_LS: self.load_via_dbt_ls, LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, } + if method == LoadMode.AUTOMATIC: if self.project.is_manifest_available(): self.load_from_dbt_manifest() - return - elif execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available(): - try: - self.load_via_dbt_ls() - return - except FileNotFoundError: - self.load_via_custom_parser() - return else: - self.load_via_custom_parser() - return - - if method == LoadMode.DBT_MANIFEST and not self.project.is_manifest_available(): - raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") - - load_method[method]() + if ( + execution_mode == ExecutionMode.LOCAL + and self.profile_config + and self.profile_config.is_profile_yml_available() + ): + try: + self.load_via_dbt_ls() + except FileNotFoundError: + self.load_via_custom_parser() + else: + self.load_via_custom_parser() + else: + load_method[method]() def load_via_dbt_ls(self) -> None: """ @@ -145,10 +147,14 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir) + logger.info( + "Trying to parse the dbt project `%s` in `%s` using dbt ls...", + self.project.project_name, + self.project.dbt_project_path, + ) - if not self.profile_config: - raise CosmosLoadDbtException("Unable to load dbt project without a profile config") + if not self.project.dbt_project_path or not self.profile_config: + raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") if not shutil.which(self.dbt_cmd): raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") @@ -159,16 +165,20 @@ def load_via_dbt_ls(self) -> None: env.update(env_vars) with tempfile.TemporaryDirectory() as tmpdir: - logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir)) - logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir) + logger.info( + "Content of the dbt project dir <%s>: `%s`", + self.project.dbt_project_path, + os.listdir(self.project.dbt_project_path), + ) + logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir) # We create symbolic links to the original directory files and directories. # This allows us to run the dbt command from within the temporary directory, outputting any necessary # artifact and also allow us to run `dbt deps` tmpdir_path = Path(tmpdir) ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(self.project.dir): + for child_name in os.listdir(self.project.dbt_project_path): if child_name not in ignore_paths: - os.symlink(self.project.dir / child_name, tmpdir_path / child_name) + os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name) local_flags = [ "--project-dir", @@ -260,7 +270,7 @@ def load_via_dbt_ls(self) -> None: unique_id=node_dict["unique_id"], resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / node_dict["original_file_path"], tags=node_dict["tags"], config=node_dict["config"], ) @@ -287,13 +297,16 @@ def load_via_custom_parser(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) + + if not self.project.dbt_project_path or not self.project.models_path or not self.project.seeds_path: + raise CosmosLoadDbtException("Unable to load dbt project without project files") project = LegacyDbtProject( - dbt_root_path=str(self.project.root_dir), - dbt_models_dir=self.project.models_dir.stem if self.project.models_dir else None, - dbt_seeds_dir=self.project.seeds_dir.stem if self.project.seeds_dir else None, - project_name=self.project.name, + project_name=self.project.dbt_project_path.stem, + dbt_root_path=self.project.dbt_project_path.parent.as_posix(), + dbt_models_dir=self.project.models_path.stem, + dbt_seeds_dir=self.project.seeds_path.stem, operator_args=self.operator_args, ) nodes = {} @@ -315,7 +328,7 @@ def load_via_custom_parser(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() @@ -337,7 +350,11 @@ def load_from_dbt_manifest(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.project_name) + + if not self.project.is_manifest_available(): + raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") + nodes = {} with open(self.project.manifest_path) as fp: # type: ignore[arg-type] manifest = json.load(fp) @@ -349,7 +366,9 @@ def load_from_dbt_manifest(self) -> None: unique_id=unique_id, resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / Path(node_dict["original_file_path"]) + if self.project.dbt_project_path + else Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], ) @@ -358,7 +377,7 @@ def load_from_dbt_manifest(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index bb86d41a6..e154bb0ed 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -233,7 +233,7 @@ def __repr__(self) -> str: @dataclass -class DbtProject: +class LegacyDbtProject: """ Represents a single dbt project. """ diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py deleted file mode 100644 index fe60f5751..000000000 --- a/cosmos/dbt/project.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import annotations -from dataclasses import dataclass -from pathlib import Path - - -DEFAULT_PROFILE_FILE_NAME = "profiles.yml" - - -@dataclass -class DbtProject: - name: str - root_dir: Path - models_dir: Path | None = None - seeds_dir: Path | None = None - snapshots_dir: Path | None = None - manifest_path: Path | None = None - profile_path: Path | None = None - _cosmos_created_profile_file: bool = False - - def __post_init__(self) -> None: - if self.models_dir is None: - self.models_dir = self.dir / "models" - if self.seeds_dir is None: - self.seeds_dir = self.dir / "seeds" - if self.snapshots_dir is None: - self.snapshots_dir = self.dir / "snapshots" - if self.profile_path is None: - self.profile_path = self.dir / "profiles.yml" - - @property - def dir(self) -> Path: - """ - Path to dbt pipeline, defined by self.root_dir and self.name. - """ - return self.root_dir / self.name - - def is_manifest_available(self) -> bool: - """ - Check if the `dbt` project manifest is set and if the file exists. - """ - return self.manifest_path is not None and Path(self.manifest_path).exists() - - def is_profile_yml_available(self) -> bool: - """ - Check if the `dbt` profiles.yml file exists. - """ - return Path(self.profile_path).exists() if self.profile_path else False diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 926cc6b1a..0e6034a0b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -27,7 +27,7 @@ class SelectorConfig: Supports to load it from a string. """ - def __init__(self, project_dir: Path, statement: str): + def __init__(self, project_dir: Path | None, statement: str): """ Create a selector config file. @@ -64,7 +64,10 @@ def load_from_statement(self, statement: str) -> None: for item in items: if item.startswith(PATH_SELECTOR): index = len(PATH_SELECTOR) - self.paths.append(self.project_dir / item[index:]) + if self.project_dir: + self.paths.append(self.project_dir / Path(item[index:])) + else: + self.paths.append(Path(item[index:])) elif item.startswith(TAG_SELECTOR): index = len(TAG_SELECTOR) self.tags.append(item[index:]) @@ -165,7 +168,10 @@ def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: def select_nodes( - project_dir: Path, nodes: dict[str, DbtNode], select: list[str] | None = None, exclude: list[str] | None = None + project_dir: Path | None, + nodes: dict[str, DbtNode], + select: list[str] | None = None, + exclude: list[str] | None = None, ) -> dict[str, DbtNode]: """ Given a group of nodes within a project, apply select and exclude filters using diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 2f875b8c3..50cb6ed09 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -38,7 +38,7 @@ def basic_cosmos_task_group() -> None: jaffle_shop = DbtTaskGroup( group_id="test_123", project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", + (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), operator_args={"install_deps": True}, profile_config=profile_config, diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index 14ce42606..67a9cb3ce 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -25,8 +25,9 @@ cosmos_manifest_example = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", + dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + project_name="jaffle_shop", ), profile_config=profile_config, render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:models/customers.sql"]), diff --git a/tests/dbt/parser/test_project.py b/tests/dbt/parser/test_project.py index 31a79ceba..4f13a3eb3 100644 --- a/tests/dbt/parser/test_project.py +++ b/tests/dbt/parser/test_project.py @@ -10,7 +10,7 @@ import pytest import yaml -from cosmos.dbt.parser.project import DbtModel, DbtModelType, DbtProject +from cosmos.dbt.parser.project import DbtModel, DbtModelType, LegacyDbtProject DBT_PROJECT_PATH = Path(__name__).parent.parent.parent.parent.parent / "dev/dags/dbt/" SAMPLE_CSV_PATH = DBT_PROJECT_PATH / "jaffle_shop/seeds/raw_customers.csv" @@ -19,8 +19,8 @@ SAMPLE_YML_PATH = DBT_PROJECT_PATH / "jaffle_shop/models/schema.yml" -def test_dbtproject__handle_csv_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_csv_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", ) assert not dbt_project.seeds @@ -33,8 +33,8 @@ def test_dbtproject__handle_csv_file(): assert raw_customers.path == SAMPLE_CSV_PATH -def test_dbtproject__handle_sql_file_model(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_model(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -49,8 +49,8 @@ def test_dbtproject__handle_sql_file_model(): assert raw_customers.path == SAMPLE_MODEL_SQL_PATH -def test_dbtproject__handle_sql_file_snapshot(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_snapshot(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -65,8 +65,8 @@ def test_dbtproject__handle_sql_file_snapshot(): assert raw_customers.path == SAMPLE_SNAPSHOT_SQL_PATH -def test_dbtproject__handle_config_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -81,25 +81,25 @@ def test_dbtproject__handle_config_file(): assert sample_test.path == SAMPLE_YML_PATH -def test_dbtproject__handle_config_file_empty_file(): +def test_LegacyDbtProject__handle_config_file_empty_file(): with NamedTemporaryFile("w") as tmp_fp: tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models -def test_dbtproject__handle_config_file_with_unknown_name(): +def test_LegacyDbtProject__handle_config_file_with_unknown_name(): yaml_data = {"models": [{"name": "unknown"}]} with NamedTemporaryFile("w") as tmp_fp: yaml.dump(yaml_data, tmp_fp) tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models @@ -112,8 +112,8 @@ def test_dbtproject__handle_config_file_with_unknown_name(): (["tag1", "tag2"], {"materialized:view", "tags:tag1", "tags:tag2"}), ], ) -def test_dbtproject__handle_config_file_with_selector(input_tags, expected_config_selectors): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file_with_selector(input_tags, expected_config_selectors): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index dc4189aca..f176fa445 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,14 +5,13 @@ import pytest -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode -from cosmos.dbt.project import DbtProject from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" -DBT_PIPELINE_NAME = "jaffle_shop" +DBT_PROJECT_NAME = "jaffle_shop" SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json" SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" @@ -24,10 +23,10 @@ def tmp_dbt_project_dir(): """ Creates a plain dbt project structure, which does not contain logs or target folders. """ - source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PIPELINE_NAME + source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME tmp_dir = Path(tempfile.mkdtemp()) - target_proj_dir = tmp_dir / DBT_PIPELINE_NAME + target_proj_dir = tmp_dir / DBT_PROJECT_NAME shutil.copytree(source_proj_dir, target_proj_dir) shutil.rmtree(target_proj_dir / "logs", ignore_errors=True) shutil.rmtree(target_proj_dir / "target", ignore_errors=True) @@ -37,12 +36,19 @@ def tmp_dbt_project_dir(): @pytest.mark.parametrize( - "pipeline_name,manifest_filepath,model_filepath", - [("jaffle_shop", SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], + "project_name,manifest_filepath,model_filepath", + [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], ) -def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_filepath): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=manifest_filepath) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:table"]) +def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_filepath): + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=manifest_filepath + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:table"]) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.nodes) == 28 @@ -58,13 +64,20 @@ def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_ "model.jaffle_shop.stg_orders", "model.jaffle_shop.stg_payments", ] - assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{pipeline_name}/models/{model_filepath}" + assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{project_name}/models/{model_filepath}" @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_from_dbt_manifest.called @@ -72,8 +85,15 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=FileNotFoundError()) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None) def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path="/tmp/manifest.json") - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path="/tmp/manifest.json" + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_via_dbt_ls.called assert not mock_load_via_custom_parser.called @@ -82,16 +102,26 @@ def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_cus @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", side_effect=FileNotFoundError()) def test_load_automatic_without_manifest_and_without_dbt_cmd(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.AUTOMATIC) assert mock_load_via_dbt_ls.called assert mock_load_via_custom_parser.called def test_load_manifest_without_manifest(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert err_info.value.args[0] == "Unable to load manifest using None" @@ -99,8 +129,15 @@ def test_load_manifest_without_manifest(): @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert mock_load_from_dbt_manifest.called @@ -122,8 +159,13 @@ def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): def test_load( mock_load_from_dbt_manifest, mock_load_via_dbt_ls, mock_load_via_custom_parser, exec_mode, method, expected_function ): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(method=method, execution_mode=exec_mode) load_function = locals()[expected_function] @@ -138,9 +180,9 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "target").exists() assert not (tmp_dbt_project_dir / "logs").exists() - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -155,15 +197,15 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "logs").exists() used_cwd = Path(mock_popen.call_args[0][0][-5]) - assert used_cwd != dbt_project.dir + assert used_cwd != project_config.dbt_project_path assert not used_cwd.exists() @pytest.mark.integration def test_load_via_dbt_ls_with_exclude(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, select=["*customers*"], exclude=["*orders*"], profile_config=ProfileConfig( @@ -204,11 +246,11 @@ def test_load_via_dbt_ls_with_exclude(): @pytest.mark.integration -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_dbt_ls_without_exclude(pipeline_name): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) +@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_dbt_ls_without_exclude(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -224,25 +266,32 @@ def test_load_via_dbt_ls_without_exclude(pipeline_name): assert len(dbt_graph.nodes) == 28 +def test_load_via_custom_without_project_path(): + project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") + dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load_via_custom_parser() + + expected = "Unable to load dbt project without project files" + assert err_info.value.args[0] == expected + + def test_load_via_dbt_ls_without_profile(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", - project=dbt_project, - ) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to load dbt project without a profile config" + expected = "Unable to load dbt project without project files and a profile config" assert err_info.value.args[0] == expected def test_load_via_dbt_ls_with_invalid_dbt_path(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( dbt_cmd="/inexistent/dbt", - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -260,18 +309,17 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) @pytest.mark.integration def test_load_via_dbt_ls_with_sources(load_method): - pipeline_name = "simple" + project_name = "simple" dbt_graph = DbtGraph( dbt_deps=False, - project=DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( profile_name="simple", target_name="dev", - profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / pipeline_name / "profiles.yml"), + profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / project_name / "profiles.yml"), ), ) getattr(dbt_graph, load_method)() @@ -282,11 +330,10 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(): - pipeline_name = "jaffle_shop" - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( dbt_deps=False, - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -310,9 +357,9 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, t mock_popen().communicate.return_value = ("", "Some stderr warnings") mock_popen().returncode = 0 - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -332,9 +379,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): mock_popen().communicate.return_value = ("", "Some stderr message") mock_popen().returncode = 1 - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -355,9 +402,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -375,13 +422,15 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): mock_popen_communicate.assert_called_once() -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_load_via_custom_parser(pipeline_name): - dbt_project = DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, +@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_load_via_custom_parser(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=dbt_project) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load_via_custom_parser() @@ -391,16 +440,30 @@ def test_load_via_load_via_custom_parser(pipeline_name): @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency", return_value=None) def test_update_node_dependency_called(mock_update_node_dependency): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() assert mock_update_node_dependency.called def test_update_node_dependency_target_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() for _, nodes in dbt_graph.nodes.items(): @@ -410,8 +473,15 @@ def test_update_node_dependency_target_exist(): def test_update_node_dependency_test_not_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:test"]) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:test"]) dbt_graph.load_from_dbt_manifest() for _, nodes in dbt_graph.filtered_nodes.items(): @@ -422,9 +492,8 @@ def test_update_node_dependency_test_not_exist(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) def test_load_dbt_ls_and_manifest_with_model_version(load_method): dbt_graph = DbtGraph( - project=DbtProject( - name="model_version", - root_dir=DBT_PROJECTS_ROOT_DIR, + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( diff --git a/tests/test_config.py b/tests/test_config.py index 9dd936fba..c0ae32101 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,7 +2,7 @@ import pytest -from cosmos.config import ProjectConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.exceptions import CosmosValueError @@ -10,29 +10,56 @@ PIPELINE_FOLDER = "jaffle_shop" -def test_valid_parameters(): +def test_init_with_project_path_only(): + """ + Passing dbt_project_path on its own should create a valid ProjectConfig with relative paths defined + It should also have a project_name based on the path + """ project_config = ProjectConfig(dbt_project_path="path/to/dbt/project") - assert project_config.parsed_dbt_project_path == Path("path/to/dbt/project") - assert project_config.models_relative_path == Path("path/to/dbt/project/models") - assert project_config.seeds_relative_path == Path("path/to/dbt/project/seeds") - assert project_config.snapshots_relative_path == Path("path/to/dbt/project/snapshots") + assert project_config.dbt_project_path == Path("path/to/dbt/project") + assert project_config.models_path == Path("path/to/dbt/project/models") + assert project_config.seeds_path == Path("path/to/dbt/project/seeds") + assert project_config.snapshots_path == Path("path/to/dbt/project/snapshots") + assert project_config.project_name == "project" assert project_config.manifest_path is None def test_init_with_manifest_path_and_project_path_succeeds(): """ - Passing a manifest path AND project path together should succeed, as previous + Passing a manifest path AND project path together should succeed + project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.project_name == "some-path" + + +def test_init_with_no_params(): + """ + The constructor now validates that the required base fields are present + As such, we should test here that the correct exception is raised if these are not correctly defined + This functionality has been moved from the validate method + """ + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig() + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." + ) -def test_init_with_manifest_path_and_not_project_path_succeeds(): +def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails(): """ - Since dbt_project_path is optional, we should be able to operate with only a manifest + Passing a manifest alone should fail since we also require a project_name """ - project_config = ProjectConfig(manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." + ) def test_validate_with_project_path_and_manifest_path_succeeds(): @@ -53,17 +80,6 @@ def test_validate_with_project_path_and_not_manifest_path_succeeds(): assert project_config.validate_project() is None -def test_validate_with_manifest_path_and_not_project_path_and_not_project_name_fails(): - """ - Passing a manifest alone should fail since we also require a project_name - """ - project_config = ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - print(err_info.value.args[0]) - assert err_info.value.args[0] == "project_name required when manifest_path is present and dbt_project_path is not." - - def test_validate_with_manifest_path_and_project_name_and_not_project_path_succeeds(): """ Passing a manifest and project name together should succeed. @@ -72,16 +88,6 @@ def test_validate_with_manifest_path_and_project_name_and_not_project_path_succe assert project_config.validate_project() is None -def test_validate_no_paths_fails(): - """ - Passing no manifest and no project directory should fail. - """ - project_config = ProjectConfig() - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - assert err_info.value.args[0] == "dbt_project_path or manifest_path are required parameters." - - def test_validate_project_missing_fails(): """ Passing a project dir that does not exist where specified should fail @@ -89,7 +95,7 @@ def test_validate_project_missing_fails(): project_config = ProjectConfig(dbt_project_path=Path("/tmp")) with pytest.raises(CosmosValueError) as err_info: assert project_config.validate_project() is None - assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" + assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" def test_is_manifest_available_is_true(): @@ -107,3 +113,18 @@ def test_is_manifest_available_is_false(): def test_project_name(): dbt_project = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR) assert dbt_project.project_name == "sample" + + +def test_profile_config_post_init(): + with pytest.raises(CosmosValueError) as err_info: + ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test") + assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." + + +def test_profile_config_validate(): + with pytest.raises(CosmosValueError) as err_info: + profile_config = ProfileConfig(profile_name="test", target_name="test") + assert profile_config.validate_profile() is None + assert ( + err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" + ) diff --git a/tests/test_converter.py b/tests/test_converter.py index 0d321730a..5fb50a00f 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -12,6 +12,7 @@ SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" +SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -48,7 +49,8 @@ def test_validate_arguments_tags(argument_key): @patch("cosmos.converter.DbtGraph.load") def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args): """ - This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. + This test validates that a project, given only a project path as a Path() Object, and seeds + is able to successfully generate a converter """ project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT) execution_config = ExecutionConfig(execution_mode=execution_mode) @@ -67,3 +69,70 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op operator_args=operator_args, ) assert converter + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test validates that a project, given only a project path as a string, and seeds + is able to successfully generate a converter + """ + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert converter + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test validates that a project, given a manifest path and project name, with seeds + is able to successfully generate a converter + """ + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + with pytest.raises(CosmosValueError) as err_info: + DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert err_info.value.args[0] == "A Project Path in ProjectConfig is required for generating a Task Operators."