Dead letter table #233
base: main
Conversation
- implements dead letter table handling as a config option
I have more comments, but I'm struggling to get past some of the limitations of the current approach, like the fixed schema. I have a different take on the problem that I would strongly like us to consider (see the sketch after this list):
- Exceptions happening within SinkTask.put would be captured by a user-configured WriteExceptionHandler and handled there however the user wants (write to a dead-letter table, write to Kafka, log it, whatever the user wants).
- For Converter/SMT exceptions (i.e. things before SinkTask.put), users should configure the connector with iceberg.tables.dynamic-enabled and an iceberg.tables.route-field, and write an exception-handling SMT that points failed records at the dead-letter table.
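Roughly, the kind of pluggable handler suggested here could look like the following sketch. The method names and signatures are assumptions for illustration, not the PR's final WriteExceptionHandler API.

```java
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Sketch of a pluggable handler for exceptions raised inside SinkTask.put.
// Names and signatures are illustrative assumptions, not the PR's actual interface.
public interface WriteExceptionHandlerSketch {

  // Called once when the task starts, before any records are handled.
  void initialize(SinkTaskContext context, Map<String, String> props);

  // Given the failing record and the exception, either return a (possibly rewritten)
  // record to write elsewhere (e.g. a dead-letter table), return null to drop it,
  // or rethrow to fail the task.
  SinkRecord handle(SinkRecord record, Exception exception) throws Exception;
}
```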
- substantially reworks the PR
- error transform and connector are connected via FailedRecordFactory
- users can plug in their own schema shape for the failed records
- users can dispatch to whatever dead letter table they want
- bunch of classes moved around
- pluggable WriteExceptionHandler introduced to catch failures
- lots of code updated with custom WriteExceptions meant to be caught by the WriteExceptionHandler
- works with or without the ErrorTransform in play
Substantial re-work. Still need to add tests for
This has largely been addressed in the latest update; I appreciate the feedback and discussions we've had.
Need to add a config for the third mode. Look at
@@ -322,6 +325,114 @@ See above for creating the table
}
```

## Dead Letter Table
Example config with Dead Letter will be very useful
Docs can always be improved. I'll try to take another stab at this. This is a big feature with a somewhat clunky config API due to config visibility rules in Kafka Connect, so more docs/examples certainly help.
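For reference, here is a rough sketch of what such a config could look like, written as Java connector properties for illustration. The SMT converter key and the route-field value are placeholders (assumptions), not the final option names.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterConfigExample {
  public static void main(String[] args) {
    // Illustrative connector properties; dead-letter-specific keys are hypothetical placeholders.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.tabular.iceberg.connect.IcebergSinkConnector");
    props.put("topics", "events");
    // Raw bytes pass through Kafka Connect; the real converters run inside the ErrorTransform SMT.
    props.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
    props.put("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
    props.put("transforms", "error");
    props.put("transforms.error.type", "io.tabular.iceberg.connect.transforms.ErrorTransform");
    // Hypothetical key: how the SMT might be told which converter to apply to the value bytes.
    props.put("transforms.error.value.converter", "io.confluent.connect.avro.AvroConverter");
    // Route failed records dynamically, e.g. to a dedicated dead-letter table.
    props.put("iceberg.tables.dynamic-enabled", "true");
    props.put("iceberg.tables.route-field", "route"); // hypothetical route field name
    System.out.println(props);
  }
}
```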
Super useful functionality, thanks!
public static final String KEY_HEADER = "t_original_key";
public static final String VALUE_HEADER = "t_original_value";
I believe you're de-risking the chance of a collision with an existing header by prefixing with t_. More out of curiosity, what does the t_ stand for? And I'm wondering if we can do a little better.
Also, we could de-risk collisions more by adding a single header t_original_record, which is a Struct with this kind of structure (pseudo-code), instead of adding 3 separate headers:
Struct {
OPTIONAL_BYTES_SCHEMA key,
OPTIONAL_BYTES_SCHEMA value,
OPTIONAL_ARRAY_HEADER_SCHEMA headers,
}
nit: I would also name the header something specific to iceberg-kafka-connect, IDK, something along the lines of kafka.connect.iceberg.error.transform.original.record or something (obviously this is too long but you get the idea).
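For illustration, a single struct-valued header could be built with Kafka Connect's Struct API along these lines. The header name and field layout here are assumptions mirroring the pseudo-code above, not what the PR ships.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative only: one struct-valued header holding the original key/value/header bytes.
public class OriginalRecordHeaderSketch {

  static final Schema HEADER_ELEMENT_SCHEMA =
      SchemaBuilder.struct()
          .field("key", Schema.STRING_SCHEMA)
          .field("value", Schema.OPTIONAL_BYTES_SCHEMA)
          .optional()
          .build();

  static final Schema ORIGINAL_RECORD_SCHEMA =
      SchemaBuilder.struct()
          .field("key", Schema.OPTIONAL_BYTES_SCHEMA)
          .field("value", Schema.OPTIONAL_BYTES_SCHEMA)
          .field("headers", SchemaBuilder.array(HEADER_ELEMENT_SCHEMA).optional().build())
          .optional()
          .build();

  // Attach the combined header to a record under a single, connector-specific name.
  static SinkRecord withOriginalRecordHeader(SinkRecord record, byte[] keyBytes, byte[] valueBytes) {
    Struct original = new Struct(ORIGINAL_RECORD_SCHEMA)
        .put("key", keyBytes)
        .put("value", valueBytes);
    record.headers().add("iceberg.error.transform.original.record", original, ORIGINAL_RECORD_SCHEMA);
    return record;
  }
}
```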
You are correct about de-risking collisions. I chose t for tabular.
I chose t for tabular.
🤦 should have figured that one out ....
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava
You don't need any of the actual iceberg functionality in this module?
The only thing you do need is this import :D
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
List<Struct> headers = Lists.newArrayList();
Which IMO you can just replace with this safely.
import java.util.ArrayList;
@SuppressWarnings("RegexpSingleline")
List<Struct> headers = new ArrayList<>();
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class DefaultWriteExceptionHandler implements WriteExceptionHandler {
That's concerning, because that means other users defining their own WriteExceptionHandler implementations can't reference those exceptions either?
- simplify original bytes location
- clean up unused methods, values
- simplify write exception interface
- move failed record handler initialization to be write exception handler specific
- clean up error transform converters
- close SMT and converters
Hi, any ETA for this feature?
Thank you for implementing this valuable feature! It will solve many issues like the ones below.
Hi, do you have an ETA for when this PR will be merged?
This would be much appreciated, as we are dealing with this and skipping the problematic offset leads to loss of data. 😞
Implements an (optional) dead letter table where failed messages go to a dedicated Iceberg table. This functionality aims to improve and simplify the error handling provided by Kafka Connect. Kafka Connect's dead letter queue only handles deserialization and SMT failures, and it writes to another Kafka topic, where additional engineering effort is required to inspect and recover messages. With this PR, errors are written to a dedicated Iceberg table where messages can be inspected and recovered using tools users may be more comfortable with (Spark, etc.). The table row contains everything required to convert a row back into a Kafka ProducerRecord; however, the functionality to do this is engine-specific and not provided in this PR.

This PR aims to minimize stream processing exceptions from imperfect producers by writing to the dead letter table rather than failing constantly and causing rebalances inside the Kafka cluster, which can negatively affect other jobs.
It is comprised of two components:
- an ErrorTransform SMT
- changes to Worker.java/IcebergWriterFactory.java to catch issues around table creation, schema parsing, and Iceberg record conversion

Not all errors result in conversion to records for the Dead Letter Table. For example, network/connection errors thrown during table operations with the underlying Catalog will still fail the connector (as a form of retry when the connector is restarted).
This is opt-in. Users can decide not to use it, use the Kafka Connect DLQ, ignore errors, or fail on all errors, just as with the previous functionality.
Error Transform

Kafka Connect's value, key, and header converters must be ByteArrayConverters. The desired converters (AvroConverter, JsonConverter, etc.) are supplied to the ErrorTransform SMT along with any other SMTs.

- On success, a Map<String, Object> is constructed that contains the deserialized and transformed SinkRecord as well as the original key, value, and header bytes of the message.
- On failure, a SinkRecord wrapping a Struct is created containing failure metadata such as Kafka metadata, the exception, the stack trace, and the original key, value, and header bytes.
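As a rough illustration of the success path described above, the SMT could bundle the transformed record with the original bytes so a later failure can still be rerouted. The payload key names below are assumptions, not the SMT's actual keys.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative only: pair the deserialized/transformed record with the raw bytes of
// the original message. Key names are assumed for the sketch.
public class ErrorTransformPayloadSketch {

  public static Map<String, Object> successPayload(
      SinkRecord transformed, byte[] keyBytes, byte[] valueBytes, byte[] headerBytes) {
    Map<String, Object> payload = new HashMap<>();
    payload.put("transformed", transformed);      // assumed key name
    payload.put("original_key", keyBytes);        // assumed key name
    payload.put("original_value", valueBytes);    // assumed key name
    payload.put("original_headers", headerBytes); // assumed key name
    return payload;
  }
}
```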
Changes to Worker.java/IcebergWriterFactory.java

When configured to use the Dead Letter Table, the connector expects messages to be in the shape of the data produced by the ErrorTransform SMT. Failed records from the SMT will be written to the Dead Letter Table. Successfully transformed SinkRecords will attempt the normal connector flow; if a record fails for non-transient reasons, the original key, value, and header bytes in the specially transformed record are used to construct a SinkRecord for the Dead Letter Table with the required Kafka and error metadata before being written via the normal table fanout.
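A minimal sketch of that catch-and-reroute flow, assuming hypothetical writeToTable/writeToDeadLetterTable helpers (the PR's actual wiring goes through its handler and factory classes):

```java
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical sketch of the catch-and-reroute flow; not the PR's actual code.
class DeadLetterRoutingSketch {

  void save(SinkRecord record) {
    try {
      writeToTable(record);                // normal connector flow
    } catch (RuntimeException e) {
      if (isTransient(e)) {
        throw e;                           // fail the task so Kafka Connect retries
      }
      writeToDeadLetterTable(record, e);   // reroute with error metadata
    }
  }

  // The helpers below are placeholders for illustration only.
  void writeToTable(SinkRecord record) { /* ... */ }

  void writeToDeadLetterTable(SinkRecord record, Exception error) { /* ... */ }

  boolean isTransient(Exception e) {
    return false; // e.g. network/catalog connectivity issues would return true
  }
}
```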
Limitations

This is the first PR. Additional work is required for some advanced Converters such as the AvroConverter, where fine-grained exception handling needs to be implemented to differentiate between real Avro errors (e.g. the message is not valid Avro bytes, or the message does not have an entry in the Schema Registry) and network-related Avro exceptions (e.g. contacting the Schema Registry times out). This is planned in future PRs. In the interim, an ErrorHandler class is exposed as a config option for both converters and SMTs and can be extended by users to implement the required error handling (and rethrowing) for advanced Converters, custom Converters, etc.
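As a rough illustration of the kind of extension described above, a handler might rethrow failures that look transient (e.g. Schema Registry connectivity) and turn genuine data errors into failed records. The class and method shapes below are assumptions, not the actual ErrorHandler API.

```java
import java.net.SocketTimeoutException;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical handler sketch; the PR's real extension point may differ.
class AvroAwareExceptionHandlerSketch {

  // Return a record destined for the dead letter table, or rethrow if the
  // failure looks transient (e.g. Schema Registry connectivity problems).
  SinkRecord handle(SinkRecord original, Exception error) {
    if (isTransient(error)) {
      throw new RuntimeException("transient failure, let the task retry", error);
    }
    return toFailedRecord(original, error); // e.g. via a FailedRecordFactory
  }

  private boolean isTransient(Exception error) {
    Throwable cause = error;
    while (cause != null) {
      if (cause instanceof SocketTimeoutException) {
        return true;
      }
      cause = cause.getCause();
    }
    return false;
  }

  private SinkRecord toFailedRecord(SinkRecord original, Exception error) {
    // Placeholder: build a record carrying Kafka metadata, the exception,
    // the stack trace, and the original bytes.
    return original;
  }
}
```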