PARQUET-2365: Fixes NPE when rewriting column without column index (#…
ConeyLiu authored Nov 4, 2023
1 parent 7c6758b commit ff36d6b
Showing 5 changed files with 137 additions and 64 deletions.
@@ -211,14 +211,7 @@ public void writePage(BytesInput bytes,
this.totalValueCount += valueCount;
this.pageCount += 1;

-    // Copying the statistics if it is not initialized yet so we have the correct typed one
-    if (totalStatistics == null) {
-      totalStatistics = statistics.copy();
-    } else {
-      totalStatistics.mergeStatistics(statistics);
-    }
-
-    columnIndexBuilder.add(statistics);
+    mergeColumnStatistics(statistics);
offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);

// by concatenating before collecting instead of collecting twice,
@@ -298,14 +291,7 @@ public void writePageV2(
this.totalValueCount += valueCount;
this.pageCount += 1;

-    // Copying the statistics if it is not initialized yet so we have the correct typed one
-    if (totalStatistics == null) {
-      totalStatistics = statistics.copy();
-    } else {
-      totalStatistics.mergeStatistics(statistics);
-    }
-
-    columnIndexBuilder.add(statistics);
+    mergeColumnStatistics(statistics);
offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);

// by concatenating before collecting instead of collecting twice,
@@ -329,6 +315,26 @@ private int toIntWithCheck(long size) {
return (int)size;
}

+  private void mergeColumnStatistics(Statistics<?> statistics) {
+    if (totalStatistics != null && totalStatistics.isEmpty()) {
+      return;
+    }
+
+    if (statistics == null || statistics.isEmpty()) {
+      // The column index and statistics should be invalidated if any page statistics are null or empty.
+      // See PARQUET-2365 for more details.
+      totalStatistics = Statistics.getBuilderForReading(path.getPrimitiveType()).build();
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    } else if (totalStatistics == null) {
+      // Copy the statistics if not initialized yet, so we have the correctly typed one
+      totalStatistics = statistics.copy();
+      columnIndexBuilder.add(statistics);
+    } else {
+      totalStatistics.mergeStatistics(statistics);
+      columnIndexBuilder.add(statistics);
+    }
+  }
+
@Override
public long getMemSize() {
return buf.size();
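
The new mergeColumnStatistics above centralizes the copy-or-merge logic that writePage and writePageV2 previously duplicated, and adds one behavior: a null or empty page statistics object invalidates both the chunk-level statistics and the column index, and later pages cannot revive them. Below is a minimal, self-contained sketch of that sticky-invalidation flow; PageStats and the field names are stand-ins for illustration, not the real Parquet types.

// Sketch only: PageStats stands in for org.apache.parquet.column.statistics.Statistics.
import java.util.Arrays;
import java.util.List;

public class MergeSketch {
  static final class PageStats {
    final Long min, max;                       // null min/max models "no statistics"
    PageStats(Long min, Long max) { this.min = min; this.max = max; }
    boolean isEmpty() { return min == null || max == null; }
  }

  private PageStats total;                     // mirrors totalStatistics
  private boolean columnIndexValid = true;     // mirrors swapping in the no-op builder

  void merge(PageStats page) {
    if (total != null && total.isEmpty()) {
      return;                                  // already invalidated; stays invalid
    }
    if (page == null || page.isEmpty()) {
      total = new PageStats(null, null);       // sticky "invalid" marker
      columnIndexValid = false;                // drop the column index too
    } else if (total == null) {
      total = new PageStats(page.min, page.max);              // first page: copy
    } else {
      total = new PageStats(Math.min(total.min, page.min),
                            Math.max(total.max, page.max));   // normal merge
    }
  }

  public static void main(String[] args) {
    MergeSketch s = new MergeSketch();
    List<PageStats> pages = Arrays.asList(
        new PageStats(1L, 5L), null, new PageStats(0L, 9L));
    pages.forEach(s::merge);
    // One statistics-less page poisons the whole column:
    System.out.println("column index valid: " + s.columnIndexValid);  // false
  }
}
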
@@ -722,14 +722,7 @@ public void writeDataPage(
LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
bytes.writeAllTo(out);

-    // Copying the statistics if it is not initialized yet so we have the correct typed one
-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
-
-    columnIndexBuilder.add(statistics);
+    mergeColumnStatistics(statistics);

encodingStatsBuilder.addDataEncoding(valuesEncoding);
currentEncodings.add(rlEncoding);
@@ -867,13 +860,8 @@ public void writeDataPageV2(
this.uncompressedLength += uncompressedSize + headersSize;
this.compressedLength += compressedSize + headersSize;

-    if (currentStatistics == null) {
-      currentStatistics = statistics.copy();
-    } else {
-      currentStatistics.mergeStatistics(statistics);
-    }
+    mergeColumnStatistics(statistics);

-    columnIndexBuilder.add(statistics);
currentEncodings.add(dataEncoding);
encodingStatsBuilder.addDataEncoding(dataEncoding);

@@ -988,6 +976,19 @@ void writeColumnChunk(ColumnDescriptor descriptor,
endColumn();
}

+  /**
+   * Overwrite the column total statistics. This is used when the column total statistics
+   * is known while all the page statistics are invalid, for example when rewriting the column.
+   *
+   * @param totalStatistics the column total statistics
+   */
+  public void invalidateStatistics(Statistics<?> totalStatistics) {
+    Preconditions.checkArgument(totalStatistics != null, "Column total statistics cannot be null");
+    currentStatistics = totalStatistics;
+    // Invalidate the ColumnIndex
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+  }
+
/**
* end a column (once all rep, def and data have been written)
* @throws IOException if there is an error while writing
@@ -1317,6 +1318,26 @@ private int toIntWithCheck(long size) {
return (int)size;
}

+  private void mergeColumnStatistics(Statistics<?> statistics) {
+    if (currentStatistics != null && currentStatistics.isEmpty()) {
+      return;
+    }
+
+    if (statistics == null || statistics.isEmpty()) {
+      // The column index and statistics should be invalidated if any page statistics are null or empty.
+      // See PARQUET-2365 for more details.
+      currentStatistics = Statistics.getBuilderForReading(currentChunkType).build();
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    } else if (currentStatistics == null) {
+      // Copy the statistics if not initialized yet, so we have the correctly typed one
+      currentStatistics = statistics.copy();
+      columnIndexBuilder.add(statistics);
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+      columnIndexBuilder.add(statistics);
+    }
+  }
+
private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,
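
Both copies of mergeColumnStatistics build their "invalid" marker with Statistics.getBuilderForReading(...).build(), and the new invalidateStatistics method lets a caller (the rewriter below) restore the chunk-level statistics afterwards while the column index stays dropped. A small demo of that empty-statistics sentinel is sketched below; Statistics and Types are the real parquet-column and parquet-schema classes already used in this diff, the DoubleFraction field name is only an illustration borrowed from the tests, and the printed result reflects my understanding that a reading builder with nothing added yields an empty statistics object.

import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class EmptyStatsDemo {
  public static void main(String[] args) {
    // Same call as in mergeColumnStatistics: a correctly typed Statistics
    // object with no values recorded in it.
    PrimitiveType type = Types.optional(PrimitiveTypeName.DOUBLE).named("DoubleFraction");
    Statistics<?> empty = Statistics.getBuilderForReading(type).build();
    System.out.println(empty.isEmpty());   // expected: true, the sticky "invalid" marker
  }
}
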
@@ -392,6 +392,7 @@ private void processChunk(ColumnChunkMetaData chunk,
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics<?> statistics = null;
+    boolean isColumnStatisticsMalformed = false;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
@@ -439,6 +440,14 @@
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reaching here means both the columnIndex and the page header statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+                !isColumnStatisticsMalformed,
+                "Detected mixed null page statistics and non-null page statistics");
+          }
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1 + offsetIndex.getLastRowIndex(
@@ -490,6 +499,14 @@
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          if (statistics == null) {
+            // Reaching here means both the columnIndex and the page header statistics are null
+            isColumnStatisticsMalformed = true;
+          } else {
+            Preconditions.checkState(
+                !isColumnStatisticsMalformed,
+                "Detected mixed null page statistics and non-null page statistics");
+          }
readValues += headerV2.getNum_values();
writer.writeDataPageV2(headerV2.getNum_rows(),
headerV2.getNum_nulls(),
@@ -509,6 +526,11 @@ private void processChunk(ColumnChunkMetaData chunk,
break;
}
}

+    if (isColumnStatisticsMalformed) {
+      // All the page statistics are invalid, so we need to overwrite the column statistics
+      writer.invalidateStatistics(chunk.getStatistics());
+    }
}

private Statistics<?> convertStatistics(String createdBy,
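
On the read side, processChunk now records whether pages lacked statistics in both the column index and the page header, fails fast on a non-null page following a null one, and, when the whole chunk was statistics-less, falls back to the chunk-level statistics from the original footer via invalidateStatistics. A plain-Java sketch of that all-or-nothing invariant follows; the method and variable names are mine, not the rewriter's.

import java.util.Arrays;
import java.util.List;

public class MalformedStatsCheck {
  // Returns true when every page lacked statistics (the caller should then
  // overwrite chunk statistics via invalidateStatistics); throws when pages
  // mix missing and present statistics, mirroring the Preconditions check.
  static boolean allPagesMissingStats(List<?> pageStats) {
    boolean sawMissing = false;
    for (Object stats : pageStats) {
      if (stats == null) {
        sawMissing = true;   // neither columnIndex nor the page header had stats
      } else if (sawMissing) {
        throw new IllegalStateException(
            "Detected mixed null page statistics and non-null page statistics");
      }
    }
    return sawMissing;
  }

  public static void main(String[] args) {
    System.out.println(allPagesMissingStats(Arrays.<Object>asList(null, null)));  // true
    System.out.println(allPagesMissingStats(Arrays.asList("s1", "s2")));          // false
  }
}
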
@@ -80,6 +80,8 @@
import java.util.stream.Collectors;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -127,17 +129,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
rewriter.close();

// Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();

// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -199,17 +191,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
rewriter.close();

// Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();

// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -276,17 +258,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
rewriter.close();

// Verify the schema are not changed for the columns not pruned
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 3);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "Links");
-    List<Type> subFields = fields.get(2).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+    validateSchema();

// Verify codec has been translated
FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
@@ -672,6 +644,8 @@ private MessageType createSchema() {
new PrimitiveType(OPTIONAL, INT64, "DocId"),
new PrimitiveType(REQUIRED, BINARY, "Name"),
new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+        new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+        new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
new GroupType(OPTIONAL, "Links",
new PrimitiveType(REPEATED, BINARY, "Backward"),
new PrimitiveType(REPEATED, BINARY, "Forward")));
@@ -713,6 +687,16 @@ private void validateColumnData(Set<String> prunePaths,
expectGroup.getBinary("Gender", 0).getBytes());
}

if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
assertEquals(group.getFloat("FloatFraction", 0),
expectGroup.getFloat("FloatFraction", 0), 0);
}

if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
assertEquals(group.getDouble("DoubleFraction", 0),
expectGroup.getDouble("DoubleFraction", 0), 0);
}

Group subGroup = group.getGroup("Links", 0);

if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
@@ -949,4 +933,20 @@ private Map<ColumnPath, List<BloomFilter>> allBloomFilters(

return allBloomFilters;
}

+  private void validateSchema() throws IOException {
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 5);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "FloatFraction");
+    assertEquals(fields.get(3).getName(), "DoubleFraction");
+    assertEquals(fields.get(4).getName(), "Links");
+    List<Type> subFields = fields.get(4).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+  }
}
@@ -36,6 +36,8 @@
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

@@ -175,7 +177,13 @@ else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
g.add(type.getName(), getString());
}
-      // Only support 3 types now, more can be added later
+      else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+        g.add(type.getName(), getFloat());
+      }
+      else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+        g.add(type.getName(), getDouble());
+      }
+      // Only support 5 types now, more can be added later
}
else {
GroupType groupType = (GroupType) type;
@@ -206,6 +214,22 @@ private static String getString()
return sb.toString();
}

+  private static float getFloat()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Float.NaN;
+    }
+    return ThreadLocalRandom.current().nextFloat();
+  }
+
+  private static double getDouble()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Double.NaN;
+    }
+    return ThreadLocalRandom.current().nextDouble();
+  }
+
public static String createTempFile(String prefix)
{
try {
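The new getFloat/getDouble generators return NaN roughly half the time, which is presumably what makes the FloatFraction and DoubleFraction test columns exercise this fix: NaN does not participate in ordering, so pages containing it cannot carry meaningful min/max bounds, leaving the column without usable page statistics or a column index. A dependency-free illustration of why NaN breaks min/max bookkeeping:

public class NanOrdering {
  public static void main(String[] args) {
    double nan = Double.NaN;
    System.out.println(nan < 1.0);            // false
    System.out.println(nan > 1.0);            // false
    System.out.println(nan == nan);           // false: NaN is unordered, even against itself
    System.out.println(Math.min(0.5, nan));   // NaN: contaminates a running minimum
    // Hence writers must skip or special-case statistics for NaN-bearing pages.
  }
}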
