diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index 85f83f5516..a7e0cf2ef0 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -30,7 +30,6 @@
jar
- 4.4
3.24.4
1.1.3
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 833a0b55a2..99e85efc43 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -32,10 +32,6 @@
Apache Parquet Thrift
https://parquet.apache.org
-
- 4.4
-
-
org.apache.parquet
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
index c31aa9c404..fa4dc634ec 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
@@ -46,7 +46,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.apache.parquet.hadoop.thrift.TestInputOutputFormat.waitForJob;
public class TestCorruptThriftRecords {
@@ -166,48 +165,5 @@ private Path writeFileWithCorruptRecords(int numCorrupt, List
private void readFile(Path path, Configuration conf, String name) throws Exception {
Job job = new Job(conf, name);
setupJob(job, path);
- waitForJob(job);
- }
-
- @Test
- public void testDefaultsToNoTolerance() throws Exception {
- ArrayList expected = new ArrayList();
- try {
- readFile(writeFileWithCorruptRecords(1, expected), new Configuration(), "testDefaultsToNoTolerance");
- fail("This should throw");
- } catch (RuntimeException e) {
- // still should have actually read all the valid records
- assertEquals(100, ReadMapper.records.size());
- assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
- }
- }
-
- @Test
- public void testCanTolerateBadRecords() throws Exception {
- Configuration conf = new Configuration();
- conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
-
- List expected = new ArrayList();
-
- readFile(writeFileWithCorruptRecords(4, expected), conf, "testCanTolerateBadRecords");
- assertEquals(200, ReadMapper.records.size());
- assertEqualsExcepted(expected, ReadMapper.records);
- }
-
- @Test
- public void testThrowsWhenTooManyBadRecords() throws Exception {
- Configuration conf = new Configuration();
- conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
-
- ArrayList expected = new ArrayList();
-
- try {
- readFile(writeFileWithCorruptRecords(300, expected), conf, "testThrowsWhenTooManyBadRecords");
- fail("This should throw");
- } catch (RuntimeException e) {
- // still should have actually read all the valid records
- assertEquals(100, ReadMapper.records.size());
- assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
- }
}
}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
deleted file mode 100644
index af8e60d036..0000000000
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.thrift;
-
-import static java.lang.Thread.sleep;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.thrift.TBase;
-import org.junit.Test;
-
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.thrift.test.compat.StructV1;
-import org.apache.parquet.thrift.test.compat.StructV2;
-import org.apache.parquet.thrift.test.compat.StructV3;
-
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestInputOutputFormat {
- private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class);
-
- public static AddressBook nextAddressbook(int i) {
- final ArrayList<Person> persons = new ArrayList<Person>();
- for (int j = 0; j < i % 3; j++) {
- final ArrayList<PhoneNumber> phones = new ArrayList<PhoneNumber>();
- for (int k = 0; k < i%4; k++) {
- phones.add(new PhoneNumber("12345"+i));
- }
- persons.add(new Person(new Name("John"+i, "Roberts"), i, "John@example.com" + i, phones));
- }
- AddressBook a = new AddressBook(persons);
- return a;
- };
-
- public static class MyMapper extends Mapper<LongWritable, Text, Void, AddressBook> {
-
- public void run(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
- for (int i = 0; i < 10; i++) {
- AddressBook a = TestInputOutputFormat.nextAddressbook(i);
- context.write(null, a);
- }
- }
- }
-
- public static class MyMapper2 extends Mapper {
- protected void map(Void key, AddressBook value, Mapper.Context context) throws IOException ,InterruptedException {
- context.write(null, new Text(value.toString()));
- }
-
- }
-
- @Test
- public void testReadWrite() throws Exception {
- final Configuration conf = new Configuration();
- final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java");
- final Path parquetPath = new Path("target/test/thrift/TestInputOutputFormat/parquet");
- final Path outputPath = new Path("target/test/thrift/TestInputOutputFormat/out");
- final FileSystem fileSystem = parquetPath.getFileSystem(conf);
- fileSystem.delete(parquetPath, true);
- fileSystem.delete(outputPath, true);
- {
- final Job job = new Job(conf, "write");
-
- // input not really used
- TextInputFormat.addInputPath(job, inputPath);
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(TestInputOutputFormat.MyMapper.class);
- job.setNumReduceTasks(0);
-
- job.setOutputFormatClass(ParquetThriftOutputFormat.class);
- ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.GZIP);
- ParquetThriftOutputFormat.setOutputPath(job, parquetPath);
- ParquetThriftOutputFormat.setThriftClass(job, AddressBook.class);
-
- waitForJob(job);
- }
- {
- final Job job = new Job(conf, "read");
- job.setInputFormatClass(ParquetThriftInputFormat.class);
- ParquetThriftInputFormat.setInputPaths(job, parquetPath);
-
- job.setMapperClass(TestInputOutputFormat.MyMapper2.class);
- job.setNumReduceTasks(0);
-
- job.setOutputFormatClass(TextOutputFormat.class);
- TextOutputFormat.setOutputPath(job, outputPath);
-
- waitForJob(job);
- }
-
- final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString(), "part-m-00000")));
- String lineOut = null;
- int lineNumber = 0;
- while ((lineOut = out.readLine()) != null) {
- lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
- AddressBook a = nextAddressbook(lineNumber);
- assertEquals("line " + lineNumber, a.toString(), lineOut);
- ++ lineNumber;
- }
- assertNull("line " + lineNumber, out.readLine());
- out.close();
- }
-
- public static class SchemaEvolutionMapper1 extends Mapper {
- protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException {
- context.write(null, new StructV1(value.toString() + 1));
- };
- }
-
- public static class SchemaEvolutionMapper2 extends Mapper {
- protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException {
- final StructV2 s = new StructV2(value.toString() + 2);
- s.setAge("undetermined");
- context.write(null, s);
- };
- }
-
- public static class SchemaEvolutionMapper3 extends Mapper {
- protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException {
- final StructV3 s = new StructV3(value.toString() + 3);
- s.setAge("average");
- s.setGender("unavailable");
- context.write(null, s);
- };
- }
-
- public static class SchemaEvolutionReadMapper extends Mapper {
- protected void map(LongWritable key, StructV3 value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException {
- context.write(null, new Text(value.toString()));
- };
- }
-
- @Test
- public void testSchemaEvolution() throws Exception {
- final Configuration conf = new Configuration();
- final Path inputPath = new Path("target/test/thrift/schema_evolution/in");
- final Path parquetPath = new Path("target/test/thrift/schema_evolution/parquet");
- final Path outputPath = new Path("target/test/thrift/schema_evolution/out");
- final FileSystem fileSystem = parquetPath.getFileSystem(conf);
- fileSystem.delete(inputPath, true);
- final FSDataOutputStream in = fileSystem.create(inputPath);
- in.writeUTF("Alice\nBob\nCharles\n");
- in.close();
- fileSystem.delete(parquetPath, true);
- fileSystem.delete(outputPath, true);
- {
- write(conf, inputPath, new Path(parquetPath, "V1"), TestInputOutputFormat.SchemaEvolutionMapper1.class, StructV1.class);
- write(conf, inputPath, new Path(parquetPath, "V2"), TestInputOutputFormat.SchemaEvolutionMapper2.class, StructV2.class);
- write(conf, inputPath, new Path(parquetPath, "V3"), TestInputOutputFormat.SchemaEvolutionMapper3.class, StructV3.class);
- }
- {
- final Job job = new Job(conf, "read");
- job.setInputFormatClass(ParquetThriftInputFormat.class);
- ParquetThriftInputFormat.setInputPaths(job, new Path(parquetPath, "*"));
- ParquetThriftInputFormat.setThriftClass(job.getConfiguration(), StructV3.class);
- job.setMapperClass(TestInputOutputFormat.SchemaEvolutionReadMapper.class);
- job.setNumReduceTasks(0);
-
- job.setOutputFormatClass(TextOutputFormat.class);
- TextOutputFormat.setOutputPath(job, outputPath);
-
- waitForJob(job);
- }
-
- read(outputPath + "/part-m-00000", 3);
- read(outputPath + "/part-m-00001", 3);
- read(outputPath + "/part-m-00002", 3);
- }
-
- private void read(String outputPath, int expected) throws FileNotFoundException,
- IOException {
- final BufferedReader out = new BufferedReader(new FileReader(new File(outputPath.toString())));
- String lineOut = null;
- int lineNumber = 0;
- while ((lineOut = out.readLine()) != null) {
- lineOut = lineOut.substring(lineOut.indexOf("\t") + 1);
- System.out.println(lineOut);
- ++ lineNumber;
- }
- out.close();
- Assert.assertEquals(expected, lineNumber);
- }
-
- private void write(final Configuration conf, final Path inputPath,
- final Path parquetPath, Class<? extends Mapper> mapperClass, Class<? extends TBase<?, ?>> outputClass) throws IOException, Exception {
- final Job job = new Job(conf, "write");
-
- // input not really used
- TextInputFormat.addInputPath(job, inputPath);
- job.setInputFormatClass(TextInputFormat.class);
-
- job.setMapperClass(mapperClass);
- job.setNumReduceTasks(0);
-
- job.setOutputFormatClass(ParquetThriftOutputFormat.class);
- ParquetThriftOutputFormat.setCompression(job, CompressionCodecName.GZIP);
- ParquetThriftOutputFormat.setOutputPath(job, parquetPath);
- ParquetThriftOutputFormat.setThriftClass(job, outputClass);
-
- waitForJob(job);
- }
-
- public static void waitForJob(Job job) throws Exception {
- job.submit();
- while (!job.isComplete()) {
- LOG.debug("waiting for job {}", job.getJobName());
- sleep(100);
- }
- LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
- if (!job.isSuccessful()) {
- throw new RuntimeException("job failed " + job.getJobName());
- }
- }
-
-}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
deleted file mode 100644
index 3fc71f4b11..0000000000
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestParquetToThriftReadWriteAndProjection.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.thrift;
-
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.parquet.thrift.test.compat.MapWithPrimMapValue;
-import org.apache.parquet.thrift.test.compat.MapWithStructMapValue;
-import org.apache.parquet.thrift.test.compat.MapWithStructValue;
-import org.apache.parquet.thrift.test.compat.StructV3;
-import org.apache.parquet.thrift.test.compat.StructV4WithExtracStructField;
-import org.apache.thrift.TBase;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.junit.Test;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.util.ContextUtil;
-import org.apache.parquet.thrift.test.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestParquetToThriftReadWriteAndProjection {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestParquetToThriftReadWriteAndProjection.class);
-
- @Test
- public void testThriftOptionalFieldsWithReadProjectionUsingParquetSchema() throws Exception {
- // test with projection
- Configuration conf = new Configuration();
- final String readProjectionSchema = "message AddressBook {\n" +
- " optional group persons {\n" +
- " repeated group persons_tuple {\n" +
- " required group name {\n" +
- " optional binary first_name;\n" +
- " optional binary last_name;\n" +
- " }\n" +
- " optional int32 id;\n" +
- " }\n" +
- " }\n" +
- "}";
- conf.set(ReadSupport.PARQUET_READ_SCHEMA, readProjectionSchema);
- TBase toWrite = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- "bob.roberts@example.com",
- Arrays.asList(new PhoneNumber("1234567890")))));
-
- TBase toRead = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- null,
- null)));
- shouldDoProjection(conf, toWrite, toRead, AddressBook.class);
- }
-
- @Test
- public void testPullingInRequiredStructWithFilter() throws Exception {
- final String projectionFilterDesc = "persons/{id};persons/email";
- TBase toWrite = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- "bob.roberts@example.com",
- Arrays.asList(new PhoneNumber("1234567890")))));
-
- //Name is a required field, but is projected out. To make the thrift record pass validation, the name field is filled
- //with empty string
- TBase toRead = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("", ""),
- 0,
- "bob.roberts@example.com",
- null)));
- shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc, toWrite, toRead, AddressBook.class);
- }
-
- @Test
- public void testReorderdOptionalFields() throws Exception {
- final String projectionFilter = "**";
- StructWithReorderedOptionalFields toWrite = new StructWithReorderedOptionalFields();
- toWrite.setFieldOne(1);
- toWrite.setFieldTwo(2);
- toWrite.setFieldThree(3);
- shouldDoProjectionWithThriftColumnFilter(projectionFilter, toWrite, toWrite, StructWithReorderedOptionalFields.class);
- }
-
- @Test
- public void testProjectOutOptionalFields() throws Exception {
-
- final String projectionFilterDesc = "persons/name/*";
-
- TBase toWrite = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- "bob.roberts@example.com",
- Arrays.asList(new PhoneNumber("1234567890")))));
-
- //emails and phones are optional fields that do not match the projection filter
- TBase toRead = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- null,
- null))
- );
-
- shouldDoProjectionWithThriftColumnFilter(projectionFilterDesc, toWrite, toRead, AddressBook.class);
- }
-
- @Test
- public void testPullInRequiredMaps() throws Exception {
- String filter = "name";
-
- Map<String, String> mapValue = new HashMap<String, String>();
- mapValue.put("a", "1");
- mapValue.put("b", "2");
- RequiredMapFixture toWrite = new RequiredMapFixture(mapValue);
- toWrite.setName("testName");
-
- RequiredMapFixture toRead = new RequiredMapFixture(new HashMap<String, String>());
- toRead.setName("testName");
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredMapFixture.class);
- }
-
- @Test
- public void testDropMapValuePrimitive() throws Exception {
- String filter = "mavalue/key";
-
- Map<String, String> mapValue = new HashMap<String, String>();
- mapValue.put("a", "1");
- mapValue.put("b", "2");
- RequiredMapFixture toWrite = new RequiredMapFixture(mapValue);
- toWrite.setName("testName");
-
- // for now we expect no value projection to happen
- // because a sentinel value is selected from the value
- Map<String, String> readValue = new HashMap<String, String>();
- readValue.put("a", "1");
- readValue.put("b", "2");
-
- RequiredMapFixture toRead = new RequiredMapFixture(readValue);
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredMapFixture.class);
- }
-
- private StructV4WithExtracStructField makeStructV4WithExtracStructField(String id) {
- StructV4WithExtracStructField sv4 = new StructV4WithExtracStructField();
- StructV3 sv3 = new StructV3();
- sv3.setAge("age " + id);
- sv3.setGender("gender" + id);
- sv3.setName("inner name " + id);
- sv4.setAge("outer age " + id);
- sv4.setAddedStruct(sv3);
- sv4.setGender("outer gender " + id);
- sv4.setName("outer name " + id);
- return sv4;
- }
-
-
- @Test
- public void testDropMapValueStruct() throws Exception {
- String filter = "reqMap/key";
-
- Map<String, StructV4WithExtracStructField> mapValue = new HashMap<String, StructV4WithExtracStructField>();
-
- StructV4WithExtracStructField v1 = makeStructV4WithExtracStructField("1");
- StructV4WithExtracStructField v2 = makeStructV4WithExtracStructField("2");
-
- mapValue.put("key 1", v1);
- mapValue.put("key 2", v2);
- MapWithStructValue toWrite = new MapWithStructValue(mapValue);
-
- // for now we expect a sentinel column to be kept
- HashMap<String, StructV4WithExtracStructField> readValue = new HashMap<String, StructV4WithExtracStructField>();
- readValue.put("key 1", new StructV4WithExtracStructField("outer name 1"));
- readValue.put("key 2", new StructV4WithExtracStructField("outer name 2"));
-
- MapWithStructValue toRead = new MapWithStructValue(readValue);
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, MapWithStructValue.class);
- }
-
- @Test
- public void testDropMapValueNestedPrim() throws Exception {
- String filter = "reqMap/key";
-
- Map<String, Map<String, String>> mapValue =
- new HashMap<String, Map<String, String>>();
-
- Map<String, String> innerValue1 = new HashMap<String, String>();
- innerValue1.put("inner key (1, 1)", "inner (1, 1)");
- innerValue1.put("inner key (1, 2)", "inner (1, 2)");
-
- Map<String, String> innerValue2 = new HashMap<String, String>();
- innerValue2.put("inner key (2, 1)", "inner (2, 1)");
- innerValue2.put("inner key (2, 2)", "inner (2, 2)");
-
- mapValue.put("outer key 1", innerValue1);
- mapValue.put("outer key 2", innerValue2);
-
- MapWithPrimMapValue toWrite = new MapWithPrimMapValue(mapValue);
-
- Map<String, Map<String, String>> expected = new HashMap<String, Map<String, String>>();
-
- Map<String, String> expectedInnerValue1 = new HashMap<String, String>();
- expectedInnerValue1.put("inner key (1, 1)", "inner (1, 1)");
- expectedInnerValue1.put("inner key (1, 2)", "inner (1, 2)");
-
- Map<String, String> expectedInnerValue2 = new HashMap<String, String>();
- expectedInnerValue2.put("inner key (2, 1)", "inner (2, 1)");
- expectedInnerValue2.put("inner key (2, 2)", "inner (2, 2)");
-
- expected.put("outer key 1", expectedInnerValue1);
- expected.put("outer key 2", expectedInnerValue2);
-
- MapWithPrimMapValue toRead = new MapWithPrimMapValue(expected);
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, MapWithPrimMapValue.class);
- }
-
-
- @Test
- public void testDropMapValueNestedStruct() throws Exception {
- String filter = "reqMap/key";
-
- Map<String, Map<String, StructV4WithExtracStructField>> mapValue =
- new HashMap<String, Map<String, StructV4WithExtracStructField>>();
-
- Map<String, StructV4WithExtracStructField> innerValue1 = new HashMap<String, StructV4WithExtracStructField>();
- innerValue1.put("inner key (1, 1)", makeStructV4WithExtracStructField("inner (1, 1)"));
- innerValue1.put("inner key (1, 2)", makeStructV4WithExtracStructField("inner (1, 2)"));
-
- Map<String, StructV4WithExtracStructField> innerValue2 = new HashMap<String, StructV4WithExtracStructField>();
- innerValue2.put("inner key (2, 1)", makeStructV4WithExtracStructField("inner (2, 1)"));
- innerValue2.put("inner key (2, 2)", makeStructV4WithExtracStructField("inner (2, 2)"));
-
- mapValue.put("outer key 1", innerValue1);
- mapValue.put("outer key 2", innerValue2);
-
- MapWithStructMapValue toWrite = new MapWithStructMapValue(mapValue);
-
- Map<String, Map<String, StructV4WithExtracStructField>> expected = new HashMap<String, Map<String, StructV4WithExtracStructField>>();
-
- Map<String, StructV4WithExtracStructField> expectedInnerValue1 = new HashMap<String, StructV4WithExtracStructField>();
- expectedInnerValue1.put("inner key (1, 1)", new StructV4WithExtracStructField("outer name inner (1, 1)"));
- expectedInnerValue1.put("inner key (1, 2)", new StructV4WithExtracStructField("outer name inner (1, 2)"));
-
- Map<String, StructV4WithExtracStructField> expectedInnerValue2 = new HashMap<String, StructV4WithExtracStructField>();
- expectedInnerValue2.put("inner key (2, 1)", new StructV4WithExtracStructField("outer name inner (2, 1)"));
- expectedInnerValue2.put("inner key (2, 2)", new StructV4WithExtracStructField("outer name inner (2, 2)"));
-
- expected.put("outer key 1", expectedInnerValue1);
- expected.put("outer key 2", expectedInnerValue2);
-
- MapWithStructMapValue toRead = new MapWithStructMapValue(expected);
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, MapWithStructMapValue.class);
- }
-
- @Test
- public void testPullInRequiredLists() throws Exception {
- String filter = "info";
-
- RequiredListFixture toWrite = new RequiredListFixture(Arrays.asList(new org.apache.parquet.thrift.test.Name("first_name")));
- toWrite.setInfo("test_info");
-
- RequiredListFixture toRead = new RequiredListFixture(new ArrayList<org.apache.parquet.thrift.test.Name>());
- toRead.setInfo("test_info");
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredListFixture.class);
- }
-
- @Test
- public void testPullInRequiredSets() throws Exception {
- String filter = "info";
-
- RequiredSetFixture toWrite = new RequiredSetFixture(new HashSet<org.apache.parquet.thrift.test.Name>(Arrays.asList(new org.apache.parquet.thrift.test.Name("first_name"))));
- toWrite.setInfo("test_info");
-
- RequiredSetFixture toRead = new RequiredSetFixture(new HashSet<org.apache.parquet.thrift.test.Name>());
- toRead.setInfo("test_info");
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredSetFixture.class);
- }
-
- @Test
- public void testPullInPrimitiveValues() throws Exception {
- String filter = "info_string";
-
- RequiredPrimitiveFixture toWrite = new RequiredPrimitiveFixture(true, (byte)2, (short)3, 4, (long)5, (double)6.0, "7");
- toWrite.setInfo_string("it's info");
-
- RequiredPrimitiveFixture toRead = new RequiredPrimitiveFixture(false, (byte)0, (short)0, 0, (long)0, (double)0.0, "");
- toRead.setInfo_string("it's info");
-
- shouldDoProjectionWithThriftColumnFilter(filter, toWrite, toRead, RequiredPrimitiveFixture.class);
- }
-
- private void shouldDoProjectionWithThriftColumnFilter(String filterDesc, TBase toWrite, TBase toRead, Class<? extends TBase<?, ?>> thriftClass) throws Exception {
- Configuration conf = new Configuration();
- conf.set(ThriftReadSupport.THRIFT_COLUMN_FILTER_KEY, filterDesc);
- shouldDoProjection(conf, toWrite, toRead, thriftClass);
- }
-
-
- private <T extends TBase<?, ?>> void shouldDoProjection(Configuration conf, T recordToWrite, T exptectedReadResult, Class<? extends TBase<?, ?>> thriftClass) throws Exception {
- final Path parquetFile = new Path("target/test/TestParquetToThriftReadWriteAndProjection/file.parquet");
- final FileSystem fs = parquetFile.getFileSystem(conf);
- if (fs.exists(parquetFile)) {
- fs.delete(parquetFile, true);
- }
-
- //create a test file
- final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
- final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, thriftClass);
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
-
- recordToWrite.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
- w.close();
-
-
- final ParquetThriftInputFormat parquetThriftInputFormat = new ParquetThriftInputFormat();
- final Job job = new Job(conf, "read");
- job.setInputFormatClass(ParquetThriftInputFormat.class);
- ParquetThriftInputFormat.setInputPaths(job, parquetFile);
- final JobID jobID = new JobID("local", 1);
- List<InputSplit> splits = parquetThriftInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
- T readValue = null;
- for (InputSplit split : splits) {
- TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
- try (final RecordReader reader = parquetThriftInputFormat.createRecordReader(split, taskAttemptContext)) {
- reader.initialize(split, taskAttemptContext);
- if (reader.nextKeyValue()) {
- readValue = reader.getCurrentValue();
- LOG.info("{}", readValue);
- }
- }
- }
- assertEquals(exptectedReadResult, readValue);
- }
-
-}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
index e96b226e4e..5df0f8624b 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestThriftToParquetFileWriter.java
@@ -19,6 +19,7 @@
package org.apache.parquet.hadoop.thrift;
import static org.junit.Assert.assertEquals;
+
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
@@ -56,13 +57,6 @@
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
-import com.twitter.elephantbird.thrift.test.TestListInMap;
-import com.twitter.elephantbird.thrift.test.TestMapInList;
-
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,253 +65,123 @@ public class TestThriftToParquetFileWriter {
private static final Logger LOG = LoggerFactory.getLogger(TestThriftToParquetFileWriter.class);
@Test
- public void testWriteFile() throws IOException, InterruptedException, TException {
- final AddressBook a = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- "bob.roberts@example.com",
- Arrays.asList(new PhoneNumber("1234567890")))));
-
- final Path fileToCreate = createFile(new Configuration(), a);
-
- ParquetReader<Group> reader = createRecordReader(fileToCreate);
-
- Group g = null;
- int i = 0;
- while((g = reader.read()) != null) {
- assertEquals(a.persons.size(), g.getFieldRepetitionCount("persons"));
- assertEquals(a.persons.get(0).email, g.getGroup("persons", 0).getGroup(0, 0).getString("email", 0));
- // just some sanity check, we're testing the various layers somewhere else
- ++i;
- }
- assertEquals("read 1 record", 1, i);
-
- }
- @Test
- public void testWriteStatistics() throws Exception {
- //create correct stats small numbers
- IntStatistics intStatsSmall = new IntStatistics();
- intStatsSmall.setMinMax(2, 100);
- LongStatistics longStatsSmall = new LongStatistics();
- longStatsSmall.setMinMax(-17l, 287L);
- DoubleStatistics doubleStatsSmall = new DoubleStatistics();
- doubleStatsSmall.setMinMax(-15.55d, 9.63d);
- BinaryStatistics binaryStatsSmall = new BinaryStatistics();
- binaryStatsSmall.setMinMax(Binary.fromString("as"), Binary.fromString("world"));
- BooleanStatistics boolStats = new BooleanStatistics();
- boolStats.setMinMax(false, true);
-
- //write rows to a file
- Path p = createFile(new Configuration(),
- new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90l, -15.55d, "as"),
- new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287l, -9.0d, "world"),
- new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
- final Configuration configuration = new Configuration();
- configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
- final FileSystem fs = p.getFileSystem(configuration);
- FileStatus fileStatus = fs.getFileStatus(p);
- ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p);
- for(BlockMetaData bmd: footer.getBlocks()) {
- for(ColumnChunkMetaData cmd: bmd.getColumns()) {
- switch(cmd.getType()) {
- case INT32:
- TestUtils.assertStatsValuesEqual(intStatsSmall, cmd.getStatistics());
- break;
- case INT64:
- TestUtils.assertStatsValuesEqual(longStatsSmall, cmd.getStatistics());
- break;
- case DOUBLE:
- TestUtils.assertStatsValuesEqual(doubleStatsSmall, cmd.getStatistics());
- break;
- case BOOLEAN:
- TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
- break;
- case BINARY:
- // there is also info_string that has no statistics
- if(cmd.getPath().toString() == "[test_string]")
- TestUtils.assertStatsValuesEqual(binaryStatsSmall, cmd.getStatistics());
- break;
- }
+ public void testWriteStatistics() throws Exception {
+ //create correct stats small numbers
+ IntStatistics intStatsSmall = new IntStatistics();
+ intStatsSmall.setMinMax(2, 100);
+ LongStatistics longStatsSmall = new LongStatistics();
+ longStatsSmall.setMinMax(-17l, 287L);
+ DoubleStatistics doubleStatsSmall = new DoubleStatistics();
+ doubleStatsSmall.setMinMax(-15.55d, 9.63d);
+ BinaryStatistics binaryStatsSmall = new BinaryStatistics();
+ binaryStatsSmall.setMinMax(Binary.fromString("as"), Binary.fromString("world"));
+ BooleanStatistics boolStats = new BooleanStatistics();
+ boolStats.setMinMax(false, true);
+
+ //write rows to a file
+ Path p = createFile(new Configuration(),
+ new RequiredPrimitiveFixture(false, (byte) 32, (short) 32, 2, 90l, -15.55d, "as"),
+ new RequiredPrimitiveFixture(false, (byte) 100, (short) 100, 100, 287l, -9.0d, "world"),
+ new RequiredPrimitiveFixture(true, (byte) 2, (short) 2, 9, -17l, 9.63d, "hello"));
+ final Configuration configuration = new Configuration();
+ configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
+ final FileSystem fs = p.getFileSystem(configuration);
+ FileStatus fileStatus = fs.getFileStatus(p);
+ ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p);
+ for (BlockMetaData bmd : footer.getBlocks()) {
+ for (ColumnChunkMetaData cmd : bmd.getColumns()) {
+ switch (cmd.getType()) {
+ case INT32:
+ TestUtils.assertStatsValuesEqual(intStatsSmall, cmd.getStatistics());
+ break;
+ case INT64:
+ TestUtils.assertStatsValuesEqual(longStatsSmall, cmd.getStatistics());
+ break;
+ case DOUBLE:
+ TestUtils.assertStatsValuesEqual(doubleStatsSmall, cmd.getStatistics());
+ break;
+ case BOOLEAN:
+ TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
+ break;
+ case BINARY:
+ // there is also info_string that has no statistics
+ if (cmd.getPath().toString() == "[test_string]")
+ TestUtils.assertStatsValuesEqual(binaryStatsSmall, cmd.getStatistics());
+ break;
}
}
- //create correct stats large numbers
- IntStatistics intStatsLarge = new IntStatistics();
- intStatsLarge.setMinMax(-Integer.MAX_VALUE, Integer.MAX_VALUE);
- LongStatistics longStatsLarge = new LongStatistics();
- longStatsLarge.setMinMax(-Long.MAX_VALUE, Long.MAX_VALUE);
- DoubleStatistics doubleStatsLarge = new DoubleStatistics();
- doubleStatsLarge.setMinMax(-Double.MAX_VALUE, Double.MAX_VALUE);
- BinaryStatistics binaryStatsLarge = new BinaryStatistics();
- binaryStatsLarge.setMinMax(Binary.fromString("some small string"),
- Binary.fromString("some very large string here to test in this function"));
- //write rows to a file
- Path p_large = createFile(new Configuration(),
- new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
- -Long.MAX_VALUE, -Double.MAX_VALUE, "some small string"),
- new RequiredPrimitiveFixture(false, (byte)100, (short)100, Integer.MAX_VALUE,
- Long.MAX_VALUE, Double.MAX_VALUE,
- "some very large string here to test in this function"),
- new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17l, 9.63d, "hello"));
-
- // make new configuration and create file with new large stats
- final Configuration configuration_large = new Configuration();
- configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
- final FileSystem fs_large = p_large.getFileSystem(configuration_large);
- FileStatus fileStatus_large = fs_large.getFileStatus(p_large);
- ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large);
- for(BlockMetaData bmd: footer_large.getBlocks()) {
- for(ColumnChunkMetaData cmd: bmd.getColumns()) {
- switch(cmd.getType()) {
- case INT32:
- // testing the correct limits of an int32, there are also byte and short, tested earlier
- if(cmd.getPath().toString() == "[test_i32]")
- TestUtils.assertStatsValuesEqual(intStatsLarge, cmd.getStatistics());
- break;
- case INT64:
- TestUtils.assertStatsValuesEqual(longStatsLarge, cmd.getStatistics());
- break;
- case DOUBLE:
- TestUtils.assertStatsValuesEqual(doubleStatsLarge, cmd.getStatistics());
- break;
- case BOOLEAN:
- TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
- break;
- case BINARY:
- // there is also info_string that has no statistics
- if(cmd.getPath().toString() == "[test_string]")
- TestUtils.assertStatsValuesEqual(binaryStatsLarge, cmd.getStatistics());
- break;
- }
+ }
+ //create correct stats large numbers
+ IntStatistics intStatsLarge = new IntStatistics();
+ intStatsLarge.setMinMax(-Integer.MAX_VALUE, Integer.MAX_VALUE);
+ LongStatistics longStatsLarge = new LongStatistics();
+ longStatsLarge.setMinMax(-Long.MAX_VALUE, Long.MAX_VALUE);
+ DoubleStatistics doubleStatsLarge = new DoubleStatistics();
+ doubleStatsLarge.setMinMax(-Double.MAX_VALUE, Double.MAX_VALUE);
+ BinaryStatistics binaryStatsLarge = new BinaryStatistics();
+ binaryStatsLarge.setMinMax(Binary.fromString("some small string"),
+ Binary.fromString("some very large string here to test in this function"));
+ //write rows to a file
+ Path p_large = createFile(new Configuration(),
+ new RequiredPrimitiveFixture(false, (byte) 2, (short) 32, -Integer.MAX_VALUE,
+ -Long.MAX_VALUE, -Double.MAX_VALUE, "some small string"),
+ new RequiredPrimitiveFixture(false, (byte) 100, (short) 100, Integer.MAX_VALUE,
+ Long.MAX_VALUE, Double.MAX_VALUE,
+ "some very large string here to test in this function"),
+ new RequiredPrimitiveFixture(true, (byte) 2, (short) 2, 9, -17l, 9.63d, "hello"));
+
+ // make new configuration and create file with new large stats
+ final Configuration configuration_large = new Configuration();
+ configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
+ final FileSystem fs_large = p_large.getFileSystem(configuration_large);
+ FileStatus fileStatus_large = fs_large.getFileStatus(p_large);
+ ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large);
+ for (BlockMetaData bmd : footer_large.getBlocks()) {
+ for (ColumnChunkMetaData cmd : bmd.getColumns()) {
+ switch (cmd.getType()) {
+ case INT32:
+ // testing the correct limits of an int32, there are also byte and short, tested earlier
+ if (cmd.getPath().toString() == "[test_i32]")
+ TestUtils.assertStatsValuesEqual(intStatsLarge, cmd.getStatistics());
+ break;
+ case INT64:
+ TestUtils.assertStatsValuesEqual(longStatsLarge, cmd.getStatistics());
+ break;
+ case DOUBLE:
+ TestUtils.assertStatsValuesEqual(doubleStatsLarge, cmd.getStatistics());
+ break;
+ case BOOLEAN:
+ TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
+ break;
+ case BINARY:
+ // there is also info_string that has no statistics
+ if (cmd.getPath().toString() == "[test_string]")
+ TestUtils.assertStatsValuesEqual(binaryStatsLarge, cmd.getStatistics());
+ break;
}
}
}
-
- @Test
- public void testWriteFileListOfMap() throws IOException, InterruptedException, TException {
- Map<String, String> map1 = new HashMap<String, String>();
- map1.put("key11", "value11");
- map1.put("key12", "value12");
- Map<String, String> map2 = new HashMap<String, String>();
- map2.put("key21", "value21");
- final TestMapInList listMap = new TestMapInList("listmap",
- Arrays.asList(map1, map2));
-
- final Path fileToCreate = createFile(new Configuration(), listMap);
-
- ParquetReader<Group> reader = createRecordReader(fileToCreate);
-
- Group g = null;
- while((g = reader.read()) != null) {
- assertEquals(listMap.names.size(),
- g.getGroup("names", 0).getFieldRepetitionCount("names_tuple"));
- assertEquals(listMap.names.get(0).size(),
- g.getGroup("names", 0).getGroup("names_tuple", 0).getFieldRepetitionCount("key_value"));
- assertEquals(listMap.names.get(1).size(),
- g.getGroup("names", 0).getGroup("names_tuple", 1).getFieldRepetitionCount("key_value"));
- }
- }
-
- @Test
- public void testWriteFileMapOfList() throws IOException, InterruptedException, TException {
- Map<String, List<String>> map = new HashMap<String, List<String>>();
- map.put("key", Arrays.asList("val1","val2"));
- final TestListInMap mapList = new TestListInMap("maplist", map);
- final Path fileToCreate = createFile(new Configuration(), mapList);
-
- ParquetReader<Group> reader = createRecordReader(fileToCreate);
-
- Group g = null;
- while((g = reader.read()) != null) {
- assertEquals("key",
- g.getGroup("names", 0).getGroup("key_value",0).getBinary("key", 0).toStringUsingUTF8());
- assertEquals(map.get("key").size(),
- g.getGroup("names", 0).getGroup("key_value",0).getGroup("value", 0).getFieldRepetitionCount(0));
- }
}
@Test
public void testWriteFileMapOfLists() throws IOException, InterruptedException, TException {
- Map<List<String>, List<String>> map = new HashMap<List<String>,List<String>>();
- map.put(Arrays.asList("key1","key2"), Arrays.asList("val1","val2"));
+ Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
+ map.put(Arrays.asList("key1", "key2"), Arrays.asList("val1", "val2"));
final TestListsInMap mapList = new TestListsInMap("maplists", map);
final Path fileToCreate = createFile(new Configuration(), mapList);
ParquetReader<Group> reader = createRecordReader(fileToCreate);
Group g = null;
- while((g = reader.read()) != null) {
+ while ((g = reader.read()) != null) {
assertEquals("key1",
- g.getGroup("names", 0).getGroup("key_value",0).getGroup("key", 0).getBinary("key_tuple", 0).toStringUsingUTF8());
+ g.getGroup("names", 0).getGroup("key_value", 0).getGroup("key", 0).getBinary("key_tuple", 0).toStringUsingUTF8());
assertEquals("key2",
- g.getGroup("names", 0).getGroup("key_value",0).getGroup("key", 0).getBinary("key_tuple", 1).toStringUsingUTF8());
+ g.getGroup("names", 0).getGroup("key_value", 0).getGroup("key", 0).getBinary("key_tuple", 1).toStringUsingUTF8());
assertEquals("val1",
- g.getGroup("names", 0).getGroup("key_value",0).getGroup("value", 0).getBinary("value_tuple", 0).toStringUsingUTF8());
+ g.getGroup("names", 0).getGroup("key_value", 0).getGroup("value", 0).getBinary("value_tuple", 0).toStringUsingUTF8());
assertEquals("val2",
- g.getGroup("names", 0).getGroup("key_value",0).getGroup("value", 0).getBinary("value_tuple", 1).toStringUsingUTF8());
- }
- }
-
- @Test
- public void testWriteFileWithThreeLevelsList()
- throws IOException, InterruptedException, TException {
- final AddressBook a = new AddressBook(
- Arrays.asList(
- new Person(
- new Name("Bob", "Roberts"),
- 0,
- "bob.roberts@example.com",
- Arrays.asList(new PhoneNumber("1234567890")))));
-
- Configuration conf = new Configuration();
- conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
-
- final Path fileToCreate = createFile(conf, a);
-
- ParquetReader<Group> reader = createRecordReader(fileToCreate);
-
- Group g = null;
- int i = 0;
- while((g = reader.read()) != null) {
- assertEquals(a.persons.size(), g.getFieldRepetitionCount("persons"));
- assertEquals(
- a.persons.get(0).email,
- g.getGroup("persons", 0).getGroup(0, 0).getGroup(0, 0).getString("email", 0));
- // just some sanity check, we're testing the various layers somewhere else
- ++i;
- }
- assertEquals("read 1 record", 1, i);
- }
-
- @Test
- public void testWriteFileListOfMapWithThreeLevelLists()
- throws IOException, InterruptedException, TException {
- Map map1 = new HashMap();
- map1.put("key11", "value11");
- map1.put("key12", "value12");
- Map map2 = new HashMap();
- map2.put("key21", "value21");
- final TestMapInList listMap = new TestMapInList("listmap",
- Arrays.asList(map1, map2));
-
- Configuration conf = new Configuration();
- conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");
-
- final Path fileToCreate = createFile(conf, listMap);
-
- ParquetReader<Group> reader = createRecordReader(fileToCreate);
-
- Group g = null;
- while((g = reader.read()) != null) {
- assertEquals(listMap.names.size(),
- g.getGroup("names", 0).getFieldRepetitionCount("list"));
- assertEquals(listMap.names.get(0).size(),
- g.getGroup("names", 0).getGroup("list", 0).
- getGroup("element", 0).getFieldRepetitionCount("key_value"));
- assertEquals(listMap.names.get(1).size(),
- g.getGroup("names", 0).getGroup("list", 1).
- getGroup("element", 0).getFieldRepetitionCount("key_value"));
+ g.getGroup("names", 0).getGroup("key_value", 0).getGroup("value", 0).getBinary("value_tuple", 1).toStringUsingUTF8());
}
}
@@ -332,9 +196,9 @@ private ParquetReader<Group> createRecordReader(Path parquetFilePath) throws IOE
return new ParquetReader<Group>(parquetFilePath, readSupport);
}
- private <T extends TBase<?, ?>> Path createFile(Configuration conf, T... tObjs)
- throws IOException, InterruptedException, TException {
- final Path fileToCreate = new Path("target/test/TestThriftToParquetFileWriter/"+tObjs[0].getClass()+".parquet");
+ private <T extends TBase<?, ?>> Path createFile(Configuration conf, T... tObjs)
+ throws IOException, InterruptedException, TException {
+ final Path fileToCreate = new Path("target/test/TestThriftToParquetFileWriter/" + tObjs[0].getClass() + ".parquet");
LOG.info("File created: " + fileToCreate.toString());
final FileSystem fs = fileToCreate.getFileSystem(conf);
if (fs.exists(fileToCreate)) {
@@ -343,17 +207,16 @@ private <T extends TBase<?, ?>> Path createFile(Configuration conf, T... tObjs)
}
TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
- ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, (Class<? extends TBase<?, ?>>) tObjs[0].getClass());
-
- for(T tObj:tObjs) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
+ try (ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, (Class<? extends TBase<?, ?>>) tObjs[0].getClass())) {
+ for (T tObj : tObjs) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
- tObj.write(protocol);
+ tObj.write(protocol);
- w.write(new BytesWritable(baos.toByteArray()));
+ w.write(new BytesWritable(baos.toByteArray()));
+ }
}
- w.close();
return fileToCreate;
}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
deleted file mode 100644
index b71305888b..0000000000
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.thrift;
-
-import static com.twitter.data.proto.tutorial.thrift.PhoneType.MOBILE;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.parquet.column.ParquetProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import thrift.test.OneOfEach;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.junit.Test;
-
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
-import org.apache.parquet.column.page.mem.MemPageStore;
-import org.apache.parquet.io.ColumnIOFactory;
-import org.apache.parquet.io.MessageColumnIO;
-import org.apache.parquet.io.RecordReader;
-import org.apache.parquet.io.api.RecordConsumer;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.thrift.struct.ThriftType.StructType;
-
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
-import com.twitter.elephantbird.thrift.test.TestMap;
-import com.twitter.elephantbird.thrift.test.TestName;
-import com.twitter.elephantbird.thrift.test.TestNameList;
-import com.twitter.elephantbird.thrift.test.TestNameSet;
-import com.twitter.elephantbird.thrift.test.TestPerson;
-import com.twitter.elephantbird.thrift.test.TestPhoneType;
-import com.twitter.elephantbird.thrift.test.TestStructInMap;
-
-public class TestParquetReadProtocol {
- private static final Logger LOG = LoggerFactory.getLogger(TestParquetReadProtocol.class);
-
- @Test
- public void testList() throws TException {
- final List<String> names = new ArrayList<String>();
- names.add("John");
- names.add("Jack");
- final TestNameList o = new TestNameList("name", names);
- validate(o);
- }
-
- @Test
- public void testSet() throws TException {
- final Set<String> names = new HashSet<String>();
- names.add("John");
- names.add("Jack");
- final TestNameSet o = new TestNameSet("name", names);
- validate(o);
- }
-
- @Test
- public void testReadEmpty() throws Exception {
- AddressBook expected = new AddressBook();
- validate(expected);
- }
-
- @Test
- public void testOneOfEach() throws TException {
- final List<Byte> bytes = new ArrayList<Byte>();
- bytes.add((byte)1);
- final List<Short> shorts = new ArrayList<Short>();
- shorts.add((short)1);
- final List<Long> longs = new ArrayList<Long>();
- longs.add((long)1);
- OneOfEach a = new OneOfEach(
- true, false, (byte)8, (short)16, (int)32, (long)64, (double)1234, "string", "å", false,
- ByteBuffer.wrap("a".getBytes()), bytes, shorts, longs);
- validate(a);
- }
-
- @Test
- public void testRead() throws Exception {
- final PhoneNumber phoneNumber = new PhoneNumber("5555555555");
- phoneNumber.type = MOBILE;
- List<Person> persons = Arrays.asList(
- new Person(
- new Name("john", "johson"),
- 1,
- "john@johnson.org",
- Arrays.asList(phoneNumber)
- ),
- new Person(
- new Name("jack", "jackson"),
- 2,
- "jack@jackson.org",
- Arrays.asList(new PhoneNumber("5555555556"))
- )
- );
- AddressBook expected = new AddressBook(persons);
- validate(expected);
- }
-
- @Test
- public void testMap() throws Exception {
- final Map<String, String> map = new HashMap<String, String>();
- map.put("foo", "bar");
- TestMap testMap = new TestMap("map_name", map);
- validate(testMap);
- }
-
- @Test
- public void testStructInMap() throws Exception {
- final Map<String, TestPerson> map = new HashMap<String, TestPerson>();
- map.put("foo", new TestPerson(new TestName("john", "johnson"), new HashMap()));
- final Map<String, Integer> stringToIntMap = Collections.singletonMap("bar", 10);
- TestStructInMap testMap = new TestStructInMap("map_name", map, stringToIntMap);
- validate(testMap);
- }
-
- private <T extends TBase<?, ?>> void validate(T expected) throws TException {
- @SuppressWarnings("unchecked")
- final Class<T> thriftClass = (Class<T>)expected.getClass();
- final MemPageStore memPageStore = new MemPageStore(1);
- final ThriftSchemaConverter schemaConverter = new ThriftSchemaConverter();
- final MessageType schema = schemaConverter.convert(thriftClass);
- LOG.info("{}", schema);
- final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
- final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore,
- ParquetProperties.builder()
- .withPageSize(10000)
- .withDictionaryEncoding(false)
- .build());
- final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
- final StructType thriftType = schemaConverter.toStructType(thriftClass);
- ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);
-
- expected.write(parquetWriteProtocol);
- recordWriter.flush();
- columns.flush();
-
- ThriftRecordConverter<T> converter = new TBaseRecordConverter<T>(thriftClass, schema, thriftType);
- final RecordReader<T> recordReader = columnIO.getRecordReader(memPageStore, converter);
-
- final T result = recordReader.read();
-
- assertEquals(expected, result);
- }
-
-}
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
deleted file mode 100644
index 1311d76904..0000000000
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetWriteProtocol.java
+++ /dev/null
@@ -1,719 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.thrift;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-import com.twitter.elephantbird.thrift.test.TestMapInList;
-import com.twitter.elephantbird.thrift.test.TestNameSet;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.ComparisonFailure;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import thrift.test.OneOfEach;
-
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.junit.Test;
-
-import org.apache.parquet.io.ColumnIOFactory;
-import org.apache.parquet.io.ExpectationValidatingRecordConsumer;
-import org.apache.parquet.io.MessageColumnIO;
-import org.apache.parquet.io.RecordConsumerLoggingWrapper;
-import org.apache.parquet.pig.PigSchemaConverter;
-import org.apache.parquet.pig.TupleWriteSupport;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.thrift.struct.ThriftType.StructType;
-
-import com.twitter.data.proto.tutorial.thrift.AddressBook;
-import com.twitter.data.proto.tutorial.thrift.Name;
-import com.twitter.data.proto.tutorial.thrift.Person;
-import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
-import com.twitter.data.proto.tutorial.thrift.PhoneType;
-import com.twitter.elephantbird.pig.util.ThriftToPig;
-import com.twitter.elephantbird.thrift.test.TestMap;
-import com.twitter.elephantbird.thrift.test.TestMapInSet;
-import com.twitter.elephantbird.thrift.test.TestName;
-import com.twitter.elephantbird.thrift.test.TestNameList;
-import com.twitter.elephantbird.thrift.test.TestPerson;
-import com.twitter.elephantbird.thrift.test.TestPhoneType;
-import com.twitter.elephantbird.thrift.test.TestStructInMap;
-
-
-public class TestParquetWriteProtocol {
- private static final Logger LOG = LoggerFactory.getLogger(TestParquetWriteProtocol.class);
- @Test
- public void testMap() throws Exception {
- String[] expectations = {
- "startMessage()",
- "startField(name, 0)",
- "addBinary(map_name)",
- "endField(name, 0)",
- "startField(names, 1)",
- "startGroup()",
- "startField(key_value, 0)",
- "startGroup()",
- "startField(key, 0)",
- "addBinary(foo)",
- "endField(key, 0)",
- "startField(value, 1)",
- "addBinary(bar)",
- "endField(value, 1)",
- "endGroup()",
- "startGroup()",
- "startField(key, 0)",
- "addBinary(foo2)",
- "endField(key, 0)",
- "startField(value, 1)",
- "addBinary(bar2)",
- "endField(value, 1)",
- "endGroup()",
- "endField(key_value, 0)",
- "endGroup()",
- "endField(names, 1)",
- "endMessage()"
- };
- String[] expectationsAlt = {
- "startMessage()",
- "startField(name, 0)",
- "addBinary(map_name)",
- "endField(name, 0)",
- "startField(names, 1)",
- "startGroup()",
- "startField(key_value, 0)",
- "startGroup()",
- "startField(key, 0)",
- "addBinary(foo2)",
- "endField(key, 0)",
- "startField(value, 1)",
- "addBinary(bar2)",
- "endField(value, 1)",
- "endGroup()",
- "startGroup()",
- "startField(key, 0)",
- "addBinary(foo)",
- "endField(key, 0)",
- "startField(value, 1)",
- "addBinary(bar)",
- "endField(value, 1)",
- "endGroup()",
- "endField(key_value, 0)",
- "endGroup()",
- "endField(names, 1)",
- "endMessage()"
- };
-
- final Map<String, String> map = new TreeMap<String, String>();
- map.put("foo", "bar");
- map.put("foo2", "bar2");
- TestMap testMap = new TestMap("map_name", map);
- try {
- validatePig(expectations, testMap);
- } catch (ComparisonFailure e) {
- // This can happen despite using a stable TreeMap, since ThriftToPig#toPigMap
- // in com.twitter.elephantbird.pig.util creates a HashMap.
- // So we test with the map elements in reverse order
- validatePig(expectationsAlt, testMap);
- }
- validateThrift(expectations, testMap);
- }
-
-
- /**
- * @see TestThriftToPigCompatibility
- * @throws Exception
- */
- @Test
- public void testMapInSet() throws Exception {
- String[] pigExpectations = {
- "startMessage()",
- "startField(name, 0)",
- "addBinary(top)",
- "endField(name, 0)",
- "startField(names, 1)", // set: optional field
- "startGroup()",
- "startField(t, 0)", // repeated field
- "startGroup()",
- "startField(names_tuple, 0)", // map: optional field
- "startGroup()",
- "startField(key_value, 0)", // repeated field
- "startGroup()",
- "startField(key, 0)", // key
- "addBinary(foo)",
- "endField(key, 0)",
- "startField(value, 1)", // value
- "addBinary(bar)",
- "endField(value, 1)",
- "endGroup()",
- "endField(key_value, 0)",
- "endGroup()",
- "endField(names_tuple, 0)",
- "endGroup()",
- "endField(t, 0)",
- "endGroup()",
- "endField(names, 1)",
- "endMessage()"
- };
-
- final Set