From ff36d6bca13e0c626dd3c7932d94fad2ee2c9214 Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Sat, 4 Nov 2023 18:41:24 +0800
Subject: [PATCH] PARQUET-2365: Fixes NPE when rewriting column without column index (#1173)

---
 .../hadoop/ColumnChunkPageWriteStore.java     | 38 ++++++-----
 .../parquet/hadoop/ParquetFileWriter.java     | 49 ++++++++++----
 .../hadoop/rewrite/ParquetRewriter.java       | 22 +++++++
 .../hadoop/rewrite/ParquetRewriterTest.java   | 66 +++++++++----------
 .../parquet/hadoop/util/TestFileBuilder.java  | 26 +++++++-
 5 files changed, 137 insertions(+), 64 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index cd6ec5d930..54ce829c00 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -211,14 +211,7 @@ public void writePage(BytesInput bytes,
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      // Copying the statistics if it is not initialized yet so we have the correct typed one
-      if (totalStatistics == null) {
-        totalStatistics = statistics.copy();
-      } else {
-        totalStatistics.mergeStatistics(statistics);
-      }
-
-      columnIndexBuilder.add(statistics);
+      mergeColumnStatistics(statistics);
       offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
 
       // by concatenating before collecting instead of collecting twice,
@@ -298,14 +291,7 @@ public void writePageV2(
       this.totalValueCount += valueCount;
       this.pageCount += 1;
 
-      // Copying the statistics if it is not initialized yet so we have the correct typed one
-      if (totalStatistics == null) {
-        totalStatistics = statistics.copy();
-      } else {
-        totalStatistics.mergeStatistics(statistics);
-      }
-
-      columnIndexBuilder.add(statistics);
+      mergeColumnStatistics(statistics);
       offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
 
       // by concatenating before collecting instead of collecting twice,
@@ -329,6 +315,26 @@ private int toIntWithCheck(long size) {
       return (int)size;
     }
 
+    private void mergeColumnStatistics(Statistics<?> statistics) {
+      if (totalStatistics != null && totalStatistics.isEmpty()) {
+        return;
+      }
+
+      if (statistics == null || statistics.isEmpty()) {
+        // The column index and statistics should be invalid if some page statistics are null or empty.
+        // See PARQUET-2365 for more details
+        totalStatistics = Statistics.getBuilderForReading(path.getPrimitiveType()).build();
+        columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+      } else if (totalStatistics == null) {
+        // Copying the statistics if it is not initialized yet, so we have the correct typed one
+        totalStatistics = statistics.copy();
+        columnIndexBuilder.add(statistics);
+      } else {
+        totalStatistics.mergeStatistics(statistics);
+        columnIndexBuilder.add(statistics);
+      }
+    }
+
     @Override
     public long getMemSize() {
       return buf.size();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 146ffd4964..42ea6d921d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -722,14 +722,7 @@ public void writeDataPage(
     LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
     bytes.writeAllTo(out);
 
-    // Copying the statistics if it is not initialized yet so we have the correct typed one
-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
-
-    columnIndexBuilder.add(statistics);
+    mergeColumnStatistics(statistics);
 
     encodingStatsBuilder.addDataEncoding(valuesEncoding);
     currentEncodings.add(rlEncoding);
@@ -867,13 +860,8 @@ public void writeDataPageV2(
     this.uncompressedLength += uncompressedSize + headersSize;
     this.compressedLength += compressedSize + headersSize;
 
-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
+    mergeColumnStatistics(statistics);
 
-    columnIndexBuilder.add(statistics);
     currentEncodings.add(dataEncoding);
     encodingStatsBuilder.addDataEncoding(dataEncoding);
 
@@ -988,6 +976,19 @@ void writeColumnChunk(ColumnDescriptor descriptor,
     endColumn();
   }
 
+  /**
+   * Overwrite the column total statistics. This is used when the column total statistics
+   * are known while all the page statistics are invalid, for example when rewriting the column.
+   *
+   * @param totalStatistics the column total statistics
+   */
+  public void invalidateStatistics(Statistics<?> totalStatistics) {
+    Preconditions.checkArgument(totalStatistics != null, "Column total statistics can not be null");
+    currentStatistics = totalStatistics;
+    // Invalidate the ColumnIndex
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+  }
+
   /**
    * end a column (once all rep, def and data have been written)
    * @throws IOException if there is an error while writing
@@ -1317,6 +1318,26 @@ private int toIntWithCheck(long size) {
     return (int)size;
   }
 
+  private void mergeColumnStatistics(Statistics<?> statistics) {
+    if (currentStatistics != null && currentStatistics.isEmpty()) {
+      return;
+    }
+
+    if (statistics == null || statistics.isEmpty()) {
+      // The column index and statistics should be invalid if some page statistics are null or empty.
+      // See PARQUET-2365 for more details
+      currentStatistics = Statistics.getBuilderForReading(currentChunkType).build();
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    } else if (currentStatistics == null) {
+      // Copying the statistics if it is not initialized yet, so we have the correct typed one
+      currentStatistics = statistics.copy();
+      columnIndexBuilder.add(statistics);
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+      columnIndexBuilder.add(statistics);
+    }
+  }
+
   private static void serializeOffsetIndexes(
       List<List<OffsetIndex>> offsetIndexes,
       List<BlockMetaData> blocks,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 2eaaab18c7..bf2155e28c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -392,6 +392,7 @@ private void processChunk(ColumnChunkMetaData chunk,
     DictionaryPage dictionaryPage = null;
     long readValues = 0;
     Statistics statistics = null;
+    boolean isColumnStatisticsMalformed = false;
     ParquetMetadataConverter converter = new ParquetMetadataConverter();
     int pageOrdinal = 0;
     long totalChunkValues = chunk.getValueCount();
@@ -439,6 +440,14 @@
                   dataPageAAD);
           statistics = convertStatistics(
                   originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reaching here means both the columnIndex and the page header statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+                !isColumnStatisticsMalformed,
+                "Detected mixed null page statistics and non-null page statistics");
+          }
           readValues += headerV1.getNum_values();
           if (offsetIndex != null) {
             long rowCount = 1 + offsetIndex.getLastRowIndex(
@@ -490,6 +499,14 @@
                   dataPageAAD);
           statistics = convertStatistics(
                   originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reaching here means both the columnIndex and the page header statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+                !isColumnStatisticsMalformed,
+                "Detected mixed null page statistics and non-null page statistics");
+          }
           readValues += headerV2.getNum_values();
           writer.writeDataPageV2(headerV2.getNum_rows(),
                   headerV2.getNum_nulls(),
@@ -509,6 +526,11 @@
           break;
       }
     }
+
+    if (isColumnStatisticsMalformed) {
+      // All the column statistics are invalid, so we need to overwrite the column statistics
+      writer.invalidateStatistics(chunk.getStatistics());
+    }
   }
 
   private Statistics convertStatistics(String createdBy,
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 6ce7e2c91f..ab25566a9e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -80,6 +80,8 @@
 import java.util.stream.Collectors;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -127,17 +129,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -199,17 +191,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -276,17 +258,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();
 
     // Verify codec has been translated
     FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
@@ -672,6 +644,8 @@ private MessageType createSchema() {
         new PrimitiveType(OPTIONAL, INT64, "DocId"),
         new PrimitiveType(REQUIRED, BINARY, "Name"),
         new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+        new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+        new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
         new GroupType(OPTIONAL, "Links",
             new PrimitiveType(REPEATED, BINARY, "Backward"),
             new PrimitiveType(REPEATED, BINARY, "Forward")));
@@ -713,6 +687,16 @@ private void validateColumnData(Set<String> prunePaths,
               expectGroup.getBinary("Gender", 0).getBytes());
     }
 
+    if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
+      assertEquals(group.getFloat("FloatFraction", 0),
+              expectGroup.getFloat("FloatFraction", 0), 0);
+    }
+
+    if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
+      assertEquals(group.getDouble("DoubleFraction", 0),
+              expectGroup.getDouble("DoubleFraction", 0), 0);
+    }
+
     Group subGroup = group.getGroup("Links", 0);
 
     if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
@@ -949,4 +933,20 @@ private Map> allBloomFilters(
 
     return allBloomFilters;
   }
+
+  private void validateSchema() throws IOException {
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 5);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "FloatFraction");
+    assertEquals(fields.get(3).getName(), "DoubleFraction");
+    assertEquals(fields.get(4).getName(), "Links");
+    List<Type> subFields = fields.get(4).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+  }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 951899bb3d..9eef65d7f5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -36,6 +36,8 @@
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
 
@@ -175,7 +177,13 @@ else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
       else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
         g.add(type.getName(), getString());
       }
-      // Only support 3 types now, more can be added later
+      else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+        g.add(type.getName(), getFloat());
+      }
+      else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+        g.add(type.getName(), getDouble());
+      }
+      // Only support 5 types now, more can be added later
     }
     else {
       GroupType groupType = (GroupType) type;
@@ -206,6 +214,22 @@ private static String getString()
     return sb.toString();
   }
 
+  private static float getFloat()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Float.NaN;
+    }
+    return ThreadLocalRandom.current().nextFloat();
+  }
+
+  private static double getDouble()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Double.NaN;
+    }
+    return ThreadLocalRandom.current().nextDouble();
+  }
+
   public static String createTempFile(String prefix)
   {
     try {
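
Usage sketch (not part of the patch): a minimal, hypothetical example of the rewrite path this change fixes. The file paths and codec choice are assumptions; the input file is assumed to contain FLOAT/DOUBLE columns written with NaN values only, i.e. pages that carry no valid statistics and hence no column index, which is the situation that previously triggered the NPE.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteWithoutColumnIndexExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical paths: the input is assumed to hold NaN-only FLOAT/DOUBLE columns,
    // whose pages have no usable statistics or column index.
    Path input = new Path("/tmp/input.parquet");
    Path output = new Path("/tmp/output.parquet");

    RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
        .transform(CompressionCodecName.ZSTD) // translate the codec while rewriting
        .build();

    // With this patch, the rewriter falls back to the chunk-level statistics and writes
    // a no-op column index for such columns instead of throwing a NullPointerException.
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}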