PARQUET-2418: Add integration test for BYTE_STREAM_SPLIT (#1255)
pitrou authored Jan 19, 2024
1 parent d03b49b commit 8264d8b
Showing 6 changed files with 167 additions and 160 deletions.
org/apache/parquet/hadoop/ITTestEncryptionOptions.java
@@ -19,7 +19,6 @@
package org.apache.parquet.hadoop;

import java.io.IOException;
import okhttp3.OkHttpClient;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
@@ -37,10 +36,8 @@ public class ITTestEncryptionOptions {

TestEncryptionOptions test = new TestEncryptionOptions();

OkHttpClient httpClient = new OkHttpClient();

@Test
public void testInteropReadEncryptedParquetFiles() throws IOException {
test.testInteropReadEncryptedParquetFiles(errorCollector, httpClient);
test.testInteropReadEncryptedParquetFiles(errorCollector);
}
}
org/apache/parquet/hadoop/InterOpTester.java
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop;

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InterOpTester {
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/";
private static final String PARQUET_TESTING_PATH = "target/parquet-testing/";
private static final Logger LOG = LoggerFactory.getLogger(InterOpTester.class);
private OkHttpClient httpClient = new OkHttpClient();

public Path GetInterOpFile(String fileName, String changeset) throws IOException {
return GetInterOpFile(fileName, changeset, "data");
}

/**
* Get interOp file from parquet-testing repo, possibly downloading it.
*
* @param fileName The name of the file to get.
* @param changeset The changeset ID in the parquet-testing repo.
* @param subdir The subdirectory the file lives in inside the repo (for example "data").
* @return Path The local path to the interOp file.
*/
public Path GetInterOpFile(String fileName, String changeset, String subdir) throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH, subdir);
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interOp files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = String.format("%s/%s/%s/%s", PARQUET_TESTING_REPO, changeset, subdir, fileName);
LOG.info("Download interOp file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
}
return file;
}
}
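
For context, a minimal usage sketch of the new helper, assuming a caller in the same org.apache.parquet.hadoop package; the wrapper class and main method are hypothetical, and the file name and changeset are the ones the new BYTE_STREAM_SPLIT test below uses.

package org.apache.parquet.hadoop;

import java.io.IOException;
import org.apache.hadoop.fs.Path;

public class InterOpTesterExample {
  public static void main(String[] args) throws IOException {
    InterOpTester interop = new InterOpTester();
    // The first call downloads the file from apache/parquet-testing at the
    // given changeset into target/parquet-testing/data; subsequent calls
    // reuse the cached local copy.
    Path file = interop.GetInterOpFile("byte_stream_split.zstd.parquet", "4cb3cff");
    System.out.println("Local interop file: " + file);
  }
}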
org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -29,12 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.ColumnDecryptionProperties;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
@@ -124,8 +119,6 @@
*/
public class TestEncryptionOptions {
private static final Logger LOG = LoggerFactory.getLogger(TestEncryptionOptions.class);
// The link includes a reference to a specific commit. To take a newer version - update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/40379b3/data/";

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -134,8 +127,8 @@ public class TestEncryptionOptions {
public ErrorCollector localErrorCollector = new ErrorCollector();

private ErrorCollector errorCollector;

private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
private InterOpTester interop = new InterOpTester();
private static final String CHANGESET = "40379b3";

private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
private static final byte[][] COLUMN_ENCRYPTION_KEYS = {
@@ -348,19 +341,14 @@ public void testWriteReadEncryptedParquetFiles() throws IOException {
* It's not moved into a separate file since it shares many utilities with the unit tests in this file.
*
* @param errorCollector - the error collector of the integration tests suite
* @param httpClient - HTTP client to be used for fetching parquet files for interop tests
* @throws IOException
*/
public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector, OkHttpClient httpClient)
throws IOException {
public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector) throws IOException {
this.errorCollector = errorCollector;
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInteropReadEncryptedParquetFiles {} ========", rootPath.toString());
boolean readOnlyEncrypted = true;
downloadInteropFiles(rootPath, readOnlyEncrypted, httpClient);
byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
// Read using various decryption configurations.
testInteropReadEncryptedParquetFiles(rootPath, readOnlyEncrypted, LINEAR_DATA);
testInteropReadEncryptedParquetFiles(readOnlyEncrypted, LINEAR_DATA);
}

private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data) throws IOException {
@@ -505,48 +493,7 @@ private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data) {
}
}

private void downloadInteropFiles(Path rootPath, boolean readOnlyEncrypted, OkHttpClient httpClient)
throws IOException {
LOG.info("Download interop files if needed");
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interop files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
if (readOnlyEncrypted && (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration)) {
continue;
}
if (EncryptionConfiguration.UNIFORM_ENCRYPTION_PLAINTEXT_FOOTER == encryptionConfiguration) {
continue;
}
if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) {
continue;
}
String fileName = getFileName(encryptionConfiguration);
Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interop file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
Response response = httpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
}
}

private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEncrypted, List<SingleRow> data)
private void testInteropReadEncryptedParquetFiles(boolean readOnlyEncrypted, List<SingleRow> data)
throws IOException {
Configuration conf = new Configuration();
DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values();
@@ -562,7 +509,7 @@ private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEnc
if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) {
continue;
}
Path file = new Path(root, getFileName(encryptionConfiguration));
Path file = interop.GetInterOpFile(getFileName(encryptionConfiguration), CHANGESET);
LOG.info("==> Decryption configuration {}", decryptionConfiguration);
FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties();

org/apache/parquet/hadoop/TestInterOpReadByteStreamSplit.java
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.junit.Test;

public class TestInterOpReadByteStreamSplit {
private InterOpTester interop = new InterOpTester();
private static final String FLOATS_FILE = "byte_stream_split.zstd.parquet";
private static final String CHANGESET = "4cb3cff";

@Test
public void testReadFloats() throws IOException {
Path floatsFile = interop.GetInterOpFile(FLOATS_FILE, CHANGESET);
final int expectRows = 300;

try (ParquetReader<Group> reader =
ParquetReader.builder(new GroupReadSupport(), floatsFile).build()) {
for (int i = 0; i < expectRows; ++i) {
Group group = reader.read();
assertTrue(group != null);
float fval = group.getFloat("f32", 0);
double dval = group.getDouble("f64", 0);
// Values are from the normal distribution
assertTrue(Math.abs(fval) < 4.0);
assertTrue(Math.abs(dval) < 4.0);
switch (i) {
case 0:
assertEquals(1.7640524f, fval, 0.0);
assertEquals(-1.3065268517353166, dval, 0.0);
break;
case 1:
assertEquals(0.4001572f, fval, 0.0);
assertEquals(1.658130679618188, dval, 0.0);
break;
case expectRows - 2:
assertEquals(-0.39944902f, fval, 0.0);
assertEquals(-0.9301565025243212, dval, 0.0);
break;
case expectRows - 1:
assertEquals(0.37005588f, fval, 0.0);
assertEquals(-0.17858909208732915, dval, 0.0);
break;
}
}
assertTrue(reader.read() == null);
}
}
}
org/apache/parquet/hadoop/TestInterOpReadFloat16.java
@@ -24,39 +24,26 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestInterOpReadFloat16 {

// The link includes a reference to a specific commit. To take a newer version - update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/da467da/data/";
private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
private static String FLOAT16_NONZEROS_NANS_FILE = "float16_nonzeros_and_nans.parquet";
private static String FLOAT16_ZEROS_NANS_FILE = "float16_zeros_and_nans.parquet";
private static final String CHANGESET = "da467da";

private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadFloat16.class);
private OkHttpClient httpClient = new OkHttpClient();
private InterOpTester interop = new InterOpTester();

@Test
public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInterOpReadFloat16NonZerosAndNansParquetFiles {} ========", rootPath);
Path filePath = interop.GetInterOpFile(FLOAT16_NONZEROS_NANS_FILE, CHANGESET);

Path filePath = downloadInterOpFiles(rootPath, FLOAT16_NONZEROS_NANS_FILE, httpClient);
final int expectRows = 8;
Binary[] c0ExpectValues = {
null,
@@ -100,10 +87,8 @@ public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOExcepti

@Test
public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInterOpReadFloat16ZerosAndNansParquetFiles {} ========", rootPath);
Path filePath = interop.GetInterOpFile(FLOAT16_ZEROS_NANS_FILE, "da467da");

Path filePath = downloadInterOpFiles(rootPath, FLOAT16_ZEROS_NANS_FILE, httpClient);
final int expectRows = 3;
Binary[] c0ExpectValues = {
null,
@@ -138,32 +123,4 @@ public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException {
}
}
}

private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
LOG.info("Download interOp files if needed");
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interOp files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interOp file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
Response response = httpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
return file;
}
}