Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmatfournier committed May 21, 2024
1 parent 3626568 commit 94e9c01
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private Payload convertCommitResponse(CommitResponsePayload payload) {
.get();
Type fieldType = AvroSchemaUtil.convert(fieldSchema);
int fieldId = (int) f.getObjectProp("field-id");
convertedFields.add(Types.NestedField.of(fieldId, false, f.name(), fieldType));
convertedFields.add(Types.NestedField.of(fieldId, f.schema().isNullable(), f.name(), fieldType));
}

Types.StructType convertedStructType = Types.StructType.of(convertedFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.CommitComplete;
import org.apache.iceberg.connect.events.CommitToTable;
Expand All @@ -49,8 +48,8 @@
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -166,24 +165,19 @@ public void testCommitResponseBecomesDataWrittenPartitioned() {
assertThat(payload.tableReference().catalog()).isEqualTo("catalog");
assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl"));

Schema.Field field =
payload.getSchema().getFields().get(2).schema().getTypes().stream()
.filter(s -> s.getType() != Schema.Type.NULL)
.findFirst()
.get()
.getElementType()
.getField("partition");
List<Types.NestedField> resultPartitionInfo =
field.schema().getFields().stream()
.map(
f -> {
Type fieldType = AvroSchemaUtil.convert(f.schema());
int fieldId = (int) f.getObjectProp("field-id");
return Types.NestedField.optional(fieldId, f.name(), fieldType);
})
.collect(Collectors.toList());

assertThat(resultPartitionInfo).isEqualTo(spec.partitionType().fields());
assertThat(payload.writeSchema()).isEqualTo(
Types.StructType.of(
Types.NestedField.required(10_300, "commit_id", Types.UUIDType.get()),
Types.NestedField.required(
10_301, "table_reference", TableReference.ICEBERG_SCHEMA),
Types.NestedField.optional(
10_302,
"data_files",
Types.ListType.ofRequired(10_303, DataFile.getType(spec.partitionType()))),
Types.NestedField.optional(
10_304,
"delete_files",
Types.ListType.ofRequired(10_304, DataFile.getType(spec.partitionType())))));
}

@Test
Expand Down

0 comments on commit 94e9c01

Please sign in to comment.