Skip to content

Commit

Permalink
Add config option to enable case-insensitive lookup by column name
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Nov 23, 2023
1 parent afb2aaf commit 25cbecb
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 26 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.tables.auto-create-enabled | Set to `true` to automatically create destination tables, default is `false` |
| iceberg.tables.evolve-schema-enabled | Set to `true` to add any missing record fields to the table schema, default is `false` |
| iceberg.tables.schema-force-optional | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema |
| iceberg.tables.schema-case-insensitive | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive lookup |
| iceberg.tables.auto-create-props.* | Properties set on new tables during auto-create |
| iceberg.tables.write-props.* | Properties passed through to Iceberg writer initialization, these take precedence |
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.upsert-mode-enabled";
private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
"iceberg.tables.auto-create-enabled";
private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
"iceberg.tables.schema-force-optional";
private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
"iceberg.tables.evolve-schema-enabled";
private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
"iceberg.tables.schema-force-optional";
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
Expand Down Expand Up @@ -175,6 +177,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to set columns as optional during table create and evolution, false to respect schema");
configDef.define(
TABLES_SCHEMA_CASE_INSENSITIVE_PROP,
Type.BOOLEAN,
false,
Importance.MEDIUM,
"Set to true to look up table columns by case-insensitive name, false for case-sensitive");
configDef.define(
TABLES_EVOLVE_SCHEMA_ENABLED_PROP,
Type.BOOLEAN,
Expand Down Expand Up @@ -427,12 +435,16 @@ public boolean autoCreateEnabled() {
return getBoolean(TABLES_AUTO_CREATE_ENABLED_PROP);
}

/** Returns true if missing record fields should be added to the table schema (see {@code iceberg.tables.evolve-schema-enabled}). */
public boolean evolveSchemaEnabled() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
}

/** Returns true if columns should be set as optional during table create and evolution (see {@code iceberg.tables.schema-force-optional}). */
public boolean schemaForceOptional() {
return getBoolean(TABLES_SCHEMA_FORCE_OPTIONAL_PROP);
}

public boolean evolveSchemaEnabled() {
return getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP);
public boolean schemaCaseInsensitive() {
return getBoolean(TABLES_SCHEMA_CASE_INSENSITIVE_PROP);
}

