diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java index 590cf394..7ba4f878 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java @@ -156,7 +156,7 @@ private Payload convertCommitResponse(CommitResponsePayload payload) { .get(); Type fieldType = AvroSchemaUtil.convert(fieldSchema); int fieldId = (int) f.getObjectProp("field-id"); - convertedFields.add(Types.NestedField.of(fieldId, false, f.name(), fieldType)); + convertedFields.add(Types.NestedField.of(fieldId, f.schema().isNullable(), f.name(), fieldType)); } Types.StructType convertedStructType = Types.StructType.of(convertedFields); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java index c3285930..3d1ecab9 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java @@ -37,10 +37,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; import org.apache.avro.Schema; +import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.connect.events.CommitComplete; import org.apache.iceberg.connect.events.CommitToTable; @@ -49,8 +48,8 @@ import org.apache.iceberg.connect.events.Event; import org.apache.iceberg.connect.events.PayloadType; import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -166,24 +165,19 @@ public void testCommitResponseBecomesDataWrittenPartitioned() { assertThat(payload.tableReference().catalog()).isEqualTo("catalog"); assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); - Schema.Field field = - payload.getSchema().getFields().get(2).schema().getTypes().stream() - .filter(s -> s.getType() != Schema.Type.NULL) - .findFirst() - .get() - .getElementType() - .getField("partition"); - List resultPartitionInfo = - field.schema().getFields().stream() - .map( - f -> { - Type fieldType = AvroSchemaUtil.convert(f.schema()); - int fieldId = (int) f.getObjectProp("field-id"); - return Types.NestedField.optional(fieldId, f.name(), fieldType); - }) - .collect(Collectors.toList()); - - assertThat(resultPartitionInfo).isEqualTo(spec.partitionType().fields()); + assertThat(payload.writeSchema()).isEqualTo( + Types.StructType.of( + Types.NestedField.required(10_300, "commit_id", Types.UUIDType.get()), + Types.NestedField.required( + 10_301, "table_reference", TableReference.ICEBERG_SCHEMA), + Types.NestedField.optional( + 10_302, + "data_files", + Types.ListType.ofRequired(10_303, DataFile.getType(spec.partitionType()))), + Types.NestedField.optional( + 10_304, + "delete_files", + Types.ListType.ofRequired(10_304, DataFile.getType(spec.partitionType()))))); } @Test