Emit telemetry to Scarf during DAG run (#1397)
Export telemetry related to Cosmos usage to [Scarf](https://about.scarf.sh/). This data assists the project maintainers in better understanding how Cosmos is used. Insights from this telemetry are critical for prioritizing patches, minor releases, and security fixes. Additionally, this information supports critical decisions related to the development road map.

Deployments and individual users can opt out of analytics by setting the configuration:

```
[cosmos]
enable_telemetry: False
```

As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt out by setting one of the following environment variables:

```commandline
AIRFLOW__COSMOS__ENABLE_TELEMETRY=False
DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True
```

In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs:

- Cosmos version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- DAG hash
- Total tasks
- Total Cosmos tasks

No user-identifiable information (IP included) is stored in Scarf, even though Scarf infers information from the IP, such as location, and stores that. The data collection is GDPR compliant.

The Apache Foundation supports this same strategy in many of its open-source projects, including Airflow (apache/airflow#39510).
Example of visualisation of the data via the Scarf UI:

<img width="1235" alt="Screenshot 2024-12-19 at 10 22 59" src="https://github.com/user-attachments/assets/12b9fbd4-2fdd-4e62-9876-defee3c4d8da" />
<img width="1231" alt="Screenshot 2024-12-19 at 10 23 13" src="https://github.com/user-attachments/assets/f98b849c-99be-4764-9e6d-cb7730da3688" />
<img width="1227" alt="Screenshot 2024-12-19 at 10 23 21" src="https://github.com/user-attachments/assets/421b7581-c641-422a-8469-252ba5a2fd33" />
<img width="1237" alt="Screenshot 2024-12-19 at 10 23 28" src="https://github.com/user-attachments/assets/2e5995a2-fe09-4017-a625-4dd4a60028d0" />
<img width="1248" alt="Screenshot 2024-12-19 at 10 23 51" src="https://github.com/user-attachments/assets/64a8a07f-df56-493c-a3f5-0f5165fd58e8" />
<img width="1229" alt="Screenshot 2024-12-19 at 10 24 01" src="https://github.com/user-attachments/assets/1e3e8b8d-b11d-4b31-8b46-853d541b01b8" />
<img width="1240" alt="Screenshot 2024-12-19 at 10 24 11" src="https://github.com/user-attachments/assets/b5e79cc7-4e2e-44b2-a94b-891b9226b152" />
<img width="1241" alt="Screenshot 2024-12-19 at 10 24 20" src="https://github.com/user-attachments/assets/2fb5d666-d749-416d-acf8-4a3bc94ba014" />
<img width="1234" alt="Screenshot 2024-12-19 at 10 24 31" src="https://github.com/user-attachments/assets/353eb82c-44d2-44ec-87e2-ace7138132f5" />
<img width="1245" alt="Screenshot 2024-12-19 at 10 24 39" src="https://github.com/user-attachments/assets/4a637a2a-14ad-41a8-b7fd-db186ec74357" />
<img width="1233" alt="Screenshot 2024-12-19 at 10 24 48" src="https://github.com/user-attachments/assets/bec4e2b0-49c3-4289-8f9b-3285db9ec40c" />

Closes: #1143
Showing 11 changed files with 477 additions and 5 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
Privacy Notice | ||
============== | ||
|
||
This project follows the `Privacy Policy of Astronomer <https://www.astronomer.io/privacy/>`_. | ||
|
||
Collection of Data | ||
------------------ | ||
|
||
Astronomer Cosmos integrates `Scarf <https://about.scarf.sh/>`_ to collect basic telemetry data during operation. | ||
This data assists the project maintainers in better understanding how Cosmos is used. | ||
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and | ||
security fixes. Additionally, this information supports key decisions related to the development road map. | ||
|
||
Deployments and individual users can opt out of analytics by setting the configuration: | |
|
||
.. code-block:: | |
|
||
    [cosmos] | |
    enable_telemetry: False | |
|
||
As described in the `official documentation <https://docs.scarf.sh/gateway/#do-not-track>`_, it is also possible to opt out by setting one of the following environment variables: | ||
|
||
.. code-block:: | |
|
||
    DO_NOT_TRACK=True | |
    SCARF_NO_ANALYTICS=True | |
|
||
In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs: | |
|
||
- Cosmos version | ||
- Airflow version | ||
- Python version | ||
- Operating system & machine architecture | ||
- Event type | ||
- DAG hash | |
- Total tasks | ||
- Total Cosmos tasks | ||
|
||
No user-identifiable information (IP included) is stored in Scarf. |
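
The opt-out switches described in the notice can be sketched as a single boolean check. This is a minimal illustration, not the code from this commit: the helper names `_as_bool` and `telemetry_allowed`, the set of strings treated as true, and the default of telemetry being enabled when nothing is set are all assumptions. The environment variable names come from the commit message.

```python
from __future__ import annotations

from typing import Mapping

# Strings treated as "true". This set is an assumption for illustration.
_TRUTHY = {"1", "true", "t", "yes", "y", "on"}


def _as_bool(value: str | None, default: bool) -> bool:
    """Parse an environment-style string into a boolean (hypothetical helper)."""
    if value is None or value.strip() == "":
        return default
    return value.strip().lower() in _TRUTHY


def telemetry_allowed(env: Mapping[str, str]) -> bool:
    """Hypothetical helper: telemetry stays on unless any opt-out is set."""
    # Assumed default: enabled when the variable is absent.
    enabled = _as_bool(env.get("AIRFLOW__COSMOS__ENABLE_TELEMETRY"), default=True)
    do_not_track = _as_bool(env.get("DO_NOT_TRACK"), default=False)
    no_analytics = _as_bool(env.get("SCARF_NO_ANALYTICS"), default=False)
    return enabled and not do_not_track and not no_analytics


print(telemetry_allowed({}))  # True
print(telemetry_allowed({"DO_NOT_TRACK": "True"}))  # False
print(telemetry_allowed({"AIRFLOW__COSMOS__ENABLE_TELEMETRY": "False"}))  # False
```

Passing the environment as a mapping, rather than reading `os.environ` inside the function, keeps the sketch easy to exercise with different combinations of flags.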
Empty file.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
from __future__ import annotations | ||
|
||
from airflow.listeners import hookimpl | ||
from airflow.models.dag import DAG | ||
from airflow.models.dagrun import DagRun | ||
|
||
from cosmos import telemetry | ||
from cosmos.log import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
class EventStatus: | ||
SUCCESS = "success" | ||
FAILED = "failed" | ||
|
||
|
||
DAG_RUN = "dag_run" | ||
|
||
|
||
def total_cosmos_tasks(dag: DAG) -> int: | ||
""" | ||
    Count the Cosmos tasks in a given serialized `airflow.serialization.serialized_objects.SerializedDAG`. | |
    The approach is naive in that it does not take subclasses into account, but it is inexpensive and | |
    works. | |
""" | ||
cosmos_tasks = 0 | ||
for task in dag.task_dict.values(): | ||
# In a real Airflow deployment, the following `task` is an instance of | ||
# `airflow.serialization.serialized_objects.SerializedBaseOperator` | ||
# and the only reference to Cosmos is in the _task_module. | ||
# It is suboptimal, but works as of Airflow 2.10 | ||
task_module = getattr(task, "_task_module", None) or task.__class__.__module__ | ||
if task_module.startswith("cosmos."): | ||
cosmos_tasks += 1 | ||
return cosmos_tasks | ||
|
||
|
||
# @provide_session | ||
@hookimpl | ||
def on_dag_run_success(dag_run: DagRun, msg: str) -> None: | ||
logger.debug("Running on_dag_run_success") | ||
# In a real Airflow deployment, the following `serialized_dag` is an instance of | ||
# `airflow.serialization.serialized_objects.SerializedDAG` | ||
    # and it is not a subclass of DbtDag, nor does it contain any references to Cosmos | |
serialized_dag = dag_run.get_dag() | ||
|
||
if not total_cosmos_tasks(serialized_dag): | ||
logger.debug("The DAG does not use Cosmos") | ||
return | ||
|
||
additional_telemetry_metrics = { | ||
"dag_hash": dag_run.dag_hash, | ||
"status": EventStatus.SUCCESS, | ||
"task_count": len(serialized_dag.task_ids), | ||
"cosmos_task_count": total_cosmos_tasks(serialized_dag), | ||
} | ||
|
||
telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) | ||
logger.debug("Completed on_dag_run_success") | ||
|
||
|
||
@hookimpl | ||
def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: | ||
logger.debug("Running on_dag_run_failed") | ||
# In a real Airflow deployment, the following `serialized_dag` is an instance of | ||
# `airflow.serialization.serialized_objects.SerializedDAG` | ||
    # and it is not a subclass of DbtDag, nor does it contain any references to Cosmos | |
serialized_dag = dag_run.get_dag() | ||
|
||
if not total_cosmos_tasks(serialized_dag): | ||
logger.debug("The DAG does not use Cosmos") | ||
return | ||
|
||
additional_telemetry_metrics = { | ||
"dag_hash": dag_run.dag_hash, | ||
"status": EventStatus.FAILED, | ||
"task_count": len(serialized_dag.task_ids), | ||
"cosmos_task_count": total_cosmos_tasks(serialized_dag), | ||
} | ||
|
||
telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) | ||
logger.debug("Completed on_dag_run_failed") |
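
The module-prefix check used by `total_cosmos_tasks` can be tried without an Airflow installation. The sketch below uses `types.SimpleNamespace` objects as stand-ins for serialized operators. The function name `count_cosmos_tasks`, the module paths, and the `dag_hash` value are placeholders invented for illustration. Only the `_task_module` lookup and the `"cosmos."` prefix test mirror the listener above.

```python
from types import SimpleNamespace


def count_cosmos_tasks(tasks) -> int:
    """Count tasks whose defining module lives under the ``cosmos`` package."""
    total = 0
    for task in tasks:
        # Serialized operators expose the original module as `_task_module`;
        # other objects fall back to the module of their class.
        module = getattr(task, "_task_module", None) or task.__class__.__module__
        if module.startswith("cosmos."):
            total += 1
    return total


# Stand-ins for serialized operators (hypothetical module paths).
tasks = [
    SimpleNamespace(_task_module="cosmos.operators.local"),
    SimpleNamespace(_task_module="airflow.operators.empty"),
    SimpleNamespace(_task_module="cosmos_extras.operators"),  # not under "cosmos."
    SimpleNamespace(),  # no _task_module: falls back to the class module, "types"
]

cosmos_count = count_cosmos_tasks(tasks)
payload = {
    "dag_hash": "abc123",  # placeholder value
    "status": "success",
    "task_count": len(tasks),
    "cosmos_task_count": cosmos_count,
}
print(payload)
# {'dag_hash': 'abc123', 'status': 'success', 'task_count': 4, 'cosmos_task_count': 1}
```

The trailing dot in the prefix matters: a package such as `cosmos_extras` is not counted. As the docstring in the diff notes, an operator that subclasses a Cosmos operator but is defined in a user module would not be counted either.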
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
from __future__ import annotations | ||
|
||
import platform | ||
from urllib import parse | ||
from urllib.parse import urlencode | ||
|
||
import httpx | ||
from airflow import __version__ as airflow_version | ||
|
||
import cosmos | ||
from cosmos import constants, settings | ||
from cosmos.log import get_logger | ||
|
||
logger = get_logger(__name__) | ||
|
||
|
||
def should_emit() -> bool: | ||
""" | ||
Identify if telemetry metrics should be emitted or not. | ||
""" | ||
return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics | ||
|
||
|
||
def collect_standard_usage_metrics() -> dict[str, object]: | ||
""" | ||
Return standard telemetry metrics. | ||
""" | ||
metrics = { | ||
"cosmos_version": cosmos.__version__, # type: ignore[attr-defined] | ||
"airflow_version": parse.quote(airflow_version), | ||
"python_version": platform.python_version(), | ||
"platform_system": platform.system(), | ||
"platform_machine": platform.machine(), | ||
"variables": {}, | ||
} | ||
return metrics | ||
|
||
|
||
def emit_usage_metrics(metrics: dict[str, object]) -> bool: | ||
""" | ||
Emit desired telemetry metrics to remote telemetry endpoint. | ||
The metrics must contain the necessary fields to build the TELEMETRY_URL. | ||
""" | ||
query_string = urlencode(metrics) | ||
telemetry_url = constants.TELEMETRY_URL.format( | ||
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string | ||
) | ||
logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics) | ||
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) | ||
if not response.is_success: | ||
logger.warning( | ||
"Unable to emit usage metrics to %s. Status code: %s. Message: %s", | ||
telemetry_url, | ||
response.status_code, | ||
response.text, | ||
) | ||
return response.is_success | ||
|
||
|
||
def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: | ||
""" | ||
    Check if telemetry should be emitted, fetch the standard metrics, complement them with custom metrics | |
    and emit them to the remote telemetry endpoint. | |
    :returns: Whether the event was successfully sent to the telemetry backend. | |
""" | ||
if should_emit(): | ||
metrics = collect_standard_usage_metrics() | ||
metrics["event_type"] = event_type | ||
metrics["variables"].update(additional_metrics) # type: ignore[attr-defined] | ||
metrics.update(additional_metrics) | ||
is_success = emit_usage_metrics(metrics) | ||
return is_success | ||
else: | ||
logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") | ||
return False |
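
How the metrics dictionary becomes a URL can be sketched with the standard library alone. The value of `constants.TELEMETRY_URL` is not shown in this diff, so the template, host, version string, and metric values below are placeholders, and `build_telemetry_url` is a hypothetical name. The merging steps and the `urlencode` plus `str.format` calls follow `emit_usage_metrics_if_enabled` and `emit_usage_metrics` above.

```python
from urllib.parse import urlencode

# Hypothetical template; the real constants.TELEMETRY_URL is not shown in this diff.
TELEMETRY_URL = (
    "https://telemetry.example.invalid/{telemetry_version}/{cosmos_version}/{event_type}?{query_string}"
)
TELEMETRY_VERSION = "v1"  # assumed value


def build_telemetry_url(metrics: dict) -> str:
    """Format the template with the metrics, as emit_usage_metrics does."""
    query_string = urlencode(metrics)
    # str.format ignores keyword arguments that the template does not reference.
    return TELEMETRY_URL.format(
        **metrics, telemetry_version=TELEMETRY_VERSION, query_string=query_string
    )


standard = {"cosmos_version": "1.8.0", "variables": {}}  # placeholder values
additional = {"status": "success", "task_count": 3}

# Mirror the merge: add the event type, copy the extra metrics into
# "variables", and also promote them to top-level keys.
metrics = dict(standard)
metrics["event_type"] = "dag_run"
metrics["variables"] = {**standard["variables"], **additional}
metrics.update(additional)

url = build_telemetry_url(metrics)
print(url)
```

Two details are worth noticing. Every placeholder in the template must be present in `metrics`, otherwise `str.format` raises `KeyError`, which is what the docstring of `emit_usage_metrics` means by "the necessary fields". Also, `urlencode` serialises the nested `variables` dictionary through `str()`, so it appears in the query string as a percent-encoded Python literal rather than as separate parameters.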