From 2f8d0e2f787a5792feacba609c0190877062375c Mon Sep 17 00:00:00 2001
From: MrBones757
Date: Wed, 25 Oct 2023 17:48:47 +0800
Subject: [PATCH 01/11] Resolve errors occurring when dbt_project_path is str
 and partially support dbt_project_path=None (#605)

As part of the changes made in #581, some downstream logic was missed
relating to the handling of a None or string-based project dir. This PR
attempts to remedy that by adding downstream support for the project dir
being None (including exception generation and guarding), as well as some
property reference changes in the converter.

Closes: #601

Co-authored-by: tabmra
---
 cosmos/config.py                    |  82 ++++++-----
 cosmos/converter.py                 |  36 +++--
 cosmos/dbt/graph.py                 |  95 ++++++++-----
 cosmos/dbt/parser/project.py        |   2 +-
 cosmos/dbt/project.py               |  47 -------
 cosmos/dbt/selector.py              |  12 +-
 dev/dags/basic_cosmos_task_group.py |   2 +-
 dev/dags/cosmos_manifest_example.py |   3 +-
 tests/dbt/parser/test_project.py    |  30 ++--
 tests/dbt/test_graph.py             | 207 ++++++++++++++++++----
 tests/test_config.py                |  89 +++++++-----
 tests/test_converter.py             |  71 +++++++++-
 12 files changed, 410 insertions(+), 266 deletions(-)
 delete mode 100644 cosmos/dbt/project.py

diff --git a/cosmos/config.py b/cosmos/config.py
index e052cb9db..15f88e037 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -5,7 +5,6 @@
 import contextlib
 import tempfile
 from dataclasses import dataclass, field
-from functools import cached_property
 from pathlib import Path
 from typing import Any, Iterator, Callable

@@ -43,7 +42,6 @@ class RenderConfig:
     node_converters: dict[DbtResourceType, Callable[..., Any]] | None = None


-@dataclass
 class ProjectConfig:
     """
     Class for setting project config.
@@ -58,29 +56,41 @@ class ProjectConfig:
        Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path.
     """

-    dbt_project_path: str | Path | None = None
-    models_relative_path: str | Path = "models"
-    seeds_relative_path: str | Path = "seeds"
-    snapshots_relative_path: str | Path = "snapshots"
-    manifest_path: str | Path | None = None
-    project_name: str | None = None
-
-    @cached_property
-    def parsed_dbt_project_path(self) -> Path | None:
-        return Path(self.dbt_project_path) if self.dbt_project_path else None
+    dbt_project_path: Path | None = None
+    manifest_path: Path | None = None
+    models_path: Path | None = None
+    seeds_path: Path | None = None
+    snapshots_path: Path | None = None
+    project_name: str
+
+    def __init__(
+        self,
+        dbt_project_path: str | Path | None = None,
+        models_relative_path: str | Path = "models",
+        seeds_relative_path: str | Path = "seeds",
+        snapshots_relative_path: str | Path = "snapshots",
+        manifest_path: str | Path | None = None,
+        project_name: str | None = None,
+    ):
+        if not dbt_project_path:
+            if not manifest_path or not project_name:
+                raise CosmosValueError(
+                    "ProjectConfig requires dbt_project_path and/or manifest_path to be defined."
+                    " If only manifest_path is defined, project_name must also be defined."
+ ) + if project_name: + self.project_name = project_name - @cached_property - def parsed_manifest_path(self) -> Path | None: - return Path(self.manifest_path) if self.manifest_path else None + if dbt_project_path: + self.dbt_project_path = Path(dbt_project_path) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem - def __post_init__(self) -> None: - "Converts paths to `Path` objects." - if self.parsed_dbt_project_path: - self.models_relative_path = self.parsed_dbt_project_path / Path(self.models_relative_path) - self.seeds_relative_path = self.parsed_dbt_project_path / Path(self.seeds_relative_path) - self.snapshots_relative_path = self.parsed_dbt_project_path / Path(self.snapshots_relative_path) - if not self.project_name: - self.project_name = self.parsed_dbt_project_path.stem + if manifest_path: + self.manifest_path = Path(manifest_path) def validate_project(self) -> None: """ @@ -94,20 +104,14 @@ def validate_project(self) -> None: mandatory_paths = {} - if self.parsed_dbt_project_path: - project_yml_path = self.parsed_dbt_project_path / "dbt_project.yml" + if self.dbt_project_path: + project_yml_path = self.dbt_project_path / "dbt_project.yml" mandatory_paths = { "dbt_project.yml": project_yml_path, - "models directory ": self.models_relative_path, + "models directory ": self.models_path, } - elif self.parsed_manifest_path: - if not self.project_name: - raise CosmosValueError( - "project_name required when manifest_path is present and dbt_project_path is not." - ) - mandatory_paths = {"manifest file": self.parsed_manifest_path} - else: - raise CosmosValueError("dbt_project_path or manifest_path are required parameters.") + if self.manifest_path: + mandatory_paths["manifest"] = self.manifest_path for name, path in mandatory_paths.items(): if path is None or not Path(path).exists(): @@ -117,10 +121,10 @@ def is_manifest_available(self) -> bool: """ Check if the `dbt` project manifest is set and if the file exists. """ - if not self.parsed_manifest_path: + if not self.manifest_path: return False - return self.parsed_manifest_path.exists() + return self.manifest_path.exists() @dataclass @@ -159,6 +163,12 @@ def validate_profile(self) -> None: if not self.profiles_yml_filepath and not self.profile_mapping: raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile") + def is_profile_yml_available(self) -> bool: + """ + Check if the `dbt` profiles.yml file exists. 
+ """ + return Path(self.profiles_yml_filepath).exists() if self.profiles_yml_filepath else False + @contextlib.contextmanager def ensure_profile( self, desired_profile_path: Path | None = None, use_mock_values: bool = False diff --git a/cosmos/converter.py b/cosmos/converter.py index 43865c56b..b1653ba52 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -5,14 +5,12 @@ import inspect from typing import Any, Callable -from pathlib import Path from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph from cosmos.dbt.graph import DbtGraph -from cosmos.dbt.project import DbtProject from cosmos.dbt.selector import retrieve_by_label from cosmos.config import ProjectConfig, ExecutionConfig, RenderConfig, ProfileConfig from cosmos.exceptions import CosmosValueError @@ -106,11 +104,6 @@ def __init__( project_config.validate_project() emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name - dbt_models_dir = project_config.models_relative_path - dbt_seeds_dir = project_config.seeds_relative_path - dbt_snapshots_dir = project_config.snapshots_relative_path test_behavior = render_config.test_behavior select = render_config.select exclude = render_config.exclude @@ -118,10 +111,12 @@ def __init__( execution_mode = execution_config.execution_mode test_indirect_selection = execution_config.test_indirect_selection load_mode = render_config.load_method - manifest_path = project_config.parsed_manifest_path dbt_executable_path = execution_config.dbt_executable_path node_converters = render_config.node_converters + if not project_config.dbt_project_path: + raise CosmosValueError("A Project Path in ProjectConfig is required for generating a Task Operators.") + profile_args = {} if profile_config.profile_mapping: profile_args = profile_config.profile_mapping.profile_args @@ -129,17 +124,18 @@ def __init__( if not operator_args: operator_args = {} - dbt_project = DbtProject( - name=dbt_project_name, - root_dir=Path(dbt_root_path), - models_dir=Path(dbt_models_dir) if dbt_models_dir else None, - seeds_dir=Path(dbt_seeds_dir) if dbt_seeds_dir else None, - snapshots_dir=Path(dbt_snapshots_dir) if dbt_snapshots_dir else None, - manifest_path=manifest_path, - ) - + # Previously, we were creating a cosmos.dbt.project.DbtProject + # DbtProject has now been replaced with ProjectConfig directly + # since the interface of the two classes were effectively the same + # Under this previous implementation, we were passing: + # - name, root dir, models dir, snapshots dir and manifest path + # Internally in the dbtProject class, we were defaulting the profile_path + # To be root dir/profiles.yml + # To keep this logic working, if converter is given no ProfileConfig, + # we can create a default retaining this value to preserve this functionality. + # We may want to consider defaulting this value in our actual ProjceConfig class? 
        dbt_graph = DbtGraph(
-            project=dbt_project,
+            project=project_config,
             exclude=exclude,
             select=select,
             dbt_cmd=dbt_executable_path,
@@ -152,7 +148,7 @@ def __init__(
         task_args = {
             **operator_args,
             # the following args may be only needed for local / venv:
-            "project_dir": dbt_project.dir,
+            "project_dir": project_config.dbt_project_path,
             "profile_config": profile_config,
             "emit_datasets": emit_datasets,
         }
@@ -169,7 +165,7 @@ def __init__(
             task_args=task_args,
             test_behavior=test_behavior,
             test_indirect_selection=test_indirect_selection,
-            dbt_project_name=dbt_project.name,
+            dbt_project_name=project_config.project_name,
             on_warning_callback=on_warning_callback,
             node_converters=node_converters,
         )
diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py
index 889948fdd..ac92c46f5 100644
--- a/cosmos/dbt/graph.py
+++ b/cosmos/dbt/graph.py
@@ -10,7 +10,7 @@
 from subprocess import PIPE, Popen
 from typing import Any

-from cosmos.config import ProfileConfig
+from cosmos.config import ProfileConfig, ProjectConfig
 from cosmos.constants import (
     DBT_LOG_DIR_NAME,
     DBT_LOG_FILENAME,
@@ -22,8 +22,7 @@
     LoadMode,
 )
 from cosmos.dbt.executable import get_system_dbt
-from cosmos.dbt.parser.project import DbtProject as LegacyDbtProject
-from cosmos.dbt.project import DbtProject
+from cosmos.dbt.parser.project import LegacyDbtProject
 from cosmos.dbt.selector import select_nodes
 from cosmos.log import get_logger
@@ -64,7 +63,7 @@ class DbtGraph:
     Example of how to use:

         dbt_graph = DbtGraph(
-            project=DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR),
+            project=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
             exclude=["*orders*"],
             select=[],
             dbt_cmd="/usr/local/bin/dbt",
@@ -77,11 +76,11 @@ class DbtGraph:
     def __init__(
         self,
-        project: DbtProject,
+        project: ProjectConfig,
+        profile_config: ProfileConfig | None = None,
         exclude: list[str] | None = None,
         select: list[str] | None = None,
         dbt_cmd: str = get_system_dbt(),
-        profile_config: ProfileConfig | None = None,
         operator_args: dict[str, Any] | None = None,
         dbt_deps: bool | None = True,
     ):
@@ -107,31 +106,34 @@ def load(
         :param method: How to load `nodes` from a `dbt` project (automatically, using custom parser,
             using dbt manifest or dbt ls)
         :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.KUBERNETES)
+
+        Fundamentally, there are two different execution paths:
+        automatic and manual.
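+        In automatic mode, Cosmos picks the first strategy available:
+        the dbt manifest when one is provided, then ``dbt ls`` (only for
+        local execution with a ProfileConfig whose profiles.yml exists),
+        and finally the custom parser.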
""" + load_method = { LoadMode.CUSTOM: self.load_via_custom_parser, LoadMode.DBT_LS: self.load_via_dbt_ls, LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, } + if method == LoadMode.AUTOMATIC: if self.project.is_manifest_available(): self.load_from_dbt_manifest() - return - elif execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available(): - try: - self.load_via_dbt_ls() - return - except FileNotFoundError: - self.load_via_custom_parser() - return else: - self.load_via_custom_parser() - return - - if method == LoadMode.DBT_MANIFEST and not self.project.is_manifest_available(): - raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") - - load_method[method]() + if ( + execution_mode == ExecutionMode.LOCAL + and self.profile_config + and self.profile_config.is_profile_yml_available() + ): + try: + self.load_via_dbt_ls() + except FileNotFoundError: + self.load_via_custom_parser() + else: + self.load_via_custom_parser() + else: + load_method[method]() def load_via_dbt_ls(self) -> None: """ @@ -145,10 +147,14 @@ def load_via_dbt_ls(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir) + logger.info( + "Trying to parse the dbt project `%s` in `%s` using dbt ls...", + self.project.project_name, + self.project.dbt_project_path, + ) - if not self.profile_config: - raise CosmosLoadDbtException("Unable to load dbt project without a profile config") + if not self.project.dbt_project_path or not self.profile_config: + raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") if not shutil.which(self.dbt_cmd): raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") @@ -159,16 +165,20 @@ def load_via_dbt_ls(self) -> None: env.update(env_vars) with tempfile.TemporaryDirectory() as tmpdir: - logger.info("Content of the dbt project dir <%s>: `%s`", self.project.dir, os.listdir(self.project.dir)) - logger.info("Creating symlinks from %s to `%s`", self.project.dir, tmpdir) + logger.info( + "Content of the dbt project dir <%s>: `%s`", + self.project.dbt_project_path, + os.listdir(self.project.dbt_project_path), + ) + logger.info("Creating symlinks from %s to `%s`", self.project.dbt_project_path, tmpdir) # We create symbolic links to the original directory files and directories. 
# This allows us to run the dbt command from within the temporary directory, outputting any necessary # artifact and also allow us to run `dbt deps` tmpdir_path = Path(tmpdir) ignore_paths = (DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "dbt_packages", "profiles.yml") - for child_name in os.listdir(self.project.dir): + for child_name in os.listdir(self.project.dbt_project_path): if child_name not in ignore_paths: - os.symlink(self.project.dir / child_name, tmpdir_path / child_name) + os.symlink(self.project.dbt_project_path / child_name, tmpdir_path / child_name) local_flags = [ "--project-dir", @@ -260,7 +270,7 @@ def load_via_dbt_ls(self) -> None: unique_id=node_dict["unique_id"], resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / node_dict["original_file_path"], tags=node_dict["tags"], config=node_dict["config"], ) @@ -287,13 +297,16 @@ def load_via_custom_parser(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.project_name) + + if not self.project.dbt_project_path or not self.project.models_path or not self.project.seeds_path: + raise CosmosLoadDbtException("Unable to load dbt project without project files") project = LegacyDbtProject( - dbt_root_path=str(self.project.root_dir), - dbt_models_dir=self.project.models_dir.stem if self.project.models_dir else None, - dbt_seeds_dir=self.project.seeds_dir.stem if self.project.seeds_dir else None, - project_name=self.project.name, + project_name=self.project.dbt_project_path.stem, + dbt_root_path=self.project.dbt_project_path.parent.as_posix(), + dbt_models_dir=self.project.models_path.stem, + dbt_seeds_dir=self.project.seeds_path.stem, operator_args=self.operator_args, ) nodes = {} @@ -315,7 +328,7 @@ def load_via_custom_parser(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() @@ -337,7 +350,11 @@ def load_from_dbt_manifest(self) -> None: * self.nodes * self.filtered_nodes """ - logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.name) + logger.info("Trying to parse the dbt project `%s` using a dbt manifest...", self.project.project_name) + + if not self.project.is_manifest_available(): + raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") + nodes = {} with open(self.project.manifest_path) as fp: # type: ignore[arg-type] manifest = json.load(fp) @@ -349,7 +366,9 @@ def load_from_dbt_manifest(self) -> None: unique_id=unique_id, resource_type=DbtResourceType(node_dict["resource_type"]), depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=self.project.dir / node_dict["original_file_path"], + file_path=self.project.dbt_project_path / Path(node_dict["original_file_path"]) + if self.project.dbt_project_path + else Path(node_dict["original_file_path"]), tags=node_dict["tags"], config=node_dict["config"], ) @@ -358,7 +377,7 @@ def load_from_dbt_manifest(self) -> None: self.nodes = nodes self.filtered_nodes = select_nodes( - 
project_dir=self.project.dir, nodes=nodes, select=self.select, exclude=self.exclude + project_dir=self.project.dbt_project_path, nodes=nodes, select=self.select, exclude=self.exclude ) self.update_node_dependency() diff --git a/cosmos/dbt/parser/project.py b/cosmos/dbt/parser/project.py index bb86d41a6..e154bb0ed 100644 --- a/cosmos/dbt/parser/project.py +++ b/cosmos/dbt/parser/project.py @@ -233,7 +233,7 @@ def __repr__(self) -> str: @dataclass -class DbtProject: +class LegacyDbtProject: """ Represents a single dbt project. """ diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py deleted file mode 100644 index fe60f5751..000000000 --- a/cosmos/dbt/project.py +++ /dev/null @@ -1,47 +0,0 @@ -from __future__ import annotations -from dataclasses import dataclass -from pathlib import Path - - -DEFAULT_PROFILE_FILE_NAME = "profiles.yml" - - -@dataclass -class DbtProject: - name: str - root_dir: Path - models_dir: Path | None = None - seeds_dir: Path | None = None - snapshots_dir: Path | None = None - manifest_path: Path | None = None - profile_path: Path | None = None - _cosmos_created_profile_file: bool = False - - def __post_init__(self) -> None: - if self.models_dir is None: - self.models_dir = self.dir / "models" - if self.seeds_dir is None: - self.seeds_dir = self.dir / "seeds" - if self.snapshots_dir is None: - self.snapshots_dir = self.dir / "snapshots" - if self.profile_path is None: - self.profile_path = self.dir / "profiles.yml" - - @property - def dir(self) -> Path: - """ - Path to dbt pipeline, defined by self.root_dir and self.name. - """ - return self.root_dir / self.name - - def is_manifest_available(self) -> bool: - """ - Check if the `dbt` project manifest is set and if the file exists. - """ - return self.manifest_path is not None and Path(self.manifest_path).exists() - - def is_profile_yml_available(self) -> bool: - """ - Check if the `dbt` profiles.yml file exists. - """ - return Path(self.profile_path).exists() if self.profile_path else False diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index 926cc6b1a..0e6034a0b 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -27,7 +27,7 @@ class SelectorConfig: Supports to load it from a string. """ - def __init__(self, project_dir: Path, statement: str): + def __init__(self, project_dir: Path | None, statement: str): """ Create a selector config file. 
@@ -64,7 +64,10 @@ def load_from_statement(self, statement: str) -> None: for item in items: if item.startswith(PATH_SELECTOR): index = len(PATH_SELECTOR) - self.paths.append(self.project_dir / item[index:]) + if self.project_dir: + self.paths.append(self.project_dir / Path(item[index:])) + else: + self.paths.append(Path(item[index:])) elif item.startswith(TAG_SELECTOR): index = len(TAG_SELECTOR) self.tags.append(item[index:]) @@ -165,7 +168,10 @@ def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: def select_nodes( - project_dir: Path, nodes: dict[str, DbtNode], select: list[str] | None = None, exclude: list[str] | None = None + project_dir: Path | None, + nodes: dict[str, DbtNode], + select: list[str] | None = None, + exclude: list[str] | None = None, ) -> dict[str, DbtNode]: """ Given a group of nodes within a project, apply select and exclude filters using diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 2f875b8c3..50cb6ed09 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -38,7 +38,7 @@ def basic_cosmos_task_group() -> None: jaffle_shop = DbtTaskGroup( group_id="test_123", project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", + (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), operator_args={"install_deps": True}, profile_config=profile_config, diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index 14ce42606..67a9cb3ce 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -25,8 +25,9 @@ cosmos_manifest_example = DbtDag( # dbt/cosmos-specific parameters project_config=ProjectConfig( - DBT_ROOT_PATH / "jaffle_shop", + dbt_project_path=DBT_ROOT_PATH / "jaffle_shop", manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json", + project_name="jaffle_shop", ), profile_config=profile_config, render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:models/customers.sql"]), diff --git a/tests/dbt/parser/test_project.py b/tests/dbt/parser/test_project.py index 31a79ceba..4f13a3eb3 100644 --- a/tests/dbt/parser/test_project.py +++ b/tests/dbt/parser/test_project.py @@ -10,7 +10,7 @@ import pytest import yaml -from cosmos.dbt.parser.project import DbtModel, DbtModelType, DbtProject +from cosmos.dbt.parser.project import DbtModel, DbtModelType, LegacyDbtProject DBT_PROJECT_PATH = Path(__name__).parent.parent.parent.parent.parent / "dev/dags/dbt/" SAMPLE_CSV_PATH = DBT_PROJECT_PATH / "jaffle_shop/seeds/raw_customers.csv" @@ -19,8 +19,8 @@ SAMPLE_YML_PATH = DBT_PROJECT_PATH / "jaffle_shop/models/schema.yml" -def test_dbtproject__handle_csv_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_csv_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", ) assert not dbt_project.seeds @@ -33,8 +33,8 @@ def test_dbtproject__handle_csv_file(): assert raw_customers.path == SAMPLE_CSV_PATH -def test_dbtproject__handle_sql_file_model(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_model(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -49,8 +49,8 @@ def test_dbtproject__handle_sql_file_model(): assert raw_customers.path == SAMPLE_MODEL_SQL_PATH -def test_dbtproject__handle_sql_file_snapshot(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_sql_file_snapshot(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ 
-65,8 +65,8 @@ def test_dbtproject__handle_sql_file_snapshot(): assert raw_customers.path == SAMPLE_SNAPSHOT_SQL_PATH -def test_dbtproject__handle_config_file(): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file(): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) @@ -81,25 +81,25 @@ def test_dbtproject__handle_config_file(): assert sample_test.path == SAMPLE_YML_PATH -def test_dbtproject__handle_config_file_empty_file(): +def test_LegacyDbtProject__handle_config_file_empty_file(): with NamedTemporaryFile("w") as tmp_fp: tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models -def test_dbtproject__handle_config_file_with_unknown_name(): +def test_LegacyDbtProject__handle_config_file_with_unknown_name(): yaml_data = {"models": [{"name": "unknown"}]} with NamedTemporaryFile("w") as tmp_fp: yaml.dump(yaml_data, tmp_fp) tmp_fp.flush() sample_config_file_path = Path(tmp_fp.name) - dbt_project = DbtProject(project_name="empty_project") + dbt_project = LegacyDbtProject(project_name="empty_project") assert not dbt_project.models dbt_project._handle_config_file(sample_config_file_path) assert not dbt_project.models @@ -112,8 +112,8 @@ def test_dbtproject__handle_config_file_with_unknown_name(): (["tag1", "tag2"], {"materialized:view", "tags:tag1", "tags:tag2"}), ], ) -def test_dbtproject__handle_config_file_with_selector(input_tags, expected_config_selectors): - dbt_project = DbtProject( +def test_LegacyDbtProject__handle_config_file_with_selector(input_tags, expected_config_selectors): + dbt_project = LegacyDbtProject( project_name="jaffle_shop", dbt_root_path=DBT_PROJECT_PATH, ) diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index dc4189aca..f176fa445 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -5,14 +5,13 @@ import pytest -from cosmos.config import ProfileConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.constants import DbtResourceType, ExecutionMode from cosmos.dbt.graph import CosmosLoadDbtException, DbtGraph, LoadMode -from cosmos.dbt.project import DbtProject from cosmos.profiles import PostgresUserPasswordProfileMapping DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" -DBT_PIPELINE_NAME = "jaffle_shop" +DBT_PROJECT_NAME = "jaffle_shop" SAMPLE_MANIFEST = Path(__file__).parent.parent / "sample/manifest.json" SAMPLE_MANIFEST_PY = Path(__file__).parent.parent / "sample/manifest_python.json" SAMPLE_MANIFEST_MODEL_VERSION = Path(__file__).parent.parent / "sample/manifest_model_version.json" @@ -24,10 +23,10 @@ def tmp_dbt_project_dir(): """ Creates a plain dbt project structure, which does not contain logs or target folders. 
""" - source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PIPELINE_NAME + source_proj_dir = DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME tmp_dir = Path(tempfile.mkdtemp()) - target_proj_dir = tmp_dir / DBT_PIPELINE_NAME + target_proj_dir = tmp_dir / DBT_PROJECT_NAME shutil.copytree(source_proj_dir, target_proj_dir) shutil.rmtree(target_proj_dir / "logs", ignore_errors=True) shutil.rmtree(target_proj_dir / "target", ignore_errors=True) @@ -37,12 +36,19 @@ def tmp_dbt_project_dir(): @pytest.mark.parametrize( - "pipeline_name,manifest_filepath,model_filepath", - [("jaffle_shop", SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], + "project_name,manifest_filepath,model_filepath", + [(DBT_PROJECT_NAME, SAMPLE_MANIFEST, "customers.sql"), ("jaffle_shop_python", SAMPLE_MANIFEST_PY, "customers.py")], ) -def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_filepath): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=manifest_filepath) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:table"]) +def test_load_via_manifest_with_exclude(project_name, manifest_filepath, model_filepath): + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=manifest_filepath + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:table"]) dbt_graph.load_from_dbt_manifest() assert len(dbt_graph.nodes) == 28 @@ -58,13 +64,20 @@ def test_load_via_manifest_with_exclude(pipeline_name, manifest_filepath, model_ "model.jaffle_shop.stg_orders", "model.jaffle_shop.stg_payments", ] - assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{pipeline_name}/models/{model_filepath}" + assert sample_node.file_path == DBT_PROJECTS_ROOT_DIR / f"{project_name}/models/{model_filepath}" @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_from_dbt_manifest.called @@ -72,8 +85,15 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest): @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=FileNotFoundError()) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None) def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path="/tmp/manifest.json") - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path="/tmp/manifest.json" + ) + profile_config = ProfileConfig( + profile_name="test", + 
target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL) assert mock_load_via_dbt_ls.called assert not mock_load_via_custom_parser.called @@ -82,16 +102,26 @@ def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_cus @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None) @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", side_effect=FileNotFoundError()) def test_load_automatic_without_manifest_and_without_dbt_cmd(mock_load_via_dbt_ls, mock_load_via_custom_parser): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.AUTOMATIC) assert mock_load_via_dbt_ls.called assert mock_load_via_custom_parser.called def test_load_manifest_without_manifest(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert err_info.value.args[0] == "Unable to load manifest using None" @@ -99,8 +129,15 @@ def test_load_manifest_without_manifest(): @patch("cosmos.dbt.graph.DbtGraph.load_from_dbt_manifest", return_value=None) def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load(execution_mode=ExecutionMode.LOCAL, method=LoadMode.DBT_MANIFEST) assert mock_load_from_dbt_manifest.called @@ -122,8 +159,13 @@ def test_load_manifest_with_manifest(mock_load_from_dbt_manifest): def test_load( mock_load_from_dbt_manifest, mock_load_via_dbt_ls, mock_load_via_custom_parser, exec_mode, method, expected_function ): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, 
profile_config=profile_config) dbt_graph.load(method=method, execution_mode=exec_mode) load_function = locals()[expected_function] @@ -138,9 +180,9 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "target").exists() assert not (tmp_dbt_project_dir / "logs").exists() - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -155,15 +197,15 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(mock_pop assert not (tmp_dbt_project_dir / "logs").exists() used_cwd = Path(mock_popen.call_args[0][0][-5]) - assert used_cwd != dbt_project.dir + assert used_cwd != project_config.dbt_project_path assert not used_cwd.exists() @pytest.mark.integration def test_load_via_dbt_ls_with_exclude(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, select=["*customers*"], exclude=["*orders*"], profile_config=ProfileConfig( @@ -204,11 +246,11 @@ def test_load_via_dbt_ls_with_exclude(): @pytest.mark.integration -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_dbt_ls_without_exclude(pipeline_name): - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) +@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_dbt_ls_without_exclude(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -224,25 +266,32 @@ def test_load_via_dbt_ls_without_exclude(pipeline_name): assert len(dbt_graph.nodes) == 28 +def test_load_via_custom_without_project_path(): + project_config = ProjectConfig(manifest_path=SAMPLE_MANIFEST, project_name="test") + dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) + with pytest.raises(CosmosLoadDbtException) as err_info: + dbt_graph.load_via_custom_parser() + + expected = "Unable to load dbt project without project files" + assert err_info.value.args[0] == expected + + def test_load_via_dbt_ls_without_profile(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) - dbt_graph = DbtGraph( - dbt_cmd="/inexistent/dbt", - project=dbt_project, - ) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) + dbt_graph = DbtGraph(dbt_cmd="/inexistent/dbt", project=project_config) with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to load dbt project without a profile config" + expected = "Unable to load dbt project without project files and a profile config" assert err_info.value.args[0] == expected def test_load_via_dbt_ls_with_invalid_dbt_path(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) with patch("pathlib.Path.exists", return_value=True): dbt_graph = DbtGraph( dbt_cmd="/inexistent/dbt", - 
project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -260,18 +309,17 @@ def test_load_via_dbt_ls_with_invalid_dbt_path(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) @pytest.mark.integration def test_load_via_dbt_ls_with_sources(load_method): - pipeline_name = "simple" + project_name = "simple" dbt_graph = DbtGraph( dbt_deps=False, - project=DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name, manifest_path=SAMPLE_MANIFEST_SOURCE if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( profile_name="simple", target_name="dev", - profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / pipeline_name / "profiles.yml"), + profiles_yml_filepath=(DBT_PROJECTS_ROOT_DIR / project_name / "profiles.yml"), ), ) getattr(dbt_graph, load_method)() @@ -282,11 +330,10 @@ def test_load_via_dbt_ls_with_sources(load_method): @pytest.mark.integration def test_load_via_dbt_ls_without_dbt_deps(): - pipeline_name = "jaffle_shop" - dbt_project = DbtProject(name=pipeline_name, root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( dbt_deps=False, - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -310,9 +357,9 @@ def test_load_via_dbt_ls_with_zero_returncode_and_non_empty_stderr(mock_popen, t mock_popen().communicate.return_value = ("", "Some stderr warnings") mock_popen().returncode = 0 - dbt_project = DbtProject(name=DBT_PIPELINE_NAME, root_dir=tmp_dbt_project_dir) + project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -332,9 +379,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): mock_popen().communicate.return_value = ("", "Some stderr message") mock_popen().returncode = 1 - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -355,9 +402,9 @@ def test_load_via_dbt_ls_with_non_zero_returncode(mock_popen): @patch("cosmos.dbt.graph.Popen.communicate", return_value=("Some Runtime Error", "")) def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): # It may seem strange, but at least until dbt 1.6.0, there are circumstances when it outputs errors to stdout - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR) + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME) dbt_graph = DbtGraph( - project=dbt_project, + project=project_config, profile_config=ProfileConfig( profile_name="default", target_name="default", @@ -375,13 +422,15 @@ def test_load_via_dbt_ls_with_runtime_error_in_stdout(mock_popen_communicate): mock_popen_communicate.assert_called_once() -@pytest.mark.parametrize("pipeline_name", ("jaffle_shop", "jaffle_shop_python")) -def test_load_via_load_via_custom_parser(pipeline_name): - dbt_project = DbtProject( - name=pipeline_name, - root_dir=DBT_PROJECTS_ROOT_DIR, 
+@pytest.mark.parametrize("project_name", ("jaffle_shop", "jaffle_shop_python")) +def test_load_via_load_via_custom_parser(project_name): + project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / project_name) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", ) - dbt_graph = DbtGraph(project=dbt_project) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load_via_custom_parser() @@ -391,16 +440,30 @@ def test_load_via_load_via_custom_parser(pipeline_name): @patch("cosmos.dbt.graph.DbtGraph.update_node_dependency", return_value=None) def test_update_node_dependency_called(mock_update_node_dependency): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() assert mock_update_node_dependency.called def test_update_node_dependency_target_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config) dbt_graph.load() for _, nodes in dbt_graph.nodes.items(): @@ -410,8 +473,15 @@ def test_update_node_dependency_target_exist(): def test_update_node_dependency_test_not_exist(): - dbt_project = DbtProject(name="jaffle_shop", root_dir=DBT_PROJECTS_ROOT_DIR, manifest_path=SAMPLE_MANIFEST) - dbt_graph = DbtGraph(project=dbt_project, exclude=["config.materialized:test"]) + project_config = ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path=SAMPLE_MANIFEST + ) + profile_config = ProfileConfig( + profile_name="test", + target_name="test", + profiles_yml_filepath=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME / "profiles.yml", + ) + dbt_graph = DbtGraph(project=project_config, profile_config=profile_config, exclude=["config.materialized:test"]) dbt_graph.load_from_dbt_manifest() for _, nodes in dbt_graph.filtered_nodes.items(): @@ -422,9 +492,8 @@ def test_update_node_dependency_test_not_exist(): @pytest.mark.parametrize("load_method", ["load_via_dbt_ls", "load_from_dbt_manifest"]) def test_load_dbt_ls_and_manifest_with_model_version(load_method): dbt_graph = DbtGraph( - project=DbtProject( - name="model_version", - root_dir=DBT_PROJECTS_ROOT_DIR, + project=ProjectConfig( + dbt_project_path=DBT_PROJECTS_ROOT_DIR / "model_version", manifest_path=SAMPLE_MANIFEST_MODEL_VERSION if load_method == "load_from_dbt_manifest" else None, ), profile_config=ProfileConfig( diff --git a/tests/test_config.py b/tests/test_config.py index 9dd936fba..c0ae32101 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -2,7 +2,7 @@ import pytest -from cosmos.config import 
ProjectConfig +from cosmos.config import ProfileConfig, ProjectConfig from cosmos.exceptions import CosmosValueError @@ -10,29 +10,56 @@ PIPELINE_FOLDER = "jaffle_shop" -def test_valid_parameters(): +def test_init_with_project_path_only(): + """ + Passing dbt_project_path on its own should create a valid ProjectConfig with relative paths defined + It should also have a project_name based on the path + """ project_config = ProjectConfig(dbt_project_path="path/to/dbt/project") - assert project_config.parsed_dbt_project_path == Path("path/to/dbt/project") - assert project_config.models_relative_path == Path("path/to/dbt/project/models") - assert project_config.seeds_relative_path == Path("path/to/dbt/project/seeds") - assert project_config.snapshots_relative_path == Path("path/to/dbt/project/snapshots") + assert project_config.dbt_project_path == Path("path/to/dbt/project") + assert project_config.models_path == Path("path/to/dbt/project/models") + assert project_config.seeds_path == Path("path/to/dbt/project/seeds") + assert project_config.snapshots_path == Path("path/to/dbt/project/snapshots") + assert project_config.project_name == "project" assert project_config.manifest_path is None def test_init_with_manifest_path_and_project_path_succeeds(): """ - Passing a manifest path AND project path together should succeed, as previous + Passing a manifest path AND project path together should succeed + project_name in this case should be based on dbt_project_path """ project_config = ProjectConfig(dbt_project_path="/tmp/some-path", manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + assert project_config.manifest_path == Path("target/manifest.json") + assert project_config.project_name == "some-path" + + +def test_init_with_no_params(): + """ + The constructor now validates that the required base fields are present + As such, we should test here that the correct exception is raised if these are not correctly defined + This functionality has been moved from the validate method + """ + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig() + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." + ) -def test_init_with_manifest_path_and_not_project_path_succeeds(): +def test_init_with_manifest_path_and_not_project_path_and_not_project_name_fails(): """ - Since dbt_project_path is optional, we should be able to operate with only a manifest + Passing a manifest alone should fail since we also require a project_name """ - project_config = ProjectConfig(manifest_path="target/manifest.json") - assert project_config.parsed_manifest_path == Path("target/manifest.json") + with pytest.raises(CosmosValueError) as err_info: + ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") + print(err_info.value.args[0]) + assert err_info.value.args[0] == ( + "ProjectConfig requires dbt_project_path and/or manifest_path to be defined." + "If only manifest_path is defined, project_name must also be defined." 
+ ) def test_validate_with_project_path_and_manifest_path_succeeds(): @@ -53,17 +80,6 @@ def test_validate_with_project_path_and_not_manifest_path_succeeds(): assert project_config.validate_project() is None -def test_validate_with_manifest_path_and_not_project_path_and_not_project_name_fails(): - """ - Passing a manifest alone should fail since we also require a project_name - """ - project_config = ProjectConfig(manifest_path=DBT_PROJECTS_ROOT_DIR / "manifest.json") - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - print(err_info.value.args[0]) - assert err_info.value.args[0] == "project_name required when manifest_path is present and dbt_project_path is not." - - def test_validate_with_manifest_path_and_project_name_and_not_project_path_succeeds(): """ Passing a manifest and project name together should succeed. @@ -72,16 +88,6 @@ def test_validate_with_manifest_path_and_project_name_and_not_project_path_succe assert project_config.validate_project() is None -def test_validate_no_paths_fails(): - """ - Passing no manifest and no project directory should fail. - """ - project_config = ProjectConfig() - with pytest.raises(CosmosValueError) as err_info: - assert project_config.validate_project() is None - assert err_info.value.args[0] == "dbt_project_path or manifest_path are required parameters." - - def test_validate_project_missing_fails(): """ Passing a project dir that does not exist where specified should fail @@ -89,7 +95,7 @@ def test_validate_project_missing_fails(): project_config = ProjectConfig(dbt_project_path=Path("/tmp")) with pytest.raises(CosmosValueError) as err_info: assert project_config.validate_project() is None - assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" + assert err_info.value.args[0] == "Could not find dbt_project.yml at /tmp/dbt_project.yml" def test_is_manifest_available_is_true(): @@ -107,3 +113,18 @@ def test_is_manifest_available_is_false(): def test_project_name(): dbt_project = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR) assert dbt_project.project_name == "sample" + + +def test_profile_config_post_init(): + with pytest.raises(CosmosValueError) as err_info: + ProfileConfig(profiles_yml_filepath="/tmp/some-profile", profile_name="test", target_name="test") + assert err_info.value.args[0] == "The file /tmp/some-profile does not exist." + + +def test_profile_config_validate(): + with pytest.raises(CosmosValueError) as err_info: + profile_config = ProfileConfig(profile_name="test", target_name="test") + assert profile_config.validate_profile() is None + assert ( + err_info.value.args[0] == "Either profiles_yml_filepath or profile_mapping must be set to render a profile" + ) diff --git a/tests/test_converter.py b/tests/test_converter.py index 0d321730a..5fb50a00f 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -12,6 +12,7 @@ SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" +SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -48,7 +49,8 @@ def test_validate_arguments_tags(argument_key): @patch("cosmos.converter.DbtGraph.load") def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args): """ - This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. 
+    This test validates that a project, given only a project path as a Path object and seeds,
+    is able to successfully generate a converter.
     """
     project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT)
     execution_config = ExecutionConfig(execution_mode=execution_mode)
@@ -67,3 +69,70 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, operator_args):
         operator_args=operator_args,
     )
     assert converter
+
+
+@pytest.mark.parametrize(
+    "execution_mode,operator_args",
+    [
+        (ExecutionMode.KUBERNETES, {}),
+        # (ExecutionMode.DOCKER, {"image": "sample-image"}),
+    ],
+)
+@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
+@patch("cosmos.converter.DbtGraph.load")
+def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execution_mode, operator_args):
+    """
+    This test validates that a project, given only a project path as a string and seeds,
+    is able to successfully generate a converter.
+    """
+    project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix())
+    execution_config = ExecutionConfig(execution_mode=execution_mode)
+    render_config = RenderConfig(emit_datasets=True)
+    profile_config = ProfileConfig(
+        profile_name="my_profile_name",
+        target_name="my_target_name",
+        profiles_yml_filepath=SAMPLE_PROFILE_YML,
+    )
+    converter = DbtToAirflowConverter(
+        nodes=nodes,
+        project_config=project_config,
+        profile_config=profile_config,
+        execution_config=execution_config,
+        render_config=render_config,
+        operator_args=operator_args,
+    )
+    assert converter
+
+
+@pytest.mark.parametrize(
+    "execution_mode,operator_args",
+    [
+        (ExecutionMode.KUBERNETES, {}),
+        # (ExecutionMode.DOCKER, {"image": "sample-image"}),
+    ],
+)
+@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
+@patch("cosmos.converter.DbtGraph.load")
+def test_converter_fails_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args):
+    """
+    This test validates that a project, given only a manifest path and project name (and no
+    project path), fails to generate a converter, since task operators require a project path.
+    """
+    project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample")
+    execution_config = ExecutionConfig(execution_mode=execution_mode)
+    render_config = RenderConfig(emit_datasets=True)
+    profile_config = ProfileConfig(
+        profile_name="my_profile_name",
+        target_name="my_target_name",
+        profiles_yml_filepath=SAMPLE_PROFILE_YML,
+    )
+    with pytest.raises(CosmosValueError) as err_info:
+        DbtToAirflowConverter(
+            nodes=nodes,
+            project_config=project_config,
+            profile_config=profile_config,
+            execution_config=execution_config,
+            render_config=render_config,
+            operator_args=operator_args,
+        )
+    assert err_info.value.args[0] == "A Project Path in ProjectConfig is required for generating a Task Operators."
From 8b96120a7732f137bf1e8a4bff130ec3d7d3ca06 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 25 Oct 2023 11:19:37 +0100
Subject: [PATCH 02/11] =?UTF-8?q?=E2=AC=86=20[pre-commit.ci]=20pre-commit?=
 =?UTF-8?q?=20autoupdate=20(#621)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.0.292 → v0.1.1](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.292...v0.1.1)
- [github.com/psf/black: 23.9.1 → 23.10.0](https://github.com/psf/black/compare/23.9.1...23.10.0)
- [github.com/pre-commit/mirrors-mypy: v1.6.0 → v1.6.1](https://github.com/pre-commit/mirrors-mypy/compare/v1.6.0...v1.6.1)

Co-authored-by: Tatiana Al-Chueyr
---
 .pre-commit-config.yaml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 90a163e09..f363097ac 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -53,13 +53,13 @@ repos:
       - --py37-plus
       - --keep-runtime-typing
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.0.292
+    rev: v0.1.1
     hooks:
       - id: ruff
        args:
        - --fix
   - repo: https://github.com/psf/black
-    rev: 23.9.1
+    rev: 23.10.0
     hooks:
      - id: black
        args: [ "--config", "./pyproject.toml" ]
@@ -70,7 +70,7 @@ repos:
        alias: black
        additional_dependencies: [black>=22.10.0]
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: 'v1.6.0'
+    rev: 'v1.6.1'
    hooks:
      - id: mypy
        name: mypy-python

From 9056125d300ec1ddc1d30654d5318eaeac45fd1f Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Wed, 25 Oct 2023 11:20:38 +0100
Subject: [PATCH 03/11] Fix running test that validates manifest-based DAGs
 (#619)

When checking the latest test runs in several branches for
**Run-Integration-Tests (3.8, 2.7)**, for example
https://github.com/astronomer/astronomer-cosmos/actions/runs/6612130380/job/17957443769?pr=605#logs,
I noticed that `cosmos_manifest_example` was not being run. This PR
solves this issue.
---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index ef1fba2f9..204ef771b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -180,7 +180,7 @@
 pytest -vv \
 --cov=cosmos \
 --cov-report=term-missing \
 --cov-report=xml \
 --durations=0 \
 -m integration \
--k 'not (sqlite or example_cosmos_sources or example_cosmos_python_models or example_virtualenv or cosmos_manifest_example)'"""
+-k 'not (sqlite or example_cosmos_sources or example_cosmos_python_models or example_virtualenv)'"""
 test-integration-expensive = """pytest -vv \
 --cov=cosmos \
 --cov-report=term-missing \

From 682fc67a6afee398a648d6350ec0e8448fe36c6e Mon Sep 17 00:00:00 2001
From: Justin Bandoro <79104794+jbandoro@users.noreply.github.com>
Date: Wed, 25 Oct 2023 06:58:56 -0700
Subject: [PATCH 04/11] Add `DbtDocsGCSOperator` (#616)

Adds `DbtDocsGCSOperator` so dbt docs can be uploaded to GCS.

Closes: #541

## Breaking Change?

No breaking changes, but `DbtDocsS3LocalOperator` and
`DbtDocsAzureStorageLocalOperator` were standardized to accept args for
`connection_id` and `bucket_name`. The current args of `aws_conn_id` (S3),
and `azure_conn_id` and `container_name` (Azure), will still work, with
warnings to switch to `connection_id` and `bucket_name`.
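For illustration, a minimal before/after sketch (the connection IDs and
bucket below mirror the example DAG in `dev/dags/dbt_docs.py` and are
placeholders, not required names):

```python
# Before: still accepted, but now emits a DeprecationWarning
DbtDocsS3Operator(
    task_id="generate_dbt_docs_aws",
    project_dir="path/to/jaffle_shop",
    profile_config=profile_config,
    aws_conn_id="aws_docs",
    bucket_name="cosmos-docs",
)

# After: the standardized arguments, shared by the S3, Azure and GCS operators
DbtDocsGCSOperator(
    task_id="generate_dbt_docs_gcs",
    project_dir="path/to/jaffle_shop",
    profile_config=profile_config,
    connection_id="gcs_docs",
    bucket_name="cosmos-docs",
)
```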
--- cosmos/operators/__init__.py | 2 + cosmos/operators/local.py | 141 ++++++++++++++++++------- dev/dags/dbt_docs.py | 23 +++- docs/configuration/generating-docs.rst | 30 +++++- tests/operators/test_local.py | 33 +++++- 5 files changed, 183 insertions(+), 46 deletions(-) diff --git a/cosmos/operators/__init__.py b/cosmos/operators/__init__.py index c3155a9f9..b7e36abff 100644 --- a/cosmos/operators/__init__.py +++ b/cosmos/operators/__init__.py @@ -2,6 +2,7 @@ from .local import DbtDocsAzureStorageLocalOperator as DbtDocsAzureStorageOperator from .local import DbtDocsLocalOperator as DbtDocsOperator from .local import DbtDocsS3LocalOperator as DbtDocsS3Operator +from .local import DbtDocsGCSLocalOperator as DbtDocsGCSOperator from .local import DbtLSLocalOperator as DbtLSOperator from .local import DbtRunLocalOperator as DbtRunOperator from .local import DbtRunOperationLocalOperator as DbtRunOperationOperator @@ -20,4 +21,5 @@ "DbtDocsOperator", "DbtDocsS3Operator", "DbtDocsAzureStorageOperator", + "DbtDocsGCSOperator", ] diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c033f33f3..489a92ba9 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -7,6 +7,8 @@ from attr import define from pathlib import Path from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING +from abc import ABC, abstractmethod +import warnings import airflow import yaml @@ -539,40 +541,65 @@ def __init__(self, **kwargs: Any) -> None: self.base_cmd = ["docs", "generate"] -class DbtDocsS3LocalOperator(DbtDocsLocalOperator): +class DbtDocsCloudLocalOperator(DbtDocsLocalOperator, ABC): """ - Executes `dbt docs generate` command and upload to S3 storage. Returns the S3 path to the generated documentation. - - :param aws_conn_id: S3's Airflow connection ID - :param bucket_name: S3's bucket name - :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be - uploaded. + Abstract class for operators that upload the generated documentation to cloud storage. """ - ui_color = "#FF9900" - def __init__( self, - aws_conn_id: str, + connection_id: str, bucket_name: str, folder_dir: str | None = None, - **kwargs: str, + **kwargs: Any, ) -> None: "Initializes the operator." - self.aws_conn_id = aws_conn_id + self.connection_id = connection_id self.bucket_name = bucket_name self.folder_dir = folder_dir super().__init__(**kwargs) # override the callback with our own - self.callback = self.upload_to_s3 + self.callback = self.upload_to_cloud_storage + + @abstractmethod + def upload_to_cloud_storage(self, project_dir: str) -> None: + """Abstract method to upload the generated documentation to cloud storage.""" + + +class DbtDocsS3LocalOperator(DbtDocsCloudLocalOperator): + """ + Executes `dbt docs generate` command and upload to S3 storage. Returns the S3 path to the generated documentation. + + :param connection_id: S3's Airflow connection ID + :param bucket_name: S3's bucket name + :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be + uploaded. + """ + + ui_color = "#FF9900" + + def __init__( + self, + *args: Any, + aws_conn_id: str | None = None, + **kwargs: Any, + ) -> None: + if aws_conn_id: + warnings.warn( + "Please, use `connection_id` instead of `aws_conn_id`. 
The argument `aws_conn_id` will be" + " deprecated in Cosmos 2.0", + DeprecationWarning, + ) + kwargs["connection_id"] = aws_conn_id + super().__init__(*args, **kwargs) - def upload_to_s3(self, project_dir: str) -> None: + def upload_to_cloud_storage(self, project_dir: str) -> None: "Uploads the generated documentation to S3." logger.info( 'Attempting to upload generated docs to S3 using S3Hook("%s")', - self.aws_conn_id, + self.connection_id, ) from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -580,7 +607,7 @@ def upload_to_s3(self, project_dir: str) -> None: target_dir = f"{project_dir}/target" hook = S3Hook( - self.aws_conn_id, + self.connection_id, extra_args={ "ContentType": "text/html", }, @@ -599,12 +626,12 @@ def upload_to_s3(self, project_dir: str) -> None: ) -class DbtDocsAzureStorageLocalOperator(DbtDocsLocalOperator): +class DbtDocsAzureStorageLocalOperator(DbtDocsCloudLocalOperator): """ Executes `dbt docs generate` command and upload to Azure Blob Storage. - :param azure_conn_id: Azure Blob Storage's Airflow connection ID - :param container_name: Azure Blob Storage's bucket name + :param connection_id: Azure Blob Storage's Airflow connection ID + :param bucket_name: Azure Blob Storage's bucket name :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be uploaded. """ @@ -613,26 +640,32 @@ class DbtDocsAzureStorageLocalOperator(DbtDocsLocalOperator): def __init__( self, - azure_conn_id: str, - container_name: str, - folder_dir: str | None = None, - **kwargs: str, + *args: Any, + azure_conn_id: str | None = None, + container_name: str | None = None, + **kwargs: Any, ) -> None: - "Initializes the operator." - self.azure_conn_id = azure_conn_id - self.container_name = container_name - self.folder_dir = folder_dir - - super().__init__(**kwargs) - - # override the callback with our own - self.callback = self.upload_to_azure + if azure_conn_id: + warnings.warn( + "Please, use `connection_id` instead of `azure_conn_id`. The argument `azure_conn_id` will" + " be deprecated in Cosmos 2.0", + DeprecationWarning, + ) + kwargs["connection_id"] = azure_conn_id + if container_name: + warnings.warn( + "Please, use `bucket_name` instead of `container_name`. The argument `container_name` will" + " be deprecated in Cosmos 2.0", + DeprecationWarning, + ) + kwargs["bucket_name"] = container_name + super().__init__(*args, **kwargs) - def upload_to_azure(self, project_dir: str) -> None: + def upload_to_cloud_storage(self, project_dir: str) -> None: "Uploads the generated documentation to Azure Blob Storage." logger.info( 'Attempting to upload generated docs to Azure Blob Storage using WasbHook(conn_id="%s")', - self.azure_conn_id, + self.connection_id, ) from airflow.providers.microsoft.azure.hooks.wasb import WasbHook @@ -640,26 +673,60 @@ def upload_to_azure(self, project_dir: str) -> None: target_dir = f"{project_dir}/target" hook = WasbHook( - self.azure_conn_id, + self.connection_id, ) for filename in self.required_files: logger.info( "Uploading %s to %s", filename, - f"wasb://{self.container_name}/{filename}", + f"wasb://{self.bucket_name}/{filename}", ) blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename hook.load_file( file_path=f"{target_dir}/{filename}", - container_name=self.container_name, + container_name=self.bucket_name, blob_name=blob_name, overwrite=True, ) +class DbtDocsGCSLocalOperator(DbtDocsCloudLocalOperator): + """ + Executes `dbt docs generate` command and upload to GCS. 
+ + :param connection_id: Google Cloud Storage's Airflow connection ID + :param bucket_name: Google Cloud Storage's bucket name + :param folder_dir: This can be used to specify under which directory the generated DBT documentation should be + uploaded. + """ + + ui_color = "#4772d5" + + def upload_to_cloud_storage(self, project_dir: str) -> None: + "Uploads the generated documentation to Google Cloud Storage" + logger.info( + 'Attempting to upload generated docs to Storage using GCSHook(conn_id="%s")', + self.connection_id, + ) + + from airflow.providers.google.cloud.hooks.gcs import GCSHook + + target_dir = f"{project_dir}/target" + hook = GCSHook(self.connection_id) + + for filename in self.required_files: + blob_name = f"{self.folder_dir}/{filename}" if self.folder_dir else filename + logger.info("Uploading %s to %s", filename, f"gs://{self.bucket_name}/{blob_name}") + hook.upload( + filename=f"{target_dir}/{filename}", + bucket_name=self.bucket_name, + object_name=blob_name, + ) + + class DbtDepsLocalOperator(DbtLocalBaseOperator): """ Executes a dbt core deps command. diff --git a/dev/dags/dbt_docs.py b/dev/dags/dbt_docs.py index 1fcd1c341..edf89bdab 100644 --- a/dev/dags/dbt_docs.py +++ b/dev/dags/dbt_docs.py @@ -20,6 +20,7 @@ from cosmos.operators import ( DbtDocsAzureStorageOperator, DbtDocsS3Operator, + DbtDocsGCSOperator, ) from cosmos.profiles import PostgresUserPasswordProfileMapping @@ -28,6 +29,7 @@ S3_CONN_ID = "aws_docs" AZURE_CONN_ID = "azure_docs" +GCS_CONN_ID = "gcs_docs" profile_config = ProfileConfig( profile_name="default", @@ -56,6 +58,11 @@ def which_upload(): downstream_tasks_to_run += ["generate_dbt_docs_azure"] except AirflowNotFoundException: pass + try: + BaseHook.get_connection(GCS_CONN_ID) + downstream_tasks_to_run += ["generate_dbt_docs_gcs"] + except AirflowNotFoundException: + pass return downstream_tasks_to_run @@ -72,7 +79,7 @@ def which_upload(): task_id="generate_dbt_docs_aws", project_dir=DBT_ROOT_PATH / "jaffle_shop", profile_config=profile_config, - aws_conn_id=S3_CONN_ID, + connection_id=S3_CONN_ID, bucket_name="cosmos-docs", ) @@ -80,8 +87,16 @@ def which_upload(): task_id="generate_dbt_docs_azure", project_dir=DBT_ROOT_PATH / "jaffle_shop", profile_config=profile_config, - azure_conn_id=AZURE_CONN_ID, - container_name="$web", + connection_id=AZURE_CONN_ID, + bucket_name="$web", + ) + + generate_dbt_docs_gcs = DbtDocsGCSOperator( + task_id="generate_dbt_docs_gcs", + project_dir=DBT_ROOT_PATH / "jaffle_shop", + profile_config=profile_config, + connection_id=GCS_CONN_ID, + bucket_name="cosmos-docs", ) - which_upload() >> [generate_dbt_docs_aws, generate_dbt_docs_azure] + which_upload() >> [generate_dbt_docs_aws, generate_dbt_docs_azure, generate_dbt_docs_gcs] diff --git a/docs/configuration/generating-docs.rst b/docs/configuration/generating-docs.rst index 925b60e04..88459fd14 100644 --- a/docs/configuration/generating-docs.rst +++ b/docs/configuration/generating-docs.rst @@ -11,9 +11,10 @@ Cosmos offers two pre-built ways of generating and uploading dbt docs and a fall - :class:`~cosmos.operators.DbtDocsS3Operator`: generates and uploads docs to a S3 bucket. - :class:`~cosmos.operators.DbtDocsAzureStorageOperator`: generates and uploads docs to an Azure Blob Storage. +- :class:`~cosmos.operators.DbtDocsGCSOperator`: generates and uploads docs to a GCS bucket. - :class:`~cosmos.operators.DbtDocsOperator`: generates docs and runs a custom callback. -The first two operators require you to have a connection to the target storage. 
The third operator allows you to run custom code after the docs are generated in order to upload them to a storage of your choice.
+The first three operators require you to have a connection to the target storage. The last operator allows you to run custom code after the docs are generated in order to upload them to a storage of your choice.
 
 
 Examples
@@ -36,7 +37,7 @@ You can use the :class:`~cosmos.operators.DbtDocsS3Operator` to generate and upl
         project_dir="path/to/jaffle_shop",
         profile_config=profile_config,
         # docs-specific arguments
-        aws_conn_id="test_aws",
+        connection_id="test_aws",
         bucket_name="test_bucket",
     )
 
@@ -57,8 +58,29 @@ You can use the :class:`~cosmos.operators.DbtDocsAzureStorageOperator` to genera
         project_dir="path/to/jaffle_shop",
         profile_config=profile_config,
         # docs-specific arguments
-        azure_conn_id="test_azure",
-        container_name="$web",
+        connection_id="test_azure",
+        bucket_name="$web",
+    )
+
+Upload to GCS
+~~~~~~~~~~~~~~~~~~~~~~~
+
+GCS supports serving static files directly from a bucket. To learn more (and to set it up), check out the `official GCS documentation `_.
+
+You can use the :class:`~cosmos.operators.DbtDocsGCSOperator` to generate and upload docs to a GCS bucket. The following code snippet shows how to do this with the default jaffle_shop project:
+
+.. code-block:: python
+
+    from cosmos.operators import DbtDocsGCSOperator
+
+    # then, in your DAG code:
+    generate_dbt_docs_gcs = DbtDocsGCSOperator(
+        task_id="generate_dbt_docs_gcs",
+        project_dir="path/to/jaffle_shop",
+        profile_config=profile_config,
+        # docs-specific arguments
+        connection_id="test_gcs",
+        bucket_name="test_bucket",
     )
 
 Custom Callback
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index 07c186d4a..b883adea5 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -1,9 +1,10 @@
 import logging
 import os
+import sys
 import shutil
 import tempfile
 from pathlib import Path
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, patch, call
 
 import pytest
 from airflow import DAG
@@ -24,6 +25,7 @@
     DbtDocsLocalOperator,
     DbtDocsS3LocalOperator,
     DbtDocsAzureStorageLocalOperator,
+    DbtDocsGCSLocalOperator,
     DbtSeedLocalOperator,
     DbtRunOperationLocalOperator,
 )
@@ -379,6 +381,7 @@ def test_operator_execute_with_flags(mock_build_and_run_cmd, operator_class, kwa
         DbtDocsLocalOperator,
         DbtDocsS3LocalOperator,
         DbtDocsAzureStorageLocalOperator,
+        DbtDocsGCSLocalOperator,
     ),
 )
 @patch("cosmos.operators.local.DbtLocalBaseOperator.build_and_run_cmd")
@@ -386,6 +389,7 @@ def test_operator_execute_without_flags(mock_build_and_run_cmd, operator_class):
     operator_class_kwargs = {
         DbtDocsS3LocalOperator: {"aws_conn_id": "fake-conn", "bucket_name": "fake-bucket"},
         DbtDocsAzureStorageLocalOperator: {"azure_conn_id": "fake-conn", "container_name": "fake-container"},
+        DbtDocsGCSLocalOperator: {"connection_id": "fake-conn", "bucket_name": "fake-bucket"},
     }
     task = operator_class(
         profile_config=profile_config,
@@ -413,3 +417,30 @@ def test_calculate_openlineage_events_completes_openlineage_errors(mock_processo
     assert instance.parse.called
     err_msg = "Unable to parse OpenLineage events"
     assert err_msg in caplog.text
+
+
+@patch.object(DbtDocsGCSLocalOperator, "required_files", ["file1", "file2"])
+def test_dbt_docs_gcs_local_operator():
+    mock_gcs = MagicMock()
+    with patch.dict(sys.modules, {"airflow.providers.google.cloud.hooks.gcs": mock_gcs}):
+        operator = DbtDocsGCSLocalOperator(
+            task_id="fake-task",
+            project_dir="fake-dir",
+            profile_config=profile_config,
+            connection_id="fake-conn",
+            bucket_name="fake-bucket",
+            folder_dir="fake-folder",
+        )
+        operator.upload_to_cloud_storage("fake-dir")
+
+        # assert that GCSHook was called with the connection id
+        mock_gcs.GCSHook.assert_called_once_with("fake-conn")
+
+        mock_hook = mock_gcs.GCSHook.return_value
+        # assert that upload was called twice with the expected arguments
+        assert mock_hook.upload.call_count == 2
+        expected_upload_calls = [
+            call(filename="fake-dir/target/file1", bucket_name="fake-bucket", object_name="fake-folder/file1"),
+            call(filename="fake-dir/target/file2", bucket_name="fake-bucket", object_name="fake-folder/file2"),
+        ]
+        mock_hook.upload.assert_has_calls(expected_upload_calls)

From a4aa5cc900b5af4f263e2e612dec19d213d27a5c Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Wed, 25 Oct 2023 17:25:01 +0100
Subject: [PATCH 05/11] Remove unnecessary stack trace from Cosmos
 initialization (#624)

As of Cosmos 1.2.0, whenever imported, the library would log a warning containing a long stack trace if Openlineage was not available. Change it so that the stack trace is only printed when the DEBUG log level is used.
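A standalone sketch of the resulting pattern (the logger name and messages here are illustrative, not the exact Cosmos ones): the warning is always emitted, while the stack trace rides on a DEBUG record and only surfaces when DEBUG logging is enabled.

```
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cosmos")

try:
    from openlineage.airflow.extractors.base import OperatorLineage  # noqa: F401
except (ImportError, ModuleNotFoundError):
    # Terse warning at the default INFO level; no stack trace attached.
    logger.warning("To enable emitting Openlineage events, install astronomer-cosmos[openlineage].")
    # The call stack is attached to a DEBUG record, so it is suppressed
    # unless the effective log level admits DEBUG.
    logger.debug("Further details on lack of Openlineage:", stack_info=True)
```

Re-running the snippet with `logging.basicConfig(level=logging.DEBUG, force=True)` makes the stack trace appear.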
---
 cosmos/operators/local.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 489a92ba9..a593101fc 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -57,7 +57,10 @@
     from openlineage.airflow.extractors.base import OperatorLineage
 except (ImportError, ModuleNotFoundError):
     logger.warning(
-        "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage].",
+        "To enable emitting Openlineage events, upgrade to Airflow 2.7 or install astronomer-cosmos[openlineage]."
+    )
+    logger.debug(
+        "Further details on lack of Openlineage:",
         stack_info=True,
     )
     is_openlineage_available = False

From ad7dcf0b419f247cab7effcc205ce16a9b922d32 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Wed, 25 Oct 2023 19:10:10 +0100
Subject: [PATCH 06/11] Fix `LoadMode.AUTOMATIC` behaviour to use
 `LoadMode.DBT_LS` when `ProfileMapping` is used (#625)

Since #489 was merged, the behavior of `LoadMode.AUTOMATIC` changed to generate a `profiles.yml` file if the file didn't exist. However, we forgot to remove the previously necessary condition for being able to run `LoadMode.DBT_LS` (having the `profiles.yml` file).

This led to inconsistent behaviour in Cosmos when `LoadMode.AUTOMATIC` was used and the `manifest.json` was not available:
1. If the user used a `ProfileConfig` with `profiles_yml_filepath`, it would use `LoadMode.DBT_LS`
2. If the user used a `ProfileConfig` with a `ProfileMapping` class, it would unnecessarily use `LoadMode.CUSTOM`

This PR fixes the behaviour to attempt to use `LoadMode.DBT_LS` regardless of how the `ProfileConfig` was set.
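Roughly, the selection logic after this fix can be sketched as follows (simplified from the `cosmos/dbt/graph.py` hunk below; `automatic_load` is a hypothetical helper, and the final fallback branch is an assumption based on the surrounding tests):

```
def automatic_load(graph, execution_mode):
    # Sketch of how LoadMode.AUTOMATIC picks a parsing strategy.
    if graph.project.is_manifest_available():
        graph.load_from_dbt_manifest()
    elif execution_mode == ExecutionMode.LOCAL and graph.profile_config:
        # Any ProfileConfig now qualifies, whether built from
        # profiles_yml_filepath or from a ProfileMapping; profiles.yml
        # no longer needs to exist on disk.
        try:
            graph.load_via_dbt_ls()
        except FileNotFoundError:
            # dbt executable not found: fall back to the custom parser.
            graph.load_via_custom_parser()
    else:
        graph.load_via_custom_parser()
```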
---
 cosmos/config.py | 6 ------
 cosmos/dbt/graph.py | 6 +-----
 tests/dbt/test_graph.py | 26 +++++++++++++++++++++-----
 tests/operators/test_local.py | 3 ++-
 4 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/cosmos/config.py b/cosmos/config.py
index 15f88e037..610e6e489 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -163,12 +163,6 @@ def validate_profile(self) -> None:
         if not self.profiles_yml_filepath and not self.profile_mapping:
             raise CosmosValueError("Either profiles_yml_filepath or profile_mapping must be set to render a profile")
 
-    def is_profile_yml_available(self) -> bool:
-        """
-        Check if the `dbt` profiles.yml file exists.
-        """
-        return Path(self.profiles_yml_filepath).exists() if self.profiles_yml_filepath else False
-
 @contextlib.contextmanager
 def ensure_profile(
     self, desired_profile_path: Path | None = None, use_mock_values: bool = False
diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py
index ac92c46f5..c41ee0713 100644
--- a/cosmos/dbt/graph.py
+++ b/cosmos/dbt/graph.py
@@ -121,11 +121,7 @@ def load(
         if self.project.is_manifest_available():
             self.load_from_dbt_manifest()
         else:
-            if (
-                execution_mode == ExecutionMode.LOCAL
-                and self.profile_config
-                and self.profile_config.is_profile_yml_available()
-            ):
+            if execution_mode == ExecutionMode.LOCAL and self.profile_config:
                 try:
                     self.load_via_dbt_ls()
                 except FileNotFoundError:
diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py
index f176fa445..3927bbfdd 100644
--- a/tests/dbt/test_graph.py
+++ b/tests/dbt/test_graph.py
@@ -82,12 +82,10 @@ def test_load_automatic_manifest_is_available(mock_load_from_dbt_manifest):
     assert mock_load_from_dbt_manifest.called
 
 
-@patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=FileNotFoundError())
+@patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=None)
 @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None)
-def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_custom_parser):
-    project_config = ProjectConfig(
-        dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME, manifest_path="/tmp/manifest.json"
-    )
+def test_load_automatic_without_manifest_with_profile_yml(mock_load_via_dbt_ls, mock_load_via_custom_parser):
+    project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
     profile_config = ProfileConfig(
         profile_name="test",
         target_name="test",
@@ -99,6 +97,24 @@ def test_load_automatic_without_manifest(mock_load_via_dbt_ls, mock_load_via_cus
     assert not mock_load_via_custom_parser.called
 
 
+@patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", side_effect=None)
+@patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", return_value=None)
+def test_load_automatic_without_manifest_with_profile_mapping(mock_load_via_dbt_ls, mock_load_via_custom_parser):
+    project_config = ProjectConfig(dbt_project_path=DBT_PROJECTS_ROOT_DIR / DBT_PROJECT_NAME)
+    profile_config = ProfileConfig(
+        profile_name="test",
+        target_name="test",
+        profile_mapping=PostgresUserPasswordProfileMapping(
+            conn_id="airflow_db",
+            profile_args={"schema": "public"},
+        ),
+    )
+    dbt_graph = DbtGraph(project=project_config, profile_config=profile_config)
+    dbt_graph.load(execution_mode=ExecutionMode.LOCAL)
+    assert mock_load_via_dbt_ls.called
+    assert not mock_load_via_custom_parser.called
+
+
 @patch("cosmos.dbt.graph.DbtGraph.load_via_custom_parser", return_value=None)
 @patch("cosmos.dbt.graph.DbtGraph.load_via_dbt_ls", side_effect=FileNotFoundError())
 def test_load_automatic_without_manifest_and_without_dbt_cmd(mock_load_via_dbt_ls, mock_load_via_custom_parser):
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index b883adea5..580d49e6c 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -139,7 +139,8 @@ def test_dbt_base_operator_use_indirect_selection(indirect_selection_type) -> No
         assert cmd[-2] == "--indirect-selection"
         assert cmd[-1] == indirect_selection_type
     else:
-        assert cmd == ["dbt", "run"]
+        assert cmd[0].endswith("dbt")
+        assert cmd[1] == "run"

From 4c65f25febdf254d513a878c06f5b50007033890 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Wed, 25 Oct 2023 23:23:20 +0100
Subject: [PATCH 07/11] Do not block Cosmos if openlineage-common raises a
 jinja exception (#626)

Before this change, Cosmos failed to run if there was an issue in the openlineage-common parsing of the dbt project due to a jinja2 exception, even though the same issue did not happen when running the dbt command by itself:
https://github.com/astronomer/astronomer-cosmos/issues/612#issuecomment-1777563297

```
  File "/usr/local/lib/python3.11/site-packages/cosmos/operators/local.py", line 268, in calculate_openlineage_events_completes
    openlineage_processor = DbtLocalArtifactProcessor(
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 57, in __init__
    dbt_project = self.load_yaml_with_jinja(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 157, in load_yaml_with_jinja
    return self.render_values_jinja(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 180, in render_values_jinja
    parsed_dict[key] = cls.render_values_jinja(
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 180, in render_values_jinja
    parsed_dict[key] = cls.render_values_jinja(
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 180, in render_values_jinja
    parsed_dict[key] = cls.render_values_jinja(
                       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/openlineage/common/provider/dbt/local.py", line 190, in render_values_jinja
    return environment.from_string(value).render()  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/jinja2/environment.py", line 1301, in render
    self.environment.handle_exception()
  File "/usr/local/lib/python3.11/site-packages/jinja2/environment.py", line 936, in handle_exception
    raise rewrite_traceback_stack(source=source)
  File "