PARQUET-2366: Optimize random seek during rewriting #1174

Merged 10 commits on Oct 30, 2023

Changes from 2 commits
@@ -265,6 +265,10 @@ private void processBlocksFromReader() throws IOException {
BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

List<ColumnIndex> columnIndexes = readAllColumnIndexes(reader, columnsInOrder, descriptorsMap);
Contributor Author:

I could add an option for this if someone is concerned about memory usage. This only caches the metadata for a single block at a time, which should be smaller than file writing, where the metadata of all blocks has to be cached.

Member:

Thanks for adding this! The change looks reasonable to me. I would suggest adding a new class specifically to cache and read these indexes. The new class would have methods like readBloomFilter(), readColumnIndex() and readOffsetIndex() for a specific column path, and could be configured to cache the required columns in advance. With this new class, we can do more optimizations, including evicting consumed items from the cache and using async I/O prefetch to load items. We can split these into separate patches. For the first one, we may simply add the new class without any caching (i.e. no behavior change). WDYT?
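To make the suggestion concrete, here is a minimal sketch of what such a class could look like. The class name IndexCache, the prefetch flag, and the setRowGroup() hook are hypothetical and not code from this PR; import paths are assumed to match those already used by ParquetRewriter.

// Hypothetical sketch of the suggested class; names and signatures are
// illustrative only and not part of this PR.
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

public class IndexCache {
  private final TransParquetFileReader reader;
  private final Set<ColumnPath> columnsToCache;
  private final boolean prefetch;
  private final Map<ColumnPath, ColumnIndex> columnIndexCache = new HashMap<>();
  private final Map<ColumnPath, OffsetIndex> offsetIndexCache = new HashMap<>();
  private final Map<ColumnPath, BloomFilter> bloomFilterCache = new HashMap<>();

  public IndexCache(TransParquetFileReader reader, Set<ColumnPath> columnsToCache, boolean prefetch) {
    this.reader = reader;
    this.columnsToCache = columnsToCache;
    this.prefetch = prefetch;
  }

  // Called once per row group. With prefetch disabled there is no behavior
  // change: every readXXX() call falls through to the reader. With prefetch
  // enabled, each index region is scanned in column order, so the seeks
  // within one row group stay mostly sequential.
  public void setRowGroup(BlockMetaData rowGroup) throws IOException {
    columnIndexCache.clear();
    offsetIndexCache.clear();
    bloomFilterCache.clear();
    if (!prefetch) {
      return;
    }
    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
      if (columnsToCache.contains(chunk.getPath())) {
        ColumnIndex columnIndex = reader.readColumnIndex(chunk);
        if (columnIndex != null) {
          columnIndexCache.put(chunk.getPath(), columnIndex);
        }
      }
    }
    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
      if (columnsToCache.contains(chunk.getPath())) {
        OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
        if (offsetIndex != null) {
          offsetIndexCache.put(chunk.getPath(), offsetIndex);
        }
      }
    }
    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
      if (columnsToCache.contains(chunk.getPath())) {
        BloomFilter bloomFilter = reader.readBloomFilter(chunk);
        if (bloomFilter != null) {
          bloomFilterCache.put(chunk.getPath(), bloomFilter);
        }
      }
    }
  }

  public ColumnIndex readColumnIndex(ColumnChunkMetaData chunk) throws IOException {
    ColumnIndex cached = columnIndexCache.get(chunk.getPath());
    return cached != null ? cached : reader.readColumnIndex(chunk);
  }

  public OffsetIndex readOffsetIndex(ColumnChunkMetaData chunk) throws IOException {
    OffsetIndex cached = offsetIndexCache.get(chunk.getPath());
    return cached != null ? cached : reader.readOffsetIndex(chunk);
  }

  public BloomFilter readBloomFilter(ColumnChunkMetaData chunk) throws IOException {
    BloomFilter cached = bloomFilterCache.get(chunk.getPath());
    return cached != null ? cached : reader.readBloomFilter(chunk);
  }
}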

Contributor Author:

@wgtmac thanks for your suggestions. Do you mean reading the indexes column by column to reduce the memory footprint? The suggested approach should use less memory. From my understanding, the indexes are stored as follows:

// column index
block1_col1_column_index
...
block1_coln_column_index
block2_col1_column_index
...
block2_coln_column_index
...

// offset index
block1_col1_offset_index
...
block1_coln_offset_index
block2_col1_offset_index
...
block2_coln_offset_index
...

// bloom index
block1_col1_bloom_index
...
block1_coln_bloom_index
block2_col1_bloom_index
...
block2_coln_bloom_index
...

So the problem would be that we still need random seeks within a single row group (3 * number of columns). Async I/O should help with random seek performance. With this PR, we only need 3 random seeks per row group (aside from the pruned columns).

Member:

> Do you mean reading the indexes column by column to reduce the memory footprint?

No, my suggested interface does not restrict any implementation detail; the indexes just need to be ready when readXXX() is called. You can still read all indexes at once (controlled by a config). We could also configure it to release any consumed index object to reduce the memory footprint.
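For illustration, a free-after-read policy could be layered on the hypothetical IndexCache sketched above by evicting each entry as soon as it is handed out (again, an assumption for discussion, not code from this PR):

// Variant of the hypothetical readColumnIndex() above: the cached entry is
// released as soon as it has been consumed, so indexes of already-processed
// columns do not stay in memory for the rest of the row group.
public ColumnIndex readColumnIndex(ColumnChunkMetaData chunk) throws IOException {
  ColumnIndex cached = columnIndexCache.remove(chunk.getPath()); // evict on read
  return cached != null ? cached : reader.readColumnIndex(chunk);
}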

List<OffsetIndex> offsetIndexes = readAllOffsetIndexes(reader, columnsInOrder, descriptorsMap);
List<BloomFilter> bloomFilters = readAllBloomFilters(reader, columnsInOrder, descriptorsMap);

for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
ColumnChunkMetaData chunk = columnsInOrder.get(i);
ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
@@ -314,13 +318,20 @@ private void processBlocksFromReader() throws IOException {

// Translate compression and/or encryption
writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
processChunk(
chunk,
newCodecName,
columnChunkEncryptorRunTime,
encryptColumn,
bloomFilters.get(i),
columnIndexes.get(i),
offsetIndexes.get(i));
writer.endColumn();
} else {
// Nothing changed, simply copy the binary data.
BloomFilter bloomFilter = reader.readBloomFilter(chunk);
ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
BloomFilter bloomFilter = bloomFilters.get(i);
ColumnIndex columnIndex = columnIndexes.get(i);
OffsetIndex offsetIndex = offsetIndexes.get(i);
writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
}

@@ -338,7 +349,10 @@ private void processChunk(ColumnChunkMetaData chunk,
private void processChunk(ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
boolean encryptColumn) throws IOException {
boolean encryptColumn,
BloomFilter bloomFilter,
ColumnIndex columnIndex,
OffsetIndex offsetIndex) throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
CompressionCodecFactory.BytesInputCompressor compressor = null;
@@ -364,9 +378,6 @@ private void processChunk(ColumnChunkMetaData chunk,
dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
}

ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
BloomFilter bloomFilter = reader.readBloomFilter(chunk);
if (bloomFilter != null) {
writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
}
@@ -736,6 +747,54 @@ private Type extractField(GroupType candidate, Type targetField) {
return null;
}

private static List<ColumnIndex> readAllColumnIndexes(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<ColumnIndex> columnIndexList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
columnIndexList.add(reader.readColumnIndex(chunk));
} else {
columnIndexList.add(null);
}
}

return columnIndexList;
}

private static List<OffsetIndex> readAllOffsetIndexes(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<OffsetIndex> offsetIndexList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
offsetIndexList.add(reader.readOffsetIndex(chunk));
} else {
offsetIndexList.add(null);
}
}

return offsetIndexList;
}

private static List<BloomFilter> readAllBloomFilters(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<BloomFilter> bloomFilterList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
bloomFilterList.add(reader.readBloomFilter(chunk));
} else {
bloomFilterList.add(null);
}
}

return bloomFilterList;
}

private static final class DummyGroupConverter extends GroupConverter {
@Override
public void start() {