Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract file-finding for partition info reading. #196

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions src/hipscat/catalog/association_catalog/association_catalog.py
Original file line number Diff line number Diff line change
def _read_args(
    cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
) -> Tuple[CatalogInfoClass, PixelInputTypes, JoinPixelInputTypes]:  # type: ignore[override]
    """Gather the constructor arguments for an association catalog.

    Extends the parent's arguments with the partition join info, located by
    `PartitionJoinInfo.read_from_dir` (which prefers the `_metadata` file and
    falls back to `partition_join_info.csv`).

    Args:
        catalog_base_dir: path to the root directory of the catalog
        storage_options (dict): dictionary that contains abstract filesystem credentials
    """
    parent_args = super()._read_args(catalog_base_dir, storage_options=storage_options)
    join_info = PartitionJoinInfo.read_from_dir(catalog_base_dir, storage_options=storage_options)
    return parent_args + (join_info,)

@classmethod
Expand Down
64 changes: 63 additions & 1 deletion src/hipscat/catalog/association_catalog/partition_join_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import pyarrow as pa
from typing_extensions import Self

from hipscat.io import FilePointer, file_io
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
Expand Down Expand Up @@ -91,6 +92,67 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

def write_to_csv(self, catalog_path: FilePointer, storage_options: dict = None):
    """Write all partition data to CSV files.

    Two files will be written::
        - partition_info.csv - covers all primary catalog pixels, and should match the file structure
        - partition_join_info.csv - covers all pairwise relationships between primary and
          join catalogs.

    Args:
        catalog_path: FilePointer to the directory where the
            `partition_join_info.csv` file will be written
        storage_options (dict): dictionary that contains abstract filesystem credentials
    """
    # First, the full pairwise mapping between primary and join pixels.
    join_info_pointer = paths.get_partition_join_info_pointer(catalog_path)
    file_io.write_dataframe_to_csv(
        self.data_frame, join_info_pointer, index=False, storage_options=storage_options
    )

    # Second, a plain partition info file for just the primary catalog pixels,
    # derived from the keys of the primary-to-join mapping.
    primary_info = PartitionInfo.from_healpix(self.primary_to_join_map().keys())
    file_io.write_dataframe_to_csv(
        primary_info.as_dataframe(),
        paths.get_partition_info_pointer(catalog_path),
        index=False,
        storage_options=storage_options,
    )
delucchi-cmu marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> Self:
    """Read partition join info from a file within a hipscat directory.

    This will look for a `_metadata` file, and if not found, will look for
    a `partition_join_info.csv` file.

    Args:
        catalog_base_dir: path to the root directory of the catalog
        storage_options (dict): dictionary that contains abstract filesystem credentials

    Returns:
        A `PartitionJoinInfo` object with the data from the file

    Raises:
        FileNotFoundError: if neither desired file is found in the catalog_base_dir
    """
    metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
    partition_join_info_file = paths.get_partition_join_info_pointer(catalog_base_dir)
    # Prefer the parquet `_metadata` file; fall back to the CSV representation.
    # Use `cls` (not a hard-coded class name) so subclasses read as themselves.
    if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
        partition_join_info = cls.read_from_file(metadata_file, storage_options=storage_options)
    elif file_io.does_file_or_directory_exist(partition_join_info_file, storage_options=storage_options):
        partition_join_info = cls.read_from_csv(
            partition_join_info_file, storage_options=storage_options
        )
    else:
        raise FileNotFoundError(
            f"_metadata or partition join info file is required in catalog directory {catalog_base_dir}"
        )
    return partition_join_info

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: Union[Dict[Any, Any], None] = None
Expand Down
7 changes: 1 addition & 6 deletions src/hipscat/catalog/healpix_dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,7 @@ def _read_args(
storage_options: Union[Dict[Any, Any], None] = None,
) -> Tuple[CatalogInfoClass, PartitionInfo]:
args = super()._read_args(catalog_base_dir, storage_options=storage_options)
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
else:
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
partition_info = PartitionInfo.read_from_dir(catalog_base_dir, storage_options=storage_options)
return args + (partition_info,)

@classmethod
Expand Down
31 changes: 30 additions & 1 deletion src/hipscat/catalog/partition_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pandas as pd
import pyarrow as pa

from hipscat.io import FilePointer, file_io
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
Expand Down Expand Up @@ -75,6 +75,35 @@ def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: di

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

@classmethod
def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
    """Read partition info from a file within a hipscat directory.

    This will look for a `_metadata` file, and if not found, will look for
    a `partition_info.csv` file.

    Args:
        catalog_base_dir: path to the root directory of the catalog
        storage_options (dict): dictionary that contains abstract filesystem credentials

    Returns:
        A `PartitionInfo` object with the data from the file

    Raises:
        FileNotFoundError: if neither desired file is found in the catalog_base_dir
    """
    metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
    partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
    # Prefer the parquet `_metadata` file; fall back to the CSV representation.
    # Use `cls` (not a hard-coded class name) so subclasses read as themselves.
    if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
        partition_info = cls.read_from_file(metadata_file, storage_options=storage_options)
    elif file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
        partition_info = cls.read_from_csv(partition_info_file, storage_options=storage_options)
    else:
        raise FileNotFoundError(
            f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
        )
    return partition_info

