From 94b0cd179502ca78ba908b558bc8f6e1df0def5f Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Fri, 27 Oct 2023 21:20:19 +0800 Subject: [PATCH] address comments --- .../columnindex/ColumnIndexBuilder.java | 14 ++---- .../columnindex/DoubleColumnIndexBuilder.java | 1 + .../columnindex/FloatColumnIndexBuilder.java | 1 + .../parquet/hadoop/ParquetFileWriter.java | 41 ++++++++++++------ .../hadoop/rewrite/ParquetRewriter.java | 43 ++++++++----------- 5 files changed, 50 insertions(+), 50 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 35144467d2..7c221efbfb 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -451,16 +451,13 @@ int sizeOf(Object value) { } }; + private PrimitiveType type; private final BooleanList nullPages = new BooleanArrayList(); private final LongList nullCounts = new LongArrayList(); - private final IntList pageIndexes = new IntArrayList(); - - private PrimitiveType type; private long minMaxSize; + private final IntList pageIndexes = new IntArrayList(); private int nextPageIndex; - protected boolean invalid; - /** * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at * {@link #build()}. @@ -546,11 +543,6 @@ public static ColumnIndex build( * the statistics to be added */ public void add(Statistics stats) { - if (stats.isEmpty()) { - invalid = true; - return; - } - if (stats.hasNonNullValue()) { nullPages.add(false); Object min = stats.genericGetMin(); @@ -611,7 +603,7 @@ public ColumnIndex build() { } private ColumnIndexBase build(PrimitiveType type) { - if (nullPages.isEmpty() || invalid) { + if (nullPages.isEmpty()) { return null; } ColumnIndexBase columnIndex = createColumnIndex(type); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java index 72b8b2d088..5a5e7c5578 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/DoubleColumnIndexBuilder.java @@ -84,6 +84,7 @@ int compareValueToMax(int arrayIndex) { private final DoubleList minValues = new DoubleArrayList(); private final DoubleList maxValues = new DoubleArrayList(); + private boolean invalid; private static double convert(ByteBuffer buffer) { return buffer.order(LITTLE_ENDIAN).getDouble(0); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java index 2ba5718f88..8bc21cd23a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/FloatColumnIndexBuilder.java @@ -84,6 +84,7 @@ int compareValueToMax(int arrayIndex) { private final FloatList minValues = new FloatArrayList(); private final FloatList maxValues = new FloatArrayList(); + private boolean invalid; private static float convert(ByteBuffer buffer) { return buffer.order(LITTLE_ENDIAN).getFloat(0); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index c9ac7d6a90..34e13c3c01 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -152,6 +152,7 @@ public static enum Mode { private long uncompressedLength; private long compressedLength; private Statistics currentStatistics; // accumulated in writePage(s) + private boolean currentStatisticsAreValid; private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; @@ -474,6 +475,7 @@ public void startColumn(ColumnDescriptor descriptor, uncompressedLength = 0; // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one currentStatistics = null; + currentStatisticsAreValid = true; columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); @@ -722,14 +724,7 @@ public void writeDataPage( LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); bytes.writeAllTo(out); - // Copying the statistics if it is not initialized yet so we have the correct typed one - if (currentStatistics == null) { - currentStatistics = statistics.copy(); - } else { - currentStatistics.mergeStatistics(statistics); - } - - columnIndexBuilder.add(statistics); + mergeColumnStatistics(statistics); encodingStatsBuilder.addDataEncoding(valuesEncoding); currentEncodings.add(rlEncoding); @@ -867,13 +862,8 @@ public void writeDataPageV2( this.uncompressedLength += uncompressedSize + headersSize; this.compressedLength += compressedSize + headersSize; - if (currentStatistics == null) { - currentStatistics = statistics.copy(); - } else { - currentStatistics.mergeStatistics(statistics); - } + mergeColumnStatistics(statistics); - columnIndexBuilder.add(statistics); currentEncodings.add(dataEncoding); encodingStatsBuilder.addDataEncoding(dataEncoding); @@ -1000,6 +990,7 @@ public void endColumn(Statistics totalStatistics) throws IOException { currentStatistics = totalStatistics; // Invalid the ColumnIndex columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + currentStatisticsAreValid = true; endColumn(); } @@ -1010,6 +1001,7 @@ public void endColumn(Statistics totalStatistics) throws IOException { public void endColumn() throws IOException { state = state.endColumn(); LOG.debug("{}: end column", out.getPos()); + Preconditions.checkState(currentStatisticsAreValid, "Column statistics should be valid"); if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { currentColumnIndexes.add(null); } else { @@ -1332,6 +1324,27 @@ private int toIntWithCheck(long size) { return (int)size; } + private void mergeColumnStatistics(Statistics statistics) { + if (!currentStatisticsAreValid) { + return; + } + + if (statistics == null) { + // The column index should be invalid if some page statistics are null. + // See PARQUET-2365 for more details + currentStatistics = null; + currentStatisticsAreValid = false; + columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); + } else if (currentStatistics == null) { + // Copying the statistics if it is not initialized yet so we have the correct typed one + currentStatistics = statistics.copy(); + columnIndexBuilder.add(statistics); + } else { + currentStatistics.mergeStatistics(statistics); + columnIndexBuilder.add(statistics); + } + } + private static void serializeOffsetIndexes( List> offsetIndexes, List blocks, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 458205b245..43d9d40c23 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -316,7 +316,7 @@ private void processBlocksFromReader() throws IOException { writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName); boolean needOverwriteStatistics = processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn); if (needOverwriteStatistics) { - // All the page statistics are invalid, so we need to overwrite the column statistics + // All the column statistics are invalid, so we need to overwrite the column statistics writer.endColumn(chunk.getStatistics()); } else { writer.endColumn(); @@ -385,7 +385,7 @@ private boolean processChunk(ColumnChunkMetaData chunk, DictionaryPage dictionaryPage = null; long readValues = 0; Statistics statistics = null; - Statistics emptyStatistics = Statistics.getBuilderForReading(chunk.getPrimitiveType()).build(); + boolean needOverwriteColumnStatistics = false; ParquetMetadataConverter converter = new ParquetMetadataConverter(); int pageOrdinal = 0; long totalChunkValues = chunk.getValueCount(); @@ -431,20 +431,16 @@ private boolean processChunk(ColumnChunkMetaData chunk, encryptColumn, dataEncryptor, dataPageAAD); - Statistics v1PageStatistics = convertStatistics( + statistics = convertStatistics( originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter); - if (v1PageStatistics == null) { + if (statistics == null) { // Reach here means both the columnIndex and the page header statistics are null - if (statistics != null) { - // Mixed null page statistics and non-null page statistics is not allowed - throw new IOException("Detected mixed null page statistics and non-null page statistics"); - } - // Pass an empty page statistics to writer and overwrite the column statistics in the end - v1PageStatistics = emptyStatistics; + needOverwriteColumnStatistics = true; } else { - statistics = v1PageStatistics; + Preconditions.checkState( + !needOverwriteColumnStatistics, + "Detected mixed null page statistics and non-null page statistics"); } - readValues += headerV1.getNum_values(); if (offsetIndex != null) { long rowCount = 1 + offsetIndex.getLastRowIndex( @@ -452,7 +448,7 @@ private boolean processChunk(ColumnChunkMetaData chunk, writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), pageHeader.getUncompressed_page_size(), BytesInput.from(pageLoad), - v1PageStatistics, + statistics, toIntWithCheck(rowCount), converter.getEncoding(headerV1.getRepetition_level_encoding()), converter.getEncoding(headerV1.getDefinition_level_encoding()), @@ -463,7 +459,7 @@ private boolean processChunk(ColumnChunkMetaData chunk, writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()), pageHeader.getUncompressed_page_size(), BytesInput.from(pageLoad), - v1PageStatistics, + statistics, converter.getEncoding(headerV1.getRepetition_level_encoding()), converter.getEncoding(headerV1.getDefinition_level_encoding()), converter.getEncoding(headerV1.getEncoding()), @@ -494,18 +490,15 @@ private boolean processChunk(ColumnChunkMetaData chunk, encryptColumn, dataEncryptor, dataPageAAD); - Statistics v2PageStatistics = convertStatistics( + statistics = convertStatistics( originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter); - if (v2PageStatistics == null) { + if (statistics == null) { // Reach here means both the columnIndex and the page header statistics are null - if (statistics != null) { - // Mixed null page statistics and non-null page statistics is not allowed - throw new IOException("Detected mixed null page statistics and non-null page statistics"); - } - // Pass an empty page statistics to writer and overwrite the column statistics in the end - v2PageStatistics = emptyStatistics; + needOverwriteColumnStatistics = true; } else { - statistics = v2PageStatistics; + Preconditions.checkState( + !needOverwriteColumnStatistics, + "Detected mixed null page statistics and non-null page statistics"); } readValues += headerV2.getNum_values(); writer.writeDataPageV2(headerV2.getNum_rows(), @@ -516,7 +509,7 @@ private boolean processChunk(ColumnChunkMetaData chunk, converter.getEncoding(headerV2.getEncoding()), BytesInput.from(pageLoad), rawDataLength, - v2PageStatistics, + statistics, metaEncryptor, dataPageHeaderAAD); pageOrdinal++; @@ -527,7 +520,7 @@ private boolean processChunk(ColumnChunkMetaData chunk, } } - return statistics == null; + return needOverwriteColumnStatistics; } private Statistics convertStatistics(String createdBy,