From 1c4e49e9e049a17a97484d08e12173ac0ae496fa Mon Sep 17 00:00:00 2001
From: Kun Wan
Date: Tue, 23 Jan 2024 19:08:36 +0800
Subject: [PATCH] [WIP][SQL] Avoid parquet footer reads twice

---
 pom.xml                                                  |  2 +-
 .../parquet/SpecificParquetRecordReaderBase.java         | 15 ++++++++++-----
 .../datasources/parquet/ParquetFileFormat.scala          |  6 +++++-
 3 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/pom.xml b/pom.xml
index 95a70fbf91cb8..fa7fdb995b0a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
     <kafka.version>3.6.1</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.16.1.1</derby.version>
-    <parquet.version>1.13.1</parquet.version>
+    <parquet.version>1.14.0-SNAPSHOT</parquet.version>
     <orc.version>1.9.2</orc.version>
     <orc.classifier>shaded-protobuf</orc.classifier>
     <jetty.version>9.4.53.v20231009</jetty.version>
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6d00048154a56..1e39418b4838b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.parquet.io.InputFile;
 import scala.Option;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -100,13 +101,17 @@ public void initialize(
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
     ParquetFileReader fileReader;
+    ParquetReadOptions options = HadoopReadOptions
+      .builder(configuration, file)
+      .withRange(split.getStart(), split.getStart() + split.getLength())
+      .build();
     if (fileFooter.isDefined()) {
-      fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
+      InputFile inputFile = fileFooter.get().getInputFile();
+      if (inputFile == null) {
+        inputFile = HadoopInputFile.fromPath(file, configuration);
+      }
+      fileReader = new ParquetFileReader(inputFile, options, fileFooter.get());
     } else {
-      ParquetReadOptions options = HadoopReadOptions
-        .builder(configuration, file)
-        .withRange(split.getStart(), split.getStart() + split.getLength())
-        .build();
       fileReader = new ParquetFileReader(
         HadoopInputFile.fromPath(file, configuration), options);
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index df367766501d4..676db5948ef2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -214,6 +214,10 @@ class ParquetFileFormat
       } else {
         ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
       }
+      val parquetSplit =
+        new ParquetInputSplit(filePath, file.start, file.start + file.length, file.length,
+          Array.empty[String], null)
+      parquetSplit.setFooter(fileFooter)
 
       val footerFileMetaData = fileFooter.getFileMetaData
       val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
@@ -322,7 +326,7 @@ class ParquetFileFormat
           requiredSchema)
         val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
         try {
-          readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+          readerWithRowIndexes.initialize(parquetSplit, hadoopAttemptContext)
 
           val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema)
           val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
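
Note on the reader-side change: the point of the new fileFooter.isDefined() branch is that a footer parsed once (e.g. by ParquetFooterReader on the Spark side) can hand back the InputFile it was read from, so the record reader neither re-opens the file nor re-parses the footer. Below is a minimal standalone sketch of that pattern, assuming the patched parquet-mr API used in the diff — ParquetMetadata.getInputFile() and the ParquetFileReader(InputFile, ParquetReadOptions, ParquetMetadata) constructor exist only against the 1.14.0-SNAPSHOT this patch pins; the class and method names (FooterReuse, openWithFooter) are illustrative, not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.io.InputFile;

    public final class FooterReuse {

      // Build a reader from an already-parsed footer, mirroring the
      // fileFooter.isDefined() branch added in SpecificParquetRecordReaderBase.
      static ParquetFileReader openWithFooter(
          Configuration conf, Path file, ParquetMetadata footer) throws IOException {
        ParquetReadOptions options = HadoopReadOptions
          .builder(conf, file)
          .build();
        // getInputFile() (patched parquet-mr only) returns the file the footer
        // was parsed from, or null if the footer was built without one.
        InputFile inputFile = footer.getInputFile();
        if (inputFile == null) {
          inputFile = HadoopInputFile.fromPath(file, conf);
        }
        // This constructor trusts the supplied footer instead of seeking to
        // the end of the file and parsing the footer a second time.
        return new ParquetFileReader(inputFile, options, footer);
      }
    }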
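
On the ParquetFileFormat side, the footer is already read once per split (for schema resolution and filter pushdown); the WIP then attaches it to a ParquetInputSplit via setFooter — a setter assumed to come from the same parquet-mr snapshot, since released 1.13.x has no such method — and initializes the parquet-mr record reader with parquetSplit instead of the plain FileSplit. Readers that recognize ParquetInputSplit can then take the footer off the split rather than fetching it again, while the vectorized path appears to keep receiving the same footer through its explicit fileFooter argument.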