Fix parsing dbt ls outputs that contain JSONs that are not dbt nodes (#1296)

This change makes Cosmos more resilient, allowing it to be used even
when JSONs do not represent dbt nodes in the `dbt ls` output.

**Context**

An Astronomer customer [raised a P1
incident](https://astronomer.zendesk.com/agent/tickets/67681),
mentioning they could no longer run their Cosmos-powered DAGs.

They were using Cosmos 1.5.0, and the issue was observed whenever DAGs
were deployed using `astro deploy --dags`, even when the only difference
between deployments was whitespace.

The DAGs could no longer be parsed, raising an exception similar to:

```
  File "/usr/local/lib/python3.11/site-packages/cosmos/dbt/graph.py", line 135, in parse_dbt_ls_output
    unique_id=node_dict["unique_id"]
KeyError: 'unique_id'
```

**Explanation**

The customer recently changed their dbt project, adding print debug
statements to one of their dbt macros.

This caused the `dbt ls` output to contain lines that were valid JSON but
were not valid dbt nodes, as observed in:
```
11:20:43  Running with dbt=1.7.6
11:20:45  Registered adapter: bigquery=1.7.2
11:20:45  Unable to do partial parsing because saved manifest not found. Starting full parse.
/***************************/
Values returned by mac_get_values:
{}
/***************************/
{"name": "some_model", "resource_type": "model", "package_name": "some_package", "original_file_path": "models/some_model.sql", "unique_id": "model.some_package.some_model", "alias": "some_model_some_package_1_8_0", "config": {"enabled": true, "alias": "some_model_some_package-1.8.0", "schema": "some_schema", "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "post-hook": [], "pre-hook": [], "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false, "alias_types": true}, "access": "protected"}, "tags": [], "depends_on": {"macros": [], "nodes": ["source.some_source"]}}
```
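The bare `{}` line is what triggers the crash: it is valid JSON, so `json.loads` accepts it, but the resulting dict has no `unique_id` key. A standalone sketch of the pre-fix logic (not the actual Cosmos code; it catches the `KeyError` the original code let escape, to show which line triggers it):

```python
import json

# Lines as they appear in the captured `dbt ls` stdout above.
lines = [
    "/***************************/",
    "Values returned by mac_get_values:",
    "{}",
    '{"unique_id": "model.some_package.some_model", "resource_type": "model"}',
]

parsed = {}
key_errors = []
for line in lines:
    try:
        node_dict = json.loads(line)
    except json.decoder.JSONDecodeError:
        continue  # plain log lines were already skipped before the fix
    try:
        # Pre-fix assumption: every JSON line is a dbt node with a unique_id.
        parsed[node_dict["unique_id"]] = node_dict
    except KeyError:
        key_errors.append(line)  # the bare `{}` debug line lands here

print(sorted(parsed), key_errors)
```

Only the last line survives as a node; the `{}` line is the one that raised `KeyError: 'unique_id'` in production.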

Cosmos didn't consider this use case. It assumed that any line that was
valid JSON represented a dbt node:

https://github.com/astronomer/astronomer-cosmos/blob/42a397fb40ff537c74bb6f596b4936815b14abbb/cosmos/dbt/graph.py#L161-L185

**Workaround**

If customers updated the macro to print the information on a single
line, they would no longer observe the issue:
```
Values returned by mac_get_values: {}
```
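The workaround works because Cosmos parses `dbt ls` stdout line by line: once the label and the `{}` share one line, that line is no longer valid JSON, so it fails `json.loads` and is skipped like any other log line. A quick illustration (the helper is hypothetical, not Cosmos code):

```python
import json


def is_json_line(line: str) -> bool:
    """Return True if the line parses as JSON, mirroring the line-by-line check."""
    try:
        json.loads(line)
    except json.decoder.JSONDecodeError:
        return False
    return True


# Multi-line macro output: the bare `{}` is valid JSON and slips through.
assert is_json_line("{}")

# Workaround: printed on one line, the label makes the whole line invalid JSON,
# so it is skipped rather than mistaken for a dbt node.
assert not is_json_line("Values returned by mac_get_values: {}")
```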

We also released
[1.5.0rc2](https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.5.0rc2)
with change #1295, which is similar to the one introduced by this PR.

**Fix**

This change makes Cosmos resilient to scenarios where `dbt ls` may
output JSON lines that are not valid dbt nodes. It also logs those lines
to help with troubleshooting. We added a unit test to make sure we continue
supporting this use case.
tatiana authored Oct 31, 2024
1 parent 42a397f commit cee1389
Showing 2 changed files with 76 additions and 15 deletions.
34 changes: 19 additions & 15 deletions cosmos/dbt/graph.py
```diff
@@ -167,21 +167,25 @@ def parse_dbt_ls_output(project_path: Path | None, ls_stdout: str) -> dict[str,
         except json.decoder.JSONDecodeError:
             logger.debug("Skipped dbt ls line: %s", line)
         else:
-            node = DbtNode(
-                unique_id=node_dict["unique_id"],
-                resource_type=DbtResourceType(node_dict["resource_type"]),
-                depends_on=node_dict.get("depends_on", {}).get("nodes", []),
-                file_path=project_path / node_dict["original_file_path"],
-                tags=node_dict.get("tags", []),
-                config=node_dict.get("config", {}),
-                has_freshness=(
-                    is_freshness_effective(node_dict.get("freshness"))
-                    if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
-                    else False
-                ),
-            )
-            nodes[node.unique_id] = node
-            logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
+            try:
+                node = DbtNode(
+                    unique_id=node_dict["unique_id"],
+                    resource_type=DbtResourceType(node_dict["resource_type"]),
+                    depends_on=node_dict.get("depends_on", {}).get("nodes", []),
+                    file_path=project_path / node_dict["original_file_path"],
+                    tags=node_dict.get("tags", []),
+                    config=node_dict.get("config", {}),
+                    has_freshness=(
+                        is_freshness_effective(node_dict.get("freshness"))
+                        if DbtResourceType(node_dict["resource_type"]) == DbtResourceType.SOURCE
+                        else False
+                    ),
+                )
+            except KeyError:
+                logger.info("Could not parse following the dbt ls line even though it was a valid JSON `%s`", line)
+            else:
+                nodes[node.unique_id] = node
+                logger.debug("Parsed dbt resource `%s` of type `%s`", node.unique_id, node.resource_type)
     return nodes
```
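The fix relies on `try`/`except`/`else` semantics: the `else` branch runs only when constructing the node raised nothing, so dicts that are valid JSON but not dbt nodes are logged and skipped without aborting the whole parse. A minimal standalone illustration of the pattern (not the Cosmos code itself):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

nodes = {}
candidates = [
    {},  # valid JSON, but not a dbt node
    {"unique_id": "model.pkg.m1", "resource_type": "model"},
]
for node_dict in candidates:
    try:
        unique_id = node_dict["unique_id"]  # raises KeyError for non-node dicts
    except KeyError:
        logger.info("Could not parse dict as a dbt node: %s", node_dict)
    else:
        # Runs only when no exception was raised above.
        nodes[unique_id] = node_dict

print(sorted(nodes))
```

The empty dict is reported at INFO level and the loop continues, so one bad line no longer takes down DAG parsing.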


57 changes: 57 additions & 0 deletions tests/dbt/test_graph.py
```diff
@@ -1102,6 +1102,63 @@ def test_run_command(mock_popen, stdout, returncode):
     assert return_value == stdout
 
 
+def test_parse_dbt_ls_output_real_life_customer_bug(caplog):
+    dbt_ls_output = """
+11:20:43  Running with dbt=1.7.6
+11:20:45  Registered adapter: bigquery=1.7.2
+11:20:45  Unable to do partial parsing because saved manifest not found. Starting full parse.
+/***************************/
+Values returned by mac_get_values:
+{}
+/***************************/
+{"name": "some_model", "resource_type": "model", "package_name": "some_package", "original_file_path": "models/some_model.sql", "unique_id": "model.some_package.some_model", "alias": "some_model_some_package_1_8_0", "config": {"enabled": true, "alias": "some_model_some_package-1.8.0", "schema": "some_schema", "database": null, "tags": [], "meta": {}, "group": null, "materialized": "view", "incremental_strategy": null, "persist_docs": {}, "post-hook": [], "pre-hook": [], "quoting": {}, "column_types": {}, "full_refresh": null, "unique_key": null, "on_schema_change": "ignore", "on_configuration_change": "apply", "grants": {}, "packages": [], "docs": {"show": true, "node_color": null}, "contract": {"enforced": false, "alias_types": true}, "access": "protected"}, "tags": [], "depends_on": {"macros": [], "nodes": ["source.some_source"]}}"""
+
+    expected_nodes = {
+        "model.some_package.some_model": DbtNode(
+            unique_id="model.some_package.some_model",
+            resource_type=DbtResourceType.MODEL,
+            file_path=Path("fake-project/models/some_model.sql"),
+            tags=[],
+            config={
+                "access": "protected",
+                "alias": "some_model_some_package-1.8.0",
+                "column_types": {},
+                "contract": {
+                    "alias_types": True,
+                    "enforced": False,
+                },
+                "database": None,
+                "docs": {
+                    "node_color": None,
+                    "show": True,
+                },
+                "enabled": True,
+                "full_refresh": None,
+                "grants": {},
+                "group": None,
+                "incremental_strategy": None,
+                "materialized": "view",
+                "meta": {},
+                "on_configuration_change": "apply",
+                "on_schema_change": "ignore",
+                "packages": [],
+                "persist_docs": {},
+                "post-hook": [],
+                "pre-hook": [],
+                "quoting": {},
+                "schema": "some_schema",
+                "tags": [],
+                "unique_key": None,
+            },
+            depends_on=["source.some_source"],
+        ),
+    }
+    nodes = parse_dbt_ls_output(Path("fake-project"), dbt_ls_output)
+
+    assert expected_nodes == nodes
+    assert "Could not parse following the dbt ls line even though it was a valid JSON `{}" in caplog.text
+
+
 def test_parse_dbt_ls_output():
     fake_ls_stdout = '{"resource_type": "model", "name": "fake-name", "original_file_path": "fake-file-path.sql", "unique_id": "fake-unique-id", "tags": [], "config": {}}'
```
