
PARQUET-2365: Fixes NPE when rewriting column without column index #1173

Merged · 6 commits · Nov 4, 2023
Changes from all commits
[File: ColumnChunkPageWriteStore.java]
@@ -211,14 +211,7 @@ public void writePage(BytesInput bytes,
      this.totalValueCount += valueCount;
      this.pageCount += 1;

-     // Copying the statistics if it is not initialized yet so we have the correct typed one
-     if (totalStatistics == null) {
-       totalStatistics = statistics.copy();
-     } else {
-       totalStatistics.mergeStatistics(statistics);
-     }
-
-     columnIndexBuilder.add(statistics);
+     mergeColumnStatistics(statistics);
      offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);

      // by concatenating before collecting instead of collecting twice,
@@ -298,14 +291,7 @@ public void writePageV2(
      this.totalValueCount += valueCount;
      this.pageCount += 1;

-     // Copying the statistics if it is not initialized yet so we have the correct typed one
-     if (totalStatistics == null) {
-       totalStatistics = statistics.copy();
-     } else {
-       totalStatistics.mergeStatistics(statistics);
-     }
-
-     columnIndexBuilder.add(statistics);
+     mergeColumnStatistics(statistics);
      offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);

      // by concatenating before collecting instead of collecting twice,
@@ -329,6 +315,26 @@ private int toIntWithCheck(long size) {
      return (int)size;
    }

+   private void mergeColumnStatistics(Statistics<?> statistics) {
+     if (totalStatistics != null && totalStatistics.isEmpty()) {
+       return;
+     }
+
+     if (statistics == null || statistics.isEmpty()) {
+       // The column index and statistics should be invalid if some page statistics are null or empty.
+       // See PARQUET-2365 for more details
+       totalStatistics = Statistics.getBuilderForReading(path.getPrimitiveType()).build();
+       columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+     } else if (totalStatistics == null) {
+       // Copying the statistics if it is not initialized yet, so we have the correct typed one
+       totalStatistics = statistics.copy();
+       columnIndexBuilder.add(statistics);
+     } else {
+       totalStatistics.mergeStatistics(statistics);
+       columnIndexBuilder.add(statistics);
+     }
+   }
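Note: the three-way branch above is the core of the fix. Once any page arrives without usable statistics, the accumulated column statistics are replaced by an empty ("unknown") Statistics object and the column index is dropped for the rest of the chunk. Below is a minimal standalone sketch of these semantics, assuming only parquet-column on the classpath; the page values and the class name are invented for illustration:

```java
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class MergeSemanticsSketch {
  public static void main(String[] args) {
    PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("v");

    // Page 1 carries real min/max statistics.
    LongStatistics page1 = (LongStatistics) Statistics.createStats(type);
    page1.updateStats(42L);

    // Page 2 stands in for a page whose header had no statistics at all:
    // a reading-side builder with nothing set builds an *empty* Statistics.
    Statistics<?> page2 = Statistics.getBuilderForReading(type).build();

    Statistics<?> total = null;
    for (Statistics<?> page : new Statistics<?>[] {page1, page2}) {
      if (total != null && total.isEmpty()) {
        break;  // already invalidated; stays invalid
      }
      if (page == null || page.isEmpty()) {
        total = Statistics.getBuilderForReading(type).build();  // mark unknown
      } else if (total == null) {
        total = page.copy();  // first page: adopt its correctly typed stats
      } else {
        total.mergeStatistics(page);
      }
    }

    System.out.println(total.isEmpty());  // true: column stats are "unknown"
  }
}
```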

@Override
public long getMemSize() {
return buf.size();

[File: ParquetFileWriter.java]
@@ -722,14 +722,7 @@ public void writeDataPage(
    LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
    bytes.writeAllTo(out);

-   // Copying the statistics if it is not initialized yet so we have the correct typed one
-   if (currentStatistics == null) {
-     currentStatistics = statistics.copy();
-   } else {
-     currentStatistics.mergeStatistics(statistics);
-   }
-
-   columnIndexBuilder.add(statistics);
+   mergeColumnStatistics(statistics);

    encodingStatsBuilder.addDataEncoding(valuesEncoding);
    currentEncodings.add(rlEncoding);
@@ -867,13 +860,8 @@ public void writeDataPageV2(
    this.uncompressedLength += uncompressedSize + headersSize;
    this.compressedLength += compressedSize + headersSize;

-   if (currentStatistics == null) {
-     currentStatistics = statistics.copy();
-   } else {
-     currentStatistics.mergeStatistics(statistics);
-   }
+   mergeColumnStatistics(statistics);

-   columnIndexBuilder.add(statistics);
    currentEncodings.add(dataEncoding);
    encodingStatsBuilder.addDataEncoding(dataEncoding);
@@ -988,6 +976,19 @@ void writeColumnChunk(ColumnDescriptor descriptor,
    endColumn();
  }

+  /**
+   * Overwrite the column total statistics. This is meant for the case where the column total
+   * statistics is known while all the page statistics are invalid, for example when rewriting
+   * the column.
+   *
+   * @param totalStatistics the column total statistics
+   */
+  public void invalidateStatistics(Statistics<?> totalStatistics) {
+    Preconditions.checkArgument(totalStatistics != null, "Column total statistics can not be null");
+    currentStatistics = totalStatistics;
+    // Invalidate the ColumnIndex
+    columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+  }
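Note: below is a sketch of how a caller might use the new hook when copying a chunk whose pages carry no statistics. All names are invented, the single-page shape is simplified, and the encoding arguments plus the exact writeDataPage overload are assumptions rather than the actual rewrite path:

```java
import java.io.IOException;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ChunkCopySketch {
  /**
   * Copies a single already-encoded page into the writer, then overwrites the
   * chunk-level statistics from the source footer -- the pattern ParquetRewriter
   * follows when none of the source pages carried statistics.
   */
  static void copyChunk(ParquetFileWriter writer,
                        ColumnDescriptor descriptor,
                        BytesInput pageBytes,      // compressed page payload from the source file
                        int valueCount,
                        int uncompressedSize,
                        Statistics<?> chunkStats)  // chunk-level stats from the source footer
      throws IOException {
    writer.startColumn(descriptor, valueCount, CompressionCodecName.UNCOMPRESSED);

    // Page-level statistics are unknown, so hand over empty stats;
    // mergeColumnStatistics() then swaps in the no-op column index builder.
    Statistics<?> emptyPageStats =
        Statistics.getBuilderForReading(descriptor.getPrimitiveType()).build();
    writer.writeDataPage(valueCount, uncompressedSize, pageBytes,
        emptyPageStats, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);

    // The footer-level statistics of the source chunk are still trustworthy.
    writer.invalidateStatistics(chunkStats);
    writer.endColumn();
  }
}
```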

/**
* end a column (once all rep, def and data have been written)
* @throws IOException if there is an error while writing
@@ -1317,6 +1318,26 @@ private int toIntWithCheck(long size) {
    return (int)size;
  }

+  private void mergeColumnStatistics(Statistics<?> statistics) {
+    if (currentStatistics != null && currentStatistics.isEmpty()) {
+      return;
+    }
+
+    if (statistics == null || statistics.isEmpty()) {
+      // The column index and statistics should be invalid if some page statistics are null or empty.
+      // See PARQUET-2365 for more details
+      currentStatistics = Statistics.getBuilderForReading(currentChunkType).build();
+      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+    } else if (currentStatistics == null) {
+      // Copying the statistics if it is not initialized yet, so we have the correct typed one
+      currentStatistics = statistics.copy();
+      columnIndexBuilder.add(statistics);
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+      columnIndexBuilder.add(statistics);
+    }
+  }
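Note: the helper is duplicated here because ParquetFileWriter and ColumnChunkPageWriteStore each track their own builder state. Swapping in the no-op builder mid-chunk relies on it accepting further add() calls while building no index at all; here is a small sketch of that assumed behavior (class name invented):

```java
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class NoOpIndexSketch {
  public static void main(String[] args) {
    PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("v");

    ColumnIndexBuilder builder = ColumnIndexBuilder.getNoOpBuilder();
    // Later pages may still report statistics; the no-op builder ignores them.
    builder.add(Statistics.getBuilderForReading(type).build());

    ColumnIndex index = builder.build();
    System.out.println(index);  // null: no column index is written for the chunk
  }
}
```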

private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,

[File: ParquetRewriter.java]

@@ -392,6 +392,7 @@ private void processChunk(ColumnChunkMetaData chunk,
    DictionaryPage dictionaryPage = null;
    long readValues = 0;
    Statistics<?> statistics = null;
+   boolean isColumnStatisticsMalformed = false;
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    int pageOrdinal = 0;
    long totalChunkValues = chunk.getValueCount();
@@ -439,6 +440,14 @@
              dataPageAAD);
          statistics = convertStatistics(
              originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+         if (statistics == null) {
+           // Reaching here means both the columnIndex and the page header statistics are null
+           isColumnStatisticsMalformed = true;
+         } else {
+           Preconditions.checkState(
+               !isColumnStatisticsMalformed,
+               "Detected mixed null page statistics and non-null page statistics");
+         }
          readValues += headerV1.getNum_values();
          if (offsetIndex != null) {
            long rowCount = 1 + offsetIndex.getLastRowIndex(
@@ -490,6 +499,14 @@
              dataPageAAD);
          statistics = convertStatistics(
              originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+         if (statistics == null) {
+           // Reaching here means both the columnIndex and the page header statistics are null
+           isColumnStatisticsMalformed = true;
+         } else {
+           Preconditions.checkState(
+               !isColumnStatisticsMalformed,
+               "Detected mixed null page statistics and non-null page statistics");
+         }
readValues += headerV2.getNum_values();
writer.writeDataPageV2(headerV2.getNum_rows(),
headerV2.getNum_nulls(),
@@ -509,6 +526,11 @@
          break;
      }
    }
+
+   if (isColumnStatisticsMalformed) {
+     // All the column statistics are invalid, so we need to overwrite the column statistics
+     writer.invalidateStatistics(chunk.getStatistics());
+   }
  }
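Note: this branch is only reachable through a rewrite. A minimal driver that would exercise processChunk might look like the sketch below; the paths are placeholders, and the RewriteOptions calls mirror what this PR's tests use:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    RewriteOptions options =
        new RewriteOptions.Builder(conf, new Path("/tmp/in.parquet"), new Path("/tmp/out.parquet"))
            .transform(CompressionCodecName.ZSTD)  // codec translation forces processChunk() per column
            .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}
```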

private Statistics<?> convertStatistics(String createdBy,

[File: ParquetRewriterTest.java]

@@ -80,6 +80,8 @@
import java.util.stream.Collectors;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -127,17 +129,7 @@ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws E
    rewriter.close();

    // Verify the schema is not changed for the columns not pruned
-   ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-   MessageType schema = pmd.getFileMetaData().getSchema();
-   List<Type> fields = schema.getFields();
-   assertEquals(fields.size(), 3);
-   assertEquals(fields.get(0).getName(), "DocId");
-   assertEquals(fields.get(1).getName(), "Name");
-   assertEquals(fields.get(2).getName(), "Links");
-   List<Type> subFields = fields.get(2).asGroupType().getFields();
-   assertEquals(subFields.size(), 2);
-   assertEquals(subFields.get(0).getName(), "Backward");
-   assertEquals(subFields.get(1).getName(), "Forward");
+   validateSchema();

// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -199,17 +191,7 @@ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Except
    rewriter.close();

    // Verify the schema is not changed for the columns not pruned
-   ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-   MessageType schema = pmd.getFileMetaData().getSchema();
-   List<Type> fields = schema.getFields();
-   assertEquals(fields.size(), 3);
-   assertEquals(fields.get(0).getName(), "DocId");
-   assertEquals(fields.get(1).getName(), "Name");
-   assertEquals(fields.get(2).getName(), "Links");
-   List<Type> subFields = fields.get(2).asGroupType().getFields();
-   assertEquals(subFields.size(), 2);
-   assertEquals(subFields.get(0).getName(), "Backward");
-   assertEquals(subFields.get(1).getName(), "Forward");
+   validateSchema();

// Verify codec has been translated
verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
@@ -276,17 +258,7 @@ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Except
    rewriter.close();

    // Verify the schema is not changed for the columns not pruned
-   ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
-   MessageType schema = pmd.getFileMetaData().getSchema();
-   List<Type> fields = schema.getFields();
-   assertEquals(fields.size(), 3);
-   assertEquals(fields.get(0).getName(), "DocId");
-   assertEquals(fields.get(1).getName(), "Name");
-   assertEquals(fields.get(2).getName(), "Links");
-   List<Type> subFields = fields.get(2).asGroupType().getFields();
-   assertEquals(subFields.size(), 2);
-   assertEquals(subFields.get(0).getName(), "Backward");
-   assertEquals(subFields.get(1).getName(), "Forward");
+   validateSchema();

// Verify codec has been translated
FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
@@ -672,6 +644,8 @@ private MessageType createSchema() {
        new PrimitiveType(OPTIONAL, INT64, "DocId"),
        new PrimitiveType(REQUIRED, BINARY, "Name"),
        new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+       new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+       new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
        new GroupType(OPTIONAL, "Links",
            new PrimitiveType(REPEATED, BINARY, "Backward"),
            new PrimitiveType(REPEATED, BINARY, "Forward")));
@@ -713,6 +687,16 @@ private void validateColumnData(Set<String> prunePaths,
          expectGroup.getBinary("Gender", 0).getBytes());
    }

+   if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
+     assertEquals(group.getFloat("FloatFraction", 0),
+         expectGroup.getFloat("FloatFraction", 0), 0);
+   }
+
+   if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
+     assertEquals(group.getDouble("DoubleFraction", 0),
+         expectGroup.getDouble("DoubleFraction", 0), 0);
+   }
+
    Group subGroup = group.getGroup("Links", 0);

if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
@@ -949,4 +933,20 @@ private Map<ColumnPath, List<BloomFilter>> allBloomFilters(

    return allBloomFilters;
  }
+
+  private void validateSchema() throws IOException {
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 5);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "FloatFraction");
+    assertEquals(fields.get(3).getName(), "DoubleFraction");
+    assertEquals(fields.get(4).getName(), "Links");
+    List<Type> subFields = fields.get(4).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+  }
}

[File: TestFileBuilder.java]

@@ -36,6 +36,8 @@
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

@@ -175,7 +177,13 @@ else if (primitiveType.getPrimitiveTypeName().equals(INT64)) {
      else if (primitiveType.getPrimitiveTypeName().equals(BINARY)) {
        g.add(type.getName(), getString());
      }
-     // Only support 3 types now, more can be added later
+     else if (primitiveType.getPrimitiveTypeName().equals(FLOAT)) {
+       g.add(type.getName(), getFloat());
+     }
+     else if (primitiveType.getPrimitiveTypeName().equals(DOUBLE)) {
+       g.add(type.getName(), getDouble());
+     }
+     // Only support 5 types now, more can be added later
    }
else {
GroupType groupType = (GroupType) type;
@@ -206,6 +214,22 @@ private static String getString()
return sb.toString();
}

+  private static float getFloat()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Float.NaN;
+    }
+    return ThreadLocalRandom.current().nextFloat();
+  }
+
+  private static double getDouble()
+  {
+    if (ThreadLocalRandom.current().nextBoolean()) {
+      return Double.NaN;
+    }
+    return ThreadLocalRandom.current().nextDouble();
+  }
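Note: the random NaNs are what reproduce the bug. As far as I can tell, since PARQUET-1246 parquet-mr skips writing min/max for FLOAT and DOUBLE statistics once a NaN has been seen, so these columns generate exactly the empty page statistics that PARQUET-2365 guards against. A rough way to confirm that on a generated file (the path is a placeholder, the class name invented):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class InspectStatsSketch {
  public static void main(String[] args) throws Exception {
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path("/tmp/generated.parquet"), new Configuration()))) {
      for (ColumnChunkMetaData chunk : reader.getFooter().getBlocks().get(0).getColumns()) {
        // FloatFraction/DoubleFraction chunks should print empty statistics here.
        System.out.println(chunk.getPath() + " -> " + chunk.getStatistics());
      }
    }
  }
}
```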

public static String createTempFile(String prefix)
{
try {