Skip to content

Commit

Permalink
Support rendering build operator task-id with non-ASCII characters (#1415)
Browse files Browse the repository at this point in the history

In PR [#1278](#1278), we introduced support for rendering non-ASCII
characters in task IDs. However, due to limited access, we were unable
to implement the same functionality for the build operator. This PR
extends that functionality by adding support for rendering build task
IDs with non-ASCII characters.
  • Loading branch information
pankajastro authored and tatiana committed Dec 20, 2024
1 parent 1cbfdbe commit c2a8217
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
12 changes: 9 additions & 3 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,22 @@ def _get_task_id_and_args(
use_task_group: bool,
normalize_task_id: Callable[..., Any] | None,
resource_suffix: str,
include_resource_type: bool = False,
) -> tuple[str, dict[str, Any]]:
"""
Generate task ID and update args with display name if needed.
"""
args_update = args
task_display_name = f"{node.name}_{resource_suffix}"
if include_resource_type:
task_display_name = f"{node.name}_{node.resource_type.value}_{resource_suffix}"
if use_task_group:
task_id = resource_suffix
elif normalize_task_id:
task_id = normalize_task_id(node)
args_update["task_display_name"] = f"{node.name}_{resource_suffix}"
args_update["task_display_name"] = task_display_name
else:
task_id = f"{node.name}_{resource_suffix}"
task_id = task_display_name
return task_id, args_update


Expand Down Expand Up @@ -214,7 +218,9 @@ def create_task_metadata(
}

if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
task_id = f"{node.name}_{node.resource_type.value}_build"
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, "build", include_resource_type=True
)
elif node.resource_type == DbtResourceType.MODEL:
task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "run")
elif node.resource_type == DbtResourceType.SOURCE:
Expand Down
41 changes: 39 additions & 2 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,14 +621,15 @@ def _normalize_task_id(node: DbtNode) -> str:
reason="Airflow task did not have display_name until the 2.9 release",
)
@pytest.mark.parametrize(
"node_type,node_id,normalize_task_id,use_task_group,expected_node_id,expected_display_name",
"node_type,node_id,normalize_task_id,use_task_group,test_behavior,expected_node_id,expected_display_name",
[
# normalize_task_id is None (default)
(
DbtResourceType.MODEL,
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
None,
False,
None,
"test_node_run",
None,
),
Expand All @@ -637,6 +638,7 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.SOURCE.value}.my_folder.test_node",
None,
False,
None,
"test_node_source",
None,
),
Expand All @@ -645,15 +647,26 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.SEED.value}.my_folder.test_node",
None,
False,
None,
"test_node_seed",
None,
),
(
DbtResourceType.SEED,
f"{DbtResourceType.SEED.value}.my_folder.test_node",
None,
False,
TestBehavior.BUILD,
"test_node_seed_build",
None,
),
# normalize_task_id is passed and use_task_group is False
(
DbtResourceType.MODEL,
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
False,
None,
"new_task_id_test_node_model",
"test_node_run",
),
Expand All @@ -662,6 +675,7 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
False,
None,
"new_task_id_test_node_source",
"test_node_source",
),
Expand All @@ -670,15 +684,26 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
False,
None,
"new_task_id_test_node_seed",
"test_node_seed",
),
(
DbtResourceType.SEED,
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
False,
TestBehavior.BUILD,
"new_task_id_test_node_seed",
"test_node_seed_build",
),
# normalize_task_id is passed and use_task_group is True
(
DbtResourceType.MODEL,
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
True,
None,
"run",
None,
),
Expand All @@ -687,6 +712,7 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
True,
None,
"source",
None,
),
Expand All @@ -695,13 +721,23 @@ def _normalize_task_id(node: DbtNode) -> str:
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
True,
None,
"seed",
None,
),
(
DbtResourceType.SEED,
f"{DbtResourceType.MODEL.value}.my_folder.test_node",
_normalize_task_id,
True,
TestBehavior.BUILD,
"build",
None,
),
],
)
def test_create_task_metadata_normalize_task_id(
node_type, node_id, normalize_task_id, use_task_group, expected_node_id, expected_display_name
node_type, node_id, normalize_task_id, use_task_group, test_behavior, expected_node_id, expected_display_name
):
node = DbtNode(
unique_id=node_id,
Expand All @@ -720,6 +756,7 @@ def test_create_task_metadata_normalize_task_id(
use_task_group=use_task_group,
normalize_task_id=normalize_task_id,
source_rendering_behavior=SourceRenderingBehavior.ALL,
test_behavior=test_behavior,
)
assert metadata.id == expected_node_id
if expected_display_name:
Expand Down

0 comments on commit c2a8217

Please sign in to comment.