
Commit

Permalink
Merge pull request #165 from astronomy-commons/issue/147/metadata
Load partition info from metadata file.
delucchi-cmu authored Nov 20, 2023
2 parents 9b53aeb + 22d61c9 commit 72dd5ba
Showing 33 changed files with 538 additions and 279 deletions.
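The headline change: when a catalog is read, per-partition pixel information is now taken from the parquet `_metadata` file if one exists, with `partition_info.csv` kept as a fallback. A minimal standalone sketch of that lookup order, assuming a hypothetical local catalog at ./my_catalog:

from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import file_io, paths

catalog_base_dir = file_io.get_file_pointer_from_path("./my_catalog")  # hypothetical path
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(metadata_file):
    # Preferred source: consolidated parquet metadata.
    partition_info = PartitionInfo.read_from_file(metadata_file)
else:
    # Legacy fallback: the partition_info.csv listing.
    csv_file = paths.get_partition_info_pointer(catalog_base_dir)
    partition_info = PartitionInfo.read_from_csv(csv_file)
print(partition_info.get_healpix_pixels())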
27 changes: 19 additions & 8 deletions src/hipscat/catalog/healpix_dataset/healpix_dataset.py
@@ -79,18 +79,29 @@ def _get_pixel_tree_from_pixels(pixels: PixelInputTypes) -> PixelTree:

@classmethod
def _read_args(
cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
cls,
catalog_base_dir: FilePointer,
storage_options: Union[Dict[Any, Any], None] = None,
) -> Tuple[CatalogInfoClass, PartitionInfo]:
args = super()._read_args(catalog_base_dir, storage_options=storage_options)
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_file(partition_info_file, storage_options=storage_options)
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options):
partition_info = PartitionInfo.read_from_file(metadata_file, storage_options=storage_options)
else:
partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
partition_info = PartitionInfo.read_from_csv(partition_info_file, storage_options=storage_options)
return args + (partition_info,)

@classmethod
def _check_files_exist(
cls, catalog_base_dir: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
):
def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None):
super()._check_files_exist(catalog_base_dir, storage_options=storage_options)

partition_info_file = paths.get_partition_info_pointer(catalog_base_dir)
if not file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options):
raise FileNotFoundError(f"No partition info found where expected: {str(partition_info_file)}")
metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir)
if not (
file_io.does_file_or_directory_exist(partition_info_file, storage_options=storage_options)
or file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options)
):
raise FileNotFoundError(
f"_metadata or partition info file is required in catalog directory {catalog_base_dir}"
)
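With the fallback above, `partition_info.csv` is no longer strictly required when `_metadata` is present; `_check_files_exist` now accepts either file. A sketch of the caller-facing effect, assuming the public `Catalog.read_from_hipscat` entry point is unchanged and ./my_catalog is a hypothetical local catalog:

from hipscat.catalog import Catalog

catalog = Catalog.read_from_hipscat("./my_catalog")  # hypothetical path
# Partition info came from _metadata if present, otherwise from partition_info.csv.
print(len(catalog.get_healpix_pixels()))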
66 changes: 59 additions & 7 deletions src/hipscat/catalog/partition_info.py
@@ -1,10 +1,18 @@
"""Container class to hold per-partition metadata"""
from typing import Any, Dict, List, Union
from __future__ import annotations

from typing import List

import numpy as np
import pandas as pd
import pyarrow as pa

from hipscat.io import FilePointer, file_io
from hipscat.io.parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
write_parquet_metadata_for_batches,
)
from hipscat.pixel_math import HealpixPixel


@@ -44,15 +52,59 @@ def write_to_file(self, partition_info_file: FilePointer):
"""
file_io.write_dataframe_to_csv(self.as_dataframe(), partition_info_file, index=False)

def write_to_metadata_files(self, catalog_path: FilePointer, storage_options: dict = None):
"""Generate parquet metadata, using the known partitions.
Args:
catalog_path (FilePointer): base path for the catalog
storage_options (dict): dictionary that contains abstract filesystem credentials
"""
batches = [
pa.RecordBatch.from_arrays(
[[pixel.order], [pixel.dir], [pixel.pixel]],
names=[
self.METADATA_ORDER_COLUMN_NAME,
self.METADATA_DIR_COLUMN_NAME,
self.METADATA_PIXEL_COLUMN_NAME,
],
)
for pixel in self.get_healpix_pixels()
]

write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

@classmethod
def read_from_file(cls, metadata_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
"""Read partition info from a `_metadata` file to create an object
Args:
metadata_file (FilePointer): FilePointer to the `_metadata` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionInfo` object with the data from the file
"""
pixel_list = [
HealpixPixel(
row_group_stat_single_value(row_group, cls.METADATA_ORDER_COLUMN_NAME),
row_group_stat_single_value(row_group, cls.METADATA_PIXEL_COLUMN_NAME),
)
for row_group in read_row_group_fragments(metadata_file, storage_options)
]
## Remove duplicates, preserving order.
## In the case of association partition join info, we may have multiple entries
## for the primary order/pixels.
pixel_list = list(dict.fromkeys(pixel_list))

return cls(pixel_list)
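The `dict.fromkeys` idiom above deduplicates while keeping first-seen order, which matters when a join/association `_metadata` file repeats the primary order/pixel across row groups. A toy illustration:

from hipscat.pixel_math import HealpixPixel

pixel_list = [HealpixPixel(0, 11), HealpixPixel(1, 44), HealpixPixel(0, 11)]
pixel_list = list(dict.fromkeys(pixel_list))
# Duplicates removed, original order kept: two pixels remain.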

@classmethod
def read_from_file(
cls, partition_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
):
def read_from_csv(cls, partition_info_file: FilePointer, storage_options: dict = None) -> PartitionInfo:
"""Read partition info from a `partition_info.csv` file to create an object
Args:
partition_info_file: FilePointer to the `partition_info.csv` file
storage_options: dictionary that contains abstract filesystem credentials
partition_info_file (FilePointer): FilePointer to the `partition_info.csv` file
storage_options (dict): dictionary that contains abstract filesystem credentials
Returns:
A `PartitionInfo` object with the data from the file
@@ -92,7 +144,7 @@ def as_dataframe(self):
return pd.DataFrame.from_dict(partition_info_dict)

@classmethod
def from_healpix(cls, healpix_pixels: List[HealpixPixel]):
def from_healpix(cls, healpix_pixels: List[HealpixPixel]) -> PartitionInfo:
"""Create a partition info object from a list of constituent healpix pixels.
Args:
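Taken together, the new methods give a round trip between in-memory pixels and on-disk `_metadata`. A minimal sketch, assuming a writable (and already existing) hypothetical catalog directory ./my_catalog:

from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import file_io, paths
from hipscat.pixel_math import HealpixPixel

info = PartitionInfo.from_healpix([HealpixPixel(0, 11), HealpixPixel(1, 45)])
info.write_to_metadata_files(file_io.get_file_pointer_from_path("./my_catalog"))

metadata_file = paths.get_parquet_metadata_pointer(
    file_io.get_file_pointer_from_path("./my_catalog")
)
round_trip = PartitionInfo.read_from_file(metadata_file)
# Expect the same two pixels back, in the same order.
print(round_trip.get_healpix_pixels())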
5 changes: 5 additions & 0 deletions src/hipscat/io/__init__.py
@@ -1,6 +1,11 @@
"""Utilities for reading and writing catalog files"""

from .file_io import FilePointer, get_file_pointer_from_path
from .parquet_metadata import (
read_row_group_fragments,
row_group_stat_single_value,
write_parquet_metadata_for_batches,
)
from .paths import (
create_hive_directory_name,
create_hive_parquet_file_name,
122 changes: 122 additions & 0 deletions src/hipscat/io/parquet_metadata.py
@@ -0,0 +1,122 @@
"""Utility functions for handling parquet metadata files"""
import tempfile
from typing import List

import pyarrow as pa
import pyarrow.dataset as pds
import pyarrow.parquet as pq

from hipscat.io import file_io, paths
from hipscat.io.file_io.file_pointer import get_fs, strip_leading_slash_for_pyarrow


def row_group_stat_single_value(row_group, stat_key: str):
"""Convenience method to find the min and max inside a statistics dictionary,
and raise an error if they're unequal.
Args:
row_group: dataset fragment row group
stat_key (str): column name of interest.
Returns:
The value of the specified row group statistic
"""
if stat_key not in row_group.statistics:
raise ValueError(f"row group doesn't have expected key {stat_key}")
stat_dict = row_group.statistics[stat_key]
min_val = stat_dict["min"]
max_val = stat_dict["max"]
if min_val != max_val:
raise ValueError(f"stat min != max ({min_val} != {max_val})")
return min_val


def write_parquet_metadata(catalog_path: str, storage_options: dict = None, output_path: str = None):
"""Generate parquet metadata, using the already-partitioned parquet files
for this catalog.
For more information on the general parquet metadata files, and why we write them, see
https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-metadata-files
Args:
catalog_path (str): base path for the catalog
storage_options: dictionary that contains abstract filesystem credentials
output_path (str): base path for writing out metadata files
defaults to `catalog_path` if unspecified
"""
ignore_prefixes = [
"intermediate",
"_common_metadata",
"_metadata",
]

dataset = file_io.read_parquet_dataset(
catalog_path,
storage_options=storage_options,
ignore_prefixes=ignore_prefixes,
exclude_invalid_files=True,
)
metadata_collector = []

for hips_file in dataset.files:
hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path)
single_metadata = file_io.read_parquet_metadata(hips_file_pointer, storage_options=storage_options)

# Users must set the file path of each chunk before combining the metadata.
relative_path = hips_file[len(catalog_path) :]
single_metadata.set_file_path(relative_path)
metadata_collector.append(single_metadata)

## Write out the two metadata files
if output_path is None:
output_path = catalog_path
catalog_base_dir = file_io.get_file_pointer_from_path(output_path)
metadata_file_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir)
common_metadata_file_pointer = paths.get_common_metadata_pointer(catalog_base_dir)

file_io.write_parquet_metadata(
dataset.schema,
metadata_file_pointer,
metadata_collector=metadata_collector,
write_statistics=True,
storage_options=storage_options,
)
file_io.write_parquet_metadata(
dataset.schema, common_metadata_file_pointer, storage_options=storage_options
)
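For an existing, already-partitioned catalog, consolidating its `_metadata` and `_common_metadata` is then a single call (catalog path hypothetical):

from hipscat.io.parquet_metadata import write_parquet_metadata

write_parquet_metadata("./my_catalog")
# Or keep the catalog itself untouched and write the metadata files elsewhere:
write_parquet_metadata("./my_catalog", output_path="./my_catalog_metadata")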


def write_parquet_metadata_for_batches(
batches: List[pa.RecordBatch], output_path: str = None, storage_options: dict = None
):
"""Write parquet metadata files for some pyarrow table batches.
This writes the batches to a temporary parquet dataset using local storage, and
generates the metadata for the partitioned catalog parquet files.
Args:
batches (List[pa.RecordBatch]): create one batch per group of data (partition or row group)
output_path (str): base path for writing out the metadata files
storage_options: dictionary that contains abstract filesystem credentials
"""

temp_info_table = pa.Table.from_batches(batches)

with tempfile.TemporaryDirectory() as temp_pq_file:
pq.write_to_dataset(temp_info_table, temp_pq_file)
write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path)


def read_row_group_fragments(metadata_file: str, storage_options: dict = None):
"""Generator for metadata fragment row groups in a parquet metadata file.
Args:
metadata_file (str): path to `_metadata` file.
storage_options: dictionary that contains abstract filesystem credentials
"""
file_system, dir_pointer = get_fs(file_pointer=metadata_file, storage_options=storage_options)
dir_pointer = strip_leading_slash_for_pyarrow(dir_pointer, file_system.protocol)
dataset = pds.parquet_dataset(dir_pointer, filesystem=file_system)

for frag in dataset.get_fragments():
for row_group in frag.row_groups:
yield row_group
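A sketch of how the generator and the stats helper compose to recover (order, pixel) pairs straight from a `_metadata` file (path hypothetical; the column-name constants come from PartitionInfo):

from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io.parquet_metadata import read_row_group_fragments, row_group_stat_single_value

pixels = [
    (
        row_group_stat_single_value(row_group, PartitionInfo.METADATA_ORDER_COLUMN_NAME),
        row_group_stat_single_value(row_group, PartitionInfo.METADATA_PIXEL_COLUMN_NAME),
    )
    for row_group in read_row_group_fragments("./my_catalog/_metadata")
]
print(pixels)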
28 changes: 21 additions & 7 deletions src/hipscat/io/validation.py
@@ -1,26 +1,26 @@
from hipscat.catalog.dataset.catalog_info_factory import from_catalog_dir
from hipscat.io import get_partition_info_pointer
from hipscat.io import get_parquet_metadata_pointer, get_partition_info_pointer
from hipscat.io.file_io.file_pointer import FilePointer, is_regular_file


def is_valid_catalog(pointer: FilePointer) -> bool:
"""Checks if a catalog is valid for a given base catalog pointer
Args:
pointer: pointer to base catalog directory
pointer (FilePointer): pointer to base catalog directory
Returns:
True if the catalog_info file is valid and either the partition_info
or `_metadata` file is present, False otherwise
"""
return is_catalog_info_valid(pointer) and is_partition_info_valid(pointer)
return is_catalog_info_valid(pointer) and (is_partition_info_valid(pointer) or is_metadata_valid(pointer))


def is_catalog_info_valid(pointer):
def is_catalog_info_valid(pointer: FilePointer) -> bool:
"""Checks if catalog_info is valid for a given base catalog pointer
Args:
pointer: pointer to base catalog directory
pointer (FilePointer): pointer to base catalog directory
Returns:
True if the catalog_info file exists, and it is correctly formatted,
@@ -34,15 +34,29 @@ def is_catalog_info_valid(pointer):
return is_valid


def is_partition_info_valid(pointer):
def is_partition_info_valid(pointer: FilePointer) -> bool:
"""Checks if partition_info is valid for a given base catalog pointer
Args:
pointer: pointer to base catalog directory
pointer (FilePointer): pointer to base catalog directory
Returns:
True if the partition_info file exists, False otherwise
"""
partition_info_pointer = get_partition_info_pointer(pointer)
partition_info_exists = is_regular_file(partition_info_pointer)
return partition_info_exists


def is_metadata_valid(pointer: FilePointer) -> bool:
"""Checks if _metadata is valid for a given base catalog pointer
Args:
pointer (FilePointer): pointer to base catalog directory
Returns:
True if the _metadata file exists, False otherwise
"""
metadata_file = get_parquet_metadata_pointer(pointer)
metadata_file_exists = is_regular_file(metadata_file)
return metadata_file_exists
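A quick check using the updated validators, for a hypothetical catalog path:

from hipscat.io import get_file_pointer_from_path
from hipscat.io.validation import is_metadata_valid, is_partition_info_valid, is_valid_catalog

pointer = get_file_pointer_from_path("./my_catalog")
print(is_valid_catalog(pointer))         # catalog_info plus either partition source
print(is_partition_info_valid(pointer))  # legacy partition_info.csv present?
print(is_metadata_valid(pointer))        # parquet _metadata present?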
38 changes: 4 additions & 34 deletions src/hipscat/io/write_metadata.py
@@ -10,6 +10,7 @@
import pandas as pd

from hipscat.io import file_io, paths
from hipscat.io.parquet_metadata import write_parquet_metadata as wpm


def write_json_file(
@@ -116,41 +117,10 @@ def write_parquet_metadata(catalog_path, storage_options: Union[Dict[Any, Any],
catalog_path (str): base path for the catalog
storage_options: dictionary that contains abstract filesystem credentials
"""

ignore_prefixes = [
"intermediate",
"_common_metadata",
"_metadata",
]

dataset = file_io.read_parquet_dataset(
catalog_path,
storage_options=storage_options,
ignore_prefixes=ignore_prefixes,
exclude_invalid_files=True,
)
metadata_collector = []

for hips_file in dataset.files:
hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path)
single_metadata = file_io.read_parquet_metadata(hips_file_pointer, storage_options=storage_options)
relative_path = hips_file[len(catalog_path) :]
single_metadata.set_file_path(relative_path)
metadata_collector.append(single_metadata)

## Write out the two metadata files
catalog_base_dir = file_io.get_file_pointer_from_path(catalog_path)
metadata_file_pointer = paths.get_parquet_metadata_pointer(catalog_base_dir)
common_metadata_file_pointer = paths.get_common_metadata_pointer(catalog_base_dir)

file_io.write_parquet_metadata(
dataset.schema,
metadata_file_pointer,
metadata_collector=metadata_collector,
wpm(
catalog_path=catalog_path,
storage_options=storage_options,
)
file_io.write_parquet_metadata(
dataset.schema, common_metadata_file_pointer, storage_options=storage_options
output_path=catalog_path,
)



