diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java index df66e2d2..c5637c3a 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java @@ -22,7 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn; -import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate; +import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional; +import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType; import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigDecimal; @@ -212,13 +213,24 @@ private GenericRecord convertToStruct( new AddColumn(parentFieldName, recordField.name(), type)); } } else { - PrimitiveType evolveDataType = - SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema()); - // update the type if needed and schema evolution is on, otherwise set the value - if (evolveDataType != null && schemaUpdateConsumer != null) { - String fieldName = tableSchema.findColumnName(tableField.fieldId()); - schemaUpdateConsumer.accept(new TypeUpdate(fieldName, evolveDataType)); - } else { + boolean hasSchemaUpdates = false; + if (schemaUpdateConsumer != null) { + // update the type if needed and schema evolution is on + PrimitiveType evolveDataType = + SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema()); + if (evolveDataType != null) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.accept(new UpdateType(fieldName, evolveDataType)); + hasSchemaUpdates = true; + } + // make optional if needed and schema evolution is on + if (tableField.isRequired() && recordField.schema().isOptional()) { + String fieldName = tableSchema.findColumnName(tableField.fieldId()); + schemaUpdateConsumer.accept(new MakeOptional(fieldName)); + hasSchemaUpdates = true; + } + } + if (!hasSchemaUpdates) { result.setField( tableField.name(), convertValue( diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java index e6a83ba2..ab54f9e3 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java @@ -47,11 +47,11 @@ public Type type() { } } - public static class TypeUpdate extends SchemaUpdate { + public static class UpdateType extends SchemaUpdate { private final String name; private final PrimitiveType type; - public TypeUpdate(String name, PrimitiveType type) { + public UpdateType(String name, PrimitiveType type) { this.name = name; this.type = type; } @@ -64,4 +64,16 @@ public PrimitiveType type() { return type; } } + + public static class MakeOptional extends SchemaUpdate { + private final String name; + + public MakeOptional(String name) { + this.name = name; + } + + public String name() { + return name; + } + } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java index 3f6d42da..6843328d 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java @@ -21,7 +21,8 @@ import static java.util.stream.Collectors.toList; import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn; -import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate; +import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional; +import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -104,14 +105,22 @@ private static void commitSchemaUpdates(Table table, List updates) .collect(toList()); // filter out columns that have the updated type - List typeUpdates = + List updateTypes = updates.stream() - .filter(update -> update instanceof TypeUpdate) - .map(update -> (TypeUpdate) update) - .filter(typeUpdate -> !typeMatches(table.schema(), typeUpdate)) + .filter(update -> update instanceof UpdateType) + .map(update -> (UpdateType) update) + .filter(updateType -> !typeMatches(table.schema(), updateType)) .collect(toList()); - if (addColumns.isEmpty() && typeUpdates.isEmpty()) { + // filter out columns that have already been made optional + List makeOptionals = + updates.stream() + .filter(update -> update instanceof MakeOptional) + .map(update -> (MakeOptional) update) + .filter(makeOptional -> !isOptional(table.schema(), makeOptional)) + .collect(toList()); + + if (addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty()) { // no updates to apply LOG.info("Schema for table {} already up-to-date", table.name()); return; @@ -121,7 +130,8 @@ private static void commitSchemaUpdates(Table table, List updates) UpdateSchema updateSchema = table.updateSchema(); addColumns.forEach( update -> updateSchema.addColumn(update.parentName(), update.name(), update.type())); - typeUpdates.forEach(update -> updateSchema.updateColumn(update.name(), update.type())); + updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type())); + makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name())); updateSchema.commit(); LOG.info("Schema for table {} updated with new columns", table.name()); } @@ -134,10 +144,14 @@ private static boolean columnExists(org.apache.iceberg.Schema schema, AddColumn return struct.field(update.name()) != null; } - private static boolean typeMatches(org.apache.iceberg.Schema schema, TypeUpdate update) { + private static boolean typeMatches(org.apache.iceberg.Schema schema, UpdateType update) { return schema.findType(update.name()).typeId() == update.type().typeId(); } + private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional update) { + return schema.findField(update.name()).isOptional(); + } + public static PartitionSpec createPartitionSpec( org.apache.iceberg.Schema schema, List partitionBy) { if (partitionBy.isEmpty()) { @@ -242,18 +256,29 @@ Type toIcebergType(Schema valueSchema) { return DoubleType.get(); case ARRAY: Type elementType = toIcebergType(valueSchema.valueSchema()); - return ListType.ofOptional(nextId(), elementType); + if (valueSchema.valueSchema().isOptional()) { + return ListType.ofOptional(nextId(), elementType); + } else { + return ListType.ofRequired(nextId(), elementType); + } case MAP: Type keyType = toIcebergType(valueSchema.keySchema()); Type valueType = toIcebergType(valueSchema.valueSchema()); - return MapType.ofOptional(nextId(), nextId(), keyType, valueType); + if (valueSchema.valueSchema().isOptional()) { + return MapType.ofOptional(nextId(), nextId(), keyType, valueType); + } else { + return MapType.ofRequired(nextId(), nextId(), keyType, valueType); + } case STRUCT: List structFields = valueSchema.fields().stream() .map( field -> - NestedField.optional( - nextId(), field.name(), toIcebergType(field.schema()))) + NestedField.of( + nextId(), + field.schema().isOptional(), + field.name(), + toIcebergType(field.schema()))) .collect(toList()); return StructType.of(structFields); case STRING: diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java index f747730c..f90fea89 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java @@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn; -import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate; +import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.time.Duration; @@ -492,12 +492,12 @@ public void testEvolveTypeDetectionStruct() { List updates = Lists.newArrayList(); converter.convert(data, updates::add); - List addCols = - updates.stream().map(update -> (TypeUpdate) update).collect(toList()); + List addCols = + updates.stream().map(update -> (UpdateType) update).collect(toList()); assertThat(addCols).hasSize(2); - Map updateMap = Maps.newHashMap(); + Map updateMap = Maps.newHashMap(); addCols.forEach(update -> updateMap.put(update.name(), update)); assertThat(updateMap.get("ii").type()).isInstanceOf(LongType.class); @@ -529,12 +529,12 @@ public void testEvolveTypeDetectionStructNested() { List updates = Lists.newArrayList(); converter.convert(data, updates::add); - List addCols = - updates.stream().map(update -> (TypeUpdate) update).collect(toList()); + List addCols = + updates.stream().map(update -> (UpdateType) update).collect(toList()); assertThat(addCols).hasSize(2); - Map updateMap = Maps.newHashMap(); + Map updateMap = Maps.newHashMap(); addCols.forEach(update -> updateMap.put(update.name(), update)); assertThat(updateMap.get("st.ii").type()).isInstanceOf(LongType.class); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java index a9277033..b7dc74d7 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java @@ -29,7 +29,8 @@ import static org.mockito.Mockito.when; import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn; -import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate; +import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional; +import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -92,8 +93,9 @@ public void testApplySchemaUpdates() { List updates = ImmutableList.of( new AddColumn(null, "i", IntegerType.get()), - new TypeUpdate("i", IntegerType.get()), - new TypeUpdate("f", DoubleType.get()), + new UpdateType("i", IntegerType.get()), + new MakeOptional("i"), + new UpdateType("f", DoubleType.get()), new AddColumn(null, "s", StringType.get())); SchemaUtils.applySchemaUpdates(table, updates); @@ -101,6 +103,7 @@ public void testApplySchemaUpdates() { verify(table).updateSchema(); verify(updateSchema).addColumn(isNull(), matches("s"), isA(StringType.class)); verify(updateSchema).updateColumn(matches("f"), isA(DoubleType.class)); + verify(updateSchema).makeColumnOptional(matches("i")); verify(updateSchema).commit(); }