Dead letter table #233

Open. Wants to merge 36 commits into base: main.

Changes shown are from 15 of the 36 commits.

Commits (36, all by tabmatfournier):
03c3e36  dead-letter-table (Apr 4, 2024)
e87d820  next steps (Apr 4, 2024)
b062c51  moved transform in (Apr 5, 2024)
9b7f6b0  more refactoring (Apr 8, 2024)
c1b4de2  even more refactoring (Apr 8, 2024)
4e706e1  cruft (Apr 8, 2024)
f9136d8  even more cruft (Apr 8, 2024)
d3905a5  tests (Apr 8, 2024)
18c79ba  table create exceptions (Apr 9, 2024)
77aad4b  introduces identifier column, more error handle wrapping (Apr 9, 2024)
edc75a5  make converter/smt error converters configurable/user extensible (Apr 9, 2024)
8ee6840  introduce catalogApi to make IcebergWriterFactory testing easier (Apr 9, 2024)
de479d3  catalogApi and test stubs (Apr 9, 2024)
b78a68d  finished tests (Apr 10, 2024)
ac5d6a5  negate (Apr 10, 2024)
50b300b  dead-letter-table (Apr 26, 2024)
01f8cbb  put null record dropping back into iceberg writer (Apr 26, 2024)
d89f15a  fix dead letter utils private constructor (Apr 26, 2024)
c5c2186  fix cruft in readme (Apr 26, 2024)
831f205  Merge branch 'main' into dead-letter-table (Apr 29, 2024)
451e20f  Merge branch 'main' into dead-letter-table (Apr 30, 2024)
41c4372  post-merge fixes (Apr 30, 2024)
13220e9  more comments removing cruft (Apr 30, 2024)
797b861  regexrecordrouter (Apr 30, 2024)
7bd7d5d  start of fallback mode (Apr 30, 2024)
bff7233  third mode (May 13, 2024)
25208da  another test case (May 13, 2024)
205c2d7  better regex detection to avoid an extra config (May 13, 2024)
4aeedcd  cruft cleanup and starting docs (May 13, 2024)
0cf18d1  fix error transform tests (May 14, 2024)
05ea87f  more docs (May 14, 2024)
457a2f3  Merge branch 'main' into dead-letter-table (May 21, 2024)
1518cc7  Merge branch 'main' into dead-letter-table (Jun 5, 2024)
e4977c4  dead-letter-table (Jun 5, 2024)
4036557  rename module to not include deadletter (Jun 5, 2024)
0ff7972  moved writeExceptions into exception module (Jun 5, 2024)
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
@@ -48,6 +48,7 @@ http-client = { module = "org.apache.httpcomponents.client5:httpclient5", versi
junit-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-ver" }
junit-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-ver" }
junit-params = { module = "org.junit.jupiter:junit-jupiter-params", version.ref = "junit-ver" }
kafka-connect-runtime = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver" }
mockito = "org.mockito:mockito-core:4.8.1"
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers-ver" }
testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers-ver" }
30 changes: 30 additions & 0 deletions kafka-connect-deadletter/build.gradle
@@ -0,0 +1,30 @@
plugins {
id "java-test-fixtures"
}

dependencies {
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava
Review comment from a Contributor:

You don't need any of the actual iceberg functionality in this module?

Suggested change (remove these three dependencies):
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava

The only thing you do need is this import :D

import org.apache.iceberg.relocated.com.google.common.collect.Lists;

List<Struct> headers = Lists.newArrayList();

Which IMO you can just replace with this safely.

import java.util.ArrayList;

@SuppressWarnings("RegexpSingleline")
List<Struct> headers = new ArrayList<>();
implementation libs.avro
compileOnly libs.bundles.kafka.connect

testImplementation libs.junit.api
testRuntimeOnly libs.junit.engine

testImplementation libs.mockito
testImplementation libs.assertj

testFixturesImplementation libs.iceberg.common
testFixturesImplementation libs.iceberg.core
testFixturesImplementation libs.avro
}

publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
}
211 changes: 211 additions & 0 deletions kafka-connect-deadletter/src/main/java/io/tabular/iceberg/connect/deadletter/DeadLetterUtils.java
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.deadletter;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public class DeadLetterUtils {

public static class DeadLetterException extends RuntimeException {
private final String location;
private final Throwable error;

public DeadLetterException(String location, Throwable error) {
super(error);
this.location = location;
this.error = error;
}

public String getLocation() {
return location;
}

public Throwable getError() {
return error;
}
}

private DeadLetterUtils() {
throw new IllegalStateException("Should not be initialized");
}

public static final String TYPE = "dlt";
public static final String KEY_BYTES = "key";
public static final String VALUE_BYTES = "value";
public static final String PAYLOAD_KEY = "transformed";
public static final String ORIGINAL_BYTES_KEY = "original";
private static final String HEADERS = "headers";
public static final Schema HEADER_ELEMENT_SCHEMA =
SchemaBuilder.struct()
.field("key", Schema.STRING_SCHEMA)
.field("value", Schema.OPTIONAL_BYTES_SCHEMA)
.optional()
.build();
public static final Schema HEADER_SCHEMA =
SchemaBuilder.array(HEADER_ELEMENT_SCHEMA).optional().build();
public static final Schema FAILED_SCHEMA =
SchemaBuilder.struct()
.name("failed_message")
.parameter("isFailed", "true")
.field("identifier", Schema.OPTIONAL_STRING_SCHEMA)
.field("topic", Schema.STRING_SCHEMA)
.field("partition", Schema.INT32_SCHEMA)
.field("offset", Schema.INT64_SCHEMA)
.field("location", Schema.STRING_SCHEMA)
.field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
.field("exception", Schema.OPTIONAL_STRING_SCHEMA)
.field("stack_trace", Schema.OPTIONAL_STRING_SCHEMA)
.field("key_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
.field("value_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
.field(HEADERS, HEADER_SCHEMA)
.field("target_table", Schema.OPTIONAL_STRING_SCHEMA)
.field("type", Schema.STRING_SCHEMA)
.schema();

public static String stackTrace(Throwable error) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
error.printStackTrace(pw);
return sw.toString();
}

public static class Values {
// expect byte[]
private final Object keyBytes;
// expect byte[]
private final Object valueBytes;
// expect List<Struct>
private final Object headers;

public Values(Object keyBytes, Object valueBytes, Object headers) {
this.keyBytes = keyBytes;
this.valueBytes = valueBytes;
this.headers = headers;
}

public Object getKeyBytes() {
return keyBytes;
}

public Object getValueBytes() {
return valueBytes;
}

public Object getHeaders() {
return headers;
}
}

public static SinkRecord failedRecord(
SinkRecord original, Throwable error, String location, String identifier) {
List<Struct> headers = null;
if (!original.headers().isEmpty()) {
headers = DeadLetterUtils.serializedHeaders(original);
}
Values values = new Values(original.key(), original.value(), headers);
return failedRecord(original, values, error, location, null, identifier);
}

private static SinkRecord failedRecord(
SinkRecord original,
Values values,
Throwable error,
String location,
String targetTable,
String identifier) {

Struct struct = new Struct(FAILED_SCHEMA);
if (identifier != null) {
struct.put("identifier", identifier);
}
struct.put("topic", original.topic());
struct.put("partition", original.kafkaPartition());
struct.put("offset", original.kafkaOffset());
struct.put("timestamp", original.timestamp());
struct.put("location", location);
struct.put("exception", error.toString());
String stack = stackTrace(error);
if (!stack.isEmpty()) {
struct.put("stack_trace", stack);
}
if (values.getKeyBytes() != null) {
struct.put("key_bytes", values.getKeyBytes());
}
if (values.getValueBytes() != null) {
struct.put("value_bytes", values.getValueBytes());
}
if (values.getHeaders() != null) {
struct.put(HEADERS, values.getHeaders());
}
if (targetTable != null) {
struct.put("target_table", targetTable);
}

struct.put("type", TYPE);
return original.newRecord(
original.topic(),
original.kafkaPartition(),
null,
null,
FAILED_SCHEMA,
struct,
original.timestamp());
}

/**
* There is no way to get back the original Kafka header bytes. Instead, we build an array with
* one {"key": String, "value": bytes} element per header. These can be converted back into Kafka
* Connect headers by the user later, and further converted into Kafka RecordHeaders to be put
* back into a ProducerRecord to recreate the original headers on the Kafka record.
*
* @param original record whose headers are still byte array values
* @return List of Structs for an Array that can be put into Iceberg
*/
public static List<Struct> serializedHeaders(SinkRecord original) {
List<Struct> headers = Lists.newArrayList();
for (Header header : original.headers()) {
Struct headerStruct = new Struct(HEADER_ELEMENT_SCHEMA);
headerStruct.put("key", header.key());
headerStruct.put("value", header.value());
headers.add(headerStruct);
}
return headers;
}

@SuppressWarnings("unchecked")
public static SinkRecord mapToFailedRecord(
String targetTable, SinkRecord record, String location, Throwable error, String identifier) {
Map<String, Object> payload = (Map<String, Object>) record.value();
Map<String, Object> bytes = (Map<String, Object>) payload.get(ORIGINAL_BYTES_KEY);
Object keyBytes = bytes.get(KEY_BYTES);
Object valueBytes = bytes.get(VALUE_BYTES);
Object headers = bytes.get(HEADERS);
Values values = new Values(keyBytes, valueBytes, headers);
return failedRecord(record, values, error, location, targetTable, identifier);
}
}
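For downstream consumers of the dead letter table, here is a minimal sketch of converting the serialized header structs back into Kafka record headers, as the serializedHeaders Javadoc describes. The class and method names are hypothetical; the calls used (RecordHeaders.add, Struct.getString, Struct.getBytes) are standard Kafka APIs.

import java.util.List;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.data.Struct;

public class HeaderRebuildSketch {
  // Hypothetical helper: convert the dead letter "headers" array back into Kafka RecordHeaders.
  public static RecordHeaders rebuildHeaders(List<Struct> serializedHeaders) {
    RecordHeaders headers = new RecordHeaders();
    for (Struct headerStruct : serializedHeaders) {
      // Each element follows HEADER_ELEMENT_SCHEMA: {"key": String, "value": optional bytes}.
      headers.add(headerStruct.getString("key"), headerStruct.getBytes("value"));
    }
    return headers;
  }
}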
2 changes: 2 additions & 0 deletions kafka-connect-transforms/build.gradle
@@ -1,4 +1,5 @@
dependencies {
implementation project(":iceberg-kafka-connect-deadletter")
implementation libs.iceberg.guava
implementation libs.slf4j
compileOnly libs.bundles.kafka.connect
@@ -8,6 +9,7 @@ dependencies {

testImplementation libs.mockito
testImplementation libs.assertj
testRuntimeOnly libs.kafka.connect.runtime
}

configurations {
29 changes: 29 additions & 0 deletions kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DefaultExceptionHandler.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import org.apache.kafka.connect.sink.SinkRecord;

public class DefaultExceptionHandler implements TransformExceptionHandler {
@Override
public SinkRecord handle(SinkRecord original, Throwable error, String location) {
return DeadLetterUtils.failedRecord(original, error, location, null);
}
}
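A minimal sketch of how a transform might call this handler, assuming a surrounding transform wires it up roughly like this (the class name, the transform step, and the "TRANSFORM" location label are illustrative, not taken from this diff):

import org.apache.kafka.connect.sink.SinkRecord;

public class HandlerUsageSketch {
  private final TransformExceptionHandler handler = new DefaultExceptionHandler();

  // Illustrative: wrap a transformation step so failures become dead letter records
  // instead of failing the connector task.
  public SinkRecord applyOrDeadLetter(SinkRecord record) {
    try {
      return transform(record); // some transformation step that may throw
    } catch (Exception error) {
      // Route the failure into a record with FAILED_SCHEMA via DeadLetterUtils.failedRecord.
      return handler.handle(record, error, "TRANSFORM");
    }
  }

  private SinkRecord transform(SinkRecord record) {
    return record; // placeholder for real transformation logic
  }
}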