Commit: Avoid random seek
ConeyLiu committed Oct 17, 2023
1 parent 5ee5133 commit 175f9f9
Showing 1 changed file with 68 additions and 8 deletions.
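Summary: instead of calling reader.readColumnIndex, reader.readOffsetIndex and reader.readBloomFilter per column chunk inside the rewrite loop of processBlocksFromReader, the commit reads all column indexes, then all offset indexes, then all bloom filters for the row group up front and passes them down. In a typical Parquet layout each of these structures sits in its own contiguous region near the footer, so the old per-column interleaving jumped between regions on every iteration, while the batched order reads each region front to back.

The toy program below illustrates the access-pattern difference. It is a standalone sketch using plain java.io, not code from this commit: the file name regions.bin, the record size and the three-region layout are invented for illustration.

import java.io.IOException;
import java.io.RandomAccessFile;

// Toy model: three metadata "regions" laid out back to back, one fixed-size
// record per column in each region.
public class SeekPattern {
  private static final int COLUMNS = 3;
  private static final int RECORD = 8; // bytes per record (one long)

  public static void main(String[] args) throws IOException {
    try (RandomAccessFile f = new RandomAccessFile("regions.bin", "rw")) {
      f.setLength(3L * COLUMNS * RECORD); // zero-filled placeholder data

      // Old pattern: per column, read from each of the three regions in turn.
      // Every read lands in a different region -> 9 long-distance seeks.
      for (int col = 0; col < COLUMNS; col++) {
        for (int region = 0; region < 3; region++) {
          f.seek((long) region * COLUMNS * RECORD + (long) col * RECORD);
          f.readLong();
        }
      }

      // New pattern: drain each region front to back before moving on.
      // Only 3 seeks (one per region); reads inside a region are sequential.
      for (int region = 0; region < 3; region++) {
        f.seek((long) region * COLUMNS * RECORD);
        for (int col = 0; col < COLUMNS; col++) {
          f.readLong();
        }
      }
    }
  }
}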
@@ -265,6 +265,10 @@ private void processBlocksFromReader() throws IOException {
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
 
+      List<ColumnIndex> columnIndexes = readAllColumnIndexes(reader, columnsInOrder, descriptorsMap);
+      List<OffsetIndex> offsetIndexes = readAllOffsetIndexes(reader, columnsInOrder, descriptorsMap);
+      List<BloomFilter> bloomFilters = readAllBloomFilters(reader, columnsInOrder, descriptorsMap);
+
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
         ColumnChunkMetaData chunk = columnsInOrder.get(i);
         ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
@@ -314,13 +318,20 @@ private void processBlocksFromReader() throws IOException {
 
           // Translate compression and/or encryption
           writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
-          processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
+          processChunk(
+              chunk,
+              newCodecName,
+              columnChunkEncryptorRunTime,
+              encryptColumn,
+              bloomFilters.get(i),
+              columnIndexes.get(i),
+              offsetIndexes.get(i));
           writer.endColumn();
         } else {
           // Nothing changed, simply copy the binary data.
-          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
-          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+          BloomFilter bloomFilter = bloomFilters.get(i);
+          ColumnIndex columnIndex = columnIndexes.get(i);
+          OffsetIndex offsetIndex = offsetIndexes.get(i);
           writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
         }
 
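Note that the lists returned by the readAll* helpers (added at the bottom of the diff) are aligned positionally with columnsInOrder: columns whose path is missing from descriptorsMap get a null placeholder rather than being skipped, which is what makes the bloomFilters.get(i) / columnIndexes.get(i) / offsetIndexes.get(i) lookups above safe.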
@@ -338,7 +349,10 @@ private void processBlocksFromReader() throws IOException {
   private void processChunk(ColumnChunkMetaData chunk,
                             CompressionCodecName newCodecName,
                             ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
-                            boolean encryptColumn) throws IOException {
+                            boolean encryptColumn,
+                            BloomFilter bloomFilter,
+                            ColumnIndex columnIndex,
+                            OffsetIndex offsetIndex) throws IOException {
     CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
     CompressionCodecFactory.BytesInputDecompressor decompressor = null;
     CompressionCodecFactory.BytesInputCompressor compressor = null;
@@ -364,9 +378,6 @@ private void processChunk(ColumnChunkMetaData chunk,
       dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
     }
 
-    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-    BloomFilter bloomFilter = reader.readBloomFilter(chunk);
     if (bloomFilter != null) {
       writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
     }
Expand Down Expand Up @@ -736,6 +747,55 @@ private Type extractField(GroupType candidate, Type targetField) {
return null;
}

private static List<ColumnIndex> readAllColumnIndexes(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<ColumnIndex> columnIndexList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
columnIndexList.add(reader.readColumnIndex(chunk));
} else {
columnIndexList.add(null);
}
}

return columnIndexList;
}

private static List<OffsetIndex> readAllOffsetIndexes(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<OffsetIndex> offsetIndexList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
offsetIndexList.add(reader.readOffsetIndex(chunk));
} else {
offsetIndexList.add(null);
}
}

return offsetIndexList;
}

private static List<BloomFilter> readAllBloomFilters(
TransParquetFileReader reader,
List<ColumnChunkMetaData> chunks,
Map<ColumnPath, ColumnDescriptor> descriptorsMap) throws IOException {
List<BloomFilter> bloomFilterList = new ArrayList<>(chunks.size());
for (ColumnChunkMetaData chunk : chunks) {
if (descriptorsMap.containsKey(chunk.getPath())) {
bloomFilterList.add(reader.readBloomFilter(chunk));
} else {
bloomFilterList.add(null);
}
}

return bloomFilterList;
}


private static final class DummyGroupConverter extends GroupConverter {
@Override
public void start() {
Expand Down
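The three readAll* helpers differ only in the per-chunk read call and the element type. A possible follow-up (not part of this commit, just a sketch) collapses them into one generic helper; the ChunkReader interface below is hypothetical, introduced because java.util.function.Function cannot throw IOException:

  @FunctionalInterface
  private interface ChunkReader<T> {
    T read(ColumnChunkMetaData chunk) throws IOException;
  }

  private static <T> List<T> readAll(
      List<ColumnChunkMetaData> chunks,
      Map<ColumnPath, ColumnDescriptor> descriptorsMap,
      ChunkReader<T> perChunkRead) throws IOException {
    List<T> result = new ArrayList<>(chunks.size());
    for (ColumnChunkMetaData chunk : chunks) {
      // Keep positions aligned with the chunk list; null marks a column
      // that is not in descriptorsMap, matching the current helpers.
      result.add(descriptorsMap.containsKey(chunk.getPath()) ? perChunkRead.read(chunk) : null);
    }
    return result;
  }

The call sites would then read, e.g., readAll(columnsInOrder, descriptorsMap, reader::readColumnIndex).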
