Add WriteExceptionHandler #243


Contributor

@fqtab fqtab commented Apr 23, 2024

Adds a configuration option to specify a WriteExceptionHandler to use to handle any exceptions while writing SinkRecords to files.

I haven't tested any of this yet :D but in theory:

  • Exceptions happening within SinkTask.put would be captured by the user configured WriteExceptionHandler and handled there.
  • For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points to the dead-letter-table.
  • We should provide sample implementations of WriteExceptionHandler and an ExceptionHandlingSMT that should work for 90% of use cases.
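
To make the idea concrete, here is a minimal sketch of how a handler could be plugged around the write call. Everything in it is an assumption for illustration: `Rec` stands in for Kafka Connect's `SinkRecord`, and `TableWriter`, the `handle(...)` signature, and `ExceptionHandlingWriter` are hypothetical names, not necessarily what this PR defines.

```java
// Stand-in for Kafka Connect's SinkRecord so the sketch compiles on its own.
final class Rec {
  final String topic;
  final Object value;

  Rec(String topic, Object value) {
    this.topic = topic;
    this.value = value;
  }
}

// Hypothetical writer abstraction mirroring the write(...) signature in this PR.
interface TableWriter {
  void write(Rec record, String tableName, boolean ignoreMissingTable);
}

// Hypothetical handler shape; the real interface in the PR may differ.
interface WriteExceptionHandler {
  // Implementations may rethrow (fail the task), return normally (drop the
  // record), or use the supplied writer to send the record somewhere else.
  void handle(Rec record, String tableName, Exception error, TableWriter writer);
}

// Wraps a writer so that every exception from write(...) reaches the handler.
final class ExceptionHandlingWriter implements TableWriter {
  private final TableWriter underlying;
  private final WriteExceptionHandler handler;

  ExceptionHandlingWriter(TableWriter underlying, WriteExceptionHandler handler) {
    this.underlying = underlying;
    this.handler = handler;
  }

  @Override
  public void write(Rec record, String tableName, boolean ignoreMissingTable) {
    try {
      underlying.write(record, tableName, ignoreMissingTable);
    } catch (Exception e) {
      // The connector makes no decision here; the handler owns the policy.
      handler.handle(record, tableName, e, underlying);
    }
  }
}
```

In this sketch the connector code never decides what a "bad record" is; it only hands the failure to whatever handler the user configured.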

Pros

  1. Doesn't require a specific schema for dead-letter tables
    • The nice thing here is users can have any schema for their "dead-letter" table
    • If you only want to write out topic, partition, and offset, you can
    • If you want to write out keys and values as byte[], you can
    • If you want to write out keys and values as JSON, you can
  2. Doesn't limit you to a single dead-letter-table per connector
    • Users can have as many or as few dead letter tables as they want, just write an appropriate handler to do it
    • E.g. if a user wants a dead-letter-table per topic, they can just write a WriteExceptionHandler that sends bad records from different topics to different tables.
  3. The connector code remains completely oblivious to Converter/SMT failures
  4. Users have fine-grained control over which exceptions to handle (or not)
  5. Similarly, we (maintainers) don't have to worry about adding exception handling code to the connector
    • I'm particularly worried about REST catalogs which only really have a REST spec. There is no guarantee an exception thrown by one REST Catalog is non-transient versus another.
  6. Users aren't forced to write bad records to an iceberg table
    • Although I expect this feature to be used mostly for writing bad records to an iceberg table, it's not compulsory
    • Some users might want to write to Kafka and this is easy to do in Kafka Connect via the SinkTaskContext.errantRecordReporter.report API
    • Users are also welcome to write to other systems (e.g. Redis, logs etc.) via a custom WriteExceptionHandler implementation
  7. Relatively straight-forward change to the codebase
  8. Minor: Doesn't require a specific record structure (isFailed parameter, PAYLOAD_KEY, etc.) for all records in "dead-letter-table mode" (unless the user needs it for their WriteExceptionHandler implementation)
    1. This is minor because I imagine that at least 50% of use cases will want this but I still think we can do better than isFailed parameter and PAYLOAD_KEY and things.
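
As a rough illustration of points 1 and 2, the helper below shows how a handler implementation might pick a per-topic dead-letter table and choose its own row shape. It is a sketch under assumptions: `FailedRecord`, `PerTopicDeadLetterRouter`, the `_dlq` suffix, and the column names are all made up for this example and are not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for the coordinates of a failed SinkRecord.
final class FailedRecord {
  final String topic;
  final int partition;
  final long offset;

  FailedRecord(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }
}

// Hypothetical helper that a WriteExceptionHandler implementation might use.
final class PerTopicDeadLetterRouter {
  private final String namespace;

  PerTopicDeadLetterRouter(String namespace) {
    this.namespace = namespace;
  }

  // One dead-letter table per source topic; the naming scheme is an assumption.
  String tableFor(FailedRecord record) {
    String safeTopic = record.topic.replaceAll("[^A-Za-z0-9_]", "_");
    return namespace + "." + safeTopic + "_dlq";
  }

  // The handler picks the row shape; here only the coordinates and the error.
  Map<String, Object> rowFor(FailedRecord record, Exception error) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("topic", record.topic);
    row.put("partition", record.partition);
    row.put("offset", record.offset);
    row.put("error", error.getClass().getName() + ": " + error.getMessage());
    return row;
  }
}
```

A different handler could just as well emit keys and values as byte[] or JSON; nothing in the connector would need to know.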

Cons

  1. If users want to write Converter/SMT failures to a dead-letter-table, they need to use the connector in dynamic mode
    • Dynamic mode as it's implemented today only allows you to write to one table at a time i.e. no table fanout.
    • However, we should be able to alleviate this issue by allowing users to provide a comma-separated list of tables in the route field (I'm not sure why this wasn't done in the first place, so we should double check here). This should be a separate PR though.
    • Even if we can't alleviate this issue, I'm happy to accept this as a compromise (since the semantics are gonna be a little weird with multiple tables involved) but that's debatable.
    • @ajreid21 had another good idea on addressing the dynamic-mode constraint here. We could potentially change the connector a little bit so that in dynamic mode, it defaults to looking at the route-field but if the route field doesn't exist, it could fall back to some statically defined values (just like in static mode).
  2. Users have to write a correct WriteExceptionHandler implementation
    • We can mitigate this by writing a sample WriteExceptionHandler implementation, similar to how we provide sample SMT implementations already
  3. Users have to make sure their WriteExceptionHandler implementation is available on the classpath
    • Users already have to make sure that Converter/SMT implementations are available on the classpath so this shouldn't be anything new to most users
    • If we really wanted to, we could include some WriteExceptionHandler implementations in the kafka-connect-iceberg project (but we should get some stable implementations first)

How to

It might be a little hard to see how people could use this feature for specific use cases so I've prepared a separate draft PR to show this: #244

@fqtab fqtab mentioned this pull request Apr 23, 2024
@fqtab fqtab marked this pull request as draft April 23, 2024 17:04
@fqtab
Contributor Author

fqtab commented Apr 23, 2024

@tabmatfournier this is just a draft to illustrate the idea but feel free to ask any questions here.
I don't intend to merge this; I assume you'll pull in any changes back into your PR if we go with this approach.

@tabmatfournier
Contributor

Thanks. I'll take a look

@Override
public void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable) {
  try {
    underlying.write(sinkRecord, tableName, ignoreMissingTable);
Contributor

@tabmatfournier tabmatfournier Apr 23, 2024

This is a very broad net here which makes me uncomfortable.

In my PR there is a reason why I introduced CatalogAPI, which wraps a bunch of the calls to Iceberg, along with the changes to IcebergWriterFactory, where most of this is coming from. Is it the schema failing? Is it creating the table that failed? Is it evolving the table that failed? Is it the catalog failing? Is it the partition spec failing? That gives much more fine-grained control over the errors. This is catching a ton of errors from many sources and the opportunities for getting it wrong are large.

Contributor Author

We can keep something like CatalogAPI that throws clear Exception classes e.g. SchemaEvolveError, TableCreateError, PartitionEvolutionError. I'm not opposed to that.

Having the broad net here is necessary to enable WriteExceptionHandler implementations. It's down to the WriteExceptionHandler implementation to decide which errors it wants to recover from and which it doesn't. I most certainly don't recommend sending everything that throws an Exception to the dead-letter-table as some of those could be transient.
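
A rough sketch of what "deciding which errors to recover from" could look like inside a handler. The exception classes are hypothetical and named after the ones floated above; which types are safe to dead-letter is left as configuration, since that is exactly the judgment call being discussed.

```java
import java.util.Set;

// Hypothetical exception types, named after the ones suggested in this thread.
class SchemaEvolveError extends RuntimeException {
  SchemaEvolveError(String message) {
    super(message);
  }
}

class TableCreateError extends RuntimeException {
  TableCreateError(String message) {
    super(message);
  }
}

enum Action {
  DEAD_LETTER,
  FAIL_TASK
}

// Hypothetical policy object a WriteExceptionHandler implementation could consult.
final class SelectiveExceptionPolicy {
  private final Set<Class<? extends Exception>> deadLetterable;

  SelectiveExceptionPolicy(Set<Class<? extends Exception>> deadLetterable) {
    this.deadLetterable = deadLetterable;
  }

  Action decide(Exception error) {
    for (Class<? extends Exception> type : deadLetterable) {
      if (type.isInstance(error)) {
        return Action.DEAD_LETTER;
      }
    }
    // Anything not explicitly listed might be transient, so fail the task
    // rather than silently diverting the record.
    return Action.FAIL_TASK;
  }
}
```

Defaulting to FAIL_TASK is a design choice in this sketch: an unknown exception is treated as possibly transient rather than as a bad record.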

Contributor

@tabmatfournier tabmatfournier Apr 24, 2024

I raise a DeadLetterException wrapping the underlying exception in some cases. Again, it's not that easy/clean when you start putting it all together.

@tabmatfournier
Contributor

tabmatfournier commented Apr 23, 2024

Not sure this is much of an improvement TBH. Very broad net.

The real problem is you have a bunch of errors in the writer .write method which happen long before you write a record to iceberg:

  • a laundry list of issues creating a table (table names, partition specs, etc) including values from hardcoded configs
  • partition evolution cases
  • iceberg record conversion issues

Actually writing (to an appender) is not the issue. But the above logic is split between the WriterFactory and the writer itself.

Might be worth pulling that out into distinct steps (catalogAPI does this for the first two cases, may be worth introducing a RecordConverter outside of the writer). It might make the wrapping you want more clear.

@tabmatfournier
Contributor

> E.g. if a user wants a dead-letter-table per topic, they can just write a WriteExceptionHandler that sends bad records from different topics to different tables.

This is nice, but you also have to handle cases where you don't know the table it's going to (because the table name itself is invalid and the catalog throws when trying to create it). It's not that simple.

@tabmatfournier
Contributor

tabmatfournier commented Apr 23, 2024

7. Relatively straight-forward change to the codebase

Disagree, and there is functionality missing that, once added, will make this more complicated.

Current plan:

  1. refactor table creation/schema stuff more cleanly (this is mostly CatalogAPI in my PR)
  2. refactor record conversion into classes
  3. Introduce an interface for exception handling (per this PR)
  4. Look into modifying the special record shape. Mine creates a special struct; this PR suggests putting it all onto the keys, but the fact remains that it's still a special record that needs information to be in a specific place. This means the pluggable error handling in the ErrorTransform still needs to produce a special record and, since that is pluggable, you have to be careful, as the user also needs to know, in their pluggable interface, how to dig that record out in the connector. At some point you have provided nothing but a footgun API. IMHO, we have to be opinionated here and not just offer a bunch of interfaces that the user needs to "know" how to glue together (even though it's not obvious in code, due to constraints forced on the system by Kafka Connect). If this can't be done cleanly, we should talk again to see what can be done. I don't think it's as easy as what has been presented in your two draft PRs.

@tabmatfournier
Contributor

  • For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code --we are ignoring valid configurations).

I believe this to be a deal breaker.

@tabmatfournier
Contributor

tabmatfournier commented Apr 23, 2024

Similarly, we (maintainers) don't have to worry about adding exception handling code to the connector

  • I'm particularly worried about REST catalogs which only really have a REST spec. There is no guarantee an exception thrown by one REST Catalog is non-transient versus another.

If that is the case, why are we providing any implementations at all? IMHO we can't get away with just providing the interface for users. We have to provide some amount of default implementation.

@Override
public void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable) {
  try {
    underlying.write(sinkRecord, tableName, ignoreMissingTable);
Contributor

This will also cause infinite loops if you fail to create the dead letter table (possibly because the dead letter route has been set to an invalid value). It is also an issue that whether this breaks depends on whether or not auto-creating tables is configured on.

Again, this is why CatalogAPI exists in the other API:

  • it knows when you are creating / doing something with the dead letter table, so throw those errors instead of attempting to throw them into a dead letter table and do not catch the exception
  • when you are doing something with a regular record/not dead letter table, then apply the error handler.

I can still do the above w/ the error handler, but you can't have this broad write here --too many things downstream, some of which involve hard coded configs, some of which will involve the dead letter table --e.g. we support partitioning the dead letter table via the normal way to provide partition specs for any table, if that fails, you need to hard fail the connector.

The above would infinite loop in that case.

Contributor Author

This will also cause infinite loops

I deliberately wrote it recursively. If users (which could be us, the maintainers!) feel an exception is transient, they're welcome to retry via the API; I don't feel we should restrict that option.

possibly because the dead letter route has been set to an invalid value

There is no dead-letter-table-route with this approach as far as the connector code is concerned.
The WriteExceptionHandler may certainly try to write to a dead-letter-table but that is a WriteExceptionHandler implementation detail.

I can still do the above w/ the error handler, but you can't have this broad write here --too many things downstream, some of which involve hard coded configs, some of which will involve the dead letter table --e.g. we support partitioning the dead letter table via the normal way to provide partition specs for any table, if that fails, you need to hard fail the connector.

With this approach, a dead-letter-table is like any other ordinary table, so you can still partition the dead letter table "via the normal way to provide partition specs for any table."

if that fails, you need to hard fail the connector.

No problems, you can still achieve that here with the WriteExceptionHandler approach. The WriteExceptionHandler implementation just needs to ensure that if it sees a PartitionSpecEvolutionError for what it considers to be a dead-letter-table, it should just hard-fail :)
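
A minimal sketch of that hard-fail rule, showing that the recursive wrapper still terminates when the dead-letter write itself fails. All names here (`RecordWriter`, `FailureHandler`, `HandlingWriter`, `DeadLetterHandler`) are hypothetical and simplified to string payloads; they are not the types in this PR.

```java
// Simplified stand-in for the connector's writer.
interface RecordWriter {
  void write(String payload, String table);
}

// Simplified stand-in for a WriteExceptionHandler.
interface FailureHandler {
  void handle(String payload, String table, RuntimeException error, RecordWriter writer);
}

// Wraps a writer and routes every failure to the handler.
final class HandlingWriter implements RecordWriter {
  private final RecordWriter underlying;
  private final FailureHandler handler;

  HandlingWriter(RecordWriter underlying, FailureHandler handler) {
    this.underlying = underlying;
    this.handler = handler;
  }

  @Override
  public void write(String payload, String table) {
    try {
      underlying.write(payload, table);
    } catch (RuntimeException e) {
      // Passing `this` means the handler's own writes go through the same
      // try/catch, which is the recursion discussed above.
      handler.handle(payload, table, e, this);
    }
  }
}

// Redirects failures to one dead-letter table, but never redirects a failure
// that happened while writing to that table.
final class DeadLetterHandler implements FailureHandler {
  private final String deadLetterTable;

  DeadLetterHandler(String deadLetterTable) {
    this.deadLetterTable = deadLetterTable;
  }

  @Override
  public void handle(String payload, String table, RuntimeException error, RecordWriter writer) {
    if (table.equals(deadLetterTable)) {
      throw error; // hard-fail instead of looping
    }
    writer.write(payload, deadLetterTable);
  }
}
```

With an underlying writer that always throws, this sketch makes exactly two write attempts (original table, then dead-letter table) and then surfaces the second exception to the caller.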

Again, this is why CatalogAPI exists in the other API:

Like I said, I'm not opposed to having a concept like the Catalog API in the connector code if it makes it easier to react to clear and actionable exceptions.

@tabmatfournier
Contributor

  • Users can have as many or as few dead letter tables as they want, just write an appropriate handler to do it

Is this something we want?

Remember: you will still need a default dead letter table because there are many cases where you don't know where to send the record to (error transform failures are a good example of this), so you are left to config value/fn of connector name. I would argue topic name is a poor choice but you could do that if you wanted. It is a poor choice because the user may be running several connectors for the same topic and using different SMTs.

I'm not convinced you can just "write an appropriate handler to do it" because you often won't have any context for where to route to.

@fqtab
Contributor Author

fqtab commented Apr 24, 2024

Not sure this is much of an improvement TBH. Very broad net.

The real problem is you have a bunch of errors in the writer .write method which happen long before you write a record to iceberg:

  • a laundry list of issues creating a table (table names, partition specs, etc) including values from hardcoded configs
  • partition evolution cases
  • iceberg record conversion issues

Actually writing (to an appender) is not the issue. But the above logic is split between the WriterFactory and the writer itself.

Might be worth pulling that out into distinct steps (catalogAPI does this for the first two cases, may be worth introducing a RecordConverter outside of the writer). It might make the wrapping you want more clear.

Oh my bad, I threw this together pretty quickly. I thought most of the errors would only happen when we write SinkRecords to files, but if there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

@fqtab
Contributor Author

fqtab commented Apr 24, 2024

  • For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code --we are ignoring valid configurations).

I believe this to be a deal breaker.

No problems, there are things we can do to mitigate this.
Alex had a pretty good idea about allowing dynamic mode to fall back to static mapping if the route field is missing/null.
I see that as a valuable feature for usecases outside of handling converter/SMT failures so I would be in favour of supporting that change to the connector code.
That should address this concern entirely unless I'm missing something else.
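
For illustration only, the fallback could look roughly like this. `RouteResolver` and its method are hypothetical names, the record value is simplified to a `Map`, and the way the statically configured tables reach the resolver is an assumption rather than existing connector behaviour.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical resolver: dynamic routing with a static fallback.
final class RouteResolver {
  private final String routeField;
  private final List<String> staticTables;

  RouteResolver(String routeField, List<String> staticTables) {
    this.routeField = routeField;
    this.staticTables = staticTables;
  }

  // Returns the table named by the route field when it is present, otherwise
  // the statically configured tables (as in static mode).
  List<String> tablesFor(Map<String, Object> recordValue) {
    Object route = recordValue == null ? null : recordValue.get(routeField);
    if (route == null) {
      return staticTables;
    }
    return Collections.singletonList(route.toString());
  }
}
```

An exception-handling SMT would then only need to set the route field on records it wants diverted; every other record keeps its static mapping.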

@tabmatfournier
Contributor

there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

It's not that easy. Unfortunately where you need to catch is a shotgun (worker, writer, writer factory, schemautils, etc.)

@tabmatfournier
Contributor

  • For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points to the dead-letter-table.

I don't want to gatekeep this feature only for users who use dynamic-enabled. There is nothing here restricting other users from using this other than "simplifying code" (of course it's simplifying code --we are ignoring valid configurations).
I believe this to be a deal breaker.

No problems, there are things we can do to mitigate this. Alex had a pretty good idea about allowing dynamic mode to fall back to static mapping if the route field is missing/null. I see that as a valuable feature for usecases outside of handling converter/SMT failures so I would be in favour of supporting that change to the connector code. That should address this concern entirely unless I'm missing something else.

Not opposed but I can show you how this is tricky/challenging in a screenshare.

@fqtab
Contributor Author

fqtab commented Apr 24, 2024

If that is the case, why are we providing any implementations at all? IMHO we can't get away with just providing the interface for users. We have to provide some amount of default implementation.

Absolutely.
That's why your PR is offering an ExceptionHandlingTransform.
The only additional thing this PR is requiring is also an appropriate WriteExceptionHandler. We need to provide a good default implementation of that. And we can do that correctly for Tabular's REST Catalog with a TabularWriteExceptionHandler for example (+ Hive + anything else, up to us).
For other REST Catalogs? We can't offer anything of the sort without seeing their code.

@fqtab
Contributor Author

fqtab commented Apr 24, 2024

there are errors we think it would be worth catching higher up (e.g. when creating a Writer), then we can certainly move the try {} catch () to a higher level, not opposed to that.

It's not that easy. Unfortunately where you need to catch is a shotgun (worker, writer, writer factory, schemautils, etc.)

Sure, let's pair later, sounds like I might have dodged a lot of the complexity by focusing on only the write path :D

Scratch that, this should cover all errors coming from WriterFactory, IcebergWriter, and SchemaUtils as well?
https://github.com/tabular-io/iceberg-kafka-connect/blob/925dbcb74627a9c53370db55ec8967677c5605a2/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/MultiTableWriter.java#L49

@fqtab fqtab closed this May 30, 2024