From e3792cead878e501504ec6ae4ba3755d15a31db3 Mon Sep 17 00:00:00 2001
From: Jaromir Hamala
Date: Fri, 5 Apr 2024 12:15:09 +0200
Subject: [PATCH] feat: support for HTTP transport (#13)

The QuestDB ILP client supports HTTP transport in versions 7.4.0+, so the client dependency version is updated as well.

Configuration

The connector now supports client configuration strings as documented in the client documentation. Set the client configuration string under the key client.conf.string. Example:

```json
{
  "name": "my-connector",
  "config": {
    "tasks.max": "4",
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topics": "mytopic",
    "value.converter.schemas.enable": "false",
    "timestamp.field.name": "ts",
    "timestamp.units": "nanos",
    "client.conf.string": "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;"
  }
}
```

The old client configuration keys (host, username, token, tls, etc.) are deprecated and should not be used. They cannot be combined with the client configuration string. Kafka-specific configuration options (timestamp.units, topics, etc.) are not part of the client configuration string and keep their own keys, as shown in the example above.

Advantages of the HTTP transport

The HTTP transport provides proper error reporting, automatic retries for recoverable errors, and simpler configuration. Combined with QuestDB server-side deduplication, it enables exactly-once semantics.

Known Issues

Due to a bug in QuestDB client 7.4.0, it is recommended to disable interval-based flushes. See questdb/questdb#4372 for details. This will be resolved in the next client version.
---
 .../io/questdb/kafka/BufferingSender.java     |  37 +-
 .../io/questdb/kafka/EmptyOffsetTracker.java  |  35 --
 .../io/questdb/kafka/MultiOffsetTracker.java  | 122 -------
 .../java/io/questdb/kafka/OffsetTracker.java  |  20 --
 .../questdb/kafka/QuestDBSinkConnector.java   |  41 ++-
 .../kafka/QuestDBSinkConnectorConfig.java     |  22 +-
 .../io/questdb/kafka/QuestDBSinkTask.java     | 123 ++++---
 .../io/questdb/kafka/ConnectTestUtils.java    |  15 +-
 .../kafka/QuestDBSinkConnectorConfigTest.java |  42 ++-
 .../QuestDBSinkConnectorEmbeddedAuthTest.java |   2 +-
 .../QuestDBSinkConnectorEmbeddedTest.java     | 338 +++++++++++-------
 .../java/io/questdb/kafka/QuestDBUtils.java   |   2 +-
 .../io/questdb/kafka/TimestampHelperTest.java |   5 +
 .../questdb/kafka/AvroSchemaRegistryIT.java   |  18 +-
 .../questdb/kafka/QuestDBSinkConnectorIT.java |  21 +-
 .../src/test/java/kafka/DebeziumIT.java       |  19 +-
 integration-tests/exactlyonce/pom.xml         |   5 +
 .../java/io/questdb/kafka/ExactlyOnceIT.java  |  81 ++---
 .../confluent-docker-images/Dockerfile        |   4 +-
 .../docker-compose.yaml                       |   2 +-
 .../confluent-docker-images/readme.md         |   2 +-
 .../faker/Dockerfile-Connect                  |   2 +-
 .../faker/docker-compose.yml                  |   4 +-
 .../stocks/docker-compose.yml                 |   2 +-
 pom.xml                                       |   2 +-
 25 files changed, 497 insertions(+), 469 deletions(-)
 delete mode 100644 connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java
 delete mode 100644 connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java
 delete mode 100644 connector/src/main/java/io/questdb/kafka/OffsetTracker.java

diff --git a/connector/src/main/java/io/questdb/kafka/BufferingSender.java b/connector/src/main/java/io/questdb/kafka/BufferingSender.java
index b2d7819..08a84b1 100644
--- a/connector/src/main/java/io/questdb/kafka/BufferingSender.java
+++ 
b/connector/src/main/java/io/questdb/kafka/BufferingSender.java @@ -4,6 +4,8 @@ import io.questdb.std.BoolList; import io.questdb.std.LongList; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -96,17 +98,22 @@ public Sender boolColumn(CharSequence name, boolean value) { } @Override - public Sender timestampColumn(CharSequence name, long value) { + public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) { if (symbolColumns.contains(name)) { symbolColumnNames.add(name); symbolColumnValues.add(String.valueOf(value)); } else { timestampNames.add(name); - timestampValues.add(value); + timestampValues.add(unitToMicros(value, unit)); } return this; } + @Override + public Sender timestampColumn(CharSequence charSequence, Instant instant) { + throw new UnsupportedOperationException("Not implemented"); + } + @Override public Sender symbol(CharSequence name, CharSequence value) { symbolColumnNames.add(name); @@ -164,16 +171,36 @@ private void transferFields() { for (int i = 0, n = timestampNames.size(); i < n; i++) { CharSequence fieldName = timestampNames.get(i); long fieldValue = timestampValues.get(i); - sender.timestampColumn(fieldName, fieldValue); + sender.timestampColumn(fieldName, fieldValue, ChronoUnit.MICROS); } timestampNames.clear(); timestampValues.clear(); } + private static long unitToMicros(long value, ChronoUnit unit) { + switch (unit) { + case NANOS: + return value / 1000L; + case MICROS: + return value; + case MILLIS: + return value * 1000L; + case SECONDS: + return value * 1_000_000L; + default: + throw new IllegalArgumentException("Unsupported unit: " + unit); + } + } + @Override - public void at(long timestamp) { + public void at(long timestamp, ChronoUnit unit) { transferFields(); - sender.at(timestamp); + sender.at(timestamp, unit); + } + + @Override + public void at(Instant instant) { + throw new UnsupportedOperationException(); } @Override diff --git a/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java deleted file mode 100644 index aef8315..0000000 --- a/connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.questdb.kafka; - -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.sink.SinkTaskContext; - -import java.util.Collection; -import java.util.Map; - -public final class EmptyOffsetTracker implements OffsetTracker { - @Override - public void onPartitionsOpened(Collection partitions) { - - } - - @Override - public void onPartitionsClosed(Collection partitions) { - - } - - @Override - public void onObservedOffset(int partition, String topic, long offset) { - - } - - @Override - public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { - assert rewindOffset == 0; - } - - @Override - public void transformPreCommit(Map currentOffsets, long rewindOffset) { - assert rewindOffset == 0; - } -} diff --git a/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java b/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java deleted file mode 100644 index b9dfe76..0000000 --- a/connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java +++ /dev/null @@ -1,122 +0,0 @@ -package io.questdb.kafka; - -import io.questdb.std.CharSequenceObjHashMap; -import io.questdb.std.LongList; -import 
io.questdb.std.ObjList; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.sink.SinkTaskContext; - -import java.util.Collection; -import java.util.Map; - -public final class MultiOffsetTracker implements OffsetTracker { - private static final int EMPTY = -1; - private static final int CLOSED = -2; - - private final CharSequenceObjHashMap offsets = new CharSequenceObjHashMap<>(); - - private String lastTopicCache; - private LongList lastTopicOffsetsCache; - - @Override - public void onPartitionsOpened(Collection partitions) { - for (TopicPartition partition : partitions) { - String topic = partition.topic(); - LongList topicOffsets = offsets.get(topic); - if (topicOffsets == null) { - topicOffsets = new LongList(4); - offsets.put(topic, topicOffsets); - } - - int partitionId = partition.partition(); - int currentSize = topicOffsets.size(); - if (currentSize <= partitionId) { - topicOffsets.extendAndSet(partitionId, EMPTY); - if (currentSize != partitionId) { - topicOffsets.fill(currentSize, partitionId, EMPTY); - } - } else if (topicOffsets.get(partitionId) == CLOSED) { - topicOffsets.set(partitionId, EMPTY); - } - } - } - - @Override - public void onPartitionsClosed(Collection partitions) { - for (TopicPartition partition : partitions) { - String topic = partition.topic(); - LongList topicOffsets = offsets.get(topic); - topicOffsets.set(partition.partition(), CLOSED); - } - - // Remove topics that have all partitions closed - ObjList keys = offsets.keys(); - for (int i = 0, n = keys.size(); i < n; i++) { - CharSequence topic = keys.getQuick(i); - LongList topicOffsets = offsets.get(topic); - boolean allClosed = true; - for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) { - if (topicOffsets.get(partition) != CLOSED) { - allClosed = false; - break; - } - } - if (allClosed) { - offsets.remove(topic); - } - } - } - - - - @Override - public void onObservedOffset(int partition, String topic, long offset) { - LongList topicOffsets; - - // intentional reference equality check - Kafka Connect use the same String instances - // so we can avoid hash map lookup - if (lastTopicCache == topic) { - topicOffsets = lastTopicOffsetsCache; - } else { - topicOffsets = offsets.get(topic); - lastTopicCache = topic; - lastTopicOffsetsCache = topicOffsets; - } - long maxOffset = topicOffsets.get(partition); - topicOffsets.set(partition, Math.max(maxOffset, offset)); - } - - - @Override - public void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset) { - ObjList keys = offsets.keys(); - for (int i = 0, n = keys.size(); i < n; i++) { - CharSequence topic = keys.getQuick(i); - LongList topicOffsets = offsets.get(topic); - for (int partition = 0, m = topicOffsets.size(); partition < m; partition++) { - long offset = topicOffsets.get(partition); - // only rewind if we ever observed an offset for this partition - if (offset >= 0) { - sinkTaskContext.offset(new TopicPartition(topic.toString(), partition), Math.max(0, offset - rewindOffset)); - } - } - } - } - - @Override - public void transformPreCommit(Map currentOffsets, long rewindOffset) { - for (Map.Entry entry : currentOffsets.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - String topic = topicPartition.topic(); - LongList topicOffsets = offsets.get(topic); - long offset = topicOffsets.get(topicPartition.partition()); - - // only transform if we ever observed an offset for this partition - if (offset >= 0) { - 
long newOffset = Math.max(0, offset - rewindOffset); - entry.setValue(new OffsetAndMetadata(newOffset)); - } - } - } -} diff --git a/connector/src/main/java/io/questdb/kafka/OffsetTracker.java b/connector/src/main/java/io/questdb/kafka/OffsetTracker.java deleted file mode 100644 index a268f4f..0000000 --- a/connector/src/main/java/io/questdb/kafka/OffsetTracker.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.questdb.kafka; - -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.sink.SinkTaskContext; - -import java.util.Collection; -import java.util.Map; - -public interface OffsetTracker { - void onPartitionsOpened(Collection partitions); - - void onPartitionsClosed(Collection partitions); - - void onObservedOffset(int partition, String topic, long offset); - - void configureSafeOffsets(SinkTaskContext sinkTaskContext, long rewindOffset); - - void transformPreCommit(Map currentOffsets, long rewindOffset); -} diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java index 66fc4a6..93f6f10 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnector.java @@ -49,9 +49,46 @@ public ConfigDef config() { public Config validate(Map connectorConfigs) { String s = connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG); if (Boolean.parseBoolean(s) && connectorConfigs.get(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG) != null) { - throw new IllegalArgumentException("Cannot use " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG - + " with " + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +". These options are mutually exclusive."); + throw new IllegalArgumentException("Cannot use '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG + + "' with '" + QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG +"'. These options are mutually exclusive."); } + + validateClientConfiguration(connectorConfigs); return super.validate(connectorConfigs); } + + private static void validateClientConfiguration(Map connectorConfigs) { + String host = connectorConfigs.get(QuestDBSinkConnectorConfig.HOST_CONFIG); + String confString = connectorConfigs.get(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG); + String envConfString = System.getenv("QDB_CLIENT_CONF"); + + // cannot set client configuration string via both explicit config and environment variable + if (confString != null && envConfString != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or QDB_CLIENT_CONF environment variable must be set. 
They cannot be used together."); + } + + if (confString == null && envConfString == null) { + if (host == null) { + throw new IllegalArgumentException("Either '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' or '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' must be set."); + } + return; // configuration string is not used, nothing else to validate + } + + // configuration string is used, let's validate no other client configuration is set + if (host != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.HOST_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.TOKEN) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.TOKEN + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + if (connectorConfigs.get(QuestDBSinkConnectorConfig.USERNAME) != null) { + throw new IllegalArgumentException("Only one of '" + QuestDBSinkConnectorConfig.USERNAME + "' or '" + QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG + "' must be set."); + } + } } diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index 0ee98e1..91718f0 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -61,6 +61,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String TLS_VALIDATION_MODE_CONFIG = "tls.validation.mode"; public static final String TLS_VALIDATION_MODE_DOC = "TLS validation mode. Possible values: default, insecure"; + public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string"; + public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client"; + public static final String RETRY_BACKOFF_MS = "retry.backoff.ms"; private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made"; @@ -70,15 +73,6 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String TIMESTAMP_FORMAT = "timestamp.string.format"; private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields"; - public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset"; - private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " + - "On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with QuestDB " + - "deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can lost in the event of a failure. 
" + - "If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover " + - "as it will have to reprocess a large number of records and QuestDB will have to do extra work with deduplication. If you are testing this " + - "feature for the first time then 150000 is a good starting point."; - - private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ"; public QuestDBSinkConnectorConfig(ConfigDef config, Map parsedConfig) { @@ -91,7 +85,7 @@ public QuestDBSinkConnectorConfig(Map parsedConfig) { public static ConfigDef conf() { return new ConfigDef() - .define(HOST_CONFIG, Type.STRING, Importance.HIGH, HOST_DOC) + .define(HOST_CONFIG, Type.STRING, null, Importance.HIGH, HOST_DOC) .define(TABLE_CONFIG, Type.STRING, null, TablenameValidator.INSTANCE, Importance.HIGH, TABLE_DOC) .define(KEY_PREFIX_CONFIG, Type.STRING, "key", Importance.MEDIUM, KEY_PREFIX_DOC) .define(VALUE_PREFIX_CONFIG, Type.STRING, "", Importance.MEDIUM, VALUE_PREFIX_DOC) @@ -109,12 +103,12 @@ public static ConfigDef conf() { .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC) .define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC) .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC) - .define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC) - .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC); + .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC) + .define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC); } - public long getDeduplicationRewindOffset() { - return getLong(DEDUPLICATION_REWIND_CONFIG); + public Password getConfigurationString() { + return getPassword(CONFIGURATION_STRING_CONFIG); } public String getTlsValidationMode() { diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java index 3c59a03..bdff4be 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java @@ -1,13 +1,16 @@ package io.questdb.kafka; import io.questdb.client.Sender; +import io.questdb.cutlass.http.client.HttpClientException; import io.questdb.cutlass.line.LineSenderException; +import io.questdb.cutlass.line.http.LineHttpSender; import io.questdb.std.NumericException; import io.questdb.std.datetime.DateFormat; import io.questdb.std.datetime.microtime.Timestamps; import io.questdb.std.datetime.millitime.DateFormatUtils; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.*; import org.apache.kafka.connect.errors.ConnectException; @@ -17,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.TimeUnit; @@ -37,8 +41,7 @@ public final class QuestDBSinkTask extends SinkTask { private long batchesSinceLastError = 0; private 
DateFormat dataFormat; private boolean kafkaTimestampsEnabled; - private OffsetTracker tracker = new MultiOffsetTracker(); - private long deduplicationRewindOffset; + private boolean httpTransport; @Override public String version() { @@ -48,13 +51,6 @@ public String version() { @Override public void start(Map map) { this.config = new QuestDBSinkConnectorConfig(map); - this.deduplicationRewindOffset = config.getDeduplicationRewindOffset(); - if (deduplicationRewindOffset == 0) { - tracker = new EmptyOffsetTracker(); - } else { - tracker = new MultiOffsetTracker(); - } - String timestampStringFields = config.getTimestampStringFields(); if (timestampStringFields != null) { stringTimestampColumns = new HashSet<>(); @@ -82,19 +78,21 @@ public void start(Map map) { this.timestampUnits = config.getTimestampUnitsOrNull(); } - @Override - public void open(Collection partitions) { - tracker.onPartitionsOpened(partitions); - } - - @Override - public void close(Collection partitions) { - tracker.onPartitionsClosed(partitions); - } - - private Sender createSender() { + private Sender createRawSender() { log.debug("Creating a new sender"); - Sender.LineSenderBuilder builder = Sender.builder().address(config.getHost()); + Password confStrSecret = config.getConfigurationString(); + String confStr = confStrSecret == null ? null : confStrSecret.value(); + if (confStr == null || confStr.isEmpty()) { + confStr = System.getenv("QDB_CLIENT_CONF"); + } + if (confStr != null && !confStr.isEmpty()) { + log.debug("Using client configuration string"); + Sender s = Sender.fromConfig(confStr); + httpTransport = s instanceof LineHttpSender; + return s; + } + log.debug("Using legacy client configuration"); + Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost()); if (config.isTls()) { builder.enableTls(); if ("insecure".equals(config.getTlsValidationMode())) { @@ -108,7 +106,11 @@ private Sender createSender() { } builder.enableAuth(username).authToken(config.getToken().value()); } - Sender rawSender = builder.build(); + return builder.build(); + } + + private Sender createSender() { + Sender rawSender = createRawSender(); String symbolColumns = config.getSymbolColumns(); if (symbolColumns == null) { log.debug("No symbol columns configured. Using raw sender"); @@ -136,33 +138,40 @@ public void put(Collection collection) { for (SinkRecord record : collection) { handleSingleRecord(record); } - sender.flush(); - log.debug("Successfully sent {} records", collection.size()); - if (++batchesSinceLastError == 10) { - // why 10? why not to reset the retry counter immediately upon a successful flush()? - // there are two reasons for server disconnections: - // 1. infrastructure: the server is down / unreachable / other_infrastructure_issues - // 2. structural: the client is sending bad data (e.g. pushing a string to a double column) - // errors in the latter case are not recoverable. upon receiving bad data the server will *eventually* close the connection, - // after a while, the client will notice that the connection is closed and will try to reconnect - // if we reset the retry counter immediately upon first successful flush() then we end-up in a loop where we flush bad data, - // the server closes the connection, the client reconnects, reset the retry counter, and sends bad data again, etc. - // to avoid this, we only reset the retry counter after a few successful flushes. - log.debug("Successfully sent 10 batches in a row. 
Resetting retry counter"); - remainingRetries = config.getMaxRetries(); + + if (!httpTransport) { + log.debug("Sending {} records", collection.size()); + sender.flush(); + log.debug("Successfully sent {} records", collection.size()); + if (++batchesSinceLastError == 10) { + // why 10? why not to reset the retry counter immediately upon a successful flush()? + // there are two reasons for server disconnections: + // 1. infrastructure: the server is down / unreachable / other_infrastructure_issues + // 2. structural: the client is sending bad data (e.g. pushing a string to a double column) + // errors in the latter case are not recoverable. upon receiving bad data the server will *eventually* close the connection, + // after a while, the client will notice that the connection is closed and will try to reconnect + // if we reset the retry counter immediately upon first successful flush() then we end-up in a loop where we flush bad data, + // the server closes the connection, the client reconnects, reset the retry counter, and sends bad data again, etc. + // to avoid this, we only reset the retry counter after a few successful flushes. + log.debug("Successfully sent 10 batches in a row. Resetting retry counter"); + remainingRetries = config.getMaxRetries(); + } } - } catch (LineSenderException e) { + } catch (LineSenderException | HttpClientException e) { onSenderException(e); } } - private void onSenderException(LineSenderException e) { + private void onSenderException(Exception e) { + if (httpTransport) { + throw new ConnectException("Failed to send data to QuestDB", e); + } + batchesSinceLastError = 0; if (--remainingRetries > 0) { closeSenderSilently(); sender = null; log.debug("Sender exception, retrying in {} ms", config.getRetryBackoffMs()); - tracker.configureSafeOffsets(context, deduplicationRewindOffset); context.timeout(config.getRetryBackoffMs()); throw new RetriableException(e); } else { @@ -170,13 +179,6 @@ private void onSenderException(LineSenderException e) { } } - - @Override - public Map preCommit(Map currentOffsets) { - tracker.transformPreCommit(currentOffsets, deduplicationRewindOffset); - return currentOffsets; - } - private void closeSenderSilently() { try { if (sender != null) { @@ -190,8 +192,6 @@ private void closeSenderSilently() { private void handleSingleRecord(SinkRecord record) { assert timestampColumnValue == Long.MIN_VALUE; - tracker.onObservedOffset(record.kafkaPartition(), record.topic(), record.kafkaOffset()); - String explicitTable = config.getTable(); String tableName = explicitTable == null ? 
record.topic() : explicitTable; sender.table(tableName); @@ -208,7 +208,7 @@ private void handleSingleRecord(SinkRecord record) { sender.atNow(); } else { try { - sender.at(timestampColumnValue); + sender.at(timestampColumnValue, ChronoUnit.NANOS); } finally { timestampColumnValue = Long.MIN_VALUE; } @@ -310,7 +310,7 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa String stringVal = (String) value; if (stringTimestampColumns.contains(actualName)) { long timestamp = parseToMicros(stringVal); - sender.timestampColumn(actualName, timestamp); + sender.timestampColumn(actualName, timestamp, ChronoUnit.MICROS); } else { sender.stringColumn(actualName, stringVal); } @@ -336,7 +336,7 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa handleMap(name, (Map) value, fallbackName); } else if (value instanceof java.util.Date) { long epochMillis = ((java.util.Date) value).getTime(); - sender.timestampColumn(actualName, TimeUnit.MILLISECONDS.toMicros(epochMillis)); + sender.timestampColumn(actualName, TimeUnit.MILLISECONDS.toMicros(epochMillis), ChronoUnit.MICROS); } else { onUnsupportedType(actualName, value.getClass().getName()); } @@ -344,7 +344,7 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa private long parseToMicros(String timestamp) { try { - return dataFormat.parse(timestamp, DateFormatUtils.enLocale); + return dataFormat.parse(timestamp, DateFormatUtils.EN_LOCALE); } catch (NumericException e) { throw new ConnectException("Cannot parse timestamp: " + timestamp + " with the configured format '" + config.getTimestampFormat() +"' use '" + QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT + "' to configure the right timestamp format. " + @@ -385,7 +385,7 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec String s = (String) value; if (stringTimestampColumns.contains(primitiveTypesName)) { long timestamp = parseToMicros(s); - sender.timestampColumn(sanitizedName, timestamp); + sender.timestampColumn(sanitizedName, timestamp, ChronoUnit.MICROS); } else { sender.stringColumn(sanitizedName, s); } @@ -417,18 +417,18 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { switch (schema.name()) { case "io.debezium.time.MicroTimestamp": long l = (Long) value; - sender.timestampColumn(name, l); + sender.timestampColumn(name, l, ChronoUnit.MICROS); return true; case "io.debezium.time.Date": int i = (Integer) value; long micros = Timestamps.addDays(0, i); - sender.timestampColumn(name, micros); + sender.timestampColumn(name, micros, ChronoUnit.MICROS); return true; case Timestamp.LOGICAL_NAME: case Date.LOGICAL_NAME: java.util.Date d = (java.util.Date) value; long epochMillis = d.getTime(); - sender.timestampColumn(name, TimeUnit.MILLISECONDS.toMicros(epochMillis)); + sender.timestampColumn(name, epochMillis, ChronoUnit.MILLIS); return true; case Time.LOGICAL_NAME: d = (java.util.Date) value; @@ -443,7 +443,16 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) { @Override public void flush(Map map) { - // not needed as put() flushes after each batch + if (httpTransport) { + try { + log.info("Flushing data to QuestDB"); + sender.flush(); + } catch (LineSenderException | HttpClientException e) { + onSenderException(e); + throw new ConnectException("Failed to flush data to QuestDB", e); + } + } + // TCP transport flushes after each batch so no need to flush here } @Override diff --git 
a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java index 5ff9d73..e2f71bd 100644 --- a/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java +++ b/connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java @@ -42,15 +42,24 @@ static void assertConnectorTaskStateEventually(EmbeddedConnectCluster connect, A Awaitility.await().atMost(CONNECTOR_START_TIMEOUT_MS, MILLISECONDS).untilAsserted(() -> assertConnectorTaskState(connect, CONNECTOR_NAME, expectedState)); } - static Map baseConnectorProps(GenericContainer questDBContainer, String topicName) { - String ilpIUrl = questDBContainer.getHost() + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + static Map baseConnectorProps(GenericContainer questDBContainer, String topicName, boolean useHttp) { + String host = questDBContainer.getHost(); Map props = new HashMap<>(); props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, QuestDBSinkConnector.class.getName()); props.put("topics", topicName); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - props.put("host", ilpIUrl); + + String confString; + if (!useHttp) { + String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT); + props.put("host", ilpIUrl); + } else { + int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT); + confString = "http::addr="+host+":"+ port + ";"; + props.put("client.conf.string", confString); + } return props; } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java index 4ce761b..b253a5e 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java @@ -19,6 +19,43 @@ public class QuestDBSinkConnectorConfigTest { + @Test + public void testClientConfigurationStringCannotBeCombinedWithExplicitClientConfig() { + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.HOST_CONFIG, "localhost"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.USERNAME, "joe"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TOKEN, "secret"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "true"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS, "false"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "default"); + assertCannotBeSetTogetherWithConfigString(QuestDBSinkConnectorConfig.TLS_VALIDATION_MODE_CONFIG, "insecure"); + } + + @Test + public void testEitherHostOrClientConfigStringMustBeSet() { + Map config = baseConnectorProps(); + QuestDBSinkConnector connector = new QuestDBSinkConnector(); + try { + connector.validate(config); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Either 'client.conf.string' or 'host' must be set.", e.getMessage()); + } + } + + private void assertCannotBeSetTogetherWithConfigString(String configKey, String configValue) { + Map config = baseConnectorProps(); + config.put(QuestDBSinkConnectorConfig.CONFIGURATION_STRING_CONFIG, "http::addr=localhost;"); + config.put(configKey, configValue); + + QuestDBSinkConnector connector = new QuestDBSinkConnector(); + try { + 
connector.validate(config); + fail("Expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertEquals("Only one of '" + configKey + "' or 'client.conf.string' must be set.", e.getMessage()); + } + } + @Test public void testTimeunitsValidator() { ConfigDef conf = QuestDBSinkConnectorConfig.conf(); @@ -52,6 +89,7 @@ public void testTimeunitsRecommender() { public void testTlsConfig() { ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); Map config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.put("tls", "true"); QuestDBSinkConnectorConfig sinkConnectorConfig = new QuestDBSinkConnectorConfig(confDef, config); @@ -80,18 +118,21 @@ public void testTlsValidationModeValidation() { public void testExplicitTablenameValidation() { ConfigDef confDef = QuestDBSinkConnectorConfig.conf(); Map config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); // positive case I - valid explicit table name ConfigValue configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertTrue(configValue.errorMessages().isEmpty()); // positive case II - missing explicit table name config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.remove(QuestDBSinkConnectorConfig.TABLE_CONFIG); configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertTrue(configValue.errorMessages().isEmpty()); // negative case - invalid characters in explicit table name config = baseConnectorProps(); + config.put("client.conf.string", "http::addr=localhost;tls=true"); config.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "not?valid"); configValue = confDef.validate(config).stream().filter(c -> c.name().equals(QuestDBSinkConnectorConfig.TABLE_CONFIG)).findFirst().get(); assertEquals(1, configValue.errorMessages().size()); @@ -104,7 +145,6 @@ private Map baseConnectorProps() { props.put("topics", "myTopic"); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); - props.put("host", "localhost"); return props; } } \ No newline at end of file diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java index 9eb6037..898e9e8 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedAuthTest.java @@ -95,7 +95,7 @@ public void tearDown() { @ValueSource(booleans = {false, true}) public void testSmoke(boolean useTls) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, false); props.put(QuestDBSinkConnectorConfig.USERNAME, TEST_USER_NAME); props.put(QuestDBSinkConnectorConfig.TOKEN, TEST_USER_TOKEN); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 9282223..bbcaddc 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ 
b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -19,6 +19,8 @@ import org.junit.jupiter.api.*; import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; @@ -26,6 +28,7 @@ import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.junit.jupiter.Testcontainers; +import java.io.IOException; import java.math.BigDecimal; import java.nio.file.Path; import java.time.Instant; @@ -47,7 +50,7 @@ public final class QuestDBSinkConnectorEmbeddedTest { private static int httpPort = -1; private static int ilpPort = -1; - private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:nightly"; + private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:7.4.0"; private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true; private EmbeddedConnectCluster connect; @@ -65,11 +68,17 @@ public static void createContainer() { @AfterAll public static void stopContainer() { questDBContainer.stop(); - Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString())); + Files.rmdir(io.questdb.std.str.Path.getThreadLocal(dbRoot.toAbsolutePath().toString()), true); } private static String questDBDirectory() { - return dbRoot.resolve("questdb").toAbsolutePath().toString(); + Path questdb = dbRoot.resolve("questdb").toAbsolutePath(); + try { + java.nio.file.Files.createDirectories(questdb); + } catch (IOException e) { + throw new AssertionError("Could not create directory: " + questdb, e); + } + return questdb.toAbsolutePath().toString(); } private static GenericContainer questDBContainer; @@ -109,6 +118,7 @@ public void setUp() { Map props = new HashMap<>(); props.put("connector.client.config.override.policy", "All"); + props.put("offset.flush.interval.ms", "1000"); connect = new EmbeddedConnectCluster.Builder() .name("questdb-connect-cluster") .workerProps(props) @@ -123,10 +133,11 @@ public void tearDown() { connect.stop(); } - @Test - public void testSmoke() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSmoke(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") @@ -148,10 +159,11 @@ public void testSmoke() { httpPort); } - @Test - public void testDeadLetterQueue_wrongJson() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDeadLetterQueue_wrongJson(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put("errors.deadletterqueue.topic.name", "dlq"); props.put("errors.deadletterqueue.topic.replication.factor", "1"); @@ -173,10 +185,11 @@ public void testDeadLetterQueue_wrongJson() { Assertions.assertEquals("{\"not valid json}", new 
String(dqlRecord.value())); } - @Test - public void testSymbol() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSymbol(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -200,9 +213,9 @@ public void testSymbol() { } @Test - public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { + public void testRetrying_badDataStopsTheConnectorEventually_tcp() throws Exception { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, false); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.RETRY_BACKOFF_MS, "1000"); props.put(QuestDBSinkConnectorConfig.MAX_RETRIES, "5"); @@ -233,9 +246,36 @@ public void testRetrying_badDataStopsTheConnectorEventually() throws Exception { } @Test - public void testRetrying_recoversFromInfrastructureIssues() throws Exception { + public void testRetrying_badDataStopsTheConnectorEventually_http() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put("value.converter.schemas.enable", "false"); + props.put(QuestDBSinkConnectorConfig.RETRY_BACKOFF_MS, "1000"); + props.put(QuestDBSinkConnectorConfig.MAX_RETRIES, "5"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // creates a record with 'age' as long + connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from " + topicName, + httpPort); + + for (int i = 0; i < 150_000; i++) { + // injects records with 'age' as string + connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"str\"}"); + } + + ConnectTestUtils.assertConnectorTaskStateEventually(connect, AbstractStatus.State.FAILED); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throws Exception { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.RETRY_BACKOFF_MS, "1000"); props.put(QuestDBSinkConnectorConfig.MAX_RETRIES, "40"); @@ -270,10 +310,11 @@ public void testRetrying_recoversFromInfrastructureIssues() throws Exception { httpPort); } - @Test - public void testEmptyCollection_wontFailTheConnector() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEmptyCollection_wontFailTheConnector(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = 
ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); // filter out all message props.put("transforms", "drop"); props.put("transforms.drop.type", "org.apache.kafka.connect.transforms.Filter"); @@ -301,10 +342,11 @@ public void testEmptyCollection_wontFailTheConnector() { } } - @Test - public void testSymbol_withAllOtherILPTypes() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSymbol_withAllOtherILPTypes(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -353,10 +395,11 @@ public void testSymbol_withAllOtherILPTypes() { httpPort); } - @Test - public void testUpfrontTable_withSymbols() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testUpfrontTable_withSymbols(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "firstname,lastname"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -373,7 +416,7 @@ public void testUpfrontTable_withSymbols() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname symbol, lastname symbol, age int)", + "create table " + topicName + " (firstname symbol, lastname symbol, age int, ts timestamp) timestamp(ts) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -381,12 +424,13 @@ public void testUpfrontTable_withSymbols() { QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + "\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName, + "select firstname, lastname, age, key from " + topicName, httpPort); } @Test public void testExactlyOnce_withDedup() throws BrokenBarrierException, InterruptedException { + // no parametrized since TCP transport does not support exactly-once processing connect.kafka().createTopic(topicName, 4); Schema schema = SchemaBuilder.struct().name("com.example.Event") @@ -429,10 +473,9 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt QuestDBUtils.Endpoint.EXEC); // start connector - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "ts"); - props.put(QuestDBSinkConnectorConfig.DEDUPLICATION_REWIND_CONFIG, "150000"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -471,10 +514,11 @@ private static void restartQuestDB() { questDBContainer = newQuestDbConnector(); } - @Test - public void testTimestampUnitResolution_auto() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampUnitResolution_auto(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map 
props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); @@ -503,22 +547,25 @@ public void testTimestampUnitResolution_auto() { httpPort); } - @Test - public void testTimestampUnitResolution_millis() { - testTimestampUnitResolution0("millis"); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampUnitResolution_millis(boolean useHttp) { + testTimestampUnitResolution0("millis", useHttp); } - @Test - public void testTimestampUnitResolution_micros() { - testTimestampUnitResolution0("micros"); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampUnitResolution_micros(boolean useHttp) { + testTimestampUnitResolution0("micros", useHttp); } - @Test - public void testTimestampUnitResolution_nanos() { - testTimestampUnitResolution0("nanos"); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampUnitResolution_nanos(boolean useHttp) { + testTimestampUnitResolution0("nanos", useHttp); } - private void testTimestampUnitResolution0(String mode) { + private void testTimestampUnitResolution0(String mode, boolean useHttp) { TimeUnit unit; switch (mode) { case "nanos": @@ -534,7 +581,7 @@ private void testTimestampUnitResolution0(String mode) { throw new IllegalArgumentException("Unknown mode: " + mode); } connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); props.put(QuestDBSinkConnectorConfig.TIMESTAMP_UNITS_CONFIG, mode); @@ -559,9 +606,10 @@ private void testTimestampUnitResolution0(String mode) { httpPort); } - @Test - public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutuallyExclusive() { - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutuallyExclusive(boolean useHttp) { + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true"); @@ -569,14 +617,15 @@ public void testKafkaNativeTimestampsAndExplicitDesignatedFieldTimestampMutually connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); fail("Expected ConnectException"); } catch (ConnectException e) { - assertThat(e.getMessage(), containsString("timestamp.field.name with timestamp.kafka.native")); + assertThat(e.getMessage(), containsString("'timestamp.field.name' with 'timestamp.kafka.native'")); } } - @Test - public void testKafkaNativeTimestamp() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testKafkaNativeTimestamp(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); 
+ Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, "true"); @@ -585,7 +634,7 @@ public void testKafkaNativeTimestamp() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)", + "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -608,10 +657,11 @@ public void testKafkaNativeTimestamp() { httpPort); } - @Test - public void testTimestampSMT_parseTimestamp_schemaLess() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampSMT_parseTimestamp_schemaLess(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -633,7 +683,7 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -652,10 +702,11 @@ public void testTimestampSMT_parseTimestamp_schemaLess() { httpPort); } - @Test - public void testTimestampSMT_parseTimestamp_withSchema() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTimestampSMT_parseTimestamp_withSchema(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -695,10 +746,11 @@ public void testTimestampSMT_parseTimestamp_withSchema() { httpPort); } - @Test - public void testUpfrontTable() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testUpfrontTable(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") @@ -714,7 +766,7 @@ public void testUpfrontTable() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, age int)", + "create table " + topicName + " (firstname string, lastname string, age int, ts timestamp) timestamp(ts) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -722,14 +774,15 @@ public void testUpfrontTable() { QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"key\"\r\n" + 
"\"John\",\"Doe\",42,\"key\"\r\n", - "select * from " + topicName, + "select firstname, lastname, age, key from " + topicName, httpPort); } - @Test - public void testDesignatedTimestamp_noSchema_unixEpochMillis() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDesignatedTimestamp_noSchema_unixEpochMillis(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); @@ -743,10 +796,11 @@ public void testDesignatedTimestamp_noSchema_unixEpochMillis() { httpPort); } - @Test - public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put("transforms", "convert_birth"); props.put("transforms.convert_birth.type", "org.apache.kafka.connect.transforms.TimestampConverter$Value"); @@ -765,10 +819,11 @@ public void testDesignatedTimestamp_noSchema_dateTransform_fromStringToTimestamp httpPort); } - @Test - public void testDesignatedTimestamp_withSchema() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDesignatedTimestamp_withSchema(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -798,10 +853,11 @@ public void testDesignatedTimestamp_withSchema() { httpPort); } - @Test - public void testDoNotIncludeKey() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDoNotIncludeKey(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); @@ -832,10 +888,11 @@ public void testDoNotIncludeKey() { httpPort); } - @Test - public void testJsonNoSchema() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJsonNoSchema(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); 
ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -847,10 +904,11 @@ public void testJsonNoSchema() { httpPort); } - @Test - public void testJsonNoSchema_mixedFlotingAndIntTypes() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJsonNoSchema_mixedFlotingAndIntTypes(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DOUBLE_COLUMNS_CONFIG, "age"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); @@ -866,10 +924,11 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes() { } - @Test - public void testJsonNoSchema_ArrayNotSupported() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJsonNoSchema_ArrayNotSupported(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -878,10 +937,11 @@ public void testJsonNoSchema_ArrayNotSupported() { ConnectTestUtils.assertConnectorTaskFailedEventually(connect); } - @Test - public void testPrimitiveKey() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testPrimitiveKey(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); Schema schema = SchemaBuilder.struct().name("com.example.Person") @@ -903,10 +963,11 @@ public void testPrimitiveKey() { httpPort); } - @Test - public void testParsingStringTimestamp_designatedTimestampNotListedExplicitly() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testParsingStringTimestamp_designatedTimestampNotListedExplicitly(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -917,7 +978,7 @@ public void testParsingStringTimestamp_designatedTimestampNotListedExplicitly() QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)", + "create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -934,10 +995,11 @@ public void testParsingStringTimestamp_designatedTimestampNotListedExplicitly() httpPort); } - @Test - public void testParsingStringTimestamp() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testParsingStringTimestamp(boolean useHttp) { 
connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -949,7 +1011,7 @@ public void testParsingStringTimestamp() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -968,10 +1030,11 @@ public void testParsingStringTimestamp() { httpPort); } - @Test - public void testParsingStringTimestamp_defaultPattern() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testParsingStringTimestamp_defaultPattern(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put("value.converter.schemas.enable", "false"); props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); @@ -982,7 +1045,7 @@ public void testParsingStringTimestamp_defaultPattern() { QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born) partition by day wal", httpPort, QuestDBUtils.Endpoint.EXEC); @@ -1001,10 +1064,11 @@ public void testParsingStringTimestamp_defaultPattern() { httpPort); } - @Test - public void testCustomPrefixWithPrimitiveKeyAndValues() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCustomPrefixWithPrimitiveKeyAndValues(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(QuestDBSinkConnectorConfig.KEY_PREFIX_CONFIG, "col_key"); @@ -1021,10 +1085,11 @@ public void testCustomPrefixWithPrimitiveKeyAndValues() { httpPort); } - @Test - public void testSkipUnsupportedType_Bytes() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSkipUnsupportedType_Bytes(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.SKIP_UNSUPPORTED_TYPES_CONFIG, "true"); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -1047,10 +1112,11 @@ public void testSkipUnsupportedType_Bytes() { httpPort); } - @Test - public void testDefaultPrefixWithPrimitiveKeyAndValues() { + @ParameterizedTest 
+ @ValueSource(booleans = {true, false}) + public void testDefaultPrefixWithPrimitiveKeyAndValues(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -1065,10 +1131,11 @@ public void testDefaultPrefixWithPrimitiveKeyAndValues() { httpPort); } - @Test - public void testStructKey() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStructKey(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); @@ -1091,10 +1158,11 @@ public void testStructKey() { httpPort); } - @Test - public void testStructKeyWithNoPrefix() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStructKeyWithNoPrefix(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -1119,10 +1187,11 @@ public void testStructKeyWithNoPrefix() { httpPort); } - @Test - public void testStructKeyAndPrimitiveValue() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStructKeyAndPrimitiveValue(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); //overrider the convertor from String to Json props.put(KEY_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName()); props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); @@ -1147,11 +1216,12 @@ public void testStructKeyAndPrimitiveValue() { } - @Test - public void testExplicitTableName() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testExplicitTableName(boolean useHttp) { String tableName = ConnectTestUtils.newTableName(); connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, tableName); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -1174,10 +1244,11 @@ public void testExplicitTableName() { httpPort); } - @Test - public void testLogicalTypes() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLogicalTypes(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, 
useHttp); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -1224,10 +1295,11 @@ public void testLogicalTypes() { httpPort); } - @Test - public void testDecimalTypeNotSupported() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDecimalTypeNotSupported(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -1249,10 +1321,11 @@ public void testDecimalTypeNotSupported() { ConnectTestUtils.assertConnectorTaskFailedEventually(connect); } - @Test - public void testNestedStructInValue() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testNestedStructInValue(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); @@ -1281,10 +1354,11 @@ public void testNestedStructInValue() { httpPort); } - @Test - public void testMultiLevelNestedStructInValue() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMultiLevelNestedStructInValue(boolean useHttp) { connect.kafka().createTopic(topicName, 1); - Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, topicName); connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); ConnectTestUtils.assertConnectorTaskRunningEventually(connect); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java index 6970672..56f92fa 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBUtils.java @@ -43,7 +43,7 @@ private QuestDBUtils() { } public static void assertSqlEventually(String expectedResult, String query, int timeoutSeconds, int port) { - await().atMost(timeoutSeconds, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); + await().pollInterval(5, TimeUnit.SECONDS).atMost(timeoutSeconds, TimeUnit.SECONDS).untilAsserted(() -> assertSql(expectedResult, query, port)); } public static void assertSqlEventually(String expectedResult, String query, int port) { diff --git a/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java b/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java index 4287d39..7f60b9f 100644 --- a/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java +++ b/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java @@ -49,4 +49,9 @@ public void testBoundaries_explicit() { assertEquals(TimeUnit.NANOSECONDS, TimestampHelper.getTimestampUnits(TimeUnit.NANOSECONDS, Long.MAX_VALUE)); } + @Test + public void 
testSlack() { + assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, 1712188800)); + } + } \ No newline at end of file diff --git a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java index a607b1c..696a61f 100644 --- a/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java +++ b/integration-tests/avro-schema-registry/src/test/java/io/questdb/kafka/AvroSchemaRegistryIT.java @@ -46,11 +46,15 @@ public class AvroSchemaRegistryIT { private final static Network network = Network.newNetwork(); @Container - private final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.0")) - .withNetwork(network); + private final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")) + .withNetwork(network) + .withNetworkAliases("kafka") + .withKraft() + .withEnv("KAFKA_BROKER_ID", "0") + .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@kafka:9094"); @Container - private final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3") + private final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0") .withNetwork(network) .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT) .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) @@ -58,7 +62,7 @@ public class AvroSchemaRegistryIT { .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI"); @Container - private final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.2.1") + private final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.6.0") .withEnv("CONNECT_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092") .withEnv("CONNECT_GROUP_ID", "test") .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") @@ -71,6 +75,7 @@ public class AvroSchemaRegistryIT { .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") +// .withEnv("QDB_DEBUG", "true") .withNetwork(network) .withExposedPorts(8083) .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar") @@ -84,7 +89,7 @@ public class AvroSchemaRegistryIT { .withStartupTimeout(ofMinutes(5))); @Container - private GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.2.2")) + private GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.6.0")) .withNetwork(network) .withNetworkAliases("schema-registry") .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092") @@ -144,6 +149,7 @@ public void testSchemaEvolution() throws Exception { } private void startConnector(String topicName) { + String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_HTTP_PORT + ";auto_flush_rows=1;"; ConnectorConfiguration connector = ConnectorConfiguration.create() .with("connector.class", QuestDBSinkConnector.class.getName()) .with("tasks.max", "1") @@ -153,7 +159,7 @@ private void startConnector(String topicName) { .with("topics", topicName) 
.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birthday") .with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false") - .with("host", questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_ILP_PORT); + .with("client.conf.string", confString); connectContainer.registerConnector("my-connector", connector); } diff --git a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java index 7e8e7a8..fae941b 100644 --- a/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java +++ b/integration-tests/cp-server/src/test/java/io/questdb/kafka/QuestDBSinkConnectorIT.java @@ -38,19 +38,22 @@ public class QuestDBSinkConnectorIT { private final static Network network = Network.newNetwork(); @Container - private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.0")) - .withNetwork(network); + private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")) + .withNetwork(network) + .withNetworkAliases("kafka") + .withKraft() + .withEnv("KAFKA_BROKER_ID", "0") + .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@kafka:9094"); @Container - private static final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3") + private static final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0") .withNetwork(network) + .withNetworkAliases("questdb") .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) - .withEnv("QDB_CAIRO_COMMIT_LAG", "100") - .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI"); + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))); @Container - private static final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.2.1") + private static final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.6.0") .withEnv("CONNECT_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092") .withEnv("CONNECT_GROUP_ID", "test") .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") @@ -63,6 +66,7 @@ public class QuestDBSinkConnectorIT { .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("QDB_CLIENT_CONF", "http::addr=questdb;auto_flush_rows=1;") .withNetwork(network) .withExposedPorts(8083) .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar") @@ -92,8 +96,7 @@ public void test() throws Exception { .with("tasks.max", "1") .with("key.converter", "org.apache.kafka.connect.storage.StringConverter") .with("value.converter", "org.apache.kafka.connect.storage.StringConverter") - .with("topics", topicName) - .with("host", questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_ILP_PORT); + .with("topics", topicName); connectContainer.registerConnector("my-connector", connector); diff --git a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java index 67bf8fc..21fcfa1 100644 --- a/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java +++ 
b/integration-tests/debezium/src/test/java/kafka/DebeziumIT.java @@ -40,8 +40,12 @@ public class DebeziumIT { private static final Network network = Network.newNetwork(); @Container - private final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.2")) - .withNetwork(network); + private final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0")) + .withNetwork(network) + .withNetworkAliases("kafka") + .withKraft() + .withEnv("KAFKA_BROKER_ID", "0") + .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@kafka:9094"); @Container public PostgreSQLContainer postgresContainer = @@ -59,10 +63,11 @@ public class DebeziumIT { .dependsOn(kafkaContainer) .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium"))) .withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true") - .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true"); + .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true") + .withEnv("OFFSET_FLUSH_INTERVAL_MS", "1000"); @Container - private final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3") + private final GenericContainer questDBContainer = new GenericContainer<>("questdb/questdb:7.4.0") .withNetwork(network) .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT) .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) @@ -71,9 +76,10 @@ public class DebeziumIT { private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) { - ConnectorConfiguration questSink = ConnectorConfiguration.create() + String confString = "http::addr=" + questDBContainer.getNetworkAliases().get(0) + ":9000;auto_flush_rows=1000;"; + return ConnectorConfiguration.create() .with("connector.class", QuestDBSinkConnector.class.getName()) - .with("host", questDBContainer.getNetworkAliases().get(0)) + .with("client.conf.string", confString) .with("tasks.max", "1") .with("topics", PG_SERVER_NAME + "."+ PG_SCHEMA_NAME + "." 
+ PG_TABLE_NAME) .with(QuestDBSinkConnectorConfig.TABLE_CONFIG, questTableName) @@ -82,7 +88,6 @@ private ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) { .with("transforms", "unwrap") .with("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState") .with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); - return questSink; } @Test diff --git a/integration-tests/exactlyonce/pom.xml b/integration-tests/exactlyonce/pom.xml index 06c6b58..e712f95 100644 --- a/integration-tests/exactlyonce/pom.xml +++ b/integration-tests/exactlyonce/pom.xml @@ -10,6 +10,11 @@ kafka-it-exactlyonce + + 11 + 11 + + diff --git a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java index 15e334d..391dc1e 100644 --- a/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java +++ b/integration-tests/exactlyonce/src/test/java/io/questdb/kafka/ExactlyOnceIT.java @@ -3,7 +3,6 @@ import io.questdb.client.Sender; import io.questdb.std.Os; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; @@ -36,7 +35,6 @@ import java.nio.file.Path; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.time.Instant; import java.util.*; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -52,10 +50,9 @@ public class ExactlyOnceIT { private static final int VICTIM_KAFKA = 2; private static final int VICTIMS_TOTAL = VICTIM_KAFKA + 1; - private static final DockerImageName KAFKA_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.5.1"); - private static final DockerImageName ZOOKEEPER_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-zookeeper:7.5.1"); - private static final DockerImageName CONNECT_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka-connect:7.5.1"); - private static final DockerImageName QUESTDB_CONTAINER_IMAGE = DockerImageName.parse("questdb/questdb:7.3.3"); + private static final DockerImageName KAFKA_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka:7.6.0"); + private static final DockerImageName CONNECT_CONTAINER_IMAGE = DockerImageName.parse("confluentinc/cp-kafka-connect:7.6.0"); + private static final DockerImageName QUESTDB_CONTAINER_IMAGE = DockerImageName.parse("questdb/questdb:7.4.0"); private static final int KAFKA_CLUSTER_SIZE = 3; private static final int CONNECT_CLUSTER_SIZE = 2; @@ -71,7 +68,7 @@ public class ExactlyOnceIT { private final static Network network = Network.newNetwork(); - private static GenericContainer zookeeper; +// private static GenericContainer zookeeper; private static KafkaContainer[] kafkas = new KafkaContainer[KAFKA_CLUSTER_SIZE]; private static GenericContainer[] connects = new GenericContainer[CONNECT_CLUSTER_SIZE]; private static GenericContainer questdb; @@ -80,7 +77,6 @@ public class ExactlyOnceIT { @BeforeAll public static void createContainers() { - zookeeper = newZookeeperContainer(); questdb = newQuestDBContainer(); for (int i = 0; i < KAFKA_CLUSTER_SIZE; i++) { kafkas[i] = newKafkaContainer(i); @@ -93,7 +89,7 @@ public static void createContainers() { Stream.concat( Stream.of(kafkas), Stream.of(connects) ), - 
Stream.of(zookeeper, questdb) + Stream.of(questdb) ); Startables.deepStart(containers).join(); questHttpPort = questdb.getMappedPort(9000); @@ -104,20 +100,7 @@ public static void stopContainer() { questdb.stop(); Stream.of(kafkas).forEach(KafkaContainer::stop); Stream.of(connects).forEach(GenericContainer::stop); - zookeeper.stop(); - - io.questdb.std.Files.rmdir(io.questdb.std.str.Path.getThreadLocal(persistence.toAbsolutePath().toString())); - } - - private static GenericContainer newZookeeperContainer() { - return new GenericContainer<>(ZOOKEEPER_CONTAINER_IMAGE) - .withNetwork(network) - .withNetworkAliases("zookeeper") - .withEnv("ZOOKEEPER_CLIENT_PORT", "2181") - .withEnv("ZOOKEEPER_TICK_TIME", "300") - .withEnv("ZOOKEEPER_INIT_LIMIT", "10") - .withEnv("ZOOKEEPER_SYNC_LIMIT", "5") - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))); + io.questdb.std.Files.rmdir(io.questdb.std.str.Path.getThreadLocal(persistence.toAbsolutePath().toString()), true); } private static GenericContainer newQuestDBContainer() { @@ -132,6 +115,7 @@ private static GenericContainer newQuestDBContainer() { .withNetwork(network) // .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb"))) .withFileSystemBind(dbRoot.toAbsolutePath().toString(), "/var/lib/questdb") +// .withEnv("QDB_DEBUG", "true") .withCreateContainerCmdModifier(cmd -> cmd.withHostName("questdb")); if (questHttpPort == 0) { @@ -150,24 +134,29 @@ private static KafkaContainer newKafkaContainer(int id) { // create world-writable directory Files.createDirectories(kafkaData, PosixFilePermissions.asFileAttribute(rwxrwxrwx)); - - // p.of(kafkaData.toAbsolutePath().toString()); // io.questdb.std.Files.mkdirs(p, 0_777); - + StringBuilder votersBuilder = new StringBuilder(); + for (int i = 0; i < KAFKA_CLUSTER_SIZE; i++) { + if (i > 0) { + votersBuilder.append(','); + } + votersBuilder.append(i).append("@kafka").append(i).append(":9094"); + } return new KafkaContainer(KAFKA_CONTAINER_IMAGE) .withNetwork(network) - .dependsOn(zookeeper) - .withExternalZookeeper("zookeeper:2181") + .withNetworkAliases("kafka" + id) + .withKraft() .withEnv("KAFKA_BROKER_ID", String.valueOf(id)) .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "3") .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "3") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2") .withEnv("KAFKA_NUM_PARTITIONS", "3") + .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", votersBuilder.toString()) .withFileSystemBind(kafkaData.toAbsolutePath().toString(), "/var/lib/kafka/data") - .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka" + id)) - .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka" + id))); + .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kafka" + id)); +// .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka" + id))); } catch (IOException e) { throw new RuntimeException(e); } @@ -176,16 +165,19 @@ private static KafkaContainer newKafkaContainer(int id) { private static GenericContainer newConnectContainer(int id) { List dependencies = new ArrayList<>(Arrays.asList(kafkas)); dependencies.add(questdb); - return new GenericContainer<>(CONNECT_CONTAINER_IMAGE) .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka0:9092") .withEnv("CONNECT_GROUP_ID", "test") + .withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "5000") + .withEnv("CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY", "All") .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic") 
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic") .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic") .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter") .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter") .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false") +// .withEnv("CONNECT_LOG4J_LOGGERS", "io.questdb.kafka=ALL") +// .withEnv("QDB_DEBUG", "true") .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect" + id) .withNetwork(network) .withExposedPorts(8083) @@ -213,10 +205,10 @@ public void test() throws Exception { props.put("include.key", "false"); int recordCount = 5_000_000; - new Thread(() -> { - try (Producer producer = new KafkaProducer<>(props)) { - for (int i = 0; i < recordCount; i++ ) { - String json = newPayload(); + Thread producerThread = new Thread(() -> { + try (KafkaProducer producer = new KafkaProducer<>(props)) { + for (int i = 0; i < recordCount; i++) { + String json = newPayload(i); producer.send(new ProducerRecord<>(topicName, null, json)); // 1% chance of duplicates - we want them to be also deduped by QuestDB @@ -225,11 +217,12 @@ public void test() throws Exception { } } } - }).start(); + }); + producerThread.start(); QuestDBUtils.assertSql( "{\"ddl\":\"OK\"}", - "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, val LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts, id);", + "CREATE TABLE " + topicName + " (ts TIMESTAMP, id UUID, val LONG) timestamp(ts) PARTITION BY DAY WAL DEDUP UPSERT KEYS(ts);", questdb.getMappedPort(QuestDBUtils.QUESTDB_HTTP_PORT), QuestDBUtils.Endpoint.EXEC); @@ -250,9 +243,8 @@ public void test() throws Exception { } @NotNull - private static String newPayload() { - Instant now = Instant.now(); - long nanoTs = now.getEpochSecond() * 1_000_000_000 + now.getNano(); + private static String newPayload(int i) { + long nanoTs = i * 1_000_000_000L; UUID uuid = UUID.randomUUID(); int val = ThreadLocalRandom.current().nextInt(100); @@ -262,7 +254,7 @@ private static String newPayload() { private static void startKillingRandomContainers(CyclicBarrier barrier) { new Thread(() -> { while (barrier.getNumberWaiting() == 0) { // keep killing them until the checker thread passed the assertion - Os.sleep(ThreadLocalRandom.current().nextInt(5_000, 30_000)); + Os.sleep(ThreadLocalRandom.current().nextInt(5_000, 10_000)); int victim = ThreadLocalRandom.current().nextInt(VICTIMS_TOTAL); switch (victim) { case VICTIM_QUESTDB: { @@ -284,7 +276,6 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) { int n = ThreadLocalRandom.current().nextInt(kafkas.length); kafkas[n].stop(); KafkaContainer container = newKafkaContainer(n); - Os.sleep(5000); // wait for zookeeper to detect the previous kafka container was stopped container.start(); kafkas[n] = container; break; @@ -304,6 +295,8 @@ private static void startKillingRandomContainers(CyclicBarrier barrier) { } private static void startConnector() throws IOException, InterruptedException, URISyntaxException { + String confString = "http::addr=questdb:9000;auto_flush_rows=10000;auto_flush_interval=1200000;retry_timeout=60000;"; + String payload = "{\"name\":\"my-connector\",\"config\":{" + "\"tasks.max\":\"4\"," + "\"connector.class\":\"io.questdb.kafka.QuestDBSinkConnector\"," + @@ -311,9 +304,9 @@ private static void startConnector() throws IOException, InterruptedException, U "\"value.converter\":\"org.apache.kafka.connect.json.JsonConverter\"," + "\"topics\":\"mytopic\"," + 
"\"value.converter.schemas.enable\":\"false\"," + - "\"dedup.rewind.offset\":\"150000\"," + "\"timestamp.field.name\":\"ts\"," + - "\"host\":\"questdb:9009\"}" + + "\"timestamp.units\":\"nanos\"," + + "\"client.conf.string\":\""+ confString + "\"}" + "}"; HttpResponse response = HttpClient.newBuilder().connectTimeout(ofSeconds(10)).build().send( diff --git a/kafka-questdb-connector-samples/confluent-docker-images/Dockerfile b/kafka-questdb-connector-samples/confluent-docker-images/Dockerfile index a460cf0..805b495 100644 --- a/kafka-questdb-connector-samples/confluent-docker-images/Dockerfile +++ b/kafka-questdb-connector-samples/confluent-docker-images/Dockerfile @@ -1,2 +1,2 @@ -FROM confluentinc/cp-kafka-connect-base:7.3.2 -RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.6 \ No newline at end of file +FROM confluentinc/cp-kafka-connect-base:7.6.0 +RUN confluent-hub install --no-prompt questdb/kafka-questdb-connector:0.8 \ No newline at end of file diff --git a/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml b/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml index c705471..2e89df6 100644 --- a/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml +++ b/kafka-questdb-connector-samples/confluent-docker-images/docker-compose.yaml @@ -29,7 +29,7 @@ services: ports: - "2181:2181" kafka1: - image: confluentinc/cp-kafka:7.3.2 + image: confluentinc/cp-kafka:7.6.0 ports: - "9092:9092" depends_on: diff --git a/kafka-questdb-connector-samples/confluent-docker-images/readme.md b/kafka-questdb-connector-samples/confluent-docker-images/readme.md index b100a05..908fc56 100644 --- a/kafka-questdb-connector-samples/confluent-docker-images/readme.md +++ b/kafka-questdb-connector-samples/confluent-docker-images/readme.md @@ -20,7 +20,7 @@ It also uses the [Kafka UI](https://github.com/provectus/kafka-ui) project for K - The previous command will generate a lot of log messages. Eventually logging should cease. This means both Apache Kafka and QuestDB are running. - Go to http://localhost:8080/ui/clusters/kafka/connectors and click on the “Create Connector” button. 
![screenshot of Kafka UI, with the Create Connector button highlighted](img/create.png) -- The connector name should be QuestDB, use the following configuration and click at Submit: +- The connector name should be 'questdb', use the following configuration and click at Submit: ```json { "connector.class": "io.questdb.kafka.QuestDBSinkConnector", diff --git a/kafka-questdb-connector-samples/faker/Dockerfile-Connect b/kafka-questdb-connector-samples/faker/Dockerfile-Connect index afb7170..2458304 100644 --- a/kafka-questdb-connector-samples/faker/Dockerfile-Connect +++ b/kafka-questdb-connector-samples/faker/Dockerfile-Connect @@ -4,5 +4,5 @@ RUN apt-get update && apt-get install -y curl wget unzip jq RUN curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest | jq -r '.assets[]|select(.content_type == "application/zip")|.browser_download_url'|wget -qi - RUN unzip kafka-questdb-connector-*-bin.zip -FROM confluentinc/cp-kafka-connect:7.2.1 +FROM confluentinc/cp-kafka-connect:7.6.0 COPY --from=builder /opt/kafka-questdb-connector/*.jar /usr/share/java/kafka/ \ No newline at end of file diff --git a/kafka-questdb-connector-samples/faker/docker-compose.yml b/kafka-questdb-connector-samples/faker/docker-compose.yml index 994ea1f..d42df9d 100644 --- a/kafka-questdb-connector-samples/faker/docker-compose.yml +++ b/kafka-questdb-connector-samples/faker/docker-compose.yml @@ -1,14 +1,12 @@ version: '2.1' services: questdb: - image: questdb/questdb:6.5.4 + image: questdb/questdb:7.4.0 expose: - "9009" ports: - "19000:9000" environment: - - QDB_CAIRO_COMMIT_LAG=1000 - - JAVA_OPTS=-Djava.locale.providers=JRE,SPI - QDB_LINE_DEFAULT_PARTITION_BY=YEAR zookeeper: image: zookeeper:3.6.2 diff --git a/kafka-questdb-connector-samples/stocks/docker-compose.yml b/kafka-questdb-connector-samples/stocks/docker-compose.yml index 78d12f5..422cf3b 100644 --- a/kafka-questdb-connector-samples/stocks/docker-compose.yml +++ b/kafka-questdb-connector-samples/stocks/docker-compose.yml @@ -1,7 +1,7 @@ version: '2.1' services: questdb: - image: questdb/questdb:6.6.1 + image: questdb/questdb:7.4.0 expose: - "9009" ports: diff --git a/pom.xml b/pom.xml index c2eef24..aa8d808 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ org.questdb questdb - 7.2.1 + 7.4.0 org.junit.jupiter