Merge pull request #12 from questdb/jh_dedup_experiment
feat: Exactly Once QoS
jerrinot authored Oct 31, 2023
2 parents 91139d9 + 1097862 commit 2f573ca
Showing 20 changed files with 994 additions and 190 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/it.yml
@@ -0,0 +1,17 @@
name: Run Integration Tests
on:
  workflow_dispatch:

jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up JDK 11
        uses: actions/setup-java@v3
        with:
          java-version: 11
          distribution: 'temurin'
          cache: 'maven'
      - name: Run integration tests
        run: mvn -B verify
35 changes: 35 additions & 0 deletions connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java
@@ -0,0 +1,35 @@
package io.questdb.kafka;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.Collection;
import java.util.Map;

public final class EmptyOffsetTracker implements OffsetTracker {
@Override
public void onPartitionsOpened(Collection<TopicPartition> partitions) {

}

@Override
public void onPartitionsClosed(Collection<TopicPartition> partitions) {

}

@Override
public void onObservedOffset(int partition, String topic, long offset) {

}

@Override
public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
assert rewindOffset == 0;
}

@Override
public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
assert rewindOffset == 0;
}
}
122 changes: 122 additions & 0 deletions connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java
@@ -0,0 +1,122 @@
package io.questdb.kafka;

import io.questdb.std.CharSequenceObjHashMap;
import io.questdb.std.LongList;
import io.questdb.std.ObjList;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.Collection;
import java.util.Map;

public final class MultiOffsetTracker implements OffsetTracker {
private static final int EMPTY = -1;
private static final int CLOSED = -2;

private final CharSequenceObjHashMap<LongList> offsets = new CharSequenceObjHashMap<>();

private String lastTopicCache;
private LongList lastTopicOffsetsCache;

@Override
public void onPartitionsOpened(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
String topic = partition.topic();
LongList topicOffsets = offsets.get(topic);
if (topicOffsets == null) {
topicOffsets = new LongList(4);
offsets.put(topic, topicOffsets);
}

int partitionId = partition.partition();
int currentSize = topicOffsets.size();
if (currentSize <= partitionId) {
topicOffsets.extendAndSet(partitionId, EMPTY);
if (currentSize != partitionId) {
topicOffsets.fill(currentSize, partitionId, EMPTY);
}
} else if (topicOffsets.get(partitionId) == CLOSED) {
topicOffsets.set(partitionId, EMPTY);
}
}
}

@Override
public void onPartitionsClosed(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
String topic = partition.topic();
LongList topicOffsets = offsets.get(topic);
topicOffsets.set(partition.partition(), CLOSED);
}

// Remove topics that have all partitions closed
ObjList<CharSequence> keys = offsets.keys();
for (int i = 0, n = keys.size(); i < n; i++) {
CharSequence topic = keys.getQuick(i);
LongList topicOffsets = offsets.get(topic);
boolean allClosed = true;
for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) {
if (topicOffsets.get(partition) != CLOSED) {
allClosed = false;
break;
}
}
if (allClosed) {
offsets.remove(topic);
}
}
}



@Override
public void onObservedOffset(int partition, String topic, long offset) {
LongList topicOffsets;

        // intentional reference equality check - Kafka Connect uses the same String instances,
        // so we can avoid a hash map lookup
if (lastTopicCache == topic) {
topicOffsets = lastTopicOffsetsCache;
} else {
topicOffsets = offsets.get(topic);
lastTopicCache = topic;
lastTopicOffsetsCache = topicOffsets;
}
long maxOffset = topicOffsets.get(partition);
topicOffsets.set(partition, Math.max(maxOffset, offset));
}


@Override
public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
ObjList<CharSequence> keys = offsets.keys();
for (int i = 0, n = keys.size(); i < n; i++) {
CharSequence topic = keys.getQuick(i);
LongList topicOffsets = offsets.get(topic);
for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) {
long offset = topicOffsets.get(partition);
// only rewind if we ever observed an offset for this partition
if (offset >= 0) {
sinkTaskContext.offset(new TopicPartition(topic.toString(), partition), Math.max(0, offset - rewindOffset));
}
}
}
}

@Override
public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
String topic = topicPartition.topic();
LongList topicOffsets = offsets.get(topic);
long offset = topicOffsets.get(topicPartition.partition());

// only transform if we ever observed an offset for this partition
if (offset >= 0) {
long newOffset = Math.max(0, offset - rewindOffset);
entry.setValue(new OffsetAndMetadata(newOffset));
}
}
}
}
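
To make the arithmetic above concrete: the tracker remembers the highest offset it has observed per topic and partition, and both configureSafeOffsets and transformPreCommit shift that value back by the configured rewind, clamped at zero. A minimal, self-contained sketch of that calculation follows; the class name, topic, and offset values are illustrative and not part of the connector.

// Illustrative sketch only, not part of the commit. It reproduces the
// Math.max(0, offset - rewindOffset) logic used by MultiOffsetTracker above.
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class RewindMathSketch {
    static long safeOffset(long lastObservedOffset, long rewindOffset) {
        // never rewind past the beginning of the partition
        return Math.max(0, lastObservedOffset - rewindOffset);
    }

    public static void main(String[] args) {
        long rewindOffset = 150_000;   // assumed dedup.rewind.offset value
        long lastObserved = 1_200_000; // assumed highest offset seen for the partition

        TopicPartition tp = new TopicPartition("trades", 0); // hypothetical topic
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        toCommit.put(tp, new OffsetAndMetadata(safeOffset(lastObserved, rewindOffset)));

        // Prints 1050000: after a failure the task resumes roughly 150k records back,
        // and QuestDB deduplication discards the replayed rows it already stored.
        System.out.println(toCommit.get(tp).offset());
    }
}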
20 changes: 20 additions & 0 deletions connector/src/main/java/io/questdb/kafka/OffsetTracker.java
@@ -0,0 +1,20 @@
package io.questdb.kafka;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.Collection;
import java.util.Map;

public interface OffsetTracker {
void onPartitionsOpened(Collection<TopicPartition> partitions);

void onPartitionsClosed(Collection<TopicPartition> partitions);

void onObservedOffset(int partition, String topic, long offset);

void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset);

void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset);
}
connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
@@ -70,6 +70,15 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";

public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset";
    private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " +
            "On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with the QuestDB " +
            "deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can be lost in the event of a failure. " +
            "If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover, " +
            "as it will have to reprocess a large number of records and QuestDB will have to do extra deduplication work. If you are testing this " +
            "feature for the first time, 150000 is a good starting point.";


private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";

public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -99,7 +108,12 @@ public static ConfigDef conf() {
.define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
-                .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC);
+                .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
+                .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC);
}

public long getDeduplicationRewindOffset() {
return getLong(DEDUPLICATION_REWIND_CONFIG);
}

public String getHost() {
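
For orientation, here is a hedged sketch of how the new option might be set in a sink configuration, mirroring the property style used in ConnectTestUtils below. Only the dedup.rewind.offset key and the 150000 starting point come from the config doc above; the topic and host values are placeholders, and end-to-end exactly-once delivery also assumes deduplication is enabled on the target QuestDB table, as the doc string implies.

// Sketch only: placeholder values are not taken from the commit.
import io.questdb.kafka.QuestDBSinkConnector;
import io.questdb.kafka.QuestDBSinkConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorConfig;

import java.util.HashMap;
import java.util.Map;

public class DedupRewindPropsSketch {
    public static Map<String, String> connectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName());
        props.put("topics", "trades");        // placeholder topic
        props.put("host", "localhost:9009");  // placeholder ILP host:port
        // 150000 is the starting point suggested by DEDUPLICATION_REWIND_DOC;
        // size it to the maximum number of records that can be lost on failure.
        props.put(QuestDBSinkConnectorConfig.DEDUPLICATION_REWIND_CONFIG, "150000");
        return props;
    }
}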
36 changes: 33 additions & 3 deletions connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -37,6 +37,8 @@ public final class QuestDBSinkTask extends SinkTask {
private long batchesSinceLastError = 0;
private DateFormat dataFormat;
private boolean kafkaTimestampsEnabled;
private OffsetTracker tracker = new MultiOffsetTracker();
private long deduplicationRewindOffset;

@Override
public String version() {
@@ -46,6 +48,13 @@ public String version() {
@Override
public void start(Map<String, String> map) {
this.config = new QuestDBSinkConnectorConfig(map);
this.deduplicationRewindOffset = config.getDeduplicationRewindOffset();
if (deduplicationRewindOffset == 0) {
tracker = new EmptyOffsetTracker();
} else {
tracker = new MultiOffsetTracker();
}

String timestampStringFields = config.getTimestampStringFields();
if (timestampStringFields != null) {
stringTimestampColumns = new HashSet<>();
@@ -73,6 +82,16 @@ public void start(Map<String, String> map) {
this.timestampUnits = config.getTimestampUnitsOrNull();
}

@Override
public void open(Collection<TopicPartition> partitions) {
tracker.onPartitionsOpened(partitions);
}

@Override
public void close(Collection<TopicPartition> partitions) {
tracker.onPartitionsClosed(partitions);
}

private Sender createSender() {
log.debug("Creating a new sender");
Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost());
@@ -119,8 +138,8 @@ public void put(Collection<SinkRecord> collection) {
if (++batchesSinceLastError == 10) {
            // why 10? why not reset the retry counter immediately upon a successful flush()?
// there are two reasons for server disconnections:
-           // 1. the server is down / unreachable / other_infrastructure_issues
-           // 2. the client is sending bad data (e.g. pushing a string to a double column)
+           // 1. infrastructure: the server is down / unreachable / other_infrastructure_issues
+           // 2. structural: the client is sending bad data (e.g. pushing a string to a double column)
// errors in the latter case are not recoverable. upon receiving bad data the server will *eventually* close the connection,
// after a while, the client will notice that the connection is closed and will try to reconnect
// if we reset the retry counter immediately upon first successful flush() then we end-up in a loop where we flush bad data,
@@ -140,13 +159,21 @@ private void onSenderException(LineSenderException e) {
closeSenderSilently();
sender = null;
log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs());
tracker.configureSafeOffsets(context, deduplicationRewindOffset);
context.timeout(config.getRetryBackoffMs());
throw new RetriableException(e);
} else {
throw new ConnectException("Failed to send data to QuestDB after " + config.getMaxRetries() + " retries");
}
}


@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
tracker.transformPreCommit(currentOffsets, deduplicationRewindOffset);
return currentOffsets;
}

private void closeSenderSilently() {
try {
if (sender != null) {
@@ -159,6 +186,9 @@ private void closeSenderSilently() {

private void handleSingleRecord(SinkRecord record) {
assert timestampColumnValue == Long.MIN_VALUE;

tracker.onObservedOffset(record.kafkaPartition(), record.topic(), record.kafkaOffset());

String explicitTable = config.getTable();
String tableName = explicitTable == null ? record.topic() : explicitTable;
sender.table(tableName);
@@ -410,7 +440,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
-       // not needed as put() flushes after each record
+       // not needed as put() flushes after each batch
}

@Override
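
Taken together, the task drives the tracker in a fixed order across the hunks above. The sketch below condenses that call sequence; the class name and method grouping are illustrative, and sender wiring, retries, and error handling are omitted.

// Condensed sketch of the tracker call order, not part of the commit.
import io.questdb.kafka.OffsetTracker;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.Collection;
import java.util.Map;

final class TrackerLifecycleSketch {
    private final OffsetTracker tracker;
    private final SinkTaskContext context;
    private final long rewindOffset;

    TrackerLifecycleSketch(OffsetTracker tracker, SinkTaskContext context, long rewindOffset) {
        this.tracker = tracker;
        this.context = context;
        this.rewindOffset = rewindOffset;
    }

    void open(Collection<TopicPartition> partitions) {
        tracker.onPartitionsOpened(partitions);              // 1. partitions assigned to the task
    }

    void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // 2. remember the highest offset seen per topic/partition
            tracker.onObservedOffset(record.kafkaPartition(), record.topic(), record.kafkaOffset());
            // ... write the record to QuestDB via the ILP sender ...
        }
    }

    void onSendFailure() {
        tracker.configureSafeOffsets(context, rewindOffset); // 3. seek back before the retry
    }

    Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        tracker.transformPreCommit(offsets, rewindOffset);   // 4. commit rewound offsets to Kafka
        return offsets;
    }
}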
11 changes: 8 additions & 3 deletions connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -4,6 +4,7 @@
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.awaitility.Awaitility;
@@ -42,18 +43,22 @@ static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, A
}

static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContainer, String topicName) {
String ilpIUrl = questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);

Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName());
props.put("topics", topicName);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT));
props.put("host", ilpIUrl);
return props;
}

static void assertConnectorTaskState(EmbeddedConnectCluster connect, String connectorName, AbstractStatus.State expectedState) {
-        ConnectorStateInfo info = connect.connectorStatus(connectorName);
-        if (info == null) {
+        ConnectorStateInfo info = null;
+        try {
+            info = connect.connectorStatus(connectorName);
+        } catch (ConnectRestException e) {
fail("Connector " + connectorName + " not found");
}
List<ConnectorStateInfo.TaskState> taskStates = info.tasks();
@@ -82,8 +82,9 @@ public void testSmoke() {

connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));

-        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n"
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
                        + "\"John\",\"Doe\",42\r\n",
-                "select firstname,lastname,age from " + topicName);
+                "select firstname,lastname,age from " + topicName,
+                questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT));
}
}