fix: records with data type mismatches go to the DLQ instead of failing the connector #27

Merged · 8 commits · Sep 9, 2024
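
With this change, a record that QuestDB rejects because of a column type mismatch is routed to the connector's dead-letter queue instead of failing the whole task. The routing only happens when the standard Kafka Connect error-handling properties are set. A minimal sketch of that configuration, mirroring what the embedded test below uses; the topic name "dlq" and the replication factor are test-specific choices, and converter/connection settings are omitted:

import java.util.HashMap;
import java.util.Map;

// Sketch: the standard Kafka Connect error-handling settings the new DLQ path relies on.
Map<String, String> props = new HashMap<>();
props.put("errors.tolerance", "all");                               // keep the task running on bad records
props.put("errors.deadletterqueue.topic.name", "dlq");              // topic that receives rejected records
props.put("errors.deadletterqueue.topic.replication.factor", "1");  // single-broker test cluster

Without errors.tolerance=all and a DLQ topic, Connect gives the task no errant-record reporter, so a bad record still fails the connector, which is exactly what the second new test below asserts.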
7 changes: 6 additions & 1 deletion .github/workflows/ci.yml
@@ -18,4 +18,9 @@ jobs:
          distribution: 'temurin'
          cache: maven
      - name: Build with Maven
-       run: mvn -B package --file pom.xml
+       run: mvn -B package --file pom.xml
+     - name: Archive connector artifact
+       uses: actions/upload-artifact@v4
+       with:
+         name: connector-snapshot
+         path: connector/target/kafka-questdb-connector-*-bin.zip
58 changes: 53 additions & 5 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -4,6 +4,7 @@
import io.questdb.cutlass.http.client.HttpClientException;
import io.questdb.cutlass.line.LineSenderException;
import io.questdb.std.NumericException;
import io.questdb.std.ObjList;
import io.questdb.std.datetime.DateFormat;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.datetime.millitime.DateFormatUtils;
@@ -15,6 +16,7 @@
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -48,6 +50,8 @@ public final class QuestDBSinkTask extends SinkTask {
private long nextFlushNanos;
private int pendingRows;
private final FlushConfig flushConfig = new FlushConfig();
private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
private ErrantRecordReporter reporter;

@Override
public String version() {
@@ -86,6 +90,12 @@ public void start(Map<String, String> map) {
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
this.recordToTable = Templating.newTableTableFn(config.getTable());
try {
reporter = context.errantRecordReporter();
} catch (NoSuchMethodError | NoClassDefFoundError e) {
// Kafka older than 2.6
reporter = null;
}
}

private Sender createRawSender() {
@@ -159,6 +169,9 @@ public void put(Collection<SinkRecord> collection) {
sender = createSender();
}
for (SinkRecord record : collection) {
if (httpTransport) {
inflightSinkRecords.add(record);
}
handleSingleRecord(record);
}

@@ -208,22 +221,27 @@ public void put(Collection<SinkRecord> collection) {
private void flushAndResetCounters() {
log.debug("Flushing data to QuestDB");
try {
sender.flush();
if (sender != null) {
sender.flush();
}
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
} finally {
inflightSinkRecords.clear();
}
}

private void onSenderException(Exception e) {
if (httpTransport) {
closeSenderSilently();
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
throw new ConnectException("Failed to send data to QuestDB", e);
onHttpSenderException(e);
} else {
onTcpSenderException(e);
}
}

private void onTcpSenderException(Exception e) {
batchesSinceLastError = 0;
if (--remainingRetries > 0) {
closeSenderSilently();
@@ -235,6 +253,36 @@ private void onSenderException(Exception e) {
}
}

private void onHttpSenderException(Exception e) {
closeSenderSilently();
if (
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors
&& (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
) {
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
// and we will report it to the error handler. the rest of the records will make it to QuestDB
sender = createSender();
for (int i = 0; i < inflightSinkRecords.size(); i++) {
SinkRecord sinkRecord = inflightSinkRecords.get(i);
try {
handleSingleRecord(sinkRecord);
sender.flush();
} catch (Exception ex) {
context.errantRecordReporter().report(sinkRecord, ex);
closeSenderSilently();
sender = createSender();
}
}
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
} else {
// ok, this is not a parsing error, let's just close the sender and rethrow the exception
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
throw new ConnectException("Failed to send data to QuestDB", e);
}
}

private void closeSenderSilently() {
if (sender != null) {
try {
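The HTTP error path above leans on Kafka Connect's ErrantRecordReporter, which is only exposed by Connect runtimes from 2.6 onwards and is null when no error tolerance/DLQ is configured. A compilable sketch of that contract, not the connector's actual task: the class name and the write placeholder are hypothetical, only the reporter handshake mirrors the code above.

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sketch of the defensive ErrantRecordReporter handshake.
public abstract class DlqAwareSinkTask extends SinkTask {
    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        try {
            // null when errors.tolerance / DLQ settings are not configured
            reporter = context.errantRecordReporter();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Connect runtimes older than 2.6 do not expose the API at all
            reporter = null;
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                write(record); // sink-specific write, placeholder
            } catch (Exception e) {
                if (reporter != null) {
                    reporter.report(record, e); // record goes to the DLQ, task keeps running
                } else {
                    throw new ConnectException("Bad record and no DLQ configured", e);
                }
            }
        }
    }

    protected abstract void write(SinkRecord record);
}

This is what the two new tests exercise: with the DLQ properties set, the two malformed records end up on the dlq topic while the three good ones land in QuestDB; without them, the task goes to FAILED.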
QuestDBSinkConnectorEmbeddedTest.java
@@ -52,7 +52,7 @@
public final class QuestDBSinkConnectorEmbeddedTest {
private static int httpPort = -1;
private static int ilpPort = -1;
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0";
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;

private EmbeddedConnectCluster connect;
@@ -248,6 +248,81 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
}

@Test
public void testDeadLetterQueue_badColumnType() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 120_000, "dlq");
Assertions.assertEquals(2, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
Assertions.assertEquals(badRecordB, new String(iterator.next().value()));

QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
"select firstname,lastname,age, id from " + topicName,
httpPort);

}

@Test
public void testbadColumnType_noDLQ() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSymbol(boolean useHttp) {