A couple bulk logging improvements
- Add some debugging information about the client version to the logs
- Parse oddly interleaved logs we come across (where multiple export
  events are written at the same time - this shouldn't happen, but we
  now correctly pull out only one export's series of events; see the
  sketch below)
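
To illustrate the interleaving fix, here is a minimal standalone sketch of the two-pass selection the parser now performs (the log lines and export IDs below are made up; the real logic lives in the parser's _parse method):

import json

# Hypothetical interleaved log: two exports' events written to the same file.
log_lines = [
    '{"exportId": "1st", "eventId": "kickoff", "eventDetail": {"exportUrl": "https://host/Group/A"}}',
    '{"exportId": "2nd", "eventId": "kickoff", "eventDetail": {"exportUrl": "https://host/Group/B"}}',
    '{"exportId": "1st", "eventId": "status_complete", "eventDetail": {"transactionTime": "2001-01-01"}}',
    '{"exportId": "2nd", "eventId": "status_complete", "eventDetail": {"transactionTime": "2002-02-02"}}',
]
rows = [json.loads(line) for line in log_lines]

# Pass 1: find the exportId of the last kickoff event (earlier ones may be false starts).
export_id = None
for row in rows:
    if row.get("eventId") == "kickoff":
        export_id = row.get("exportId")

# Pass 2: only process events that belong to that one export.
for row in rows:
    if row.get("exportId") != export_id:
        continue
    print(row["eventId"], row["eventDetail"])  # the real parser handles kickoff/status_complete here

This prints only the "2nd" export's kickoff and status_complete events, matching the new test case further down.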
mikix committed Oct 17, 2024
1 parent 9bce129 commit 484de03
Showing 4 changed files with 43 additions and 14 deletions.
27 changes: 21 additions & 6 deletions cumulus_etl/loaders/fhir/export_log.py
@@ -12,6 +12,7 @@

import httpx

import cumulus_etl
from cumulus_etl import common, errors, fhir, store


@@ -46,12 +47,22 @@ def __init__(self, root: store.Root):
self._parse(root, self._find(root))

def _parse(self, root: store.Root, path: str) -> None:
# Go through every row, looking for the events we care about.
# Note that we parse every kickoff event we hit, for example.
# So we'll end up with the latest one (which works for single-export
# log files with maybe a false start at the beginning).
# Go through every row, looking for the final kickoff event.
# We only want to look at one series of events for one bulk export.
# So we pick the last one, in case there are multiple in the log.
# Those early events might be false starts.
export_id = None
for row in common.read_ndjson(root, path):
if row.get("eventId") == "kickoff":
export_id = row.get("exportId")
if not export_id:
raise self.IncompleteLog(f"No kickoff event found in '{path}'")

# Now read through the log file again, only looking for the events from the one export.
try:
for row in common.read_ndjson(root, path):
if row.get("exportId") != export_id:
continue
match row.get("eventId"):
case "kickoff":
self._parse_kickoff(row)
@@ -60,8 +71,6 @@ def _parse(self, root: store.Root, path: str) -> None:
except KeyError as exc:
raise self.IncompleteLog(f"Error parsing '{path}'") from exc

if self.group_name is None:
raise self.IncompleteLog(f"No kickoff event found in '{path}'")
if self.export_datetime is None:
raise self.IncompleteLog(f"No status_complete event found in '{path}'")

@@ -133,6 +142,12 @@ def _event(
"eventId": event_id,
"eventDetail": detail,
}
if event_id == "kickoff":
# The bulk logging spec says we can add whatever other keys we want,
# but does not encourage a namespace to separate them or anything.
# We use a sunder prefix, just in case the spec wants to add new keys itself.
row["_client"] = "cumulus-etl"
row["_clientVersion"] = cumulus_etl.__version__
json.dump(row, f)
f.write("\n")

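For reference, a kickoff line in the written export log would now look roughly like this (the version value is hypothetical, and other keys the logger writes are elided):

{"eventId": "kickoff", "eventDetail": {"exportUrl": "https://host/Group/G/$export"}, "_client": "cumulus-etl", "_clientVersion": "1.0.0"}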
5 changes: 5 additions & 0 deletions tests/loaders/ndjson/test_bulk_export.py
@@ -66,6 +66,11 @@ def assert_log_equals(self, *rows, num_export_ids: int = 1) -> None:
found_duration = found_rows[complete_index]["eventDetail"]["duration"]
self.assertEqual(found_duration, expected_duration)

# Confirm we add debugging info to the kickoff event
kickoff_index = all_event_ids.index("kickoff")
self.assertEqual(found_rows[kickoff_index]["_client"], "cumulus-etl")
self.assertEqual(found_rows[kickoff_index]["_clientVersion"], "1.0.0+test")

extracted_details = [(x["eventId"], x["eventDetail"]) for x in found_rows]

# Match and reorder download requests/completes because those can be async/non-deterministic
23 changes: 15 additions & 8 deletions tests/loaders/ndjson/test_log_parser.py
@@ -10,19 +10,21 @@
from tests.utils import AsyncTestCase


def kickoff(group: str) -> dict:
def kickoff(group: str, export_id: str = "test-export") -> dict:
url = f"https://host/Group/{group}" if group else "https://host/"
return {
"eventId": "kickoff",
"exportId": export_id,
"eventDetail": {
"exportUrl": url,
},
}


def status_complete(timestamp: str) -> dict:
def status_complete(timestamp: str, export_id: str = "test-export") -> dict:
return {
"eventId": "status_complete",
"exportId": export_id,
"eventDetail": {
"transactionTime": timestamp,
},
@@ -76,12 +78,14 @@ def test_no_dir(self):
[kickoff("G"), status_complete("2020-10-17")],
("G", "2020-10-17"),
),
( # multiple rows - we should pick last of each
( # multiple rows - we should pick last events for the last kickoff
[
kickoff("1st"),
kickoff("2nd"),
status_complete("2001-01-01"),
status_complete("2002-02-02"),
kickoff("1st", export_id="1st"),
kickoff("2nd", export_id="2nd"),
# shouldn't be two status completes, but just in case there are, we grab last
status_complete("2002-02-01", export_id="2nd"),
status_complete("2002-02-02", export_id="2nd"),
status_complete("2001-01-01", export_id="1st"),
],
("2nd", "2002-02-02"),
),
@@ -90,7 +94,10 @@ def test_no_dir(self):
([status_complete("2010-03-09")], BulkExportLogParser.IncompleteLog), # missing group
([kickoff("G")], BulkExportLogParser.IncompleteLog), # missing time
([], BulkExportLogParser.IncompleteLog), # missing all
([{"eventId": "kickoff"}], BulkExportLogParser.IncompleteLog), # missing eventDetail
( # missing eventDetail
[{"eventId": "kickoff", "exportId": "test"}],
BulkExportLogParser.IncompleteLog,
),
( # missing transactionTime
[{"eventId": "status_complete", "eventDetail": {}}],
BulkExportLogParser.IncompleteLog,
2 changes: 2 additions & 0 deletions tests/loaders/ndjson/test_ndjson_loader.py
@@ -42,12 +42,14 @@ def _write_log_file(path: str, group: str, timestamp: str) -> None:
writer.write(
{
"eventId": "kickoff",
"exportId": "testing",
"eventDetail": {"exportUrl": f"https://host/Group/{group}/$export"},
}
)
writer.write(
{
"eventId": "status_complete",
"exportId": "testing",
"eventDetail": {"transactionTime": timestamp},
}
)
