diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index f0ecb71ccd..6917e5e70b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -27,6 +27,7 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; @@ -56,6 +57,7 @@ abstract class ColumnWriterBase implements ColumnWriter { private int valueCount; private Statistics statistics; + private SizeStatistics.Builder sizeStatisticsBuilder; private long rowsWrittenSoFar = 0; private int pageRowCount; @@ -116,6 +118,8 @@ private void log(Object value, int r, int d) { private void resetStatistics() { this.statistics = Statistics.createStats(path.getPrimitiveType()); + this.sizeStatisticsBuilder = new SizeStatistics.Builder( + path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel()); } private void definitionLevel(int definitionLevel) { @@ -143,6 +147,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); statistics.incrementNumNulls(); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); ++valueCount; } @@ -207,6 +212,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeDouble(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); updateBloomFilter(value); ++valueCount; } @@ -226,6 +232,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeFloat(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); updateBloomFilter(value); ++valueCount; } @@ -245,6 +252,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeBytes(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value); updateBloomFilter(value); ++valueCount; } @@ -264,6 +272,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeBoolean(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); ++valueCount; } @@ -282,6 +291,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeInteger(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); updateBloomFilter(value); ++valueCount; } @@ -301,6 +311,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) { definitionLevel(definitionLevel); dataColumn.writeLong(value); statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); updateBloomFilter(value); ++valueCount; } @@ -395,7 +406,8 @@ void writePage() { if (DEBUG) LOG.debug("write page"); try { - writePage(pageRowCount, 
valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn); + writePage(pageRowCount, valueCount, statistics, sizeStatisticsBuilder.build(), + repetitionLevelColumn, definitionLevelColumn, dataColumn); } catch (IOException e) { throw new ParquetEncodingException("could not write page for " + path, e); } @@ -407,6 +419,10 @@ void writePage() { pageRowCount = 0; } + @Deprecated abstract void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException; + + abstract void writePage(int rowCount, int valueCount, Statistics statistics, SizeStatistics sizeStatistics, + ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index 752042480b..3c3d589811 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -25,6 +25,7 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; @@ -54,13 +55,21 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) { } @Override + @Deprecated void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWriter repetitionLevels, - ValuesWriter definitionLevels, ValuesWriter values) throws IOException { + ValuesWriter definitionLevels, ValuesWriter values) throws IOException { + writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values); + } + + @Override + void writePage(int rowCount, int valueCount, Statistics statistics, SizeStatistics sizeStatistics, + ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException { pageWriter.writePage( concat(repetitionLevels.getBytes(), definitionLevels.getBytes(), values.getBytes()), valueCount, rowCount, statistics, + sizeStatistics, repetitionLevels.getEncoding(), definitionLevels.getEncoding(), values.getEncoding()); diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java index cc44e2d630..f082f7ae98 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java @@ -25,6 +25,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; @@ -76,8 +77,15 @@ ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path) { } @Override + @Deprecated void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWriter repetitionLevels, - 
ValuesWriter definitionLevels, ValuesWriter values) throws IOException { + ValuesWriter definitionLevels, ValuesWriter values) throws IOException { + writePage(rowCount, valueCount, statistics, null, repetitionLevels, definitionLevels, values); + } + + @Override + void writePage(int rowCount, int valueCount, Statistics statistics, SizeStatistics sizeStatistics, + ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) throws IOException { // TODO: rework this API. The bytes shall be retrieved before the encoding (encoding might be different otherwise) BytesInput bytes = values.getBytes(); Encoding encoding = values.getEncoding(); @@ -89,6 +97,7 @@ void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWri definitionLevels.getBytes(), encoding, bytes, - statistics); + statistics, + sizeStatistics); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java index a72be48b54..419ee3b236 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java @@ -22,6 +22,7 @@ import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; /** @@ -55,7 +56,25 @@ public interface PageWriter { * @param valuesEncoding values encoding * @throws IOException */ - void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException; + @Deprecated + void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics statistics, + Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException; + + /** + * writes a single page + * @param bytesInput the bytes for the page + * @param valueCount the number of values in that page + * @param rowCount the number of rows in that page + * @param statistics the statistics for that page + * @param sizeStatistics the size statistics for that page + * @param rlEncoding repetition level encoding + * @param dlEncoding definition level encoding + * @param valuesEncoding values encoding + * @throws IOException + */ + void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics statistics, + SizeStatistics sizeStatistics, Encoding rlEncoding, Encoding dlEncoding, + Encoding valuesEncoding) throws IOException; /** * writes a single page in the new format @@ -69,6 +88,7 @@ public interface PageWriter { * @param statistics optional stats for this page * @throws IOException if there is an exception while writing page data */ + @Deprecated void writePageV2( int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels, @@ -76,6 +96,23 @@ void writePageV2( BytesInput data, Statistics statistics) throws IOException; + /** + * writes a single page in the new format + * @param rowCount the number of rows in this page + * @param nullCount the number of null values (out of valueCount) + * @param valueCount the number of values in that page (there could be multiple values per row for repeated fields) + * @param repetitionLevels the repetition levels encoded in RLE without any size header + * @param definitionLevels the definition levels encoded in RLE without any size header + * @param 
dataEncoding the encoding for the data + * @param data the data encoded with dataEncoding + * @param statistics optional stats for this page + * @param sizeStatistics optional size stats for this page + * @throws IOException if there is an exception while writing page data + */ + void writePageV2(int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels, + Encoding dataEncoding, BytesInput data, Statistics statistics, + SizeStatistics sizeStatistics) throws IOException; + /** * @return the current size used in the memory buffer for that column chunk */ diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java new file mode 100644 index 0000000000..7439fdd8ae --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * A structure for capturing metadata for estimating the unencoded, + * uncompressed size of data written. This is useful for readers to estimate + * how much memory is needed to reconstruct data in their memory model and for + * fine-grained filter push down on nested structures (the histograms contained + * in this structure can help determine the number of nulls at a particular + * nesting level and maximum length of lists). + */ +public class SizeStatistics { + + private PrimitiveType type; + /** + * The number of physical bytes stored for BYTE_ARRAY data values assuming + * no encoding. This is exclusive of the bytes needed to store the length of + * each byte array. In other words, this field is equivalent to the `(size + * of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + * written)`. To determine unencoded sizes of other types readers can use + * schema information multiplied by the number of non-null and null values. + * The number of null/non-null values can be inferred from the histograms + * below. + * + * For example, if a column chunk is dictionary-encoded with dictionary + * ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + * then this value for that data page should be 7 (1 + 1 + 2 + 3). + * + * This field should only be set for types that use BYTE_ARRAY as their + * physical type. 
+   */
+  private long unencodedByteArrayDataBytes;
+  /**
+   * When present, there is expected to be one element corresponding to each
+   * repetition (i.e. size=max repetition_level+1) where each element
+   * represents the number of times the repetition level was observed in the
+   * data.
+   *
+   * This field may be omitted if max_repetition_level is 0 without loss
+   * of information.
+   */
+  private List<Long> repetitionLevelHistogram;
+  /**
+   * Same as repetition_level_histogram except for definition levels.
+   *
+   * This field may be omitted if max_definition_level is 0 or 1 without
+   * loss of information.
+   */
+  private List<Long> definitionLevelHistogram;
+
+  /**
+   * Whether the statistics has a valid value.
+   *
+   * It is true by default and is only set to false when merging statistics fails.
+   */
+  private boolean hasValue = true;
+
+  /**
+   * Builder to create a SizeStatistics.
+   */
+  public static class Builder {
+    private PrimitiveType type;
+    private long unencodedByteArrayDataBytes;
+    private long[] repetitionLevelHistogram;
+    private long[] definitionLevelHistogram;
+
+    /**
+     * Create a builder to create a SizeStatistics.
+     *
+     * @param type physical type of the column associated with this statistics
+     * @param maxRepetitionLevel maximum repetition level of the column
+     * @param maxDefinitionLevel maximum definition level of the column
+     */
+    public Builder(PrimitiveType type, int maxRepetitionLevel, int maxDefinitionLevel) {
+      this.type = type;
+      this.unencodedByteArrayDataBytes = 0L;
+      repetitionLevelHistogram = new long[maxRepetitionLevel + 1];
+      definitionLevelHistogram = new long[maxDefinitionLevel + 1];
+      Arrays.fill(repetitionLevelHistogram, 0L);
+      Arrays.fill(definitionLevelHistogram, 0L);
+    }
+
+    /**
+     * Add repetition and definition level of a value to the statistics.
+     * It is called when the value is null, or the column is not of BYTE_ARRAY type.
+     *
+     * @param repetitionLevel repetition level of the value
+     * @param definitionLevel definition level of the value
+     */
+    public void add(int repetitionLevel, int definitionLevel) {
+      Preconditions.checkArgument(0 <= repetitionLevel && repetitionLevel < repetitionLevelHistogram.length,
+          "repetitionLevel %s is out of range [0, %s]", repetitionLevel, repetitionLevelHistogram.length - 1);
+      Preconditions.checkArgument(0 <= definitionLevel && definitionLevel < definitionLevelHistogram.length,
+          "definitionLevel %s is out of range [0, %s]", definitionLevel, definitionLevelHistogram.length - 1);
+      repetitionLevelHistogram[repetitionLevel]++;
+      definitionLevelHistogram[definitionLevel]++;
+    }
+
+    /**
+     * Add repetition and definition level of a non-null value to the statistics.
+     * The unencoded byte size is accumulated only if the column is of BYTE_ARRAY type.
+     *
+     * @param repetitionLevel repetition level of the value
+     * @param definitionLevel definition level of the value
+     * @param value the value to be added
+     */
+    public void add(int repetitionLevel, int definitionLevel, Binary value) {
+      add(repetitionLevel, definitionLevel);
+      if (type.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
+        unencodedByteArrayDataBytes = Math.addExact(unencodedByteArrayDataBytes, value.length());
+      }
+    }
+
+    /**
+     * Build a SizeStatistics from the builder.
+     */
+    public SizeStatistics build() {
+      return new SizeStatistics(type, unencodedByteArrayDataBytes,
+          new LongArrayList(repetitionLevelHistogram), new LongArrayList(definitionLevelHistogram));
+    }
+
+  }
+
+  /**
+   * Create a builder to create a SizeStatistics.
+ * + * @param type physical type of the column associated with this statistics + * @param maxRepetitionLevel maximum repetition level of the column + * @param maxDefinitionLevel maximum definition level of the column + */ + public static Builder newBuilder(PrimitiveType type, int maxRepetitionLevel, int maxDefinitionLevel) { + return new Builder(type, maxRepetitionLevel, maxDefinitionLevel); + } + + /** + * Create a SizeStatistics. + * + * @param type physical type of the column associated with this statistics + * @param unencodedByteArrayDataBytes number of physical bytes stored for BYTE_ARRAY data values assuming no encoding + * @param repetitionLevelHistogram histogram for all repetition levels if non-empty + * @param definitionLevelHistogram histogram for all definition levels if non-empty + */ + public SizeStatistics(PrimitiveType type, + long unencodedByteArrayDataBytes, + List repetitionLevelHistogram, + List definitionLevelHistogram) { + this.type = type; + this.unencodedByteArrayDataBytes = unencodedByteArrayDataBytes; + this.repetitionLevelHistogram = repetitionLevelHistogram; + this.definitionLevelHistogram = definitionLevelHistogram; + } + + /** + * Merge two SizeStatistics of the same column. + * It is used to merge size statistics from all pages of the same column chunk. + */ + public void mergeStatistics(SizeStatistics other) { + if (!hasValue) { + return; + } + + // Stop merge if other is invalid. + if (other == null || !other.isValid()) { + hasValue = false; + unencodedByteArrayDataBytes = 0L; + repetitionLevelHistogram.clear(); + definitionLevelHistogram.clear(); + return; + } + + Preconditions.checkArgument(type.equals(other.type), "Cannot merge SizeStatistics of different types"); + Preconditions.checkArgument(repetitionLevelHistogram.size() == other.repetitionLevelHistogram.size(), + "Cannot merge repetitionLevelHistogram with different sizes"); + Preconditions.checkArgument(definitionLevelHistogram.size() == other.definitionLevelHistogram.size(), + "Cannot merge definitionLevelHistogram with different sizes"); + unencodedByteArrayDataBytes = Math.addExact(unencodedByteArrayDataBytes, other.unencodedByteArrayDataBytes); + for (int i = 0; i < repetitionLevelHistogram.size(); i++) { + repetitionLevelHistogram.set(i, repetitionLevelHistogram.get(i) + other.getRepetitionLevelHistogram().get(i)); + } + for (int i = 0; i < definitionLevelHistogram.size(); i++) { + definitionLevelHistogram.set(i, definitionLevelHistogram.get(i) + other.getDefinitionLevelHistogram().get(i)); + } + } + + public PrimitiveType getType() { + return type; + } + + /** + * @return unencoded and uncompressed byte size of the BYTE_ARRAY column, or empty for other types. + */ + public Optional getUnencodedByteArrayDataBytes() { + if (type.getPrimitiveTypeName() != PrimitiveType.PrimitiveTypeName.BINARY) { + return Optional.empty(); + } + return Optional.of(unencodedByteArrayDataBytes); + } + + /** + * @return repetition level histogram of all levels if not empty. + */ + public List getRepetitionLevelHistogram() { + return repetitionLevelHistogram; + } + + /** + * @return definition level histogram of all levels if not empty. + */ + public List getDefinitionLevelHistogram() { + return definitionLevelHistogram; + } + + /** + * @return a new independent statistics instance of this class. 
+ */ + public SizeStatistics copy() { + return new SizeStatistics(type, unencodedByteArrayDataBytes, + new LongArrayList(repetitionLevelHistogram), new LongArrayList(definitionLevelHistogram)); + } + + /** + * @return whether the statistics has valid value. + */ + public boolean isValid() { + return hasValue; + } + +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java index b91a5c0d96..99c11581bc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java @@ -57,4 +57,16 @@ public interface ColumnIndex extends Visitor { */ public List getMaxValues(); + /** + * @return the unmodifiable list of the repetition level histograms for each page concatenated together; used for + * converting to the related thrift object + */ + public List getRepetitionLevelHistogram(); + + /** + * @return the unmodifiable list of the definition level histograms for each page concatenated together; used for + * converting to the related thrift object + */ + public List getDefinitionLevelHistogram(); + } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 7c221efbfb..369070b91c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -31,6 +31,7 @@ import java.util.function.IntPredicate; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -104,6 +105,10 @@ int translate(int arrayIndex) { private int[] pageIndexes; // might be null private long[] nullCounts; + // might be null + private long[] repLevelHistogram; + // might be null + private long[] defLevelHistogram; static String truncate(String str) { if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) { @@ -164,6 +169,22 @@ public List getMaxValues() { return list; } + @Override + public List getRepetitionLevelHistogram() { + if (repLevelHistogram == null) { + return LongList.of(); + } + return LongLists.unmodifiable(LongArrayList.wrap(repLevelHistogram)); + } + + @Override + public List getDefinitionLevelHistogram() { + if (defLevelHistogram == null) { + return LongList.of(); + } + return LongLists.unmodifiable(LongArrayList.wrap(defLevelHistogram)); + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { @@ -418,6 +439,10 @@ public ColumnIndex build() { public void add(Statistics stats) { } + @Override + public void add(Statistics stats, SizeStatistics sizeStats) { + } + @Override void addMinMax(Object min, Object max) { } @@ -457,6 +482,8 @@ int sizeOf(Object value) { private long minMaxSize; private final IntList pageIndexes = new IntArrayList(); private int nextPageIndex; + private LongList repLevelHistogram = new LongArrayList(); + private LongList defLevelHistogram = new LongArrayList(); /** * @return a no-op builder that does not collect statistics objects and therefore returns {@code 
null} at @@ -516,6 +543,7 @@ private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int trunc * the max values for each page * @return the newly created {@link ColumnIndex} object based on the specified arguments */ + @Deprecated public static ColumnIndex build( PrimitiveType type, BoundaryOrder boundaryOrder, @@ -523,10 +551,42 @@ public static ColumnIndex build( List nullCounts, List minValues, List maxValues) { + return build(type, boundaryOrder, nullPages, nullCounts, minValues, maxValues, null, null); + } + + /** + * @param type + * the primitive type + * @param boundaryOrder + * the boundary order of the min/max values + * @param nullPages + * the null pages (one boolean value for each page that signifies whether the page consists of nulls + * entirely) + * @param nullCounts + * the number of null values for each page + * @param minValues + * the min values for each page + * @param maxValues + * the max values for each page + * @param repLevelHistogram + * the repetition level histogram for all levels of each page + * @param defLevelHistogram + * the definition level histogram for all levels of each page + * @return the newly created {@link ColumnIndex} object based on the specified arguments + */ + public static ColumnIndex build( + PrimitiveType type, + BoundaryOrder boundaryOrder, + List nullPages, + List nullCounts, + List minValues, + List maxValues, + List repLevelHistogram, + List defLevelHistogram) { ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); - builder.fill(nullPages, nullCounts, minValues, maxValues); + builder.fill(nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); ColumnIndexBase columnIndex = builder.build(type); columnIndex.boundaryOrder = requireNonNull(boundaryOrder); return columnIndex; @@ -542,7 +602,20 @@ public static ColumnIndex build( * @param stats * the statistics to be added */ + @Deprecated public void add(Statistics stats) { + add(stats, null); + } + + /** + * Adds the data from the specified statistics to this builder + * + * @param stats + * the statistics to be added + * @param sizeStats + * the size statistics to be added + */ + public void add(Statistics stats, SizeStatistics sizeStats) { if (stats.hasNonNullValue()) { nullPages.add(false); Object min = stats.genericGetMin(); @@ -555,6 +628,16 @@ public void add(Statistics stats) { nullPages.add(true); } nullCounts.add(stats.getNumNulls()); + + // Collect repetition and definition level histograms only when all pages are valid. 
+ if (sizeStats != null && sizeStats.isValid() && repLevelHistogram != null && defLevelHistogram != null) { + repLevelHistogram.addAll(sizeStats.getRepetitionLevelHistogram()); + defLevelHistogram.addAll(sizeStats.getDefinitionLevelHistogram()); + } else { + repLevelHistogram = null; + defLevelHistogram = null; + } + ++nextPageIndex; } @@ -563,7 +646,7 @@ public void add(Statistics stats) { abstract void addMinMax(Object min, Object max); private void fill(List nullPages, List nullCounts, List minValues, - List maxValues) { + List maxValues, List repLevelHistogram, List defLevelHistogram) { clear(); int pageCount = nullPages.size(); if ((nullCounts != null && nullCounts.size() != pageCount) || minValues.size() != pageCount @@ -572,6 +655,18 @@ private void fill(List nullPages, List nullCounts, List nullPages, List nullCounts, List build(PrimitiveType type) { columnIndex.nullCounts = nullCounts.toLongArray(); } columnIndex.pageIndexes = pageIndexes.toIntArray(); + // Repetition and definition level histograms are optional so keep them null if the builder has no values + if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) { + columnIndex.repLevelHistogram = repLevelHistogram.toLongArray(); + } + if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { + columnIndex.defLevelHistogram = defLevelHistogram.toLongArray(); + } return columnIndex; } @@ -662,6 +772,8 @@ private void clear() { minMaxSize = 0; nextPageIndex = 0; pageIndexes.clear(); + repLevelHistogram.clear(); + defLevelHistogram.clear(); } abstract void clearMinMax(); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java index 6c16294953..f2d187b433 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.internal.column.columnindex; +import java.util.Optional; + /** * Offset index containing the offset and size of the page and the index of the first row in the page. * @@ -70,4 +72,11 @@ public default long getLastRowIndex(int pageIndex, long rowGroupRowCount) { int nextPageIndex = pageIndex + 1; return (nextPageIndex >= getPageCount() ? rowGroupRowCount : getFirstRowIndex(nextPageIndex)) - 1; } + + /** + * @param pageIndex + * the index of the page + * @return unencoded/uncompressed size for BYTE_ARRAY types; or empty for other types. 
+ */ + public Optional getUnencodedByteArrayDataBytes(int pageIndex); } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java index 169c82e54a..497e7990f6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java @@ -19,6 +19,7 @@ package org.apache.parquet.internal.column.columnindex; import java.util.Formatter; +import java.util.Optional; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; @@ -34,6 +35,7 @@ private static class OffsetIndexImpl implements OffsetIndex { private long[] offsets; private int[] compressedPageSizes; private long[] firstRowIndexes; + private long[] unencodedByteArrayDataBytes; @Override public String toString() { @@ -70,6 +72,14 @@ public long getFirstRowIndex(int pageIndex) { public int getPageOrdinal(int pageIndex) { return pageIndex; } + + @Override + public Optional getUnencodedByteArrayDataBytes(int pageIndex) { + if (unencodedByteArrayDataBytes == null || unencodedByteArrayDataBytes.length == 0) { + return Optional.empty(); + } + return Optional.of(unencodedByteArrayDataBytes[pageIndex]); + } } private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() { @@ -85,6 +95,7 @@ public void add(long offset, int compressedPageSize, long rowCount) { private final LongList offsets = new LongArrayList(); private final IntList compressedPageSizes = new IntArrayList(); private final LongList firstRowIndexes = new LongArrayList(); + private final LongList unencodedDataBytes = new LongArrayList(); private long previousOffset; private int previousPageSize; private long previousRowIndex; @@ -116,11 +127,28 @@ private OffsetIndexBuilder() { * @param rowCount * the number of rows in the page */ + @Deprecated public void add(int compressedPageSize, long rowCount) { - add(previousOffset + previousPageSize, compressedPageSize, previousRowIndex + previousRowCount); + add(compressedPageSize, rowCount, Optional.empty()); + } + + /** + * Adds the specified parameters to this builder. Used by the writers to building up {@link OffsetIndex} objects to be + * written to the Parquet file. + * + * @param compressedPageSize + * the size of the page (including header) + * @param rowCount + * the number of rows in the page + * @param unencodedDataBytes + * the number of bytes of unencoded data of BYTE_ARRAY type + */ + public void add(int compressedPageSize, long rowCount, Optional unencodedDataBytes) { + add(previousOffset + previousPageSize, compressedPageSize, previousRowIndex + previousRowCount, unencodedDataBytes); previousRowCount = rowCount; } + /** * Adds the specified parameters to this builder. Used by the metadata converter to building up {@link OffsetIndex} * objects read from the Parquet file. @@ -132,13 +160,34 @@ public void add(int compressedPageSize, long rowCount) { * @param firstRowIndex * the index of the first row in the page (within the row group) */ + @Deprecated public void add(long offset, int compressedPageSize, long firstRowIndex) { + add(offset, compressedPageSize, firstRowIndex, Optional.empty()); + } + + /** + * Adds the specified parameters to this builder. Used by the metadata converter to building up {@link OffsetIndex} + * objects read from the Parquet file. 
+ * + * @param offset + * the offset of the page in the file + * @param compressedPageSize + * the size of the page (including header) + * @param firstRowIndex + * the index of the first row in the page (within the row group) + * @param unencodedDataBytes + * the number of bytes of unencoded data of BYTE_ARRAY type + */ + public void add(long offset, int compressedPageSize, long firstRowIndex, Optional unencodedDataBytes) { previousOffset = offset; offsets.add(offset); previousPageSize = compressedPageSize; compressedPageSizes.add(compressedPageSize); previousRowIndex = firstRowIndex; firstRowIndexes.add(firstRowIndex); + if (unencodedDataBytes.isPresent()) { + this.unencodedDataBytes.add(unencodedDataBytes.get()); + } } /** @@ -157,6 +206,9 @@ public OffsetIndexBuilder fromOffsetIndex(OffsetIndex offsetIndex) { this.offsets.addAll(new LongArrayList(offsetIndexImpl.offsets)); this.compressedPageSizes.addAll(new IntArrayList(offsetIndexImpl.compressedPageSizes)); this.firstRowIndexes.addAll(new LongArrayList(offsetIndexImpl.firstRowIndexes)); + if (offsetIndexImpl.unencodedByteArrayDataBytes != null) { + this.unencodedDataBytes.addAll(new LongArrayList(offsetIndexImpl.unencodedByteArrayDataBytes)); + } this.previousOffset = 0; this.previousPageSize = 0; this.previousRowIndex = 0; @@ -187,6 +239,12 @@ public OffsetIndex build(long shift) { offsetIndex.offsets = offsets; offsetIndex.compressedPageSizes = compressedPageSizes.toIntArray(); offsetIndex.firstRowIndexes = firstRowIndexes.toLongArray(); + if (!unencodedDataBytes.isEmpty()) { + if (unencodedDataBytes.size() != this.offsets.size()) { + throw new IllegalStateException("unencodedDataBytes does not have the same size as offsets"); + } + offsetIndex.unencodedByteArrayDataBytes = unencodedDataBytes.toLongArray(); + } return offsetIndex; } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java index 4af185e50c..34a197851f 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java @@ -221,8 +221,9 @@ public void testPageSize() { for (int i = 0; i < 123; ++i) { // Writing 10 values per record for (int j = 0; j < 10; ++j) { - binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2); - int32ColWriter.write(42, j == 0 ? 0 : 2, 2); + binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), + j == 0 ? 0 : binaryCol.getMaxRepetitionLevel(), binaryCol.getMaxDefinitionLevel()); + int32ColWriter.write(42, j == 0 ? 
0 : int32Col.getMaxRepetitionLevel(), int32Col.getMaxDefinitionLevel()); } writeStore.endRecord(); } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java index 77bf6247a9..d3c3ab8d3c 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java @@ -25,6 +25,7 @@ import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.io.ParquetEncodingException; import org.slf4j.Logger; @@ -62,6 +63,13 @@ public void writePage(BytesInput bytesInput, int valueCount, int rowCount, Stati writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding); } + @Override + public void writePage(BytesInput bytesInput, int valueCount, int rowCount, Statistics statistics, + SizeStatistics sizeStatistics, Encoding rlEncoding, Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding); + } + @Override public void writePageV2(int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels, @@ -76,6 +84,13 @@ public void writePageV2(int rowCount, int nullCount, int valueCount, LOG.debug("page written for {} bytes and {} records", size, valueCount); } + @Override + public void writePageV2(int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, + BytesInput definitionLevels, Encoding dataEncoding, BytesInput data, + Statistics statistics, SizeStatistics sizeStatistics) throws IOException { + writePageV2(rowCount, nullCount, valueCount, repetitionLevels, definitionLevels, dataEncoding, data, statistics); + } + @Override public long getMemSize() { return memSize; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java new file mode 100644 index 0000000000..255141f4f6 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.statistics; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Optional; + +public class TestSizeStatistics { + + @Test + public void testAddBinaryType() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder = new SizeStatistics.Builder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 2, Binary.fromString("a")); + builder.add(1, 2, Binary.fromString("")); + builder.add(2, 2, Binary.fromString("bb")); + Assert.assertThrows(IllegalArgumentException.class, () -> builder.add(3, 0, Binary.fromString("c"))); + Assert.assertThrows(IllegalArgumentException.class, () -> builder.add(0, 3, Binary.fromString("c"))); + SizeStatistics statistics = builder.build(); + Assert.assertEquals(Optional.of(3L), statistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(1L, 1L, 1L), statistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(0L, 0L, 3L), statistics.getDefinitionLevelHistogram()); + } + + @Test + public void testAddNonBinaryType() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(2).named("a"); + final int maxRepetitionLevel = 1; + final int maxDefinitionLevel = 1; + SizeStatistics.Builder builder = new SizeStatistics.Builder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 1, Binary.fromString("aa")); + builder.add(0, 1, Binary.fromString("aa")); + builder.add(1, 1, Binary.fromString("aa")); + builder.add(1, 0); + builder.add(1, 0); + builder.add(1, 0); + SizeStatistics statistics = builder.build(); + Assert.assertEquals(Optional.empty(), statistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(2L, 4L), statistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(3L, 3L), statistics.getDefinitionLevelHistogram()); + } + + @Test + public void testMergeStatistics() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder1 = new SizeStatistics.Builder(type, maxRepetitionLevel, maxDefinitionLevel); + builder1.add(0, 0, Binary.fromString("a")); + builder1.add(1, 1, Binary.fromString("b")); + builder1.add(2, 2, Binary.fromString("c")); + SizeStatistics statistics1 = builder1.build(); + SizeStatistics.Builder builder2 = new SizeStatistics.Builder(type, maxRepetitionLevel, maxDefinitionLevel); + builder2.add(0, 1, Binary.fromString("d")); + builder2.add(0, 1, Binary.fromString("e")); + SizeStatistics statistics2 = builder2.build(); + statistics1.mergeStatistics(statistics2); + Assert.assertEquals(Optional.of(5L), statistics1.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(3L, 1L, 1L), statistics1.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(1L, 3L, 1L), statistics1.getDefinitionLevelHistogram()); + } + + @Test + public void testMergeThrowException() { + // Merge different types. 
+ { + PrimitiveType type1 = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("a"); + PrimitiveType type2 = Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("a"); + SizeStatistics.Builder builder1 = new SizeStatistics.Builder(type1, 1, 1); + SizeStatistics.Builder builder2 = new SizeStatistics.Builder(type2, 1, 1); + SizeStatistics statistics1 = builder1.build(); + SizeStatistics statistics2 = builder2.build(); + Assert.assertThrows(IllegalArgumentException.class, () -> statistics1.mergeStatistics(statistics2)); + } + + // Merge different levels. + { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("a"); + SizeStatistics.Builder builder1 = new SizeStatistics.Builder(type, 1, 1); + SizeStatistics.Builder builder2 = new SizeStatistics.Builder(type, 2, 2); + SizeStatistics statistics1 = builder1.build(); + SizeStatistics statistics2 = builder2.build(); + Assert.assertThrows(IllegalArgumentException.class, () -> statistics1.mergeStatistics(statistics2)); + } + } + + @Test + public void testCopyStatistics() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder = new SizeStatistics.Builder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 0, Binary.fromString("a")); + builder.add(1, 1, Binary.fromString("b")); + builder.add(2, 2, Binary.fromString("c")); + SizeStatistics statistics = builder.build(); + SizeStatistics copy = statistics.copy(); + Assert.assertEquals(Optional.of(3L), copy.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(1L, 1L, 1L), copy.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(1L, 1L, 1L), copy.getDefinitionLevelHistogram()); + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 09b21538e5..a60a2d96bc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -80,6 +80,7 @@ import org.apache.parquet.format.NanoSeconds; import org.apache.parquet.format.NullType; import org.apache.parquet.format.PageEncodingStats; +import org.apache.parquet.format.SizeStatistics; import org.apache.parquet.format.SplitBlockAlgorithm; import org.apache.parquet.format.StringType; import org.apache.parquet.format.TimeType; @@ -137,6 +138,7 @@ import org.apache.parquet.schema.TypeVisitor; import org.apache.parquet.schema.Types; import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.zookeeper.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -547,6 +549,9 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List rowGrou if (columnMetaData.getEncodingStats() != null) { metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); } + if (columnMetaData.getSizeStatistics() != null && columnMetaData.getSizeStatistics().isValid()) { + metaData.setSize_statistics(toParquetSizeStatistics(columnMetaData.getSizeStatistics())); + } if (!encryptMetaData) { columnChunk.setMeta_data(metaData); @@ -1507,7 +1512,8 @@ public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, Col 
         metaData.dictionary_page_offset,
         metaData.num_values,
         metaData.total_compressed_size,
-        metaData.total_uncompressed_size);
+        metaData.total_uncompressed_size,
+        fromParquetSizeStatistics(metaData.size_statistics, type));
   }
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
@@ -2044,6 +2050,14 @@ public static ColumnIndex toParquetColumnIndex(PrimitiveType type,
         columnIndex.getMaxValues(),
         toParquetBoundaryOrder(columnIndex.getBoundaryOrder()));
     parquetColumnIndex.setNull_counts(columnIndex.getNullCounts());
+    List<Long> repLevelHistogram = columnIndex.getRepetitionLevelHistogram();
+    if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) {
+      parquetColumnIndex.setRepetition_level_histograms(repLevelHistogram);
+    }
+    List<Long> defLevelHistogram = columnIndex.getDefinitionLevelHistogram();
+    if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) {
+      parquetColumnIndex.setDefinition_level_histograms(defLevelHistogram);
+    }
     return parquetColumnIndex;
   }
 
@@ -2057,25 +2071,43 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar
         parquetColumnIndex.getNull_pages(),
         parquetColumnIndex.getNull_counts(),
         parquetColumnIndex.getMin_values(),
-        parquetColumnIndex.getMax_values());
+        parquetColumnIndex.getMax_values(),
+        parquetColumnIndex.getRepetition_level_histograms(),
+        parquetColumnIndex.getDefinition_level_histograms());
   }
 
   public static OffsetIndex toParquetOffsetIndex(org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex) {
     List<PageLocation> pageLocations = new ArrayList<>(offsetIndex.getPageCount());
+    List<Long> unencodedByteArrayDataTypes = new ArrayList<>(offsetIndex.getPageCount());
     for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
       pageLocations.add(new PageLocation(
           offsetIndex.getOffset(i),
           offsetIndex.getCompressedPageSize(i),
           offsetIndex.getFirstRowIndex(i)));
+      Optional<Long> unencodedByteArrayDataType = offsetIndex.getUnencodedByteArrayDataBytes(i);
+      if (unencodedByteArrayDataType.isPresent() && unencodedByteArrayDataTypes.size() == i) {
+        unencodedByteArrayDataTypes.add(unencodedByteArrayDataType.get());
+      }
+    }
+    OffsetIndex parquetOffsetIndex = new OffsetIndex(pageLocations);
+    if (unencodedByteArrayDataTypes.size() == pageLocations.size()) {
+      // Do not add the field if we are missing that from any page.
+      parquetOffsetIndex.setUnencoded_byte_array_data_bytes(unencodedByteArrayDataTypes);
     }
-    return new OffsetIndex(pageLocations);
+    return parquetOffsetIndex;
   }
 
   public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromParquetOffsetIndex(
       OffsetIndex parquetOffsetIndex) {
+    boolean hasUnencodedByteArrayDataBytes = parquetOffsetIndex.isSetUnencoded_byte_array_data_bytes() &&
+        parquetOffsetIndex.unencoded_byte_array_data_bytes.size() == parquetOffsetIndex.page_locations.size();
     OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder();
-    for (PageLocation pageLocation : parquetOffsetIndex.getPage_locations()) {
-      builder.add(pageLocation.getOffset(), pageLocation.getCompressed_page_size(), pageLocation.getFirst_row_index());
+    for (int i = 0; i < parquetOffsetIndex.page_locations.size(); ++i) {
+      PageLocation pageLocation = parquetOffsetIndex.page_locations.get(i);
+      Optional<Long> unencodedByteArrayDataBytes = hasUnencodedByteArrayDataBytes ?
+          Optional.of(parquetOffsetIndex.unencoded_byte_array_data_bytes.get(i)) : Optional.empty();
+      builder.add(pageLocation.getOffset(), pageLocation.getCompressed_page_size(), pageLocation.getFirst_row_index(),
+          unencodedByteArrayDataBytes);
     }
     return builder.build();
   }
@@ -2107,4 +2139,30 @@ public static BloomFilterHeader toBloomFilterHeader(
           bloomFilter.getAlgorithm(), bloomFilter.getHashStrategy(), bloomFilter.getCompression()));
     }
   }
+
+  public static org.apache.parquet.column.statistics.SizeStatistics fromParquetSizeStatistics(
+      SizeStatistics statistics, PrimitiveType type) {
+    if (statistics == null) {
+      return null;
+    }
+    return new org.apache.parquet.column.statistics.SizeStatistics(
+        type,
+        statistics.getUnencoded_byte_array_data_bytes(),
+        statistics.getRepetition_level_histogram(),
+        statistics.getDefinition_level_histogram());
+  }
+
+  public static SizeStatistics toParquetSizeStatistics(org.apache.parquet.column.statistics.SizeStatistics stats) {
+    if (stats == null) {
+      return null;
+    }
+    SizeStatistics formatStats = new SizeStatistics();
+    if (stats.getUnencodedByteArrayDataBytes().isPresent()) {
+      formatStats.setUnencoded_byte_array_data_bytes(stats.getUnencodedByteArrayDataBytes().get());
+    }
+    formatStats.setRepetition_level_histogram(stats.getRepetitionLevelHistogram());
+    formatStats.setDefinition_level_histogram(stats.getDefinitionLevelHistogram());
+    return formatStats;
+  }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 7b81ba4363..fea10b5b21 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.zip.CRC32;
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -36,6 +40,7 @@ import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.SizeStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
@@ -87,6 +92,7 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
     private ColumnIndexBuilder columnIndexBuilder;
     private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
+    private SizeStatistics totalSizeStatistics;
     private final ByteBufferAllocator allocator;
     private final CRC32 crc;
@@ -117,9 +123,10 @@ private ColumnChunkPageWriter(ColumnDescriptor path,
       this.buf = new ConcatenatingByteArrayCollector();
       this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+      this.totalSizeStatistics = SizeStatistics.newBuilder(path.getPrimitiveType(),
+          path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel()).build();
       this.pageWriteChecksumEnabled
= pageWriteChecksumEnabled; this.crc = pageWriteChecksumEnabled ? new CRC32() : null; - this.headerBlockEncryptor = headerBlockEncryptor; this.pageBlockEncryptor = pageBlockEncryptor; this.fileAAD = fileAAD; @@ -152,6 +159,7 @@ public void writePage(BytesInput bytesInput, int valueCount, Statistics stati } @Override + @Deprecated public void writePage(BytesInput bytes, int valueCount, int rowCount, @@ -159,6 +167,18 @@ public void writePage(BytesInput bytes, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException { + writePage(bytes, valueCount, rowCount, statistics, null, rlEncoding, dlEncoding, valuesEncoding); + } + + @Override + public void writePage(BytesInput bytes, + int valueCount, + int rowCount, + Statistics statistics, + SizeStatistics sizeStatistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { pageOrdinal++; long uncompressedSize = bytes.size(); if (uncompressedSize > Integer.MAX_VALUE || uncompressedSize < 0) { @@ -212,8 +232,9 @@ public void writePage(BytesInput bytes, this.totalValueCount += valueCount; this.pageCount += 1; - mergeColumnStatistics(statistics); - offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount); + mergeColumnStatistics(statistics, sizeStatistics); + offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); // by concatenating before collecting instead of collecting twice, // we only allocate one buffer to copy into instead of multiple. @@ -223,12 +244,22 @@ public void writePage(BytesInput bytes, dataEncodings.add(valuesEncoding); } + @Override + @Deprecated + public void writePageV2( + int rowCount, int nullCount, int valueCount, + BytesInput repetitionLevels, BytesInput definitionLevels, + Encoding dataEncoding, BytesInput data, + Statistics statistics) throws IOException { + writePageV2(rowCount, nullCount, valueCount, repetitionLevels, definitionLevels, dataEncoding, data, statistics, null); + } + @Override public void writePageV2( int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels, Encoding dataEncoding, BytesInput data, - Statistics statistics) throws IOException { + Statistics statistics, SizeStatistics sizeStatistics) throws IOException { pageOrdinal++; int rlByteLength = toIntWithCheck(repetitionLevels.size()); @@ -292,8 +323,9 @@ public void writePageV2( this.totalValueCount += valueCount; this.pageCount += 1; - mergeColumnStatistics(statistics); - offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount); + mergeColumnStatistics(statistics, sizeStatistics); + offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); // by concatenating before collecting instead of collecting twice, // we only allocate one buffer to copy into instead of multiple. 
@@ -316,7 +348,14 @@ private int toIntWithCheck(long size) { return (int)size; } - private void mergeColumnStatistics(Statistics statistics) { + private void mergeColumnStatistics(Statistics statistics, SizeStatistics sizeStatistics) { + Preconditions.checkState(totalSizeStatistics != null, "Aggregate size statistics should not be null"); + totalSizeStatistics.mergeStatistics(sizeStatistics); + if (!totalSizeStatistics.isValid()) { + // Set page size statistics to null to clear state in the ColumnIndexBuilder. + sizeStatistics = null; + } + if (totalStatistics != null && totalStatistics.isEmpty()) { return; } @@ -329,10 +368,10 @@ private void mergeColumnStatistics(Statistics statistics) { } else if (totalStatistics == null) { // Copying the statistics if it is not initialized yet, so we have the correct typed one totalStatistics = statistics.copy(); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } else { totalStatistics.mergeStatistics(statistics); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } } @@ -352,6 +391,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { uncompressedLength, compressedLength, totalStatistics, + totalSizeStatistics, columnIndexBuilder, offsetIndexBuilder, bloomFilter, @@ -368,6 +408,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { uncompressedLength, compressedLength, totalStatistics, + totalSizeStatistics, columnIndexBuilder, offsetIndexBuilder, bloomFilter, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java index fbec3b26d4..0f4022b3d9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Formatter; import java.util.List; +import java.util.Optional; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.internal.column.columnindex.OffsetIndex; @@ -101,6 +102,11 @@ public long getLastRowIndex(int pageIndex, long totalRowCount) { return (nextIndex >= offsetIndex.getPageCount() ? 
totalRowCount : offsetIndex.getFirstRowIndex(nextIndex)) - 1; } + @Override + public Optional getUnencodedByteArrayDataBytes(int pageIndex) { + return offsetIndex.getUnencodedByteArrayDataBytes(indexMap[pageIndex]); + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 42ea6d921d..18b722f7bd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.zip.CRC32; @@ -51,6 +52,7 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; @@ -152,6 +154,7 @@ public static enum Mode { private long uncompressedLength; private long compressedLength; private Statistics currentStatistics; // accumulated in writePage(s) + private SizeStatistics currentSizeStatistics; // accumulated in writePage(s) private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; @@ -474,6 +477,8 @@ public void startColumn(ColumnDescriptor descriptor, uncompressedLength = 0; // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one currentStatistics = null; + currentSizeStatistics = SizeStatistics.newBuilder(descriptor.getPrimitiveType(), + descriptor.getMaxRepetitionLevel(), descriptor.getMaxDefinitionLevel()).build(); columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); @@ -596,7 +601,8 @@ public void writeDataPage( // We are unable to build indexes without rowCount so skip them for this column offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder(); columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); - innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, null, null); + innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, + null, null, null); } /** @@ -611,6 +617,7 @@ public void writeDataPage( * @param valuesEncoding encoding of values * @throws IOException if any I/O error occurs during writing the file */ + @Deprecated public void writeDataPage( int valueCount, int uncompressedPageSize, BytesInput bytes, @@ -619,7 +626,37 @@ public void writeDataPage( Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException { - writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount, rlEncoding, dlEncoding, valuesEncoding, null, null); + writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount, rlEncoding, dlEncoding, valuesEncoding, + null, null, null); + } + + /** + * Writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param 
statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @throws IOException if any I/O error occurs during writing the file + */ + @Deprecated + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) throws IOException { + writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount, rlEncoding, dlEncoding, + valuesEncoding, metadataBlockEncryptor, pageHeaderAAD, null); } /** @@ -634,6 +671,7 @@ public void writeDataPage( * @param valuesEncoding encoding of values * @param metadataBlockEncryptor encryptor for block data * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page * @throws IOException if any I/O error occurs during writing the file */ public void writeDataPage( @@ -645,10 +683,13 @@ public void writeDataPage( Encoding dlEncoding, Encoding valuesEncoding, BlockCipher.Encryptor metadataBlockEncryptor, - byte[] pageHeaderAAD) throws IOException { + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { long beforeHeader = out.getPos(); - innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD); - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, + metadataBlockEncryptor, pageHeaderAAD, sizeStatistics); + offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount, + sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } private void innerWriteDataPage( @@ -659,8 +700,10 @@ private void innerWriteDataPage( Encoding dlEncoding, Encoding valuesEncoding, BlockCipher.Encryptor metadataBlockEncryptor, - byte[] pageHeaderAAD) throws IOException { - writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD); + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { + writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, + metadataBlockEncryptor, pageHeaderAAD, sizeStatistics); } /** @@ -676,6 +719,7 @@ private void innerWriteDataPage( * @param pageHeaderAAD pageHeader AAD * @throws IOException if there is an error while writing */ + @Deprecated public void writeDataPage( int valueCount, int uncompressedPageSize, BytesInput bytes, @@ -685,6 +729,34 @@ public void writeDataPage( Encoding valuesEncoding, BlockCipher.Encryptor metadataBlockEncryptor, byte[] pageHeaderAAD) throws IOException { + writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, + metadataBlockEncryptor, pageHeaderAAD, null); + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page + * @throws IOException if there is an error while writing + */ + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { state = state.write(); long beforeHeader = out.getPos(); if (currentChunkFirstDataPage < 0) { @@ -722,7 +794,7 @@ public void writeDataPage( LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); bytes.writeAllTo(out); - mergeColumnStatistics(statistics); + mergeColumnStatistics(statistics, sizeStatistics); encodingStatsBuilder.addDataEncoding(valuesEncoding); currentEncodings.add(rlEncoding); @@ -754,6 +826,7 @@ public void addBloomFilter(String column, BloomFilter bloomFilter) { * @param statistics the statistics of the page * @throws IOException if any I/O error occurs during writing the file */ + @Deprecated public void writeDataPageV2( int rowCount, int nullCount, @@ -775,6 +848,51 @@ public void writeDataPageV2( uncompressedDataSize, statistics, null, + null, + null); + } + + /** + * Writes a single v2 data page + * + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @param metadataBlockEncryptor encryptor for block data + * @param 
pageHeaderAAD pageHeader AAD + * @throws IOException if any I/O error occurs during writing the file + */ + @Deprecated + public void writeDataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) throws IOException { + writeDataPageV2( + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + dataEncoding, + compressedData, + uncompressedDataSize, + statistics, + metadataBlockEncryptor, + pageHeaderAAD, null); } @@ -792,6 +910,7 @@ public void writeDataPageV2( * @param statistics the statistics of the page * @param metadataBlockEncryptor encryptor for block data * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page * @throws IOException if any I/O error occurs during writing the file */ public void writeDataPageV2( @@ -805,7 +924,8 @@ public void writeDataPageV2( int uncompressedDataSize, Statistics statistics, BlockCipher.Encryptor metadataBlockEncryptor, - byte[] pageHeaderAAD) throws IOException { + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { state = state.write(); int rlByteLength = toIntWithCheck(repetitionLevels.size()); int dlByteLength = toIntWithCheck(definitionLevels.size()); @@ -860,7 +980,7 @@ public void writeDataPageV2( this.uncompressedLength += uncompressedSize + headersSize; this.compressedLength += compressedSize + headersSize; - mergeColumnStatistics(statistics); + mergeColumnStatistics(statistics, sizeStatistics); currentEncodings.add(dataEncoding); encodingStatsBuilder.addDataEncoding(dataEncoding); @@ -868,7 +988,8 @@ public void writeDataPageV2( BytesInput.concat(repetitionLevels, definitionLevels, compressedData) .writeAllTo(out); - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount, + sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } /** @@ -897,6 +1018,7 @@ void writeColumnChunk(ColumnDescriptor descriptor, long uncompressedTotalPageSize, long compressedTotalPageSize, Statistics totalStats, + SizeStatistics totalSizeStats, ColumnIndexBuilder columnIndexBuilder, OffsetIndexBuilder offsetIndexBuilder, BloomFilter bloomFilter, @@ -904,8 +1026,8 @@ void writeColumnChunk(ColumnDescriptor descriptor, Set dlEncodings, List dataEncodings) throws IOException { writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes, - uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder, - bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null); + uncompressedTotalPageSize, compressedTotalPageSize, totalStats, totalSizeStats, + columnIndexBuilder, offsetIndexBuilder, bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null); } void writeColumnChunk(ColumnDescriptor descriptor, @@ -916,6 +1038,7 @@ void writeColumnChunk(ColumnDescriptor descriptor, long uncompressedTotalPageSize, long compressedTotalPageSize, Statistics totalStats, + SizeStatistics totalSizeStats, ColumnIndexBuilder columnIndexBuilder, OffsetIndexBuilder offsetIndexBuilder, BloomFilter bloomFilter, @@ -969,6 +1092,7 @@ void writeColumnChunk(ColumnDescriptor descriptor, currentEncodings.addAll(dlEncodings); currentEncodings.addAll(dataEncodings); currentStatistics = totalStats; + currentSizeStatistics = totalSizeStats; this.columnIndexBuilder = columnIndexBuilder; this.offsetIndexBuilder = offsetIndexBuilder; @@ -1013,7 +1137,8 @@ public void endColumn() throws IOException { currentChunkDictionaryPageOffset, currentChunkValueCount, compressedLength, - uncompressedLength)); + uncompressedLength, + currentSizeStatistics)); this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); this.uncompressedLength = 0; this.compressedLength = 0; @@ -1318,7 +1443,15 @@ private int toIntWithCheck(long size) { return (int)size; } - private void mergeColumnStatistics(Statistics statistics) { + private void mergeColumnStatistics(Statistics statistics, SizeStatistics sizeStatistics) { + Preconditions.checkState(currentSizeStatistics != null, "Aggregate size statistics should not be null"); + currentSizeStatistics.mergeStatistics(sizeStatistics); + if (!currentSizeStatistics.isValid()) { + // Set page size statistics to null to clear state in the ColumnIndexBuilder. + sizeStatistics = null; + } + + // Do not merge statistics and build column index if any page statistics is invalid. 
if (currentStatistics != null && currentStatistics.isEmpty()) { return; } @@ -1331,10 +1464,10 @@ private void mergeColumnStatistics(Statistics statistics) { } else if (currentStatistics == null) { // Copying the statistics if it is not initialized yet, so we have the correct typed one currentStatistics = statistics.copy(); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } else { currentStatistics.mergeStatistics(statistics); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 158348e01c..0b0edd0533 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -29,6 +29,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.statistics.BooleanStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnDecryptionSetup; @@ -116,6 +117,38 @@ public static ColumnChunkMetaData get( firstDataPage, dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize); } + @Deprecated + public static ColumnChunkMetaData get( + ColumnPath path, + PrimitiveType type, + CompressionCodecName codec, + EncodingStats encodingStats, + Set encodings, + Statistics statistics, + long firstDataPage, + long dictionaryPageOffset, + long valueCount, + long totalSize, + long totalUncompressedSize) { + return get(path, type, codec, encodingStats, encodings, statistics, firstDataPage, dictionaryPageOffset, + valueCount, totalSize, totalUncompressedSize, null); + } + + /** + * @param path the path of this column in the write schema + * @param type primitive type for this column + * @param codec the compression codec used to compress + * @param encodingStats EncodingStats for the encodings used in this column + * @param encodings a set of encoding used in this column + * @param statistics statistics for the data in this column + * @param firstDataPage offset of the first non-dictionary page + * @param dictionaryPageOffset offset of the dictionary page + * @param valueCount number of values + * @param totalSize total compressed size + * @param totalUncompressedSize uncompressed data size + * @param sizeStatistics size statistics for the data in this column + * @return a column chunk metadata instance + */ public static ColumnChunkMetaData get( ColumnPath path, PrimitiveType type, @@ -127,7 +160,8 @@ public static ColumnChunkMetaData get( long dictionaryPageOffset, long valueCount, long totalSize, - long totalUncompressedSize) { + long totalUncompressedSize, + SizeStatistics sizeStatistics) { // to save space we store those always positive longs in ints when they fit. 
if (positiveLongFitsInAnInt(firstDataPage) @@ -143,7 +177,8 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) { dictionaryPageOffset, valueCount, totalSize, - totalUncompressedSize); + totalUncompressedSize, + sizeStatistics); } else { return new LongColumnChunkMetaData( path, type, codec, @@ -153,7 +188,8 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) { dictionaryPageOffset, valueCount, totalSize, - totalUncompressedSize); + totalUncompressedSize, + sizeStatistics); } } @@ -286,6 +322,13 @@ public PrimitiveType getPrimitiveType() { */ abstract public Statistics getStatistics(); + /** + * Method should be considered private + * + * @return the size stats for this column + */ + abstract public SizeStatistics getSizeStatistics(); + /** * Method should be considered private * @@ -391,6 +434,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { private final int totalSize; private final int totalUncompressedSize; private final Statistics statistics; + private final SizeStatistics sizeStatistics; /** * @param path column identifier @@ -403,6 +447,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { * @param valueCount * @param totalSize * @param totalUncompressedSize + * @param sizeStatistics */ IntColumnChunkMetaData( ColumnPath path, @@ -415,7 +460,8 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { long dictionaryPageOffset, long valueCount, long totalSize, - long totalUncompressedSize) { + long totalUncompressedSize, + SizeStatistics sizeStatistics) { super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPage = positiveLongToInt(firstDataPage); this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset); @@ -423,6 +469,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { this.totalSize = positiveLongToInt(totalSize); this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize); this.statistics = statistics; + this.sizeStatistics = sizeStatistics; } /** @@ -487,6 +534,14 @@ public long getTotalSize() { public Statistics getStatistics() { return statistics; } + + /** + * @return the size stats for this column + */ + @Override + public SizeStatistics getSizeStatistics() { + return sizeStatistics; + } } class LongColumnChunkMetaData extends ColumnChunkMetaData { @@ -497,6 +552,8 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { private final long totalSize; private final long totalUncompressedSize; private final Statistics statistics; + private final SizeStatistics sizeStatistics; + /** * @param path column identifier @@ -509,6 +566,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { * @param valueCount * @param totalSize * @param totalUncompressedSize + * @param sizeStatistics */ LongColumnChunkMetaData( ColumnPath path, @@ -521,7 +579,8 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { long dictionaryPageOffset, long valueCount, long totalSize, - long totalUncompressedSize) { + long totalUncompressedSize, + SizeStatistics sizeStatistics) { super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPageOffset = firstDataPageOffset; this.dictionaryPageOffset = dictionaryPageOffset; @@ -529,6 +588,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { this.totalSize = totalSize; this.totalUncompressedSize = totalUncompressedSize; this.statistics = statistics; + this.sizeStatistics = sizeStatistics; } /** @@ -572,6 +632,14 @@ public long getTotalSize() { public Statistics getStatistics() { return 
statistics; } + + /** + * @return the size stats for this column + */ + @Override + public SizeStatistics getSizeStatistics() { + return sizeStatistics; + } } class EncryptedColumnChunkMetaData extends ColumnChunkMetaData { @@ -676,6 +744,12 @@ public Statistics getStatistics() { return shadowColumnChunkMetaData.getStatistics(); } + @Override + public SizeStatistics getSizeStatistics() { + decryptIfNeeded(); + return shadowColumnChunkMetaData.getSizeStatistics(); + } + /** * @return whether or not this column is encrypted */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 3997808cfb..ab4a7c36b8 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -63,11 +63,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.TreeSet; import com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.UTF8; @@ -84,6 +86,7 @@ import org.apache.parquet.column.statistics.FloatStatistics; import org.apache.parquet.column.statistics.IntStatistics; import org.apache.parquet.column.statistics.LongStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; @@ -1243,58 +1247,77 @@ public void testColumnOrders() throws IOException { @Test public void testOffsetIndexConversion() { - OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); - builder.add(1000, 10000, 0); - builder.add(22000, 12000, 100); - OffsetIndex offsetIndex = ParquetMetadataConverter + for (boolean withSizeStats : new boolean[] {false, true}) { + OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); + builder.add(1000, 10000, 0, withSizeStats ? Optional.of(11L) : Optional.empty()); + builder.add(22000, 12000, 100, withSizeStats ?
Optional.of(22L) : Optional.empty()); + OffsetIndex offsetIndex = ParquetMetadataConverter .fromParquetOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000))); - assertEquals(2, offsetIndex.getPageCount()); - assertEquals(101000, offsetIndex.getOffset(0)); - assertEquals(10000, offsetIndex.getCompressedPageSize(0)); - assertEquals(0, offsetIndex.getFirstRowIndex(0)); - assertEquals(122000, offsetIndex.getOffset(1)); - assertEquals(12000, offsetIndex.getCompressedPageSize(1)); - assertEquals(100, offsetIndex.getFirstRowIndex(1)); + assertEquals(2, offsetIndex.getPageCount()); + assertEquals(101000, offsetIndex.getOffset(0)); + assertEquals(10000, offsetIndex.getCompressedPageSize(0)); + assertEquals(0, offsetIndex.getFirstRowIndex(0)); + assertEquals(122000, offsetIndex.getOffset(1)); + assertEquals(12000, offsetIndex.getCompressedPageSize(1)); + assertEquals(100, offsetIndex.getFirstRowIndex(1)); + if (withSizeStats) { + assertEquals(Optional.of(11L), offsetIndex.getUnencodedByteArrayDataBytes(0)); + assertEquals(Optional.of(22L), offsetIndex.getUnencodedByteArrayDataBytes(1)); + } else { + assertFalse(offsetIndex.getUnencodedByteArrayDataBytes(0).isPresent()); + assertFalse(offsetIndex.getUnencodedByteArrayDataBytes(1).isPresent()); + } + } } @Test public void testColumnIndexConversion() { - PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); - Statistics stats = Statistics.createStats(type); - stats.incrementNumNulls(16); - stats.updateStats(-100l); - stats.updateStats(100l); - builder.add(stats); - stats = Statistics.createStats(type); - stats.incrementNumNulls(111); - builder.add(stats); - stats = Statistics.createStats(type); - stats.updateStats(200l); - stats.updateStats(500l); - builder.add(stats); - org.apache.parquet.format.ColumnIndex parquetColumnIndex = + for (boolean withSizeStats : new boolean[] {false, true}) { + PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64"); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); + Statistics stats = Statistics.createStats(type); + stats.incrementNumNulls(16); + stats.updateStats(-100l); + stats.updateStats(100l); + builder.add(stats, withSizeStats ? new SizeStatistics(type, 0, LongArrayList.of(1, 2), LongArrayList.of(6, 5)): null); + stats = Statistics.createStats(type); + stats.incrementNumNulls(111); + builder.add(stats, withSizeStats ? new SizeStatistics(type, 0, LongArrayList.of(3, 4), LongArrayList.of(4, 3)): null); + stats = Statistics.createStats(type); + stats.updateStats(200l); + stats.updateStats(500l); + builder.add(stats, withSizeStats ? 
new SizeStatistics(type, 0, LongArrayList.of(5, 6), LongArrayList.of(2, 1)): null); + org.apache.parquet.format.ColumnIndex parquetColumnIndex = ParquetMetadataConverter.toParquetColumnIndex(type, builder.build()); - ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); - assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); - assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); - assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); - assertTrue(Arrays.asList( + ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); + assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); + assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); + assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); + assertTrue(Arrays.asList( ByteBuffer.wrap(BytesUtils.longToBytes(-100l)), ByteBuffer.allocate(0), ByteBuffer.wrap(BytesUtils.longToBytes(200l))).equals(columnIndex.getMinValues())); - assertTrue(Arrays.asList( + assertTrue(Arrays.asList( ByteBuffer.wrap(BytesUtils.longToBytes(100l)), ByteBuffer.allocate(0), ByteBuffer.wrap(BytesUtils.longToBytes(500l))).equals(columnIndex.getMaxValues())); - assertNull("Should handle null column index", ParquetMetadataConverter + assertNull("Should handle null column index", ParquetMetadataConverter .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT32).named("test_int32"), null)); - assertNull("Should ignore unsupported types", ParquetMetadataConverter + assertNull("Should ignore unsupported types", ParquetMetadataConverter .toParquetColumnIndex(Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); - assertNull("Should ignore unsupported types", + assertNull("Should ignore unsupported types", ParquetMetadataConverter.fromParquetColumnIndex(Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) - .length(12).as(OriginalType.INTERVAL).named("test_interval"), parquetColumnIndex)); + .length(12).as(OriginalType.INTERVAL).named("test_interval"), parquetColumnIndex)); + + if (withSizeStats) { + assertEquals(LongArrayList.of(1, 2, 3, 4, 5, 6), columnIndex.getRepetitionLevelHistogram()); + assertEquals(LongArrayList.of(6, 5, 4, 3, 2, 1), columnIndex.getDefinitionLevelHistogram()); + } else { + assertEquals(LongArrayList.of(), columnIndex.getRepetitionLevelHistogram()); + assertEquals(LongArrayList.of(), columnIndex.getDefinitionLevelHistogram()); + } + } } @Test @@ -1386,4 +1409,18 @@ private void verifyMapMessageType(final MessageType messageType, final String ke } } } + + @Test + public void testSizeStatisticsConversion() { + PrimitiveType type = Types.required(PrimitiveTypeName.BINARY).named("test"); + List repLevelHistogram = Arrays.asList(1L, 2L, 3L, 4L, 5L); + List defLevelHistogram = Arrays.asList(6L, 7L, 8L, 9L, 10L); + SizeStatistics sizeStatistics = ParquetMetadataConverter.fromParquetSizeStatistics( + ParquetMetadataConverter.toParquetSizeStatistics( + new SizeStatistics(type, 1024, repLevelHistogram, defLevelHistogram)), type); + assertEquals(type, sizeStatistics.getType()); + assertEquals(Optional.of(1024L), sizeStatistics.getUnencodedByteArrayDataBytes()); + assertEquals(repLevelHistogram, sizeStatistics.getRepetitionLevelHistogram()); + assertEquals(defLevelHistogram, sizeStatistics.getDefinitionLevelHistogram()); + } } diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index a931051891..9ec51dfb3a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -274,6 +274,7 @@ public void testColumnOrderV1() throws IOException { eq(fakeData.size()), eq(fakeData.size()), eq(fakeStats), + any(), same(ColumnIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no column index same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index any(), diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java new file mode 100644 index 0000000000..261f88d8cc --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java @@ -0,0 +1,85 @@ +package org.apache.parquet.statistics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.SizeStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; + +public class TestSizeStatisticsRoundTrip { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testBinaryColumnSizeStatistics() throws IOException { + MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()).named("name").named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(2) + .withMinRowCountForPageSizeCheck(1) + .withConf(conf) + .build()) { + writer.write(factory.newGroup().append("name", "a")); + writer.write(factory.newGroup().append("name", "b")); + writer.write(factory.newGroup().append("name", "c")); + writer.write(factory.newGroup().append("name", "d")); + } + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + 
ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0); + + SizeStatistics sizeStatistics = column.getSizeStatistics(); + Assert.assertEquals(Optional.of(4L), sizeStatistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(4L), sizeStatistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(0L, 4L), sizeStatistics.getDefinitionLevelHistogram()); + + ColumnIndex columnIndex = reader.readColumnIndex(column); + Assert.assertEquals(Arrays.asList(2L, 2L), columnIndex.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(0L, 2L, 0L, 2L), columnIndex.getDefinitionLevelHistogram()); + + OffsetIndex offsetIndex = reader.readOffsetIndex(column); + Assert.assertEquals(2, offsetIndex.getPageCount()); + Assert.assertEquals(Optional.of(2L), offsetIndex.getUnencodedByteArrayDataBytes(0)); + Assert.assertEquals(Optional.of(2L), offsetIndex.getUnencodedByteArrayDataBytes(1)); + } + } + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.getAbsolutePath()); + } + +} diff --git a/pom.xml b/pom.xml index 3e41648067..fdc1c2d7c8 100644 --- a/pom.xml +++ b/pom.xml @@ -554,9 +554,15 @@ org.apache.parquet.hadoop.thrift.TBaseWriteSupport#setThriftClass(org.apache.parquet.conf.ParquetConfiguration,java.lang.Class) org.apache.parquet.proto.ProtoParquetReader#builder(org.apache.hadoop.fs.Path,boolean) org.apache.parquet.proto.ProtoParquetReader#builder(org.apache.parquet.io.InputFile,boolean) org.apache.parquet.arrow.schema.SchemaMapping + + org.apache.parquet.column.page.PageWriter + org.apache.parquet.internal.column.columnindex.ColumnIndex#getDefinitionLevelHistogram() + org.apache.parquet.internal.column.columnindex.ColumnIndex#getRepetitionLevelHistogram() + org.apache.parquet.internal.column.columnindex.OffsetIndex#getUnencodedByteArrayDataBytes(int) + org.apache.parquet.hadoop.metadata.ColumnChunkMetaData#getSizeStatistics()
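
Usage note (not part of the patch above): the following is a minimal sketch of how a consumer could read back the size statistics this change persists, assuming the accessors introduced in the diff (ColumnChunkMetaData#getSizeStatistics, the ColumnIndex level histograms, and OffsetIndex#getUnencodedByteArrayDataBytes). The class name SizeStatisticsInspector and the printed output are illustrative only, not part of the API.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

public class SizeStatisticsInspector {

  // Prints the chunk-level and page-level size statistics of the first row group.
  public static void inspect(Path path) throws IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (ColumnChunkMetaData column : reader.getFooter().getBlocks().get(0).getColumns()) {
        SizeStatistics sizeStats = column.getSizeStatistics();
        if (sizeStats == null) {
          // Files written before this change (or columns whose page stats were invalid) carry none.
          continue;
        }
        System.out.println(column.getPath() + " unencoded BYTE_ARRAY bytes: "
            + sizeStats.getUnencodedByteArrayDataBytes());
        System.out.println("  repetition level histogram: " + sizeStats.getRepetitionLevelHistogram());
        System.out.println("  definition level histogram: " + sizeStats.getDefinitionLevelHistogram());

        // Per-page unencoded sizes are carried by the offset index.
        OffsetIndex offsetIndex = reader.readOffsetIndex(column);
        if (offsetIndex != null) {
          for (int page = 0; page < offsetIndex.getPageCount(); page++) {
            System.out.println("  page " + page + " unencoded bytes: "
                + offsetIndex.getUnencodedByteArrayDataBytes(page));
          }
        }
      }
    }
  }
}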