diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java index 5cd429c1..5dece5f5 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java @@ -23,6 +23,7 @@ import io.tabular.iceberg.connect.deadletter.FailedRecordFactory; import java.util.List; + import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.kafka.connect.sink.SinkRecord; @@ -86,7 +87,11 @@ public static RecordRouter from( if (config.tables() != null && !config.tables().isEmpty()) { config.tables().forEach(TableIdentifier::of); if (config.tablesRouteField() != null) { - baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables())); + if (hasRegexMode(config)) { + baseRecordRouter = new RegexRecordRouter(writers, config); + } else { + baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables())); + } } else { baseRecordRouter = new ConfigRecordRouter(writers, config.tables()); } @@ -111,6 +116,23 @@ public static RecordRouter from( return baseRecordRouter; } + private static boolean hasRegexMode(IcebergSinkConfig config) { + long definedRegexes = config + .tables() + .stream() + .map( + tableName -> { + try { +return config + .tableConfig(tableName) + .routeRegex().isPresent(); + } catch (Exception unused) { + return false; + } + }).filter(present -> present).count(); + return definedRegexes > 0; + } + public static class ConfigRecordRouter extends RecordRouter { private final List tables; private final WriterManager writers; diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordRouterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordRouterTest.java index 2b1787fc..7811ab7d 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordRouterTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordRouterTest.java @@ -110,6 +110,7 @@ public void fallBackWriterTest() { assertThat(result).isEqualTo(Lists.newArrayList(Pair.of("route_field_table", recordWithRoute), Pair.of("tbl1", recordWithoutRoute), Pair.of("tbl2", recordWithoutRoute))); } + @Test @DisplayName("DynamicRecordRouter should dispatch based on the record field") public void dynamicRecordRouterTest() {