Skip to content

Commit

Permalink
Update syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Oct 18, 2023
1 parent bde22f1 commit 857b84c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
import org.apache.parquet.thrift.struct.ThriftType.StructType;

@Deprecated
public class ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
public class
ThriftBytesWriteSupport extends WriteSupport<BytesWritable> {
private static final Logger LOG = LoggerFactory.getLogger(ThriftBytesWriteSupport.class);
private static final String PARQUET_PROTOCOL_CLASS = "parquet.protocol.class";

Expand Down Expand Up @@ -92,6 +93,10 @@ public ThriftBytesWriteSupport() {
* @deprecated Use @link{ThriftBytesWriteSupport(Configuration configuration,
* TProtocolFactory protocolFactory, {@literal Class<? extends TBase<?,?>>} thriftClass,
* boolean buffered, FieldIgnoredHandler errorHandler)} instead
* @param protocolFactory The factory
* @param thriftClass The class
* @param buffered When buffered
* @param errorHandler Handler when fields are missing
*/
@Deprecated
public ThriftBytesWriteSupport(TProtocolFactory protocolFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
/**
* A field projection filter decides whether a thrift field (column) should
* be included when reading thrift data. It is used to implement projection push down.
*
* See {@link StrictFieldProjectionFilter} and
* {@link org.apache.parquet.thrift.projection.deprecated.DeprecatedFieldProjectionFilter}
*/
@Deprecated
public interface FieldProjectionFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testThriftOptionalFieldsWithReadProjectionUsingParquetSchema() throw

@Test
public void testPullingInRequiredStructWithFilter() throws Exception {
final String projectionFilterDesc = "persons/{id};persons/email";
final String projectionFilterDesc = "persons.{id};persons.email";
TBase toWrite = new AddressBook(
Arrays.asList(
new Person(
Expand Down Expand Up @@ -122,8 +122,7 @@ public void testReorderdOptionalFields() throws Exception {

@Test
public void testProjectOutOptionalFields() throws Exception {

final String projectionFilterDesc = "persons/name/*";
final String projectionFilterDesc = "persons.name.*";

TBase toWrite = new AddressBook(
Arrays.asList(
Expand Down Expand Up @@ -164,7 +163,7 @@ public void testPullInRequiredMaps() throws Exception {

@Test
public void testDropMapValuePrimitive() throws Exception {
String filter = "mavalue/key";
String filter = "mavalue.key";

Map<String, String> mapValue = new HashMap<String, String>();
mapValue.put("a", "1");
Expand Down Expand Up @@ -199,7 +198,7 @@ private StructV4WithExtracStructField makeStructV4WithExtracStructField(String i

@Test
public void testDropMapValueStruct() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, StructV4WithExtracStructField> mapValue = new HashMap<String, StructV4WithExtracStructField>();

Expand All @@ -222,7 +221,7 @@ public void testDropMapValueStruct() throws Exception {

@Test
public void testDropMapValueNestedPrim() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, Map<String, String>> mapValue =
new HashMap<String, Map<String, String>>();
Expand Down Expand Up @@ -261,10 +260,9 @@ public void testDropMapValueNestedPrim() throws Exception {

@Test
public void testDropMapValueNestedStruct() throws Exception {
String filter = "reqMap/key";
String filter = "reqMap.key";

Map<String, Map<String, StructV4WithExtracStructField>> mapValue =
new HashMap<String, Map<String, StructV4WithExtracStructField>>();
Map<String, Map<String, StructV4WithExtracStructField>> mapValue = new HashMap<>();

Map<String, StructV4WithExtracStructField> innerValue1 = new HashMap<String, StructV4WithExtracStructField>();
innerValue1.put("inner key (1, 1)", makeStructV4WithExtracStructField("inner (1, 1)"));
Expand Down Expand Up @@ -353,22 +351,21 @@ private void shouldDoProjectionWithThriftColumnFilter(String filterDesc, TBase t
//create a test file
final TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
final TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
final ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, thriftClass);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));

recordToWrite.write(protocol);
w.write(new BytesWritable(baos.toByteArray()));
w.close();
try (ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(parquetFile, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, thriftClass);
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));
recordToWrite.write(protocol);
w.write(new BytesWritable(baos.toByteArray()));
}

T readValue = null;

final ParquetThriftInputFormat<T> parquetThriftInputFormat = new ParquetThriftInputFormat<T>();
final Job job = new Job(conf, "read");
final ParquetThriftInputFormat<T> parquetThriftInputFormat = new ParquetThriftInputFormat<>();
Job job = new Job(conf, "read");
job.setInputFormatClass(ParquetThriftInputFormat.class);
ParquetThriftInputFormat.setInputPaths(job, parquetFile);
final JobID jobID = new JobID("local", 1);
List<InputSplit> splits = parquetThriftInputFormat.getSplits(ContextUtil.newJobContext(ContextUtil.getConfiguration(job), jobID));
T readValue = null;
for (InputSplit split : splits) {
TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID(new TaskID(jobID, true, 1), 0));
try (final RecordReader<Void, T> reader = parquetThriftInputFormat.createRecordReader(split, taskAttemptContext)) {
Expand Down

0 comments on commit 857b84c

Please sign in to comment.