PARQUET-2413: Support configurable extraMetadata in ParquetWriter (#1241)

clairemcginty authored Jan 28, 2024
1 parent df543e1 commit 19f2843
Showing 4 changed files with 115 additions and 16 deletions.
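
The change, in short: ParquetWriter.Builder gains a withExtraMetaData(Map<String, String>) option, carried through ParquetProperties into the footer's key/value metadata and merged with the entries supplied by the WriteSupport; attempts to overwrite ParquetWriter.OBJECT_MODEL_NAME_PROP or an already-present key fail fast with IllegalArgumentException. A minimal usage sketch follows; the schema, output path, and metadata entries are illustrative only, not part of the commit.

    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupWriteSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class ExtraMetadataExample {
        public static void main(String[] args) throws Exception {
            MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 int32_field; }");
            Configuration conf = new Configuration();
            GroupWriteSupport.setSchema(schema, conf);

            // Entries passed here land in the footer's key/value metadata,
            // next to WriteSupport-provided entries such as OBJECT_MODEL_NAME_PROP.
            try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/extra-metadata-example.parquet"))
                    .withConf(conf)
                    .withExtraMetaData(ImmutableMap.of("pipeline.version", "1.2.3"))
                    .build()) {
                writer.write(new SimpleGroupFactory(schema).newGroup().append("int32_field", 42));
            }
        }
    }
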
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,8 @@
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -113,6 +115,7 @@ public static WriterVersion fromString(String name) {
     private final int pageRowCountLimit;
     private final boolean pageWriteChecksumEnabled;
     private final boolean enableByteStreamSplit;
+    private final Map<String, String> extraMetaData;
 
     private ParquetProperties(Builder builder) {
         this.pageSizeThreshold = builder.pageSize;
@@ -139,6 +142,7 @@ private ParquetProperties(Builder builder) {
         this.pageRowCountLimit = builder.pageRowCountLimit;
         this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
         this.enableByteStreamSplit = builder.enableByteStreamSplit;
+        this.extraMetaData = builder.extraMetaData;
     }
 
     public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -293,6 +297,10 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
         return numBloomFilterCandidates.getValue(column);
     }
 
+    public Map<String, String> getExtraMetaData() {
+        return extraMetaData;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -342,6 +350,7 @@ public static class Builder {
         private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
         private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
         private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+        private Map<String, String> extraMetaData = new HashMap<>();
 
         private Builder() {
             enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -373,6 +382,7 @@ private Builder(ParquetProperties toCopy) {
             this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
             this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
             this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+            this.extraMetaData = toCopy.extraMetaData;
         }
 
         /**
@@ -584,6 +594,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
             return this;
         }
 
+        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+            this.extraMetaData = extraMetaData;
+            return this;
+        }
+
         public ParquetProperties build() {
             ParquetProperties properties = new ParquetProperties(this);
             // we pass a constructed but uninitialized factory to ParquetProperties above as currently
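
A short sketch of the ParquetProperties-level plumbing added above; the metadata entry is illustrative.

    import java.util.Collections;
    import org.apache.parquet.column.ParquetProperties;

    public class PropertiesExtraMetadataExample {
        public static void main(String[] args) {
            // withExtraMetaData stores the map on the built ParquetProperties;
            // getExtraMetaData hands it back to ParquetWriter for footer merging.
            ParquetProperties props = ParquetProperties.builder()
                    .withExtraMetaData(Collections.singletonMap("source", "ingest-job"))
                    .build();
            System.out.println(props.getExtraMetaData()); // prints {source=ingest-job}
        }
    }
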
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -20,6 +20,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
@@ -403,15 +405,29 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
 
         this.codecFactory = codecFactory;
         CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName);
+
+        final Map<String, String> extraMetadata;
+        if (encodingProps.getExtraMetaData() == null
+                || encodingProps.getExtraMetaData().isEmpty()) {
+            extraMetadata = writeContext.getExtraMetaData();
+        } else {
+            extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+            encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
+                if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+                    throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
+                            + ". Please use another key name.");
+                }
+
+                if (extraMetadata.put(metadataKey, metadataValue) != null) {
+                    throw new IllegalArgumentException(
+                            "Duplicate metadata key " + metadataKey + ". Please use another key name.");
+                }
+            });
+        }
+
         this.writer = new InternalParquetRecordWriter<T>(
-                fileWriter,
-                writeSupport,
-                schema,
-                writeContext.getExtraMetaData(),
-                rowGroupSize,
-                compressor,
-                validating,
-                encodingProps);
+                fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
     }
 
     public void write(T object) throws IOException {
@@ -849,6 +865,17 @@ public SELF withStatisticsTruncateLength(int length) {
             return self();
         }
 
+        /**
+         * Sets additional metadata entries to be included in the file footer.
+         *
+         * @param extraMetaData a Map of additional stringly-typed metadata entries
+         * @return this builder for method chaining
+         */
+        public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+            encodingPropsBuilder.withExtraMetaData(extraMetaData);
+            return self();
+        }
+
         /**
         * Set a property that will be available to the read path. For writers that use a Hadoop
         * configuration, this is the recommended way to add configuration values.
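
For completeness, a sketch of reading the merged key/value metadata back, mirroring the assertions in the test changes further down; the file path (and the key written earlier) are assumptions.

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ReadFooterMetadataExample {
        public static void main(String[] args) throws Exception {
            Path path = new Path("/tmp/extra-metadata-example.parquet");
            try (ParquetFileReader reader =
                    ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
                // Holds the withExtraMetaData entries plus the WriteSupport-provided
                // ones (e.g. ParquetWriter.OBJECT_MODEL_NAME_PROP).
                Map<String, String> kv = reader.getFileMetaData().getKeyValueMetaData();
                System.out.println(kv.get("pipeline.version"));
            }
        }
    }
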
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.hadoop.example;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -98,7 +97,6 @@ public static Builder builder(OutputFile file) {
 
     public static class Builder extends ParquetWriter.Builder<Group, Builder> {
         private MessageType type = null;
-        private Map<String, String> extraMetaData = new HashMap<String, String>();
 
         private Builder(Path file) {
             super(file);
@@ -113,11 +111,6 @@ public Builder withType(MessageType type) {
             return this;
         }
 
-        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
-            this.extraMetaData = extraMetaData;
-            return this;
-        }
-
         @Override
         protected Builder self() {
             return this;
@@ -130,7 +123,12 @@ protected WriteSupport<Group> getWriteSupport(Configuration conf) {
 
         @Override
         protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
-            return new GroupWriteSupport(type, extraMetaData);
+            return new GroupWriteSupport(type);
         }
+
+        @Override
+        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+            return super.withExtraMetaData(extraMetaData);
+        }
     }
 }

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -36,6 +36,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -407,6 +408,64 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
         testParquetFileNumberOfBlocks(1, 1, 3);
     }
 
+    @Test
+    public void testExtraMetaData() throws Exception {
+        final Configuration conf = new Configuration();
+        final File testDir = temp.newFile();
+        testDir.delete();
+
+        final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+        GroupWriteSupport.setSchema(schema, conf);
+        final SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+        for (WriterVersion version : WriterVersion.values()) {
+            final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+            final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+                    .withConf(conf)
+                    .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
+                    .build();
+            for (int i = 0; i < 1000; i++) {
+                writer.write(f.newGroup().append("int32_field", 32));
+            }
+            writer.close();
+
+            final ParquetFileReader reader =
+                    ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()));
+            assertEquals(1000, reader.readNextRowGroup().getRowCount());
+            assertEquals(
+                    ImmutableMap.of(
+                            "simple-key",
+                            "some-value-1",
+                            "nested.key",
+                            "some-value-2",
+                            ParquetWriter.OBJECT_MODEL_NAME_PROP,
+                            "example"),
+                    reader.getFileMetaData().getKeyValueMetaData());
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
+        final Configuration conf = new Configuration();
+        final File testDir = temp.newFile();
+        testDir.delete();
+
+        final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+        GroupWriteSupport.setSchema(schema, conf);
+
+        for (WriterVersion version : WriterVersion.values()) {
+            final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+
+            Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
+                            new TestOutputFile(filePath, conf))
+                    .withConf(conf)
+                    .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
+                    .build());
+        }
+    }
+
     private void testParquetFileNumberOfBlocks(
             int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
             throws IOException {
