Skip to content

Commit

Permalink
better regex detection to avoid an extra config
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmatfournier committed May 13, 2024
1 parent 25208da commit 205c2d7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;

import java.util.List;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -86,7 +87,11 @@ public static RecordRouter from(
if (config.tables() != null && !config.tables().isEmpty()) {
config.tables().forEach(TableIdentifier::of);
if (config.tablesRouteField() != null) {
baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables()));
if (hasRegexMode(config)) {
baseRecordRouter = new RegexRecordRouter(writers, config);
} else {
baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables()));
}
} else {
baseRecordRouter = new ConfigRecordRouter(writers, config.tables());
}
Expand All @@ -111,6 +116,23 @@ public static RecordRouter from(
return baseRecordRouter;
}

private static boolean hasRegexMode(IcebergSinkConfig config) {
long definedRegexes = config
.tables()
.stream()
.map(
tableName -> {
try {
return config
.tableConfig(tableName)
.routeRegex().isPresent();
} catch (Exception unused) {
return false;
}
}).filter(present -> present).count();
return definedRegexes > 0;
}

public static class ConfigRecordRouter extends RecordRouter {
private final List<String> tables;
private final WriterManager writers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void fallBackWriterTest() {
assertThat(result).isEqualTo(Lists.newArrayList(Pair.of("route_field_table", recordWithRoute), Pair.of("tbl1", recordWithoutRoute), Pair.of("tbl2", recordWithoutRoute)));
}


@Test
@DisplayName("DynamicRecordRouter should dispatch based on the record field")
public void dynamicRecordRouterTest() {
Expand Down

0 comments on commit 205c2d7

Please sign in to comment.