Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu committed Oct 27, 2023
1 parent 73e8a94 commit 94b0cd1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,16 +451,13 @@ int sizeOf(Object value) {
}
};

private PrimitiveType type;
private final BooleanList nullPages = new BooleanArrayList();
private final LongList nullCounts = new LongArrayList();
private final IntList pageIndexes = new IntArrayList();

private PrimitiveType type;
private long minMaxSize;
private final IntList pageIndexes = new IntArrayList();
private int nextPageIndex;

protected boolean invalid;

/**
* @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at
* {@link #build()}.
Expand Down Expand Up @@ -546,11 +543,6 @@ public static ColumnIndex build(
* the statistics to be added
*/
public void add(Statistics<?> stats) {
if (stats.isEmpty()) {
invalid = true;
return;
}

if (stats.hasNonNullValue()) {
nullPages.add(false);
Object min = stats.genericGetMin();
Expand Down Expand Up @@ -611,7 +603,7 @@ public ColumnIndex build() {
}

private ColumnIndexBase<?> build(PrimitiveType type) {
if (nullPages.isEmpty() || invalid) {
if (nullPages.isEmpty()) {
return null;
}
ColumnIndexBase<?> columnIndex = createColumnIndex(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ int compareValueToMax(int arrayIndex) {

private final DoubleList minValues = new DoubleArrayList();
private final DoubleList maxValues = new DoubleArrayList();
private boolean invalid;

private static double convert(ByteBuffer buffer) {
return buffer.order(LITTLE_ENDIAN).getDouble(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ int compareValueToMax(int arrayIndex) {

private final FloatList minValues = new FloatArrayList();
private final FloatList maxValues = new FloatArrayList();
private boolean invalid;

private static float convert(ByteBuffer buffer) {
return buffer.order(LITTLE_ENDIAN).getFloat(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public static enum Mode {
private long uncompressedLength;
private long compressedLength;
private Statistics<?> currentStatistics; // accumulated in writePage(s)
private boolean currentStatisticsAreValid;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;

Expand Down Expand Up @@ -474,6 +475,7 @@ public void startColumn(ColumnDescriptor descriptor,
uncompressedLength = 0;
// The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
currentStatistics = null;
currentStatisticsAreValid = true;

columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
Expand Down Expand Up @@ -722,14 +724,7 @@ public void writeDataPage(
LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
bytes.writeAllTo(out);

// Copying the statistics if it is not initialized yet so we have the correct typed one
if (currentStatistics == null) {
currentStatistics = statistics.copy();
} else {
currentStatistics.mergeStatistics(statistics);
}

columnIndexBuilder.add(statistics);
mergeColumnStatistics(statistics);

encodingStatsBuilder.addDataEncoding(valuesEncoding);
currentEncodings.add(rlEncoding);
Expand Down Expand Up @@ -867,13 +862,8 @@ public void writeDataPageV2(
this.uncompressedLength += uncompressedSize + headersSize;
this.compressedLength += compressedSize + headersSize;

if (currentStatistics == null) {
currentStatistics = statistics.copy();
} else {
currentStatistics.mergeStatistics(statistics);
}
mergeColumnStatistics(statistics);

columnIndexBuilder.add(statistics);
currentEncodings.add(dataEncoding);
encodingStatsBuilder.addDataEncoding(dataEncoding);

Expand Down Expand Up @@ -1000,6 +990,7 @@ public void endColumn(Statistics<?> totalStatistics) throws IOException {
currentStatistics = totalStatistics;
// Invalid the ColumnIndex
columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
currentStatisticsAreValid = true;
endColumn();
}

Expand All @@ -1010,6 +1001,7 @@ public void endColumn(Statistics<?> totalStatistics) throws IOException {
public void endColumn() throws IOException {
state = state.endColumn();
LOG.debug("{}: end column", out.getPos());
Preconditions.checkState(currentStatisticsAreValid, "Column statistics should be valid");
if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
currentColumnIndexes.add(null);
} else {
Expand Down Expand Up @@ -1332,6 +1324,27 @@ private int toIntWithCheck(long size) {
return (int)size;
}

private void mergeColumnStatistics(Statistics<?> statistics) {
if (!currentStatisticsAreValid) {
return;
}

if (statistics == null) {
// The column index should be invalid if some page statistics are null.
// See PARQUET-2365 for more details
currentStatistics = null;
currentStatisticsAreValid = false;
columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
} else if (currentStatistics == null) {
// Copying the statistics if it is not initialized yet so we have the correct typed one
currentStatistics = statistics.copy();
columnIndexBuilder.add(statistics);
} else {
currentStatistics.mergeStatistics(statistics);
columnIndexBuilder.add(statistics);
}
}

private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void processBlocksFromReader() throws IOException {
writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
boolean needOverwriteStatistics = processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
if (needOverwriteStatistics) {
// All the page statistics are invalid, so we need to overwrite the column statistics
// All the column statistics are invalid, so we need to overwrite the column statistics
writer.endColumn(chunk.getStatistics());
} else {
writer.endColumn();
Expand Down Expand Up @@ -385,7 +385,7 @@ private boolean processChunk(ColumnChunkMetaData chunk,
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics<?> statistics = null;
Statistics<?> emptyStatistics = Statistics.getBuilderForReading(chunk.getPrimitiveType()).build();
boolean needOverwriteColumnStatistics = false;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
Expand Down Expand Up @@ -431,28 +431,24 @@ private boolean processChunk(ColumnChunkMetaData chunk,
encryptColumn,
dataEncryptor,
dataPageAAD);
Statistics<?> v1PageStatistics = convertStatistics(
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
if (v1PageStatistics == null) {
if (statistics == null) {
// Reach here means both the columnIndex and the page header statistics are null
if (statistics != null) {
// Mixed null page statistics and non-null page statistics is not allowed
throw new IOException("Detected mixed null page statistics and non-null page statistics");
}
// Pass an empty page statistics to writer and overwrite the column statistics in the end
v1PageStatistics = emptyStatistics;
needOverwriteColumnStatistics = true;
} else {
statistics = v1PageStatistics;
Preconditions.checkState(
!needOverwriteColumnStatistics,
"Detected mixed null page statistics and non-null page statistics");
}

readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1 + offsetIndex.getLastRowIndex(
pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
v1PageStatistics,
statistics,
toIntWithCheck(rowCount),
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
Expand All @@ -463,7 +459,7 @@ private boolean processChunk(ColumnChunkMetaData chunk,
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
v1PageStatistics,
statistics,
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()),
Expand Down Expand Up @@ -494,18 +490,15 @@ private boolean processChunk(ColumnChunkMetaData chunk,
encryptColumn,
dataEncryptor,
dataPageAAD);
Statistics<?> v2PageStatistics = convertStatistics(
statistics = convertStatistics(
originalCreatedBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
if (v2PageStatistics == null) {
if (statistics == null) {
// Reach here means both the columnIndex and the page header statistics are null
if (statistics != null) {
// Mixed null page statistics and non-null page statistics is not allowed
throw new IOException("Detected mixed null page statistics and non-null page statistics");
}
// Pass an empty page statistics to writer and overwrite the column statistics in the end
v2PageStatistics = emptyStatistics;
needOverwriteColumnStatistics = true;
} else {
statistics = v2PageStatistics;
Preconditions.checkState(
!needOverwriteColumnStatistics,
"Detected mixed null page statistics and non-null page statistics");
}
readValues += headerV2.getNum_values();
writer.writeDataPageV2(headerV2.getNum_rows(),
Expand All @@ -516,7 +509,7 @@ private boolean processChunk(ColumnChunkMetaData chunk,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
rawDataLength,
v2PageStatistics,
statistics,
metaEncryptor,
dataPageHeaderAAD);
pageOrdinal++;
Expand All @@ -527,7 +520,7 @@ private boolean processChunk(ColumnChunkMetaData chunk,
}
}

return statistics == null;
return needOverwriteColumnStatistics;
}

private Statistics<?> convertStatistics(String createdBy,
Expand Down

0 comments on commit 94b0cd1

Please sign in to comment.