Skip to content

Commit

Permalink
third mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmatfournier committed May 13, 2024
1 parent 7bd7d5d commit bff7233
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,28 @@
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class DefaultFailedRecordFactory implements FailedRecordFactory {

private static final String DEAD_LETTER_TABLE_NAME_PROP = "table_name";

private static final String DEAD_LETTER_ROUTE_FIELD_PROP = "route_field";
private static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
DEAD_LETTER_TABLE_NAME_PROP,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"dead letter table name namespace.table");
"dead letter table name namespace.table")
.define(DEAD_LETTER_ROUTE_FIELD_PROP,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"route field to inject table name on");

private static final String HEADERS = "headers";
private Schema schema;

private String deadLetterTableName;
private String deadLetterRouteField;

@Override
public Schema schema(String context) {
Expand Down Expand Up @@ -100,29 +107,11 @@ public SinkRecord recordFromConnector(SinkRecord record, Throwable error, String
record.topic(), record.kafkaPartition(), null, null, schema, struct, record.timestamp());
}

@Override
public boolean isFailedTransformRecord(SinkRecord record) {
if (record != null && record.valueSchema() != null) {
Map<String, String> parameters = record.valueSchema().parameters();
if (parameters != null) {
String isFailed = parameters.get("transform_failed");
if (isFailed != null) {
return isFailed.equals("true");
}
}
}
return false;
}

@Override
public String tableName(SinkRecord record) {
return deadLetterTableName;
}

@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
deadLetterTableName = config.getString(DEAD_LETTER_TABLE_NAME_PROP);
deadLetterRouteField = config.getString(DEAD_LETTER_ROUTE_FIELD_PROP);
if (deadLetterTableName == null) {
throw new IllegalArgumentException("Dead letter table name cannot be null");
}
Expand All @@ -141,7 +130,8 @@ public void configure(Map<String, ?> props) {
.field(HEADERS, DeadLetterUtils.HEADER_SCHEMA)
.field("context", Schema.OPTIONAL_STRING_SCHEMA)
.field("target_table", Schema.OPTIONAL_STRING_SCHEMA)
.schema();
.field(deadLetterRouteField, Schema.STRING_SCHEMA)
.build();
}

