From 8dc46be1c30e151e3f16984b2dea52bb52f1c810 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Thu, 24 Oct 2024 01:18:23 +0900 Subject: [PATCH 01/14] To support task display name. --- cosmos/airflow/graph.py | 23 ++++++++++++++++++++--- cosmos/config.py | 3 +++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f507b03ac..979e1341b 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -135,6 +135,7 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, + set_task_id_by_node: Callable[..., Any] | None = None ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -163,9 +164,13 @@ def create_task_metadata( "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, } if node.resource_type == DbtResourceType.MODEL: - task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" + elif set_task_id_by_node: + task_id = set_task_id_by_node(node) + args["task_display_name"] = f"{node.name}_run" + else: + task_id = f"{node.name}_run" elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS @@ -175,18 +180,26 @@ def create_task_metadata( return None # TODO: https://github.com/astronomer/astronomer-cosmos # pragma: no cover - task_id = f"{node.name}_source" args["select"] = f"source:{node.resource_name}" args.pop("models") if use_task_group is True: task_id = node.resource_type.value + elif set_task_id_by_node: + task_id = set_task_id_by_node(node) + args["task_display_name"] = f"{node.name}_source" + else: + task_id = f"{node.name}_source" if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: # render sources 
without freshness as empty operators return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator") else: - task_id = f"{node.name}_{node.resource_type.value}" if use_task_group is True: task_id = node.resource_type.value + elif set_task_id_by_node: + task_id = set_task_id_by_node(node) + args["task_display_name"] = f"{node.name}_{node.resource_type.value}" + else: + task_id = f"{node.name}_{node.resource_type.value}" task_metadata = TaskMetadata( id=task_id, @@ -217,6 +230,7 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, + set_task_id_by_node: Callable[..., Any] | None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -234,6 +248,7 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, + set_task_id_by_node=set_task_id_by_node ) # In most cases, we'll map one DBT node to one Airflow task @@ -335,6 +350,7 @@ def build_airflow_graph( node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior source_rendering_behavior = render_config.source_rendering_behavior + set_task_id_by_node = render_config.set_task_id_by_node tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -356,6 +372,7 @@ def build_airflow_graph( source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, + set_task_id_by_node=set_task_id_by_node, node=node, ) if task_or_group is not None: diff --git a/cosmos/config.py b/cosmos/config.py index ccda2c432..50064731e 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -62,6 +62,8 @@ class RenderConfig: :param dbt_ls_path: Configures the location of an output of ``dbt ls``. 
Required when using ``load_method=LoadMode.DBT_LS_FILE``. :param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4). :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). + :param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache. + :param set_task_id_by_node: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. """ emit_datasets: bool = True @@ -80,6 +82,7 @@ class RenderConfig: enable_mock_profile: bool = True source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) + set_task_id_by_node: Callable[..., Any] | None = None def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: From 889861dcedeb4fc5dffe8edf459a38de567dbe7b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 23 Oct 2024 16:44:08 +0000 Subject: [PATCH 02/14] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 979e1341b..61e035fd8 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -135,7 +135,7 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = 
SourceRenderingBehavior.NONE, - set_task_id_by_node: Callable[..., Any] | None = None + set_task_id_by_node: Callable[..., Any] | None = None, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -248,7 +248,7 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, - set_task_id_by_node=set_task_id_by_node + set_task_id_by_node=set_task_id_by_node, ) # In most cases, we'll map one DBT node to one Airflow task From 28dc04d7f707de00c71dbb978a866d8c3299dd78 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Sat, 26 Oct 2024 18:47:45 +0900 Subject: [PATCH 03/14] add tests --- cosmos/airflow/graph.py | 124 +++++++++++++++++++----------------- tests/airflow/test_graph.py | 116 +++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 57 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 61e035fd8..a5477f739 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -127,6 +127,24 @@ def create_test_task_metadata( extra_context=extra_context, ) +def _get_task_id_and_args( + node: DbtNode, + use_task_group: bool, + set_task_id_by_node: Callable[..., Any] | None, + resource_suffix: str, +) -> tuple[str, dict]: + """ + Generate task ID and update args with display name if needed. 
+ """ + args_update = {} + if use_task_group: + task_id = resource_suffix + elif set_task_id_by_node: + task_id = set_task_id_by_node(node) + args_update["task_display_name"] = f"{node.name}_{resource_suffix}" + else: + task_id = f"{node.name}_{resource_suffix}" + return task_id, args_update def create_task_metadata( node: DbtNode, @@ -156,62 +174,7 @@ def create_task_metadata( DbtResourceType.TEST: "DbtTest", DbtResourceType.SOURCE: "DbtSource", } - args = {**args, **{"models": node.resource_name}} - - if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: - extra_context = { - "dbt_node_config": node.context_dict, - "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, - } - if node.resource_type == DbtResourceType.MODEL: - if use_task_group is True: - task_id = "run" - elif set_task_id_by_node: - task_id = set_task_id_by_node(node) - args["task_display_name"] = f"{node.name}_run" - else: - task_id = f"{node.name}_run" - elif node.resource_type == DbtResourceType.SOURCE: - if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( - source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS - and node.has_freshness is False - and node.has_test is False - ): - return None - # TODO: https://github.com/astronomer/astronomer-cosmos - # pragma: no cover - args["select"] = f"source:{node.resource_name}" - args.pop("models") - if use_task_group is True: - task_id = node.resource_type.value - elif set_task_id_by_node: - task_id = set_task_id_by_node(node) - args["task_display_name"] = f"{node.name}_source" - else: - task_id = f"{node.name}_source" - if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: - # render sources without freshness as empty operators - return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator") - else: - if use_task_group is True: - task_id = node.resource_type.value - elif 
set_task_id_by_node: - task_id = set_task_id_by_node(node) - args["task_display_name"] = f"{node.name}_{node.resource_type.value}" - else: - task_id = f"{node.name}_{node.resource_type.value}" - - task_metadata = TaskMetadata( - id=task_id, - owner=node.owner, - operator_class=calculate_operator_class( - execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] - ), - arguments=args, - extra_context=extra_context, - ) - return task_metadata - else: + if DbtResourceType(node.resource_type) not in DEFAULT_DBT_RESOURCES or node.resource_type not in dbt_resource_to_class: msg = ( f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). " "Define a converter function using render_config.node_converters." @@ -219,6 +182,53 @@ def create_task_metadata( logger.warning(msg) return None + args = {**args, **{"models": node.resource_name}} + extra_context = { + "dbt_node_config": node.context_dict, + "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + } + + if node.resource_type == DbtResourceType.MODEL: + task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,"run") + args.update(args_update) + elif node.resource_type == DbtResourceType.SOURCE: + if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( + source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS + and node.has_freshness is False + and node.has_test is False + ): + return None + # TODO: https://github.com/astronomer/astronomer-cosmos + # pragma: no cover + args["select"] = f"source:{node.resource_name}" + args.pop("models") + task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,"source") + args.update(args_update) + if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: + # render sources without freshness as empty operators + # empty operator does not accept custom parameters (e.g., profile_args). 
recreate the args. + if "task_display_name" in args: + args = { + "task_display_name": args["task_display_name"] + } + else: + args = {} + return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) + else: + task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,node.resource_type.value) + args.update(args_update) + + task_metadata = TaskMetadata( + id=task_id, + owner=node.owner, + operator_class=calculate_operator_class( + execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] + ), + arguments=args, + extra_context=extra_context, + ) + return task_metadata + def generate_task_or_group( dag: DAG, @@ -230,7 +240,7 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, - set_task_id_by_node: Callable[..., Any] | None, + set_task_id_by_node: Callable[..., Any] | None = None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 1bd8cab35..c7cf4cff6 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -568,6 +568,122 @@ def test_create_task_metadata_snapshot(caplog): assert metadata.arguments == {"models": "my_snapshot"} +def _set_task_id_by_node(node: DbtNode) -> str: + """for test_create_task_metadata_set_task_id_by_node""" + return f"new_task_id_{node.name}_{node.resource_type.value}" + +@pytest.mark.skipif( + version.parse(airflow_version) < version.parse("2.9"), + reason="Airflow task did not have display_name until the 2.9 release", +) +@pytest.mark.parametrize( + "node_type,node_id,set_task_id_by_node,use_task_group,expected_node_id,expected_display_name", + [ + # set_task_id_by_node is None (default) + ( + DbtResourceType.MODEL, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + 
None, + False, + "test_node_run", + None, + ), + ( + DbtResourceType.SOURCE, + f"{DbtResourceType.SOURCE.value}.my_folder.test_node", + None, + False, + "test_node_source", + None, + ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.SEED.value}.my_folder.test_node", + None, + False, + "test_node_seed", + None, + ), + # set_task_id_by_node is passed and use_task_group is False + ( + DbtResourceType.MODEL, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + False, + "new_task_id_test_node_model", + "test_node_run", + ), + ( + DbtResourceType.SOURCE, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + False, + "new_task_id_test_node_source", + "test_node_source", + ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + False, + "new_task_id_test_node_seed", + "test_node_seed", + ), + # set_task_id_by_node is passed and use_task_group is True + ( + DbtResourceType.MODEL, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + True, + "run", + None, + ), + ( + DbtResourceType.SOURCE, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + True, + "source", + None, + ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _set_task_id_by_node, + True, + "seed", + None, + ), + ], +) +def test_create_task_metadata_set_task_id_by_node( + node_type, node_id, set_task_id_by_node, use_task_group, expected_node_id, expected_display_name +): + node = DbtNode( + unique_id=node_id, + resource_type=node_type, + depends_on=[], + file_path="", + tags=[], + config={}, + ) + args = {} + metadata = create_task_metadata( + node, + execution_mode=ExecutionMode.LOCAL, + args=args, + dbt_dag_task_group_identifier="", + use_task_group=use_task_group, + set_task_id_by_node=set_task_id_by_node, + source_rendering_behavior=SourceRenderingBehavior.ALL, + ) + assert metadata.id == 
expected_node_id + if expected_display_name: + assert metadata.arguments["task_display_name"] == expected_display_name + else: + assert "task_display_name" not in metadata.arguments + + @pytest.mark.parametrize( "node_type,node_unique_id,test_indirect_selection,additional_arguments", [ From c3a1ee8b14bdb551a60d1459410f515a5c2ad0b3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Oct 2024 09:48:06 +0000 Subject: [PATCH 04/14] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 19 ++++++++++++------- tests/airflow/test_graph.py | 1 + 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index a5477f739..4ac532558 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -127,6 +127,7 @@ def create_test_task_metadata( extra_context=extra_context, ) + def _get_task_id_and_args( node: DbtNode, use_task_group: bool, @@ -146,6 +147,7 @@ def _get_task_id_and_args( task_id = f"{node.name}_{resource_suffix}" return task_id, args_update + def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, @@ -174,7 +176,10 @@ def create_task_metadata( DbtResourceType.TEST: "DbtTest", DbtResourceType.SOURCE: "DbtSource", } - if DbtResourceType(node.resource_type) not in DEFAULT_DBT_RESOURCES or node.resource_type not in dbt_resource_to_class: + if ( + DbtResourceType(node.resource_type) not in DEFAULT_DBT_RESOURCES + or node.resource_type not in dbt_resource_to_class + ): msg = ( f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). " "Define a converter function using render_config.node_converters." 
@@ -189,7 +194,7 @@ def create_task_metadata( } if node.resource_type == DbtResourceType.MODEL: - task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,"run") + task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "run") args.update(args_update) elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( @@ -202,20 +207,20 @@ def create_task_metadata( # pragma: no cover args["select"] = f"source:{node.resource_name}" args.pop("models") - task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,"source") + task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "source") args.update(args_update) if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: # render sources without freshness as empty operators # empty operator does not accept custom parameters (e.g., profile_args). recreate the args. 
if "task_display_name" in args: - args = { - "task_display_name": args["task_display_name"] - } + args = {"task_display_name": args["task_display_name"]} else: args = {} return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) else: - task_id, args_update = _get_task_id_and_args(node,use_task_group,set_task_id_by_node,node.resource_type.value) + task_id, args_update = _get_task_id_and_args( + node, use_task_group, set_task_id_by_node, node.resource_type.value + ) args.update(args_update) task_metadata = TaskMetadata( diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index c7cf4cff6..33d87a5cb 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -572,6 +572,7 @@ def _set_task_id_by_node(node: DbtNode) -> str: """for test_create_task_metadata_set_task_id_by_node""" return f"new_task_id_{node.name}_{node.resource_type.value}" + @pytest.mark.skipif( version.parse(airflow_version) < version.parse("2.9"), reason="Airflow task did not have display_name until the 2.9 release", From 9170c3aedacf91927b469e228fe41d51bae4550b Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Sat, 26 Oct 2024 19:35:06 +0900 Subject: [PATCH 05/14] add docs. --- docs/configuration/render-config.rst | 2 ++ docs/configuration/task_display_name.rst | 28 ++++++++++++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 docs/configuration/task_display_name.rst diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index 068998de5..fd010453c 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -18,6 +18,8 @@ The ``RenderConfig`` class takes the following arguments: - ``env_vars``: (available in v1.2.5, use``ProjectConfig.env_vars`` for v1.3.0 onwards) A dictionary of environment variables for rendering. Only supported when using ``load_method=LoadMode.DBT_LS``. 
- ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` - ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information. +- ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information. +- ``set_task_id_by_node``: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. The display_name parameter is available above Airflow 2.9. See `Task display name<./task_display_name.html>`_ for more information. Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/docs/configuration/task_display_name.rst b/docs/configuration/task_display_name.rst new file mode 100644 index 000000000..9e9fe8538 --- /dev/null +++ b/docs/configuration/task_display_name.rst @@ -0,0 +1,28 @@ +.. _task_display_name: + +Task display name +================ + +.. note:: + This feature is only available for Airflow >= 2.9. + +In Airflow, `task_id` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping `task_id` in ASCII, they can use the `display_name` parameter. + +To work with projects that use non-ASCII characters in model names, the `set_task_id_by_node` field of `RenderConfig` can be utilized. + +Example: +You can provide a function to convert the model name to an ASCII-compatible format. 
The function’s output is used as the TaskID, while the display name on Airflow remains as the original model name. + +```python +from slugify import slugify + +def set_task_id_by_node(node): + return slugify(node.name) + +from cosmos import DbtTaskGroup, RenderConfig + +jaffle_shop = DbtTaskGroup( + render_config=RenderConfig( + set_task_id_by_node=set_task_id_by_node + ) +) From 49363b05279fb3a79b7f70d3f01d7453cbb523c7 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Sat, 26 Oct 2024 20:31:02 +0900 Subject: [PATCH 06/14] fix mypy error --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4ac532558..4e1e6367c 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -133,7 +133,7 @@ def _get_task_id_and_args( use_task_group: bool, set_task_id_by_node: Callable[..., Any] | None, resource_suffix: str, -) -> tuple[str, dict]: +) -> tuple[str, dict[str, Any]]: """ Generate task ID and update args with display name if needed. """ From 1ed6e105dc98fb5401c9ce037d92a56d6bee742e Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Sat, 26 Oct 2024 23:46:37 +0900 Subject: [PATCH 07/14] fix document --- docs/configuration/index.rst | 1 + docs/configuration/render-config.rst | 2 +- docs/configuration/task-display-name.rst | 30 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 docs/configuration/task-display-name.rst diff --git a/docs/configuration/index.rst b/docs/configuration/index.rst index f6e60f61b..0da03a86f 100644 --- a/docs/configuration/index.rst +++ b/docs/configuration/index.rst @@ -27,3 +27,4 @@ Cosmos offers a number of configuration options to customize its behavior. 
For m Compiled SQL Logging Caching + Task display name diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index fd010453c..a8f0b76b5 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -19,7 +19,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` - ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information. - ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information. -- ``set_task_id_by_node``: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. The display_name parameter is available above Airflow 2.9. See `Task display name<./task_display_name.html>`_ for more information. +- ``set_task_id_by_node``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information. 
Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/docs/configuration/task-display-name.rst b/docs/configuration/task-display-name.rst new file mode 100644 index 000000000..d6e57dd32 --- /dev/null +++ b/docs/configuration/task-display-name.rst @@ -0,0 +1,30 @@ +.. _task-display-name: + +Task display name +================= + +.. note:: + This feature is only available for Airflow >= 2.9. + +In Airflow, `task_id` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping `task_id` in ASCII, they can use the `display_name` parameter. + +To work with projects that use non-ASCII characters in model names, the `set_task_id_by_node` field of `RenderConfig` can be utilized. + +Example: + +You can provide a function to convert the model name to an ASCII-compatible format. The function’s output is used as the TaskID, while the display name on Airflow remains as the original model name. + +.. 
code-block:: python + + from slugify import slugify + + def set_task_id_by_node(node): + return slugify(node.name) + + from cosmos import DbtTaskGroup, RenderConfig + + jaffle_shop = DbtTaskGroup( + render_config=RenderConfig( + set_task_id_by_node=set_task_id_by_node + ) + ) From 946e18359deead6b598b3eec427aea709e0127cb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 26 Oct 2024 14:46:53 +0000 Subject: [PATCH 08/14] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/configuration/task-display-name.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration/task-display-name.rst b/docs/configuration/task-display-name.rst index d6e57dd32..b90eb24b9 100644 --- a/docs/configuration/task-display-name.rst +++ b/docs/configuration/task-display-name.rst @@ -18,13 +18,13 @@ You can provide a function to convert the model name to an ASCII-compatible form from slugify import slugify + def set_task_id_by_node(node): return slugify(node.name) + from cosmos import DbtTaskGroup, RenderConfig jaffle_shop = DbtTaskGroup( - render_config=RenderConfig( - set_task_id_by_node=set_task_id_by_node - ) + render_config=RenderConfig(set_task_id_by_node=set_task_id_by_node) ) From 32586dcc19374119097114c4e93ba599e76fa205 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Sat, 26 Oct 2024 23:49:41 +0900 Subject: [PATCH 09/14] fix document --- docs/configuration/task_display_name.rst | 28 ------------------------ 1 file changed, 28 deletions(-) delete mode 100644 docs/configuration/task_display_name.rst diff --git a/docs/configuration/task_display_name.rst b/docs/configuration/task_display_name.rst deleted file mode 100644 index 9e9fe8538..000000000 --- a/docs/configuration/task_display_name.rst +++ /dev/null @@ -1,28 +0,0 @@ -.. 
_task_display_name: - -Task display name -================ - -.. note:: - This feature is only available for Airflow >= 2.9. - -In Airflow, `task_id` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping `task_id` in ASCII, they can use the `display_name` parameter. - -To work with projects that use non-ASCII characters in model names, the `set_task_id_by_node` field of `RenderConfig` can be utilized. - -Example: -You can provide a function to convert the model name to an ASCII-compatible format. The function’s output is used as the TaskID, while the display name on Airflow remains as the original model name. - -```python -from slugify import slugify - -def set_task_id_by_node(node): - return slugify(node.name) - -from cosmos import DbtTaskGroup, RenderConfig - -jaffle_shop = DbtTaskGroup( - render_config=RenderConfig( - set_task_id_by_node=set_task_id_by_node - ) -) From 218bbc5fd42cc77408b4484d2195cc855b2c0354 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Thu, 31 Oct 2024 14:08:34 +0900 Subject: [PATCH 10/14] minimize changes --- cosmos/airflow/graph.py | 115 +++++++++++------------ cosmos/config.py | 4 +- docs/configuration/render-config.rst | 2 +- docs/configuration/task-display-name.rst | 6 +- tests/airflow/test_graph.py | 30 +++--- 5 files changed, 75 insertions(+), 82 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 4e1e6367c..1f8f80e3e 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -130,18 +130,19 @@ def create_test_task_metadata( def _get_task_id_and_args( node: DbtNode, + args: dict[str, Any], use_task_group: bool, - set_task_id_by_node: Callable[..., Any] | None, + normalize_task_id: Callable[..., Any] | None, resource_suffix: str, ) -> tuple[str, dict[str, Any]]: """ Generate task ID and update args with display name if needed. 
""" - args_update = {} + args_update = args if use_task_group: task_id = resource_suffix - elif set_task_id_by_node: - task_id = set_task_id_by_node(node) + elif normalize_task_id: + task_id = normalize_task_id(node) args_update["task_display_name"] = f"{node.name}_{resource_suffix}" else: task_id = f"{node.name}_{resource_suffix}" @@ -155,7 +156,7 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, - set_task_id_by_node: Callable[..., Any] | None = None, + normalize_task_id: Callable[..., Any] | None = None, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -176,10 +177,50 @@ def create_task_metadata( DbtResourceType.TEST: "DbtTest", DbtResourceType.SOURCE: "DbtSource", } - if ( - DbtResourceType(node.resource_type) not in DEFAULT_DBT_RESOURCES - or node.resource_type not in dbt_resource_to_class - ): + args = {**args, **{"models": node.resource_name}} + + if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: + extra_context = { + "dbt_node_config": node.context_dict, + "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + } + if node.resource_type == DbtResourceType.MODEL: + task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "run") + elif node.resource_type == DbtResourceType.SOURCE: + if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( + source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS + and node.has_freshness is False + and node.has_test is False + ): + return None + # TODO: https://github.com/astronomer/astronomer-cosmos + # pragma: no cover + task_id = f"{node.name}_source" + args["select"] = f"source:{node.resource_name}" + args.pop("models") + task_id, args = _get_task_id_and_args(node, args, use_task_group, 
normalize_task_id, "source") + if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: + # render sources without freshness as empty operators + # empty operator does not accept custom parameters (e.g., profile_args). recreate the args. + if "task_display_name" in args: + args = {"task_display_name": args["task_display_name"]} + else: + args = {} + return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) + else: + task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, node.resource_type.value) + + task_metadata = TaskMetadata( + id=task_id, + owner=node.owner, + operator_class=calculate_operator_class( + execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] + ), + arguments=args, + extra_context=extra_context, + ) + return task_metadata + else: msg = ( f"Unavailable conversion function for <{node.resource_type}> (node <{node.unique_id}>). " "Define a converter function using render_config.node_converters." 
@@ -187,54 +228,6 @@ def create_task_metadata( logger.warning(msg) return None - args = {**args, **{"models": node.resource_name}} - extra_context = { - "dbt_node_config": node.context_dict, - "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, - } - - if node.resource_type == DbtResourceType.MODEL: - task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "run") - args.update(args_update) - elif node.resource_type == DbtResourceType.SOURCE: - if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( - source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS - and node.has_freshness is False - and node.has_test is False - ): - return None - # TODO: https://github.com/astronomer/astronomer-cosmos - # pragma: no cover - args["select"] = f"source:{node.resource_name}" - args.pop("models") - task_id, args_update = _get_task_id_and_args(node, use_task_group, set_task_id_by_node, "source") - args.update(args_update) - if node.has_freshness is False and source_rendering_behavior == SourceRenderingBehavior.ALL: - # render sources without freshness as empty operators - # empty operator does not accept custom parameters (e.g., profile_args). recreate the args. 
- if "task_display_name" in args: - args = {"task_display_name": args["task_display_name"]} - else: - args = {} - return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) - else: - task_id, args_update = _get_task_id_and_args( - node, use_task_group, set_task_id_by_node, node.resource_type.value - ) - args.update(args_update) - - task_metadata = TaskMetadata( - id=task_id, - owner=node.owner, - operator_class=calculate_operator_class( - execution_mode=execution_mode, dbt_class=dbt_resource_to_class[node.resource_type] - ), - arguments=args, - extra_context=extra_context, - ) - return task_metadata - - def generate_task_or_group( dag: DAG, task_group: TaskGroup | None, @@ -245,7 +238,7 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, - set_task_id_by_node: Callable[..., Any] | None = None, + normalize_task_id: Callable[..., Any] | None = None, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -263,7 +256,7 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, - set_task_id_by_node=set_task_id_by_node, + normalize_task_id=normalize_task_id, ) # In most cases, we'll map one DBT node to one Airflow task @@ -365,7 +358,7 @@ def build_airflow_graph( node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior source_rendering_behavior = render_config.source_rendering_behavior - set_task_id_by_node = render_config.set_task_id_by_node + normalize_task_id = render_config.normalize_task_id tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -387,7 +380,7 @@ def build_airflow_graph( source_rendering_behavior=source_rendering_behavior, 
test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, - set_task_id_by_node=set_task_id_by_node, + normalize_task_id=normalize_task_id, node=node, ) if task_or_group is not None: diff --git a/cosmos/config.py b/cosmos/config.py index 50064731e..7ac9fda3a 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -63,7 +63,7 @@ class RenderConfig: :param enable_mock_profile: Allows to enable/disable mocking profile. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4). :param source_rendering_behavior: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). :param airflow_vars_to_purge_dbt_ls_cache: Specify Airflow variables that will affect the LoadMode.DBT_LS cache. - :param set_task_id_by_node: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. + :param normalize_task_id: A callable that takes a dbt node as input and returns the task ID. This allows users to assign a custom node ID separate from the display name. 
""" emit_datasets: bool = True @@ -82,7 +82,7 @@ class RenderConfig: enable_mock_profile: bool = True source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) - set_task_id_by_node: Callable[..., Any] | None = None + normalize_task_id: Callable[..., Any] | None = None def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: diff --git a/docs/configuration/render-config.rst b/docs/configuration/render-config.rst index a8f0b76b5..745b7018c 100644 --- a/docs/configuration/render-config.rst +++ b/docs/configuration/render-config.rst @@ -19,7 +19,7 @@ The ``RenderConfig`` class takes the following arguments: - ``dbt_project_path``: Configures the DBT project location accessible on their airflow controller for DAG rendering - Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM`` - ``airflow_vars_to_purge_cache``: (new in v1.5) Specify Airflow variables that will affect the ``LoadMode.DBT_LS`` cache. See `Caching <./caching.html>`_ for more information. - ``source_rendering_behavior``: Determines how source nodes are rendered when using cosmos default source node rendering (ALL, NONE, WITH_TESTS_OR_FRESHNESS). Defaults to "NONE" (since Cosmos 1.6). See `Source Nodes Rendering <./source-nodes-rendering.html>`_ for more information. -- ``set_task_id_by_node``: A callable that takes a dbt node as input and returns the task ID. This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information. +- ``normalize_task_id``: A callable that takes a dbt node as input and returns the task ID. 
This function allows users to set a custom task_id independently of the model name, which can be specified as the task’s display_name. This way, task_id can be modified using a user-defined function, while the model name remains as the task’s display name. The display_name parameter is available in Airflow 2.9 and above. See `Task display name <./task-display-name.html>`_ for more information. Customizing how nodes are rendered (experimental) ------------------------------------------------- diff --git a/docs/configuration/task-display-name.rst b/docs/configuration/task-display-name.rst index b90eb24b9..659eb22e3 100644 --- a/docs/configuration/task-display-name.rst +++ b/docs/configuration/task-display-name.rst @@ -8,7 +8,7 @@ Task display name In Airflow, `task_id` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping `task_id` in ASCII, they can use the `display_name` parameter. -To work with projects that use non-ASCII characters in model names, the `set_task_id_by_node` field of `RenderConfig` can be utilized. +To work with projects that use non-ASCII characters in model names, the `normalize_task_id` field of `RenderConfig` can be utilized. 
Example: @@ -19,12 +19,12 @@ You can provide a function to convert the model name to an ASCII-compatible form from slugify import slugify - def set_task_id_by_node(node): + def normalize_task_id(node): return slugify(node.name) from cosmos import DbtTaskGroup, RenderConfig jaffle_shop = DbtTaskGroup( - render_config=RenderConfig(set_task_id_by_node=set_task_id_by_node) + render_config=RenderConfig(normalize_task_id=normalize_task_id) ) diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 33d87a5cb..89bfd060f 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -568,8 +568,8 @@ def test_create_task_metadata_snapshot(caplog): assert metadata.arguments == {"models": "my_snapshot"} -def _set_task_id_by_node(node: DbtNode) -> str: - """for test_create_task_metadata_set_task_id_by_node""" +def _normalize_task_id(node: DbtNode) -> str: + """for test_create_task_metadata_normalize_task_id""" return f"new_task_id_{node.name}_{node.resource_type.value}" @@ -578,9 +578,9 @@ def _set_task_id_by_node(node: DbtNode) -> str: reason="Airflow task did not have display_name until the 2.9 release", ) @pytest.mark.parametrize( - "node_type,node_id,set_task_id_by_node,use_task_group,expected_node_id,expected_display_name", + "node_type,node_id,normalize_task_id,use_task_group,expected_node_id,expected_display_name", [ - # set_task_id_by_node is None (default) + # normalize_task_id is None (default) ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", @@ -605,11 +605,11 @@ def _set_task_id_by_node(node: DbtNode) -> str: "test_node_seed", None, ), - # set_task_id_by_node is passed and use_task_group is False + # normalize_task_id is passed and use_task_group is False ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, False, "new_task_id_test_node_model", "test_node_run", @@ -617,7 +617,7 @@ def _set_task_id_by_node(node: DbtNode) -> str: ( 
DbtResourceType.SOURCE, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, False, "new_task_id_test_node_source", "test_node_source", @@ -625,16 +625,16 @@ def _set_task_id_by_node(node: DbtNode) -> str: ( DbtResourceType.SEED, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, False, "new_task_id_test_node_seed", "test_node_seed", ), - # set_task_id_by_node is passed and use_task_group is True + # normalize_task_id is passed and use_task_group is True ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, True, "run", None, @@ -642,7 +642,7 @@ def _set_task_id_by_node(node: DbtNode) -> str: ( DbtResourceType.SOURCE, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, True, "source", None, @@ -650,15 +650,15 @@ def _set_task_id_by_node(node: DbtNode) -> str: ( DbtResourceType.SEED, f"{DbtResourceType.MODEL.value}.my_folder.test_node", - _set_task_id_by_node, + _normalize_task_id, True, "seed", None, ), ], ) -def test_create_task_metadata_set_task_id_by_node( - node_type, node_id, set_task_id_by_node, use_task_group, expected_node_id, expected_display_name +def test_create_task_metadata_normalize_task_id( + node_type, node_id, normalize_task_id, use_task_group, expected_node_id, expected_display_name ): node = DbtNode( unique_id=node_id, @@ -675,7 +675,7 @@ def test_create_task_metadata_set_task_id_by_node( args=args, dbt_dag_task_group_identifier="", use_task_group=use_task_group, - set_task_id_by_node=set_task_id_by_node, + normalize_task_id=normalize_task_id, source_rendering_behavior=SourceRenderingBehavior.ALL, ) assert metadata.id == expected_node_id From fa6b2d99f55ef682b3a7ae3bdc11ae7a482e037f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 31 Oct 2024 05:09:14 +0000 Subject: 
[PATCH 11/14] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 1f8f80e3e..71a589647 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -208,7 +208,9 @@ def create_task_metadata( args = {} return TaskMetadata(id=task_id, operator_class="airflow.operators.empty.EmptyOperator", arguments=args) else: - task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, node.resource_type.value) + task_id, args = _get_task_id_and_args( + node, args, use_task_group, normalize_task_id, node.resource_type.value + ) task_metadata = TaskMetadata( id=task_id, @@ -228,6 +230,7 @@ def create_task_metadata( logger.warning(msg) return None + def generate_task_or_group( dag: DAG, task_group: TaskGroup | None, From 03076bcea91fc39e233fd24151a6dfea9fa4a171 Mon Sep 17 00:00:00 2001 From: t0momi219 Date: Thu, 31 Oct 2024 16:57:37 +0900 Subject: [PATCH 12/14] fix document --- docs/configuration/task-display-name.rst | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/configuration/task-display-name.rst b/docs/configuration/task-display-name.rst index 659eb22e3..56c750dd2 100644 --- a/docs/configuration/task-display-name.rst +++ b/docs/configuration/task-display-name.rst @@ -6,9 +6,9 @@ Task display name .. note:: This feature is only available for Airflow >= 2.9. -In Airflow, `task_id` does not support non-ASCII characters. Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping `task_id` in ASCII, they can use the `display_name` parameter. +In Airflow, ``task_id`` does not support non-ASCII characters. 
Therefore, if users wish to use non-ASCII characters (such as their native language) as display names while keeping ``task_id`` in ASCII, they can use the ``display_name`` parameter. -To work with projects that use non-ASCII characters in model names, the `normalize_task_id` field of `RenderConfig` can be utilized. +To work with projects that use non-ASCII characters in model names, the ``normalize_task_id`` field of ``RenderConfig`` can be utilized. Example: @@ -28,3 +28,6 @@ You can provide a function to convert the model name to an ASCII-compatible form jaffle_shop = DbtTaskGroup( render_config=RenderConfig(normalize_task_id=normalize_task_id) ) + +.. note:: + Although the slugify example often works, it may not be suitable for use in actual production. Since slugify performs conversions based on pronunciation, there may be cases where task_id is not unique due to homophones and similar issues. From 3aa4495bd212ec6b2aa13343e399493ff2c1d07b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:45:52 +0000 Subject: [PATCH 13/14] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20f?= =?UTF-8?q?ormat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index c0d08b261..0d6d1c9d7 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -152,7 +152,7 @@ def _get_task_id_and_args( task_id = f"{node.name}_{resource_suffix}" return task_id, args_update - + def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: """ Return the map from dbt node type to Cosmos class prefix that should be used From 0a797e92fa3bf110aba2ecf7e92d08a02032c2a1 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Thu, 19 Dec 
2024 21:18:17 +0530 Subject: [PATCH 14/14] Update cosmos/airflow/graph.py --- cosmos/airflow/graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 0d6d1c9d7..d20a7de22 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -153,7 +153,7 @@ def _get_task_id_and_args( return task_id, args_update - def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: +def create_dbt_resource_to_class(test_behavior: TestBehavior) -> dict[str, str]: """ Return the map from dbt node type to Cosmos class prefix that should be used to handle them.