Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dead letter table #233

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open

Dead letter table #233

wants to merge 36 commits into from

Conversation

tabmatfournier
Copy link
Contributor

@tabmatfournier tabmatfournier commented Apr 9, 2024

Implements an (optional) dead letter table where failed messages go to a dedicated Iceberg table. This functionality aims to improve and simplify the error handling provided by Kafka Connect. Kafka Connect's dead letter queue only handles deserialization and SMT failures and writes to another Kafka topic where it requires additional engineering effort to inspect and recover messages. With this PR, errors are written to a dedicated Iceberg Table where messages can be inspected and recovered using tools users may be more comfortable with (Spark, etc). The table row contains everything required to convert a row back into a Kafka ProducerRecord; however, the functionality to do this is engine specific and not provided in this PR.

Location of Failure Kafka Connect DLQ This PR
Deserialization/Converter Yes Yes
SMT Yes Yes
Table creation / schema issues No Yes
Iceberg record conversion No Yes
Malformed records (e.g. missing table route information) No Yes
Schema evolution issues No Yes

This PR aims to minimize stream processing exceptions from imperfect producers by writing to the Dead Letter Table rather than failing constantly and causing rebalances inside of the Kafka Cluster which can negatively affect other jobs.

It is comprised of two components:

  1. An ErrorTransform SMT that wraps the Deserializer and zero or more SMTs
  2. Changes to Worker.java / IcebergWriterFactory.java to catch issues around table creation, schema parsing, and Iceberg record conversion

Not all errors result in conversion to records for the Dead Letter Table. For example, network/connection errors thrown during table operations w/ the underlying Catalog will still fail the connector (as a form of retry when the connector is restarted).

This is opt in. Users can decide not to use this, use the Kafka Connect DLQ, ignore errors, or fail on all errors just like previous functionality.

Error Transform

Kafka Connects value, key, and header converters must be ByteArrayConverters. The desired converters (AvroConverter, JsonConverter, etc.) are supplied to the ErrorTransform SMT along with any other SMTs.

  • On deserialization success and SMT transformation success: A special record containing a Map<String, Object> is constructed that contains the deserialized and transformed SinkRecord as well as the original key, value, and header bytes of the message.
  • On deserialization failure OR SMT transformation failure: a SinkRecord of Struct is created containing failure metadata such as kafka metadata, exception, stack trace, and original key, value, and header bytes.

Changes to Worker.java / IcebergWriterFactory.java

When configured to use the DeadLetterTable the connector expects to messages to be in the shape of the data from the ErrorTransform SMT. Failed records from the SMT will be written to the Dead Letter Table. Successfully transformed SinkRecords will attempt the normal connector flow. If it fails for non-transient reasons, the original key, value, and header bytes in the specially transformed record are used to construct a SinkRecord for the Dead Letter Table with the required Kafka and error metadata before being written via normal table fanout.

Limitations

This is the first PR. Additional work is required for some advanced Converters such as the AvroConverter, where finely grained exception handling needs to be implemented to differentiate between real Avro errors (e.g. the message is not valid Avro bytes or the message does not have an entry in the Schema registry) and network related Avro exceptions (e.g. contacting the Schema registry times out). This is planned in future PRs. In the interim, an ErrorHandler class is exposed as a config option for both converters and SMTs and can be extended by users to implement the required error handling (and rethrowing) for advanced Converters / custom Converters / etc.

@tabmatfournier tabmatfournier marked this pull request as ready for review April 10, 2024 15:48
@tabmatfournier tabmatfournier requested a review from fqtab April 10, 2024 17:45
Copy link
Contributor

@fqtab fqtab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have more comments but I'm struggling to get past some of the limitations of the current approach like the fixed schema. I have a different take on the problem that I would strongly like for us to consider:

  • Exceptions happening within SinkTask.put would be captured by a user configured WriteExceptionHandler and handled there as the user wants (write to a dead-letter-table, kafka, log it, whatever the user wants)
  • Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector in iceberg.tables.dynamic-enabled with a iceberg.tables.route-field and write an exception-handling-SMT that points to the dead-letter-table.

