Merge pull request #370 from smart-on-fhir/mikix/duplicate-ordering
Always prefer resource rows that come later
mikix authored Jan 7, 2025
2 parents d7ad78e + f106ff7 commit 9b6c8ec
Showing 3 changed files with 34 additions and 7 deletions.
8 changes: 7 additions & 1 deletion cumulus_etl/etl/tasks/base.py
@@ -326,7 +326,13 @@ def is_unique(row):
id_set.add(row_id)
return True

return [row for row in rows if is_unique(row)]
# Uniquify in reverse, so that later rows will be preferred.
# This makes it easy to throw updates of source data alongside the originals,
# by either appending to an ndjson file or adding new files that sort later.
rows = [row for row in reversed(rows) if is_unique(row)]
# But keep original row ordering for cleanliness of ndjson output-format ordering.
rows.reverse()
return rows

def _write_one_table_batch(self, rows: list[dict], table_index: int, batch_index: int) -> bool:
# Checkpoint scrubber data before writing to the store, because if we get interrupted, it's safer to have an
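For reference, here is a minimal standalone sketch of the de-duplication approach used above: iterate the rows in reverse so that later duplicates win, then restore the original ordering. The drop_duplicates helper and the sample Patient rows below are illustrative assumptions, not code from cumulus-etl itself.

def drop_duplicates(rows: list[dict]) -> list[dict]:
    """Keep only the last occurrence of each resource id, preserving original row order."""
    seen_ids = set()

    def is_unique(row: dict) -> bool:
        row_id = row.get("id")
        if row_id in seen_ids:
            return False
        seen_ids.add(row_id)
        return True

    # Walk the rows backwards so the *later* copy of a duplicate id is the one kept.
    deduped = [row for row in reversed(rows) if is_unique(row)]
    # Reverse again so the surviving rows come out in their original order.
    deduped.reverse()
    return deduped

rows = [
    {"resourceType": "Patient", "id": "A", "birthDate": "2020"},
    {"resourceType": "Patient", "id": "A", "birthDate": "2021"},  # later update to A
    {"resourceType": "Patient", "id": "B"},
]
print(drop_duplicates(rows))
# -> [{'resourceType': 'Patient', 'id': 'A', 'birthDate': '2021'}, {'resourceType': 'Patient', 'id': 'B'}]
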
12 changes: 12 additions & 0 deletions tests/etl/test_etl_cli.py
@@ -455,14 +455,26 @@ async def test_etl_job_deltalake(self):
{
"_delta_log/00000000000000000000.json", # create
"_delta_log/.00000000000000000000.json.crc",
"_delta_log/00000000000000000000.crc",
"_delta_log/.00000000000000000000.crc.crc",
"_delta_log/00000000000000000001.json", # merge
"_delta_log/.00000000000000000001.json.crc",
"_delta_log/00000000000000000001.crc",
"_delta_log/.00000000000000000001.crc.crc",
"_delta_log/00000000000000000002.json", # optimize
"_delta_log/.00000000000000000002.json.crc",
"_delta_log/00000000000000000002.crc",
"_delta_log/.00000000000000000002.crc.crc",
"_delta_log/00000000000000000003.json", # vacuum start
"_delta_log/.00000000000000000003.json.crc",
"_delta_log/00000000000000000003.crc",
"_delta_log/.00000000000000000003.crc.crc",
"_delta_log/00000000000000000004.json", # vacuum end
"_delta_log/.00000000000000000004.json.crc",
"_delta_log/00000000000000000004.crc",
"_delta_log/.00000000000000000004.crc.crc",
"_delta_log/_last_vacuum_info",
"_delta_log/._last_vacuum_info.crc",
"_symlink_format_manifest/manifest",
"_symlink_format_manifest/.manifest.crc",
},
21 changes: 15 additions & 6 deletions tests/etl/test_tasks.py
@@ -80,19 +80,28 @@ def test_task_selection_ordering(self, user_tasks, expected_tasks):
async def test_drop_duplicates(self):
"""Verify that we run() will drop duplicate rows inside an input batch."""
# Two "A" ids and one "B" id
self.make_json("Patient", "A")
self.make_json("Patient", "A")
self.make_json("Patient", "A", birthDate="2020")
self.make_json("Patient", "A", birthDate="2021")
self.make_json("Patient", "B")

await basic_tasks.PatientTask(self.job_config, self.scrubber).run()

# Confirm that only one version of patient A got stored
# Confirm that only the later version of patient A got stored
self.assertEqual(1, self.format.write_records.call_count)
batch = self.format.write_records.call_args[0][0]
self.assertEqual(2, len(batch.rows))
self.assertEqual(
{self.codebook.db.patient("A"), self.codebook.db.patient("B")},
{row["id"] for row in batch.rows},
batch.rows,
[ # Output ordering is guaranteed to be stable
{
"resourceType": "Patient",
"id": self.codebook.db.patient("A"),
"birthDate": "2021", # the row that came later won
},
{
"resourceType": "Patient",
"id": self.codebook.db.patient("B"),
},
],
)

async def test_batch_write_errors_saved(self):
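To illustrate the use case named in the new base.py comment (appending updated records to an ndjson file so they override the originals), a hedged sketch follows. The Patient.ndjson path and the reuse of the drop_duplicates helper from the earlier sketch are assumptions for illustration only.

import json

# Append an updated copy of Patient "A"; because later rows are preferred,
# this line overrides the original "A" record once duplicates are dropped.
with open("Patient.ndjson", "a", encoding="utf8") as f:
    f.write(json.dumps({"resourceType": "Patient", "id": "A", "birthDate": "2021"}) + "\n")

with open("Patient.ndjson", "r", encoding="utf8") as f:
    rows = [json.loads(line) for line in f if line.strip()]

rows = drop_duplicates(rows)  # from the sketch above; the appended "A" row survives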
