diff --git a/.github/workflows/it.yml b/.github/workflows/it.yml
new file mode 100644
index 0000000..b75f03a
--- /dev/null
+++ b/.github/workflows/it.yml
@@ -0,0 +1,17 @@
+name: Run Integration Tests
+on:
+  workflow_dispatch:
+
+jobs:
+  publish:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 11
+        uses: actions/setup-java@v3
+        with:
+          java-version: 11
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: Run integration tests
+        run: mvn -B verify
\ No newline at end of file
diff --git a/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java
new file mode 100644
index 0000000..aef8315
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java
@@ -0,0 +1,35 @@
+package io.questdb.kafka;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.Collection;
+import java.util.Map;
+
+public final class EmptyOffsetTracker implements OffsetTracker {
+    @Override
+    public void onPartitionsOpened(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public void onPartitionsClosed(Collection<TopicPartition> partitions) {
+
+    }
+
+    @Override
+    public void onObservedOffset(int partition, String topic, long offset) {
+
+    }
+
+    @Override
+    public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
+        assert rewindOffset == 0;
+    }
+
+    @Override
+    public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
+        assert rewindOffset == 0;
+    }
+}
diff --git a/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java
new file mode 100644
index 0000000..b9dfe76
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java
@@ -0,0 +1,122 @@
+package io.questdb.kafka;
+
+import io.questdb.std.CharSequenceObjHashMap;
+import io.questdb.std.LongList;
+import io.questdb.std.ObjList;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.Collection;
+import java.util.Map;
+
+public final class MultiOffsetTracker implements OffsetTracker {
+    private static final int EMPTY = -1;
+    private static final int CLOSED = -2;
+
+    private final CharSequenceObjHashMap<LongList> offsets = new CharSequenceObjHashMap<>();
+
+    private String lastTopicCache;
+    private LongList lastTopicOffsetsCache;
+
+    @Override
+    public void onPartitionsOpened(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions) {
+            String topic = partition.topic();
+            LongList topicOffsets = offsets.get(topic);
+            if (topicOffsets == null) {
+                topicOffsets = new LongList(4);
+                offsets.put(topic, topicOffsets);
+            }
+
+            int partitionId = partition.partition();
+            int currentSize = topicOffsets.size();
+            if (currentSize <= partitionId) {
+                topicOffsets.extendAndSet(partitionId, EMPTY);
+                if (currentSize != partitionId) {
+                    topicOffsets.fill(currentSize, partitionId, EMPTY);
+                }
+            } else if (topicOffsets.get(partitionId) == CLOSED) {
+                topicOffsets.set(partitionId, EMPTY);
+            }
+        }
+    }
+
+    @Override
+    public void onPartitionsClosed(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions) {
+            String topic = partition.topic();
+            LongList topicOffsets = offsets.get(topic);
+            topicOffsets.set(partition.partition(), CLOSED);
+        }
+
+        // Remove topics that have all partitions closed
+        ObjList<CharSequence> keys = offsets.keys();
+        for (int i = 0, n = keys.size(); i < n; i++) {
+            CharSequence topic = keys.getQuick(i);
+            LongList topicOffsets = offsets.get(topic);
+            boolean allClosed = true;
+            for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) {
+                if (topicOffsets.get(partition) != CLOSED) {
+                    allClosed = false;
+                    break;
+                }
+            }
+            if (allClosed) {
+                offsets.remove(topic);
+            }
+        }
+    }
+
+
+
+    @Override
+    public void onObservedOffset(int partition, String topic, long offset) {
+        LongList topicOffsets;
+
+        // intentional reference equality check - Kafka Connect uses the same String instances
+        // so we can avoid a hash map lookup
+        if (lastTopicCache == topic) {
+            topicOffsets = lastTopicOffsetsCache;
+        } else {
+            topicOffsets = offsets.get(topic);
+            lastTopicCache = topic;
+            lastTopicOffsetsCache = topicOffsets;
+        }
+        long maxOffset = topicOffsets.get(partition);
+        topicOffsets.set(partition, Math.max(maxOffset, offset));
+    }
+
+
+    @Override
+    public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) {
+        ObjList<CharSequence> keys = offsets.keys();
+        for (int i = 0, n = keys.size(); i < n; i++) {
+            CharSequence topic = keys.getQuick(i);
+            LongList topicOffsets = offsets.get(topic);
+            for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) {
+                long offset = topicOffsets.get(partition);
+                // only rewind if we ever observed an offset for this partition
+                if (offset >= 0) {
+                    sinkTaskContext.offset(new TopicPartition(topic.toString(), partition), Math.max(0, offset - rewindOffset));
+                }
+            }
+        }
+    }
+
+    @Override
+    public void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset) {
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            String topic = topicPartition.topic();
+            LongList topicOffsets = offsets.get(topic);
+            long offset = topicOffsets.get(topicPartition.partition());
+
+            // only transform if we ever observed an offset for this partition
+            if (offset >= 0) {
+                long newOffset = Math.max(0, offset - rewindOffset);
+                entry.setValue(new OffsetAndMetadata(newOffset));
+            }
+        }
+    }
+}
diff --git a/connector/src/main/java/io/questdb/kafka/OffsetTracker.java b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java
new file mode 100644
index 0000000..a268f4f
--- /dev/null
+++ b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java
@@ -0,0 +1,20 @@
+package io.questdb.kafka;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.Collection;
+import java.util.Map;
+
+public interface OffsetTracker {
+    void onPartitionsOpened(Collection<TopicPartition> partitions);
+
+    void onPartitionsClosed(Collection<TopicPartition> partitions);
+
+    void onObservedOffset(int partition, String topic, long offset);
+
+    void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset);
+
+    void transformPreCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets, long rewindOffset);
+}
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
index 1fb6a5f..73d0710 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java
@@ -70,6 +70,15 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
     public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
    private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";
 
+    public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset";
+    private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " +
+            "On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with the QuestDB " +
+            "deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can be lost in the event of a failure. " +
+            "If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover " +
+            "as it will have to reprocess a large number of records and QuestDB will have to do extra work with deduplication. If you are testing this " +
+            "feature for the first time, then 150000 is a good starting point.";
+
+
     private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";
 
     public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -99,7 +108,12 @@ public static ConfigDef conf() {
                 .define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
                 .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
                 .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
-                .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC);
+                .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
+                .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC);
+    }
+
+    public long getDeduplicationRewindOffset() {
+        return getLong(DEDUPLICATION_REWIND_CONFIG);
     }
 
     public String getHost() {
diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
index b72f013..e7ad8b1 100644
--- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
+++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java
@@ -37,6 +37,8 @@ public final class QuestDBSinkTask extends SinkTask {
     private long batchesSinceLastError = 0;
     private DateFormat dataFormat;
     private boolean kafkaTimestampsEnabled;
+    private OffsetTracker tracker = new MultiOffsetTracker();
+    private long deduplicationRewindOffset;
 
     @Override
     public String version() {
@@ -46,6 +48,13 @@ public String version() {
     @Override
     public void start(Map<String, String> map) {
         this.config = new QuestDBSinkConnectorConfig(map);
+        this.deduplicationRewindOffset = config.getDeduplicationRewindOffset();
+        if (deduplicationRewindOffset == 0) {
+            tracker = new EmptyOffsetTracker();
+        } else {
+            tracker = new MultiOffsetTracker();
+        }
+
         String timestampStringFields = config.getTimestampStringFields();
         if (timestampStringFields != null) {
             stringTimestampColumns = new HashSet<>();
@@ -73,6 +82,16 @@ public void start(Map<String, String> map) {
         this.timestampUnits = config.getTimestampUnitsOrNull();
     }
 
+    @Override
+    public void open(Collection<TopicPartition> partitions) {
+        tracker.onPartitionsOpened(partitions);
+    }
+
+    @Override
+    public void close(Collection<TopicPartition> partitions) {
+        tracker.onPartitionsClosed(partitions);
+    }
+
     private Sender createSender() {
         log.debug("Creating a new sender");
         Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost());
@@ -119,8 +138,8 @@ public void put(Collection<SinkRecord> collection) {
             if (++batchesSinceLastError == 10) {
                 // why 10? why not to reset the retry counter immediately upon a successful flush()?
                 // there are two reasons for server disconnections:
-                // 1. the server is down / unreachable / other_infrastructure_issues
-                // 2. the client is sending bad data (e.g. pushing a string to a double column)
+                // 1. infrastructure: the server is down / unreachable / other_infrastructure_issues
+                // 2. structural: the client is sending bad data (e.g. pushing a string to a double column)
                 // errors in the latter case are not recoverable. upon receiving bad data the server will *eventually* close the connection,
                 // after a while, the client will notice that the connection is closed and will try to reconnect
                 // if we reset the retry counter immediately upon first successful flush() then we end-up in a loop where we flush bad data,
@@ -140,6 +159,7 @@ private void onSenderException(LineSenderException e) {
             closeSenderSilently();
             sender = null;
             log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs());
+            tracker.configureSafeOffsets(context, deduplicationRewindOffset);
             context.timeout(config.getRetryBackoffMs());
             throw new RetriableException(e);
         } else {
@@ -147,6 +167,13 @@ private void onSenderException(LineSenderException e) {
         }
     }
 
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        tracker.transformPreCommit(currentOffsets, deduplicationRewindOffset);
+        return currentOffsets;
+    }
+
     private void closeSenderSilently() {
         try {
             if (sender != null) {
@@ -159,6 +186,9 @@ private void closeSenderSilently() {
 
     private void handleSingleRecord(SinkRecord record) {
         assert timestampColumnValue == Long.MIN_VALUE;
+
+        tracker.onObservedOffset(record.kafkaPartition(), record.topic(), record.kafkaOffset());
+
         String explicitTable = config.getTable();
         String tableName = explicitTable == null ? record.topic() : explicitTable;
        sender.table(tableName);
@@ -410,7 +440,7 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
 
     @Override
     public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
-        // not needed as put() flushes after each record
+        // not needed as put() flushes after each batch
     }
 
     @Override
diff --git a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
index 712f2f2..5ff9d73 100644
--- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
+++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java
@@ -4,6 +4,7 @@
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.awaitility.Awaitility;
@@ -42,18 +43,22 @@ static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, A
     }
 
     static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContainer, String topicName) {
+        String ilpUrl = questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
+
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName());
         props.put("topics", topicName);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
-        props.put("host", questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT));
+        props.put("host", ilpUrl);
         return props;
     }
 
     static void assertConnectorTaskState(EmbeddedConnectCluster connect, String connectorName, AbstractStatus.State expectedState) {
-        ConnectorStateInfo info = connect.connectorStatus(connectorName);
-        if (info == null) {
+        ConnectorStateInfo info = null;
+        try {
+            info = connect.connectorStatus(connectorName);
+        } catch (ConnectRestException e) {
             fail("Connector " + connectorName + " not found");
         }
         List<ConnectorStateInfo.TaskState> taskStates = info.tasks();
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java
index c1f42b5..dc991c9 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java
@@ -82,8 +82,9 @@ public void testSmoke() {
 
         connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct)));
 
-        QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n"
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
                         + "\"John\",\"Doe\",42\r\n",
-                "select firstname,lastname,age from " + topicName);
+                "select firstname,lastname,age from " + topicName,
+                questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT));
     }
 }
diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
index cb47e40..5308768 100644
--- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
+++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java
@@ -1,9
+1,12 @@ package io.questdb.kafka; +import io.questdb.std.Files; +import io.questdb.std.Os; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; @@ -13,23 +16,23 @@ import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.CleanupMode; +import org.junit.jupiter.api.io.TempDir; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.math.BigDecimal; -import java.util.Calendar; -import java.util.HashMap; -import java.util.Map; -import java.util.TimeZone; +import java.nio.file.Path; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonMap; @@ -42,33 +45,58 @@ @Testcontainers public final class QuestDBSinkConnectorEmbeddedTest { + private static int httpPort = -1; + private static int ilpPort = -1; + private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:nightly"; + private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true; + private EmbeddedConnectCluster connect; private Converter converter; private String topicName; - @Container - private static GenericContainer questDBContainer = newQuestDbConnector(); + @TempDir(cleanup = CleanupMode.NEVER) + static Path dbRoot; - private static GenericContainer newQuestDbConnector() { - return newQuestDbConnector(null, null); + @BeforeAll + public static void createContainer() { + questDBContainer = newQuestDbConnector(); + } + + @AfterAll + public static void stopContainer() { + questDBContainer.stop(); + Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString())); } - private static GenericContainer newQuestDbConnector(Integer httpPort, Integer ilpPort) { - FixedHostPortGenericContainer selfGenericContainer = new FixedHostPortGenericContainer<>("questdb/questdb:6.6.1"); - if (httpPort != null) { - selfGenericContainer.withFixedExposedPort(httpPort, QuestDBUtils.QUESTDB_HTTP_PORT); + private static String questDBDirectory() { + return dbRoot.resolve("questdb").toAbsolutePath().toString(); + } + + private static GenericContainer questDBContainer; + + private static GenericContainer newQuestDbConnector() { + FixedHostPortGenericContainer selfGenericContainer = new FixedHostPortGenericContainer<>(OFFICIAL_QUESTDB_DOCKER); + if (httpPort != -1) { + selfGenericContainer = selfGenericContainer.withFixedExposedPort(httpPort, 
QuestDBUtils.QUESTDB_HTTP_PORT); } else { selfGenericContainer.addExposedPort(QuestDBUtils.QUESTDB_HTTP_PORT); } - if (ilpPort != null) { - selfGenericContainer.withFixedExposedPort(ilpPort, QuestDBUtils.QUESTDB_ILP_PORT); + if (ilpPort != -1) { + selfGenericContainer = selfGenericContainer.withFixedExposedPort(ilpPort, QuestDBUtils.QUESTDB_ILP_PORT); } else { selfGenericContainer.addExposedPort(QuestDBUtils.QUESTDB_ILP_PORT); } - selfGenericContainer.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main enjoy.*")); - return selfGenericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) - .withEnv("QDB_CAIRO_COMMIT_LAG", "100") - .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI"); + selfGenericContainer = selfGenericContainer.withFileSystemBind(questDBDirectory(), "/var/lib/questdb"); + if (DUMP_QUESTDB_CONTAINER_LOGS) { + selfGenericContainer = selfGenericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); + } + selfGenericContainer.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*server-main.*")); + selfGenericContainer.start(); + + httpPort = selfGenericContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT); + ilpPort = selfGenericContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + + return selfGenericContainer; } @BeforeEach @@ -78,8 +106,13 @@ public void setUp() { jsonConverter.configure(singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); converter = jsonConverter; + + Map props = new HashMap<>(); + props.put("connector.client.config.override.policy", "All"); connect = new EmbeddedConnectCluster.Builder() .name("questdb-connect-cluster") + .workerProps(props) + .numWorkers(4) .build(); connect.start(); @@ -109,9 +142,10 @@ public void testSmoke() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -127,9 +161,10 @@ public void testDeadLetterQueue_wrongJson() { connect.kafka().produce(topicName, "key", "{\"not valid json}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); ConsumerRecords fetchedRecords = connect.kafka().consume(1, 5000, "dlq"); Assertions.assertEquals(1, fetchedRecords.count()); @@ -158,9 +193,10 @@ public void testSymbol() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -176,9 +212,10 @@ public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { // creates a record with 'age' as long 
connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); for (int i = 0; i < 50; i++) { // injects a record with 'age' as string @@ -208,14 +245,11 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { connect.kafka().produce(topicName, "key1", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); - // we need to get mapped ports, becasue we are going to kill the container and restart it again - // and we need the same mapping otherwise the Kafka connect will not be able to re-connect to it - Integer httpPort = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT); - Integer ilpPort = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); questDBContainer.stop(); // insert a few records while the QuestDB is down for (int i = 0; i < 10; i++) { @@ -224,16 +258,16 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { } // restart QuestDB - questDBContainer = newQuestDbConnector(httpPort, ilpPort); - questDBContainer.start(); + questDBContainer = newQuestDbConnector(); for (int i = 0; i < 50; i++) { connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}"); Thread.sleep(100); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",49\r\n", - "select firstname,lastname,age from " + topicName + " where age = 49"); + "select firstname,lastname,age from " + topicName + " where age = 49", + httpPort); } @Test @@ -312,10 +346,11 @@ public void testSymbol_withAllOtherILPTypes() { connect.kafka().produce(topicName, "p1", new String(converter.fromConnectData(topicName, schema, p1))); connect.kafka().produce(topicName, "p2", new String(converter.fromConnectData(topicName, schema, p2))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"vegan\",\"height\",\"birth\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\",\"vegan\",\"height\",\"birth\"\r\n" + "\"John\",\"Doe\",42,true,1.8,\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jane\",\"Doe\",41,false,1.6,\"2021-10-23T13:53:59.123000Z\"\r\n", - "select firstname,lastname,age,vegan,height,birth from " + topicName); + "select firstname,lastname,age,vegan,height,birth from " + topicName, + httpPort); } @Test @@ -336,16 +371,104 @@ public void testUpfrontTable_withSymbols() { .put("lastname", "Doe") .put("age", (byte) 42); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname symbol, lastname symbol, age int)", + httpPort, QuestDBUtils.Endpoint.EXEC); connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - 
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); + } + + @Test + public void testExactlyOnce_withDedup() throws BrokenBarrierException, InterruptedException { + connect.kafka().createTopic(topicName, 4); + + Schema schema = SchemaBuilder.struct().name("com.example.Event") + .field("ts", Schema.INT64_SCHEMA) + .field("id", Schema.STRING_SCHEMA) + .field("type", Schema.INT64_SCHEMA) + .build(); + + // async inserts to Kafka + long recordCount = 1_000_000; + Map prodProps = new HashMap<>(); + new Thread(() -> { + try (KafkaProducer producer = connect.kafka().createProducer(prodProps)) { + for (long i = 0; i < recordCount; i++) { + Instant now = Instant.now(); + long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + Struct struct = new Struct(schema) + .put("ts", nanoTs) + .put("id", UUID.randomUUID().toString()) + .put("type", (i % 5)); + + byte[] value = new String(converter.fromConnectData(topicName, schema, struct)).getBytes(); + producer.send(new ProducerRecord<>(topicName, null, null, value)); + + // 1% chance of duplicates - we want them to be also deduped by QuestDB + if (ThreadLocalRandom.current().nextInt(100) == 0) { + producer.send(new ProducerRecord<>(topicName, null, null, value)); + } + + } + } + }).start(); + + + // configure questdb dedups + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", + "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, type LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", + httpPort, + QuestDBUtils.Endpoint.EXEC); + + // start connector + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "ts"); + props.put(QuestDBSinkConnectorConfig.DEDUPLICATION_REWIND_CONFIG, "150000"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // restart QuestDB every 15 seconds + CyclicBarrier barrier = new CyclicBarrier(2); + new Thread(() -> { + while (barrier.getNumberWaiting() == 0) { + Os.sleep(15_000); + restartQuestDB(); + } + try { + barrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } catch (BrokenBarrierException e) { + // shouldn't happen + throw new RuntimeException(e); + } + }).start(); + + // make sure we have all records in the table + QuestDBUtils.assertSqlEventually( + "\"count\"\r\n" + + recordCount + "\r\n", + "select count(*) from " + topicName, + 600, + httpPort); + + // await the restarter thread so we don't leave dangling threads behind + barrier.await(); + } + + private static void restartQuestDB() { + questDBContainer.stop(); + questDBContainer = newQuestDbConnector(); } @Test @@ -372,11 +495,12 @@ public void testTimestampUnitResolution_auto() { connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthInMicros + "}"); connect.kafka().produce(topicName, "baz", "{\"firstname\":\"Jack\",\"lastname\":\"Doe\",\"birth\":" + birthInNanos + "}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + 
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jane\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n" + "\"Jack\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select firstname,lastname,timestamp from " + topicName); + "select firstname,lastname,timestamp from " + topicName, + httpPort); } @Test @@ -428,10 +552,11 @@ private void testTimestampUnitResolution0(String mode) { connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":0}"); connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthTarget + "}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"1970-01-01T00:00:00.000000Z\"\r\n" + "\"Jane\",\"Doe\",\"2206-11-20T17:46:39.999000Z\"\r\n", - "select firstname,lastname,timestamp from " + topicName); + "select firstname,lastname,timestamp from " + topicName, + httpPort); } @Test @@ -458,9 +583,10 @@ public void testKafkaNativeTimestamp() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); java.util.Date birth = new Calendar.Builder() @@ -476,9 +602,10 @@ public void testKafkaNativeTimestamp() { producer.send(record); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"born\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -504,9 +631,10 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02 16:41:55.402 UTC"; @@ -518,9 +646,10 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -560,9 +689,10 @@ public void testTimestampSMT_parseTimestamp_withSchema() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"timestamp\"\r\n" + 
"\"John\",\"Doe\",\"2023-08-02T16:41:55.402000Z\",\"1985-08-02T16:41:55.402000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -582,16 +712,18 @@ public void testUpfrontTable() { .put("lastname", "Doe") .put("age", (byte) 42); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, age int)", + httpPort, QuestDBUtils.Endpoint.EXEC); connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -605,9 +737,10 @@ public void testDesignatedTimestamp_noSchema_unixEpochMillis() { connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":433774466123}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"foo\",\"John\",\"Doe\",\"1983-09-30T12:54:26.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -626,9 +759,10 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":\"1989-09-23T10:25:33.107Z\"}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"foo\",\"John\",\"Doe\",\"1989-09-23T10:25:33.107000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -658,9 +792,10 @@ public void testDesignatedTimestamp_withSchema() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"key\",\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -691,9 +826,10 @@ public void testDoNotIncludeKey() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -705,9 +841,10 @@ public void testJsonNoSchema() { ConnectTestUtils.assertConnectorTaskRunningEventually(connect); connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + 
topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @Test @@ -721,10 +858,11 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes() { connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); connect.kafka().produce(topicName, "key", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":42.5}"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42.0\r\n" + "\"Jane\",\"Doe\",42.5\r\n", - "select firstname,lastname,age from " + topicName); + "select firstname,lastname,age from " + topicName, + httpPort); } @@ -759,9 +897,10 @@ public void testPrimitiveKey() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select firstname, lastname, age, key from " + topicName); + "select firstname, lastname, age, key from " + topicName, + httpPort); } @Test @@ -777,9 +916,10 @@ public void testParsingStringTimestamp() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02 16:41:55.402095 UTC"; @@ -791,9 +931,10 @@ public void testParsingStringTimestamp() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -808,9 +949,10 @@ public void testParsingStringTimestamp_defaultPattern() { connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); - QuestDBUtils.assertSql(questDBContainer, - "{\"ddl\":\"OK\"}\n", + QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + httpPort, QuestDBUtils.Endpoint.EXEC); String birthTimestamp = "1985-08-02T16:41:55.402095Z"; @@ -822,9 +964,10 @@ public void testParsingStringTimestamp_defaultPattern() { + ",\"born\":\"" + birthTimestamp + "\"}" ); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, + httpPort); } @Test @@ -841,9 +984,10 @@ public void testCustomPrefixWithPrimitiveKeyAndValues() { connect.kafka().produce(topicName, "foo", "bar"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"col_key\",\"col_value\"\r\n" + 
QuestDBUtils.assertSqlEventually("\"col_key\",\"col_value\"\r\n" + "\"foo\",\"bar\"\r\n", - "select col_key, col_value from " + topicName); + "select col_key, col_value from " + topicName, + httpPort); } @Test @@ -866,9 +1010,10 @@ public void testSkipUnsupportedType_Bytes() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"firstname\",\"lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"firstname\",\"lastname\"\r\n" + "\"key\",\"John\",\"Doe\"\r\n", - "select key, firstname, lastname from " + topicName); + "select key, firstname, lastname from " + topicName, + httpPort); } @Test @@ -883,9 +1028,10 @@ public void testDefaultPrefixWithPrimitiveKeyAndValues() { connect.kafka().produce(topicName, "foo", "bar"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"key\",\"value\"\r\n" + "\"foo\",\"bar\"\r\n", - "select key, value from " + topicName); + "select key, value from " + topicName, + httpPort); } @Test @@ -908,9 +1054,10 @@ public void testStructKey() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, json); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"key_firstname\",\"key_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"key_firstname\",\"key_lastname\"\r\n" + "\"John\",\"Doe\",\"John\",\"Doe\"\r\n", - "select firstname, lastname, key_firstname, key_lastname from " + topicName); + "select firstname, lastname, key_firstname, key_lastname from " + topicName, + httpPort); } @Test @@ -935,9 +1082,10 @@ public void testStructKeyWithNoPrefix() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, "foo"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"value\"\r\n" + "\"John\",\"Doe\",\"foo\"\r\n", - "select firstname, lastname, value from " + topicName); + "select firstname, lastname, value from " + topicName, + httpPort); } @Test @@ -961,9 +1109,10 @@ public void testStructKeyAndPrimitiveValue() { String json = new String(converter.fromConnectData(topicName, schema, struct)); connect.kafka().produce(topicName, json, "foo"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key_firstname\",\"key_lastname\",\"value\"\r\n" + QuestDBUtils.assertSqlEventually("\"key_firstname\",\"key_lastname\",\"value\"\r\n" + "\"John\",\"Doe\",\"foo\"\r\n", - "select key_firstname, key_lastname, value from " + topicName); + "select key_firstname, key_lastname, value from " + topicName, + httpPort); } @@ -988,9 +1137,10 @@ public void testExplicitTableName() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + "\"John\",\"Doe\",42\r\n", - "select firstname,lastname,age from " + tableName); + "select firstname,lastname,age from " + tableName, + httpPort); } @Test @@ -1037,9 +1187,10 @@ public void testLogicalTypes() { connect.kafka().produce(topicName, "key", new String(converter.fromConnectData(topicName, schema, struct))); - 
QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"age\",\"col_timestamp\",\"col_date\",\"col_time\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"col_timestamp\",\"col_date\",\"col_time\"\r\n" + "\"John\",\"Doe\",42,\"2022-10-23T13:53:59.123000Z\",\"2022-10-23T00:00:00.000000Z\",50039123\r\n", - "select firstname,lastname,age, col_timestamp, col_date, col_time from " + topicName); + "select firstname,lastname,age, col_timestamp, col_date, col_time from " + topicName, + httpPort); } @Test @@ -1093,9 +1244,10 @@ public void testNestedStructInValue() { String value = new String(converter.fromConnectData(topicName, personSchema, person)); connect.kafka().produce(topicName, "key", value); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"name_firstname\",\"name_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"name_firstname\",\"name_lastname\"\r\n" + "\"John\",\"Doe\"\r\n", - "select name_firstname, name_lastname from " + topicName); + "select name_firstname, name_lastname from " + topicName, + httpPort); } @Test @@ -1135,8 +1287,9 @@ public void testMultiLevelNestedStructInValue() { String value = new String(converter.fromConnectData(topicName, coupleSchema, couple)); connect.kafka().produce(topicName, "key", value); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"partner1_name_firstname\",\"partner1_name_lastname\",\"partner2_name_firstname\",\"partner2_name_lastname\"\r\n" + QuestDBUtils.assertSqlEventually("\"partner1_name_firstname\",\"partner1_name_lastname\",\"partner2_name_firstname\",\"partner2_name_lastname\"\r\n" + "\"John\",\"Doe\",\"Jane\",\"Doe\"\r\n", - "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName); + "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName, + httpPort); } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java index 6cacc5d..6970672 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java @@ -4,7 +4,6 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.testcontainers.containers.GenericContainer; import java.io.IOException; import java.net.URLEncoder; @@ -43,23 +42,32 @@ private QuestDBUtils() { } - public static void assertSqlEventually(GenericContainer questdbContainer, String expectedResult, String query) { - await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(questdbContainer, expectedResult, query)); + public static void assertSqlEventually(String expectedResult, String query, int timeoutSeconds, int port) { + await().atMost(timeoutSeconds, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); } - public static void assertSql(GenericContainer questdbContainer, String expectedResult, String query) { - assertSql(questdbContainer, expectedResult, query, Endpoint.EXPORT); + public static void assertSqlEventually(String expectedResult, String query, int port) { + await().atMost(QUERY_WAITING_TIME_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); } - public static void assertSql(GenericContainer questdbContainer, String expectedResult, String query, Endpoint endpoint) { - try (Response response = executeQuery(questdbContainer, query, endpoint)) 
{ + public static void assertSql(String expectedResult, String query, int port) { + assertSql(expectedResult, query, port, Endpoint.EXPORT); + } + + public static void assertSql(String expectedResult, String query, int port, Endpoint endpoint) { + try (Response response = executeQuery(port, query, endpoint)) { if (response.code() != 200) { fail("Query failed, returned code " + response.code()); } try (okhttp3.ResponseBody body = response.body()) { if (body != null) { String bodyString = body.string(); - assertEquals(expectedResult, bodyString); + try { + assertEquals(expectedResult, bodyString); + } catch (AssertionError e) { + System.out.println("Received response: " + bodyString); + throw e; + } } } } catch (IOException e) { @@ -67,9 +75,9 @@ public static void assertSql(GenericContainer questdbContainer, String expect } } - private static Response executeQuery(GenericContainer questContainer, String query, Endpoint endpoint) throws IOException { + private static Response executeQuery(int port, String query, Endpoint endpoint) throws IOException { String encodedQuery = URLEncoder.encode(query, "UTF-8"); - String baseUrl = "http://" + questContainer.getHost() + ":" + questContainer.getMappedPort(QUESTDB_HTTP_PORT); + String baseUrl = "http://localhost:" + port; Request request = new Request.Builder() .url(baseUrl + "/" + endpoint.endpoint + "?query=" + encodedQuery) .build(); diff --git a/connector/src/test/resources/log4j.properties b/connector/src/test/resources/log4j.properties index ec93932..95c1bac 100644 --- a/connector/src/test/resources/log4j.properties +++ b/connector/src/test/resources/log4j.properties @@ -26,4 +26,4 @@ log4j.logger.org.apache.zookeeper=WARN log4j.logger.kafka=WARN log4j.logger.org.reflections=ERROR log4j.logger.state.change.logger=WARN -log4j.logger.io.questdb.kafka=DEBUG +#log4j.logger.io.questdb.kafka=DEBUG diff --git a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java index 68c17cd..a607b1c 100644 --- a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java +++ b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java @@ -106,9 +106,9 @@ public void testSmoke() throws Exception { } startConnector(topicName); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } @Test @@ -124,9 +124,9 @@ public void testSchemaEvolution() throws Exception { } startConnector(topicName); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\"\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); try (Producer producer = new KafkaProducer<>(producerProps())) { Schema schema = new org.apache.avro.Schema.Parser().parse(getClass().getResourceAsStream("/avro-runtime/StudentWithExtraColumn.avsc")); @@ -137,10 +137,10 @@ public void testSchemaEvolution() throws Exception { student.put("active", 
true); producer.send(new ProducerRecord<>(topicName, "foo", student)).get(); } - QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"timestamp\",\"active\"\r\n" + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"timestamp\",\"active\"\r\n" + "\"John\",\"Doe\",\"2000-01-01T00:00:00.000000Z\",false\r\n" + "\"Mary\",\"Doe\",\"2005-01-01T00:00:00.000000Z\",true\r\n", - "select * from " + topicName); + "select * from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } private void startConnector(String topicName) { diff --git a/integration-tests/commons/pom.xml b/integration-tests/commons/pom.xml index 5cee86b..aa2715a 100644 --- a/integration-tests/commons/pom.xml +++ b/integration-tests/commons/pom.xml @@ -20,5 +20,9 @@ org.testcontainers junit-jupiter + + org.junit.jupiter + junit-jupiter-api + \ No newline at end of file diff --git a/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java b/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java index 79577fc..b3ffe21 100644 --- a/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java +++ b/integration-tests/commons/src/main/java/io/questdb/kafka/JarResolverExtension.java @@ -37,7 +37,7 @@ public String getJarPath() { } if (resource.getProtocol().equals("file")) { String pathString = resource.getPath(); - return buildJarFromSinglingTarget(pathString); + return buildJarFromSiblingTargetDir(pathString); } else if (resource.getProtocol().equals("jar")) { return getPathToJarWithClass(clazz); } @@ -52,7 +52,7 @@ private static String getPathToJarWithClass(Class clazz) { return path.toString(); } - private String buildJarFromSinglingTarget(String pathString) { + private String buildJarFromSiblingTargetDir(String pathString) { try { tempDir = Files.createTempDirectory("jar-resolver-tmp"); } catch (IOException e) { diff --git a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java index 17dc386..7e8e7a8 100644 --- a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java +++ b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java @@ -97,8 +97,8 @@ public void test() throws Exception { connectContainer.registerConnector("my-connector", connector); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"value\"\r\n" - + "\"foo\",\"bar\"\r\n", "select key, value from " + topicName); + QuestDBUtils.assertSqlEventually( "\"key\",\"value\"\r\n" + + "\"foo\",\"bar\"\r\n", "select key, value from " + topicName, questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } diff --git a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java index f5b5a79..67bf8fc 100644 --- a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java +++ b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java @@ -3,15 +3,9 @@ import io.debezium.testing.testcontainers.ConnectorConfiguration; import io.debezium.testing.testcontainers.DebeziumContainer; import io.questdb.client.Sender; -import io.questdb.kafka.QuestDBSinkConnector; -import io.questdb.kafka.QuestDBSinkConnectorConfig; -import io.questdb.kafka.QuestDBSinkTask; -import io.questdb.kafka.QuestDBUtils; -import io.questdb.kafka.JarResolverExtension; +import io.questdb.kafka.*; import 
org.apache.kafka.connect.json.JsonConverter; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.LoggerFactory; @@ -22,21 +16,11 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Stream; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertEquals; @Testcontainers public class DebeziumIT { @@ -117,10 +101,11 @@ public void testSmoke() throws Exception { ConnectorConfiguration questSinkConfig = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSinkConfig); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "2,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -144,13 +129,14 @@ public void testManyUpdates() throws Exception { statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (3, 'PDB', 1.0)"); statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (4, 'KDB', 1.0)"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"price\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"symbol\",\"price\"\r\n" + "0,\"TDB\",1.0\r\n" + "1,\"QDB\",1.0\r\n" + "2,\"IDB\",1.0\r\n" + "3,\"PDB\",1.0\r\n" + "4,\"KDB\",1.0\r\n", - "select id, symbol, price from " + questTableName); + "select id, symbol, price from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); try (PreparedStatement preparedStatement = connection.prepareStatement("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set price = ? 
where id = ?")) { //a bunch of updates @@ -171,18 +157,20 @@ public void testManyUpdates() throws Exception { } // all symbols have the last well-known price - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"last_price\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"symbol\",\"last_price\"\r\n" + "0,\"TDB\",42.0\r\n" + "1,\"QDB\",42.0\r\n" + "2,\"IDB\",42.0\r\n" + "3,\"PDB\",42.0\r\n" + "4,\"KDB\",42.0\r\n", - "select id, symbol, last(price) as last_price from " + questTableName); + "select id, symbol, last(price) as last_price from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); // total number of rows is equal to the number of updates and inserts - QuestDBUtils.assertSqlEventually(questDBContainer, "\"count\"\r\n" + QuestDBUtils.assertSqlEventually("\"count\"\r\n" + "200010\r\n", - "select count() from " + questTableName); + "select count() from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -202,17 +190,19 @@ public void testSchemaChange() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)"); statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"description\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"title\",\"description\"\r\n" + "1,\"Learn CDC\",\r\n" + "2,\"Learn Debezium\",\"Best book ever\"\r\n", - "select id, title, description from " + questTableName); + "select id, title, description from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -231,16 +221,18 @@ public void testUpdatesChange() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually("\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.executeUpdate("update " + PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME + " set title = 'Learn Debezium' where id = 1"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "1,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -259,17 +251,19 @@ public void testInsertThenDeleteThenInsertAgain() throws Exception { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1"); statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn Debezium')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\"\r\n" + "1,\"Learn CDC\"\r\n" + "1,\"Learn Debezium\"\r\n", - "select id, title from " + questTableName); + "select id, title from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -288,9 +282,10 @@ public void testEventTime() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -309,9 +304,10 @@ public void testEventTimeMicros() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123456Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -330,9 +326,10 @@ public void testEventTimeNanos() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123457Z\"\r\n", - "select id, title, timestamp from " + questTableName); + "select id, title, timestamp from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -350,9 +347,10 @@ public void testNonDesignatedTimestamp() throws SQLException { 
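Every hunk in DebeziumIT makes the same mechanical change: QuestDBUtils.assertSqlEventually now takes the expected CSV, the query, and the QuestDB HTTP port mapped on the host (plus, in one overload used later in this diff, a timeout) instead of the container object. The real helper lives in QuestDBUtils; the sketch below only illustrates the shape such a polling assertion can take, and the /exp export endpoint, retry interval, and parameter names are assumptions rather than the project's actual implementation.

// Sketch only: poll QuestDB's HTTP CSV export until the query result matches the expected text.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class SqlAssertions {
    static void assertSqlEventually(String expectedCsv, String query, int timeoutSeconds, int httpPort) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // /exp returns query results as CSV; httpPort is the host port mapped to QuestDB's 9000
        URI uri = new URI("http", null, "localhost", httpPort, "/exp", "query=" + query, null);
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L;
        String lastBody = null;
        while (System.currentTimeMillis() < deadline) {
            lastBody = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
            if (expectedCsv.equals(lastBody)) {
                return;
            }
            Thread.sleep(500); // rows arrive asynchronously via the connector, so keep retrying
        }
        throw new AssertionError("Expected:\n" + expectedCsv + "\nbut query returned:\n" + lastBody);
    }
}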
ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"created_at\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n", - "select id, title, created_at from " + questTableName); + "select id, title, created_at from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -370,9 +368,10 @@ public void testDate() throws SQLException { ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"created_at\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n", - "select id, title, created_at from " + questTableName); + "select id, title, created_at from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } @@ -391,18 +390,20 @@ public void testDelete() throws SQLException { questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at"); debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n", - "select * from " + questTableName); + "select * from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); // delete should be ignored by QuestDB statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1"); statement.execute("insert into " + PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME + " values (2, 'Learn Debezium', '2021-01-03')"); - QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n" + QuestDBUtils.assertSqlEventually( "\"id\",\"title\",\"timestamp\"\r\n" + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n" + "2,\"Learn Debezium\",\"2021-01-03T00:00:00.000000Z\"\r\n", - "select * from " + questTableName); + "select * from " + questTableName, + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT)); } } diff --git a/integration-tests/exactlyonce/pom.xml b/integration-tests/exactlyonce/pom.xml new file mode 100644 index 0000000..06c6b58 --- /dev/null +++ b/integration-tests/exactlyonce/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + org.questdb + kafka-integration-tests + 0.10-SNAPSHOT + + kafka-it-exactlyonce + + + + + org.questdb + kafka-questdb-connector + ${project.version} + test + tests + + + org.questdb + kafka-questdb-connector + ${project.version} + test + + + org.apache.kafka + connect-api + ${kafka.version} + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + kafka + test + + + org.testcontainers + junit-jupiter + test + + + io.debezium + debezium-testing-testcontainers + 1.9.5.Final + test + + + org.slf4j + slf4j-api + provided + + + org.slf4j + slf4j-log4j12 + test + + + org.questdb + kafka-it-common + ${project.version} + + + + \ No newline at end of file diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java new file mode 100644 index 0000000..08c8d45 --- /dev/null +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -0,0 +1,320 @@ +package io.questdb.kafka; + +import io.questdb.client.Sender; +import io.questdb.std.Os; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.CleanupMode; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; + +import static java.time.Duration.ofMinutes; +import static 
java.time.Duration.ofSeconds; + +public class ExactlyOnceIT { + private static final int VICTIM_QUESTDB = 0; + private static final int VICTIM_CONNECT = 1; + private static final int VICTIM_KAFKA = 2; + private static final int VICTIMS_TOTAL = VICTIM_KAFKA + 1; + + private static final DockerImageName KAFKA_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.5.1"); + private static final DockerImageName ZOOKEEPER_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-zookeeper:7.5.1"); + private static final DockerImageName CONNECT_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.1"); + private static final DockerImageName QUESTDB_CONTAINER_IMAGE = DockerImageName.parse("questdb/questdb:7.3.3"); + private static final int KAFKA_CLUSTER_SIZE = 3; + private static final int CONNECT_CLUSTER_SIZE = 2; + + @TempDir(cleanup = CleanupMode.NEVER) + static Path persistence; + + // we need to locate JARs with QuestDB client and Kafka Connect Connector, + // this is later used to copy to the Kafka Connect container + @RegisterExtension + public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class); + @RegisterExtension + public static JarResolverExtension questdbClientJarResolver = JarResolverExtension.forClass(Sender.class); + + private final static Network network = Network.newNetwork(); + + private static GenericContainer zookeeper; + private static KafkaContainer[] kafkas = new KafkaContainer[KAFKA_CLUSTER_SIZE]; + private static GenericContainer[] connects = new GenericContainer[CONNECT_CLUSTER_SIZE]; + private static GenericContainer questdb; + + private static int questHttpPort; + + @BeforeAll + public static void createContainers() { + zookeeper = newZookeeperContainer(); + questdb = newQuestDBContainer(); + for (int i = 0; i < KAFKA_CLUSTER_SIZE; i++) { + kafkas[i] = newKafkaContainer(i); + } + for (int i = 0; i < CONNECT_CLUSTER_SIZE; i++) { + connects[i] = newConnectContainer(i); + } + + Stream containers = Stream.concat( + Stream.concat( + Stream.of(kafkas), Stream.of(connects) + ), + Stream.of(zookeeper, questdb) + ); + Startables.deepStart(containers).join(); + questHttpPort = questdb.getMappedPort(9000); + } + + @AfterAll + public static void stopContainer() { + questdb.stop(); + Stream.of(kafkas).forEach(KafkaContainer::stop); + Stream.of(connects).forEach(GenericContainer::stop); + zookeeper.stop(); + + io.questdb.std.Files.rmdir(io.questdb.std.str.Path.getThreadLocal(persistence.toAbsolutePath().toString())); + } + + private static GenericContainer newZookeeperContainer() { + return new GenericContainer<>(ZOOKEEPER_CONTAINER_IMAGE) + .withNetwork(network) + .withNetworkAliases("zookeeper") + .withEnv("ZOOKEEPER_CLIENT_PORT", "2181") + .withEnv("ZOOKEEPER_TICK_TIME", "300") + .withEnv("ZOOKEEPER_INIT_LIMIT", "10") + .withEnv("ZOOKEEPER_SYNC_LIMIT", "5") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))); + } + + private static GenericContainer newQuestDBContainer() { + + Path dbRoot; + try { + dbRoot = Files.createDirectories(persistence.resolve("questdb")); + } catch (IOException e) { + throw new RuntimeException(e); + } + FixedHostPortGenericContainer container = new FixedHostPortGenericContainer<>(QUESTDB_CONTAINER_IMAGE.asCanonicalNameString()) + .withNetwork(network) +// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) + .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/questdb") + .withCreateContainerCmdModifier(cmd -> 
cmd.withHostName("questdb")); + + if (questHttpPort == 0) { + container = container.withExposedPorts(9000); + } else { + container.withFixedExposedPort(questHttpPort, 9000); + } + return container; + } + + private static KafkaContainer newKafkaContainer(int id) { + Path kafkaData; + try { + kafkaData = Files.createDirectories(persistence.resolve("kafka").resolve("data" + id)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new KafkaContainer(KAFKA_CONTAINER_IMAGE) + .withNetwork(network) + .dependsOn(zookeeper) + .withExternalZookeeper("zookeeper:2181") + .withEnv("KAFKA_BROKER_ID", String.valueOf(id)) + .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "3") + .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "3") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3") + .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2") + .withEnv("KAFKA_NUM_PARTITIONS", "3") + .withFileSystemBind(kafkaData.toAbsolutePath().toString(), "/var/lib/kafka/data") + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka" + id)) + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka" + id))); + } + + private static GenericContainer newConnectContainer(int id) { + List dependencies = new ArrayList<>(Arrays.asList(kafkas)); + dependencies.add(questdb); + + return new GenericContainer<>(CONNECT_CONTAINER_IMAGE) + .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092") + .withEnv("CONNECT_GROUP_ID", "test") + .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") + .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic") + .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic") + .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") + .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter") + .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false") + .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect" + id) + .withNetwork(network) + .withExposedPorts(8083) + .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar") + .withCopyFileToContainer(MountableFile.forHostPath(questdbClientJarResolver.getJarPath()), "/usr/share/java/kafka/questdb.jar") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("connect" + id))) + .dependsOn(dependencies) + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("connect" + id)) + .waitingFor(new HttpWaitStrategy() + .forPath("/connectors") + .forStatusCode(200) + .forPort(8083) + .withStartupTimeout(ofMinutes(5))); + } + + @Test + public void test() throws Exception { + String topicName = "mytopic"; + + Properties props = new Properties(); + String bootstrapServers = kafkas[0].getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put("include.key", "false"); + + int recordCount = 5_000_000; + new Thread(() -> { + try (Producer producer = new KafkaProducer<>(props)) { + for (int i = 0; i < recordCount; i++ ) { + String json = newPayload(); + producer.send(new ProducerRecord<>(topicName, null, json)); + + // 1% chance of duplicates - we want them to be also deduped by QuestDB + if (ThreadLocalRandom.current().nextInt(100) == 0) { + producer.send(new ProducerRecord<>(topicName, null, json)); + } + } + } + }).start(); + + 
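The producer thread above only pushes data; the rest of the test leans on two container details already visible in the setup methods: each Kafka/QuestDB container binds a directory under the shared persistence temp dir, and QuestDB uses FixedHostPortGenericContainer so a replacement container reappears on the same host port. A condensed, hypothetical sketch of restarting QuestDB with both properties preserved (restartQuestDB is an illustrative name, not a method in the test):

import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

final class PinnedRestart {
    // Hypothetical helper, condensed from newQuestDBContainer() above: the replacement container
    // re-attaches the same data directory (so rows written before the kill survive) and re-exposes
    // port 9000 on the same host port (so questHttpPort stays valid for the count assertion).
    @SuppressWarnings("rawtypes")
    static GenericContainer restartQuestDB(GenericContainer old, String dataDir, int hostPort) {
        old.stop();
        FixedHostPortGenericContainer replacement =
                new FixedHostPortGenericContainer<>("questdb/questdb:7.3.3")
                        .withFileSystemBind(dataDir, "/var/lib/questdb") // keep the on-disk state
                        .withFixedExposedPort(hostPort, 9000);           // keep the mapped host port
        replacement.start();
        return replacement;
    }
}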
QuestDBUtils.assertSql( + "{\"ddl\":\"OK\"}", + "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, val LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", + questdb.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT), + QuestDBUtils.Endpoint.EXEC); + + startConnector(); + + CyclicBarrier barrier = new CyclicBarrier(2); + startKillingRandomContainers(barrier); + + // make sure we have exactly the expected records in QuestDB + QuestDBUtils.assertSqlEventually( + "\"count\"\r\n" + + recordCount + "\r\n", + "select count(*) from " + topicName, + 600, + questHttpPort); + + barrier.await(); + } + + @NotNull + private static String newPayload() { + Instant now = Instant.now(); + long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + UUID uuid = UUID.randomUUID(); + int val = ThreadLocalRandom.current().nextInt(100); + + return "{\"ts\":" + nanoTs + ",\"id\":\"" + uuid + "\",\"val\":" + val + "}"; + } + + private static void startKillingRandomContainers(CyclicBarrier barrier) { + new Thread(() -> { + while (barrier.getNumberWaiting() == 0) { // keep killing them until the checker thread passed the assertion + Os.sleep(ThreadLocalRandom.current().nextInt(5_000, 30_000)); + int victim = ThreadLocalRandom.current().nextInt(VICTIMS_TOTAL); + switch (victim) { + case VICTIM_QUESTDB: { + questdb.stop(); + GenericContainer container = newQuestDBContainer(); + container.start(); + questdb = container; + break; + } + case VICTIM_CONNECT: { + int n = ThreadLocalRandom.current().nextInt(connects.length); + connects[n].stop(); + GenericContainer container = newConnectContainer(n); + container.start(); + connects[n] = container; + break; + } + case VICTIM_KAFKA: { + int n = ThreadLocalRandom.current().nextInt(kafkas.length); + kafkas[n].stop(); + KafkaContainer container = newKafkaContainer(n); + Os.sleep(5000); // wait for zookeeper to detect the previous kafka container was stopped + container.start(); + kafkas[n] = container; + break; + } + } + } + + try { + barrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (BrokenBarrierException e) { + throw new RuntimeException(e); + } + }).start(); + } + + private static void startConnector() throws IOException, InterruptedException, URISyntaxException { + String payload = "{\"name\":\"my-connector\",\"config\":{" + + "\"tasks.max\":\"4\"," + + "\"connector.class\":\"io.questdb.kafka.QuestDBSinkConnector\"," + + "\"key.converter\":\"org.apache.kafka.connect.storage.StringConverter\"," + + "\"value.converter\":\"org.apache.kafka.connect.json.JsonConverter\"," + + "\"topics\":\"mytopic\"," + + "\"value.converter.schemas.enable\":\"false\"," + + "\"dedup.rewind.offset\":\"150000\"," + + "\"timestamp.field.name\":\"ts\"," + + "\"host\":\"questdb:9009\"}" + + "}"; + + HttpResponse response = HttpClient.newBuilder().connectTimeout(ofSeconds(10)).build().send( + HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.ofString(payload)) + .uri(new URI("http://localhost:" + connects[0].getMappedPort(8083) + "/connectors")) + .header("Content-Type", "application/json") + .build(), + HttpResponse.BodyHandlers.ofString() + ); + if (response.statusCode() != 201) { + throw new RuntimeException("Failed to create connector: " + response.body()); + } + } +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 8bff927..08efd86 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -13,6 +13,7 @@ debezium commons avro-schema-registry 
+        exactlyonce
     pom
@@ -26,6 +27,7 @@
                         integration-test
+                        verify
diff --git a/pom.xml b/pom.xml
index 0e7ba29..c2eef24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
         2.13
         3.3.1
         5.9.0
-        1.17.3
+        1.19.1
         1.7.36
         4.1.0
         4.10.0
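The connector configuration registered in ExactlyOnceIT sets dedup.rewind.offset to 150000 while the target table is created with DEDUP UPSERT KEYS(ts, id). That interplay is what makes the count assertion exact despite the intentional duplicates and the container kills: after a restart the sink re-reads a window of records that may already have been written, and QuestDB's WAL deduplication collapses the re-sent rows into the existing ones. A minimal sketch of the rewind half of that idea, using the Kafka Connect SinkTaskContext API (an illustration of the mechanism, not the connector's actual offset-tracking code):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

import java.util.HashMap;
import java.util.Map;

final class RewindSketch {
    // For every assigned partition, ask Connect to resume consumption `rewind` messages before the
    // last committed offset. Re-delivered records carry (ts, id) pairs the table has already seen,
    // so the DEDUP UPSERT KEYS clause turns them into updates of existing rows, not duplicates.
    static void rewindOnAssignment(SinkTaskContext context, Map<TopicPartition, Long> committedOffsets, long rewind) {
        Map<TopicPartition, Long> safeOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
            safeOffsets.put(entry.getKey(), Math.max(0L, entry.getValue() - rewind));
        }
        context.offset(safeOffsets); // Connect seeks the consumer to these offsets before polling
    }
}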