From 2e0cd1925546d2560f7658086251851e6fa68559 Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Fri, 12 Jan 2024 07:24:18 -0800
Subject: [PATCH] PARQUET-2374: Add metrics support for parquet file reader (#1187)

---
 .../org/apache/parquet/HadoopReadOptions.java |  8 ++-
 .../apache/parquet/ParquetReadOptions.java    | 20 ++++++-
 .../hadoop/ColumnChunkPageReadStore.java      | 29 ++++++++++
 .../parquet/hadoop/ParquetFileReader.java     | 54 ++++++++++++++++++-
 .../hadoop/ParquetFileReaderMetrics.java      | 41 ++++++++++++++
 .../hadoop/ParquetMetricsCallback.java        | 46 ++++++++++++++++
 6 files changed, 194 insertions(+), 4 deletions(-)
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 78a24a03f6..51a89a678d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -29,6 +29,7 @@
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;

 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
@@ -49,7 +50,8 @@ private HadoopReadOptions(
       int maxAllocationSize,
       Map<String, String> properties,
       Configuration conf,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     super(
         useSignedStringMinMax,
         useStatsFilter,
@@ -66,6 +68,7 @@ private HadoopReadOptions(
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration(conf));
     this.conf = conf;
   }
@@ -127,7 +130,8 @@ public ParquetReadOptions build() {
           maxAllocationSize,
           properties,
           conf,
-          fileDecryptionProperties);
+          fileDecryptionProperties,
+          metricsCallback);
     }
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index ee7595212e..72fe230bd6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -43,6 +43,7 @@
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
 import org.apache.parquet.hadoop.util.HadoopCodecs;

 // Internal use only
@@ -75,6 +76,7 @@ public class ParquetReadOptions {
   private final Map<String, String> properties;
   private final FileDecryptionProperties fileDecryptionProperties;
   private final ParquetConfiguration conf;
+  private final ParquetMetricsCallback metricsCallback;

   ParquetReadOptions(
       boolean useSignedStringMinMax,
@@ -91,7 +93,8 @@ public class ParquetReadOptions {
       ByteBufferAllocator allocator,
       int maxAllocationSize,
       Map<String, String> properties,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     this(
         useSignedStringMinMax,
         useStatsFilter,
@@ -108,6 +111,7 @@ public class ParquetReadOptions {
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration());
   }

@@ -127,6 +131,7 @@ public class ParquetReadOptions {
       int maxAllocationSize,
       Map<String, String> properties,
       FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback,
       ParquetConfiguration conf) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
@@ -143,6 +148,7 @@ public class ParquetReadOptions {
     this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
     this.fileDecryptionProperties = fileDecryptionProperties;
+    this.metricsCallback = metricsCallback;
     this.conf = conf;
   }

@@ -210,6 +216,10 @@ public FileDecryptionProperties getDecryptionProperties() {
     return fileDecryptionProperties;
   }

+  public ParquetMetricsCallback getMetricsCallback() {
+    return metricsCallback;
+  }
+
   public boolean isEnabled(String property, boolean defaultValue) {
     Optional<String> propValue = Optional.ofNullable(properties.get(property));
     return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
@@ -245,6 +255,7 @@ public static class Builder {
     protected Map<String, String> properties = new HashMap<>();
     protected FileDecryptionProperties fileDecryptionProperties = null;
     protected ParquetConfiguration conf;
+    protected ParquetMetricsCallback metricsCallback;

     public Builder() {
       this(new HadoopParquetConfiguration());
@@ -391,6 +402,11 @@ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties)
       return this;
     }

+    public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) {
+      this.metricsCallback = metricsCallback;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -407,6 +423,7 @@ public Builder copy(ParquetReadOptions options) {
       withAllocator(options.allocator);
       withPageChecksumVerification(options.usePageChecksumVerification);
       withDecryption(options.fileDecryptionProperties);
+      withMetricsCallback(options.metricsCallback);
       conf = options.conf;
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
         set(keyValue.getKey(), keyValue.getValue());
@@ -439,6 +456,7 @@ public ParquetReadOptions build() {
           maxAllocationSize,
           properties,
           fileDecryptionProperties,
+          metricsCallback,
           conf);
     }
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 4905e94d2c..5c376c8ce4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -156,11 +156,13 @@ public DataPage visit(DataPageV1 dataPageV1) {
               ByteBuffer decompressedBuffer =
                   options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+              long start = System.nanoTime();
               decompressor.decompress(
                   byteBuffer, (int) compressedSize, decompressedBuffer, dataPageV1.getUncompressedSize());
+              setDecompressMetrics(bytes, start);

               // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
               // not reset.
@@ -172,7 +174,9 @@ public DataPage visit(DataPageV1 dataPageV1) {
               if (null != blockDecryptor) {
                 bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
               }
+              long start = System.nanoTime();
               decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
+              setDecompressMetrics(bytes, start);
             }

             final DataPageV1 decompressedPage;
@@ -234,8 +238,10 @@ public DataPage visit(DataPageV2 dataPageV2) {
                     - dataPageV2.getRepetitionLevels().size());
               ByteBuffer decompressedBuffer = options.getAllocator().allocate(uncompressedSize);
+              long start = System.nanoTime();
               decompressor.decompress(
                   byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize);
+              setDecompressMetrics(pageBytes, start);

               // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
               // not reset.
@@ -255,7 +261,9 @@ public DataPage visit(DataPageV2 dataPageV2) {
                 int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
                     - dataPageV2.getDefinitionLevels().size()
                     - dataPageV2.getRepetitionLevels().size());
+                long start = System.nanoTime();
                 pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
+                setDecompressMetrics(pageBytes, start);
               }
             }
           } catch (IOException e) {
@@ -293,6 +301,23 @@ public DataPage visit(DataPageV2 dataPageV2) {
           });
     }

+    private void setDecompressMetrics(BytesInput bytes, long start) {
+      final ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+      if (metricsCallback != null) {
+        long time = Math.max(System.nanoTime() - start, 0);
+        long len = bytes.size();
+        double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024);
+        LOG.debug(
+            "Decompress block: Length: {} MB, Time: {} msecs, throughput: {} MB/s",
+            len / (1024 * 1024),
+            time / 1000_000L,
+            throughput);
+        metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time);
+        metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len);
+        metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput);
+      }
+    }
+
     @Override
     public DictionaryPage readDictionaryPage() {
       if (compressedDictionaryPage == null) {
@@ -303,6 +328,10 @@ public DictionaryPage readDictionaryPage() {
         if (null != blockDecryptor) {
           bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
         }
+        long start = System.nanoTime();
+        BytesInput decompressed =
+            decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize());
+        setDecompressMetrics(bytes, start);
         DictionaryPage decompressedPage = new DictionaryPage(
-            decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
+            decompressed,
             compressedDictionaryPage.getDictionarySize(),
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ce154e0aa1..91be68ed66 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -823,6 +823,38 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }

+  /**
+   * @param conf the Hadoop Configuration
+   * @param file Path to a parquet file
+   * @param footer a {@link ParquetMetadata} footer already read from the file
+   * @param options {@link ParquetReadOptions}
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, ParquetReadOptions options)
+      throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
+    this.file = HadoopInputFile.fromPath(file, conf);
+    this.f = this.file.newStream();
+    this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
+    this.options = options;
+    this.footer = footer;
+    try {
+      this.blocks = filterRowGroups(footer.getBlocks());
+    } catch (Exception e) {
+      // In case that filterRowGroups throws an exception in the constructor, the new stream
+      // should be closed. Otherwise, there's no way to close this outside.
+      f.close();
+      throw e;
+    }
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+  }
+
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
     this.converter = new ParquetMetadataConverter(options);
     this.file = file;
@@ -1003,7 +1035,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
     ColumnChunkPageReadStore rowGroup =
         new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
-    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+    List<ConsecutivePartList> allParts = new ArrayList<>();
     ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
       ColumnPath pathKey = mc.getPath();
@@ -1961,10 +1993,12 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
         buffers.add(options.getAllocator().allocate(lastAllocationSize));
       }

+      long readStart = System.nanoTime();
       for (ByteBuffer buffer : buffers) {
         f.readFully(buffer);
         buffer.flip();
       }
+      setReadMetrics(readStart);

       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1974,6 +2008,24 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx
       }
     }

+    private void setReadMetrics(long startNs) {
+      ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+      if (metricsCallback != null) {
+        long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+        double sizeInMb = ((double) length) / (1024 * 1024);
+        double timeInSec = ((double) totalFileReadTimeNs) / 1_000_000_000L;
+        double throughput = sizeInMb / timeInSec;
+        LOG.debug(
+            "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec",
+            sizeInMb,
+            timeInSec,
+            throughput);
+        metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
+        metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
+        metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
+      }
+    }
+
     /**
      * @return the position following the last byte of these chunks
      */
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
new file mode 100644
index 0000000000..737e6abb96
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+public enum ParquetFileReaderMetrics {
+
+  // metrics
+  ReadTime("time spent in reading Parquet file from storage"),
+  SeekTime("time spent in seek when reading Parquet file from storage"),
+  ReadSize("read size when reading Parquet file from storage (MB)"),
+  ReadThroughput("read throughput when reading Parquet file from storage (MB/sec)"),
+  DecompressTime("time spent in block decompression"),
+  DecompressSize("decompressed data size (MB)"),
+  DecompressThroughput("block decompression throughput (MB/sec)");
+
+  private final String desc;
+
+  ParquetFileReaderMetrics(String desc) {
+    this.desc = desc;
+  }
+
+  public String description() {
+    return desc;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
new file mode 100644
index 0000000000..6527fa1f79
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A simple interface to pass basic metric values by name to any implementation. Typically, an
+ * implementation of this interface will serve as a bridge to pass metric values on
+ * to the metrics system of a distributed engine (hadoop, spark, etc).
+ * <br>
+ * Development Note: this interface should provide a default implementation for any metric
+ * method added in the future, so that existing implementations remain backward compatible,
+ * <br>
+ * e.g.
+ * <br>
+ * {@code default void addMaximum(String name, long value) {}}
+ */
+@InterfaceStability.Unstable
+public interface ParquetMetricsCallback {
+  void setValueInt(String name, int value);
+
+  void setValueLong(String name, long value);
+
+  void setValueFloat(String name, float value);
+
+  void setValueDouble(String name, double value);
+
+  void setDuration(String name, long value);
+}
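
Usage sketch (illustrative only, not part of the patch): one way a caller could plug an implementation of the new ParquetMetricsCallback into a read path, assuming the existing HadoopReadOptions.builder(Configuration), HadoopInputFile.fromPath(Path, Configuration) and ParquetFileReader.open(InputFile, ParquetReadOptions) entry points. The InMemoryMetricsCallback and MetricsCallbackExample class names are placeholders and are not added by this patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetMetricsCallback;
import org.apache.parquet.hadoop.util.HadoopInputFile;

/** Placeholder callback that keeps the latest value reported for each metric name. */
class InMemoryMetricsCallback implements ParquetMetricsCallback {
  final Map<String, Number> metrics = new ConcurrentHashMap<>();

  @Override
  public void setValueInt(String name, int value) {
    metrics.put(name, value);
  }

  @Override
  public void setValueLong(String name, long value) {
    metrics.put(name, value);
  }

  @Override
  public void setValueFloat(String name, float value) {
    metrics.put(name, value);
  }

  @Override
  public void setValueDouble(String name, double value) {
    metrics.put(name, value);
  }

  @Override
  public void setDuration(String name, long value) {
    metrics.put(name, value); // durations are reported in nanoseconds
  }
}

class MetricsCallbackExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InMemoryMetricsCallback callback = new InMemoryMetricsCallback();

    // withMetricsCallback() is the builder hook introduced by this patch.
    ParquetReadOptions options =
        HadoopReadOptions.builder(conf).withMetricsCallback(callback).build();

    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(args[0]), conf), options)) {
      // Reading row groups reports ReadTime/ReadSize/ReadThroughput; the Decompress* metrics
      // are reported later, when pages from the returned row group are actually decompressed.
      while (reader.readNextRowGroup() != null) {
        // consume pages here
      }
    }

    callback.metrics.forEach((name, value) -> System.out.println(name + " = " + value));
  }
}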