@classmethod
def read_from_file(
cls, metadata_file: FilePointer, strict=False, storage_options: dict = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,35 @@ def test_empty_directory(tmp_path, association_catalog_info_data, association_ca

## Now we create the needed _metadata and everything is right.
part_info = PartitionJoinInfo(association_catalog_join_pixels)
part_info.write_to_metadata_files(
catalog_path=catalog_path,
)
part_info.write_to_metadata_files(catalog_path=catalog_path)
catalog = AssociationCatalog.read_from_hipscat(catalog_path)
assert catalog.catalog_name == association_catalog_info_data["catalog_name"]


def test_csv_round_trip(tmp_path, association_catalog_info_data, association_catalog_join_pixels):
    """Association catalog written via CSV partition files can be read back,
    with join pixels surviving the round trip intact."""
    ## Path doesn't exist
    with pytest.raises(FileNotFoundError):
        AssociationCatalog.read_from_hipscat(os.path.join("path", "empty"))

    catalog_path = os.path.join(tmp_path, "empty")
    os.makedirs(catalog_path, exist_ok=True)

    # Only catalog_info.json present - partition join info is still missing.
    file_name = os.path.join(catalog_path, "catalog_info.json")
    with open(file_name, "w", encoding="utf-8") as metadata_file:
        metadata_file.write(json.dumps(association_catalog_info_data))

    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    # A `partition_info.csv` alone does not satisfy an association catalog -
    # the `partition_join_info.csv` file is still required.
    file_name = os.path.join(catalog_path, "partition_info.csv")
    with open(file_name, "w", encoding="utf-8") as metadata_file:
        # dump some garbage in there - just needs to exist.
        metadata_file.write(json.dumps(association_catalog_info_data))
    with pytest.raises(FileNotFoundError, match="partition"):
        AssociationCatalog.read_from_hipscat(catalog_path)

    # Writing the real CSV files makes the catalog readable.
    part_info = PartitionJoinInfo(association_catalog_join_pixels)
    part_info.write_to_csv(catalog_path=catalog_path)

    catalog = AssociationCatalog.read_from_hipscat(catalog_path)
    pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels)
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ def test_read_from_metadata_fail(tmp_path):
PartitionJoinInfo.read_from_file(metadata_filename, strict=True)


def test_load_partition_info_from_dir_fail(tmp_path):
    """Reading join info from a directory with no usable files raises errors."""
    no_columns = pd.DataFrame()

    # A parquet file with the wrong name is not discovered at all.
    no_columns.to_parquet(os.path.join(tmp_path, "empty_metadata.parquet"))
    with pytest.raises(FileNotFoundError, match="_metadata or partition join info"):
        PartitionJoinInfo.read_from_dir(tmp_path)

    # The file is there, but doesn't have the required content.
    no_columns.to_parquet(os.path.join(tmp_path, "_metadata"))
    with pytest.raises(ValueError, match="missing columns"):
        PartitionJoinInfo.read_from_dir(tmp_path)


def test_primary_to_join_map(association_catalog_join_pixels):
info = PartitionJoinInfo(association_catalog_join_pixels)
pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
Expand All @@ -81,6 +95,16 @@ def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
pd.testing.assert_frame_equal(new_info.data_frame, association_catalog_join_pixels)


def test_csv_file_round_trip(association_catalog_join_pixels, tmp_path):
    """Join pixels survive a write-to-CSV / read-from-CSV round trip."""
    original = PartitionJoinInfo(association_catalog_join_pixels)
    pd.testing.assert_frame_equal(original.data_frame, association_catalog_join_pixels)
    original.write_to_csv(tmp_path)

    csv_pointer = file_io.get_file_pointer_from_path(
        os.path.join(tmp_path, "partition_join_info.csv")
    )
    round_tripped = PartitionJoinInfo.read_from_csv(csv_pointer)
    pd.testing.assert_frame_equal(round_tripped.data_frame, association_catalog_join_pixels)


def test_read_from_csv(association_catalog_partition_join_file, association_catalog_join_pixels):
file_pointer = file_io.get_file_pointer_from_path(association_catalog_partition_join_file)
info = PartitionJoinInfo.read_from_csv(file_pointer)
Expand Down
14 changes: 14 additions & 0 deletions tests/hipscat/catalog/test_partition_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ def test_load_partition_info_from_metadata_fail(tmp_path):
PartitionInfo.read_from_file(metadata_filename, strict=True)


def test_load_partition_info_from_dir_fail(tmp_path):
    """Reading partition info from a directory with no usable files raises errors."""
    empty_dataframe = pd.DataFrame()
    # A parquet file with the wrong name is not discovered at all.
    metadata_filename = os.path.join(tmp_path, "empty_metadata.parquet")
    empty_dataframe.to_parquet(metadata_filename)
    with pytest.raises(FileNotFoundError, match="_metadata or partition info"):
        PartitionInfo.read_from_dir(tmp_path)

    # The file is there, but doesn't have the required content.
    metadata_filename = os.path.join(tmp_path, "_metadata")
    empty_dataframe.to_parquet(metadata_filename)
    with pytest.raises(ValueError, match="missing Norder"):
        PartitionInfo.read_from_dir(tmp_path)


def test_load_partition_info_small_sky_order1(small_sky_order1_dir):
"""Instantiate the partition info for catalog with 4 pixels"""
partition_info_file = paths.get_parquet_metadata_pointer(small_sky_order1_dir)
Expand Down
Loading