Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-2366: Optimize random seek during rewriting #1174

Merged
merged 10 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions parquet-hadoop/src/main/java/org/apache/parquet/hadoop/IndexCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;
import java.util.Set;

/**
* A cache for caching indexes(including: ColumnIndex, OffsetIndex and BloomFilter)
*/
public interface IndexCache {

enum CacheStrategy {
NONE, /* No cache */
PREFETCH_BLOCK /* Prefetch block indexes */
}

/**
* Create an index cache for the given file reader
*
* @param fileReader the file reader
* @param columns the columns that need to do cache
* @param cacheStrategy the cache strategy, supports NONE and PREFETCH_BLOCK
* @param freeCacheAfterGet whether free the given index cache after calling the given get method
* @return the index cache
*/
static IndexCache create(
ParquetFileReader fileReader,
Set<ColumnPath> columns,
CacheStrategy cacheStrategy,
boolean freeCacheAfterGet) {
if (cacheStrategy == CacheStrategy.NONE) {
return new NoneIndexCache(fileReader);
} else if (cacheStrategy == CacheStrategy.PREFETCH_BLOCK) {
return new PrefetchIndexCache(fileReader, columns, freeCacheAfterGet);
ConeyLiu marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new UnsupportedOperationException("Unknown cache strategy: " + cacheStrategy);
}
}

/**
* Set the current BlockMetadata
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
*/
void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException;

/**
* Get the ColumnIndex for the given column in the set row group.
*
* @param chunk the given column chunk
* @return the ColumnIndex for the given column
* @throws IOException if any I/O error occurs during get the ColumnIndex
*/
ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException;

/**
* Get the OffsetIndex for the given column in the set row group.
*
* @param chunk the given column chunk
* @return the OffsetIndex for the given column
* @throws IOException if any I/O error occurs during get the OffsetIndex
*/
OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException;

/**
* Get the BloomFilter for the given column in the set row group.
*
* @param chunk the given column chunk
* @return the BloomFilter for the given column
* @throws IOException if any I/O error occurs during get the BloomFilter
*/
BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException;

/**
* Clean the cache
*/
void clean();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;

/**
* Cache nothing. All the get methods are pushed to ParquetFileReader to read the given index.
*/
/**
 * An {@link IndexCache} implementation that does no caching at all: every get call is
 * forwarded straight to the underlying {@link ParquetFileReader}, which reads the
 * requested index from the file on each invocation.
 */
class NoneIndexCache implements IndexCache {
  private final ParquetFileReader fileReader;

  NoneIndexCache(ParquetFileReader fileReader) {
    this.fileReader = fileReader;
  }

  /** No-op: nothing is prefetched, so the current block is irrelevant. */
  @Override
  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
    // Do nothing
  }

