Skip to content

Commit

Permalink
PARQUET-2347: Add interface layer between Parquet and Hadoop Configuration (#1141)
Browse files Browse the repository at this point in the history
  • Loading branch information
amousavigourabi authored Oct 30, 2023
1 parent 4949e06 commit 1b08469
Show file tree
Hide file tree
Showing 57 changed files with 1,206 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.parquet.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
Expand Down Expand Up @@ -53,6 +54,10 @@ public static <T> Builder<T> builder(InputFile file) {
return new Builder<T>(file);
}

/**
 * Creates a reader builder for the given input file and configuration.
 *
 * @param file the input file handed to the builder
 * @param conf the configuration handed to the builder
 * @param <T> the record type of the resulting builder
 * @return a new {@code Builder} constructed from {@code file} and {@code conf}
 */
public static <T> Builder<T> builder(InputFile file, ParquetConfiguration conf) {
  Builder<T> readerBuilder = new Builder<T>(file, conf);
  return readerBuilder;
}

/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
Expand All @@ -67,6 +72,21 @@ public static ParquetReader<GenericRecord> genericRecordReader(InputFile file) t
return new Builder<GenericRecord>(file).withDataModel(GenericData.get()).build();
}

/**
 * Builds a ParquetReader whose data model is Avro's {@link GenericData},
 * using the supplied configuration.
 *
 * @param file The location to read data from
 * @param conf The configuration passed to the reader builder
 * @return A {@code ParquetReader} which reads data as Avro
 *         {@code GenericData}
 * @throws IOException if the InputFile has been closed, or if some other I/O
 *                     error occurs
 */
public static ParquetReader<GenericRecord> genericRecordReader(InputFile file, ParquetConfiguration conf) throws IOException {
  // Configure the builder first, then build, so each step is visible on its own line.
  Builder<GenericRecord> genericBuilder =
      new Builder<GenericRecord>(file, conf).withDataModel(GenericData.get());
  return genericBuilder.build();
}

/**
* Convenience method for creating a ParquetReader which uses Avro
* {@link GenericData} objects to store data from reads.
Expand Down Expand Up @@ -143,6 +163,10 @@ private Builder(InputFile file) {
super(file);
}

/**
 * Creates a builder for {@code file} with the supplied {@code conf}; both
 * arguments are forwarded unchanged to the superclass constructor.
 *
 * @param file the input file to read from
 * @param conf the configuration to use
 */
private Builder(InputFile file, ParquetConfiguration conf) {
super(file, conf);
}

