From 04dd83af6c217968424126241829deebaa70574e Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Mon, 9 Oct 2023 10:55:06 +0200 Subject: [PATCH 01/16] feat: Exactly Once QoS This feature requites coordination with QuestDB deduplication. --- .../MultiTopicPartitionOffsetTracker.java | 76 ++++ .../io/questdb/kafka/QuestDBSinkTask.java | 25 ++ .../SingleTopicPartitionOffsetTracker.java | 72 ++++ .../kafka/TopicPartitionOffsetTracker.java | 20 + .../io/questdb/kafka/ConnectTestUtils.java | 11 +- .../QuestDBSinkConnectorEmbeddedAuthTest.java | 5 +- .../QuestDBSinkConnectorEmbeddedTest.java | 360 ++++++++++++------ .../java/io/questdb/kafka/QuestDBUtils.java | 28 +- connector/src/test/resources/log4j.properties | 2 +- .../questdb/kafka/AvroSchemaRegistryIT.java | 12 +- .../questdb/kafka/QuestDBSinkConnectorIT.java | 4 +- .../src/test/java/kafka/DebeziumIT.java | 105 ++--- 12 files changed, 537 insertions(+), 183 deletions(-) create mode 100644 connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java create mode 100644 connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java create mode 100644 connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java diff --git a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java new file mode 100644 index 0000000..e308e8f --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java @@ -0,0 +1,76 @@ +package io.questdb.kafka; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; + +import java.util.*; + +public final class MultiTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { + private final List> offsets = new ArrayList<>(); + + @Override + public void onPartitionsOpened(Collection partitions) { + for (TopicPartition partition : partitions) { + if (offsets.size() - 1 < partition.partition() || offsets.get(partition.partition()) == null) { + Map topic2offset = new HashMap<>(); + topic2offset.put(partition.topic(), -1L); + offsets.add(partition.partition(), topic2offset); + } else { + offsets.get(partition.partition()).put(partition.topic(), -1L); + } + } + } + + @Override + public void onPartitionsClosed(Collection partitions) { + for (TopicPartition partition : partitions) { + Map topic2offset = offsets.get(partition.partition()); + topic2offset.remove(partition.topic()); + if (topic2offset.isEmpty()) { + offsets.set(partition.partition(), null); + } + } + } + + @Override + public void onObservedOffset(int partition, String topic, long offset) { + Map partitionOffsets = offsets.get(partition); + Long maxOffset = partitionOffsets.get(topic); + if (maxOffset < offset) { + partitionOffsets.put(topic, offset); + } + } + + + @Override + public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { + for (int partition = 0; partition < offsets.size(); partition++) { + Map topicOffsets = offsets.get(partition); + if (topicOffsets != null) { + for (Map.Entry entry : topicOffsets.entrySet()) { + String topic = entry.getKey(); + Long offset = entry.getValue(); + if (offset != -1) { + long newOffset = Math.max(0, offset - rewindOffset); + sinkTaskContext.offset(new TopicPartition(topic, partition), newOffset); + } + } + } + } + } + + @Override + public void transformPreCommit(Map currentOffsets, long rewindOffset) 
{ + for (Map.Entry entry : currentOffsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + Map partitionOffsets = offsets.get(topicPartition.partition()); + assert partitionOffsets != null; + Long offset = partitionOffsets.get(topicPartition.topic()); + if (offset != -1) { + long newOffset = Math.max(0, offset - rewindOffset); + entry.setValue(new OffsetAndMetadata(newOffset)); + } + } + } +} diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index b72f013..fefda06 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; public final class QuestDBSinkTask extends SinkTask { + private static final int PROBABLY_SAFE_OFFSET_ROLLBACK = 150_000; private static final char STRUCT_FIELD_SEPARATOR = '_'; private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key"; private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value"; @@ -37,6 +38,8 @@ public final class QuestDBSinkTask extends SinkTask { private long batchesSinceLastError = 0; private DateFormat dataFormat; private boolean kafkaTimestampsEnabled; +// private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker(); +private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker(); @Override public String version() { @@ -73,6 +76,16 @@ public void start(Map map) { this.timestampUnits = config.getTimestampUnitsOrNull(); } + @Override + public void open(Collection partitions) { + tracker.onPartitionsOpened(partitions); + } + + @Override + public void close(Collection partitions) { + tracker.onPartitionsClosed(partitions); + } + private Sender createSender() { log.debug("Creating a new sender"); Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost()); @@ -140,6 +153,7 @@ private void onSenderException(LineSenderException e) { closeSenderSilently(); sender = null; log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs()); + tracker.configureSafeOffsets(context, PROBABLY_SAFE_OFFSET_ROLLBACK); context.timeout(config.getRetryBackoffMs()); throw new RetriableException(e); } else { @@ -147,6 +161,14 @@ private void onSenderException(LineSenderException e) { } } + + @Override + public Map preCommit(Map currentOffsets) { + assert currentOffsets.size() == 1; + tracker.transformPreCommit(currentOffsets, PROBABLY_SAFE_OFFSET_ROLLBACK); + return currentOffsets; + } + private void closeSenderSilently() { try { if (sender != null) { @@ -159,6 +181,9 @@ private void closeSenderSilently() { private void handleSingleRecord(SinkRecord record) { assert timestampColumnValue == Long.MIN_VALUE; + + tracker.onObservedOffset(record.kafkaPartition(), record.topic(), record.kafkaOffset()); + String explicitTable = config.getTable(); String tableName = explicitTable == null ? 
record.topic() : explicitTable; sender.table(tableName); diff --git a/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java new file mode 100644 index 0000000..b2d44a2 --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java @@ -0,0 +1,72 @@ +package io.questdb.kafka; + +import io.questdb.std.LongList; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; + +import java.util.Collection; +import java.util.Map; + +public final class SingleTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { + private final LongList offsets = new LongList(); + private String topic; + + @Override + public void onPartitionsOpened(Collection partitions) { + for (TopicPartition partition : partitions) { + if (topic == null) { + topic = partition.topic(); + } else if (!topic.equals(partition.topic())) { + throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); + } + offsets.extendAndSet(partition.partition(), -1); + } + } + + @Override + public void onPartitionsClosed(Collection partitions) { + for (TopicPartition partition : partitions) { + offsets.extendAndSet(partition.partition(), -1); + } + } + + @Override + public void onObservedOffset(int partition, String topic, long offset) { + long currentOffset = offsets.get(partition); + if (currentOffset < offset) { + offsets.extendAndSet(partition, offset); + } + } + + @Override + public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { + assert topic != null; + + for (int partition = 0; partition < offsets.size(); partition++) { + long offset = offsets.get(partition); + if (offset != -1) { + long newOffset = Math.max(0, offset - rewindOffset); + sinkTaskContext.offset(new TopicPartition(topic, partition), newOffset); + } + } + } + + @Override + public void transformPreCommit(Map currentOffsets, long rewindOffset) { + assert topic != null; + + for (Map.Entry entry : currentOffsets.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + if (!topicPartition.topic().equals(topic)) { + throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); + } + long offset = offsets.get(topicPartition.partition()); + if (offset != -1) { + long newOffset = Math.max(0, offset - rewindOffset); + OffsetAndMetadata offsetAndMetadata = entry.getValue(); + currentOffsets.put(topicPartition, new OffsetAndMetadata(newOffset, offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata())); + } + } + } +} diff --git a/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java new file mode 100644 index 0000000..7791ef2 --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java @@ -0,0 +1,20 @@ +package io.questdb.kafka; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; + +import java.util.Collection; +import java.util.Map; + +public interface TopicPartitionOffsetTracker { + void onPartitionsOpened(Collection partitions); + + void onPartitionsClosed(Collection partitions); + + void onObservedOffset(int partition, String topic, long offset); + + void 
configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset); + + void transformPreCommit(Map currentOffsets, long rewindOffset); +} diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java index 712f2f2..5ff9d73 100644 --- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java +++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java @@ -4,6 +4,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.awaitility.Awaitility; @@ -42,18 +43,22 @@ static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, A } static Map baseConnectorProps(GenericContainer questDBContainer, String topicName) { + String ilpIUrl = questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + Map props = new HashMap<>(); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName()); props.put("topics", topicName); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT)); + props.put("host", ilpIUrl); return props; } static void assertConnectorTaskState(EmbeddedConnectCluster connect, String connectorName, AbstractStatus.State expectedState) { - ConnectorStateInfo info = connect.connectorStatus(connectorName); - if (info == null) { + ConnectorStateInfo info = null; + try { + info = connect.connectorStatus(connectorName); + } catch (ConnectRestException e) { fail("Connector " + connectorName + " not found"); } List taskStates = info.tasks(); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java index c1f42b5..dc991c9 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java @@ -82,8 +82,9 @@ public void testSmoke() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index cb47e40..6b90e0a 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -1,9 +1,11 @@ package io.questdb.kafka; +import io.questdb.std.Os; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; @@ -13,23 +15,22 @@ import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.math.BigDecimal; -import java.util.Calendar; -import java.util.HashMap; -import java.util.Map; -import java.util.TimeZone; +import java.nio.file.Path; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; @@ -42,33 +43,54 @@ @Testcontainers public final class QuestDBSinkConnectorEmbeddedTest { + private static int httpPort = -1; + private static int ilpPort = -1; + private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:nightly"; + private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true; + private EmbeddedConnectCluster connect; private Converter converter; private String topicName; - @Container - private static GenericContainer questDBContainer = newQuestDbConnector(); + @TempDir + static Path dbRoot; - private static GenericContainer newQuestDbConnector() { - return newQuestDbConnector(null, null); + @BeforeAll + public static void createContainer() { + questDBContainer = newQuestDbConnector(); + } + + @AfterAll + public static void stopContainer() { + questDBContainer.stop(); } - private static GenericContainer newQuestDbConnector(Integer httpPort, Integer ilpPort) { - FixedHostPortGenericContainer selfGenericContainer = new FixedHostPortGenericContainer<>("questdb/questdb:6.6.1"); - if (httpPort != null) { - selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT); + private static GenericContainer questDBContainer; + + private static GenericContainer newQuestDbConnector() { + FixedHostPortGenericContainer selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER); + if (httpPort != -1) { + selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT); } else { selfGenericContainer.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT); } - if (ilpPort != null) { - selfGenericContainer.withFixedExposedPort(ilpPort, QuestDBUtils.QUESTDB_ILP_PORT); + if (ilpPort != -1) { + selfGenericContainer = selfGenericContainer.withFixedExposedPort(ilpPort, QuestDBUtils.QUESTDB_ILP_PORT); } else { selfGenericContainer.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); } - 
selfGenericContainer.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*")); - return selfGenericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) - .withEnv("QDB_CAIRO_COMMIT_LAG", "100") - .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI"); + String dbRootString = dbRoot.toAbsolutePath().toString(); + selfGenericContainer = selfGenericContainer.withFileSystemBind(dbRootString, "/var/lib/questdb"); + if (DUMP_QUESTDB_CONTAINER_LOGS) { + selfGenericContainer = selfGenericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); + } + selfGenericContainer.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main.*")); + selfGenericContainer.start(); + + httpPort = selfGenericContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT); + ilpPort = selfGenericContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + + return selfGenericContainer; } @BeforeEach @@ -78,8 +100,13 @@ public void setUp() { jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); converter = jsonConverter; + + Map props = new HashMap<>(); + props.put("connector.client.config.override.policy", "All"); connect = new EmbeddedConnectCluster.Builder() .name("questdb-connect-cluster") + .workerProps(props) + .numWorkers(4) .build(); connect.start(); @@ -109,9 +136,10 @@ public void testSmoke() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -127,9 +155,10 @@ public void testDeadLetterQueue_wrongJson() { connect.kafka().produce(topicName, "key", "{\"not valid json}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); ConsumerRecords fetchedRecords = connect.kafka().consume(1, 5000, "dlq"); Assertions.assertEquals(1, fetchedRecords.count()); @@ -158,9 +187,10 @@ public void testSymbol() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -176,9 +206,10 @@ public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { // creates a record with 'age' as long connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select 
firstname,lastname,age from " + topicName, + httpPort); for (int i = 0; i < 50; i++) { // injects a record with 'age' as string @@ -208,14 +239,11 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { connect.kafka().produce(topicName, "key1", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); - // we need to get mapped ports, becasue we are going to kill the container and restart it again - // and we need the same mapping otherwise the Kafka connect will not be able to re-connect to it - Integer httpPort = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT); - Integer ilpPort = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); questDBContainer.stop(); // insert a few records while the QuestDB is down for (int i = 0; i < 10; i++) { @@ -224,16 +252,16 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { } // restart QuestDB - questDBContainer = newQuestDbConnector(httpPort, ilpPort); - questDBContainer.start(); + questDBContainer = newQuestDbConnector(); for (int i = 0; i < 50; i++) { connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}"); Thread.sleep(100); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",49\r\n", - "select firstname,lastname,age from " + topicName + " where age = 49"); + "select firstname,lastname,age from " + topicName + " where age = 49", + httpPort); } @Test @@ -312,10 +340,11 @@ public void testSymbol_withAllOtherILPTypes() { connect.kafka().produce(topicName, "p1", new String(converter.fromConnectData(topicName, schema, p1))); connect.kafka().produce(topicName, "p2", new String(converter.fromConnectData(topicName, schema, p2))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"vegan\",\"height\",\"birth\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\",\"vegan\",\"height\",\"birth\"\r\n" + "\"John\",\"Doe\",42,true,1.8,\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jane\",\"Doe\",41,false,1.6,\"2021-10-23T13:53:59.123000Z\"\r\n", - "select firstname,lastname,age,vegan,height,birth from " + topicName); + "select firstname,lastname,age,vegan,height,birth from " + topicName, + httpPort); } @Test @@ -336,16 +365,103 @@ public void testUpfrontTable_withSymbols() { .put("lastname", "Doe") .put("age", (byte) 42); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname symbol, lastname symbol, age int)", + httpPort, QuestDBUtils.Endpoint.EXEC); connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); + } + + @Test + public void testExactlyOnce_withDedup() throws 
BrokenBarrierException, InterruptedException { + connect.kafka().createTopic(topicName, 4); + + Schema schema = SchemaBuilder.struct().name("com.example.Event") + .field("ts", Schema.INT64_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("type", Schema.INT64_SCHEMA) + .build(); + + // async inserts to Kafka + long recordCount = 10_000_000; + Map prodProps = new HashMap<>(); + new Thread(() -> { + try (KafkaProducer producer = connect.kafka().createProducer(prodProps)) { + for (long i = 0; i < recordCount; i++) { + Instant now = Instant.now(); + long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + Struct struct = new Struct(schema) + .put("ts", nanoTs) + .put("id", UUID.randomUUID().toString()) + .put("type", (i % 5)); + + byte[] value = new String(converter.fromConnectData(topicName, schema, struct)).getBytes(); + producer.send(new ProducerRecord<>(topicName, null, null, value)); + + // 1% chance of duplicates - we want them to be also deduped by QuestDB + if (ThreadLocalRandom.current().nextInt(100) == 0) { + producer.send(new ProducerRecord<>(topicName, null, null, value)); + } + + } + } + }).start(); + + + // configure questdb dedups + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", + "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, type LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", + httpPort, + QuestDBUtils.Endpoint.EXEC); + + // start connector + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "ts"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // restart QuestDB every 15 seconds + CyclicBarrier barrier = new CyclicBarrier(2); + new Thread(() -> { + while (barrier.getNumberWaiting() == 0) { + Os.sleep(15_000); + restartQuestDB(); + } + try { + barrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } catch (BrokenBarrierException e) { + // shouldn't happen + throw new RuntimeException(e); + } + }).start(); + + // make sure we have all records in the table + QuestDBUtils.assertSqlEventually( + "\"count\"\r\n" + + recordCount + "\r\n", + "select count(*) from " + topicName, + 600, + httpPort); + + // await the restarter thread so we don't leave dangling threads behind + barrier.await(); + } + + private static void restartQuestDB() { + questDBContainer.stop(); + questDBContainer = newQuestDbConnector(); } @Test @@ -372,11 +488,12 @@ public void testTimestampUnitResolution_auto() { connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthInMicros + "}"); connect.kafka().produce(topicName, "baz", "{\"firstname\":\"Jack\",\"lastname\":\"Doe\",\"birth\":" + birthInNanos + "}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jane\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jack\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select firstname,lastname,timestamp from " + topicName); + "select firstname,lastname,timestamp from " + topicName, + httpPort); } @Test @@ -428,10 +545,11 @@ private void testTimestampUnitResolution0(String mode) { 
connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":0}"); connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthTarget + "}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"1970-01-01T00:00:00.000000Z\"\r\n" + "\"Jane\",\"Doe\",\"2206-11-20T17:46:39.999000Z\"\r\n", - "select firstname,lastname,timestamp from " + topicName); + "select firstname,lastname,timestamp from " + topicName, + httpPort); } @Test @@ -458,9 +576,10 @@ public void testKafkaNativeTimestamp() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); java.util.Date birth = new Calendar.Builder() @@ -476,9 +595,10 @@ public void testKafkaNativeTimestamp() { producer.send(record); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"born\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -504,9 +624,10 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02 16:41:55.402 UTC"; @@ -518,9 +639,10 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -560,9 +682,10 @@ public void testTimestampSMT_parseTimestamp_withSchema() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -582,16 +705,18 @@ public void testUpfrontTable() { .put("lastname", "Doe") .put("age", (byte) 42); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, age int)", + httpPort, QuestDBUtils.Endpoint.EXEC); connect.kafka().produce(topicName, "key", 
new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -605,9 +730,10 @@ public void testDesignatedTimestamp_noSchema_unixEpochMillis() { connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":433774466123}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"foo\",\"John\",\"Doe\",\"1983-09-30T12:54:26.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -626,9 +752,10 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":\"1989-09-23T10:25:33.107Z\"}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"foo\",\"John\",\"Doe\",\"1989-09-23T10:25:33.107000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -658,9 +785,10 @@ public void testDesignatedTimestamp_withSchema() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"key\",\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -691,9 +819,10 @@ public void testDoNotIncludeKey() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -705,9 +834,10 @@ public void testJsonNoSchema() { ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -721,10 +851,11 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes() { connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + 
"\"John\",\"Doe\",42.0\r\n" + "\"Jane\",\"Doe\",42.5\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @@ -759,9 +890,10 @@ public void testPrimitiveKey() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select firstname, lastname, age, key from " + topicName); + "select firstname, lastname, age, key from " + topicName, + httpPort); } @Test @@ -777,9 +909,10 @@ public void testParsingStringTimestamp() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02 16:41:55.402095 UTC"; @@ -791,9 +924,10 @@ public void testParsingStringTimestamp() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -808,9 +942,10 @@ public void testParsingStringTimestamp_defaultPattern() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02T16:41:55.402095Z"; @@ -822,9 +957,10 @@ public void testParsingStringTimestamp_defaultPattern() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -841,9 +977,10 @@ public void testCustomPrefixWithPrimitiveKeyAndValues() { connect.kafka().produce(topicName, "foo", "bar"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"col_key\",\"col_value\"\r\n" + QuestDBUtils.assertSqlEventually("\"col_key\",\"col_value\"\r\n" + "\"foo\",\"bar\"\r\n", - "select col_key, col_value from " + topicName); + "select col_key, col_value from " + topicName, + httpPort); } @Test @@ -866,9 +1003,10 @@ public void testSkipUnsupportedType_Bytes() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\"\r\n" + 
"\"key\",\"John\",\"Doe\"\r\n", - "select key, firstname, lastname from " + topicName); + "select key, firstname, lastname from " + topicName, + httpPort); } @Test @@ -883,9 +1021,10 @@ public void testDefaultPrefixWithPrimitiveKeyAndValues() { connect.kafka().produce(topicName, "foo", "bar"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"value\"\r\n" + "\"foo\",\"bar\"\r\n", - "select key, value from " + topicName); + "select key, value from " + topicName, + httpPort); } @Test @@ -908,9 +1047,10 @@ public void testStructKey() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, json); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"key_firstname\",\"key_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"key_firstname\",\"key_lastname\"\r\n" + "\"John\",\"Doe\",\"John\",\"Doe\"\r\n", - "select firstname, lastname, key_firstname, key_lastname from " + topicName); + "select firstname, lastname, key_firstname, key_lastname from " + topicName, + httpPort); } @Test @@ -935,9 +1075,10 @@ public void testStructKeyWithNoPrefix() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, "foo"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"value\"\r\n" + "\"John\",\"Doe\",\"foo\"\r\n", - "select firstname, lastname, value from " + topicName); + "select firstname, lastname, value from " + topicName, + httpPort); } @Test @@ -961,9 +1102,10 @@ public void testStructKeyAndPrimitiveValue() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, "foo"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key_firstname\",\"key_lastname\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"key_firstname\",\"key_lastname\",\"value\"\r\n" + "\"John\",\"Doe\",\"foo\"\r\n", - "select key_firstname, key_lastname, value from " + topicName); + "select key_firstname, key_lastname, value from " + topicName, + httpPort); } @@ -988,9 +1130,10 @@ public void testExplicitTableName() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + tableName); + "select firstname,lastname,age from " + tableName, + httpPort); } @Test @@ -1037,9 +1180,10 @@ public void testLogicalTypes() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"col_timestamp\",\"col_date\",\"col_time\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"col_timestamp\",\"col_date\",\"col_time\"\r\n" + "\"John\",\"Doe\",42,\"2022-10-23T13:53:59.123000Z\",\"2022-10-23T00:00:00.000000Z\",50039123\r\n", - "select firstname,lastname,age, col_timestamp, col_date, col_time from " + topicName); + "select firstname,lastname,age, col_timestamp, col_date, col_time from " + topicName, + httpPort); } @Test @@ -1093,9 +1237,10 @@ public void 
testNestedStructInValue() { String value = new String(converter.fromConnectData(topicName, personSchema, person)); connect.kafka().produce(topicName, "key", value); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"name_firstname\",\"name_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"name_firstname\",\"name_lastname\"\r\n" + "\"John\",\"Doe\"\r\n", - "select name_firstname, name_lastname from " + topicName); + "select name_firstname, name_lastname from " + topicName, + httpPort); } @Test @@ -1135,8 +1280,9 @@ public void testMultiLevelNestedStructInValue() { String value = new String(converter.fromConnectData(topicName, coupleSchema, couple)); connect.kafka().produce(topicName, "key", value); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"partner1_name_firstname\",\"partner1_name_lastname\",\"partner2_name_firstname\",\"partner2_name_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"partner1_name_firstname\",\"partner1_name_lastname\",\"partner2_name_firstname\",\"partner2_name_lastname\"\r\n" + "\"John\",\"Doe\",\"Jane\",\"Doe\"\r\n", - "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName); + "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName, + httpPort); } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java index 6cacc5d..6970672 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java @@ -4,7 +4,6 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.testcontainers.containers.GenericContainer; import java.io.IOException; import java.net.URLEncoder; @@ -43,23 +42,32 @@ private QuestDBUtils() { } - public static void assertSqlEventually(GenericContainer questdbContainer, String expectedResult, String query) { - await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(questdbContainer, expectedResult, query)); + public static void assertSqlEventually(String expectedResult, String query, int timeoutSeconds, int port) { + await().atMost(timeoutSeconds, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); } - public static void assertSql(GenericContainer questdbContainer, String expectedResult, String query) { - assertSql(questdbContainer, expectedResult, query, Endpoint.EXPORT); + public static void assertSqlEventually(String expectedResult, String query, int port) { + await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); } - public static void assertSql(GenericContainer questdbContainer, String expectedResult, String query, Endpoint endpoint) { - try (Response response = executeQuery(questdbContainer, query, endpoint)) { + public static void assertSql(String expectedResult, String query, int port) { + assertSql(expectedResult, query, port, Endpoint.EXPORT); + } + + public static void assertSql(String expectedResult, String query, int port, Endpoint endpoint) { + try (Response response = executeQuery(port, query, endpoint)) { if (response.code() != 200) { fail("Query failed, returned code " + response.code()); } try (okhttp3.ResponseBody body = response.body()) { if (body != null) { String bodyString = body.string(); - assertEquals(expectedResult, bodyString); + try { + assertEquals(expectedResult, 
bodyString); + } catch (AssertionError e) { + System.out.println("Received response: " + bodyString); + throw e; + } } } } catch (IOException e) { @@ -67,9 +75,9 @@ public static void assertSql(GenericContainer questdbContainer, String expect } } - private static Response executeQuery(GenericContainer questContainer, String query, Endpoint endpoint) throws IOException { + private static Response executeQuery(int port, String query, Endpoint endpoint) throws IOException { String encodedQuery = URLEncoder.encode(query, "UTF-8"); - String baseUrl = "http://" + questContainer.getHost() + ":" + questContainer.getMappedPort(QUESTDB_HTTP_PORT); + String baseUrl = "http://localhost:" + port; Request request = new Request.Builder() .url(baseUrl + "/" + endpoint.endpoint + "?query=" + encodedQuery) .build(); diff --git a/connector/src/test/resources/log4j.properties b/connector/src/test/resources/log4j.properties index ec93932..95c1bac 100644 --- a/connector/src/test/resources/log4j.properties +++ b/connector/src/test/resources/log4j.properties @@ -26,4 +26,4 @@ log4j.logger.org.apache.zookeeper=WARN log4j.logger.kafka=WARN log4j.logger.org.reflections=ERROR log4j.logger.state.change.logger=WARN -log4j.logger.io.questdb.kafka=DEBUG +#log4j.logger.io.questdb.kafka=DEBUG diff --git a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java index 68c17cd..a607b1c 100644 --- a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java +++ b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java @@ -106,9 +106,9 @@ public void testSmoke() throws Exception { } startConnector(topicName); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } @Test @@ -124,9 +124,9 @@ public void testSchemaEvolution() throws Exception { } startConnector(topicName); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); try (Producer producer = new KafkaProducer<>(producerProps())) { Schema schema = new org.apache.avro.Schema.Parser().parse(getClass().getResourceAsStream("/avro-runtime/StudentWithExtraColumn.avsc")); @@ -137,10 +137,10 @@ public void testSchemaEvolution() throws Exception { student.put("active", true); producer.send(new ProducerRecord<>(topicName, "foo", student)).get(); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\",\"active\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"timestamp\",\"active\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\",false\r\n" + "\"Mary\",\"Doe\",\"2005-01-01T00:00:00.000000Z\",true\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } private void startConnector(String topicName) { 
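Note on the mechanism: the trackers added in this patch (SingleTopicPartitionOffsetTracker, MultiTopicPartitionOffsetTracker) remember the highest offset seen per topic-partition. On a retriable sender failure, and again at pre-commit, QuestDBSinkTask rewinds those offsets by PROBABLY_SAFE_OFFSET_ROLLBACK (150,000 records), so Kafka Connect re-delivers a window of records that may or may not have already reached QuestDB; the replays are then collapsed by QuestDB deduplication. The sketch below is a simplified, single-topic illustration of that rewind arithmetic against the Kafka Connect API; the class and method names are illustrative only and are not part of the connector.

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the rewind arithmetic shared by configureSafeOffsets() and
// transformPreCommit(). "OffsetRewindSketch" is a hypothetical name, not connector API.
final class OffsetRewindSketch {
    private final String topic;
    private final Map<Integer, Long> maxSeen = new HashMap<>(); // partition -> highest offset observed

    OffsetRewindSketch(String topic) {
        this.topic = topic;
    }

    void onRecord(int partition, long offset) {
        maxSeen.merge(partition, offset, Long::max);
    }

    // Retriable-failure path: ask Kafka Connect to re-consume from a point far enough
    // back that any row the sender may have lost is guaranteed to be re-sent.
    void rewind(SinkTaskContext context, long rewindBy) {
        for (Map.Entry<Integer, Long> e : maxSeen.entrySet()) {
            long safeOffset = Math.max(0, e.getValue() - rewindBy);
            context.offset(new TopicPartition(topic, e.getKey()), safeOffset);
        }
    }

    // Pre-commit path: commit rewound offsets instead of the consumed ones, so a crash
    // between commits also replays records into QuestDB's deduplication window.
    Map<TopicPartition, OffsetAndMetadata> rewoundCommit(long rewindBy) {
        Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();
        for (Map.Entry<Integer, Long> e : maxSeen.entrySet()) {
            long safeOffset = Math.max(0, e.getValue() - rewindBy);
            committable.put(new TopicPartition(topic, e.getKey()), new OffsetAndMetadata(safeOffset));
        }
        return committable;
    }
}

Rewinding trades extra work for safety: up to 150,000 records per partition are re-ingested after a failure, which is only acceptable because the target table deduplicates on its upsert keys.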
diff --git a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java index 17dc386..7e8e7a8 100644 --- a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java +++ b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java @@ -97,8 +97,8 @@ public void test() throws Exception { connectContainer.registerConnector("my-connector", connector); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"value\"\r\n" - + "\"foo\",\"bar\"\r\n", "select key, value from " + topicName); + QuestDBUtils.assertSqlEventually( "\"key\",\"value\"\r\n" + + "\"foo\",\"bar\"\r\n", "select key, value from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } diff --git a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java index f5b5a79..67bf8fc 100644 --- a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java +++ b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java @@ -3,15 +3,9 @@ import io.debezium.testing.testcontainers.ConnectorConfiguration; import io.debezium.testing.testcontainers.DebeziumContainer; import io.questdb.client.Sender; -import io.questdb.kafka.QuestDBSinkConnector; -import io.questdb.kafka.QuestDBSinkConnectorConfig; -import io.questdb.kafka.QuestDBSinkTask; -import io.questdb.kafka.QuestDBUtils; -import io.questdb.kafka.JarResolverExtension; +import io.questdb.kafka.*; import org.apache.kafka.connect.json.JsonConverter; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.LoggerFactory; @@ -22,21 +16,11 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Stream; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; @Testcontainers public class DebeziumIT { @@ -117,10 +101,11 @@ public void testSmoke() throws Exception { ConnectorConfiguration questSinkConfig = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSinkConfig); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "2,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -144,13 +129,14 @@ public void testManyUpdates() throws Exception { statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (3, 'PDB', 1.0)"); statement.execute("insert into " + PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME + " values (4, 'KDB', 1.0)"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"price\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"symbol\",\"price\"\r\n" + "0,\"TDB\",1.0\r\n" + "1,\"QDB\",1.0\r\n" + "2,\"IDB\",1.0\r\n" + "3,\"PDB\",1.0\r\n" + "4,\"KDB\",1.0\r\n", - "select id, symbol, price from " + questTableName); + "select id, symbol, price from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); try (PreparedStatement preparedStatement = connection.prepareStatement("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set price = ? where id = ?")) { //a bunch of updates @@ -171,18 +157,20 @@ public void testManyUpdates() throws Exception { } // all symbols have the last well-known price - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"last_price\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"symbol\",\"last_price\"\r\n" + "0,\"TDB\",42.0\r\n" + "1,\"QDB\",42.0\r\n" + "2,\"IDB\",42.0\r\n" + "3,\"PDB\",42.0\r\n" + "4,\"KDB\",42.0\r\n", - "select id, symbol, last(price) as last_price from " + questTableName); + "select id, symbol, last(price) as last_price from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); // total number of rows is equal to the number of updates and inserts - QuestDBUtils.assertSqlEventually(questDBContainer, "\"count\"\r\n" + QuestDBUtils.assertSqlEventually("\"count\"\r\n" + "200010\r\n", - "select count() from " + questTableName); + "select count() from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -202,17 +190,19 @@ public void testSchemaChange() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)"); statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"description\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"title\",\"description\"\r\n" + "1,\"Learn CDC\",\r\n" + "2,\"Learn Debezium\",\"Best book ever\"\r\n", - "select id, title, description from " + questTableName); + "select id, title, description from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -231,16 +221,18 @@ public void testUpdatesChange() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.executeUpdate("update " + PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME + " set title = 'Learn Debezium' where id = 1"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "1,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -259,17 +251,19 @@ public void testInsertThenDeleteThenInsertAgain() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1"); statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn Debezium')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "1,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -288,9 +282,10 @@ public void testEventTime() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -309,9 +304,10 @@ public void testEventTimeMicros() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123456Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -330,9 +326,10 @@ public void testEventTimeNanos() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123457Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -350,9 +347,10 @@ public void testNonDesignatedTimestamp() throws SQLException { 
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"created_at\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n", - "select id, title, created_at from " + questTableName); + "select id, title, created_at from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -370,9 +368,10 @@ public void testDate() throws SQLException { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"created_at\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n", - "select id, title, created_at from " + questTableName); + "select id, title, created_at from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -391,18 +390,20 @@ public void testDelete() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n", - "select * from " + questTableName); + "select * from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); // delete should be ignored by QuestDB statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1"); statement.execute("insert into " + PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME + " values (2, 'Learn Debezium', '2021-01-03')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n" + "2,\"Learn Debezium\",\"2021-01-03T00:00:00.000000Z\"\r\n", - "select * from " + questTableName); + "select * from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } From 40aed2becb1f20d3732c396fb6221d7464ff3ba4 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 10 Oct 2023 16:07:37 +0200 Subject: [PATCH 02/16] Dockerized Exactly Once test --- integration-tests/commons/pom.xml | 4 + .../questdb/kafka/JarResolverExtension.java | 4 +- integration-tests/exactlyonce/pom.xml | 78 +++++ .../java/io/questdb/kafka/ExactlyOnceIT.java | 296 ++++++++++++++++++ integration-tests/pom.xml | 1 + pom.xml | 2 +- 6 files changed, 382 insertions(+), 3 deletions(-) create mode 100644 integration-tests/exactlyonce/pom.xml create mode 100644 integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java diff --git a/integration-tests/commons/pom.xml b/integration-tests/commons/pom.xml index 5cee86b..aa2715a 100644 --- a/integration-tests/commons/pom.xml +++ b/integration-tests/commons/pom.xml @@ -20,5 +20,9 @@ org.testcontainers junit-jupiter + + org.junit.jupiter + junit-jupiter-api + \ No newline at end of file diff --git a/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java b/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java index 79577fc..b3ffe21 100644 --- a/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java +++ b/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java @@ -37,7 +37,7 @@ public String getJarPath() { } if (resource.getProtocol().equals("file")) { String pathString = resource.getPath(); - return buildJarFromSinglingTarget(pathString); + return buildJarFromSiblingTargetDir(pathString); } else if (resource.getProtocol().equals("jar")) { return getPathToJarWithClass(clazz); } @@ -52,7 +52,7 @@ private static String getPathToJarWithClass(Class clazz) { return path.toString(); } - private String buildJarFromSinglingTarget(String pathString) { + private String buildJarFromSiblingTargetDir(String pathString) { try { tempDir = Files.createTempDirectory("jar-resolver-tmp"); } catch (IOException e) { diff --git a/integration-tests/exactlyonce/pom.xml b/integration-tests/exactlyonce/pom.xml new file mode 100644 index 0000000..de2acee --- /dev/null +++ b/integration-tests/exactlyonce/pom.xml @@ -0,0 +1,78 @@ + + + 4.0.0 + + org.questdb + kafka-integration-tests + 0.10-SNAPSHOT + + + kafka-it-exactlyonce + + + 19 + 19 + UTF-8 + + + + + org.questdb + kafka-questdb-connector + ${project.version} + test + tests + + + org.questdb + kafka-questdb-connector + ${project.version} + test + + + org.apache.kafka + connect-api + ${kafka.version} + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + kafka + test + + + org.testcontainers + junit-jupiter + test + + + io.debezium + debezium-testing-testcontainers + 1.9.5.Final + test + + + org.slf4j + slf4j-api + provided + + + org.slf4j + slf4j-log4j12 + test + + + org.questdb + kafka-it-common + ${project.version} + + + + \ No newline at end of file diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java 
b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java new file mode 100644 index 0000000..018901d --- /dev/null +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -0,0 +1,296 @@ +package io.questdb.kafka; + +import io.questdb.client.Sender; +import io.questdb.std.Os; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static java.time.Duration.ofMinutes; +import static java.time.Duration.ofSeconds; + +public class ExactlyOnceIT { + private static final DockerImageName KAFKA_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.5.1"); + private static final DockerImageName ZOOKEEPER_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-zookeeper:7.5.1"); + private static final DockerImageName CONNECT_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.1"); + private static final DockerImageName QUESTDB_CONTAINER_IMAGE = DockerImageName.parse("questdb/questdb:7.3.3"); + private static final int KAFKA_CLUSTER_SIZE = 3; + private static final int CONNECT_CLUSTER_SIZE = 2; + + @TempDir + static Path persistence; + + // we need to locate JARs with QuestDB client and Kafka Connect Connector, + // this is later used to copy to the Kafka Connect container + @RegisterExtension + public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class); + @RegisterExtension + public static JarResolverExtension questdbClientJarResolver = JarResolverExtension.forClass(Sender.class); + + private final static Network network = Network.newNetwork(); + + private static GenericContainer zookeeper; + private static KafkaContainer[] kafkas = new KafkaContainer[KAFKA_CLUSTER_SIZE]; + private static GenericContainer[] connects = new GenericContainer[CONNECT_CLUSTER_SIZE]; + private static GenericContainer questdb; + + private static int questHttpPort; + + @BeforeAll + public static void createContainers() { + zookeeper = newZookeeperContainer(); + questdb = newQuestDBContainer(); + for (int i = 0; i < 
KAFKA_CLUSTER_SIZE; i++) { + kafkas[i] = newKafkaContainer(i); + } + for (int i = 0; i < CONNECT_CLUSTER_SIZE; i++) { + connects[i] = newConnectContainer(i); + } + + Stream containers = Stream.concat( + Stream.concat( + Stream.of(kafkas), Stream.of(connects) + ), + Stream.of(zookeeper, questdb) + ); + Startables.deepStart(containers).join(); + questHttpPort = questdb.getMappedPort(9000); + } + + @AfterAll + public static void stopContainer() { + questdb.stop(); + Stream.of(kafkas).forEach(KafkaContainer::stop); + Stream.of(connects).forEach(GenericContainer::stop); + zookeeper.stop(); + } + + private static GenericContainer newZookeeperContainer() { + return new GenericContainer<>(ZOOKEEPER_CONTAINER_IMAGE) + .withNetwork(network) + .withNetworkAliases("zookeeper") + .withEnv("ZOOKEEPER_CLIENT_PORT", "2181") + .withEnv("ZOOKEEPER_TICK_TIME", "300") + .withEnv("ZOOKEEPER_INIT_LIMIT", "10") + .withEnv("ZOOKEEPER_SYNC_LIMIT", "5") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))); + } + + private static GenericContainer newQuestDBContainer() { + + Path dbRoot; + try { + dbRoot = Files.createDirectories(persistence.resolve("questdb")); + } catch (IOException e) { + throw new RuntimeException(e); + } + FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>(QUESTDB_CONTAINER_IMAGE.asCanonicalNameString()) + .withNetwork(network) +// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) + .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/questdb") + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("questdb")); + + if (questHttpPort == 0) { + container = container.withExposedPorts(9000); + } else { + container.withFixedExposedPort(questHttpPort, 9000); + } + return container; + } + + private static KafkaContainer newKafkaContainer(int id) { + Path kafkaData; + try { + kafkaData = Files.createDirectories(persistence.resolve("kafka").resolve("data" + id)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new KafkaContainer(KAFKA_CONTAINER_IMAGE) + .withNetwork(network) + .dependsOn(zookeeper) + .withExternalZookeeper("zookeeper:2181") + .withEnv("KAFKA_BROKER_ID", String.valueOf(id)) + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "3") + .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "3") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2") + .withEnv("KAFKA_NUM_PARTITIONS", "3") + .withFileSystemBind(kafkaData.toAbsolutePath().toString(), "/var/lib/kafka/data") + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka" + id)) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka" + id))); + } + + private static GenericContainer newConnectContainer(int id) { + List dependencies = new ArrayList<>(Arrays.asList(kafkas)); + dependencies.add(questdb); + + return new GenericContainer<>(CONNECT_CONTAINER_IMAGE) + .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092") + .withEnv("CONNECT_GROUP_ID", "test") + .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") + .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic") + .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic") + .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") + .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter") + .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false") + .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect" 
+ id) + .withNetwork(network) + .withExposedPorts(8083) + .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar") + .withCopyFileToContainer(MountableFile.forHostPath(questdbClientJarResolver.getJarPath()), "/usr/share/java/kafka/questdb.jar") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("connect" + id))) + .dependsOn(dependencies) + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("connect" + id)) + .waitingFor(new HttpWaitStrategy() + .forPath("/connectors") + .forStatusCode(200) + .forPort(8083) + .withStartupTimeout(ofMinutes(5))); + } + + @Test + public void test() throws Exception { + String topicName = "mytopic"; + int recordCount = 5_000_000; + + Properties props = new Properties(); + String bootstrapServers = kafkas[0].getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put("include.key", "false"); + + new Thread(() -> { + try (Producer producer = new KafkaProducer<>(props)) { + for (int i = 0; i < recordCount; i++ ) { + Instant now = Instant.now(); + long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + UUID uuid = UUID.randomUUID(); + int val = ThreadLocalRandom.current().nextInt(100); + + String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; + producer.send(new ProducerRecord<>(topicName, null, jsonVal)); + + // 1% chance of duplicates - we want them to be also deduped by QuestDB + if (ThreadLocalRandom.current().nextInt(100) == 0) { + producer.send(new ProducerRecord<>(topicName, null, jsonVal)); + } + } + } + }).start(); + + // configure questdb dedups + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", + "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, val LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", + questdb.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT), + QuestDBUtils.Endpoint.EXEC); + + CyclicBarrier barrier = new CyclicBarrier(2); + new Thread(() -> { + while (barrier.getNumberWaiting() == 0) { + Os.sleep(ThreadLocalRandom.current().nextInt(5_000, 30_000)); + int victim = ThreadLocalRandom.current().nextInt(3); + switch (victim) { + case 0: { + questdb.stop(); + GenericContainer container = newQuestDBContainer(); + container.start(); + questdb = container; + break; + } + case 1: { + int n = ThreadLocalRandom.current().nextInt(connects.length); + connects[n].stop(); + GenericContainer container = newConnectContainer(n); + container.start(); + connects[n] = container; + break; + } + case 2: { + int n = ThreadLocalRandom.current().nextInt(kafkas.length); + kafkas[n].stop(); + KafkaContainer container = newKafkaContainer(n); + Os.sleep(5000); // wait for zookeeper to detect the previous kafka container was stopped + container.start(); + kafkas[n] = container; + break; + } + } + } + + try { + barrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (BrokenBarrierException e) { + throw new RuntimeException(e); + } + }).start(); + + String payload = "{\"name\":\"my-connector\",\"config\":{" + + "\"tasks.max\":\"4\"," + + "\"connector.class\":\"io.questdb.kafka.QuestDBSinkConnector\"," + + "\"key.converter\":\"org.apache.kafka.connect.storage.StringConverter\"," + + 
"\"value.converter\":\"org.apache.kafka.connect.json.JsonConverter\"," + + "\"topics\":\"mytopic\"," + + "\"value.converter.schemas.enable\":\"false\"," + + "\"timestamp.field.name\":\"ts\"," + + "\"host\":\"questdb:9009\"}" + + "}"; + + HttpResponse response = HttpClient.newBuilder().connectTimeout(ofSeconds(10)).build().send( + HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.ofString(payload)) + .uri(new URI("http://localhost:" + connects[0].getMappedPort(8083) + "/connectors")) + .header("Content-Type", "application/json") + .build(), + HttpResponse.BodyHandlers.ofString() + ); + if (response.statusCode() != 201) { + throw new RuntimeException("Failed to create connector: " + response.body()); + } + + QuestDBUtils.assertSqlEventually( + "\"count\"\r\n" + + recordCount + "\r\n", + "select count(*) from " + topicName, + 600, + questHttpPort); + + barrier.await(); + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 8bff927..96a0018 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -13,6 +13,7 @@ debezium commons avro-schema-registry + exactlyonce pom diff --git a/pom.xml b/pom.xml index 0e7ba29..c2eef24 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 2.13 3.3.1 5.9.0 - 1.17.3 + 1.19.1 1.7.36 4.1.0 4.10.0 From 2c9bd89db9bdd731b1e484289918a3fd5eece5d1 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 11 Oct 2023 17:24:20 +0200 Subject: [PATCH 03/16] MultiTopicPartitionOffsetTracker perf. optimization --- .../MultiTopicPartitionOffsetTracker.java | 106 +++++++++++++----- .../io/questdb/kafka/QuestDBSinkTask.java | 10 +- .../QuestDBSinkConnectorEmbeddedTest.java | 2 +- 3 files changed, 82 insertions(+), 36 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java index e308e8f..4758a35 100644 --- a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java +++ b/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java @@ -1,23 +1,43 @@ package io.questdb.kafka; +import io.questdb.std.CharSequenceObjHashMap; +import io.questdb.std.LongList; +import io.questdb.std.ObjList; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkTaskContext; -import java.util.*; +import java.util.Collection; +import java.util.Map; public final class MultiTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { - private final List> offsets = new ArrayList<>(); + private static final int EMPTY = -1; + private static final int CLOSED = -2; + + private final CharSequenceObjHashMap offsets = new CharSequenceObjHashMap<>(); + + private String lastTopicCache; + private LongList lastTopicOffsetsCache; @Override public void onPartitionsOpened(Collection partitions) { for (TopicPartition partition : partitions) { - if (offsets.size() - 1 < partition.partition() || offsets.get(partition.partition()) == null) { - Map topic2offset = new HashMap<>(); - topic2offset.put(partition.topic(), -1L); - offsets.add(partition.partition(), topic2offset); - } else { - offsets.get(partition.partition()).put(partition.topic(), -1L); + String topic = partition.topic(); + LongList topicOffsets = offsets.get(topic); + if (topicOffsets == null) { + topicOffsets = new LongList(4); + offsets.put(topic, topicOffsets); + } + + int partitionId = partition.partition(); + int currentSize = 
topicOffsets.size(); + if (currentSize <= partitionId) { + topicOffsets.extendAndSet(partitionId, EMPTY); + if (currentSize != partitionId) { + topicOffsets.fill(currentSize, partitionId, EMPTY); + } + } else if (topicOffsets.get(partitionId) == CLOSED) { + topicOffsets.set(partitionId, EMPTY); } } } @@ -25,36 +45,60 @@ public void onPartitionsOpened(Collection partitions) { @Override public void onPartitionsClosed(Collection partitions) { for (TopicPartition partition : partitions) { - Map topic2offset = offsets.get(partition.partition()); - topic2offset.remove(partition.topic()); - if (topic2offset.isEmpty()) { - offsets.set(partition.partition(), null); + String topic = partition.topic(); + LongList topicOffsets = offsets.get(topic); + topicOffsets.set(partition.partition(), CLOSED); + } + + // Remove topics that have all partitions closed + ObjList keys = offsets.keys(); + for (int i = 0, n = keys.size(); i < n; i++) { + CharSequence topic = keys.getQuick(i); + LongList topicOffsets = offsets.get(topic); + boolean allClosed = true; + for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) { + if (topicOffsets.get(partition) != CLOSED) { + allClosed = false; + break; + } + } + if (allClosed) { + offsets.remove(topic); } } } + + @Override public void onObservedOffset(int partition, String topic, long offset) { - Map partitionOffsets = offsets.get(partition); - Long maxOffset = partitionOffsets.get(topic); - if (maxOffset < offset) { - partitionOffsets.put(topic, offset); + LongList topicOffsets; + + // intentional reference equality check - Kafka Connect use the same String instances + // so we can avoid hash map lookup + if (lastTopicCache == topic) { + topicOffsets = lastTopicOffsetsCache; + } else { + topicOffsets = offsets.get(topic); + lastTopicCache = topic; + lastTopicOffsetsCache = topicOffsets; } + long maxOffset = topicOffsets.get(partition); + topicOffsets.set(partition, Math.max(maxOffset, offset)); } @Override public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { - for (int partition = 0; partition < offsets.size(); partition++) { - Map topicOffsets = offsets.get(partition); - if (topicOffsets != null) { - for (Map.Entry entry : topicOffsets.entrySet()) { - String topic = entry.getKey(); - Long offset = entry.getValue(); - if (offset != -1) { - long newOffset = Math.max(0, offset - rewindOffset); - sinkTaskContext.offset(new TopicPartition(topic, partition), newOffset); - } + ObjList keys = offsets.keys(); + for (int i = 0, n = keys.size(); i < n; i++) { + CharSequence topic = keys.getQuick(i); + LongList topicOffsets = offsets.get(topic); + for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) { + long offset = topicOffsets.get(partition); + // only rewind if we ever observed an offset for this partition + if (offset >= 0) { + sinkTaskContext.offset(new TopicPartition(topic.toString(), partition), Math.max(0, offset - rewindOffset)); } } } @@ -64,10 +108,12 @@ public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOff public void transformPreCommit(Map currentOffsets, long rewindOffset) { for (Map.Entry entry : currentOffsets.entrySet()) { TopicPartition topicPartition = entry.getKey(); - Map partitionOffsets = offsets.get(topicPartition.partition()); - assert partitionOffsets != null; - Long offset = partitionOffsets.get(topicPartition.topic()); - if (offset != -1) { + String topic = topicPartition.topic(); + LongList topicOffsets = offsets.get(topic); + long offset = 
topicOffsets.get(topicPartition.partition()); + + // only transform if we ever observed an offset for this partition + if (offset >= 0) { long newOffset = Math.max(0, offset - rewindOffset); entry.setValue(new OffsetAndMetadata(newOffset)); } diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index fefda06..4a1d3af 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; public final class QuestDBSinkTask extends SinkTask { - private static final int PROBABLY_SAFE_OFFSET_ROLLBACK = 150_000; + private static final int SAFE_OFFSET_ROLLBACK = 150_000; private static final char STRUCT_FIELD_SEPARATOR = '_'; private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key"; private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value"; @@ -38,8 +38,8 @@ public final class QuestDBSinkTask extends SinkTask { private long batchesSinceLastError = 0; private DateFormat dataFormat; private boolean kafkaTimestampsEnabled; -// private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker(); -private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker(); + private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker(); +//private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker(); @Override public String version() { @@ -153,7 +153,7 @@ private void onSenderException(LineSenderException e) { closeSenderSilently(); sender = null; log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs()); - tracker.configureSafeOffsets(context, PROBABLY_SAFE_OFFSET_ROLLBACK); + tracker.configureSafeOffsets(context, SAFE_OFFSET_ROLLBACK); context.timeout(config.getRetryBackoffMs()); throw new RetriableException(e); } else { @@ -165,7 +165,7 @@ private void onSenderException(LineSenderException e) { @Override public Map preCommit(Map currentOffsets) { assert currentOffsets.size() == 1; - tracker.transformPreCommit(currentOffsets, PROBABLY_SAFE_OFFSET_ROLLBACK); + tracker.transformPreCommit(currentOffsets, SAFE_OFFSET_ROLLBACK); return currentOffsets; } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 6b90e0a..19f07d4 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -390,7 +390,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt .build(); // async inserts to Kafka - long recordCount = 10_000_000; + long recordCount = 1_000_000; Map prodProps = new HashMap<>(); new Thread(() -> { try (KafkaProducer producer = connect.kafka().createProducer(prodProps)) { From 17519b02f77dd999612b7571c2e2414a5a6a39c1 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 11 Oct 2023 17:36:24 +0200 Subject: [PATCH 04/16] remove a left-over assertion --- connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 4a1d3af..930d275 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ 
b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -164,7 +164,6 @@ private void onSenderException(LineSenderException e) { @Override public Map preCommit(Map currentOffsets) { - assert currentOffsets.size() == 1; tracker.transformPreCommit(currentOffsets, SAFE_OFFSET_ROLLBACK); return currentOffsets; } From a968d561e0604f1217c0b781da04779d7c348b89 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 11:05:05 +0200 Subject: [PATCH 05/16] delete persistence files from within a container --- .../kafka/QuestDBSinkConnectorEmbeddedTest.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 19f07d4..e5c6b64 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -63,6 +63,20 @@ public static void createContainer() { @AfterAll public static void stopContainer() { questDBContainer.stop(); + deleteFromContainer(questDBDirectory()); + } + + private static void deleteFromContainer(String directory) { + GenericContainer cleanup = new GenericContainer<>("alpine:3.18.4") + .withCommand("rm -rf /var/lib/delete") + .withFileSystemBind(directory, "/var/lib/delete") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + cleanup.start(); + cleanup.stop(); + } + + private static String questDBDirectory() { + return dbRoot.resolve("questdb").toAbsolutePath().toString(); } private static GenericContainer questDBContainer; @@ -79,8 +93,7 @@ private static GenericContainer newQuestDbConnector() { } else { selfGenericContainer.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); } - String dbRootString = dbRoot.toAbsolutePath().toString(); - selfGenericContainer = selfGenericContainer.withFileSystemBind(dbRootString, "/var/lib/questdb"); + selfGenericContainer = selfGenericContainer.withFileSystemBind(questDBDirectory(), "/var/lib/questdb"); if (DUMP_QUESTDB_CONTAINER_LOGS) { selfGenericContainer = selfGenericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); } From 3137d577061ad0d6a3b67c96d0d58b0866c291bc Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 11:21:09 +0200 Subject: [PATCH 06/16] experiment with dir delete --- .../QuestDBSinkConnectorEmbeddedTest.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index e5c6b64..4429b71 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -63,13 +63,27 @@ public static void stopContainer() { questDBContainer.stop(); - deleteFromContainer(questDBDirectory()); + deleteFromContainer("questdb"); } private static void deleteFromContainer(String directory) { GenericContainer cleanup = new GenericContainer<>("alpine:3.18.4") - .withCommand("rm -rf /var/lib/delete") - .withFileSystemBind(directory, "/var/lib/delete") + .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") + .withCommand("ls -l /var/lib/delete/") + .withLogConsumer(new
Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + cleanup.start(); + cleanup.stop(); + + cleanup = new GenericContainer<>("alpine:3.18.4") + .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") + .withCommand("rm -rf /var/lib/delete/" + directory) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + cleanup.start(); + cleanup.stop(); + + cleanup = new GenericContainer<>("alpine:3.18.4") + .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") + .withCommand("ls -l /var/lib/delete/") .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); cleanup.start(); cleanup.stop(); From 039527714012924ddc561424352ea21517e7c263 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 13:29:27 +0200 Subject: [PATCH 07/16] refactoring --- .../java/io/questdb/kafka/ExactlyOnceIT.java | 73 ++++++++++++------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 018901d..31679e6 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -26,6 +27,7 @@ import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; @@ -42,6 +44,11 @@ import static java.time.Duration.ofSeconds; public class ExactlyOnceIT { + private static final int VICTIM_QUESTDB = 0; + private static final int VICTIM_CONNECT = 1; + private static final int VICTIM_KAFKA = 2; + private static final int VICTIMS_TOTAL = VICTIM_KAFKA + 1; + private static final DockerImageName KAFKA_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.5.1"); private static final DockerImageName ZOOKEEPER_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-zookeeper:7.5.1"); private static final DockerImageName CONNECT_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.1"); @@ -184,7 +191,6 @@ private static GenericContainer newConnectContainer(int id) { @Test public void test() throws Exception { String topicName = "mytopic"; - int recordCount = 5_000_000; Properties props = new Properties(); String bootstrapServers = kafkas[0].getBootstrapServers(); @@ -193,46 +199,68 @@ public void test() throws Exception { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("include.key", "false"); + int recordCount = 5_000_000; new Thread(() -> { try (Producer producer = new KafkaProducer<>(props)) { for (int i = 0; i < recordCount; i++ ) { - Instant now = Instant.now(); - long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); - UUID uuid = UUID.randomUUID(); - int val = ThreadLocalRandom.current().nextInt(100); - - String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; - producer.send(new ProducerRecord<>(topicName, null, jsonVal)); + String json = newPayload(); + producer.send(new 
ProducerRecord<>(topicName, null, json)); // 1% chance of duplicates - we want them to be also deduped by QuestDB if (ThreadLocalRandom.current().nextInt(100) == 0) { - producer.send(new ProducerRecord<>(topicName, null, jsonVal)); + producer.send(new ProducerRecord<>(topicName, null, json)); } } } }).start(); - // configure questdb dedups QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, val LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", questdb.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT), QuestDBUtils.Endpoint.EXEC); + startConnector(); + CyclicBarrier barrier = new CyclicBarrier(2); + startKillingRandomContainers(barrier); + + // make sure we have exactly the expected records in QuestDB + QuestDBUtils.assertSqlEventually( + "\"count\"\r\n" + + recordCount + "\r\n", + "select count(*) from " + topicName, + 600, + questHttpPort); + + barrier.await(); + } + + @NotNull + private static String newPayload() { + Instant now = Instant.now(); + long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + UUID uuid = UUID.randomUUID(); + int val = ThreadLocalRandom.current().nextInt(100); + + String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; + return jsonVal; + } + + private static void startKillingRandomContainers(CyclicBarrier barrier) { new Thread(() -> { - while (barrier.getNumberWaiting() == 0) { + while (barrier.getNumberWaiting() == 0) { // keep killing them until the checker thread passed the assertion Os.sleep(ThreadLocalRandom.current().nextInt(5_000, 30_000)); - int victim = ThreadLocalRandom.current().nextInt(3); + int victim = ThreadLocalRandom.current().nextInt(VICTIMS_TOTAL); switch (victim) { - case 0: { + case VICTIM_QUESTDB: { questdb.stop(); GenericContainer container = newQuestDBContainer(); container.start(); questdb = container; break; } - case 1: { + case VICTIM_CONNECT: { int n = ThreadLocalRandom.current().nextInt(connects.length); connects[n].stop(); GenericContainer container = newConnectContainer(n); @@ -240,7 +268,7 @@ public void test() throws Exception { connects[n] = container; break; } - case 2: { + case VICTIM_KAFKA: { int n = ThreadLocalRandom.current().nextInt(kafkas.length); kafkas[n].stop(); KafkaContainer container = newKafkaContainer(n); @@ -251,7 +279,7 @@ public void test() throws Exception { } } } - + try { barrier.await(); } catch (InterruptedException e) { @@ -261,7 +289,9 @@ public void test() throws Exception { throw new RuntimeException(e); } }).start(); + } + private static void startConnector() throws IOException, InterruptedException, URISyntaxException { String payload = "{\"name\":\"my-connector\",\"config\":{" + "\"tasks.max\":\"4\"," + "\"connector.class\":\"io.questdb.kafka.QuestDBSinkConnector\"," + @@ -271,7 +301,7 @@ public void test() throws Exception { "\"value.converter.schemas.enable\":\"false\"," + "\"timestamp.field.name\":\"ts\"," + "\"host\":\"questdb:9009\"}" + - "}"; + "}"; HttpResponse response = HttpClient.newBuilder().connectTimeout(ofSeconds(10)).build().send( HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.ofString(payload)) @@ -283,14 +313,5 @@ public void test() throws Exception { if (response.statusCode() != 201) { throw new RuntimeException("Failed to create connector: " + response.body()); } - - QuestDBUtils.assertSqlEventually( - "\"count\"\r\n" - + recordCount + "\r\n", - "select count(*) from " + topicName, - 600, - questHttpPort); - - barrier.await(); } } From 
9c17a5a20bc5df7383eb7df4bfc9e25452e4a099 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 13:29:51 +0200 Subject: [PATCH 08/16] hack to make sure to delete the file --- .../questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 4429b71..38bfc88 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -22,6 +22,7 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.containers.wait.strategy.ShellStrategy; import org.testcontainers.junit.jupiter.Testcontainers; import java.math.BigDecimal; @@ -70,21 +71,22 @@ private static void deleteFromContainer(String directory) { GenericContainer cleanup = new GenericContainer<>("alpine:3.18.4") .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup1"))); cleanup.start(); cleanup.stop(); cleanup = new GenericContainer<>("alpine:3.18.4") .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") .withCommand("rm -rf /var/lib/delete/" + directory) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup2"))) + .waitingFor(new ShellStrategy().withCommand("rm -rf /var/lib/delete/" + directory)); cleanup.start(); cleanup.stop(); cleanup = new GenericContainer<>("alpine:3.18.4") .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup"))); + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup3"))); cleanup.start(); cleanup.stop(); } From 06d866e266daa3fefbfa1ea056407a76637a97b6 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 15:33:07 +0200 Subject: [PATCH 09/16] another attempt to cleanup --- .../io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 38bfc88..96569df 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -1,5 +1,6 @@ package io.questdb.kafka; +import io.questdb.std.Files; import io.questdb.std.Os; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -16,6 +17,7 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; 
@@ -53,7 +55,7 @@ public final class QuestDBSinkConnectorEmbeddedTest { private Converter converter; private String topicName; - @TempDir + @TempDir(cleanup = CleanupMode.NEVER) static Path dbRoot; @BeforeAll @@ -64,7 +66,8 @@ public static void createContainer() { @AfterAll public static void stopContainer() { questDBContainer.stop(); - deleteFromContainer("questdb"); +// deleteFromContainer("questdb"); + Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString())); } private static void deleteFromContainer(String directory) { From 5d2210099866e8448bdb8625c05eea421052f714 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 15:49:12 +0200 Subject: [PATCH 10/16] use inherited props --- integration-tests/exactlyonce/pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/integration-tests/exactlyonce/pom.xml b/integration-tests/exactlyonce/pom.xml index de2acee..06c6b58 100644 --- a/integration-tests/exactlyonce/pom.xml +++ b/integration-tests/exactlyonce/pom.xml @@ -8,14 +8,8 @@ kafka-integration-tests 0.10-SNAPSHOT - kafka-it-exactlyonce - - 19 - 19 - UTF-8 - From 5a096b4f87cbdad9dbbfed1448b4f9357bce7d79 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 15:50:22 +0200 Subject: [PATCH 11/16] do not rely on TmpDir for cleaning --- .../src/test/java/io/questdb/kafka/ExactlyOnceIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 31679e6..458d85f 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; @@ -56,7 +57,7 @@ public class ExactlyOnceIT { private static final int KAFKA_CLUSTER_SIZE = 3; private static final int CONNECT_CLUSTER_SIZE = 2; - @TempDir + @TempDir(cleanup = CleanupMode.NEVER) static Path persistence; // we need to locate JARs with QuestDB client and Kafka Connect Connector, @@ -102,6 +103,8 @@ public static void stopContainer() { Stream.of(kafkas).forEach(KafkaContainer::stop); Stream.of(connects).forEach(GenericContainer::stop); zookeeper.stop(); + + io.questdb.std.Files.rmdir(io.questdb.std.str.Path.getThreadLocal(persistence.toAbsolutePath().toString())); } private static GenericContainer newZookeeperContainer() { From 72dc75200c9a4c5bc75e7f24711c281617141c17 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 17:26:56 +0200 Subject: [PATCH 12/16] integration tests verification --- .../src/main/java/io/questdb/kafka/QuestDBSinkTask.java | 6 +++--- integration-tests/pom.xml | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 930d275..78507e7 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -132,8 +132,8 @@ public void put(Collection collection) { if (++batchesSinceLastError == 10) { // why 10? 
why not to reset the retry counter immediately upon a successful flush()? // there are two reasons for server disconnections: - // 1. the server is down / unreachable / other_infrastructure_issues - // 2. the client is sending bad data (e.g. pushing a string to a double column) + // 1. infrastructure: the server is down / unreachable / other_infrastructure_issues + // 2. structural: the client is sending bad data (e.g. pushing a string to a double column) // errors in the latter case are not recoverable. upon receiving bad data the server will *eventually* close the connection, // after a while, the client will notice that the connection is closed and will try to reconnect // if we reset the retry counter immediately upon first successful flush() then we end-up in a loop where we flush bad data, @@ -434,7 +434,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { @Override public void flush(Map map) { - // not needed as put() flushes after each record + // not needed as put() flushes after each batch } @Override diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 96a0018..08efd86 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -27,6 +27,7 @@ integration-test + verify From 1dca9f4d7a567cce77746436881511b095e55c91 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 12 Oct 2023 17:47:36 +0200 Subject: [PATCH 13/16] Github action to start integration tests --- .github/workflows/it.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .github/workflows/it.yml diff --git a/.github/workflows/it.yml b/.github/workflows/it.yml new file mode 100644 index 0000000..b75f03a --- /dev/null +++ b/.github/workflows/it.yml @@ -0,0 +1,17 @@ +name: Run Integration Tests +on: + workflow_dispatch: + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + java-version: 11 + distribution: 'temurin' + cache: 'maven' + - name: Run integration tests + run: mvn -B verify \ No newline at end of file From dcf32750b388a3a4825736d4c949101b3bc4f36f Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 31 Oct 2023 15:21:04 +0100 Subject: [PATCH 14/16] simplify offset tracker --- .../io/questdb/kafka/EmptyOffsetTracker.java | 35 +++++++++ ...etTracker.java => MultiOffsetTracker.java} | 2 +- ...nOffsetTracker.java => OffsetTracker.java} | 2 +- .../io/questdb/kafka/QuestDBSinkTask.java | 2 +- .../SingleTopicPartitionOffsetTracker.java | 72 ------------------- .../QuestDBSinkConnectorEmbeddedTest.java | 26 ------- .../java/io/questdb/kafka/ExactlyOnceIT.java | 3 +- 7 files changed, 39 insertions(+), 103 deletions(-) create mode 100644 connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java rename connector/src/main/java/io/questdb/kafka/{MultiTopicPartitionOffsetTracker.java => MultiOffsetTracker.java} (98%) rename connector/src/main/java/io/questdb/kafka/{TopicPartitionOffsetTracker.java => OffsetTracker.java} (93%) delete mode 100644 connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java diff --git a/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java new file mode 100644 index 0000000..aef8315 --- /dev/null +++ b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java @@ -0,0 +1,35 @@ +package io.questdb.kafka; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import 
org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.sink.SinkTaskContext; + +import java.util.Collection; +import java.util.Map; + +public final class EmptyOffsetTracker implements OffsetTracker { + @Override + public void onPartitionsOpened(Collection partitions) { + + } + + @Override + public void onPartitionsClosed(Collection partitions) { + + } + + @Override + public void onObservedOffset(int partition, String topic, long offset) { + + } + + @Override + public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { + assert rewindOffset == 0; + } + + @Override + public void transformPreCommit(Map currentOffsets, long rewindOffset) { + assert rewindOffset == 0; + } +} diff --git a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java similarity index 98% rename from connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java rename to connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java index 4758a35..b9dfe76 100644 --- a/connector/src/main/java/io/questdb/kafka/MultiTopicPartitionOffsetTracker.java +++ b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java @@ -10,7 +10,7 @@ import java.util.Collection; import java.util.Map; -public final class MultiTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { +public final class MultiOffsetTracker implements OffsetTracker { private static final int EMPTY = -1; private static final int CLOSED = -2; diff --git a/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java similarity index 93% rename from connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java rename to connector/src/main/java/io/questdb/kafka/OffsetTracker.java index 7791ef2..a268f4f 100644 --- a/connector/src/main/java/io/questdb/kafka/TopicPartitionOffsetTracker.java +++ b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java @@ -7,7 +7,7 @@ import java.util.Collection; import java.util.Map; -public interface TopicPartitionOffsetTracker { +public interface OffsetTracker { void onPartitionsOpened(Collection partitions); void onPartitionsClosed(Collection partitions); diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 78507e7..5c77692 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -38,7 +38,7 @@ public final class QuestDBSinkTask extends SinkTask { private long batchesSinceLastError = 0; private DateFormat dataFormat; private boolean kafkaTimestampsEnabled; - private final TopicPartitionOffsetTracker tracker = new MultiTopicPartitionOffsetTracker(); + private final OffsetTracker tracker = new MultiOffsetTracker(); //private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker(); @Override diff --git a/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java deleted file mode 100644 index b2d44a2..0000000 --- a/connector/src/main/java/io/questdb/kafka/SingleTopicPartitionOffsetTracker.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.questdb.kafka; - -import io.questdb.std.LongList; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import 
org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.sink.SinkTaskContext; - -import java.util.Collection; -import java.util.Map; - -public final class SingleTopicPartitionOffsetTracker implements TopicPartitionOffsetTracker { - private final LongList offsets = new LongList(); - private String topic; - - @Override - public void onPartitionsOpened(Collection partitions) { - for (TopicPartition partition : partitions) { - if (topic == null) { - topic = partition.topic(); - } else if (!topic.equals(partition.topic())) { - throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); - } - offsets.extendAndSet(partition.partition(), -1); - } - } - - @Override - public void onPartitionsClosed(Collection partitions) { - for (TopicPartition partition : partitions) { - offsets.extendAndSet(partition.partition(), -1); - } - } - - @Override - public void onObservedOffset(int partition, String topic, long offset) { - long currentOffset = offsets.get(partition); - if (currentOffset < offset) { - offsets.extendAndSet(partition, offset); - } - } - - @Override - public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { - assert topic != null; - - for (int partition = 0; partition < offsets.size(); partition++) { - long offset = offsets.get(partition); - if (offset != -1) { - long newOffset = Math.max(0, offset - rewindOffset); - sinkTaskContext.offset(new TopicPartition(topic, partition), newOffset); - } - } - } - - @Override - public void transformPreCommit(Map currentOffsets, long rewindOffset) { - assert topic != null; - - for (Map.Entry entry : currentOffsets.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - if (!topicPartition.topic().equals(topic)) { - throw new IllegalArgumentException("SingleTopicPartitionOffsetTracker can only track a single topic"); - } - long offset = offsets.get(topicPartition.partition()); - if (offset != -1) { - long newOffset = Math.max(0, offset - rewindOffset); - OffsetAndMetadata offsetAndMetadata = entry.getValue(); - currentOffsets.put(topicPartition, new OffsetAndMetadata(newOffset, offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata())); - } - } - } -} diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 96569df..1462fbb 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -24,7 +24,6 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.containers.wait.strategy.ShellStrategy; import org.testcontainers.junit.jupiter.Testcontainers; import java.math.BigDecimal; @@ -66,34 +65,9 @@ public static void createContainer() { @AfterAll public static void stopContainer() { questDBContainer.stop(); -// deleteFromContainer("questdb"); Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString())); } - private static void deleteFromContainer(String directory) { - GenericContainer cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup1"))); - 
cleanup.start(); - cleanup.stop(); - - cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("rm -rf /var/lib/delete/" + directory) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup2"))) - .waitingFor(new ShellStrategy().withCommand("rm -rf /var/lib/delete/" + directory)); - cleanup.start(); - cleanup.stop(); - - cleanup = new GenericContainer<>("alpine:3.18.4") - .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/delete") - .withCommand("ls -l /var/lib/delete/") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("cleanup3"))); - cleanup.start(); - cleanup.stop(); - } - private static String questDBDirectory() { return dbRoot.resolve("questdb").toAbsolutePath().toString(); } diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 458d85f..aa07fd1 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -246,8 +246,7 @@ private static String newPayload() { UUID uuid = UUID.randomUUID(); int val = ThreadLocalRandom.current().nextInt(100); - String jsonVal = "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; - return jsonVal; + return "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; } private static void startKillingRandomContainers(CyclicBarrier barrier) { From b017717306400bdcf06c49f8b358c89339ef72ad Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 31 Oct 2023 15:44:13 +0100 Subject: [PATCH 15/16] make rewind offset configurable --- .../kafka/QuestDBSinkConnectorConfig.java | 16 +++++++++++++++- .../java/io/questdb/kafka/QuestDBSinkTask.java | 16 +++++++++++----- .../kafka/QuestDBSinkConnectorEmbeddedTest.java | 1 + 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index 1fb6a5f..73d0710 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -70,6 +70,15 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String TIMESTAMP_FORMAT = "timestamp.string.format"; private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields"; + public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset"; + private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " + + "On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with the QuestDB " + + "deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can be lost in the event of a failure. " + + "If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover " + + "as it will have to reprocess a large number of records and QuestDB will have to do extra work with deduplication. If you are testing this " + +
+            "feature for the first time, 150000 is a good starting point.";
+
+
     private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";
 
     public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -99,7 +108,12 @@ public static ConfigDef conf() {
             .define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
             .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
             .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
-            .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC);
+            .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
+            .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC);
+    }
+
+    public long getDeduplicationRewindOffset() {
+        return getLong(DEDUPLICATION_REWIND_CONFIG);
     }
 
     public String getHost() {
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index 5c77692..e7ad8b1 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.TimeUnit;
 
 public final class QuestDBSinkTask extends SinkTask {
-    private static final int SAFE_OFFSET_ROLLBACK = 150_000;
     private static final char STRUCT_FIELD_SEPARATOR = '_';
     private static final String PRIMITIVE_KEY_FALLBACK_NAME = "key";
     private static final String PRIMITIVE_VALUE_FALLBACK_NAME = "value";
@@ -38,8 +37,8 @@ public final class QuestDBSinkTask extends SinkTask {
     private long batchesSinceLastError = 0;
     private DateFormat dataFormat;
     private boolean kafkaTimestampsEnabled;
-    private final OffsetTracker tracker = new MultiOffsetTracker();
-//private final TopicPartitionOffsetTracker tracker = new SingleTopicPartitionOffsetTracker();
+    private OffsetTracker tracker = new MultiOffsetTracker();
+    private long deduplicationRewindOffset;
 
     @Override
     public String version() {
@@ -49,6 +48,13 @@ public String version() {
     @Override
     public void start(Map<String, String> map) {
         this.config = new QuestDBSinkConnectorConfig(map);
+        this.deduplicationRewindOffset = config.getDeduplicationRewindOffset();
+        if (deduplicationRewindOffset == 0) {
+            tracker = new EmptyOffsetTracker();
+        } else {
+            tracker = new MultiOffsetTracker();
+        }
+
         String timestampStringFields = config.getTimestampStringFields();
         if (timestampStringFields != null) {
             stringTimestampColumns = new HashSet<>();
@@ -153,7 +159,7 @@ private void onSenderException(LineSenderException e) {
         closeSenderSilently();
         sender = null;
         log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs());
-        tracker.configureSafeOffsets(context, SAFE_OFFSET_ROLLBACK);
+        tracker.configureSafeOffsets(context, deduplicationRewindOffset);
         context.timeout(config.getRetryBackoffMs());
         throw new RetriableException(e);
     } else {
@@ -164,7 +170,7 @@
 
     @Override
     public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
-        tracker.transformPreCommit(currentOffsets, SAFE_OFFSET_ROLLBACK);
+        tracker.transformPreCommit(currentOffsets, deduplicationRewindOffset);
         return currentOffsets;
     }
 
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index 1462fbb..5308768 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -432,6 +432,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt
         Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName);
         props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
         props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "ts");
+        props.put(QuestDBSinkConnectorConfig.DEDUPLICATION_REWIND_CONFIG, "150000");
         connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
         ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

From 10978620d7655e43af16cffddd24daad75ae5d0f Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Tue, 31 Oct 2023 17:16:36 +0100
Subject: [PATCH 16/16] integration test explicitly sets rewind offset

---
 .../src/test/java/io/questdb/kafka/ExactlyOnceIT.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
index aa07fd1..08c8d45 100644
--- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
+++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java
@@ -301,6 +301,7 @@ private static void startConnector() throws IOException, InterruptedException, U
                 "\"value.converter\":\"org.apache.kafka.connect.json.JsonConverter\"," +
                 "\"topics\":\"mytopic\"," +
                 "\"value.converter.schemas.enable\":\"false\"," +
+                "\"dedup.rewind.offset\":\"150000\"," +
                 "\"timestamp.field.name\":\"ts\"," +
                 "\"host\":\"questdb:9009\"}" +
                 "}";
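
Note on the start() hunk above: when dedup.rewind.offset is 0 the task switches to an EmptyOffsetTracker, but that class does not appear in the hunks shown here. The following is a minimal no-op sketch, not the actual implementation; the class name comes from the hunk, while the OffsetTracker interface and its five callbacks (mirroring the TopicPartitionOffsetTracker used earlier in the series) are assumptions:

    package io.questdb.kafka;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkTaskContext;

    import java.util.Collection;
    import java.util.Map;

    // Hypothetical sketch: with dedup.rewind.offset=0 there is nothing to rewind,
    // so every callback is a no-op and offsets are committed exactly as Kafka Connect reports them.
    public final class EmptyOffsetTracker implements OffsetTracker {
        @Override
        public void onPartitionsOpened(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsClosed(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onObservedOffset(int partition, String topic, long offset) {
        }

        @Override
        public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
            // no rewind configured; the task retries from the last committed offsets
        }

        @Override
        public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
            // commit the offsets unchanged
        }
    }

With the rewind disabled, configureSafeOffsets and transformPreCommit deliberately leave offsets untouched, so the connector keeps its default at-least-once commit behaviour instead of rewinding and relying on QuestDB-side deduplication.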