diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7bf4009ee1..5152d5b070 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,8 @@
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -113,6 +115,7 @@ public static WriterVersion fromString(String name) {
     private final int pageRowCountLimit;
     private final boolean pageWriteChecksumEnabled;
     private final boolean enableByteStreamSplit;
+    private final Map<String, String> extraMetaData;
 
     private ParquetProperties(Builder builder) {
         this.pageSizeThreshold = builder.pageSize;
@@ -139,6 +142,7 @@ private ParquetProperties(Builder builder) {
         this.pageRowCountLimit = builder.pageRowCountLimit;
         this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
         this.enableByteStreamSplit = builder.enableByteStreamSplit;
+        this.extraMetaData = builder.extraMetaData;
     }
 
     public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -293,6 +297,10 @@ public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
         return numBloomFilterCandidates.getValue(column);
     }
 
+    public Map<String, String> getExtraMetaData() {
+        return extraMetaData;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -342,6 +350,7 @@ public static class Builder {
         private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
         private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
         private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED;
+        private Map<String, String> extraMetaData = new HashMap<>();
 
         private Builder() {
             enableDict = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED);
@@ -373,6 +382,7 @@ private Builder(ParquetProperties toCopy) {
             this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
             this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
             this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
+            this.extraMetaData = toCopy.extraMetaData;
         }
 
         /**
@@ -584,6 +594,11 @@ public Builder withPageWriteChecksumEnabled(boolean val) {
             return this;
         }
 
+        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+            this.extraMetaData = extraMetaData;
+            return this;
+        }
+
         public ParquetProperties build() {
             ParquetProperties properties = new ParquetProperties(this);
             // we pass a constructed but uninitialized factory to ParquetProperties above as currently
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index fc9db5872f..1838d1db44 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -20,6 +20,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
@@ -403,15 +405,29 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
         this.codecFactory = codecFactory;
         CompressionCodecFactory.BytesInputCompressor compressor =
                 codecFactory.getCompressor(compressionCodecName);
+
+        final Map<String, String> extraMetadata;
+        if (encodingProps.getExtraMetaData() == null
+                || encodingProps.getExtraMetaData().isEmpty()) {
+            extraMetadata = writeContext.getExtraMetaData();
+        } else {
+            extraMetadata = new HashMap<>(writeContext.getExtraMetaData());
+
+            encodingProps.getExtraMetaData().forEach((metadataKey, metadataValue) -> {
+                if (metadataKey.equals(OBJECT_MODEL_NAME_PROP)) {
+                    throw new IllegalArgumentException("Cannot overwrite metadata key " + OBJECT_MODEL_NAME_PROP
+                            + ". Please use another key name.");
+                }
+
+                if (extraMetadata.put(metadataKey, metadataValue) != null) {
+                    throw new IllegalArgumentException(
+                            "Duplicate metadata key " + metadataKey + ". Please use another key name.");
+                }
+            });
+        }
+
         this.writer = new InternalParquetRecordWriter<T>(
-                fileWriter,
-                writeSupport,
-                schema,
-                writeContext.getExtraMetaData(),
-                rowGroupSize,
-                compressor,
-                validating,
-                encodingProps);
+                fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps);
     }
 
     public void write(T object) throws IOException {
@@ -849,6 +865,17 @@ public SELF withStatisticsTruncateLength(int length) {
             return self();
         }
 
+        /**
+         * Sets additional metadata entries to be included in the file footer.
+         *
+         * @param extraMetaData a Map of additional String key/value metadata entries
+         * @return this builder for method chaining
+         */
+        public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+            encodingPropsBuilder.withExtraMetaData(extraMetaData);
+            return self();
+        }
+
         /**
          * Set a property that will be available to the read path. For writers that use a Hadoop
          * configuration, this is the recommended way to add configuration values.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
index 23df1faa35..e6b71a49d2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.hadoop.example;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -98,7 +97,6 @@ public static Builder builder(OutputFile file) {
     public static class Builder extends ParquetWriter.Builder<Group, Builder> {
         private MessageType type = null;
-        private Map<String, String> extraMetaData = new HashMap<String, String>();
 
         private Builder(Path file) {
             super(file);
         }
@@ -113,11 +111,6 @@ public Builder withType(MessageType type) {
             return this;
         }
 
-        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
-            this.extraMetaData = extraMetaData;
-            return this;
-        }
-
         @Override
         protected Builder self() {
             return this;
@@ -130,7 +123,12 @@ protected WriteSupport<Group> getWriteSupport(Configuration conf) {
 
         @Override
         protected WriteSupport<Group> getWriteSupport(ParquetConfiguration conf) {
-            return new GroupWriteSupport(type, extraMetaData);
+            return new GroupWriteSupport(type);
+        }
+
+        @Override
+        public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+            return super.withExtraMetaData(extraMetaData);
         }
     }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index b5dedf6653..fa9ee865d0 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -36,6 +36,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
@@ -407,6 +408,64 @@ public void testParquetFileWritesExpectedNumberOfBlocks() throws IOException {
         testParquetFileNumberOfBlocks(1, 1, 3);
     }
 
+    @Test
+    public void testExtraMetaData() throws Exception {
+        final Configuration conf = new Configuration();
+        final File testDir = temp.newFile();
+        testDir.delete();
+
+        final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+        GroupWriteSupport.setSchema(schema, conf);
+        final SimpleGroupFactory f = new SimpleGroupFactory(schema);
+
+        for (WriterVersion version : WriterVersion.values()) {
+            final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+            final ParquetWriter<Group> writer = ExampleParquetWriter.builder(new TestOutputFile(filePath, conf))
+                    .withConf(conf)
+                    .withExtraMetaData(ImmutableMap.of("simple-key", "some-value-1", "nested.key", "some-value-2"))
+                    .build();
+            for (int i = 0; i < 1000; i++) {
+                writer.write(f.newGroup().append("int32_field", 32));
+            }
+            writer.close();
+
+            final ParquetFileReader reader =
+                    ParquetFileReader.open(HadoopInputFile.fromPath(filePath, new Configuration()));
+            assertEquals(1000, reader.readNextRowGroup().getRowCount());
+            assertEquals(
+                    ImmutableMap.of(
+                            "simple-key",
+                            "some-value-1",
+                            "nested.key",
+                            "some-value-2",
+                            ParquetWriter.OBJECT_MODEL_NAME_PROP,
+                            "example"),
+                    reader.getFileMetaData().getKeyValueMetaData());
+
+            reader.close();
+        }
+    }
+
+    @Test
+    public void testFailsOnConflictingExtraMetaDataKey() throws Exception {
+        final Configuration conf = new Configuration();
+        final File testDir = temp.newFile();
+        testDir.delete();
+
+        final MessageType schema = parseMessageType("message test { required int32 int32_field; }");
+        GroupWriteSupport.setSchema(schema, conf);
+
+        for (WriterVersion version : WriterVersion.values()) {
+            final Path filePath = new Path(testDir.getAbsolutePath(), version.name());
+
+            Assert.assertThrows(IllegalArgumentException.class, () -> ExampleParquetWriter.builder(
+                            new TestOutputFile(filePath, conf))
+                    .withConf(conf)
+                    .withExtraMetaData(ImmutableMap.of(ParquetWriter.OBJECT_MODEL_NAME_PROP, "some-value-3"))
+                    .build());
+        }
+    }
+
     private void testParquetFileNumberOfBlocks(
             int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, int expectedNumberOfBlocks)
             throws IOException {
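For readers who want to see the new hook end to end, here is a minimal usage sketch. It is not part of the diff above: the output path, metadata key/value, and the single-row write are illustrative only, and it assumes the same imports as the test (parseMessageType, Guava's ImmutableMap, the example object model classes).

    // Sketch: write custom footer metadata via ParquetWriter.Builder#withExtraMetaData, then read it back.
    Configuration conf = new Configuration();
    MessageType schema = parseMessageType("message test { required int32 int32_field; }");
    GroupWriteSupport.setSchema(schema, conf); // GroupWriteSupport reads the schema from the Configuration

    Path file = new Path("/tmp/extra-metadata-example.parquet"); // illustrative output path
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
            .withConf(conf)
            .withExtraMetaData(ImmutableMap.of("pipeline", "nightly-etl")) // illustrative entry
            .build()) {
        writer.write(new SimpleGroupFactory(schema).newGroup().append("int32_field", 1));
    }

    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
        // The key/value footer metadata now contains "pipeline" alongside the
        // object-model entry added by the write support.
        System.out.println(reader.getFileMetaData().getKeyValueMetaData());
    }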