public Builder<T> withDataModel(GenericData model) {
this.model = model;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
Expand Down Expand Up @@ -154,6 +156,12 @@ private static <T> WriteSupport<T> writeSupport(Schema avroSchema,
/**
 * Builds the write support for a Hadoop {@link Configuration} by wrapping it
 * in a {@link HadoopParquetConfiguration} and delegating to the overload that
 * accepts a {@link ParquetConfiguration}.
 *
 * @param conf the Hadoop configuration to wrap
 * @param avroSchema the Avro schema, forwarded unchanged
 * @param model the Avro data model, forwarded unchanged
 * @return the write support returned by the {@code ParquetConfiguration} overload
 */
private static <T> WriteSupport<T> writeSupport(Configuration conf,
    Schema avroSchema,
    GenericData model) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(conf);
  return writeSupport(wrappedConf, avroSchema, model);
}

/**
 * Builds an {@link AvroWriteSupport} from the Avro schema, the schema that
 * an {@link AvroSchemaConverter} configured with {@code conf} derives from
 * it, and the given data model.
 *
 * @param conf the configuration used to construct the schema converter
 * @param avroSchema the Avro schema to convert and to pass to the write support
 * @param model the Avro data model passed to the write support
 * @return a new {@code AvroWriteSupport}
 */
private static <T> WriteSupport<T> writeSupport(ParquetConfiguration conf,
    Schema avroSchema,
    GenericData model) {
  AvroSchemaConverter schemaConverter = new AvroSchemaConverter(conf);
  return new AvroWriteSupport<T>(schemaConverter.convert(avroSchema), avroSchema, model);
}
Expand Down Expand Up @@ -189,5 +197,10 @@ protected Builder<T> self() {
/**
 * Returns the write support for this builder's schema and data model,
 * created via {@code AvroParquetWriter.writeSupport} with a Hadoop
 * {@link Configuration}.
 *
 * @param conf the Hadoop configuration to use
 * @return the write support for this builder
 */
protected WriteSupport<T> getWriteSupport(Configuration conf) {
  final WriteSupport<T> avroSupport = AvroParquetWriter.writeSupport(conf, schema, model);
  return avroSupport;
}

/**
 * Returns the write support for this builder's schema and data model,
 * created via {@code AvroParquetWriter.writeSupport} with a
 * {@link ParquetConfiguration}.
 *
 * @param conf the configuration to use
 * @return the write support for this builder
 */
@Override
protected WriteSupport<T> getWriteSupport(ParquetConfiguration conf) {
  final WriteSupport<T> avroSupport = AvroParquetWriter.writeSupport(conf, schema, model);
  return avroSupport;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -95,6 +98,13 @@ public AvroReadSupport(GenericData model) {
/**
 * Hadoop-{@link Configuration} entry point: wraps the configuration in a
 * {@link HadoopParquetConfiguration} and delegates to the
 * {@link ParquetConfiguration} overload of {@code init}.
 *
 * @param configuration the Hadoop configuration to wrap
 * @param keyValueMetaData metadata map, forwarded unchanged
 * @param fileSchema the file schema, forwarded unchanged
 * @return the read context produced by the delegated call
 */
public ReadContext init(Configuration configuration,
    Map<String, String> keyValueMetaData,
    MessageType fileSchema) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(configuration);
  return init(wrappedConf, keyValueMetaData, fileSchema);
}

@Override
public ReadContext init(ParquetConfiguration configuration,
Map<String, String> keyValueMetaData,
MessageType fileSchema) {
MessageType projection = fileSchema;
Map<String, String> metadata = new LinkedHashMap<String, String>();

Expand All @@ -120,6 +130,13 @@ public ReadContext init(Configuration configuration,
/**
 * Hadoop-{@link Configuration} entry point: wraps the configuration in a
 * {@link HadoopParquetConfiguration} and delegates to the
 * {@link ParquetConfiguration} overload of {@code prepareForRead}.
 *
 * @param configuration the Hadoop configuration to wrap
 * @param keyValueMetaData metadata map, forwarded unchanged
 * @param fileSchema the file schema, forwarded unchanged
 * @param readContext the read context, forwarded unchanged
 * @return the record materializer produced by the delegated call
 */
public RecordMaterializer<T> prepareForRead(
    Configuration configuration, Map<String, String> keyValueMetaData,
    MessageType fileSchema, ReadContext readContext) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(configuration);
  return prepareForRead(wrappedConf, keyValueMetaData, fileSchema, readContext);
}

@Override
public RecordMaterializer<T> prepareForRead(
ParquetConfiguration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, ReadContext readContext) {
Map<String, String> metadata = readContext.getReadSupportMetadata();
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
Expand Down Expand Up @@ -153,7 +170,7 @@ private static <T> RecordMaterializer<T> newCompatMaterializer(
parquetSchema, avroSchema, model);
}

private GenericData getDataModel(Configuration conf, Schema schema) {
private GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (model != null) {
return model;
}
Expand All @@ -175,6 +192,6 @@ private GenericData getDataModel(Configuration conf, Schema schema) {

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.avro.Schema;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
Expand Down Expand Up @@ -102,6 +104,10 @@ public AvroSchemaConverter() {
}

/**
 * Creates a converter from a Hadoop {@link Configuration} by wrapping it in a
 * {@link HadoopParquetConfiguration} and delegating to the
 * {@link ParquetConfiguration}-based constructor.
 *
 * @param conf the Hadoop configuration to wrap
 */
public AvroSchemaConverter(Configuration conf) {
this(new HadoopParquetConfiguration(conf));
}

public AvroSchemaConverter(ParquetConfiguration conf) {
this.assumeRepeatedIsListElement = conf.getBoolean(
ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
this.writeOldListStructure = conf.getBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.GroupType;
Expand Down Expand Up @@ -129,6 +132,11 @@ public static void setSchema(Configuration configuration, Schema schema) {

/**
 * Hadoop-{@link Configuration} entry point: wraps the configuration in a
 * {@link HadoopParquetConfiguration} and delegates to the
 * {@link ParquetConfiguration} overload of {@code init}.
 *
 * @param configuration the Hadoop configuration to wrap
 * @return the write context produced by the delegated call
 */
@Override
public WriteContext init(Configuration configuration) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(configuration);
  return init(wrappedConf);
}

@Override
public WriteContext init(ParquetConfiguration configuration) {
if (rootAvroSchema == null) {
this.rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA));
this.rootSchema = new AvroSchemaConverter(configuration).convert(rootAvroSchema);
Expand Down Expand Up @@ -404,7 +412,7 @@ private Binary fromAvroString(Object value) {
return Binary.fromCharSequence(value.toString());
}

private static GenericData getDataModel(Configuration conf, Schema schema) {
private static GenericData getDataModel(ParquetConfiguration conf, Schema schema) {
if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
GenericData modelForSchema;
try {
Expand All @@ -423,7 +431,7 @@ private static GenericData getDataModel(Configuration conf, Schema schema) {

Class<? extends AvroDataSupplier> suppClass = conf.getClass(
AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
return ReflectionUtils.newInstance(suppClass, conf).get();
return ReflectionUtils.newInstance(suppClass, ConfigurationUtil.createHadoopConfiguration(conf)).get();
}

private abstract class ListWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
Expand All @@ -77,22 +79,31 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{ false, false }, // use the new converters
{ true, false }, // use the old converters
{ false, true } }; // use a local disk location
{ false, false, false }, // use the new converters with hadoop config
{ true, false, false }, // use the old converters with hadoop config
{ false, true, false }, // use a local disk location with hadoop config
{ false, false, true }, // use the new converters with parquet config interface
{ true, false, true }, // use the old converters with parquet config interface
{ false, true, true } }; // use a local disk location with parquet config interface
return Arrays.asList(data);
}

private final boolean compat;
private final boolean local;
private final boolean confInterface;
private final Configuration testConf = new Configuration();
private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true);

public TestReadWrite(boolean compat, boolean local) {
public TestReadWrite(boolean compat, boolean local, boolean confInterface) {
this.compat = compat;
this.local = local;
this.confInterface = confInterface;
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
testConf.setBoolean("parquet.avro.add-list-element-records", false);
testConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.parquetConf.setBoolean("parquet.avro.add-list-element-records", false);
this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", false);
}

@Test
Expand Down Expand Up @@ -431,6 +442,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception {

// Adapts the Hadoop Configuration and defers to the ParquetConfiguration overload.
@Override
public WriteContext init(Configuration configuration) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(configuration);
  return init(wrappedConf);
}

// Ignores the configuration: the write context always uses the schema parsed from
// TestAvroSchemaConverter.ALL_PARQUET_SCHEMA together with an empty metadata map.
@Override
public WriteContext init(ParquetConfiguration configuration) {
  HashMap<String, String> emptyMetadata = new HashMap<String, String>();
  return new WriteContext(
      MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
      emptyMetadata);
}
Expand Down Expand Up @@ -864,30 +880,44 @@ private File createTempFile() throws IOException {
}

private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws IOException {
AvroParquetWriter.Builder<GenericRecord> writerBuilder;
if (local) {
return AvroParquetWriter
writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new LocalOutputFile(Paths.get(file)))
.withSchema(schema)
.withConf(testConf)
.build();
.withSchema(schema);
} else {
return AvroParquetWriter
writerBuilder = AvroParquetWriter
.<GenericRecord>builder(new Path(file))
.withSchema(schema)
.withSchema(schema);
}
if (confInterface) {
return writerBuilder
.withConf(parquetConf)
.build();
} else {
return writerBuilder
.withConf(testConf)
.build();
}
}

private ParquetReader<GenericRecord> reader(String file) throws IOException {
AvroParquetReader.Builder<GenericRecord> readerBuilder;
if (local) {
return AvroParquetReader
readerBuilder = AvroParquetReader
.<GenericRecord>builder(new LocalInputFile(Paths.get(file)))
.withDataModel(GenericData.get())
.withConf(testConf)
.withDataModel(GenericData.get());
} else {
return new AvroParquetReader<>(testConf, new Path(file));
}
if (confInterface) {
return readerBuilder
.withConf(parquetConf)
.build();
} else {
return new AvroParquetReader(testConf, new Path(file));
return readerBuilder
.withConf(testConf)
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
Expand Down Expand Up @@ -358,6 +360,11 @@ public void testAllUsingDefaultAvroSchema() throws Exception {

// Adapts the Hadoop Configuration and defers to the ParquetConfiguration overload.
@Override
public WriteContext init(Configuration configuration) {
  ParquetConfiguration wrappedConf = new HadoopParquetConfiguration(configuration);
  return init(wrappedConf);
}

// Ignores the configuration: the write context always uses the schema parsed from
// TestAvroSchemaConverter.ALL_PARQUET_SCHEMA together with an empty metadata map.
@Override
public WriteContext init(ParquetConfiguration configuration) {
  HashMap<String, String> emptyMetadata = new HashMap<String, String>();
  return new WriteContext(
      MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA),
      emptyMetadata);
}
Expand Down
Loading

0 comments on commit 1b08469

Please sign in to comment.