Skip to content

Commit

Permalink
resolved errors occurring when dbt_project_path was None, or str
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmra committed Oct 17, 2023
1 parent d61f120 commit 1ae8ea2
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 42 deletions.
6 changes: 3 additions & 3 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def __init__(
project_config.validate_project()

emit_datasets = render_config.emit_datasets
dbt_root_path = project_config.dbt_project_path.parent
dbt_project_name = project_config.dbt_project_path.name
dbt_root_path = project_config.parsed_dbt_project_path
dbt_project_name = project_config.project_name
dbt_models_dir = project_config.models_relative_path
dbt_seeds_dir = project_config.seeds_relative_path
dbt_snapshots_dir = project_config.snapshots_relative_path
Expand All @@ -130,7 +130,7 @@ def __init__(

dbt_project = DbtProject(
name=dbt_project_name,
root_dir=Path(dbt_root_path),
root_dir=dbt_root_path.parent if dbt_root_path else None,
models_dir=Path(dbt_models_dir) if dbt_models_dir else None,
seeds_dir=Path(dbt_seeds_dir) if dbt_seeds_dir else None,
snapshots_dir=Path(dbt_snapshots_dir) if dbt_snapshots_dir else None,
Expand Down
67 changes: 42 additions & 25 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,31 +107,45 @@ def load(
:param method: How to load `nodes` from a `dbt` project (automatically, using custom parser, using dbt manifest
or dbt ls)
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.KUBERNETES)
Fundamentally, there are two different execution paths:
automatic and manual.
"""
load_method = {
LoadMode.CUSTOM: self.load_via_custom_parser,
LoadMode.DBT_LS: self.load_via_dbt_ls,
LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest,
}

def load_manual(method: LoadMode) -> None:
load_method = {
LoadMode.CUSTOM: self.load_via_custom_parser,
LoadMode.DBT_LS: self.load_via_dbt_ls,
LoadMode.DBT_MANIFEST: self.load_from_dbt_manifest,
}

if method == LoadMode.DBT_MANIFEST:
if not self.project.is_manifest_available():
raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}")
elif method in [LoadMode.DBT_LS, LoadMode.CUSTOM]:
if not self.project.dir:
raise CosmosLoadDbtException(
"Unable to load using dbt_ls or custom method - Project Dir was not provided or does not exist."
)

load_method[method]()

def load_automatic() -> None:
try:
load_manual(method=LoadMode.DBT_MANIFEST)
except CosmosLoadDbtException:
if execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available():
try:
load_manual(method=LoadMode.DBT_LS)
except FileNotFoundError:
load_manual(method=LoadMode.CUSTOM)
else:
load_manual(method=LoadMode.CUSTOM)

if method == LoadMode.AUTOMATIC:
if self.project.is_manifest_available():
self.load_from_dbt_manifest()
return
elif execution_mode == ExecutionMode.LOCAL and self.project.is_profile_yml_available():
try:
self.load_via_dbt_ls()
return
except FileNotFoundError:
self.load_via_custom_parser()
return
else:
self.load_via_custom_parser()
return

if method == LoadMode.DBT_MANIFEST and not self.project.is_manifest_available():
raise CosmosLoadDbtException(f"Unable to load manifest using {self.project.manifest_path}")

load_method[method]()
load_automatic()
else:
load_manual(method=method)

def load_via_dbt_ls(self) -> None:
"""
Expand All @@ -147,8 +161,8 @@ def load_via_dbt_ls(self) -> None:
"""
logger.info("Trying to parse the dbt project `%s` in `%s` using dbt ls...", self.project.name, self.project.dir)

if not self.profile_config:
raise CosmosLoadDbtException("Unable to load dbt project without a profile config")
if not self.project.dir or not self.profile_config:
raise CosmosLoadDbtException("Unable to load dbt project without project files and a profile config")

if not shutil.which(self.dbt_cmd):
raise CosmosLoadDbtException(f"Unable to find the dbt executable: {self.dbt_cmd}")
Expand Down Expand Up @@ -289,6 +303,9 @@ def load_via_custom_parser(self) -> None:
"""
logger.info("Trying to parse the dbt project `%s` using a custom Cosmos method...", self.project.name)

if not self.project.dir:
raise CosmosLoadDbtException("Unable to load dbt project without project files")

project = LegacyDbtProject(
dbt_root_path=str(self.project.root_dir),
dbt_models_dir=self.project.models_dir.stem if self.project.models_dir else None,
Expand Down
28 changes: 17 additions & 11 deletions cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
@dataclass
class DbtProject:
name: str
root_dir: Path
root_dir: Path | None = None
models_dir: Path | None = None
seeds_dir: Path | None = None
snapshots_dir: Path | None = None
Expand All @@ -18,21 +18,27 @@ class DbtProject:
_cosmos_created_profile_file: bool = False

def __post_init__(self) -> None:
if self.models_dir is None:
self.models_dir = self.dir / "models"
if self.seeds_dir is None:
self.seeds_dir = self.dir / "seeds"
if self.snapshots_dir is None:
self.snapshots_dir = self.dir / "snapshots"
if self.profile_path is None:
self.profile_path = self.dir / "profiles.yml"
"""
Since ProjectConfig does not require the dbt_project_path to be defined,
DbtProject should also no longer require root_dir or any dependent paths.
The project should be renderable with only a manifest.json.
"""
if self.dir:
if self.models_dir is None:
self.models_dir = self.dir / "models"
if self.seeds_dir is None:
self.seeds_dir = self.dir / "seeds"
if self.snapshots_dir is None:
self.snapshots_dir = self.dir / "snapshots"
if self.profile_path is None:
self.profile_path = self.dir / "profiles.yml"

@property
def dir(self) -> Path:
def dir(self) -> Path | None:
"""
Path to dbt pipeline, defined by self.root_dir and self.name.
"""
return self.root_dir / self.name
return self.root_dir / self.name if self.root_dir else None

def is_manifest_available(self) -> bool:
"""
Expand Down
9 changes: 7 additions & 2 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SelectorConfig:
Supports to load it from a string.
"""

def __init__(self, project_dir: Path, statement: str):
def __init__(self, project_dir: Path | None, statement: str):
"""
Create a selector config file.
Expand Down Expand Up @@ -63,6 +63,8 @@ def load_from_statement(self, statement: str) -> None:
items = statement.split(",")
for item in items:
if item.startswith(PATH_SELECTOR):
if not self.project_dir:
raise CosmosValueError("Can not select by path when no project directory is provided")
index = len(PATH_SELECTOR)
self.paths.append(self.project_dir / item[index:])
elif item.startswith(TAG_SELECTOR):
Expand Down Expand Up @@ -165,7 +167,10 @@ def retrieve_by_label(statement_list: list[str], label: str) -> set[str]:


def select_nodes(
project_dir: Path, nodes: dict[str, DbtNode], select: list[str] | None = None, exclude: list[str] | None = None
project_dir: Path | None,
nodes: dict[str, DbtNode],
select: list[str] | None = None,
exclude: list[str] | None = None,
) -> dict[str, DbtNode]:
"""
Given a group of nodes within a project, apply select and exclude filters using
Expand Down
2 changes: 1 addition & 1 deletion tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def test_load_via_dbt_ls_without_profile():
with pytest.raises(CosmosLoadDbtException) as err_info:
dbt_graph.load_via_dbt_ls()

expected = "Unable to load dbt project without a profile config"
expected = "Unable to load dbt project without project files and a profile config"
assert err_info.value.args[0] == expected


Expand Down
66 changes: 66 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

SAMPLE_PROFILE_YML = Path(__file__).parent / "sample/profiles.yml"
SAMPLE_DBT_PROJECT = Path(__file__).parent / "sample/"
SAMPLE_DBT_MANIFEST = Path(__file__).parent / "sample/manifest.json"


@pytest.mark.parametrize("argument_key", ["tags", "paths"])
Expand Down Expand Up @@ -67,3 +68,68 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op
operator_args=operator_args,
)
assert converter


@pytest.mark.parametrize(
    "execution_mode,operator_args",
    [
        (ExecutionMode.KUBERNETES, {}),
        # (ExecutionMode.DOCKER, {"image": "sample-image"}),
    ],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execution_mode, operator_args):
    """
    Build a converter from a ProjectConfig whose ``dbt_project_path`` is given as a string.

    ``DbtGraph.load`` is patched out, so the test only checks that constructing
    ``DbtToAirflowConverter`` with these arguments raises nothing and yields a truthy object.
    """
    dag_converter = DbtToAirflowConverter(
        # The project path is deliberately passed as ``str`` (via ``as_posix()``), not ``Path``.
        project_config=ProjectConfig(dbt_project_path=SAMPLE_DBT_PROJECT.as_posix()),
        execution_config=ExecutionConfig(execution_mode=execution_mode),
        render_config=RenderConfig(emit_datasets=True),
        profile_config=ProfileConfig(
            profile_name="my_profile_name",
            target_name="my_target_name",
            profiles_yml_filepath=SAMPLE_PROFILE_YML,
        ),
        nodes=nodes,
        operator_args=operator_args,
    )
    assert dag_converter


@pytest.mark.parametrize(
    "execution_mode,operator_args",
    [
        (ExecutionMode.KUBERNETES, {}),
        # (ExecutionMode.DOCKER, {"image": "sample-image"}),
    ],
)
@patch("cosmos.converter.DbtGraph.filtered_nodes", nodes)
@patch("cosmos.converter.DbtGraph.load")
def test_converter_creates_dag_with_no_project_dir(mock_load_dbt_graph, execution_mode, operator_args):
    """
    Build a converter from a ProjectConfig that has no ``dbt_project_path``.

    Only ``manifest_path`` and ``project_name`` are supplied. ``DbtGraph.load`` is patched out,
    so the test only checks that constructing ``DbtToAirflowConverter`` raises nothing and
    yields a truthy object.
    """
    dag_converter = DbtToAirflowConverter(
        # No project directory: the project is described by its manifest and name alone.
        project_config=ProjectConfig(manifest_path=SAMPLE_DBT_MANIFEST.as_posix(), project_name="sample"),
        execution_config=ExecutionConfig(execution_mode=execution_mode),
        render_config=RenderConfig(emit_datasets=True),
        profile_config=ProfileConfig(
            profile_name="my_profile_name",
            target_name="my_target_name",
            profiles_yml_filepath=SAMPLE_PROFILE_YML,
        ),
        nodes=nodes,
        operator_args=operator_args,
    )
    assert dag_converter

0 comments on commit 1ae8ea2

Please sign in to comment.