public JsonConverter jsonConverter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ private GenericRecord convertToStruct(

private NestedField lookupStructField(String fieldName, StructType schema, int structFieldId) {
if (nameMapping == null) {
return schema.field(fieldName);
return config.schemaCaseInsensitive()
? schema.caseInsensitiveField(fieldName)
: schema.field(fieldName);
}

return structNameMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class RecordConverterTest {

Expand Down Expand Up @@ -119,6 +122,11 @@ public class RecordConverterTest {
Types.NestedField.required(1, "ii", Types.IntegerType.get()),
Types.NestedField.required(2, "st", Types.StringType.get()));

// Schema whose column names are upper-case ("II", "ST"); used by
// testCaseSensitivity to exercise case-insensitive column lookup against
// lower-case record field names.
private static final org.apache.iceberg.Schema SIMPLE_SCHEMA_UPPER_CASE =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "II", Types.IntegerType.get()),
Types.NestedField.required(2, "ST", Types.StringType.get()));

private static final org.apache.iceberg.Schema ID_SCHEMA =
new org.apache.iceberg.Schema(Types.NestedField.required(1, "ii", Types.IntegerType.get()));

Expand Down Expand Up @@ -154,24 +162,30 @@ public class RecordConverterTest {
private static final List<String> LIST_VAL = ImmutableList.of("hello", "world");
private static final Map<String, String> MAP_VAL = ImmutableMap.of("one", "1", "two", "2");

private static final IcebergSinkConfig CONFIG = mock(IcebergSinkConfig.class);
private static final JsonConverter JSON_CONVERTER = new JsonConverter();

static {
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(
JSON_CONVERTER.configure(
ImmutableMap.of(
JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
false,
ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName()));
when(CONFIG.jsonConverter()).thenReturn(jsonConverter);
}

private IcebergSinkConfig config;

@BeforeEach
public void before() {
// Fresh config mock for each test so per-test stubbing (e.g.
// schemaCaseInsensitive() in testCaseSensitivity) cannot leak across tests.
this.config = mock(IcebergSinkConfig.class);
when(config.jsonConverter()).thenReturn(JSON_CONVERTER);
}

@Test
public void testMapConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = createMapData();
Record record = converter.convert(data);
Expand All @@ -182,7 +196,7 @@ public void testMapConvert() {
public void testNestedMapConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(NESTED_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
Record record = converter.convert(nestedData);
Expand All @@ -194,7 +208,7 @@ public void testNestedMapConvert() {
public void testMapToString() throws Exception {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
Record record = converter.convert(nestedData);
Expand All @@ -208,7 +222,7 @@ public void testMapToString() throws Exception {
public void testStructConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct data = createStructData();
Record record = converter.convert(data);
Expand All @@ -219,7 +233,7 @@ public void testStructConvert() {
public void testNestedStructConvert() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(NESTED_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
Record record = converter.convert(nestedData);
Expand All @@ -231,7 +245,7 @@ public void testNestedStructConvert() {
public void testStructToString() throws Exception {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
Record record = converter.convert(nestedData);
Expand All @@ -252,18 +266,39 @@ public void testNameMapping() {
ImmutableMap.of(
TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(nameMapping)));

RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = ImmutableMap.of("renamed_ii", 123);
Record record = converter.convert(data);
assertThat(record.getField("ii")).isEqualTo(123);
}

/**
 * Verifies column-name lookup honors the {@code iceberg.tables.schema-case-insensitive}
 * setting: the record supplies a lower-case field name ("ii") while the table schema
 * declares the column upper-case ("II"). With case-insensitive lookup enabled the value
 * is written; with case-sensitive lookup it is not.
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCaseSensitivity(boolean caseInsensitive) {
  Table table = mock(Table.class);
  when(table.schema()).thenReturn(SIMPLE_SCHEMA_UPPER_CASE);

  // Stub the flag the converter consults when resolving column names.
  when(config.schemaCaseInsensitive()).thenReturn(caseInsensitive);

  RecordConverter converter = new RecordConverter(table, config);

  Map<String, Object> data = ImmutableMap.of("ii", 123);
  Record record = converter.convert(data);

  if (caseInsensitive) {
    assertThat(record.getField("II")).isEqualTo(123);
  } else {
    // Prefer isNull() over isEqualTo(null): clearer intent, and AssertJ
    // recommends it over the null-argument overload.
    assertThat(record.getField("II")).isNull();
  }
}

@Test
public void testDecimalConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);

RecordConverter converter = new RecordConverter(table, config);

BigDecimal expected = new BigDecimal("123.45");

Expand All @@ -288,7 +323,7 @@ public void testDecimalConversion() {
public void testDateConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

LocalDate expected = LocalDate.of(2023, 11, 15);

Expand All @@ -310,7 +345,7 @@ public void testDateConversion() {
public void testTimeConversion() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

LocalTime expected = LocalTime.of(7, 51, 30, 888_000_000);

Expand Down Expand Up @@ -345,7 +380,7 @@ public void testTimestampWithoutZoneConversion() {
private void convertToTimestamps(Temporal expected, long expectedMillis, TimestampType type) {
Table table = mock(Table.class);
when(table.schema()).thenReturn(SIMPLE_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

List<Object> inputList =
ImmutableList.of(
Expand Down Expand Up @@ -375,7 +410,7 @@ private void convertToTimestamps(Temporal expected, long expectedMillis, Timesta
public void testMissingColumnDetectionMap() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> data = Maps.newHashMap(createMapData());
data.put("null", null);
Expand Down Expand Up @@ -413,7 +448,7 @@ public void testMissingColumnDetectionMap() {
public void testMissingColumnDetectionMapNested() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Map<String, Object> nestedData = createNestedMapData();
List<SchemaUpdate> addCols = Lists.newArrayList();
Expand Down Expand Up @@ -449,7 +484,7 @@ public void testMissingColumnDetectionMapNested() {
public void testMissingColumnDetectionStruct() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct data = createStructData();
List<SchemaUpdate> updates = Lists.newArrayList();
Expand Down Expand Up @@ -487,7 +522,7 @@ public void testEvolveTypeDetectionStruct() {

Table table = mock(Table.class);
when(table.schema()).thenReturn(tableSchema);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Schema valueSchema =
SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
Expand Down Expand Up @@ -521,7 +556,7 @@ public void testEvolveTypeDetectionStructNested() {

Table table = mock(Table.class);
when(table.schema()).thenReturn(tableSchema);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Schema structSchema =
SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
Expand All @@ -548,7 +583,7 @@ public void testEvolveTypeDetectionStructNested() {
public void testMissingColumnDetectionStructNested() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(ID_SCHEMA);
RecordConverter converter = new RecordConverter(table, CONFIG);
RecordConverter converter = new RecordConverter(table, config);

Struct nestedData = createNestedStructData();
List<SchemaUpdate> addCols = Lists.newArrayList();
Expand Down

0 comments on commit 25cbecb

Please sign in to comment.