From 1ae8ea29062bca6950c5e9e044299425d9d94543 Mon Sep 17 00:00:00 2001 From: tabmra Date: Tue, 17 Oct 2023 11:50:43 +0800 Subject: [PATCH] resolved errors occuring when dbt_project_path was None, or str --- cosmos/converter.py | 6 ++-- cosmos/dbt/graph.py | 67 ++++++++++++++++++++++++++--------------- cosmos/dbt/project.py | 28 ++++++++++------- cosmos/dbt/selector.py | 9 ++++-- tests/dbt/test_graph.py | 2 +- tests/test_converter.py | 66 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 136 insertions(+), 42 deletions(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index 8137da3ec..5b7e646f2 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -106,8 +106,8 @@ def __init__( project_config.validate_project() emit_datasets = render_config.emit_datasets - dbt_root_path = project_config.dbt_project_path.parent - dbt_project_name = project_config.dbt_project_path.name + dbt_root_path = project_config.parsed_dbt_project_path + dbt_project_name = project_config.project_name dbt_models_dir = project_config.models_relative_path dbt_seeds_dir = project_config.seeds_relative_path dbt_snapshots_dir = project_config.snapshots_relative_path @@ -130,7 +130,7 @@ def __init__( dbt_project = DbtProject( name=dbt_project_name, - root_dir=Path(dbt_root_path), + root_dir=dbt_root_path.parent if dbt_root_path else None, models_dir=Path(dbt_models_dir) if dbt_models_dir else None, seeds_dir=Path(dbt_seeds_dir) if dbt_seeds_dir else None, snapshots_dir=Path(dbt_snapshots_dir) if dbt_snapshots_dir else None, diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 889948fdd..6bcde06e4 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -107,31 +107,45 @@ def load( :param method: How to load `nodes` from a `dbt` project (automatically, using custom parser, using dbt manifest or dbt ls) :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.KUBERNETES) + + Fundamentally, there are two different execution paths + There is automatic, and manual. """ - load_method = { - LoadMode.CUSTOM: self.load_via_custom_parser, - LoadMode.DBT_LS: self.load_via_dbt_ls, - LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, - } + + def load_manual(method: LoadMode) -> None: + load_method = { + LoadMode.CUSTOM: self.load_via_custom_parser, + LoadMode.DBT_LS: self.load_via_dbt_ls, + LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest, + } + + if method == LoadMode.DBT_MANIFEST: + if not self.project.is_manifest_available(): + raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") + elif method in [LoadMode.DBT_LS, LoadMode.CUSTOM]: + if not self.project.dir: + raise CosmosLoadDbtException( + "Unable to load using dbt_ls or custom method - Project Dir was not provided or does not exist." + ) + + load_method[method]() + + def load_automatic() -> None: + try: + load_manual(method=LoadMode.DBT_MANIFEST) + except CosmosLoadDbtException: + if execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available(): + try: + load_manual(method=LoadMode.DBT_LS) + except FileNotFoundError: + load_manual(method=LoadMode.CUSTOM) + else: + load_manual(method=LoadMode.CUSTOM) + if method == LoadMode.AUTOMATIC: - if self.project.is_manifest_available(): - self.load_from_dbt_manifest() - return - elif execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available(): - try: - self.load_via_dbt_ls() - return - except FileNotFoundError: - self.load_via_custom_parser() - return - else: - self.load_via_custom_parser() - return - - if method == LoadMode.DBT_MANIFEST and not self.project.is_manifest_available(): - raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}") - - load_method[method]() + load_automatic() + else: + load_manual(method=method) def load_via_dbt_ls(self) -> None: """ @@ -147,8 +161,8 @@ def load_via_dbt_ls(self) -> None: """ logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir) - if not self.profile_config: - raise CosmosLoadDbtException("Unable to load dbt project without a profile config") + if not self.project.dir or not self.profile_config: + raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config") if not shutil.which(self.dbt_cmd): raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}") @@ -289,6 +303,9 @@ def load_via_custom_parser(self) -> None: """ logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.name) + if not self.project.dir: + raise CosmosLoadDbtException("Unable to load dbt project without project files") + project = LegacyDbtProject( dbt_root_path=str(self.project.root_dir), dbt_models_dir=self.project.models_dir.stem if self.project.models_dir else None, diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index fe60f5751..da5c4284b 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -9,7 +9,7 @@ @dataclass class DbtProject: name: str - root_dir: Path + root_dir: Path | None = None models_dir: Path | None = None seeds_dir: Path | None = None snapshots_dir: Path | None = None @@ -18,21 +18,27 @@ class DbtProject: _cosmos_created_profile_file: bool = False def __post_init__(self) -> None: - if self.models_dir is None: - self.models_dir = self.dir / "models" - if self.seeds_dir is None: - self.seeds_dir = self.dir / "seeds" - if self.snapshots_dir is None: - self.snapshots_dir = self.dir / "snapshots" - if self.profile_path is None: - self.profile_path = self.dir / "profiles.yml" + """ + Since ProjectConfig does not require the dbt_project_path to be defined + DbtProject should also no longer require root_dir or any dependent paths + The project should be renderable with only a manifest.json + """ + if self.dir: + if self.models_dir is None: + self.models_dir = self.dir / "models" + if self.seeds_dir is None: + self.seeds_dir = self.dir / "seeds" + if self.snapshots_dir is None: + self.snapshots_dir = self.dir / "snapshots" + if self.profile_path is None: + self.profile_path = self.dir / "profiles.yml" @property - def dir(self) -> Path: + def dir(self) -> Path | None: """ Path to dbt pipeline, defined by self.root_dir and self.name. """ - return self.root_dir / self.name + return self.root_dir / self.name if self.root_dir else None def is_manifest_available(self) -> bool: """ diff --git a/cosmos/dbt/selector.py b/cosmos/dbt/selector.py index e10f5b9b2..4c70a9eec 100644 --- a/cosmos/dbt/selector.py +++ b/cosmos/dbt/selector.py @@ -27,7 +27,7 @@ class SelectorConfig: Supports to load it from a string. """ - def __init__(self, project_dir: Path, statement: str): + def __init__(self, project_dir: Path | None, statement: str): """ Create a selector config file. @@ -63,6 +63,8 @@ def load_from_statement(self, statement: str) -> None: items = statement.split(",") for item in items: if item.startswith(PATH_SELECTOR): + if not self.project_dir: + raise CosmosValueError("Can not select by path when no project directory is provided") index = len(PATH_SELECTOR) self.paths.append(self.project_dir / item[index:]) elif item.startswith(TAG_SELECTOR): @@ -165,7 +167,10 @@ def retrieve_by_label(statement_list: list[str], label: str) -> set[str]: def select_nodes( - project_dir: Path, nodes: dict[str, DbtNode], select: list[str] | None = None, exclude: list[str] | None = None + project_dir: Path | None, + nodes: dict[str, DbtNode], + select: list[str] | None = None, + exclude: list[str] | None = None, ) -> dict[str, DbtNode]: """ Given a group of nodes within a project, apply select and exclude filters using diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index dc4189aca..ab4858c72 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -233,7 +233,7 @@ def test_load_via_dbt_ls_without_profile(): with pytest.raises(CosmosLoadDbtException) as err_info: dbt_graph.load_via_dbt_ls() - expected = "Unable to load dbt project without a profile config" + expected = "Unable to load dbt project without project files and a profile config" assert err_info.value.args[0] == expected diff --git a/tests/test_converter.py b/tests/test_converter.py index 0d321730a..f2e427f29 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -12,6 +12,7 @@ SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml" SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/" +SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json" @pytest.mark.parametrize("argument_key", ["tags", "paths"]) @@ -67,3 +68,68 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op operator_args=operator_args, ) assert converter + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. + """ + + project_config = ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()) + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert converter + + +@pytest.mark.parametrize( + "execution_mode,operator_args", + [ + (ExecutionMode.KUBERNETES, {}), + # (ExecutionMode.DOCKER, {"image": "sample-image"}), + ], +) +@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes) +@patch("cosmos.converter.DbtGraph.load") +def test_converter_creates_dag_with_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args): + """ + This test will raise exceptions if we are trying to pass incorrect arguments to operator constructors. + """ + project_config = ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample") + execution_config = ExecutionConfig(execution_mode=execution_mode) + render_config = RenderConfig(emit_datasets=True) + profile_config = ProfileConfig( + profile_name="my_profile_name", + target_name="my_target_name", + profiles_yml_filepath=SAMPLE_PROFILE_YML, + ) + converter = DbtToAirflowConverter( + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args=operator_args, + ) + assert converter