Skip to content

Commit

Permalink
feat: support for HTTP transport
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot committed Apr 4, 2024
1 parent 164e80e commit 9e58de8
Show file tree
Hide file tree
Showing 18 changed files with 398 additions and 456 deletions.
37 changes: 32 additions & 5 deletions connector/src/main/java/io/questdb/kafka/BufferingSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.questdb.std.BoolList;
import io.questdb.std.LongList;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -96,17 +98,22 @@ public Sender boolColumn(CharSequence name, boolean value) {
}

@Override
public Sender timestampColumn(CharSequence name, long value) {
public Sender timestampColumn(CharSequence name, long value, ChronoUnit unit) {
if (symbolColumns.contains(name)) {
symbolColumnNames.add(name);
symbolColumnValues.add(String.valueOf(value));
} else {
timestampNames.add(name);
timestampValues.add(value);
timestampValues.add(unitToMicros(value, unit));
}
return this;
}

@Override
public Sender timestampColumn(CharSequence charSequence, Instant instant) {
return null;
}

@Override
public Sender symbol(CharSequence name, CharSequence value) {
symbolColumnNames.add(name);
Expand Down Expand Up @@ -164,16 +171,36 @@ private void transferFields() {
for (int i = 0, n = timestampNames.size(); i < n; i++) {
CharSequence fieldName = timestampNames.get(i);
long fieldValue = timestampValues.get(i);
sender.timestampColumn(fieldName, fieldValue);
sender.timestampColumn(fieldName, fieldValue, ChronoUnit.MICROS);
}
timestampNames.clear();
timestampValues.clear();
}

private static long unitToMicros(long value, ChronoUnit unit) {
switch (unit) {
case NANOS:
return value / 1000L;
case MICROS:
return value;
case MILLIS:
return value * 1000L;
case SECONDS:
return value * 1_000_000L;
default:
throw new IllegalArgumentException("Unsupported unit: " + unit);
}
}

@Override
public void at(long timestamp) {
public void at(long timestamp, ChronoUnit unit) {
transferFields();
sender.at(timestamp);
sender.at(timestamp, unit);
}

@Override
public void at(Instant instant) {
throw new UnsupportedOperationException();
}

@Override
Expand Down
35 changes: 0 additions & 35 deletions connector/src/main/java/io/questdb/kafka/EmptyOffsetTracker.java

This file was deleted.

122 changes: 0 additions & 122 deletions connector/src/main/java/io/questdb/kafka/MultiOffsetTracker.java

This file was deleted.

20 changes: 0 additions & 20 deletions connector/src/main/java/io/questdb/kafka/OffsetTracker.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String TLS_VALIDATION_MODE_CONFIG = "tls.validation.mode";
public static final String TLS_VALIDATION_MODE_DOC = "TLS validation mode. Possible values: default, insecure";

public static final String CONFIGURATION_STRING_CONFIG = "client.conf.string";
public static final String CONFIGURATION_STRING_DOC = "Configuration string for QuestDB client";

public static final String RETRY_BACKOFF_MS = "retry.backoff.ms";
private static final String RETRY_BACKOFF_MS_DOC = "The time in milliseconds to wait following an error before a retry attempt is made";

Expand All @@ -70,15 +73,6 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";

public static final String DEDUPLICATION_REWIND_CONFIG = "dedup.rewind.offset";
private static final String DEDUPLICATION_REWIND_DOC = "Rewind offset for deduplication. " +
"On failure, the connector will rewind the offset by this amount and retry. This is designed to work in concert with QuestDB " +
"deduplication feature. The rewind offset must be greater than or equal to the maximum number of records that can lost in the event of a failure. " +
"If the rewind is too small, some events might be missing from QuestDB. If the rewind is too large, the connector will be slower to recover " +
"as it will have to reprocess a large number of records and QuestDB will have to do extra work with deduplication. If you are testing this " +
"feature for the first time then 150000 is a good starting point.";


private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";

public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
Expand All @@ -91,7 +85,7 @@ public QuestDBSinkConnectorConfig(Map<String, String> parsedConfig) {

public static ConfigDef conf() {
return new ConfigDef()
.define(HOST_CONFIG, Type.STRING, Importance.HIGH, HOST_DOC)
.define(HOST_CONFIG, Type.STRING, null, Importance.HIGH, HOST_DOC)
.define(TABLE_CONFIG, Type.STRING, null, TablenameValidator.INSTANCE, Importance.HIGH, TABLE_DOC)
.define(KEY_PREFIX_CONFIG, Type.STRING, "key", Importance.MEDIUM, KEY_PREFIX_DOC)
.define(VALUE_PREFIX_CONFIG, Type.STRING, "", Importance.MEDIUM, VALUE_PREFIX_DOC)
Expand All @@ -109,12 +103,12 @@ public static ConfigDef conf() {
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
.define(TIMESTAMP_STRING_FIELDS, Type.STRING, null, Importance.MEDIUM, TIMESTAMP_STRING_FIELDS_DOC)
.define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
.define(DEDUPLICATION_REWIND_CONFIG, Type.LONG, 0, Importance.MEDIUM, DEDUPLICATION_REWIND_DOC)
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC);
.define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
.define(CONFIGURATION_STRING_CONFIG, Type.STRING, null, Importance.HIGH, CONFIGURATION_STRING_DOC);
}

public long getDeduplicationRewindOffset() {
return getLong(DEDUPLICATION_REWIND_CONFIG);
public String getConfigurationString() {
return getString(CONFIGURATION_STRING_CONFIG);
}

public String getTlsValidationMode() {
Expand Down
Loading

0 comments on commit 9e58de8

Please sign in to comment.