diff --git a/cumulus_etl/etl/tasks/base.py b/cumulus_etl/etl/tasks/base.py
index d6b2163..5091fc5 100644
--- a/cumulus_etl/etl/tasks/base.py
+++ b/cumulus_etl/etl/tasks/base.py
@@ -326,7 +326,13 @@ def is_unique(row):
             id_set.add(row_id)
             return True
 
-        return [row for row in rows if is_unique(row)]
+        # Uniquify in reverse, so that later rows will be preferred.
+        # This makes it easy to throw updates of source data alongside the originals,
+        # by either appending to an ndjson file or adding new files that sort later.
+        rows = [row for row in reversed(rows) if is_unique(row)]
+        # But keep original row ordering for cleanliness of ndjson output-format ordering.
+        rows.reverse()
+        return rows
 
     def _write_one_table_batch(self, rows: list[dict], table_index: int, batch_index: int) -> bool:
         # Checkpoint scrubber data before writing to the store, because if we get interrupted, it's safer to have an
diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py
index 033d1b8..f388d50 100644
--- a/tests/etl/test_etl_cli.py
+++ b/tests/etl/test_etl_cli.py
@@ -455,14 +455,26 @@ async def test_etl_job_deltalake(self):
             {
                 "_delta_log/00000000000000000000.json",  # create
                 "_delta_log/.00000000000000000000.json.crc",
+                "_delta_log/00000000000000000000.crc",
+                "_delta_log/.00000000000000000000.crc.crc",
                 "_delta_log/00000000000000000001.json",  # merge
                 "_delta_log/.00000000000000000001.json.crc",
+                "_delta_log/00000000000000000001.crc",
+                "_delta_log/.00000000000000000001.crc.crc",
                 "_delta_log/00000000000000000002.json",  # optimize
                 "_delta_log/.00000000000000000002.json.crc",
+                "_delta_log/00000000000000000002.crc",
+                "_delta_log/.00000000000000000002.crc.crc",
                 "_delta_log/00000000000000000003.json",  # vacuum start
                 "_delta_log/.00000000000000000003.json.crc",
+                "_delta_log/00000000000000000003.crc",
+                "_delta_log/.00000000000000000003.crc.crc",
                 "_delta_log/00000000000000000004.json",  # vacuum end
                 "_delta_log/.00000000000000000004.json.crc",
+                "_delta_log/00000000000000000004.crc",
+                "_delta_log/.00000000000000000004.crc.crc",
+                "_delta_log/_last_vacuum_info",
+                "_delta_log/._last_vacuum_info.crc",
                 "_symlink_format_manifest/manifest",
                 "_symlink_format_manifest/.manifest.crc",
             },
diff --git a/tests/etl/test_tasks.py b/tests/etl/test_tasks.py
index 03ea31e..55fdb27 100644
--- a/tests/etl/test_tasks.py
+++ b/tests/etl/test_tasks.py
@@ -80,19 +80,28 @@ def test_task_selection_ordering(self, user_tasks, expected_tasks):
     async def test_drop_duplicates(self):
         """Verify that we run() will drop duplicate rows inside an input batch."""
         # Two "A" ids and one "B" id
-        self.make_json("Patient", "A")
-        self.make_json("Patient", "A")
+        self.make_json("Patient", "A", birthDate="2020")
+        self.make_json("Patient", "A", birthDate="2021")
         self.make_json("Patient", "B")
 
         await basic_tasks.PatientTask(self.job_config, self.scrubber).run()
 
-        # Confirm that only one version of patient A got stored
+        # Confirm that only the later version of patient A got stored
         self.assertEqual(1, self.format.write_records.call_count)
         batch = self.format.write_records.call_args[0][0]
-        self.assertEqual(2, len(batch.rows))
         self.assertEqual(
-            {self.codebook.db.patient("A"), self.codebook.db.patient("B")},
-            {row["id"] for row in batch.rows},
+            batch.rows,
+            [  # Output ordering is guaranteed to be stable
+                {
+                    "resourceType": "Patient",
+                    "id": self.codebook.db.patient("A"),
+                    "birthDate": "2021",  # the row that came later won
+                },
+                {
+                    "resourceType": "Patient",
+                    "id": self.codebook.db.patient("B"),
+                },
+            ],
         )
 
     async def test_batch_write_errors_saved(self):
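
Note: for reference, below is a minimal standalone sketch of the "later rows win" deduplication pattern that the base.py hunk adopts, i.e. scan in reverse so the last occurrence of each id is kept, then restore the input ordering for stable output. The function name dedupe_keep_latest and the id-only keying are illustrative assumptions, not the project's actual helper.

# Standalone sketch (not project code) of the reverse-dedup pattern.
def dedupe_keep_latest(rows: list[dict]) -> list[dict]:
    """Drop duplicate ids, preferring rows that appear later in the input."""
    seen_ids: set[str] = set()
    deduped: list[dict] = []
    for row in reversed(rows):  # walk backwards so later rows are seen first
        if row["id"] not in seen_ids:
            seen_ids.add(row["id"])
            deduped.append(row)
    deduped.reverse()  # restore original (input) ordering
    return deduped

# Example mirroring the updated test: the second "A" row (birthDate 2021) wins.
rows = [
    {"id": "A", "birthDate": "2020"},
    {"id": "A", "birthDate": "2021"},
    {"id": "B"},
]
assert dedupe_keep_latest(rows) == [
    {"id": "A", "birthDate": "2021"},
    {"id": "B"},
]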