From 514cc6c257fe8e618b100a19d86d304f6442cb94 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Mon, 30 Oct 2023 09:54:02 +0800 Subject: [PATCH] PARQUET-2366: Optimize random seek during rewriting (#1174) --- .../org/apache/parquet/hadoop/IndexCache.java | 100 ++++++++++ .../apache/parquet/hadoop/NoneIndexCache.java | 63 ++++++ .../parquet/hadoop/PrefetchIndexCache.java | 176 +++++++++++++++++ .../hadoop/rewrite/ParquetRewriter.java | 57 ++++-- .../hadoop/rewrite/RewriteOptions.java | 50 +++-- .../apache/parquet/hadoop/TestIndexCache.java | 183 ++++++++++++++++++ .../hadoop/rewrite/ParquetRewriterTest.java | 36 ++-- 7 files changed, 620 insertions(+), 45 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java new file mode 100644 index 0000000000..8002ee385d --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import java.io.IOException; +import java.util.Set; + +/** + * A cache for caching indexes(including: ColumnIndex, OffsetIndex and BloomFilter) + */ +public interface IndexCache { + + enum CacheStrategy { + NONE, /* No cache */ + PREFETCH_BLOCK /* Prefetch block indexes */ + } + + /** + * Create an index cache for the given file reader + * + * @param fileReader the file reader + * @param columns the columns that need to do cache + * @param cacheStrategy the cache strategy, supports NONE and PREFETCH_BLOCK + * @param freeCacheAfterGet whether free the given index cache after calling the given get method + * @return the index cache + */ + static IndexCache create( + ParquetFileReader fileReader, + Set columns, + CacheStrategy cacheStrategy, + boolean freeCacheAfterGet) { + if (cacheStrategy == CacheStrategy.NONE) { + return new NoneIndexCache(fileReader); + } else if (cacheStrategy == CacheStrategy.PREFETCH_BLOCK) { + return new PrefetchIndexCache(fileReader, columns, freeCacheAfterGet); + } else { + throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy); + } + } + + /** + * Set the current BlockMetadata + */ + void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException; + + /** + * Get the ColumnIndex for the given column in the set row group. + * + * @param chunk the given column chunk + * @return the ColumnIndex for the given column + * @throws IOException if any I/O error occurs during get the ColumnIndex + */ + ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException; + + /** + * Get the OffsetIndex for the given column in the set row group. + * + * @param chunk the given column chunk + * @return the OffsetIndex for the given column + * @throws IOException if any I/O error occurs during get the OffsetIndex + */ + OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException; + + /** + * Get the BloomFilter for the given column in the set row group. + * + * @param chunk the given column chunk + * @return the BloomFilter for the given column + * @throws IOException if any I/O error occurs during get the BloomFilter + */ + BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException; + + /** + * Clean the cache + */ + void clean(); +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java new file mode 100644 index 0000000000..e1aded1990 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/NoneIndexCache.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import java.io.IOException; + +/** + * Cache nothing. All the get methods are pushed to ParquetFileReader to read the given index. + */ +class NoneIndexCache implements IndexCache { + private final ParquetFileReader fileReader; + + NoneIndexCache(ParquetFileReader fileReader) { + this.fileReader = fileReader; + } + + @Override + public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException { + // Do nothing + } + + @Override + public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException { + return fileReader.readColumnIndex(chunk); + } + + @Override + public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException { + return fileReader.readOffsetIndex(chunk); + } + + @Override + public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException { + return fileReader.readBloomFilter(chunk); + } + + @Override + public void clean() { + // Do nothing + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java new file mode 100644 index 0000000000..9bbf901ff0 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrefetchIndexCache.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * This index cache will prefetch indexes of all columns when calling {@link #setBlockMetadata(BlockMetaData)}. + *
<p>
+ * NOTE: the {@link #setBlockMetadata(BlockMetaData)} will free the previous block cache + */ +class PrefetchIndexCache implements IndexCache { + private final ParquetFileReader fileReader; + private final Set columns; + private final boolean freeCacheAfterGet; + + private Map columnIndexCache; + private Map offsetIndexCache; + private Map bloomIndexCache; + + /** + * @param fileReader the file reader + * @param columns the columns that need to cache + * @param freeCacheAfterGet whether free the given index cache after calling the given get method + */ + PrefetchIndexCache( + ParquetFileReader fileReader, + Set columns, + boolean freeCacheAfterGet) { + this.fileReader = fileReader; + this.columns = columns; + this.freeCacheAfterGet = freeCacheAfterGet; + } + + @Override + public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException { + clean(); + this.columnIndexCache = readAllColumnIndexes(currentBlockMetadata); + this.offsetIndexCache = readAllOffsetIndexes(currentBlockMetadata); + this.bloomIndexCache = readAllBloomFilters(currentBlockMetadata); + } + + @Override + public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException { + ColumnPath columnPath = chunk.getPath(); + if (columns.contains(columnPath)) { + Preconditions.checkState( + columnIndexCache.containsKey(columnPath), + "Not found cached ColumnIndex for column: %s with cache strategy: %s", + columnPath.toDotString(), + CacheStrategy.PREFETCH_BLOCK); + } + + if (freeCacheAfterGet) { + return columnIndexCache.remove(columnPath); + } else { + return columnIndexCache.get(columnPath); + } + } + + @Override + public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException { + ColumnPath columnPath = chunk.getPath(); + + if (columns.contains(columnPath)) { + Preconditions.checkState( + offsetIndexCache.containsKey(columnPath), + "Not found cached OffsetIndex for column: %s with cache strategy: %s", + columnPath.toDotString(), + CacheStrategy.PREFETCH_BLOCK); + } + + if (freeCacheAfterGet) { + return offsetIndexCache.remove(columnPath); + } else { + return offsetIndexCache.get(columnPath); + } + } + + @Override + public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException { + ColumnPath columnPath = chunk.getPath(); + + if (columns.contains(columnPath)) { + Preconditions.checkState( + bloomIndexCache.containsKey(columnPath), + "Not found cached BloomFilter for column: %s with cache strategy: %s", + columnPath.toDotString(), + CacheStrategy.PREFETCH_BLOCK); + } + + if (freeCacheAfterGet) { + return bloomIndexCache.remove(columnPath); + } else { + return bloomIndexCache.get(columnPath); + } + } + + @Override + public void clean() { + if (columnIndexCache != null) { + columnIndexCache.clear(); + columnIndexCache = null; + } + + if (offsetIndexCache != null) { + offsetIndexCache.clear(); + offsetIndexCache = null; + } + + if (bloomIndexCache != null) { + bloomIndexCache.clear(); + bloomIndexCache = null; + } + } + + private Map readAllColumnIndexes(BlockMetaData blockMetaData) throws IOException { + Map columnIndexMap = new HashMap<>(columns.size()); + for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { + if (columns.contains(chunk.getPath())) { + columnIndexMap.put(chunk.getPath(), fileReader.readColumnIndex(chunk)); + } + } + + return columnIndexMap; + } + + private Map readAllOffsetIndexes(BlockMetaData blockMetaData) throws IOException { + Map offsetIndexMap = new HashMap<>(columns.size()); + for (ColumnChunkMetaData chunk : 
blockMetaData.getColumns()) { + if (columns.contains(chunk.getPath())) { + offsetIndexMap.put(chunk.getPath(), fileReader.readOffsetIndex(chunk)); + } + } + + return offsetIndexMap; + } + + private Map readAllBloomFilters(BlockMetaData blockMetaData) throws IOException { + Map bloomFilterMap = new HashMap<>(columns.size()); + for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { + if (columns.contains(chunk.getPath())) { + bloomFilterMap.put(chunk.getPath(), fileReader.readBloomFilter(chunk)); + } + } + + return bloomFilterMap; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java index 659ac1e5ca..2eaaab18c7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java @@ -45,6 +45,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.IndexCache; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -96,19 +97,19 @@ public class ParquetRewriter implements Closeable { private final byte[] pageBuffer = new byte[pageBufferSize]; // Configurations for the new file private CompressionCodecName newCodecName = null; - private List pruneColumns = null; private Map maskColumns = null; private Set encryptColumns = null; private boolean encryptMode = false; - private Map extraMetaData = new HashMap<>(); + private final Map extraMetaData = new HashMap<>(); // Writer to rewrite the input files - private ParquetFileWriter writer; + private final ParquetFileWriter writer; // Number of blocks written which is used to keep track of the actual row group ordinal private int numBlocksRewritten = 0; // Reader and relevant states of the in-processing input file - private Queue inputFiles = new LinkedList<>(); + private final Queue inputFiles = new LinkedList<>(); // Schema of input files (should be the same) and to write to the output file private MessageType schema = null; + private final Map descriptorsMap; // The reader for the current input file private TransParquetFileReader reader = null; // The metadata of current reader being processed @@ -116,7 +117,9 @@ public class ParquetRewriter implements Closeable { // created_by information of current reader being processed private String originalCreatedBy = ""; // Unique created_by information from all input files - private Set allOriginalCreatedBys = new HashSet<>(); + private final Set allOriginalCreatedBys = new HashSet<>(); + // The index cache strategy + private final IndexCache.CacheStrategy indexCacheStrategy; public ParquetRewriter(RewriteOptions options) throws IOException { Configuration conf = options.getConf(); @@ -129,8 +132,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException { initNextReader(); newCodecName = options.getNewCodecName(); - pruneColumns = options.getPruneColumns(); - + List pruneColumns = options.getPruneColumns(); // Prune columns if specified if (pruneColumns != null && !pruneColumns.isEmpty()) { List paths = new ArrayList<>(); @@ -145,6 +147,9 @@ public ParquetRewriter(RewriteOptions options) throws IOException { schema = pruneColumnsInSchema(schema, prunePaths); } + 
this.descriptorsMap = + schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); + if (options.getMaskColumns() != null) { this.maskColumns = new HashMap<>(); for (Map.Entry col : options.getMaskColumns().entrySet()) { @@ -157,6 +162,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException { this.encryptMode = true; } + this.indexCacheStrategy = options.getIndexCacheStrategy(); + ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE; writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, @@ -178,6 +185,8 @@ public ParquetRewriter(TransParquetFileReader reader, this.writer = writer; this.meta = meta; this.schema = schema; + this.descriptorsMap = + schema.getColumns().stream().collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); this.newCodecName = codecName; originalCreatedBy = originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy; extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData()); @@ -188,6 +197,7 @@ public ParquetRewriter(TransParquetFileReader reader, this.maskColumns.put(ColumnPath.fromDotString(col), maskMode); } } + this.indexCacheStrategy = IndexCache.CacheStrategy.NONE; } // Open all input files to validate their schemas are compatible to merge @@ -247,24 +257,24 @@ public void close() throws IOException { public void processBlocks() throws IOException { while (reader != null) { - processBlocksFromReader(); + IndexCache indexCache = IndexCache.create(reader, descriptorsMap.keySet(), indexCacheStrategy, true); + processBlocksFromReader(indexCache); + indexCache.clean(); initNextReader(); } } - private void processBlocksFromReader() throws IOException { + private void processBlocksFromReader(IndexCache indexCache) throws IOException { PageReadStore store = reader.readNextRowGroup(); ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy); - Map descriptorsMap = schema.getColumns().stream().collect( - Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x)); int blockId = 0; while (store != null) { writer.startBlock(store.getRowCount()); BlockMetaData blockMetaData = meta.getBlocks().get(blockId); + indexCache.setBlockMetadata(blockMetaData); List columnsInOrder = blockMetaData.getColumns(); - for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) { ColumnChunkMetaData chunk = columnsInOrder.get(i); ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath()); @@ -314,13 +324,20 @@ private void processBlocksFromReader() throws IOException { // Translate compression and/or encryption writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName); - processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn); + processChunk( + chunk, + newCodecName, + columnChunkEncryptorRunTime, + encryptColumn, + indexCache.getBloomFilter(chunk), + indexCache.getColumnIndex(chunk), + indexCache.getOffsetIndex(chunk)); writer.endColumn(); } else { // Nothing changed, simply copy the binary data. 
- BloomFilter bloomFilter = reader.readBloomFilter(chunk); - ColumnIndex columnIndex = reader.readColumnIndex(chunk); - OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); + BloomFilter bloomFilter = indexCache.getBloomFilter(chunk); + ColumnIndex columnIndex = indexCache.getColumnIndex(chunk); + OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk); writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex); } @@ -338,7 +355,10 @@ private void processBlocksFromReader() throws IOException { private void processChunk(ColumnChunkMetaData chunk, CompressionCodecName newCodecName, ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime, - boolean encryptColumn) throws IOException { + boolean encryptColumn, + BloomFilter bloomFilter, + ColumnIndex columnIndex, + OffsetIndex offsetIndex) throws IOException { CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); CompressionCodecFactory.BytesInputDecompressor decompressor = null; CompressionCodecFactory.BytesInputCompressor compressor = null; @@ -364,9 +384,6 @@ private void processChunk(ColumnChunkMetaData chunk, dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD(); } - ColumnIndex columnIndex = reader.readColumnIndex(chunk); - OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); - BloomFilter bloomFilter = reader.readBloomFilter(chunk); if (bloomFilter != null) { writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java index cc1280921b..5bdc8d590d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; import org.apache.parquet.crypto.FileEncryptionProperties; +import org.apache.parquet.hadoop.IndexCache; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.util.Arrays; @@ -33,14 +34,15 @@ */ public class RewriteOptions { - final Configuration conf; - final List inputFiles; - final Path outputFile; - final List pruneColumns; - final CompressionCodecName newCodecName; - final Map maskColumns; - final List encryptColumns; - final FileEncryptionProperties fileEncryptionProperties; + private final Configuration conf; + private final List inputFiles; + private final Path outputFile; + private final List pruneColumns; + private final CompressionCodecName newCodecName; + private final Map maskColumns; + private final List encryptColumns; + private final FileEncryptionProperties fileEncryptionProperties; + private final IndexCache.CacheStrategy indexCacheStrategy; private RewriteOptions(Configuration conf, List inputFiles, @@ -49,7 +51,8 @@ private RewriteOptions(Configuration conf, CompressionCodecName newCodecName, Map maskColumns, List encryptColumns, - FileEncryptionProperties fileEncryptionProperties) { + FileEncryptionProperties fileEncryptionProperties, + IndexCache.CacheStrategy indexCacheStrategy) { this.conf = conf; this.inputFiles = inputFiles; this.outputFile = outputFile; @@ -58,6 +61,7 @@ private RewriteOptions(Configuration conf, this.maskColumns = maskColumns; this.encryptColumns = encryptColumns; this.fileEncryptionProperties = fileEncryptionProperties; + this.indexCacheStrategy = indexCacheStrategy; } public 
Configuration getConf() { @@ -92,16 +96,21 @@ public FileEncryptionProperties getFileEncryptionProperties() { return fileEncryptionProperties; } + public IndexCache.CacheStrategy getIndexCacheStrategy() { + return indexCacheStrategy; + } + // Builder to create a RewriterOptions. public static class Builder { - private Configuration conf; - private List inputFiles; - private Path outputFile; + private final Configuration conf; + private final List inputFiles; + private final Path outputFile; private List pruneColumns; private CompressionCodecName newCodecName; private Map maskColumns; private List encryptColumns; private FileEncryptionProperties fileEncryptionProperties; + private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE; /** * Create a builder to create a RewriterOptions. @@ -213,6 +222,20 @@ public Builder addInputFile(Path path) { return this; } + /** + * Set the index(ColumnIndex, Offset and BloomFilter) cache strategy. + *
<p>
+ * This could reduce the random seek while rewriting with PREFETCH_BLOCK strategy, NONE by default. + * + * @param cacheStrategy the index cache strategy, supports: {@link IndexCache.CacheStrategy#NONE} or + * {@link IndexCache.CacheStrategy#PREFETCH_BLOCK} + * @return self + */ + public Builder indexCacheStrategy(IndexCache.CacheStrategy cacheStrategy) { + this.indexCacheStrategy = cacheStrategy; + return this; + } + /** * Build the RewriterOptions. * @@ -255,7 +278,8 @@ public RewriteOptions build() { newCodecName, maskColumns, encryptColumns, - fileEncryptionProperties); + fileEncryptionProperties, + indexCacheStrategy); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java new file mode 100644 index 0000000000..32874f795b --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestIndexCache.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.util.TestFileBuilder; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; + +@RunWith(Parameterized.class) +public class TestIndexCache { + private final Configuration conf = new Configuration(); + private final int numRecords = 100000; + private final MessageType schema = new MessageType("schema", + new PrimitiveType(OPTIONAL, INT64, "DocId"), + new PrimitiveType(REQUIRED, BINARY, "Name"), + new PrimitiveType(OPTIONAL, BINARY, "Gender"), + new GroupType(OPTIONAL, "Links", + new PrimitiveType(REPEATED, BINARY, "Backward"), + new PrimitiveType(REPEATED, BINARY, "Forward"))); + + private final ParquetProperties.WriterVersion writerVersion; + + @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}") + public static Object[] parameters() { + return new Object[] {"v1", "v2"}; + } + + public TestIndexCache(String writerVersion) { + this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion); + } + + @Test + public void testNoneCacheStrategy() throws IOException { + String file = createTestFile("DocID"); + + ParquetReadOptions options = ParquetReadOptions.builder().build(); + ParquetFileReader fileReader = new ParquetFileReader( + new LocalInputFile(Paths.get(file)), options); + IndexCache indexCache = IndexCache.create(fileReader, new HashSet<>(), IndexCache.CacheStrategy.NONE, false); + Assert.assertTrue(indexCache instanceof NoneIndexCache); + List blocks = fileReader.getFooter().getBlocks(); + for (BlockMetaData blockMetaData : blocks) { + indexCache.setBlockMetadata(blockMetaData); + for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { + validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk)); + validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk)); + + Assert.assertEquals( + "BloomFilter should match", + fileReader.readBloomFilter(chunk), + indexCache.getBloomFilter(chunk)); + } + } + } + + @Test + public void testPrefetchCacheStrategy() throws IOException { + String file = createTestFile("DocID", "Name"); + + ParquetReadOptions options = ParquetReadOptions.builder().build(); + ParquetFileReader fileReader = new ParquetFileReader( + new LocalInputFile(Paths.get(file)), options); + Set columns = new 
HashSet<>(); + columns.add(ColumnPath.fromDotString("DocId")); + columns.add(ColumnPath.fromDotString("Name")); + columns.add(ColumnPath.fromDotString("Gender")); + columns.add(ColumnPath.fromDotString("Links.Backward")); + columns.add(ColumnPath.fromDotString("Links.Forward")); + + IndexCache indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, false); + Assert.assertTrue(indexCache instanceof PrefetchIndexCache); + validPrecacheIndexCache(fileReader, indexCache, columns, false); + + indexCache = IndexCache.create(fileReader, columns, IndexCache.CacheStrategy.PREFETCH_BLOCK, true); + Assert.assertTrue(indexCache instanceof PrefetchIndexCache); + validPrecacheIndexCache(fileReader, indexCache, columns, true); + } + + private String createTestFile(String... bloomFilterEnabledColumns) throws IOException { + return new TestFileBuilder(conf, schema) + .withNumRecord(numRecords) + .withCodec("ZSTD") + .withRowGroupSize(8L * 1024 * 1024) + .withBloomFilterEnabled(bloomFilterEnabledColumns) + .withWriterVersion(writerVersion) + .build() + .getFileName(); + } + + private static void validPrecacheIndexCache( + ParquetFileReader fileReader, + IndexCache indexCache, + Set columns, + boolean freeCacheAfterGet) throws IOException { + List blocks = fileReader.getFooter().getBlocks(); + for (BlockMetaData blockMetaData : blocks) { + indexCache.setBlockMetadata(blockMetaData); + for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) { + validateColumnIndex(fileReader.readColumnIndex(chunk), indexCache.getColumnIndex(chunk)); + validateOffsetIndex(fileReader.readOffsetIndex(chunk), indexCache.getOffsetIndex(chunk)); + + Assert.assertEquals( + "BloomFilter should match", + fileReader.readBloomFilter(chunk), + indexCache.getBloomFilter(chunk)); + + if (freeCacheAfterGet) { + Assert.assertThrows(IllegalStateException.class, () -> indexCache.getColumnIndex(chunk)); + Assert.assertThrows(IllegalStateException.class, () -> indexCache.getOffsetIndex(chunk)); + if (columns.contains(chunk.getPath())) { + Assert.assertThrows(IllegalStateException.class, () -> indexCache.getBloomFilter(chunk)); + } + } + } + } + } + + private static void validateColumnIndex(ColumnIndex expected, ColumnIndex target) { + if (expected == null) { + Assert.assertEquals("ColumnIndex should should equal", expected, target); + } else { + Assert.assertNotNull("ColumnIndex should not be null", target); + Assert.assertEquals(expected.getClass(), target.getClass()); + Assert.assertEquals(expected.getMinValues(), target.getMinValues()); + Assert.assertEquals(expected.getMaxValues(), target.getMaxValues()); + Assert.assertEquals(expected.getBoundaryOrder(), target.getBoundaryOrder()); + Assert.assertEquals(expected.getNullCounts(), target.getNullCounts()); + Assert.assertEquals(expected.getNullPages(), target.getNullPages()); + } + } + + private static void validateOffsetIndex(OffsetIndex expected, OffsetIndex target) { + if (expected == null) { + Assert.assertEquals("OffsetIndex should should equal", expected, target); + } else { + Assert.assertNotNull("OffsetIndex should not be null", target); + Assert.assertEquals(expected.getClass(), target.getClass()); + Assert.assertEquals(expected.toString(), target.toString()); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java index 1f03decebf..6ce7e2c91f 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java @@ -37,6 +37,7 @@ import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.IndexCache; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; @@ -97,18 +98,20 @@ public class ParquetRewriterTest { private final int numRecord = 100000; private final Configuration conf = new Configuration(); private final ParquetProperties.WriterVersion writerVersion; + private final IndexCache.CacheStrategy indexCacheStrategy; private List inputFiles = null; private String outputFile = null; private ParquetRewriter rewriter = null; - @Parameterized.Parameters(name = "WriterVersion = {0}") - public static Object[] parameters() { - return new Object[] {"v1", "v2"}; + @Parameterized.Parameters(name = "WriterVersion = {0}, IndexCacheStrategy = {1}") + public static Object[][] parameters() { + return new Object[][] {{"v1", "NONE"}, {"v1", "PREFETCH_BLOCK"}, {"v2", "NONE"}, {"v2", "PREFETCH_BLOCK"}}; } - public ParquetRewriterTest(String writerVersion) { + public ParquetRewriterTest(String writerVersion, String indexCacheStrategy) { this.writerVersion = ParquetProperties.WriterVersion.fromString(writerVersion); + this.indexCacheStrategy = IndexCache.CacheStrategy.valueOf(indexCacheStrategy); } private void testPruneSingleColumnTranslateCodec(List inputPaths) throws Exception { @@ -116,7 +119,8 @@ private void testPruneSingleColumnTranslateCodec(List inputPaths) throws E List pruneColumns = Arrays.asList("Gender"); CompressionCodecName newCodec = CompressionCodecName.ZSTD; RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); - RewriteOptions options = builder.prune(pruneColumns).transform(newCodec).build(); + RewriteOptions options = + builder.prune(pruneColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); @@ -187,7 +191,8 @@ private void testPruneNullifyTranslateCodec(List inputPaths) throws Except maskColumns.put("Links.Forward", MaskMode.NULLIFY); CompressionCodecName newCodec = CompressionCodecName.ZSTD; RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); - RewriteOptions options = builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build(); + RewriteOptions options = + builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).indexCacheStrategy(indexCacheStrategy).build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); @@ -263,6 +268,8 @@ private void testPruneEncryptTranslateCodec(List inputPaths) throws Except EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false); builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties); + builder.indexCacheStrategy(indexCacheStrategy); + RewriteOptions options = builder.build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); @@ -345,7 +352,8 @@ public void testRewriteWithoutColumnIndexes() throws Exception { List pruneCols = Lists.newArrayList("phoneNumbers"); - RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build(); + RewriteOptions 
options = + builder.mask(maskCols).prune(pruneCols).indexCacheStrategy(indexCacheStrategy).build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); rewriter.close(); @@ -401,9 +409,13 @@ private void testNullifyAndEncryptColumn(List inputPaths) throws Exception encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false); Path outputPath = new Path(outputFile); - RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath).mask(maskColumns) - .transform(CompressionCodecName.ZSTD) - .encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build(); + RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath) + .mask(maskColumns) + .transform(CompressionCodecName.ZSTD) + .encrypt(Arrays.asList(encryptColumns)) + .encryptionProperties(fileEncryptionProperties) + .indexCacheStrategy(indexCacheStrategy) + .build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); @@ -474,7 +486,7 @@ public void testMergeTwoFilesOnly() throws Exception { } Path outputPath = new Path(outputFile); RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); - RewriteOptions options = builder.build(); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build(); rewriter = new ParquetRewriter(options); rewriter.processBlocks(); @@ -542,7 +554,7 @@ public void testMergeTwoFilesWithDifferentSchema() throws Exception { } Path outputPath = new Path(outputFile); RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath); - RewriteOptions options = builder.build(); + RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build(); // This should throw an exception because the schemas are different rewriter = new ParquetRewriter(options);
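
Editor's note - caller-side usage sketch (not part of the patch): the snippet below shows how the new indexCacheStrategy option introduced here is intended to be driven from application code. The input/output paths and the pruned column name are illustrative only; the builder and rewriter calls mirror those exercised in ParquetRewriterTest above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.IndexCache;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RewriteWithPrefetchCacheExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Illustrative paths; replace with real input/output locations.
    List<Path> inputPaths = Arrays.asList(
        new Path("/tmp/input-1.parquet"),
        new Path("/tmp/input-2.parquet"));
    Path outputPath = new Path("/tmp/output.parquet");

    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
        .prune(Collections.singletonList("Gender"))                   // optional: drop a column
        .transform(CompressionCodecName.ZSTD)                         // optional: translate codec
        .indexCacheStrategy(IndexCache.CacheStrategy.PREFETCH_BLOCK)  // new option added by this patch
        .build();

    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();
  }
}

With PREFETCH_BLOCK, the rewriter loads all ColumnIndex, OffsetIndex, and BloomFilter structures for the current row group up front via IndexCache.setBlockMetadata and frees each entry after it is consumed (freeCacheAfterGet=true in processBlocks), replacing the per-chunk random seeks to the index structures at the end of the file that the previous readColumnIndex/readOffsetIndex/readBloomFilter calls issued. The default remains NONE, which preserves the old per-chunk read behavior.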