Commit

Add more csv implementation and testing.
delucchi-cmu committed Jan 29, 2024
1 parent 092aeeb commit c9143ee
Showing 3 changed files with 69 additions and 3 deletions.
29 changes: 29 additions & 0 deletions src/hipscat/catalog/association_catalog/partition_join_info.py
@@ -7,6 +7,7 @@
import pyarrow as pa
from typing_extensions import Self

from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
    read_row_group_fragments,
@@ -91,6 +92,34 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
    """Write all partition data to CSV files.

    Two files will be written::

        - partition_info.csv - covers all primary catalog pixels, and should match the file structure
        - partition_join_info.csv - covers all pairwise relationships between primary and
          join catalogs.

    Args:
        catalog_path: FilePointer to the directory where the
            `partition_join_info.csv` file will be written
        storage_options (dict): dictionary that contains abstract filesystem credentials
    """
    partition_join_info_file = paths.get_partition_join_info_pointer(catalog_path)
    file_io.write_dataframe_to_csv(
        self.data_frame, partition_join_info_file, index=False, storage_options=storage_options
    )

    primary_pixels = self.primary_to_join_map().keys()
    partition_info_pointer = paths.get_partition_info_pointer(catalog_path)
    partition_info = PartitionInfo.from_healpix(primary_pixels)
    file_io.write_dataframe_to_csv(
        partition_info.as_dataframe(),
        partition_info_pointer,
        index=False,
        storage_options=storage_options,
    )

@classmethod
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> Self:
"""Read partition join info from a file within a hipscat directory.
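For orientation, here is a minimal usage sketch of the new method. The data frame contents and column names below are illustrative assumptions, not the documented schema; the two output file names come from the docstring above.

import pandas as pd

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

# Illustrative primary/join pixel pairs; the column names here are an assumption
# about the expected schema, mirroring the test fixtures rather than documentation.
join_pixels_frame = pd.DataFrame(
    {
        "Norder": [0, 0],
        "Dir": [0, 0],
        "Npix": [11, 11],
        "join_Norder": [1, 1],
        "join_Dir": [0, 0],
        "join_Npix": [44, 45],
    }
)

part_info = PartitionJoinInfo(join_pixels_frame)

# Writes two files into the catalog directory:
#   <catalog_path>/partition_join_info.csv - all primary/join pixel pairs
#   <catalog_path>/partition_info.csv      - the unique primary catalog pixels
part_info.write_to_csv(catalog_path="/path/to/association_catalog")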
@@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca

    ## Now we create the needed _metadata and everything is right.
    part_info = PartitionJoinInfo(association_catalog_join_pixels)
-    part_info.write_to_metadata_files(
-        catalog_path=catalog_path,
-    )
+    part_info.write_to_metadata_files(catalog_path=catalog_path)
    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
    assert catalog.catalog_name == association_catalog_info_data["catalog_name"]


def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
    ## Path doesn't exist
    with pytest.raises(FileNotFoundError):
        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))

    catalog_path = os.path.join(tmp_path, "empty")
    os.makedirs(catalog_path, exist_ok=True)

    file_name = os.path.join(catalog_path, "catalog_info.json")
    with open(file_name, "w", encoding="utf-8") as metadata_file:
        metadata_file.write(json.dumps(association_catalog_info_data))

    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    file_name = os.path.join(catalog_path, "partition_info.csv")
    with open(file_name, "w", encoding="utf-8") as metadata_file:
        # dump some garbage in there - just needs to exist.
        metadata_file.write(json.dumps(association_catalog_info_data))
    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    part_info = PartitionJoinInfo(association_catalog_join_pixels)
    part_info.write_to_csv(catalog_path=catalog_path)

    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
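After the write_to_csv call, the round trip depends only on the files on disk. A quick way to eyeball what was written, using plain pandas (catalog_path is the directory used in the test above; the file names come from the write_to_csv docstring):

import os
import pandas as pd

partition_info = pd.read_csv(os.path.join(catalog_path, "partition_info.csv"))
partition_join_info = pd.read_csv(os.path.join(catalog_path, "partition_join_info.csv"))

print(partition_info)       # one row per primary catalog pixel
print(partition_join_info)  # one row per primary/join pixel pair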
@@ -95,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
    pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)


def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
    info = PartitionJoinInfo(association_catalog_join_pixels)
    pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
    info.write_to_csv(tmp_path)

    file_pointer = file_io.get_file_pointer_from_path(os.path.join(tmp_path, "partition_join_info.csv"))
    new_info = PartitionJoinInfo.read_from_csv(file_pointer)
    pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)


def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
    file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
    info = PartitionJoinInfo.read_from_csv(file_pointer)
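Outside of pytest, the same CSV round trip looks roughly like this; the directory path is illustrative, and the calls mirror the tests above rather than documenting a finalized API:

import os

from hipscat.io import file_io
from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo

catalog_dir = "/path/to/association_catalog"  # directory previously passed to write_to_csv

# Build a file pointer for the join-info CSV and read it back into a PartitionJoinInfo.
csv_pointer = file_io.get_file_pointer_from_path(
    os.path.join(catalog_dir, "partition_join_info.csv")
)
restored = PartitionJoinInfo.read_from_csv(csv_pointer)

# restored.data_frame should match the frame the info object was originally built from.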