  /** Delegates directly to the reader; no value is retained. */
  @Override
  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readColumnIndex(chunk);
  }

  /** Delegates directly to the reader; no value is retained. */
  @Override
  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readOffsetIndex(chunk);
  }

  /** Delegates directly to the reader; no value is retained. */
  @Override
  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
    return fileReader.readBloomFilter(chunk);
  }

  /** No-op: there is no state to release. */
  @Override
  public void clean() {
    // Do nothing
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.parquet.Preconditions;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* This index cache will prefetch indexes of all columns when calling {@link #setBlockMetadata(BlockMetaData)}.
* <p>
* <b>NOTE:</b> the {@link #setBlockMetadata(BlockMetaData)} will free the previous block cache
*/
/**
 * This index cache will prefetch indexes of all columns when calling {@link #setBlockMetadata(BlockMetaData)}.
 * <p>
 * <b>NOTE:</b> the {@link #setBlockMetadata(BlockMetaData)} will free the previous block cache.
 * Callers must invoke {@link #setBlockMetadata(BlockMetaData)} before any get method, otherwise
 * the internal cache maps are still {@code null}.
 */
class PrefetchIndexCache implements IndexCache {
  private final ParquetFileReader fileReader;
  private final Set<ColumnPath> columns;
  // When true, each cached entry is removed as soon as it is returned once,
  // releasing memory for entries that will not be requested again.
  private final boolean freeCacheAfterGet;

  // Populated by setBlockMetadata(); null until it has been called.
  private Map<ColumnPath, ColumnIndex> columnIndexCache;
  private Map<ColumnPath, OffsetIndex> offsetIndexCache;
  private Map<ColumnPath, BloomFilter> bloomIndexCache;

  /**
   * @param fileReader the file reader
   * @param columns the columns that need to cache
   * @param freeCacheAfterGet whether to free the cached index after calling the given get method
   */
  PrefetchIndexCache(
      ParquetFileReader fileReader,
      Set<ColumnPath> columns,
      boolean freeCacheAfterGet) {
    this.fileReader = fileReader;
    this.columns = columns;
    this.freeCacheAfterGet = freeCacheAfterGet;
  }

  /**
   * Frees any previously cached block and prefetches ColumnIndex, OffsetIndex and BloomFilter
   * for every requested column of the given row group.
   */
  @Override
  public void setBlockMetadata(BlockMetaData currentBlockMetadata) throws IOException {
    clean();
    this.columnIndexCache = readAll(currentBlockMetadata, fileReader::readColumnIndex);
    this.offsetIndexCache = readAll(currentBlockMetadata, fileReader::readOffsetIndex);
    this.bloomIndexCache = readAll(currentBlockMetadata, fileReader::readBloomFilter);
  }

  @Override
  public ColumnIndex getColumnIndex(ColumnChunkMetaData chunk) throws IOException {
    return fromCache(columnIndexCache, chunk.getPath(), "ColumnIndex");
  }

  @Override
  public OffsetIndex getOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
    return fromCache(offsetIndexCache, chunk.getPath(), "OffsetIndex");
  }

  @Override
  public BloomFilter getBloomFilter(ColumnChunkMetaData chunk) throws IOException {
    return fromCache(bloomIndexCache, chunk.getPath(), "BloomFilter");
  }

  @Override
  public void clean() {
    if (columnIndexCache != null) {
      columnIndexCache.clear();
      columnIndexCache = null;
    }

    if (offsetIndexCache != null) {
      offsetIndexCache.clear();
      offsetIndexCache = null;
    }

    if (bloomIndexCache != null) {
      bloomIndexCache.clear();
      bloomIndexCache = null;
    }
  }

  /**
   * Look up one index from the given cache map.
   * <p>
   * For a column that was requested to be cached, a missing entry indicates it was already
   * consumed (freeCacheAfterGet) or never prefetched — an illegal state. For other columns the
   * lookup simply yields {@code null}, matching the behavior of reading a column without an index.
   *
   * @param cache the cache map for one index kind
   * @param columnPath the column to look up
   * @param indexName human-readable index kind used in the error message
   * @return the cached index, or {@code null} if the column was not cached
   */
  private <T> T fromCache(Map<ColumnPath, T> cache, ColumnPath columnPath, String indexName) {
    if (columns.contains(columnPath)) {
      Preconditions.checkState(
        cache.containsKey(columnPath),
        "Not found cached " + indexName + " for column: %s with cache strategy: %s",
        columnPath.toDotString(),
        CacheStrategy.PREFETCH_BLOCK);
    }

    if (freeCacheAfterGet) {
      return cache.remove(columnPath);
    } else {
      return cache.get(columnPath);
    }
  }

  /** Reads one index per requested column of a chunk; may perform I/O. */
  private interface IndexReader<T> {
    T read(ColumnChunkMetaData chunk) throws IOException;
  }

  /**
   * Reads one kind of index for every requested column of the given block.
   *
   * @param blockMetaData the row group whose column chunks are scanned
   * @param reader reads a single index from a column chunk
   * @return map from column path to its index, containing only the requested columns
   * @throws IOException if any I/O error occurs while reading an index
   */
  private <T> Map<ColumnPath, T> readAll(BlockMetaData blockMetaData, IndexReader<T> reader)
      throws IOException {
    Map<ColumnPath, T> indexMap = new HashMap<>(columns.size());
    for (ColumnChunkMetaData chunk : blockMetaData.getColumns()) {
      if (columns.contains(chunk.getPath())) {
        indexMap.put(chunk.getPath(), reader.read(chunk));
      }
    }

    return indexMap;
  }
}
Loading