Skip to content

Commit

Permalink
Fix detached test tasks names so they do not exceed 250 chars (#1464)
Browse files Browse the repository at this point in the history
Since we introduced detached test tasks in #1433 (released in 1.8.0),
users started facing issues due to very long task names exceeding
Airflow's limits.
    
Example of Python traceback reported by user:
```
     Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 968, in __init__
        validate_key(task_id)
      File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/helpers.py", line 55, in validate_key
        raise AirflowException(f"The key has to be less than {max_length} characters")
    airflow.exceptions.AirflowException: The key has to be less than 250 characters
```
    
This PR fixes this issue. If the name would reach or exceed Airflow's limit
(currently 250 characters), the detached test is instead named:
- "detached_{incremental unique number}_test"

We also considered naming the new test using:
- "parent1_parent2_..._test" - but that may not solve the issue,
especially in circumstances where the same parents may have multiple
detached nodes. Perhaps in future, we could bundle them in a single
task.

Closes: #1440
  • Loading branch information
tatiana authored Jan 15, 2025
1 parent c359813 commit 42d30a1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 4 deletions.
37 changes: 33 additions & 4 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from collections import defaultdict
from collections import OrderedDict, defaultdict
from typing import Any, Callable, Union

from airflow.models import BaseOperator
from airflow.models.base import ID_LEN as AIRFLOW_MAX_ID_LENGTH
from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

Expand Down Expand Up @@ -409,6 +410,13 @@ def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -
return dag_task_group_identifier


def should_create_detached_nodes(test_behavior: TestBehavior) -> bool:
    """
    Tell whether detached nodes have to be calculated and inserted into the graph.

    This is only the case when ``test_behavior`` is ``TestBehavior.BUILD`` or
    ``TestBehavior.AFTER_EACH``.
    """
    behaviors_with_detached_nodes = (TestBehavior.BUILD, TestBehavior.AFTER_EACH)
    return test_behavior in behaviors_with_detached_nodes


def identify_detached_nodes(
nodes: dict[str, DbtNode],
test_behavior: TestBehavior,
Expand All @@ -422,14 +430,34 @@ def identify_detached_nodes(
Change in-place the dictionaries detached_nodes (detached node ID : node) and detached_from_parent (parent node ID that
is upstream to this test and the test node).
"""
if test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH):
if should_create_detached_nodes(test_behavior):
for node_id, node in nodes.items():
if is_detached_test(node):
detached_nodes[node_id] = node
for parent_id in node.depends_on:
detached_from_parent[parent_id].append(node)


# Process-wide sequence number used to build fallback names for detached tests.
_counter = 0


def calculate_detached_node_name(node: DbtNode) -> str:
    """
    Work out the task name for a detached test node.

    The preferred name is the first dot-separated segment of ``node.resource_name``
    followed by ``_test``. When that name is not shorter than ``AIRFLOW_MAX_ID_LENGTH``,
    ``detached_{incremental number}_test`` is returned instead and the module-level
    counter is advanced.
    """
    # Note: the fallback numbering currently relies on the fact that Airflow creates a new
    # process to parse each DAG, both in the scheduler and in the worker nodes, so the counter
    # starts from the same value on every parse. A ticket was logged to improve this:
    # https://github.com/astronomer/astronomer-cosmos/issues/1469
    global _counter

    base_name = node.resource_name.split(".")[0]
    preferred_name = f"{base_name}_test"
    if len(preferred_name) < AIRFLOW_MAX_ID_LENGTH:
        return preferred_name

    fallback_name = f"detached_{_counter}_test"
    _counter += 1
    return fallback_name


def build_airflow_graph(
nodes: dict[str, DbtNode],
dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
Expand Down Expand Up @@ -474,7 +502,7 @@ def build_airflow_graph(

# Identify test nodes that should be run detached from the associated dbt resource nodes because they
# have multiple parents
detached_nodes: dict[str, DbtNode] = {}
detached_nodes: dict[str, DbtNode] = OrderedDict()
detached_from_parent: dict[str, list[DbtNode]] = defaultdict(list)
identify_detached_nodes(nodes, test_behavior, detached_nodes, detached_from_parent)

Expand Down Expand Up @@ -522,8 +550,9 @@ def build_airflow_graph(
elif test_behavior in (TestBehavior.BUILD, TestBehavior.AFTER_EACH):
# Handle detached test nodes
for node_id, node in detached_nodes.items():
datached_node_name = calculate_detached_node_name(node)
test_meta = create_test_task_metadata(
f"{node.resource_name.split('.')[0]}_test",
datached_node_name,
execution_mode,
test_indirect_selection,
task_args=task_args,
Expand Down
27 changes: 27 additions & 0 deletions tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from cosmos.airflow.graph import (
_snake_case_to_camelcase,
build_airflow_graph,
calculate_detached_node_name,
calculate_leaves,
calculate_operator_class,
create_task_metadata,
Expand Down Expand Up @@ -78,6 +79,32 @@
sample_nodes = {node.unique_id: node for node in sample_nodes_list}


def test_calculate_datached_node_name_under_is_under_250():
    """
    Check the naming of detached test nodes.

    A short resource name keeps its own name with a "_test" suffix, while overly long
    names fall back to "detached_{n}_test" with ``n`` increasing by one per fallback.
    """
    node = DbtNode(
        unique_id="model.my_dbt_project.a_very_short_name",
        resource_type=DbtResourceType.MODEL,
        depends_on=[],
        file_path="",
    )
    assert calculate_detached_node_name(node) == "a_very_short_name_test"

    node = DbtNode(
        unique_id="model.my_dbt_project." + "this_is_a_very_long_name" * 20,  # 24 x 20 = 480 characters
        resource_type=DbtResourceType.MODEL,
        depends_on=[],
        file_path="",
    )
    first_fallback_name = calculate_detached_node_name(node)
    # The fallback number comes from a module-level counter that other code running earlier
    # in the same process may already have advanced, so check the pattern and the increment
    # rather than hard-coding the absolute values 0 and 1.
    prefix, sequence_number, suffix = first_fallback_name.split("_")
    assert (prefix, suffix) == ("detached", "test")
    assert sequence_number.isdigit()

    node = DbtNode(
        unique_id="model.my_dbt_project." + "this_is_another_very_long_name" * 20,
        resource_type=DbtResourceType.MODEL,
        depends_on=[],
        file_path="",
    )
    assert calculate_detached_node_name(node) == f"detached_{int(sequence_number) + 1}_test"


@pytest.mark.skipif(
version.parse(airflow_version) < version.parse("2.4"),
reason="Airflow DAG did not have task_group_dict until the 2.4 release",
Expand Down
4 changes: 4 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ def test_converter_creates_dag_with_test_with_multiple_parents():
# We should have a task dedicated to run the test with multiple parents
args = tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].build_cmd({})[0]
assert args[1:] == ["test", "--select", "custom_test_combined_model_combined_model_.c6e4587380"]
assert (
tasks["test.my_dbt_project.custom_test_combined_model_combined_model_.c6e4587380"].task_id
== "custom_test_combined_model_combined_model__test"
)


@pytest.mark.integration
Expand Down

0 comments on commit 42d30a1

Please sign in to comment.