cruft cleanup and starting docs
tabmatfournier committed May 13, 2024
1 parent 205c2d7 commit 4aeedcd
Showing 2 changed files with 69 additions and 25 deletions.
69 changes: 69 additions & 0 deletions README.md
@@ -50,6 +50,9 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
| iceberg.tables.deadletter.handler | See Dead Letter Table Mode |
| iceberg.tables.deadletter.record_factory | See Dead Letter Table Mode |
| iceberg.tables.deadletter.record_factory.* | See Dead Letter Table Mode |

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
@@ -322,6 +325,72 @@ See above for creating the table
}
```

## Dead Letter Table

The connector can be configured to write to one or more Dead Letter Iceberg tables, with capabilities beyond
those offered by Kafka Connect's Dead Letter Queue implementation.

| Location of Failure | Kafka Connect DLQ | Dead Letter Table Mode |
|----------------------------------------------------------|-------------------|------------------------|
| Deserialization/Converter | Yes | Yes* |
| SMT | Yes | Yes* |
| Table creation / schema issues | No | Yes |
| Iceberg record conversion | No | Yes |
| Malformed records (e.g. missing table route information) | No | Yes |
| Schema evolution issues | No | Yes |

If the `ErrorTransform` SMT is not used, the Dead Letter Table can generally record only the metadata of a failed
record (topic, partition, offset); the message itself is dropped.

If the `ErrorTransform` SMT is used, failed records can include the original bytes of the message in the Iceberg table,
where they can be extracted and inspected using a downstream query engine.

In order to use the ErrorTransform:

... todo

In order to turn on Dead Letter Table mode:

... todo

### Routing

Dead Letter Table routing is a variation on dynamic routing: the `FailedRecordHandler` can add a route field,
which is then used to dispatch records to one or more Dead Letter tables.

| iceberg.tables | dynamic-enabled | route-field | routing behavior |
|----------------|-----------------|--------------|--------------------------------------------------------------|
| empty | true | populated | DynamicRecordRouter |
| empty | false | populated | RegexRouter |
| populated | false | populated | RegexRouter if iceberg.table.\<table name\>.route-regex set |
| populated | false | null | ConfigRouter |
| populated | false | populated | DynamicRecordRouter then ConfigRouter |
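
One way to read the table above is as a decision procedure. The following is a minimal sketch of that reading, not the connector's actual code: the class, enum, and method names are hypothetical, and it assumes that the two `populated | false | populated` rows are distinguished by whether a per-table `route-regex` is set.

```java
import java.util.List;

// Hypothetical sketch: these names are illustrative and are not part of the connector's API.
final class RouterSelectionSketch {

  enum Routing { DYNAMIC, REGEX, CONFIG, DYNAMIC_THEN_CONFIG }

  /**
   * @param tables         value of iceberg.tables (may be null or empty)
   * @param dynamicEnabled value of iceberg.tables.dynamic-enabled
   * @param routeField     value of iceberg.tables.route-field (may be null)
   * @param routeRegexSet  assumption: true if any iceberg.table.<table name>.route-regex is set
   */
  static Routing select(
      List<String> tables, boolean dynamicEnabled, String routeField, boolean routeRegexSet) {
    boolean hasTables = tables != null && !tables.isEmpty();
    boolean hasRouteField = routeField != null && !routeField.isEmpty();

    if (dynamicEnabled) {
      return Routing.DYNAMIC; // the route field alone decides the target table
    }
    if (!hasTables) {
      return Routing.REGEX; // no static tables configured
    }
    if (!hasRouteField) {
      return Routing.CONFIG; // write to every table listed in iceberg.tables
    }
    // Static tables and a route field are both present.
    return routeRegexSet ? Routing.REGEX : Routing.DYNAMIC_THEN_CONFIG;
  }
}
```

For example, `select(List.of("db.events"), false, null, false)` yields `CONFIG`, while the same call with a route field and no `route-regex` yields `DYNAMIC_THEN_CONFIG`, the combination intended for Dead Letter Table handling.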

### Partitioning

The following properties still apply to the Dead Letter Table. The partition-by field can be used to customize the
partitioning of the Dead Letter table(s).

| Property | Description |
|--------------------------------------------|------------------------------------------------------------------------------------------------|
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits, use `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
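
As an illustration, the per-table property keys can be assembled programmatically, for instance when building a connector configuration in a test. This is only a sketch: the class and method names are hypothetical, and the table name `default.dead_letter` and the partition column `topic` are assumed values, not names defined by the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: builds the table-specific property keys listed in the table above.
final class DeadLetterTableConfigSketch {

  // Expands to iceberg.table.<table name>.<suffix>
  static String tableProperty(String tableName, String suffix) {
    return "iceberg.table." + tableName + "." + suffix;
  }

  static Map<String, String> deadLetterTableProps(String tableName) {
    Map<String, String> props = new LinkedHashMap<>();
    // Assumed column name: partition the Dead Letter table by a "topic" column.
    props.put(tableProperty(tableName, "partition-by"), "topic");
    return props;
  }
}
```

Calling `deadLetterTableProps("default.dead_letter")` produces a single entry keyed `iceberg.table.default.dead_letter.partition-by`, which can be merged into the rest of the connector configuration.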

### ErrorTransform specifics

... todo

### WriteExceptionHandler Contract

... todo

### FailedRecordHandler Contract

... todo

### Provided Handlers

## Resources

* [Running IcebergSinkConnector locally](https://github.com/wuerike/kafka-iceberg-streaming)
@@ -44,34 +44,9 @@ protected final String extractRouteValue(Object recordValue, String routeField)
throw new WriteException.RouteException(error);
}

// confirm if this can even happen
return routeValue == null ? null : routeValue.toString();
}


/*
|iceberg.tables|dynamic-enabled|route-field| routing behavior |
|--------------|---------------|----------------|-------------|
| empty | true | populated | DynamicRecordRouter |
| empty | false | populated | RegexRouter |
| populated | false | null | ConfigRouter |
| populated | false | populated | DynamicRecordRouter then ConfigRouter |
what does iceberg.tables.default-commit-branch do ?
Record routing is complex due to maintaining non-breaking config changes
<ul>
<li> if iceberg.tables.dynamic-enabled is true then we route based on iceberg.tables.route-field, regardless of other fields
<li> if iceberg.tables.dynamic-enabled is false and iceberg.tables is empty, we use routeRegex
<li> if iceberg.tables.dynamic-enabled is false and iceberg.tables is populated and route-field is empty, we route to all listed tables
<li> as above, but if route-field is set we attempt to route dynamically and if that field does not exist we do the behavior above.
</ul>
<p>
The last option is required for Dead Letter Table handling while routing to iceberg.tables, as the dead letter table routing
is similar to dynamic routing: based on a field and under the control of the user. </p>
How to avoid the infinite loop problem?
*/
public static RecordRouter from(
WriterManager writers,
IcebergSinkConfig config,
