diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/ITTestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/ITTestEncryptionOptions.java index dfa1e27920..27e7acc3ec 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/ITTestEncryptionOptions.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/ITTestEncryptionOptions.java @@ -19,7 +19,6 @@ package org.apache.parquet.hadoop; import java.io.IOException; -import okhttp3.OkHttpClient; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ErrorCollector; @@ -37,10 +36,8 @@ public class ITTestEncryptionOptions { TestEncryptionOptions test = new TestEncryptionOptions(); - OkHttpClient httpClient = new OkHttpClient(); - @Test public void testInteropReadEncryptedParquetFiles() throws IOException { - test.testInteropReadEncryptedParquetFiles(errorCollector, httpClient); + test.testInteropReadEncryptedParquetFiles(errorCollector); } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/InterOpTester.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/InterOpTester.java new file mode 100644 index 0000000000..2618fcb219 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/InterOpTester.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import java.io.IOException; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InterOpTester { + private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/"; + private static final String PARQUET_TESTING_PATH = "target/parquet-testing/"; + private static final Logger LOG = LoggerFactory.getLogger(InterOpTester.class); + private OkHttpClient httpClient = new OkHttpClient(); + + public Path GetInterOpFile(String fileName, String changeset) throws IOException { + return GetInterOpFile(fileName, changeset, "data"); + } + + /** + * Get interOp file from parquet-testing repo, possibly downloading it. + * + * @param fileName The name of the file to get. + * @param changeset The changeset ID in the parquet-testing repo. + * @param subdir The subdirectory the file lives in inside the repo (for example "data"). + * @return Path The local path to the interOp file. 
+ */ + public Path GetInterOpFile(String fileName, String changeset, String subdir) throws IOException { + Path rootPath = new Path(PARQUET_TESTING_PATH, subdir); + Configuration conf = new Configuration(); + FileSystem fs = rootPath.getFileSystem(conf); + if (!fs.exists(rootPath)) { + LOG.info("Create folder for interOp files: " + rootPath); + if (!fs.mkdirs(rootPath)) { + throw new IOException("Cannot create path " + rootPath); + } + } + + Path file = new Path(rootPath, fileName); + if (!fs.exists(file)) { + // PARQUET_TESTING_REPO already ends with '/', so do not insert another slash after it. + String downloadUrl = String.format("%s%s/%s/%s", PARQUET_TESTING_REPO, changeset, subdir, fileName); + LOG.info("Download interOp file: " + downloadUrl); + Request request = new Request.Builder().url(downloadUrl).build(); + try (Response response = httpClient.newCall(request).execute()) { + if (!response.isSuccessful()) { + throw new IOException("Failed to download file: " + response); + } + try (FSDataOutputStream fdos = fs.create(file)) { + fdos.write(response.body().bytes()); + } + } + } + return file; + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java index 39e3fdccf7..ee572449b2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java @@ -29,12 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.crypto.ColumnDecryptionProperties; import org.apache.parquet.crypto.ColumnEncryptionProperties; @@ -124,8 +119,6 @@ */ public class TestEncryptionOptions { private static final Logger LOG = 
LoggerFactory.getLogger(TestEncryptionOptions.class); - // The link includes a reference to a specific commit. To take a newer version - update this link. - private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/40379b3/data/"; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -134,8 +127,8 @@ public class TestEncryptionOptions { public ErrorCollector localErrorCollector = new ErrorCollector(); private ErrorCollector errorCollector; - - private static String PARQUET_TESTING_PATH = "target/parquet-testing/data"; + private InterOpTester interop = new InterOpTester(); + private static final String CHANGESET = "40379b3"; private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes(); private static final byte[][] COLUMN_ENCRYPTION_KEYS = { @@ -348,19 +341,14 @@ public void testWriteReadEncryptedParquetFiles() throws IOException { * It's not moved into a separate file since it shares many utilities with the unit tests in this file. * * @param errorCollector - the error collector of the integration tests suite - * @param httpClient - HTTP client to be used for fetching parquet files for interop tests * @throws IOException */ - public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector, OkHttpClient httpClient) - throws IOException { + public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector) throws IOException { this.errorCollector = errorCollector; - Path rootPath = new Path(PARQUET_TESTING_PATH); - LOG.info("======== testInteropReadEncryptedParquetFiles {} ========", rootPath.toString()); boolean readOnlyEncrypted = true; - downloadInteropFiles(rootPath, readOnlyEncrypted, httpClient); byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8); // Read using various decryption configurations. 
- testInteropReadEncryptedParquetFiles(rootPath, readOnlyEncrypted, LINEAR_DATA); + testInteropReadEncryptedParquetFiles(readOnlyEncrypted, LINEAR_DATA); } private void testWriteEncryptedParquetFiles(Path root, List data) throws IOException { @@ -505,48 +493,7 @@ private void testReadEncryptedParquetFiles(Path root, List data) { } } - private void downloadInteropFiles(Path rootPath, boolean readOnlyEncrypted, OkHttpClient httpClient) - throws IOException { - LOG.info("Download interop files if needed"); - Configuration conf = new Configuration(); - FileSystem fs = rootPath.getFileSystem(conf); - LOG.info(rootPath + " exists?: " + fs.exists(rootPath)); - if (!fs.exists(rootPath)) { - LOG.info("Create folder for interop files: " + rootPath); - if (!fs.mkdirs(rootPath)) { - throw new IOException("Cannot create path " + rootPath); - } - } - - EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values(); - for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) { - if (readOnlyEncrypted && (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration)) { - continue; - } - if (EncryptionConfiguration.UNIFORM_ENCRYPTION_PLAINTEXT_FOOTER == encryptionConfiguration) { - continue; - } - if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) { - continue; - } - String fileName = getFileName(encryptionConfiguration); - Path file = new Path(rootPath, fileName); - if (!fs.exists(file)) { - String downloadUrl = PARQUET_TESTING_REPO + fileName; - LOG.info("Download interop file: " + downloadUrl); - Request request = new Request.Builder().url(downloadUrl).build(); - Response response = httpClient.newCall(request).execute(); - if (!response.isSuccessful()) { - throw new IOException("Failed to download file: " + response); - } - try (FSDataOutputStream fdos = fs.create(file)) { - fdos.write(response.body().bytes()); - } - } - } - } - - private void testInteropReadEncryptedParquetFiles(Path 
root, boolean readOnlyEncrypted, List data) + private void testInteropReadEncryptedParquetFiles(boolean readOnlyEncrypted, List data) throws IOException { Configuration conf = new Configuration(); DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values(); @@ -562,7 +509,7 @@ private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEnc if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) { continue; } - Path file = new Path(root, getFileName(encryptionConfiguration)); + Path file = interop.GetInterOpFile(getFileName(encryptionConfiguration), CHANGESET); LOG.info("==> Decryption configuration {}", decryptionConfiguration); FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java new file mode 100644 index 0000000000..6e7b1581c7 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.junit.Test; + +public class TestInterOpReadByteStreamSplit { + private InterOpTester interop = new InterOpTester(); + private static final String FLOATS_FILE = "byte_stream_split.zstd.parquet"; + private static final String CHANGESET = "4cb3cff"; + + @Test + public void testReadFloats() throws IOException { + Path floatsFile = interop.GetInterOpFile(FLOATS_FILE, CHANGESET); + final int expectRows = 300; + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), floatsFile).build()) { + for (int i = 0; i < expectRows; ++i) { + Group group = reader.read(); + assertTrue(group != null); + float fval = group.getFloat("f32", 0); + double dval = group.getDouble("f64", 0); + // Values are from the normal distribution + assertTrue(Math.abs(fval) < 4.0); + assertTrue(Math.abs(dval) < 4.0); + switch (i) { + case 0: + assertEquals(1.7640524f, fval, 0.0); + assertEquals(-1.3065268517353166, dval, 0.0); + break; + case 1: + assertEquals(0.4001572f, fval, 0.0); + assertEquals(1.658130679618188, dval, 0.0); + break; + case expectRows - 2: + assertEquals(-0.39944902f, fval, 0.0); + assertEquals(-0.9301565025243212, dval, 0.0); + break; + case expectRows - 1: + assertEquals(0.37005588f, fval, 0.0); + assertEquals(-0.17858909208732915, dval, 0.0); + break; + } + } + assertTrue(reader.read() == null); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadFloat16.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadFloat16.java index 92d5fe8e86..9ae3cfe297 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadFloat16.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadFloat16.java @@ -24,12 +24,7 @@ import static org.junit.Assert.fail; import java.io.IOException; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.example.GroupReadSupport; @@ -37,26 +32,18 @@ import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TestInterOpReadFloat16 { - - // The link includes a reference to a specific commit. To take a newer version - update this link. - private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/da467da/data/"; - private static String PARQUET_TESTING_PATH = "target/parquet-testing/data"; private static String FLOAT16_NONZEROS_NANS_FILE = "float16_nonzeros_and_nans.parquet"; private static String FLOAT16_ZEROS_NANS_FILE = "float16_zeros_and_nans.parquet"; + private static final String CHANGESET = "da467da"; - private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadFloat16.class); - private OkHttpClient httpClient = new OkHttpClient(); + private InterOpTester interop = new InterOpTester(); @Test public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOException { - Path rootPath = new Path(PARQUET_TESTING_PATH); - LOG.info("======== testInterOpReadFloat16NonZerosAndNansParquetFiles {} ========", rootPath); + Path filePath = interop.GetInterOpFile(FLOAT16_NONZEROS_NANS_FILE, CHANGESET); - Path filePath = downloadInterOpFiles(rootPath, FLOAT16_NONZEROS_NANS_FILE, httpClient); final int expectRows 
= 8; Binary[] c0ExpectValues = { null, @@ -100,10 +87,8 @@ public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOExcepti @Test public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException { - Path rootPath = new Path(PARQUET_TESTING_PATH); - LOG.info("======== testInterOpReadFloat16ZerosAndNansParquetFiles {} ========", rootPath); + Path filePath = interop.GetInterOpFile(FLOAT16_ZEROS_NANS_FILE, CHANGESET); - Path filePath = downloadInterOpFiles(rootPath, FLOAT16_ZEROS_NANS_FILE, httpClient); final int expectRows = 3; Binary[] c0ExpectValues = { null, @@ -138,32 +123,4 @@ public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException } } } - - private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException { - LOG.info("Download interOp files if needed"); - Configuration conf = new Configuration(); - FileSystem fs = rootPath.getFileSystem(conf); - LOG.info(rootPath + " exists?: " + fs.exists(rootPath)); - if (!fs.exists(rootPath)) { - LOG.info("Create folder for interOp files: " + rootPath); - if (!fs.mkdirs(rootPath)) { - throw new IOException("Cannot create path " + rootPath); - } - } - - Path file = new Path(rootPath, fileName); - if (!fs.exists(file)) { - String downloadUrl = PARQUET_TESTING_REPO + fileName; - LOG.info("Download interOp file: " + downloadUrl); - Request request = new Request.Builder().url(downloadUrl).build(); - Response response = httpClient.newCall(request).execute(); - if (!response.isSuccessful()) { - throw new IOException("Failed to download file: " + response); - } - try (FSDataOutputStream fdos = fs.create(file)) { - fdos.write(response.body().bytes()); - } - } - return file; - } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java index 2c85a5fde2..31263f995a 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestInteropReadLz4RawCodec.java @@ -23,38 +23,24 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.InterOpTester; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TestInteropReadLz4RawCodec { - - // The link includes a reference to a specific commit. To take a newer version - update this link. - private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/19fcd4d/data/"; - private static String PARQUET_TESTING_PATH = "target/parquet-testing/data"; + private static final String CHANGESET = "19fcd4d"; private static String SIMPLE_FILE = "lz4_raw_compressed.parquet"; private static String LARGER_FILE = "lz4_raw_compressed_larger.parquet"; - private static final Logger LOG = LoggerFactory.getLogger(TestInteropReadLz4RawCodec.class); - private OkHttpClient httpClient = new OkHttpClient(); + private InterOpTester interop = new InterOpTester(); @Test public void testInteropReadLz4RawSimpleParquetFiles() throws IOException { - Path rootPath = new Path(PARQUET_TESTING_PATH); - LOG.info("======== testInteropReadLz4RawSimpleParquetFiles {} ========", rootPath.toString()); - // Test simple parquet file with lz4 raw compressed - Path simpleFile = downloadInteropFiles(rootPath, SIMPLE_FILE, httpClient); + Path simpleFile = interop.GetInterOpFile(SIMPLE_FILE, CHANGESET); final int 
expectRows = 4; long[] c0ExpectValues = {1593604800, 1593604800, 1593604801, 1593604801}; String[] c1ExpectValues = {"abc", "def", "abc", "def"}; @@ -75,12 +61,9 @@ public void testInteropReadLz4RawSimpleParquetFiles() throws IOException { @Test public void testInteropReadLz4RawLargerParquetFiles() throws IOException { - Path rootPath = new Path(PARQUET_TESTING_PATH); - LOG.info("======== testInteropReadLz4RawLargerParquetFiles {} ========", rootPath.toString()); - // Test larger parquet file with lz4 raw compressed final int expectRows = 10000; - Path largerFile = downloadInteropFiles(rootPath, LARGER_FILE, httpClient); + Path largerFile = interop.GetInterOpFile(LARGER_FILE, CHANGESET); String[] c0ExpectValues = { "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b", "e8fb9197-cb9f-4118-b67f-fbfa65f61843", @@ -102,32 +85,4 @@ public void testInteropReadLz4RawLargerParquetFiles() throws IOException { assertTrue(reader.read() == null); } } - - private Path downloadInteropFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException { - LOG.info("Download interop files if needed"); - Configuration conf = new Configuration(); - FileSystem fs = rootPath.getFileSystem(conf); - LOG.info(rootPath + " exists?: " + fs.exists(rootPath)); - if (!fs.exists(rootPath)) { - LOG.info("Create folder for interop files: " + rootPath); - if (!fs.mkdirs(rootPath)) { - throw new IOException("Cannot create path " + rootPath); - } - } - - Path file = new Path(rootPath, fileName); - if (!fs.exists(file)) { - String downloadUrl = PARQUET_TESTING_REPO + fileName; - LOG.info("Download interop file: " + downloadUrl); - Request request = new Request.Builder().url(downloadUrl).build(); - Response response = httpClient.newCall(request).execute(); - if (!response.isSuccessful()) { - throw new IOException("Failed to download file: " + response); - } - try (FSDataOutputStream fdos = fs.create(file)) { - fdos.write(response.body().bytes()); - } - } - return file; - } }