From c9143ee95ac6ec00e543c7baeee974188da69bcd Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Mon, 29 Jan 2024 11:17:12 -0500
Subject: [PATCH] Add more csv implementation and testing.

---
 .../partition_join_info.py                      | 29 ++++++++++++++++
 .../test_association_catalog.py                 | 33 +++++++++++++++++--
 .../test_partition_join_info.py                 | 10 ++++++
 3 files changed, 69 insertions(+), 3 deletions(-)

diff --git a/src/hipscat/catalog/association_catalog/partition_join_info.py b/src/hipscat/catalog/association_catalog/partition_join_info.py
index 0e31b6f7..3707ec90 100644
--- a/src/hipscat/catalog/association_catalog/partition_join_info.py
+++ b/src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -7,6 +7,7 @@ import pyarrow as pa
 from typing_extensions import Self
 
+from hipscat.catalog.partition_info import PartitionInfo
 from hipscat.io import FilePointer, file_io, paths
 from hipscat.io.parquet_metadata import (
     read_row_group_fragments,
@@ -91,6 +92,34 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di
 
         write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
 
+    def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
+        """Write all partition data to CSV files.
+
+        Two files will be written::
+            - partition_info.csv - covers all primary catalog pixels, and should match the file structure
+            - partition_join_info.csv - covers all pairwise relationships between primary and
+              join catalogs.
+
+        Args:
+            catalog_path: FilePointer to the directory where the
+                `partition_join_info.csv` file will be written
+            storage_options (dict): dictionary that contains abstract filesystem credentials
+        """
+        partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
+        file_io.write_dataframe_to_csv(
+            self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
+        )
+
+        primary_pixels = self.primary_to_join_map().keys()
+        partition_info_pointer = paths.get_partition_info_pointer(catalog_path)
+        partition_info = PartitionInfo.from_healpix(primary_pixels)
+        file_io.write_dataframe_to_csv(
+            partition_info.as_dataframe(),
+            partition_info_pointer,
+            index=False,
+            storage_options=storage_options,
+        )
+
     @classmethod
     def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> Self:
         """Read partition join info from a file within a hipscat directory.
diff --git a/tests/hipscat/catalog/association_catalog/test_association_catalog.py b/tests/hipscat/catalog/association_catalog/test_association_catalog.py
index 3a61bf50..1e7e63a7 100644
--- a/tests/hipscat/catalog/association_catalog/test_association_catalog.py
+++ b/tests/hipscat/catalog/association_catalog/test_association_catalog.py
@@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca
     ## Now we create the needed _metadata and everything is right.
     part_info = PartitionJoinInfo(association_catalog_join_pixels)
-    part_info.write_to_metadata_files(
-        catalog_path=catalog_path,
-    )
+    part_info.write_to_metadata_files(catalog_path=catalog_path)
 
     catalog = AssociationCatalog.read_from_hipscat(catalog_path)
     assert catalog.catalog_name == association_catalog_info_data["catalog_name"]
+
+
+def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
+    ## Path doesn't exist
+    with pytest.raises(FileNotFoundError):
+        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))
+
+    catalog_path = os.path.join(tmp_path, "empty")
+    os.makedirs(catalog_path, exist_ok=True)
+
+    file_name = os.path.join(catalog_path, "catalog_info.json")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        metadata_file.write(json.dumps(association_catalog_info_data))
+
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    file_name = os.path.join(catalog_path, "partition_info.csv")
+    with open(file_name, "w", encoding="utf-8") as metadata_file:
+        # dump some garbage in there - just needs to exist.
+        metadata_file.write(json.dumps(association_catalog_info_data))
+    with pytest.raises(FileNotFoundError, match="partition"):
+        AssociationCatalog.read_from_hipscat(catalog_path)
+
+    part_info = PartitionJoinInfo(association_catalog_join_pixels)
+    part_info.write_to_csv(catalog_path=catalog_path)
+
+    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
+    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
diff --git a/tests/hipscat/catalog/association_catalog/test_partition_join_info.py b/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
index 72ffda54..66d21325 100644
--- a/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
+++ b/tests/hipscat/catalog/association_catalog/test_partition_join_info.py
@@ -95,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
     pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)
 
 
+def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
+    info = PartitionJoinInfo(association_catalog_join_pixels)
+    pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
+    info.write_to_csv(tmp_path)
+
+    file_pointer = file_io.get_file_pointer_from_path(os.path.join(tmp_path, "partition_join_info.csv"))
+    new_info = PartitionJoinInfo.read_from_csv(file_pointer)
+    pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)
+
+
 def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
     file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
     info = PartitionJoinInfo.read_from_csv(file_pointer)
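
A minimal usage sketch of the new PartitionJoinInfo.write_to_csv method (not part of the patch). The pixel values, the temporary directory, and the Norder/Npix/join_Norder/join_Npix column names are illustrative assumptions; the calls themselves mirror the new tests above.

    # Sketch only: exercises PartitionJoinInfo.write_to_csv from this patch.
    # Column names and pixel values below are assumptions for illustration.
    import os
    import tempfile

    import pandas as pd

    from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
    from hipscat.io import file_io

    # One primary pixel (order 0, pixel 11) paired with two join-catalog pixels.
    join_pixels = pd.DataFrame(
        {
            "Norder": [0, 0],
            "Npix": [11, 11],
            "join_Norder": [1, 1],
            "join_Npix": [44, 45],
        }
    )

    with tempfile.TemporaryDirectory() as catalog_path:
        part_info = PartitionJoinInfo(join_pixels)
        # Writes partition_join_info.csv (all pairs) and partition_info.csv
        # (primary pixels only), per the write_to_csv docstring.
        part_info.write_to_csv(catalog_path=catalog_path)
        print(sorted(os.listdir(catalog_path)))

        # Round trip, mirroring test_csv_file_round_trip above.
        csv_pointer = file_io.get_file_pointer_from_path(
            os.path.join(catalog_path, "partition_join_info.csv")
        )
        round_trip = PartitionJoinInfo.read_from_csv(csv_pointer)
        pd.testing.assert_frame_equal(round_trip.data_frame, join_pixels)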