fix: records with data type mismatches go to DLQ instead of failing the connector (#27)

* fix: records with data type mismatches go to DLQ instead of failing the connector

fixes #26
This is a first implementation. It could be optimized further, but bad data should be rare,
and it already does the job as it is.

There is a potential issue with `inflightSinkRecords` referencing all in-flight
data: in theory, this can roughly double memory consumption. But we need the original
SinkRecord instances so we can send them to the DLQ.
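For context, a minimal sketch of the Kafka Connect error-handling properties that activate this DLQ routing, mirroring the embedded test added in this commit; it assumes the test harness from that file (`connect`, `questDBContainer`, `topicName`), and the `dlq` topic name and replication factor are illustrative:

    // Sketch: Connect error-handling properties that send rejected records to a DLQ
    // instead of failing the task. Values mirror testDeadLetterQueue_badColumnType below.
    Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
    props.put("value.converter.schemas.enable", "false");
    props.put("errors.tolerance", "all");                               // keep the task running on bad records
    props.put("errors.deadletterqueue.topic.name", "dlq");              // topic that receives rejected records
    props.put("errors.deadletterqueue.topic.replication.factor", "1");
    connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);

With `errors.tolerance=all` and a DLQ topic configured, the framework exposes `context.errantRecordReporter()`, which the task uses to report the offending SinkRecord.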
jerrinot authored Sep 9, 2024
1 parent 59e21ae commit b4510d1
Showing 3 changed files with 135 additions and 7 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci.yml
@@ -18,4 +18,9 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn -B package --file pom.xml
- name: Archive connector artifact
uses: actions/upload-artifact@v4
with:
name: connector-snapshot
path: connector/target/kafka-questdb-connector-*-bin.zip
58 changes: 53 additions & 5 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -4,6 +4,7 @@
import io.questdb.cutlass.http.client.HttpClientException;
import io.questdb.cutlass.line.LineSenderException;
import io.questdb.std.NumericException;
import io.questdb.std.ObjList;
import io.questdb.std.datetime.DateFormat;
import io.questdb.std.datetime.microtime.Timestamps;
import io.questdb.std.datetime.millitime.DateFormatUtils;
@@ -15,6 +16,7 @@
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -48,6 +50,8 @@ public final class QuestDBSinkTask extends SinkTask {
private long nextFlushNanos;
private int pendingRows;
private final FlushConfig flushConfig = new FlushConfig();
private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
private ErrantRecordReporter reporter;

@Override
public String version() {
@@ -86,6 +90,12 @@ public void start(Map<String, String> map) {
this.allowedLag = config.getAllowedLag();
this.nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
this.recordToTable = Templating.newTableTableFn(config.getTable());
try {
reporter = context.errantRecordReporter();
} catch (NoSuchMethodError | NoClassDefFoundError e) {
// Kafka older than 2.6
reporter = null;
}
}

private Sender createRawSender() {
@@ -159,6 +169,9 @@ public void put(Collection<SinkRecord> collection) {
sender = createSender();
}
for (SinkRecord record : collection) {
if (httpTransport) {
inflightSinkRecords.add(record);
}
handleSingleRecord(record);
}

@@ -208,22 +221,27 @@ public void put(Collection<SinkRecord> collection) {
private void flushAndResetCounters() {
log.debug("Flushing data to QuestDB");
try {
sender.flush();
if (sender != null) {
sender.flush();
}
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
} catch (LineSenderException | HttpClientException e) {
onSenderException(e);
} finally {
inflightSinkRecords.clear();
}
}

private void onSenderException(Exception e) {
if (httpTransport) {
closeSenderSilently();
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
throw new ConnectException("Failed to send data to QuestDB", e);
onHttpSenderException(e);
} else {
onTcpSenderException(e);
}
}

private void onTcpSenderException(Exception e) {
batchesSinceLastError = 0;
if (--remainingRetries > 0) {
closeSenderSilently();
@@ -235,6 +253,36 @@ private void onSenderException(Exception e) {
}
}

private void onHttpSenderException(Exception e) {
closeSenderSilently();
if (
(reporter != null && e.getMessage() != null) // hack to detect data parsing errors
&& (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
) {
// ok, we have a parsing error, let's try to send records one by one to find the problematic record
// and we will report it to the error handler. the rest of the records will make it to QuestDB
sender = createSender();
for (int i = 0; i < inflightSinkRecords.size(); i++) {
SinkRecord sinkRecord = inflightSinkRecords.get(i);
try {
handleSingleRecord(sinkRecord);
sender.flush();
} catch (Exception ex) {
context.errantRecordReporter().report(sinkRecord, ex);
closeSenderSilently();
sender = createSender();
}
}
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
} else {
// ok, this is not a parsing error, let's just close the sender and rethrow the exception
nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
pendingRows = 0;
throw new ConnectException("Failed to send data to QuestDB", e);
}
}

private void closeSenderSilently() {
if (sender != null) {
try {
77 changes: 76 additions & 1 deletion connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -52,7 +52,7 @@
public final class QuestDBSinkConnectorEmbeddedTest {
private static int httpPort = -1;
private static int ilpPort = -1;
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0";
private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.1.1";
private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;

private EmbeddedConnectCluster connect;
@@ -248,6 +248,81 @@ public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Assertions.assertEquals("{\"not valid json}", new String(dqlRecord.value()));
}

@Test
public void testDeadLetterQueue_badColumnType() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
props.put("errors.deadletterqueue.topic.name", "dlq");
props.put("errors.deadletterqueue.topic.replication.factor", "1");
props.put("errors.tolerance", "all");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(2, 120_000, "dlq");
Assertions.assertEquals(2, fetchedRecords.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = fetchedRecords.iterator();
Assertions.assertEquals(badRecordA, new String(iterator.next().value()));
Assertions.assertEquals(badRecordB, new String(iterator.next().value()));

QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d041\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d042\r\n"
+ "\"John\",\"Doe\",42,ad956a45-a55b-441e-b80d-023a2bf5d043\r\n",
"select firstname,lastname,age, id from " + topicName,
httpPort);

}

@Test
public void testbadColumnType_noDLQ() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";

// interleave good and bad records
connect.kafka().produce(topicName, "key", goodRecordA);
connect.kafka().produce(topicName, "key", badRecordA);
connect.kafka().produce(topicName, "key", goodRecordB);
connect.kafka().produce(topicName, "key", badRecordB);
connect.kafka().produce(topicName, "key", goodRecordC);

ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSymbol(boolean useHttp) {
