Skip to content

Commit

Permalink
PARQUET-2261: Implement SizeStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Dec 9, 2023
1 parent c061d54 commit eb7a1a5
Show file tree
Hide file tree
Showing 21 changed files with 1,390 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter;
Expand Down Expand Up @@ -55,6 +56,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
private int valueCount;

private Statistics<?> statistics;
private SizeStatistics.Builder sizeStatisticsBuilder;
private long rowsWrittenSoFar = 0;
private int pageRowCount;

Expand Down Expand Up @@ -112,6 +114,8 @@ private void log(Object value, int r, int d) {

private void resetStatistics() {
this.statistics = Statistics.createStats(path.getPrimitiveType());
this.sizeStatisticsBuilder = new SizeStatistics.Builder(
path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel());
}

private void definitionLevel(int definitionLevel) {
Expand All @@ -138,6 +142,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) {
repetitionLevel(repetitionLevel);
definitionLevel(definitionLevel);
statistics.incrementNumNulls();
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand Down Expand Up @@ -201,6 +206,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeDouble(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -219,6 +225,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeFloat(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -237,6 +244,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBytes(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -255,6 +263,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBoolean(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand All @@ -272,6 +281,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeInteger(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -290,6 +300,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeLong(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand Down Expand Up @@ -389,7 +400,15 @@ void writePage() {
this.rowsWrittenSoFar += pageRowCount;
if (DEBUG) LOG.debug("write page");
try {
writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
SizeStatistics sizeStatistics = sizeStatisticsBuilder.build();
writePage(
pageRowCount,
valueCount,
statistics,
sizeStatistics,
repetitionLevelColumn,
definitionLevelColumn,
dataColumn);
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
}
Expand All @@ -401,6 +420,7 @@ void writePage() {
pageRowCount = 0;
}

@Deprecated
abstract void writePage(
int rowCount,
int valueCount,
Expand All @@ -409,4 +429,14 @@ abstract void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException;

abstract void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
Expand Down Expand Up @@ -56,6 +57,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
}

@Override
@Deprecated
void writePage(
int rowCount,
int valueCount,
Expand All @@ -64,11 +66,25 @@ void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
pageWriter.writePage(
concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
valueCount,
rowCount,
statistics,
sizeStatistics,
repetitionLevels.getEncoding(),
definitionLevels.getEncoding(),
values.getEncoding());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
Expand Down Expand Up @@ -82,6 +83,7 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
}

@Override
@Deprecated
void writePage(
int rowCount,
int valueCount,
Expand All @@ -90,6 +92,19 @@ void writePage(
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(
int rowCount,
int valueCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels,
ValuesWriter definitionLevels,
ValuesWriter values)
throws IOException {
// TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different
// otherwise)
BytesInput bytes = values.getBytes();
Expand All @@ -102,6 +117,7 @@ void writePage(
definitionLevels.getBytes(),
encoding,
bytes,
statistics);
statistics,
sizeStatistics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;

/**
Expand Down Expand Up @@ -63,6 +64,7 @@ void writePage(
* @param valuesEncoding values encoding
* @throws IOException
*/
@Deprecated
void writePage(
BytesInput bytesInput,
int valueCount,
Expand All @@ -73,6 +75,29 @@ void writePage(
Encoding valuesEncoding)
throws IOException;

/**
* writes a single page
* @param bytesInput the bytes for the page
* @param valueCount the number of values in that page
* @param rowCount the number of rows in that page
* @param statistics the statistics for that page
* @param sizeStatistics the size statistics for that page
* @param rlEncoding repetition level encoding
* @param dlEncoding definition level encoding
* @param valuesEncoding values encoding
* @throws IOException
*/
void writePage(
BytesInput bytesInput,
int valueCount,
int rowCount,
Statistics<?> statistics,
SizeStatistics sizeStatistics,
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding)
throws IOException;

/**
* writes a single page in the new format
*
Expand All @@ -86,6 +111,7 @@ void writePage(
* @param statistics optional stats for this page
* @throws IOException if there is an exception while writing page data
*/
@Deprecated
void writePageV2(
int rowCount,
int nullCount,
Expand All @@ -97,6 +123,31 @@ void writePageV2(
Statistics<?> statistics)
throws IOException;

/**
* writes a single page in the new format
* @param rowCount the number of rows in this page
* @param nullCount the number of null values (out of valueCount)
* @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
* @param repetitionLevels the repetition levels encoded in RLE without any size header
* @param definitionLevels the definition levels encoded in RLE without any size header
* @param dataEncoding the encoding for the data
* @param data the data encoded with dataEncoding
* @param statistics optional stats for this page
* @param sizeStatistics optional size stats for this page
* @throws IOException if there is an exception while writing page data
*/
void writePageV2(
int rowCount,
int nullCount,
int valueCount,
BytesInput repetitionLevels,
BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput data,
Statistics<?> statistics,
SizeStatistics sizeStatistics)
throws IOException;

/**
* @return the current size used in the memory buffer for that column chunk
*/
Expand Down
Loading

0 comments on commit eb7a1a5

Please sign in to comment.