GH-40592: [C++][Parquet] Implement SizeStatistics #40594

Merged (1 commit) on Dec 18, 2024
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/hashing.h
@@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
}
}

// Visit the stored value at a specific index in insertion order.
// The visitor function should have the signature `void(std::string_view)`
// or `void(const std::string_view&)`.
template <typename VisitFunc>
void VisitValue(int32_t idx, VisitFunc&& visit) const {
visit(binary_builder_.GetView(idx));
}

protected:
struct Payload {
int32_t memo_index;
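A minimal usage sketch for the new accessor (not part of the diff): `memo_table` is assumed to be a BinaryMemoTable that already holds memoized values, and `idx` a valid insertion-order index.

// Hedged sketch: read back the memoized value at insertion index `idx`
// without copying it out of the table.
const int32_t idx = 0;
int64_t total_bytes = 0;
memo_table.VisitValue(idx, [&](std::string_view value) {
  total_bytes += static_cast<int64_t>(value.size());
});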
2 changes: 2 additions & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -181,6 +181,7 @@ set(PARQUET_SRCS
printer.cc
properties.cc
schema.cc
size_statistics.cc
statistics.cc
stream_reader.cc
stream_writer.cc
@@ -373,6 +374,7 @@ add_parquet_test(internals-test
metadata_test.cc
page_index_test.cc
public_api_test.cc
size_statistics_test.cc
types_test.cc)

set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON
22 changes: 15 additions & 7 deletions cpp/src/parquet/column_page.h
@@ -26,6 +26,7 @@
#include <optional>
#include <string>

#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"

@@ -69,27 +70,30 @@ class DataPage : public Page {
/// Currently it is only present from data pages created by ColumnWriter in order
/// to collect page index.
std::optional<int64_t> first_row_index() const { return first_row_index_; }
const SizeStatistics& size_statistics() const { return size_statistics_; }

virtual ~DataPage() = default;

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
EncodedStatistics statistics, std::optional<int64_t> first_row_index,
SizeStatistics size_statistics)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
statistics_(std::move(statistics)),
first_row_index_(std::move(first_row_index)) {}
first_row_index_(std::move(first_row_index)),
size_statistics_(std::move(size_statistics)) {}

int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
/// Row ordinal within the row group to the first row in the data page.
std::optional<int64_t> first_row_index_;
SizeStatistics size_statistics_;
};

class DataPageV1 : public DataPage {
@@ -98,9 +102,11 @@ class DataPageV1 : public DataPage {
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

@@ -120,9 +126,11 @@ class DataPageV2 : public DataPage {
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
EncodedStatistics statistics = EncodedStatistics(),
std::optional<int64_t> first_row_index = std::nullopt)
std::optional<int64_t> first_row_index = std::nullopt,
SizeStatistics size_statistics = SizeStatistics())
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
std::move(statistics), std::move(first_row_index)),
std::move(statistics), std::move(first_row_index),
std::move(size_statistics)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
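With the size_statistics() accessor added above, a page consumer can inspect the collected statistics per data page. A hedged reader-side sketch follows; the PageReader wiring is assumed and not shown, and the histograms are non-empty only when size statistics were enabled at write time.

// Hedged sketch: `page_reader` is assumed to be a parquet::PageReader
// obtained from a row group. Only data pages carry the new SizeStatistics.
std::shared_ptr<parquet::Page> page;
while ((page = page_reader->NextPage()) != nullptr) {
  if (page->type() != parquet::PageType::DATA_PAGE &&
      page->type() != parquet::PageType::DATA_PAGE_V2) {
    continue;
  }
  const auto& size_stats =
      static_cast<const parquet::DataPage&>(*page).size_statistics();
  // One bucket per definition level, in level order.
  for (int64_t count : size_stats.definition_level_histogram) {
    // ... inspect per-level value counts ...
  }
}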
123 changes: 103 additions & 20 deletions cpp/src/parquet/column_writer.cc
@@ -55,6 +55,7 @@
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
#include "parquet/types.h"
@@ -437,7 +438,7 @@ class SerializedPageWriter : public PageWriter {

/// Collect page index
if (column_index_builder_ != nullptr) {
column_index_builder_->AddPage(page.statistics());
column_index_builder_->AddPage(page.statistics(), page.size_statistics());
}
if (offset_index_builder_ != nullptr) {
const int64_t compressed_size = output_data_len + header_size;
@@ -451,8 +452,9 @@
/// start_pos is a relative offset in the buffered mode. It should be
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
/// has flushed all data pages.
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size),
*page.first_row_index());
offset_index_builder_->AddPage(
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
page.size_statistics().unencoded_byte_array_data_bytes);
}

total_uncompressed_size_ += uncompressed_size + header_size;
@@ -774,11 +776,17 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;

// A convenience struct to combine the encoded statistics and size statistics
struct StatisticsPair {
EncodedStatistics encoded_stats;
SizeStatistics size_stats;
};

// Plain-encoded statistics of the current page
virtual EncodedStatistics GetPageStatistics() = 0;
virtual StatisticsPair GetPageStatistics() = 0;

// Plain-encoded statistics of the whole chunk
virtual EncodedStatistics GetChunkStatistics() = 0;
virtual StatisticsPair GetChunkStatistics() = 0;

// Merges page statistics into chunk statistics, then resets the values
virtual void ResetPageStatistics() = 0;
@@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
uncompressed_data_->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1006,13 +1013,15 @@
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);

data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
uncompressed_size, std::move(page_stats), first_row_index);
uncompressed_size, std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
compressed_values, combined->mutable_data());

EncodedStatistics page_stats = GetPageStatistics();
auto [page_stats, page_size_stats] = GetPageStatistics();
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
ResetPageStatistics();
@@ -1062,14 +1071,15 @@
combined->CopySlice(0, combined->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats,
first_row_index);
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
std::move(page_stats), first_row_index, std::move(page_size_stats));
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
pager_->has_compressor(), page_stats, first_row_index);
pager_->has_compressor(), std::move(page_stats), first_row_index,
std::move(page_size_stats));
WriteDataPage(page);
}
}
@@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {

FlushBufferedDataPages();

EncodedStatistics chunk_statistics = GetChunkStatistics();
auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
@@ -1092,6 +1102,9 @@
if (rows_written_ > 0 && chunk_statistics.is_set()) {
metadata_->SetStatistics(chunk_statistics);
}
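// Chunk-level size statistics are attached to the column chunk metadata here;
// in the Parquet Thrift definition they land in the optional `size_statistics`
// field of ColumnMetaData.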
if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
metadata_->SetSizeStatistics(chunk_size_statistics);
}
metadata_->SetKeyValueMetadata(key_value_metadata_);
pager_->Close(has_dictionary_, fallback_);
}
@@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
}
if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
page_size_statistics_ = SizeStatistics::Make(descr_);
chunk_size_statistics_ = SizeStatistics::Make(descr_);
}
pages_change_on_record_boundaries_ =
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());
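The constructor above consults properties->size_statistics_level() to decide which size-statistics objects to allocate, so a writer opts in through WriterProperties. A hedged configuration sketch follows; the builder setter name set_size_statistics_level is an assumption mirroring the getter used here and is not shown in this diff.

// Hedged sketch: request page- and chunk-level size statistics.
// set_size_statistics_level() is assumed to mirror size_statistics_level().
std::shared_ptr<parquet::WriterProperties> props =
    parquet::WriterProperties::Builder()
        .set_size_statistics_level(parquet::SizeStatisticsLevel::PageAndColumnChunk)
        ->build();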
@@ -1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
total_bytes_written_ += pager_->WriteDictionaryPage(page);
}

EncodedStatistics GetPageStatistics() override {
EncodedStatistics result;
if (page_statistics_) result = page_statistics_->Encode();
StatisticsPair GetPageStatistics() override {
StatisticsPair result;
if (page_statistics_) {
result.encoded_stats = page_statistics_->Encode();
}
if (properties_->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
ARROW_DCHECK(page_size_statistics_ != nullptr);
result.size_stats = *page_size_statistics_;
}
return result;
}

EncodedStatistics GetChunkStatistics() override {
EncodedStatistics result;
if (chunk_statistics_) result = chunk_statistics_->Encode();
StatisticsPair GetChunkStatistics() override {
StatisticsPair result;
if (chunk_statistics_) {
result.encoded_stats = chunk_statistics_->Encode();
}
if (chunk_size_statistics_) {
result.size_stats = *chunk_size_statistics_;
}
return result;
}

@@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
chunk_statistics_->Merge(*page_statistics_);
page_statistics_->Reset();
}
if (page_size_statistics_ != nullptr) {
chunk_size_statistics_->Merge(*page_size_statistics_);
page_size_statistics_->Reset();
}
}

Type::type type() const override { return descr_->physical_type(); }
@@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
DictEncoder<DType>* current_dict_encoder_;
std::shared_ptr<TypedStats> page_statistics_;
std::shared_ptr<TypedStats> chunk_statistics_;
std::unique_ptr<SizeStatistics> page_size_statistics_;
std::shared_ptr<SizeStatistics> chunk_size_statistics_;
bool pages_change_on_record_boundaries_;

// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
@@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_values;
num_buffered_rows_ += num_values;
}

UpdateLevelHistogram(num_values, def_levels, rep_levels);
return values_to_write;
}

@@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
rows_written_ += num_levels;
num_buffered_rows_ += num_levels;
}

UpdateLevelHistogram(num_levels, def_levels, rep_levels);
}

void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) const {
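// Worked example: for a column with max_definition_level == 2, writing
// definition levels {0, 1, 2, 2} increments definition_level_histogram by
// {1, 1, 2}; columns whose max level is 0 take the shortcut branch below.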
if (page_size_statistics_ == nullptr) {
return;
}

auto add_levels = [](std::vector<int64_t>& level_histogram,
::arrow::util::span<const int16_t> levels) {
for (int16_t level : levels) {
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
++level_histogram[level];
}
};

if (descr_->max_definition_level() > 0) {
add_levels(page_size_statistics_->definition_level_histogram,
{def_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->definition_level_histogram[0] += num_levels;
}

if (descr_->max_repetition_level() > 0) {
add_levels(page_size_statistics_->repetition_level_histogram,
{rep_levels, static_cast<size_t>(num_levels)});
} else {
page_size_statistics_->repetition_level_histogram[0] += num_levels;
}
}

// Update the unencoded data bytes for ByteArray only per the specification.
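// Per the Parquet specification, unencoded_byte_array_data_bytes counts only
// the raw BYTE_ARRAY value bytes, excluding the per-value length prefixes.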
void UpdateUnencodedDataBytes() const {
if constexpr (std::is_same_v<T, ByteArray>) {
if (page_size_statistics_ != nullptr) {
page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
current_encoder_->ReportUnencodedDataBytes());
}
}
}

void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
@@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
if (page_statistics_ != nullptr) {
page_statistics_->Update(values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
}

/// \brief Write values with spaces and update page statistics accordingly.
@@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
num_spaced_values, num_values, num_nulls);
}
UpdateUnencodedDataBytes();
}
};

@@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
writeable_indices,
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
dict_encoder->PutIndices(*writeable_indices);
// Update the unencoded byte array data size in the size statistics
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page);
value_offset += batch_num_spaced_values;
};
@@ -2219,6 +2301,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
page_statistics_->IncrementNullCount(batch_size - non_null);
page_statistics_->IncrementNumValues(non_null);
}
UpdateUnencodedDataBytes();
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
check_page);
CheckDictionarySizeLimit();