diff --git a/cosmos/constants.py b/cosmos/constants.py index 51951d259..5104e9760 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -161,6 +161,6 @@ def _missing_value_(cls, value): # type: ignore DBT_COMPILE_TASK_ID = "dbt_compile" -TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{cosmos_task_count}" +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{is_cosmos_dag}/{cosmos_task_groups_count}/{task_count}/{cosmos_task_count}" TELEMETRY_VERSION = "v1" TELEMETRY_TIMEOUT = 5.0 diff --git a/cosmos/listeners/dag_run.py b/cosmos/listeners/dag_run_listener.py similarity index 59% rename from cosmos/listeners/dag_run.py rename to cosmos/listeners/dag_run_listener.py index d702c727e..b65c6f409 100644 --- a/cosmos/listeners/dag_run.py +++ b/cosmos/listeners/dag_run_listener.py @@ -1,6 +1,9 @@ from __future__ import annotations import functools +import time +from contextlib import contextmanager +from typing import Generator from airflow.listeners import hookimpl from airflow.models.dag import DAG @@ -9,6 +12,17 @@ from cosmos import telemetry from cosmos.airflow.dag import DbtDag from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +@contextmanager +def measure_time() -> Generator[None, None, None]: + start = time.perf_counter() + yield + end = time.perf_counter() + logger.info(f"DAG listener metrics collection time: {end - start:.6f} seconds") class EventStatus: @@ -54,16 +68,19 @@ def uses_cosmos(dag: DAG) -> bool: @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() + if not uses_cosmos(dag): return - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.SUCCESS, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + + with measure_time(): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) @@ -73,13 +90,15 @@ def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: dag = dag_run.get_dag() if not uses_cosmos(dag): return - additional_telemetry_metrics = { - "dag_hash": dag_run.dag_hash, - "status": EventStatus.FAILED, - "task_count": len(dag.task_ids), - "cosmos_task_count": total_cosmos_tasks(dag), - "cosmos_task_groups_count": total_cosmos_task_groups(dag), - "is_cosmos_dag": is_cosmos_dag(dag), - } + + with measure_time(): + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(dag), + "cosmos_task_groups_count": total_cosmos_task_groups(dag), + "is_cosmos_dag": is_cosmos_dag(dag), + } telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 496369121..4bbea4fa2 100644 --- a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -10,7 +10,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose -from cosmos.listeners import dag_run +from cosmos.listeners import dag_run_listener from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud if in_astro_cloud: @@ -270,4 +270,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] - listeners = [dag_run] + listeners = [dag_run_listener] diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index d074a9bff..82760b0d8 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -55,16 +55,19 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog): "event_type": "dag_run", "status": "success", "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "is_cosmos_dag": True, + "cosmos_task_groups_count": 0, + "task_count": 3, "cosmos_task_count": 3, } is_success = telemetry.emit_usage_metrics(sample_metrics) mock_httpx_get.assert_called_once_with( - f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3""", + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""", timeout=5.0, follow_redirects=True, ) assert not is_success - log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3. Status code: 404. Message: Non existent URL""" + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3. Status code: 404. Message: Non existent URL""" assert caplog.text.startswith("WARNING") assert log_msg in caplog.text @@ -80,8 +83,11 @@ def test_emit_usage_metrics_succeeds(caplog): "platform_machine": "amd64", "event_type": "dag_run", "status": "success", - "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", - "cosmos_task_count": 3, + "dag_hash": "dag-hash-ci", + "is_cosmos_dag": False, + "cosmos_task_groups_count": 1, + "task_count": 33, + "cosmos_task_count": 33, } is_success = telemetry.emit_usage_metrics(sample_metrics) assert is_success