private void addCommon(Struct struct, SinkRecord record, Throwable error, String context) {
Expand All @@ -157,5 +147,6 @@ private void addCommon(Struct struct, SinkRecord record, Throwable error, String
if (context != null) {
struct.put("context", context);
}
struct.put(deadLetterRouteField, deadLetterTableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,5 @@ public interface FailedRecordFactory {
// where in the original are the byte arrays.
SinkRecord recordFromConnector(SinkRecord record, Throwable error, String context);

boolean isFailedTransformRecord(SinkRecord record);

String tableName(SinkRecord record);

void configure(Map<String, ?> props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.deadletter.record_factory";
private static final String FAILED_RECORD_FACTORY_PREFIX =
"iceberg.tables.deadletter.record_factory";
private static final String FAILED_RECORD_FACTORY_DEFAULT =
"io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
Expand Down Expand Up @@ -250,7 +248,7 @@ private static ConfigDef newConfigDef() {
configDef.define(
FAILED_RECORD_FACTORY_PROP,
Type.STRING,
FAILED_RECORD_FACTORY_DEFAULT,
null,
Importance.MEDIUM,
"If writing to Dead Letter Table, failed record factory class to use");
return configDef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void initialize(
}

@Override
public Result handle(SinkRecord record, Exception exception) {
public SinkRecord handle(SinkRecord record, Exception exception) {
if (exception instanceof WriteException) {
return handleWriteException(record, (WriteException) exception);
}
Expand All @@ -46,7 +46,7 @@ public Result handle(SinkRecord record, Exception exception) {
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private Result handleWriteException(SinkRecord record, WriteException exception) {
private SinkRecord handleWriteException(SinkRecord record, WriteException exception) {
if (exception instanceof WriteException.CreateTableException) {
Throwable cause = exception.getCause();
if (cause instanceof IllegalArgumentException || cause instanceof ValidationException) {
Expand Down Expand Up @@ -81,14 +81,7 @@ private Result handleWriteException(SinkRecord record, WriteException exception)
throw exception;
}

private Result failedRecord(SinkRecord record, WriteException exception) {
String targetTableId = exception.tableId();
if (targetTableId != null && targetTableId.equals(factory.tableName(record))) {
throw new IllegalArgumentException(
String.format(
"Must throw for exceptions involving target Dead Letter Table: %s", targetTableId),
exception);
}
return new Result(factory.recordFromConnector(record, exception, null), targetTableId);
private SinkRecord failedRecord(SinkRecord record, WriteException exception) {
return factory.recordFromConnector(record, exception, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,55 @@ protected final String extractRouteValue(Object recordValue, String routeField)
} catch (Exception error) {
throw new WriteException.RouteException(error);
}

// confirm if this can even happen
return routeValue == null ? null : routeValue.toString();
}


/*
|iceberg.tables|dynamic-enables|route-field| routing behavior |
|--------------|---------------|----------------|-------------|
| empty | true | populated | DynamicRecordRouter |
| empty | false | populated | RegexRouter |
| populated | false | null | ConfigRouter |
| populated | false | populated | DynamicRecordRouter then ConfigRouter |
what does iceberg.tables.default-commit-branch do ?
Record routing is complex due to maintaining non-breaking config changes
<ul>
<li> if iceberg.tables.dynamic-enabled is true then we route based on iceberg.tables.route-field, regardless of other fields
<li> if iceberg.tables.dynamic-enabled is false and iceberg.tables is empty, we use routeRegex
<li> if iceberg.tables.dynamic-enabled is false and iceberg.tables is populated and route-field is empty, we route to all listed tables
<li> as above, but if route-field is set we attempt to route dynamically and if that field does not exist we do the behavior above.
</ul>
<p>
The last option is required for Dead Letter Table handling while routing to iceberg.tables, as the dead letter table routing
is similar to dynamic routing: based on a field and under the control of the user. </p>
How to avoid the infinite loop problem?
*/
public static RecordRouter from(
WriterManager writers,
IcebergSinkConfig config,
ClassLoader loader,
SinkTaskContext context) {
WriterManager writers,
IcebergSinkConfig config,
ClassLoader loader,
SinkTaskContext context) {
RecordRouter baseRecordRouter;

if (config.dynamicTablesEnabled()) {
Preconditions.checkNotNull(
config.tablesRouteField(), "Route field cannot be null with dynamic routing");
config.tablesRouteField(), "Route field cannot be null with dynamic routing");
baseRecordRouter = new DynamicRecordRouter(writers, config.tablesRouteField());
} else {
if (config.tablesRouteField() == null) {
// validate all table identifiers are valid, otherwise exception is thrown
// as this is an invalid config setting, not an error during processing
if (config.tables() != null && !config.tables().isEmpty()) {
config.tables().forEach(TableIdentifier::of);
if (config.deadLetterTableEnabled()) {

// need a config option to find this one
if (config.tablesRouteField() != null) {
baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables()));
} else{
} else {
baseRecordRouter = new ConfigRecordRouter(writers, config.tables());
}
} else {
// does this need a fallback or can it even have a fallback :thinking-face:
baseRecordRouter = new RegexRecordRouter(writers, config);
}
}
Expand All @@ -78,19 +99,19 @@ public static RecordRouter from(
String failedRecordFactoryClass = config.getFailedRecordHandler();
String handlerClass = config.getWriteExceptionHandler();
FailedRecordFactory factory =
(FailedRecordFactory) DeadLetterUtils.loadClass(failedRecordFactoryClass, loader);
(FailedRecordFactory) DeadLetterUtils.loadClass(failedRecordFactoryClass, loader);
factory.configure(config.failedRecordHandlerProperties());
WriteExceptionHandler handler =
(WriteExceptionHandler) DeadLetterUtils.loadClass(handlerClass, loader);
(WriteExceptionHandler) DeadLetterUtils.loadClass(handlerClass, loader);
handler.initialize(context, config, factory);
baseRecordRouter =
new RecordRouter.ErrorHandlingRecordRouter(baseRecordRouter, handler, writers, factory);
new RecordRouter.ErrorHandlingRecordRouter(baseRecordRouter, handler);
}

return baseRecordRouter;
}

private static class ConfigRecordRouter extends RecordRouter {
public static class ConfigRecordRouter extends RecordRouter {
private final List<String> tables;
private final WriterManager writers;

Expand All @@ -103,13 +124,13 @@ private static class ConfigRecordRouter extends RecordRouter {
public void write(SinkRecord record) {
// route to all tables
tables.forEach(
tableName -> {
writers.write(tableName, record, false);
});
tableName -> {
writers.write(tableName, record, false);
});
}
}

private static class RegexRecordRouter extends RecordRouter {
public static class RegexRecordRouter extends RecordRouter {
private final String routeField;
private final WriterManager writers;
private final IcebergSinkConfig config;
Expand All @@ -125,29 +146,29 @@ public void write(SinkRecord record) {
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
config
.tables()
.forEach(
tableName ->
config
.tableConfig(tableName)
.routeRegex()
.ifPresent(
regex -> {
boolean matches;
try {
matches = regex.matcher(routeValue).matches();
} catch (Exception error) {
throw new WriteException.RouteRegexException(error);
}
if (matches) {
writers.write(tableName, record, false);
}
}));
.tables()
.forEach(
tableName ->
config
.tableConfig(tableName)
.routeRegex()
.ifPresent(
regex -> {
boolean matches;
try {
matches = regex.matcher(routeValue).matches();
} catch (Exception error) {
throw new WriteException.RouteRegexException(error);
}
if (matches) {
writers.write(tableName, record, false);
}
}));
}
}
}

private static class DynamicRecordRouter extends RecordRouter {
public static class DynamicRecordRouter extends RecordRouter {
private final String routeField;
private final WriterManager writers;

Expand All @@ -166,7 +187,7 @@ public void write(SinkRecord record) {
}
}

private static class FallbackRecordRouter extends RecordRouter {
public static class FallbackRecordRouter extends RecordRouter {
private final RecordRouter primary;
private final RecordRouter fallback;

Expand All @@ -177,43 +198,31 @@ private static class FallbackRecordRouter extends RecordRouter {

public void write(SinkRecord record) {
try {
primary.write(record); // this doesn't work because of the null. or rather test this out.
primary.write(record);
} catch (Exception error) {
fallback.write(record);
}
}
}

private static class ErrorHandlingRecordRouter extends RecordRouter {
public static class ErrorHandlingRecordRouter extends RecordRouter {
private final WriteExceptionHandler handler;
private final WriterManager writers;
private final RecordRouter router;
private final FailedRecordFactory failedRecordFactory;

ErrorHandlingRecordRouter(
RecordRouter baseRouter,
WriteExceptionHandler handler,
WriterManager writers,
FailedRecordFactory factory) {
RecordRouter baseRouter,
WriteExceptionHandler handler) {
this.router = baseRouter;
this.handler = handler;
this.writers = writers;
this.failedRecordFactory = factory;
}

@Override
public void write(SinkRecord record) {
if (failedRecordFactory.isFailedTransformRecord(record)) {
writers.write(failedRecordFactory.tableName(record), record, false);
} else {
try {
router.write(record);
} catch (Exception error) {
WriteExceptionHandler.Result result = handler.handle(record, error);
if (result != null) {
writers.write(result.tableName(), result.sinkRecord(), false);
}
}
try {
router.write(record);
} catch (Exception error) {
SinkRecord result = handler.handle(record, error);
router.write(result);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public class WriteException extends RuntimeException {
tableIdentifier = null;
}

WriteException(String msg) {
super(msg);
tableIdentifier = null;
}

WriteException(TableIdentifier tableId, Throwable cause) {
super(cause);
this.tableIdentifier = tableId.toString();
Expand Down Expand Up @@ -80,6 +85,10 @@ public static class RouteException extends WriteException {
RouteException(Throwable cause) {
super(cause);
}

RouteException(String msg) {
super(msg);
}
}

public static class RouteRegexException extends WriteException {
Expand Down
Loading

0 comments on commit bff7233

Please sign in to comment.