From c2a8217d440a6128fea750f3e17f32527c2e1710 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:24:10 +0530 Subject: [PATCH] Support rendering build operator task-id with non-ASCII characters (#1415) In PR [#1278](https://github.com/astronomer/astronomer-cosmos/pull/1278), we introduced support for rendering non-ASCII characters in task IDs. However, due to limited access, we were unable to implement the same functionality for the build operator. This PR aims to extend that functionality by adding support for rendering build task IDs with non-ASCII characters. --- cosmos/airflow/graph.py | 12 ++++++++--- tests/airflow/test_graph.py | 41 +++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index d20a7de22..662833cda 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -138,18 +138,22 @@ def _get_task_id_and_args( use_task_group: bool, normalize_task_id: Callable[..., Any] | None, resource_suffix: str, + include_resource_type: bool = False, ) -> tuple[str, dict[str, Any]]: """ Generate task ID and update args with display name if needed. """ args_update = args + task_display_name = f"{node.name}_{resource_suffix}" + if include_resource_type: + task_display_name = f"{node.name}_{node.resource_type.value}_{resource_suffix}" if use_task_group: task_id = resource_suffix elif normalize_task_id: task_id = normalize_task_id(node) - args_update["task_display_name"] = f"{node.name}_{resource_suffix}" + args_update["task_display_name"] = task_display_name else: - task_id = f"{node.name}_{resource_suffix}" + task_id = task_display_name return task_id, args_update @@ -214,7 +218,9 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: - task_id = f"{node.name}_{node.resource_type.value}_build" + task_id, args = _get_task_id_and_args( + node, args, use_task_group, normalize_task_id, "build", include_resource_type=True + ) elif node.resource_type == DbtResourceType.MODEL: task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "run") elif node.resource_type == DbtResourceType.SOURCE: diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index c00f0cf53..61e6c3a8f 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -621,7 +621,7 @@ def _normalize_task_id(node: DbtNode) -> str: reason="Airflow task did not have display_name until the 2.9 release", ) @pytest.mark.parametrize( - "node_type,node_id,normalize_task_id,use_task_group,expected_node_id,expected_display_name", + "node_type,node_id,normalize_task_id,use_task_group,test_behavior,expected_node_id,expected_display_name", [ # normalize_task_id is None (default) ( @@ -629,6 +629,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", None, False, + None, "test_node_run", None, ), @@ -637,6 +638,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.SOURCE.value}.my_folder.test_node", None, False, + None, "test_node_source", None, ), @@ -645,15 +647,26 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.SEED.value}.my_folder.test_node", None, False, + None, "test_node_seed", None, ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.SEED.value}.my_folder.test_node", + None, + False, + TestBehavior.BUILD, + "test_node_seed_build", + None, + ), # normalize_task_id is passed and use_task_group is False ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_model", "test_node_run", ), @@ -662,6 +675,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_source", "test_node_source", ), @@ -670,15 +684,26 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_seed", "test_node_seed", ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _normalize_task_id, + False, + TestBehavior.BUILD, + "new_task_id_test_node_seed", + "test_node_seed_build", + ), # normalize_task_id is passed and use_task_group is True ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "run", None, ), @@ -687,6 +712,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "source", None, ), @@ -695,13 +721,23 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "seed", None, ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _normalize_task_id, + True, + TestBehavior.BUILD, + "build", + None, + ), ], ) def test_create_task_metadata_normalize_task_id( - node_type, node_id, normalize_task_id, use_task_group, expected_node_id, expected_display_name + node_type, node_id, normalize_task_id, use_task_group, test_behavior, expected_node_id, expected_display_name ): node = DbtNode( unique_id=node_id, @@ -720,6 +756,7 @@ def test_create_task_metadata_normalize_task_id( use_task_group=use_task_group, normalize_task_id=normalize_task_id, source_rendering_behavior=SourceRenderingBehavior.ALL, + test_behavior=test_behavior, ) assert metadata.id == expected_node_id if expected_display_name: