Skip to content

Commit

Permalink
Merge branch 'main' into patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
FouziaTariq authored Nov 17, 2023
2 parents 30a084c + e23a445 commit df6d609
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 76 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ Features
* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616


1.2.4 (2023-11-14)
------------------

Bug fixes

* Store ``compiled_sql`` even when task fails by @agreenburg in #671
* Refactor ``LoadMethod.LOCAL`` to use symlinks instead of copying directory by @jbandoro in #660
* Fix 'Unable to find the dbt executable: dbt' error by @tatiana in #666
* Fix installing deps when using ``profile_mapping`` & ``ExecutionMode.LOCAL`` by @joppevos in #659

Others

* Docs fix: add execution config to MWAA code example by @ugmuka in #674


1.2.3 (2023-11-09)
------------------

Features

* Add ``ProfileMapping`` for Vertica by @perttus in #540
* Add ``ProfileMapping`` for Snowflake encrypted private key path by @ivanstillfront in #608
* Add ``DbtDocsGCSOperator`` for uploading dbt docs to GCS by @jbandoro in #616


1.2.2 (2023-11-06)
------------------

Expand Down
2 changes: 1 addition & 1 deletion cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.3.0a1"
__version__ = "1.2.4"

from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
Expand Down
12 changes: 6 additions & 6 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def create_test_task_metadata(
task_args["indirect_selection"] = test_indirect_selection.value
if node is not None:
if node.resource_type == DbtResourceType.MODEL:
task_args["models"] = node.name
task_args["models"] = node.resource_name
elif node.resource_type == DbtResourceType.SOURCE:
task_args["select"] = f"source:{node.unique_id[len('source.'):]}"
task_args["select"] = f"source:{node.resource_name}"
else: # tested with node.resource_type == DbtResourceType.SEED or DbtResourceType.SNAPSHOT
task_args["select"] = node.name
task_args["select"] = node.resource_name
return TaskMetadata(
id=test_task_name,
operator_class=calculate_operator_class(
Expand All @@ -108,8 +108,8 @@ def create_task_metadata(
:param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES).
Default is ExecutionMode.LOCAL.
:param args: Arguments to be used to instantiate an Airflow Task
:param use_name_as_task_id_prefix: If resource_type is DbtResourceType.MODEL, it determines whether
using name as task id prefix or not. If it is True task_id = <node.name>_run, else task_id=run.
:param use_task_group: It determines whether to use the name as a prefix for the task id or not.
If it is False, then use the name as a prefix for the task id, otherwise do not.
:returns: The metadata necessary to instantiate the source dbt node as an Airflow task.
"""
dbt_resource_to_class = {
Expand All @@ -118,7 +118,7 @@ def create_task_metadata(
DbtResourceType.SEED: "DbtSeed",
DbtResourceType.TEST: "DbtTest",
}
args = {**args, **{"models": node.name}}
args = {**args, **{"models": node.resource_name}}

if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class:
if node.resource_type == DbtResourceType.MODEL:
Expand Down
29 changes: 18 additions & 11 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class DbtNode:
Metadata related to a dbt node (e.g. model, seed, snapshot).
"""

name: str
unique_id: str
resource_type: DbtResourceType
depends_on: list[str]
Expand All @@ -51,6 +50,23 @@ class DbtNode:
config: dict[str, Any] = field(default_factory=lambda: {})
has_test: bool = False

@property
def resource_name(self) -> str:
    """
    Return the dbt resource name to pass to dbt commands, e.g. ["dbt", "run", "--models", resource_name].

    The value is everything that follows the second period of ``unique_id``.
    A unique_id is laid out as <resource_type>.<package>.<resource_name>
    (https://docs.getdbt.com/reference/artifacts/manifest-json#resource-details).
    Versioned models use model.<package>.<resource_name>.<version>
    (https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/contracts/graph/node_args.py#L26C3-L31),
    so for them the returned value still contains a period between the name and the version.
    """
    # Split at most twice so that any further periods (e.g. the version suffix) stay in the last piece.
    segments = self.unique_id.split(".", maxsplit=2)
    return segments[2]

@property
def name(self) -> str:
    """
    Return the identifier to use as the task name or task group name.

    It is ``resource_name`` with every period (.) swapped for an underscore (_);
    periods show up in the resource name of versioned models.
    """
    pieces = self.resource_name.split(".")
    return "_".join(pieces)


def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str:
"""Run a command in a subprocess, returning the stdout."""
Expand Down Expand Up @@ -89,7 +105,6 @@ def parse_dbt_ls_output(project_path: Path, ls_stdout: str) -> dict[str, DbtNode
logger.debug("Skipped dbt ls line: %s", line)
else:
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=node_dict["unique_id"],
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down Expand Up @@ -195,9 +210,6 @@ def load_via_dbt_ls(self) -> None:
This is the most accurate way of loading `dbt` projects and filtering them out, since it uses the `dbt` command
line for both parsing and filtering the nodes.
Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0
the dbt CLI doesn't support selecting specific versioned models such as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand Down Expand Up @@ -291,8 +303,7 @@ def load_via_custom_parser(self) -> None:
for model_name, model in models:
config = {item.split(":")[0]: item.split(":")[-1] for item in model.config.config_selectors}
node = DbtNode(
name=model_name,
unique_id=model_name,
unique_id=f"{model.type.value}.{self.project.project_name}.{model_name}",
resource_type=DbtResourceType(model.type.value),
depends_on=list(model.config.upstream_models),
file_path=Path(
Expand Down Expand Up @@ -325,9 +336,6 @@ def load_from_dbt_manifest(self) -> None:
However, since the Manifest does not represent filters, it relies on the Custom Cosmos implementation
to filter out the nodes relevant to the user (based on self.exclude and self.select).
Note that if the dbt project contains versioned models, dbt>=1.6.0 is required, because with dbt<1.6.0
the dbt CLI doesn't support selecting specific versioned models such as stg_customers_v1, customers_v1, ...
Updates in-place:
* self.nodes
* self.filtered_nodes
Expand All @@ -347,7 +355,6 @@ def load_from_dbt_manifest(self) -> None:
resources = {**manifest.get("nodes", {}), **manifest.get("sources", {}), **manifest.get("exposures", {})}
for unique_id, node_dict in resources.items():
node = DbtNode(
name=node_dict.get("alias", node_dict["name"]),
unique_id=unique_id,
resource_type=DbtResourceType(node_dict["resource_type"]),
depends_on=node_dict.get("depends_on", {}).get("nodes", []),
Expand Down
4 changes: 4 additions & 0 deletions dev/dags/dbt/model_version/models/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ models:
- include: all
exclude:
- full_name
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v1" }}'
- v: 2
config:
alias: '{{ "customers_" ~ var("division", "USA") ~ "_v2" }}'

- name: orders
description: This table has basic information about orders, as well as some derived facts based on payments
Expand Down
Loading

0 comments on commit df6d609

Please sign in to comment.