See #243 #244 for draft implementations of the idea

- substantiall reworks the PR
- error transform and connector are connected via FailedRecordFactory
  - users can plug in their own schema shape for the failed records
  - users can dispatch to whatever dead letter table they want
- bunch of classes moved around
- pluggable WriteExceptionHandler introducered to catch failures
- lots of code updated with custom WriteExceptions meant to be caught by the WriteExceptionHandler
- works with or without the ErrorTransform in play
@tabmatfournier
Copy link
Contributor Author

Substantial re-work. Still need to add tests for ErrorHandlingRecordRouter but the code is more or less there now.

@tabmatfournier
Copy link
Contributor Author

I have more comments but I'm struggling to get past some of the limitations of the current approach like the fixed schema. I have a different take on the problem that I would strongly like for us to consider:

  • Exceptions happening within SinkTask.put would be captured by a user configured WriteExceptionHandler and handled there as the user wants (write to a dead-letter-table, kafka, log it, whatever the user wants)
  • Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector in iceberg.tables.dynamic-enabled with a iceberg.tables.route-field and write an exception-handling-SMT that points to the dead-letter-table.

See #243 #244 for draft implementations of the idea

Largely been addressed in the latest update, appreciate the feedback and discussions we've had.

@tabmatfournier
Copy link
Contributor Author

Need to add a config for third mode. Look at IntegrationMultiTableTest where both Iceberg.tables and a regex is set --I can't differentiate this case from the static routing (with dynamic fallback for dead letter routing). HRM.

README.md Outdated Show resolved Hide resolved
@@ -322,6 +325,114 @@ See above for creating the table
}
```

## Dead Letter Table

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example config with Dead Letter will be very useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs can always be improved. I'll try to take another stab at this.

This is a big feature with a somewhat clunky coinfig API due to config visibility rules in Kafka Connect, so more docs/examples certainly help.

@ron-trail
Copy link

Super useful functionality, thanks!

Comment on lines 54 to 55
public static final String KEY_HEADER = "t_original_key";
public static final String VALUE_HEADER = "t_original_value";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you're de-risking the chance of a collision with an existing header by prefixing a t_
More out of curiosity, what does the t_ stand for?
And wondering if we can do a little better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we could de-risk collisions more by just adding a single header t_original_record which is a Struct with this kind of structure (psuedo-code) instead of adding 3 separate headers:

Struct {
   OPTIONAL_BYTES_SCHEMA key, 
   OPTIONAL_BYTES_SCHEMA value, 
   OPTIONAL_ARRAY_HEADER_SCHEMA headers, 
}

nit: I would also name the header something specific to iceberg-kafka-connect IDK something along the lines of kafka.connect.iceberg.error.transform.original.record or something (obviously this is too long but you get the idea).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct in derisking collisions. I chose t for tabular.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose t for tabular.

🤦 should have figured that one out ....

Comment on lines 6 to 8
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need any of the actual iceberg functionality in this module?

Suggested change
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava

The only thing you do need is this import :D

import org.apache.iceberg.relocated.com.google.common.collect.Lists;

List<Struct> headers = Lists.newArrayList();

Which IMO you can just replace with this safely.

import java.util.ArrayList;

@SuppressWarnings("RegexpSingleline")
List<Struct> headers = new ArrayList<>();

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class DefaultWriteExceptionHandler implements WriteExceptionHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's concerning because that means other users defining their own WriteExceptionHandler implementations can't reference those exceptions either?

- simplify original bytes location
- clean up unused methods, values
- simplify write exception interface
- move failed record handler initialization to be write exception handler specific
- clean up error transform converters
- close SMT and converters
@ron-trail
Copy link

Hi, any ETA for this feature?

@preetpuri
Copy link

Hi, do you have an ETA for when this PR will be merged?

@Daesgar
Copy link

Daesgar commented Oct 14, 2024

This would be much appreciated to have as we are dealing with it and skipping the problematic offset leads to loss of data. 😞

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants