Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Respect optional/required in table create and schema evolution #115

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate;
import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
Expand Down Expand Up @@ -212,13 +213,24 @@ private GenericRecord convertToStruct(
new AddColumn(parentFieldName, recordField.name(), type));
}
} else {
PrimitiveType evolveDataType =
SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
// update the type if needed and schema evolution is on, otherwise set the value
if (evolveDataType != null && schemaUpdateConsumer != null) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
schemaUpdateConsumer.accept(new TypeUpdate(fieldName, evolveDataType));
} else {
boolean hasSchemaUpdates = false;
if (schemaUpdateConsumer != null) {
// update the type if needed and schema evolution is on
PrimitiveType evolveDataType =
SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
if (evolveDataType != null) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
schemaUpdateConsumer.accept(new UpdateType(fieldName, evolveDataType));
hasSchemaUpdates = true;
}
// make optional if needed and schema evolution is on
if (tableField.isRequired() && recordField.schema().isOptional()) {
String fieldName = tableSchema.findColumnName(tableField.fieldId());
schemaUpdateConsumer.accept(new MakeOptional(fieldName));
hasSchemaUpdates = true;
}
}
if (!hasSchemaUpdates) {
result.setField(
tableField.name(),
convertValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public Type type() {
}
}

public static class TypeUpdate extends SchemaUpdate {
public static class UpdateType extends SchemaUpdate {
private final String name;
private final PrimitiveType type;

public TypeUpdate(String name, PrimitiveType type) {
public UpdateType(String name, PrimitiveType type) {
this.name = name;
this.type = type;
}
Expand All @@ -64,4 +64,16 @@ public PrimitiveType type() {
return type;
}
}

/**
 * Schema update that relaxes an existing table column from required to
 * optional, so incoming records may omit or null the field.
 */
public static class MakeOptional extends SchemaUpdate {

  private final String name;

  /**
   * Creates an update that marks a column as optional.
   *
   * @param columnName fully-qualified name of the column to relax
   */
  public MakeOptional(String columnName) {
    this.name = columnName;
  }

  /** Returns the fully-qualified name of the column to make optional. */
  public String name() {
    return name;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import static java.util.stream.Collectors.toList;

import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate;
import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -104,14 +105,22 @@ private static void commitSchemaUpdates(Table table, List<SchemaUpdate> updates)
.collect(toList());

// filter out columns that have the updated type
List<TypeUpdate> typeUpdates =
List<UpdateType> updateTypes =
updates.stream()
.filter(update -> update instanceof TypeUpdate)
.map(update -> (TypeUpdate) update)
.filter(typeUpdate -> !typeMatches(table.schema(), typeUpdate))
.filter(update -> update instanceof UpdateType)
.map(update -> (UpdateType) update)
.filter(updateType -> !typeMatches(table.schema(), updateType))
.collect(toList());

if (addColumns.isEmpty() && typeUpdates.isEmpty()) {
// filter out columns that have already been made optional
List<MakeOptional> makeOptionals =
updates.stream()
.filter(update -> update instanceof MakeOptional)
.map(update -> (MakeOptional) update)
.filter(makeOptional -> !isOptional(table.schema(), makeOptional))
.collect(toList());

if (addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty()) {
// no updates to apply
LOG.info("Schema for table {} already up-to-date", table.name());
return;
Expand All @@ -121,7 +130,8 @@ private static void commitSchemaUpdates(Table table, List<SchemaUpdate> updates)
UpdateSchema updateSchema = table.updateSchema();
addColumns.forEach(
update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
typeUpdates.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
updateSchema.commit();
LOG.info("Schema for table {} updated with new columns", table.name());
}
Expand All @@ -134,10 +144,14 @@ private static boolean columnExists(org.apache.iceberg.Schema schema, AddColumn
return struct.field(update.name()) != null;
}

private static boolean typeMatches(org.apache.iceberg.Schema schema, TypeUpdate update) {
private static boolean typeMatches(org.apache.iceberg.Schema schema, UpdateType update) {
return schema.findType(update.name()).typeId() == update.type().typeId();
}

/**
 * Returns true if the column targeted by the given update is already optional
 * in the table schema, i.e. no schema change is needed for it.
 */
private static boolean isOptional(org.apache.iceberg.Schema schema, MakeOptional update) {
  // Look the field up by its fully-qualified name and check its nullability.
  NestedField field = schema.findField(update.name());
  return field.isOptional();
}

public static PartitionSpec createPartitionSpec(
org.apache.iceberg.Schema schema, List<String> partitionBy) {
if (partitionBy.isEmpty()) {
Expand Down Expand Up @@ -242,18 +256,29 @@ Type toIcebergType(Schema valueSchema) {
return DoubleType.get();
case ARRAY:
Type elementType = toIcebergType(valueSchema.valueSchema());
return ListType.ofOptional(nextId(), elementType);
if (valueSchema.isOptional()) {
return ListType.ofOptional(nextId(), elementType);
} else {
return ListType.ofRequired(nextId(), elementType);
}
case MAP:
Type keyType = toIcebergType(valueSchema.keySchema());
Type valueType = toIcebergType(valueSchema.valueSchema());
return MapType.ofOptional(nextId(), nextId(), keyType, valueType);
if (valueSchema.isOptional()) {
return MapType.ofOptional(nextId(), nextId(), keyType, valueType);
} else {
return MapType.ofRequired(nextId(), nextId(), keyType, valueType);
}
case STRUCT:
List<NestedField> structFields =
valueSchema.fields().stream()
.map(
field ->
NestedField.optional(
nextId(), field.name(), toIcebergType(field.schema())))
NestedField.of(
nextId(),
field.schema().isOptional(),
field.name(),
toIcebergType(field.schema())))
.collect(toList());
return StructType.of(structFields);
case STRING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate;
import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down Expand Up @@ -492,12 +492,12 @@ public void testEvolveTypeDetectionStruct() {

List<SchemaUpdate> updates = Lists.newArrayList();
converter.convert(data, updates::add);
List<TypeUpdate> addCols =
updates.stream().map(update -> (TypeUpdate) update).collect(toList());
List<UpdateType> addCols =
updates.stream().map(update -> (UpdateType) update).collect(toList());

assertThat(addCols).hasSize(2);

Map<String, TypeUpdate> updateMap = Maps.newHashMap();
Map<String, UpdateType> updateMap = Maps.newHashMap();
addCols.forEach(update -> updateMap.put(update.name(), update));

assertThat(updateMap.get("ii").type()).isInstanceOf(LongType.class);
Expand Down Expand Up @@ -529,12 +529,12 @@ public void testEvolveTypeDetectionStructNested() {

List<SchemaUpdate> updates = Lists.newArrayList();
converter.convert(data, updates::add);
List<TypeUpdate> addCols =
updates.stream().map(update -> (TypeUpdate) update).collect(toList());
List<UpdateType> addCols =
updates.stream().map(update -> (UpdateType) update).collect(toList());

assertThat(addCols).hasSize(2);

Map<String, TypeUpdate> updateMap = Maps.newHashMap();
Map<String, UpdateType> updateMap = Maps.newHashMap();
addCols.forEach(update -> updateMap.put(update.name(), update));

assertThat(updateMap.get("st.ii").type()).isInstanceOf(LongType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import static org.mockito.Mockito.when;

import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
import io.tabular.iceberg.connect.data.SchemaUpdate.TypeUpdate;
import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -92,15 +93,17 @@ public void testApplySchemaUpdates() {
List<SchemaUpdate> updates =
ImmutableList.of(
new AddColumn(null, "i", IntegerType.get()),
new TypeUpdate("i", IntegerType.get()),
new TypeUpdate("f", DoubleType.get()),
new UpdateType("i", IntegerType.get()),
new MakeOptional("i"),
new UpdateType("f", DoubleType.get()),
new AddColumn(null, "s", StringType.get()));

SchemaUtils.applySchemaUpdates(table, updates);
verify(table).refresh();
verify(table).updateSchema();
verify(updateSchema).addColumn(isNull(), matches("s"), isA(StringType.class));
verify(updateSchema).updateColumn(matches("f"), isA(DoubleType.class));
verify(updateSchema).makeColumnOptional(matches("i"));
verify(updateSchema).commit();
}

Expand Down