diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java index 18722ad5..4abdd743 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordRouter.java @@ -21,6 +21,7 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.deadletter.DeadLetterUtils; import io.tabular.iceberg.connect.deadletter.FailedRecordFactory; + import java.util.List; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -60,8 +61,15 @@ public static RecordRouter from( // validate all table identifiers are valid, otherwise exception is thrown // as this is an invalid config setting, not an error during processing config.tables().forEach(TableIdentifier::of); - baseRecordRouter = new ConfigRecordRouter(writers, config.tables()); + if (config.deadLetterTableEnabled()) { + + // need a config option to find this one + baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables())); + } else{ + baseRecordRouter = new ConfigRecordRouter(writers, config.tables()); + } } else { + // does this need a fallback or can it even have a fallback :thinking-face: baseRecordRouter = new RegexRecordRouter(writers, config); } } @@ -158,6 +166,24 @@ public void write(SinkRecord record) { } } + private static class FallbackRecordRouter extends RecordRouter { + private final RecordRouter primary; + private final RecordRouter fallback; + + FallbackRecordRouter(RecordRouter primary, RecordRouter fallback) { + this.primary = primary; + this.fallback = fallback; + } + + public void write(SinkRecord record) { + try { + primary.write(record); // this doesn't work because of the null. or rather test this out. + } catch (Exception error) { + fallback.write(record); + } + } + } + private static class ErrorHandlingRecordRouter extends RecordRouter { private final WriteExceptionHandler handler; private final WriterManager writers;