diff --git a/cosmos/config.py b/cosmos/config.py index 9758d0378..746890f66 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -43,7 +43,6 @@ class RenderConfig: node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None -@dataclass class ProjectConfig: """ Class for setting project config. @@ -58,33 +57,45 @@ class ProjectConfig: Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path. """ - dbt_project_path: str | Path | None = None - models_relative_path: str | Path = "models" - seeds_relative_path: str | Path = "seeds" - snapshots_relative_path: str | Path = "snapshots" - manifest_path: str | Path | None = None - project_name: str | None = None + dbt_project_path: Path | None = None + manifest_path: Path | None = None + models_path: Path | None = None + seeds_path: Path | None = None + snapshots_path: Path | None = None + project_name: str + + def __init__( + self, + dbt_project_path: str | Path | None = None, + models_relative_path: str | Path = "models", + seeds_relative_path: str | Path = "seeds", + snapshots_relative_path: str | Path = "snapshots", + manifest_path: str | Path | None = None, + project_name: str | None = None, + ): + if not dbt_project_path: + if not manifest_path or not project_name: + raise CosmosValueError( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + " If only manifest_path is defined, project_name must also be defined." + ) + if project_name: + self.project_name = project_name - @cached_property - def parsed_dbt_project_path(self) -> Path | None: - return Path(self.dbt_project_path) if self.dbt_project_path else None + if dbt_project_path: + self.dbt_project_path = Path(dbt_project_path) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem - @cached_property - def parsed_manifest_path(self) -> Path | None: - return Path(self.manifest_path) if self.manifest_path else None + if manifest_path: + self.manifest_path = Path(manifest_path) @cached_property def dbt_project_path_parent(self) -> Path | None: - return self.parsed_dbt_project_path.parent if self.parsed_dbt_project_path else None - - def __post_init__(self) -> None: - "Converts paths to `Path` objects." - if self.parsed_dbt_project_path: - self.models_relative_path = self.parsed_dbt_project_path / Path(self.models_relative_path) - self.seeds_relative_path = self.parsed_dbt_project_path / Path(self.seeds_relative_path) - self.snapshots_relative_path = self.parsed_dbt_project_path / Path(self.snapshots_relative_path) - if not self.project_name: - self.project_name = self.parsed_dbt_project_path.stem + return self.dbt_project_path.parent if self.dbt_project_path else None def validate_project(self) -> None: """ @@ -98,20 +109,14 @@ def validate_project(self) -> None: mandatory_paths = {} - if self.parsed_dbt_project_path: - project_yml_path = self.parsed_dbt_project_path / "dbt_project.yml" + if self.dbt_project_path: + project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { "dbt_project.yml": project_yml_path, - "models directory ": self.models_relative_path, + "models directory ": self.models_path, } - elif self.parsed_manifest_path: - if not self.project_name: - raise CosmosValueError( - "project_name required when manifest_path is present and dbt_project_path is not." - ) - mandatory_paths = {"manifest file": self.parsed_manifest_path} - else: - raise CosmosValueError("dbt_project_path or manifest_path are required parameters.") + if self.manifest_path: + mandatory_paths["manifest"] = self.manifest_path for name, path in mandatory_paths.items(): if path is None or not Path(path).exists(): @@ -121,10 +126,10 @@ def is_manifest_available(self) -> bool: """ Check if the `dbt` project manifest is set and if the file exists. """ - if not self.parsed_manifest_path: + if not self.manifest_path: return False - return self.parsed_manifest_path.exists() + return self.manifest_path.exists() @dataclass @@ -163,6 +168,12 @@ def validate_profile(self) -> None: if not self.profiles_yml_filepath and not self.profile_mapping: raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile") + def is_profile_yml_available(self) -> bool: + """ + Check if the `dbt` profiles.yml file exists. + """ + return Path(self.profiles_yml_filepath).exists() if self.profiles_yml_filepath else False + @contextlib.contextmanager def ensure_profile( self, desired_profile_path: Path | None = None, use_mock_values: bool = False diff --git a/cosmos/converter.py b/cosmos/converter.py index ca4f0381f..a0ede8c56 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -5,14 +5,12 @@ import inspect from typing import Any, Callable -from pathlib import Path from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph from cosmos.dbt.graph import DbtGraph -from cosmos.dbt.project import DbtProject from cosmos.dbt.selector import retrieve_by_label from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig from cosmos.exceptions import CosmosValueError @@ -93,7 +91,7 @@ class DbtToAirflowConverter: def __init__( self, project_config: ProjectConfig, - profile_config: ProfileConfig, + profile_config: ProfileConfig = None, execution_config: ExecutionConfig = ExecutionConfig(), render_config: RenderConfig = RenderConfig(), dag: DAG | None = None, @@ -106,10 +104,6 @@ def __init__( project_config.validate_project() emit_datasets = render_config.emit_datasets - dbt_project_name = project_config.project_name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path test_behavior = render_config.test_behavior select = render_config.select exclude = render_config.exclude @@ -117,10 +111,12 @@ def __init__( execution_mode = execution_config.execution_mode test_indirect_selection = execution_config.test_indirect_selection load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path dbt_executable_path = execution_config.dbt_executable_path node_converters = render_config.node_converters + if not profile_config: + profile_config = ProfileConfig(profiles_yml_filepath=project_config.dbt_project_path / "profiles.yml") + profile_args = {} if profile_config.profile_mapping: profile_args = profile_config.profile_mapping.profile_args @@ -128,17 +124,18 @@ def __init__( if not operator_args: operator_args = {} - dbt_project = DbtProject( - name=dbt_project_name, - root_dir=project_config.dbt_project_path_parent, - models_dir=Path(dbt_models_dir) if dbt_models_dir else None, - seeds_dir=Path(dbt_seeds_dir) if dbt_seeds_dir else None, - snapshots_dir=Path(dbt_snapshots_dir) if dbt_snapshots_dir else None, - manifest_path=manifest_path, - ) - + # Previously, we were creating a cosmos.dbt.project.DbtProject + # DbtProject has now been replaced with ProjectConfig directly + # since the interface of the two classes were effectively the same + # Under this previous implementation, we were passing: + # - name, root dir, models dir, snapshots dir and manifest path + # Internally in the dbtProject class, we were defaulting the profile_path + # To be root dir/profiles.yml + # To keep this logic working, if converter is given no ProfileConfig, + # we can create a default retaining this value to preserve this functionality. + # We may want to consider defaulting this value in our actual ProjceConfig class? dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, exclude=exclude, select=select, dbt_cmd=dbt_executable_path, @@ -151,7 +148,7 @@ def __init__( task_args = { **operator_args, # the following args may be only needed for local / venv: - "project_dir": dbt_project.dir, + "project_dir": project_config.dbt_project_path, "profile_config": profile_config, "emit_datasets": emit_datasets, } @@ -168,7 +165,7 @@ def __init__( task_args=task_args, test_behavior=test_behavior, test_indirect_selection=test_indirect_selection, - dbt_project_name=dbt_project.name, + dbt_project_name=project_config.project_name, on_warning_callback=on_warning_callback, node_converters=node_converters, ) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 433f469bd..ac92c46f5 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -10,7 +10,7 @@ from subprocess import PIPE, Popen from typing import Any -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, DBT_LOG_FILENAME, @@ -22,8 +22,7 @@ LoadMode, ) from cosmos.dbt.executable import get_system_dbt -from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject -from cosmos.dbt.project import DbtProject +from cosmos.dbt.parser.project import LegacyDbtProject from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -64,7 +63,7 @@ class DbtGraph: Example of how to use: dbt_graph = DbtGraph( - project=DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR), + project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH), exclude=["*orders*"], select=[], dbt_cmd="/usr/local/bin/dbt", @@ -77,11 +76,11 @@ class DbtGraph: def __init__( self, - project: DbtProject, + project: ProjectConfig, + profile_config: ProfileConfig | None = None, exclude: list[str] | None = None, select: list[str] | None = None, dbt_cmd: str = get_system_dbt(), - profile_config: ProfileConfig | None = None, operator_args: dict[str, Any] | None = None, dbt_deps: bool | None = True, ): @@ -122,7 +121,11 @@ def load( if self.project.is_manifest_available(): self.load_from_dbt_manifest() else: - if execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available(): + if ( + execution_mode == ExecutionMode.LOCAL + and self.profile_config + and self.profile_config.is_profile_yml_available() + ): try: self.load_via_dbt_ls() except FileNotFoundError: @@ -144,9 +147,13 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir) + logger.info( + "Trying to parse the dbt project `%s` in `%s` using dbt ls...", + self.project.project_name, + self.project.dbt_project_path, + ) - if not self.project.dir or not self.profile_config: + if not self.project.dbt_project_path or not self.profile_config: raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") if not shutil.which(self.dbt_cmd): @@ -158,16 +165,20 @@ def load_via_dbt_ls(self) -> None: env.update(env_vars) with tempfile.TemporaryDirectory() as tmpdir: - logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir)) - logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir) + logger.info( + "Content of the dbt project dir <%s>: `%s`", + self.project.dbt_project_path, + os.listdir(self.project.dbt_project_path), + ) + logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir) # We create symbolic links to the original directory files and directories. # This allows us to run the dbt command from within the temporary directory, outputting any necessary # artifact and also allow us to run `dbt deps` tmpdir_path = Path(tmpdir) ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(self.project.dir): + for child_name in os.listdir(self.project.dbt_project_path): if child_name not in ignore_paths: - os.symlink(self.project.dir / child_name, tmpdir_path / child_name) + os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name) local_flags = [ "--project-dir", @@ -259,7 +270,7 @@ def load_via_dbt_ls(self) -> None: unique_id=node_dict["unique_id"], resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / node_dict["original_file_path"], tags=node_dict["tags"], config=node_dict["config"], ) @@ -286,16 +297,16 @@ def load_via_custom_parser(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) - if not self.project.dir: + if not self.project.dbt_project_path or not self.project.models_path or not self.project.seeds_path: raise CosmosLoadDbtException("Unable to load dbt project without project files") project = LegacyDbtProject( - dbt_root_path=str(self.project.root_dir), - dbt_models_dir=self.project.models_dir.stem if self.project.models_dir else None, - dbt_seeds_dir=self.project.seeds_dir.stem if self.project.seeds_dir else None, - project_name=self.project.name, + project_name=self.project.dbt_project_path.stem, + dbt_root_path=self.project.dbt_project_path.parent.as_posix(), + dbt_models_dir=self.project.models_path.stem, + dbt_seeds_dir=self.project.seeds_path.stem, operator_args=self.operator_args, ) nodes = {} @@ -317,7 +328,7 @@ def load_via_custom_parser(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() @@ -339,7 +350,7 @@ def load_from_dbt_manifest(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.project_name) if not self.project.is_manifest_available(): raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") @@ -355,7 +366,9 @@ def load_from_dbt_manifest(self) -> None: unique_id=unique_id, resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / Path(node_dict["original_file_path"]) + if self.project.dbt_project_path + else Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], ) @@ -364,7 +377,7 @@ def load_from_dbt_manifest(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index bb86d41a6..e154bb0ed 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -233,7 +233,7 @@ def __repr__(self) -> str: @dataclass -class DbtProject: +class LegacyDbtProject: """ Represents a single dbt project. """ diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py deleted file mode 100644 index da5c4284b..000000000 --- a/cosmos/dbt/project.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import annotations -from dataclasses import dataclass -from pathlib import Path - - -DEFAULT_PROFILE_FILE_NAME = "profiles.yml" - - -@dataclass -class DbtProject: - name: str - root_dir: Path | None = None - models_dir: Path | None = None - seeds_dir: Path | None = None - snapshots_dir: Path | None = None - manifest_path: Path | None = None - profile_path: Path | None = None - _cosmos_created_profile_file: bool = False - - def __post_init__(self) -> None: - """ - Since ProjectConfig does not require the dbt_project_path to be defined - DbtProject should also no longer require root_dir or any dependent paths - The project should be renderable with only a manifest.json - """ - if self.dir: - if self.models_dir is None: - self.models_dir = self.dir / "models" - if self.seeds_dir is None: - self.seeds_dir = self.dir / "seeds" - if self.snapshots_dir is None: - self.snapshots_dir = self.dir / "snapshots" - if self.profile_path is None: - self.profile_path = self.dir / "profiles.yml" - - @property - def dir(self) -> Path | None: - """ - Path to dbt pipeline, defined by self.root_dir and self.name. - """ - return self.root_dir / self.name if self.root_dir else None - - def is_manifest_available(self) -> bool: - """ - Check if the `dbt` project manifest is set and if the file exists. - """ - return self.manifest_path is not None and Path(self.manifest_path).exists() - - def is_profile_yml_available(self) -> bool: - """ - Check if the `dbt` profiles.yml file exists. - """ - return Path(self.profile_path).exists() if self.profile_path else False diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 72b794b73..0e6034a0b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -63,10 +63,11 @@ def load_from_statement(self, statement: str) -> None: items = statement.split(",") for item in items: if item.startswith(PATH_SELECTOR): - if not self.project_dir: - raise CosmosValueError("Can not select by path when no project directory is provided") index = len(PATH_SELECTOR) - self.paths.append(self.project_dir / item[index:]) + if self.project_dir: + self.paths.append(self.project_dir / Path(item[index:])) + else: + self.paths.append(Path(item[index:])) elif item.startswith(TAG_SELECTOR): index = len(TAG_SELECTOR) self.tags.append(item[index:]) diff --git a/tests/dbt/parser/test_project.py b/tests/dbt/parser/test_project.py index 31a79ceba..4f13a3eb3 100644 --- a/tests/dbt/parser/test_project.py +++ b/tests/dbt/parser/test_project.py @@ -10,7 +10,7 @@ import pytest import yaml -from cosmos.dbt.parser.project import DbtModel, DbtModelType, DbtProject +from cosmos.dbt.parser.project import DbtModel, DbtModelType, LegacyDbtProject DBT_PROJECT_PATH = Path(__name__).parent.parent.parent.parent.parent / "dev/dags/dbt/" SAMPLE_CSV_PATH = DBT_PROJECT_PATH / "jaffle_shop/seeds/raw_customers.csv" @@ -19,8 +19,8 @@ SAMPLE_YML_PATH = DBT_PROJECT_PATH / "jaffle_shop/models/schema.yml" -def test_dbtproject__handle_csv_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_csv_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", ) assert not dbt_project.seeds @@ -33,8 +33,8 @@ def test_dbtproject__handle_csv_file(): assert raw_customers.path == SAMPLE_CSV_PATH -def test_dbtproject__handle_sql_file_model(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_model(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -49,8 +49,8 @@ def test_dbtproject__handle_sql_file_model(): assert raw_customers.path == SAMPLE_MODEL_SQL_PATH -def test_dbtproject__handle_sql_file_snapshot(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_snapshot(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -65,8 +65,8 @@ def test_dbtproject__handle_sql_file_snapshot(): assert raw_customers.path == SAMPLE_SNAPSHOT_SQL_PATH -def test_dbtproject__handle_config_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -81,25 +81,25 @@ def test_dbtproject__handle_config_file(): assert sample_test.path == SAMPLE_YML_PATH -def test_dbtproject__handle_config_file_empty_file(): +def test_LegacyDbtProject__handle_config_file_empty_file(): with NamedTemporaryFile("w") as tmp_fp: tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models -def test_dbtproject__handle_config_file_with_unknown_name(): +def test_LegacyDbtProject__handle_config_file_with_unknown_name(): yaml_data = {"models": [{"name": "unknown"}]} with NamedTemporaryFile("w") as tmp_fp: yaml.dump(yaml_data, tmp_fp) tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models @@ -112,8 +112,8 @@ def test_dbtproject__handle_config_file_with_unknown_name(): (["tag1", "tag2"], {"materialized:view", "tags:tag1", "tags:tag2"}), ], ) -def test_dbtproject__handle_config_file_with_selector(input_tags, expected_config_selectors): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file_with_selector(input_tags, expected_config_selectors): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index ab4858c72..f4a48f2ac 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,14 +5,13 @@ import pytest -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode -from cosmos.dbt.project import DbtProject from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" -DBT_PIPELINE_NAME = "jaffle_shop" +DBT_PROJECT_NAME = "jaffle_shop" SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json" SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" @@ -24,10 +23,10 @@ def tmp_dbt_project_dir(): """ Creates a plain dbt project structure, which does not contain logs or target folders. """ - source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PIPELINE_NAME + source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME tmp_dir = Path(tempfile.mkdtemp()) - target_proj_dir = tmp_dir / DBT_PIPELINE_NAME + target_proj_dir = tmp_dir / DBT_PROJECT_NAME shutil.copytree(source_proj_dir, target_proj_dir) shutil.rmtree(target_proj_dir / "logs", ignore_errors=True) shutil.rmtree(target_proj_dir / "target", ignore_errors=True) @@ -37,12 +36,19 @@ def tmp_dbt_project_dir(): @pytest.mark.parametrize( - "pipeline_name,manifest_filepath,model_filepath", - [("jaffle_shop", SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], + "project_name,manifest_filepath,model_filepath", + [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], ) -def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_filepath): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=manifest_filepath) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:table"]) +def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_filepath): + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=manifest_filepath + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:table"]) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.nodes) == 28 @@ -58,13 +64,20 @@ def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_ "model.jaffle_shop.stg_orders", "model.jaffle_shop.stg_payments", ] - assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{pipeline_name}/models/{model_filepath}" + assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{project_name}/models/{model_filepath}" @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_from_dbt_manifest.called @@ -72,8 +85,15 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=FileNotFoundError()) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None) def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path="/tmp/manifest.json") - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path="/tmp/manifest.json" + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_via_dbt_ls.called assert not mock_load_via_custom_parser.called @@ -82,16 +102,26 @@ def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_cus @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", side_effect=FileNotFoundError()) def test_load_automatic_without_manifest_and_without_dbt_cmd(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.AUTOMATIC) assert mock_load_via_dbt_ls.called assert mock_load_via_custom_parser.called def test_load_manifest_without_manifest(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert err_info.value.args[0] == "Unable to load manifest using None" @@ -99,8 +129,15 @@ def test_load_manifest_without_manifest(): @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert mock_load_from_dbt_manifest.called @@ -122,8 +159,13 @@ def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): def test_load( mock_load_from_dbt_manifest, mock_load_via_dbt_ls, mock_load_via_custom_parser, exec_mode, method, expected_function ): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(method=method, execution_mode=exec_mode) load_function = locals()[expected_function] @@ -138,9 +180,9 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "target").exists() assert not (tmp_dbt_project_dir / "logs").exists() - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -155,15 +197,15 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "logs").exists() used_cwd = Path(mock_popen.call_args[0][0][-5]) - assert used_cwd != dbt_project.dir + assert used_cwd != project_config.dir assert not used_cwd.exists() @pytest.mark.integration def test_load_via_dbt_ls_with_exclude(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, select=["*customers*"], exclude=["*orders*"], profile_config=ProfileConfig( @@ -204,11 +246,11 @@ def test_load_via_dbt_ls_with_exclude(): @pytest.mark.integration -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_dbt_ls_without_exclude(pipeline_name): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) +@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_dbt_ls_without_exclude(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -225,11 +267,8 @@ def test_load_via_dbt_ls_without_exclude(pipeline_name): def test_load_via_dbt_ls_without_profile(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", - project=dbt_project, - ) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() @@ -238,11 +277,11 @@ def test_load_via_dbt_ls_without_profile(): def test_load_via_dbt_ls_with_invalid_dbt_path(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( dbt_cmd="/inexistent/dbt", - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -260,18 +299,17 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) @pytest.mark.integration def test_load_via_dbt_ls_with_sources(load_method): - pipeline_name = "simple" + project_name = "simple" dbt_graph = DbtGraph( dbt_deps=False, - project=DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( profile_name="simple", target_name="dev", - profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / pipeline_name / "profiles.yml"), + profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / project_name / "profiles.yml"), ), ) getattr(dbt_graph, load_method)() @@ -282,11 +320,10 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(): - pipeline_name = "jaffle_shop" - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( dbt_deps=False, - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -310,9 +347,9 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, t mock_popen().communicate.return_value = ("", "Some stderr warnings") mock_popen().returncode = 0 - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -332,9 +369,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): mock_popen().communicate.return_value = ("", "Some stderr message") mock_popen().returncode = 1 - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -355,9 +392,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -375,13 +412,15 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): mock_popen_communicate.assert_called_once() -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_load_via_custom_parser(pipeline_name): - dbt_project = DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, +@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_load_via_custom_parser(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=dbt_project) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load_via_custom_parser() @@ -391,16 +430,30 @@ def test_load_via_load_via_custom_parser(pipeline_name): @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency", return_value=None) def test_update_node_dependency_called(mock_update_node_dependency): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() assert mock_update_node_dependency.called def test_update_node_dependency_target_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() for _, nodes in dbt_graph.nodes.items(): @@ -410,8 +463,15 @@ def test_update_node_dependency_target_exist(): def test_update_node_dependency_test_not_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:test"]) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:test"]) dbt_graph.load_from_dbt_manifest() for _, nodes in dbt_graph.filtered_nodes.items(): @@ -422,9 +482,8 @@ def test_update_node_dependency_test_not_exist(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) def test_load_dbt_ls_and_manifest_with_model_version(load_method): dbt_graph = DbtGraph( - project=DbtProject( - name="model_version", - root_dir=DBT_PROJECTS_ROOT_DIR, + project_config=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( diff --git a/tests/test_config.py b/tests/test_config.py index 9dd936fba..769649a86 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -10,29 +10,56 @@ PIPELINE_FOLDER = "jaffle_shop" -def test_valid_parameters(): +def test_init_with_project_path_only(): + """ + Passing dbt_project_path on its own should create a valid ProjectConfig with relative paths defined + It should also have a project_name based on the path + """ project_config = ProjectConfig(dbt_project_path="path/to/dbt/project") - assert project_config.parsed_dbt_project_path == Path("path/to/dbt/project") - assert project_config.models_relative_path == Path("path/to/dbt/project/models") - assert project_config.seeds_relative_path == Path("path/to/dbt/project/seeds") - assert project_config.snapshots_relative_path == Path("path/to/dbt/project/snapshots") + assert project_config.dbt_project_path == Path("path/to/dbt/project") + assert project_config.models_path == Path("path/to/dbt/project/models") + assert project_config.seeds_path == Path("path/to/dbt/project/seeds") + assert project_config.snapshots_path == Path("path/to/dbt/project/snapshots") + assert project_config.project_name == "project" assert project_config.manifest_path is None def test_init_with_manifest_path_and_project_path_succeeds(): """ - Passing a manifest path AND project path together should succeed, as previous + Passing a manifest path AND project path together should succeed + project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.project_name == "some-path" -def test_init_with_manifest_path_and_not_project_path_succeeds(): +def test_init_with_no_params(): """ - Since dbt_project_path is optional, we should be able to operate with only a manifest + The constructor now validates that the required base fields are present + As such, we should test here that the correct exception is raised if these are not correctly defined + This functionality has been moved from the validate method """ - project_config = ProjectConfig(manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig() + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." + ) + + +def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails(): + """ + Passing a manifest alone should fail since we also require a project_name + """ + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." + ) def test_validate_with_project_path_and_manifest_path_succeeds(): @@ -53,17 +80,6 @@ def test_validate_with_project_path_and_not_manifest_path_succeeds(): assert project_config.validate_project() is None -def test_validate_with_manifest_path_and_not_project_path_and_not_project_name_fails(): - """ - Passing a manifest alone should fail since we also require a project_name - """ - project_config = ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - print(err_info.value.args[0]) - assert err_info.value.args[0] == "project_name required when manifest_path is present and dbt_project_path is not." - - def test_validate_with_manifest_path_and_project_name_and_not_project_path_succeeds(): """ Passing a manifest and project name together should succeed. @@ -72,16 +88,6 @@ def test_validate_with_manifest_path_and_project_name_and_not_project_path_succe assert project_config.validate_project() is None -def test_validate_no_paths_fails(): - """ - Passing no manifest and no project directory should fail. - """ - project_config = ProjectConfig() - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - assert err_info.value.args[0] == "dbt_project_path or manifest_path are required parameters." - - def test_validate_project_missing_fails(): """ Passing a project dir that does not exist where specified should fail @@ -89,7 +95,7 @@ def test_validate_project_missing_fails(): project_config = ProjectConfig(dbt_project_path=Path("/tmp")) with pytest.raises(CosmosValueError) as err_info: assert project_config.validate_project() is None - assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" + assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" def test_is_manifest_available_is_true():