Skip to content

Commit

Permalink
PARQUET-2261: Implement SizeStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Nov 23, 2023
1 parent 452c94d commit 339d397
Show file tree
Hide file tree
Showing 21 changed files with 1,198 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
Expand Down Expand Up @@ -56,6 +57,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
private int valueCount;

private Statistics<?> statistics;
private SizeStatistics.Builder sizeStatisticsBuilder;
private long rowsWrittenSoFar = 0;
private int pageRowCount;

Expand Down Expand Up @@ -116,6 +118,8 @@ private void log(Object value, int r, int d) {

private void resetStatistics() {
this.statistics = Statistics.createStats(path.getPrimitiveType());
this.sizeStatisticsBuilder = new SizeStatistics.Builder(
path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel());
}

private void definitionLevel(int definitionLevel) {
Expand Down Expand Up @@ -143,6 +147,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) {
repetitionLevel(repetitionLevel);
definitionLevel(definitionLevel);
statistics.incrementNumNulls();
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand Down Expand Up @@ -207,6 +212,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeDouble(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -226,6 +232,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeFloat(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -245,6 +252,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBytes(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -264,6 +272,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeBoolean(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
++valueCount;
}

Expand All @@ -282,6 +291,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeInteger(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand All @@ -301,6 +311,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
dataColumn.writeLong(value);
statistics.updateStats(value);
sizeStatisticsBuilder.add(repetitionLevel, definitionLevel);
updateBloomFilter(value);
++valueCount;
}
Expand Down Expand Up @@ -395,7 +406,8 @@ void writePage() {
if (DEBUG)
LOG.debug("write page");
try {
writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn);
writePage(pageRowCount, valueCount, statistics, sizeStatisticsBuilder.build(),
repetitionLevelColumn, definitionLevelColumn, dataColumn);
} catch (IOException e) {
throw new ParquetEncodingException("could not write page for " + path, e);
}
Expand All @@ -407,6 +419,10 @@ void writePage() {
pageRowCount = 0;
}

@Deprecated
abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
ValuesWriter definitionLevels, ValuesWriter values) throws IOException;

abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
Expand Down Expand Up @@ -54,13 +55,21 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
}

@Override
@Deprecated
void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(int rowCount, int valueCount, Statistics<?> statistics, SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
pageWriter.writePage(
concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()),
valueCount,
rowCount,
statistics,
sizeStatistics,
repetitionLevels.getEncoding(),
definitionLevels.getEncoding(),
values.getEncoding());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
Expand Down Expand Up @@ -76,8 +77,15 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) {
}

@Override
@Deprecated
void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values);
}

@Override
void writePage(int rowCount, int valueCount, Statistics<?> statistics, SizeStatistics sizeStatistics,
ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException {
// TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different otherwise)
BytesInput bytes = values.getBytes();
Encoding encoding = values.getEncoding();
Expand All @@ -89,6 +97,7 @@ void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWri
definitionLevels.getBytes(),
encoding,
bytes,
statistics);
statistics,
sizeStatistics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;

/**
Expand Down Expand Up @@ -55,7 +56,25 @@ public interface PageWriter {
* @param valuesEncoding values encoding
* @throws IOException
*/
void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;
@Deprecated
void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics,
Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException;

/**
* writes a single page
* @param bytesInput the bytes for the page
* @param valueCount the number of values in that page
* @param rowCount the number of rows in that page
* @param statistics the statistics for that page
* @param sizeStatistics the size statistics for that page
* @param rlEncoding repetition level encoding
* @param dlEncoding definition level encoding
* @param valuesEncoding values encoding
* @throws IOException
*/
void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics<?> statistics,
SizeStatistics sizeStatistics, Encoding rlEncoding, Encoding dlEncoding,
Encoding valuesEncoding) throws IOException;

/**
* writes a single page in the new format
Expand All @@ -69,13 +88,31 @@ public interface PageWriter {
* @param statistics optional stats for this page
* @throws IOException if there is an exception while writing page data
*/
@Deprecated
void writePageV2(
int rowCount, int nullCount, int valueCount,
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding,
BytesInput data,
Statistics<?> statistics) throws IOException;

/**
* writes a single page in the new format
* @param rowCount the number of rows in this page
* @param nullCount the number of null values (out of valueCount)
* @param valueCount the number of values in that page (there could be multiple values per row for repeated fields)
* @param repetitionLevels the repetition levels encoded in RLE without any size header
* @param definitionLevels the definition levels encoded in RLE without any size header
* @param dataEncoding the encoding for the data
* @param data the data encoded with dataEncoding
* @param statistics optional stats for this page
* @param sizeStatistics optional size stats for this page
* @throws IOException if there is an exception while writing page data
*/
void writePageV2(int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data, Statistics<?> statistics,
SizeStatistics sizeStatistics) throws IOException;

/**
* @return the current size used in the memory buffer for that column chunk
*/
Expand Down
Loading

0 comments on commit 339d397

Please sign in to comment.