From cee13899299f8ef03400c9a5405ffb9a09b9f52f Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Thu, 31 Oct 2024 13:35:27 +0000 Subject: [PATCH] Fix parsing dbt ls outputs that contain JSONs that are not dbt nodes (#1296) This change makes Cosmos more resilient, allowing it to be used even when JSONs do not represent dbt nodes in the `dbt ls` output. **Context** An Astronomer customer [raised a P1 incident](https://astronomer.zendesk.com/agent/tickets/67681), mentioning they could no longer run their Cosmos-powered DAGs. They were using Cosmos 1.5.0, and the issue was observed whenever DAGs were deployed using `Astro deploy --dags`, even if they only had whitespace as a difference. The DAGs could no longer be parsed, raising an exception similar to: ``` File /usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py, line 135, in parse_dbt_ls_output unique_id=node_dict[unique_id] KeyError: 'unique_id' ``` **Explanation** The customer recently changed their dbt project, adding print debug statements to one of their dbt macros. This caused the dbt ls output to contain lines that were valid JSON but were not valid dbt nodes, as observed in: ``` 11:20:43 Running with dbt=1.7.6 11:20:45 Registered adapter: bigquery=1.7.2 11:20:45 Unable to do partial parsing because saved manifest not found. Starting full parse. /***************************/ Values returned by mac_get_values: {} /***************************/ {"name": "some_model", "resource_type": "model", "package_name": "some_package", "original_file_path": "models/some_model.sql", "unique_id": "model.some_package.some_model", "alias": "some_model_some_package_1_8_0", "config": {"enabled": true, "alias": "some_model_some_package-1.8.0", "schema": "some_schema", "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "post-hook": [], "pre-hook": [], "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false, "alias_types": true}, "access": "protected"}, "tags": [], "depends_on": {"macros": [], "nodes": ["source.some_source"]}}""" ``` Cosmos didn't consider this use case. It assumed if a line was a JSON, it should be a dbt node: https://github.com/astronomer/astronomer-cosmos/blob/42a397fb40ff537c74bb6f596b4936815b14abbb/cosmos/dbt/graph.py#L161-L185 **Workaround** If customers updated the macro to print the information in a single line, they'd no longer observe the issue: ``` Values returned by mac_get_values: {} ``` We also released [1.5.0rc2](https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.5.0rc2) with the change #1295, similar to the one introduced by this PR. **Fix** This change makes Cosmos more resilient to scenarios where `dbt ls` may output JSON lines that are not valid dbt nodes. It also logs those lines to help troubleshoot. We added a unit test to make sure we continue supporting this use-case. --- cosmos/dbt/graph.py | 34 +++++++++++++----------- tests/dbt/test_graph.py | 57 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 15 deletions(-) diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 7a957b2fc..be37ec298 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -167,21 +167,25 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str, except json.decoder.JSONDecodeError: logger.debug("Skipped dbt ls line: %s", line) else: - node = DbtNode( - unique_id=node_dict["unique_id"], - resource_type=DbtResourceType(node_dict["resource_type"]), - depends_on=node_dict.get("depends_on", {}).get("nodes", []), - file_path=project_path / node_dict["original_file_path"], - tags=node_dict.get("tags", []), - config=node_dict.get("config", {}), - has_freshness=( - is_freshness_effective(node_dict.get("freshness")) - if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE - else False - ), - ) - nodes[node.unique_id] = node - logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) + try: + node = DbtNode( + unique_id=node_dict["unique_id"], + resource_type=DbtResourceType(node_dict["resource_type"]), + depends_on=node_dict.get("depends_on", {}).get("nodes", []), + file_path=project_path / node_dict["original_file_path"], + tags=node_dict.get("tags", []), + config=node_dict.get("config", {}), + has_freshness=( + is_freshness_effective(node_dict.get("freshness")) + if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE + else False + ), + ) + except KeyError: + logger.info("Could not parse following the dbt ls line even though it was a valid JSON `%s`", line) + else: + nodes[node.unique_id] = node + logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type) return nodes diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 1c0912042..f5afa06c3 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1102,6 +1102,63 @@ def test_run_command(mock_popen, stdout, returncode): assert return_value == stdout +def test_parse_dbt_ls_output_real_life_customer_bug(caplog): + dbt_ls_output = """ +11:20:43 Running with dbt=1.7.6 +11:20:45 Registered adapter: bigquery=1.7.2 +11:20:45 Unable to do partial parsing because saved manifest not found. Starting full parse. +/***************************/ +Values returned by mac_get_values: +{} +/***************************/ +{"name": "some_model", "resource_type": "model", "package_name": "some_package", "original_file_path": "models/some_model.sql", "unique_id": "model.some_package.some_model", "alias": "some_model_some_package_1_8_0", "config": {"enabled": true, "alias": "some_model_some_package-1.8.0", "schema": "some_schema", "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "post-hook": [], "pre-hook": [], "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false, "alias_types": true}, "access": "protected"}, "tags": [], "depends_on": {"macros": [], "nodes": ["source.some_source"]}}""" + + expected_nodes = { + "model.some_package.some_model": DbtNode( + unique_id="model.some_package.some_model", + resource_type=DbtResourceType.MODEL, + file_path=Path("fake-project/models/some_model.sql"), + tags=[], + config={ + "access": "protected", + "alias": "some_model_some_package-1.8.0", + "column_types": {}, + "contract": { + "alias_types": True, + "enforced": False, + }, + "database": None, + "docs": { + "node_color": None, + "show": True, + }, + "enabled": True, + "full_refresh": None, + "grants": {}, + "group": None, + "incremental_strategy": None, + "materialized": "view", + "meta": {}, + "on_configuration_change": "apply", + "on_schema_change": "ignore", + "packages": [], + "persist_docs": {}, + "post-hook": [], + "pre-hook": [], + "quoting": {}, + "schema": "some_schema", + "tags": [], + "unique_key": None, + }, + depends_on=["source.some_source"], + ), + } + nodes = parse_dbt_ls_output(Path("fake-project"), dbt_ls_output) + + assert expected_nodes == nodes + assert "Could not parse following the dbt ls line even though it was a valid JSON `{}" in caplog.text + + def test_parse_dbt_ls_output(): fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}'