Skip to content

Commit

Permalink
start of fallback mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmatfournier committed Apr 30, 2024
1 parent 797b861 commit 7bd7d5d
Showing 1 changed file with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;

import java.util.List;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -60,8 +61,15 @@ public static RecordRouter from(
// validate all table identifiers are valid, otherwise exception is thrown
// as this is an invalid config setting, not an error during processing
config.tables().forEach(TableIdentifier::of);
baseRecordRouter = new ConfigRecordRouter(writers, config.tables());
if (config.deadLetterTableEnabled()) {

// need a config option to find this one
baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables()));
} else{
baseRecordRouter = new ConfigRecordRouter(writers, config.tables());
}
} else {
// does this need a fallback or can it even have a fallback :thinking-face:
baseRecordRouter = new RegexRecordRouter(writers, config);
}
}
Expand Down Expand Up @@ -158,6 +166,24 @@ public void write(SinkRecord record) {
}
}

private static class FallbackRecordRouter extends RecordRouter {
private final RecordRouter primary;
private final RecordRouter fallback;

FallbackRecordRouter(RecordRouter primary, RecordRouter fallback) {
this.primary = primary;
this.fallback = fallback;
}

public void write(SinkRecord record) {
try {
primary.write(record); // this doesn't work because of the null. or rather test this out.
} catch (Exception error) {
fallback.write(record);
}
}
}

private static class ErrorHandlingRecordRouter extends RecordRouter {
private final WriteExceptionHandler handler;
private final WriterManager writers;
Expand Down

0 comments on commit 7bd7d5d

Please sign in to comment.