PARQUET-2374: Add metrics support for parquet file reader (#1187)
parthchandra authored Jan 12, 2024
1 parent 8418b8b commit 2e0cd19
Showing 6 changed files with 194 additions and 4 deletions.
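
In short, this commit threads an optional ParquetMetricsCallback from the read options into ParquetFileReader: HadoopReadOptions and ParquetReadOptions gain a withMetricsCallback builder method and carry the callback, the page decompression path and ParquetFileReader's readAll path time their work and report it, and two new files add the ParquetFileReaderMetrics name enum and the ParquetMetricsCallback interface. The sketch below shows one way an engine could wire this up; it is a hedged example, not part of the commit: AccumulatingMetricsCallback is a hypothetical implementation (sketched after the new interface at the end of this diff), while the no-argument ParquetReadOptions.builder() and ParquetFileReader.open(InputFile, ParquetReadOptions) are assumed from the pre-existing reader API.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetMetricsCallback;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class MetricsWiringSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical callback implementation; any ParquetMetricsCallback works here.
    ParquetMetricsCallback callback = new AccumulatingMetricsCallback();

    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetricsCallback(callback) // builder method added by this commit
        .build();

    InputFile file = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());

    // Fetching a row group reports ReadTime/ReadSize/ReadThroughput; the Decompress*
    // metrics are reported later, when the pages of the row group are actually decompressed.
    try (ParquetFileReader reader = ParquetFileReader.open(file, options)) {
      reader.readNextRowGroup();
    }
  }
}
```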
@@ -29,6 +29,7 @@
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetMetricsCallback;

public class HadoopReadOptions extends ParquetReadOptions {
private final Configuration conf;
@@ -49,7 +50,8 @@ private HadoopReadOptions(
int maxAllocationSize,
Map<String, String> properties,
Configuration conf,
FileDecryptionProperties fileDecryptionProperties) {
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback) {
super(
useSignedStringMinMax,
useStatsFilter,
@@ -66,6 +68,7 @@ private HadoopReadOptions(
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
new HadoopParquetConfiguration(conf));
this.conf = conf;
}
@@ -127,7 +130,8 @@ public ParquetReadOptions build() {
maxAllocationSize,
properties,
conf,
fileDecryptionProperties);
fileDecryptionProperties,
metricsCallback);
}
}

@@ -43,6 +43,7 @@
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetMetricsCallback;
import org.apache.parquet.hadoop.util.HadoopCodecs;

// Internal use only
@@ -75,6 +76,7 @@ public class ParquetReadOptions {
private final Map<String, String> properties;
private final FileDecryptionProperties fileDecryptionProperties;
private final ParquetConfiguration conf;
private final ParquetMetricsCallback metricsCallback;

ParquetReadOptions(
boolean useSignedStringMinMax,
@@ -91,7 +93,8 @@ public class ParquetReadOptions {
ByteBufferAllocator allocator,
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties) {
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback) {
this(
useSignedStringMinMax,
useStatsFilter,
@@ -108,6 +111,7 @@ public class ParquetReadOptions {
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
new HadoopParquetConfiguration());
}

@@ -127,6 +131,7 @@ public class ParquetReadOptions {
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties,
ParquetMetricsCallback metricsCallback,
ParquetConfiguration conf) {
this.useSignedStringMinMax = useSignedStringMinMax;
this.useStatsFilter = useStatsFilter;
@@ -143,6 +148,7 @@ public class ParquetReadOptions {
this.maxAllocationSize = maxAllocationSize;
this.properties = Collections.unmodifiableMap(properties);
this.fileDecryptionProperties = fileDecryptionProperties;
this.metricsCallback = metricsCallback;
this.conf = conf;
}

@@ -210,6 +216,10 @@ public FileDecryptionProperties getDecryptionProperties() {
return fileDecryptionProperties;
}

public ParquetMetricsCallback getMetricsCallback() {
return metricsCallback;
}

public boolean isEnabled(String property, boolean defaultValue) {
Optional<String> propValue = Optional.ofNullable(properties.get(property));
return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
@@ -245,6 +255,7 @@ public static class Builder {
protected Map<String, String> properties = new HashMap<>();
protected FileDecryptionProperties fileDecryptionProperties = null;
protected ParquetConfiguration conf;
protected ParquetMetricsCallback metricsCallback;

public Builder() {
this(new HadoopParquetConfiguration());
@@ -391,6 +402,11 @@ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties)
return this;
}

public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) {
this.metricsCallback = metricsCallback;
return this;
}

public Builder set(String key, String value) {
properties.put(key, value);
return this;
@@ -407,6 +423,7 @@ public Builder copy(ParquetReadOptions options) {
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
withDecryption(options.fileDecryptionProperties);
withMetricsCallback(options.metricsCallback);
conf = options.conf;
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
@@ -439,6 +456,7 @@ public ParquetReadOptions build() {
maxAllocationSize,
properties,
fileDecryptionProperties,
metricsCallback,
conf);
}
}
@@ -156,11 +156,13 @@ public DataPage visit(DataPageV1 dataPageV1) {

ByteBuffer decompressedBuffer =
options.getAllocator().allocate(dataPageV1.getUncompressedSize());
long start = System.nanoTime();
decompressor.decompress(
byteBuffer,
(int) compressedSize,
decompressedBuffer,
dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);

// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
@@ -172,7 +174,9 @@ public DataPage visit(DataPageV1 dataPageV1) {
if (null != blockDecryptor) {
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
long start = System.nanoTime();
decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);
}

final DataPageV1 decompressedPage;
@@ -234,8 +238,10 @@ public DataPage visit(DataPageV2 dataPageV2) {
- dataPageV2.getRepetitionLevels().size());
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(uncompressedSize);
long start = System.nanoTime();
decompressor.decompress(
byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize);
setDecompressMetrics(pageBytes, start);

// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
@@ -255,7 +261,9 @@ public DataPage visit(DataPageV2 dataPageV2) {
int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
long start = System.nanoTime();
pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
setDecompressMetrics(pageBytes, start);
}
}
} catch (IOException e) {
@@ -293,6 +301,23 @@ public DataPage visit(DataPageV2 dataPageV2) {
});
}

private void setDecompressMetrics(BytesInput bytes, long start) {
final ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
if (metricsCallback != null) {
long time = Math.max(System.nanoTime() - start, 0);
long len = bytes.size();
double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024);
LOG.debug(
"Decompress block: Length: {} MB, Time: {} msecs, throughput: {} MB/s",
len / (1024 * 1024),
time / 1000_000L,
throughput);
metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time);
metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len);
metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput);
}
}

@Override
public DictionaryPage readDictionaryPage() {
if (compressedDictionaryPage == null) {
@@ -303,6 +328,10 @@ public DictionaryPage readDictionaryPage() {
if (null != blockDecryptor) {
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
}
long start = System.nanoTime();
BytesInput decompressed =
decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize());
setDecompressMetrics(bytes, start);
DictionaryPage decompressedPage = new DictionaryPage(
decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
decompressed,
compressedDictionaryPage.getDictionarySize(),
@@ -823,6 +823,38 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}

/**
* @param conf the Hadoop Configuration
* @param file Path to a parquet file
* @param footer a {@link ParquetMetadata} footer already read from the file
* @param options {@link ParquetReadOptions}
* @throws IOException if the file can not be opened
*/
public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, ParquetReadOptions options)
throws IOException {
this.converter = new ParquetMetadataConverter(conf);
this.file = HadoopInputFile.fromPath(file, conf);
this.f = this.file.newStream();
this.fileMetaData = footer.getFileMetaData();
this.fileDecryptor = fileMetaData.getFileDecryptor();
this.options = options;
this.footer = footer;
try {
this.blocks = filterRowGroups(footer.getBlocks());
} catch (Exception e) {
// In case that filterRowGroups throws an exception in the constructor, the new stream
// should be closed. Otherwise, there's no way to close this outside.
f.close();
throw e;
}
this.blockIndexStores = listWithNulls(this.blocks.size());
this.blockRowRanges = listWithNulls(this.blocks.size());
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}

public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = file;
@@ -1003,7 +1035,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
ColumnChunkPageReadStore rowGroup =
new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
@@ -1961,10 +1993,12 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
buffers.add(options.getAllocator().allocate(lastAllocationSize));
}

long readStart = System.nanoTime();
for (ByteBuffer buffer : buffers) {
f.readFully(buffer);
buffer.flip();
}
setReadMetrics(readStart);

// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
@@ -1974,6 +2008,24 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
}
}

private void setReadMetrics(long startNs) {
ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
if (metricsCallback != null) {
long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
double sizeInMb = ((double) length) / (1024 * 1024);
double timeInSec = ((double) totalFileReadTimeNs) / 1000_000_000L;
double throughput = sizeInMb / timeInSec;
LOG.debug(
"Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec ",
sizeInMb,
timeInSec,
throughput);
metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
}
}

/**
* @return the position following the last byte of these chunks
*/
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

public enum ParquetFileReaderMetrics {

// metrics
ReadTime("time spent in reading Parquet file from storage"),
SeekTime("time spent in seek when reading Parquet file from storage"),
ReadSize("read size when reading Parquet file from storage (MB)"),
ReadThroughput("read throughput when reading Parquet file from storage (MB/sec)"),
DecompressTime("time spent in block decompression"),
DecompressSize("decompressed data size (MB)"),
DecompressThroughput("block decompression throughput (MB/sec)");

private final String desc;

ParquetFileReaderMetrics(String desc) {
this.desc = desc;
}

public String description() {
return desc;
}
}
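
Each constant's name() is the key under which the reader reports a value (for example ParquetFileReaderMetrics.ReadTime.name() in setReadMetrics above), and description() returns the text declared here. A minimal sketch, assuming nothing beyond this enum, that lists every key with its description:

```java
import org.apache.parquet.hadoop.ParquetFileReaderMetrics;

public class ListReaderMetrics {
  public static void main(String[] args) {
    // name() is the callback key; description() is the documentation string defined above.
    for (ParquetFileReaderMetrics metric : ParquetFileReaderMetrics.values()) {
      System.out.println(metric.name() + ": " + metric.description());
    }
  }
}
```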
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import org.apache.hadoop.classification.InterfaceStability;

/**
* A simple interface for passing basic metric values by name to any implementation. Typically, an
* implementation of this interface will serve as a bridge that passes metric values on
* to the metrics system of a distributed engine (Hadoop, Spark, etc.).
* <br>
* Development note: this interface should provide a default implementation for any newly added
* metric tracker, so that existing implementations stay backward compatible,
* <br>
* e.g.
* <br>
* <code>default void addMaximum(String key, long value) {}</code>
*/
@InterfaceStability.Unstable
public interface ParquetMetricsCallback {
void setValueInt(String name, int value);

void setValueLong(String name, long value);

void setValueFloat(String name, float value);

void setValueDouble(String name, double value);

void setDuration(String name, long value);
}
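
As a hedged illustration of the bridge described in the Javadoc, the sketch below simply accumulates the reported values in memory, keyed by metric name; the class name and the sum-based aggregation are assumptions, not part of this commit. Durations arrive as System.nanoTime() deltas in nanoseconds, as reported by setReadMetrics and setDecompressMetrics earlier in this diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import org.apache.parquet.hadoop.ParquetMetricsCallback;

/**
 * Hypothetical in-memory bridge (not part of this commit): sums every reported value
 * per metric name. A real engine bridge would forward to its own metrics registry.
 */
public class AccumulatingMetricsCallback implements ParquetMetricsCallback {
  private final Map<String, LongAdder> longValues = new ConcurrentHashMap<>();
  private final Map<String, DoubleAdder> doubleValues = new ConcurrentHashMap<>();

  @Override
  public void setValueInt(String name, int value) {
    longValues.computeIfAbsent(name, k -> new LongAdder()).add(value);
  }

  @Override
  public void setValueLong(String name, long value) {
    longValues.computeIfAbsent(name, k -> new LongAdder()).add(value);
  }

  @Override
  public void setValueFloat(String name, float value) {
    doubleValues.computeIfAbsent(name, k -> new DoubleAdder()).add(value);
  }

  @Override
  public void setValueDouble(String name, double value) {
    doubleValues.computeIfAbsent(name, k -> new DoubleAdder()).add(value);
  }

  @Override
  public void setDuration(String name, long nanos) {
    // Reported as a nanosecond delta; summing gives total time spent per metric.
    longValues.computeIfAbsent(name, k -> new LongAdder()).add(nanos);
  }

  /** Total accumulated long value for a key (e.g. nanoseconds for *Time metrics), 0 if unseen. */
  public long totalLong(String name) {
    LongAdder adder = longValues.get(name);
    return adder == null ? 0L : adder.sum();
  }
}
```

Summing matches how the reader invokes the callback repeatedly (once per consecutive read and once per decompressed page); an engine bridge could instead record histograms or export gauges.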
