Skip to content

Commit

Permalink
feat: support unix epoch timestamps in seconds (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrinot authored Apr 6, 2024
1 parent 9c3c12f commit f0350f5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
private static final String TIMESTAMP_STRING_FIELDS_DOC = "Comma-separated list of string fields that should be parsed as timestamps.";

public static final String TIMESTAMP_UNITS_CONFIG = "timestamp.units";
private static final String TIMESTAMP_UNITS_DOC = "Units of designated timestamp field. Possible values: auto, millis, micros, nanos";
private static final String TIMESTAMP_UNITS_DOC = "Units of designated timestamp field. Possible values: auto, seconds, millis, micros, nanos";

public static final String INCLUDE_KEY_CONFIG = "include.key";
private static final String INCLUDE_KEY_DOC = "Include key in the table";
Expand Down Expand Up @@ -97,7 +97,7 @@ public static ConfigDef conf() {
.define(USERNAME, Type.STRING, "admin", Importance.MEDIUM, USERNAME_DOC)
.define(TOKEN, Type.PASSWORD, null, Importance.MEDIUM, TOKEN_DOC)
.define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC)
.define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC, null, -1, ConfigDef.Width.NONE, TIMESTAMP_UNITS_CONFIG, Collections.emptyList(), TimestampUnitsRecommender.INSTANCE)
.define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "seconds", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC, null, -1, ConfigDef.Width.NONE, TIMESTAMP_UNITS_CONFIG, Collections.emptyList(), TimestampUnitsRecommender.INSTANCE)
.define(RETRY_BACKOFF_MS, Type.LONG, 3_000, Importance.LOW, RETRY_BACKOFF_MS_DOC)
.define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC)
.define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC)
Expand Down Expand Up @@ -178,6 +178,8 @@ public boolean isTls() {
public TimeUnit getTimestampUnitsOrNull() {
String configured = getString(TIMESTAMP_UNITS_CONFIG);
switch (configured) {
case "seconds":
return TimeUnit.SECONDS;
case "millis":
return TimeUnit.MILLISECONDS;
case "micros":
Expand All @@ -187,7 +189,7 @@ public TimeUnit getTimestampUnitsOrNull() {
case "auto":
return null;
default:
throw new ConnectException("Unknown timestamp units mode: " + configured + ". Possible values: auto, millis, micros, nanos");
throw new ConnectException("Unknown timestamp units mode: " + configured + ". Possible values: auto, seconds, millis, micros, nanos");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ private TimestampHelper() {
}

static TimeUnit guessTimestampUnits(long timestamp) {
if (timestamp < 10000000000000L) { // 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros
if (timestamp < 10000000000L) {
return TimeUnit.SECONDS;
} else if (timestamp < 10000000000000L) { // 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros
return TimeUnit.MILLISECONDS;
} else if (timestamp < 10000000000000000L) {
return TimeUnit.MICROSECONDS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testTimeunitsValidator() {
configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TIMESTAMP_UNITS_CONFIG, "foo");
fail("Expected ConfigException");
} catch (ConfigException e) {
assertEquals("Invalid value foo for configuration timestamp.units: String must be one of: auto, millis, micros, nanos", e.getMessage());
assertEquals("Invalid value foo for configuration timestamp.units: String must be one of: auto, seconds, millis, micros, nanos", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.jupiter.api.io.CleanupMode;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
Expand All @@ -40,6 +41,7 @@

import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.hamcrest.CoreMatchers.containsString;
Expand Down Expand Up @@ -301,7 +303,6 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
questDBContainer = newQuestDbConnector();
for (int i = 0; i < 50; i++) {
connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}");
Thread.sleep(100);
}

QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n"
Expand All @@ -312,6 +313,33 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw
);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testJsonNestedLongTimestampInSeconds(boolean useHttp) {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
props.put("value.converter.schemas.enable", "false");
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "timeseriesElement_observationDateTime");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
connect.kafka().produce(topicName, "key",
"{\"timeseriesElement\":{\n" +
"\"open\":65994.204593157,\n" +
"\"close\":66213.0396203394,\n" +
"\"low\":65106.5637640695,\n" +
"\"high\":66467.4682495325,\n" +
"\"observationDateTime\":1712185812,\n" +
"\"volume\":104734.408713828\n" +
"}}"
);

QuestDBUtils.assertSqlEventually(
"\"key\",\"timeseriesElement_volume\",\"timeseriesElement_high\",\"timeseriesElement_low\",\"timeseriesElement_close\",\"timeseriesElement_open\",\"timestamp\"\r\n" +
"\"key\",104734.408713828,66467.4682495325,65106.5637640695,66213.0396203394,65994.204593157,\"2024-04-03T23:10:12.000000Z\"\r\n",
"select * from " + topicName,
httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEmptyCollection_wontFailTheConnector(boolean useHttp) {
Expand Down Expand Up @@ -550,24 +578,17 @@ public void testTimestampUnitResolution_auto(boolean useHttp) {
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTimestampUnitResolution_millis(boolean useHttp) {
testTimestampUnitResolution0("millis", useHttp);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTimestampUnitResolution_micros(boolean useHttp) {
testTimestampUnitResolution0("micros", useHttp);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTimestampUnitResolution_nanos(boolean useHttp) {
testTimestampUnitResolution0("nanos", useHttp);
}

private void testTimestampUnitResolution0(String mode, boolean useHttp) {
@CsvSource({
"seconds, true",
"seconds, false",
"millis, true",
"millis, false",
"micros, true",
"micros, false",
"nanos, true",
"nanos, false",
})
public void testTimestampUnitResolution0(String mode, boolean useHttp) {
TimeUnit unit;
switch (mode) {
case "nanos":
Expand All @@ -579,6 +600,9 @@ private void testTimestampUnitResolution0(String mode, boolean useHttp) {
case "millis":
unit = TimeUnit.MILLISECONDS;
break;
case "seconds":
unit = TimeUnit.SECONDS;
break;
default:
throw new IllegalArgumentException("Unknown mode: " + mode);
}
Expand All @@ -601,9 +625,11 @@ private void testTimestampUnitResolution0(String mode, boolean useHttp) {
connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":0}");
connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthTarget + "}");


String upperBound = unit == SECONDS ? "2206-11-20T17:46:39.000000Z" : "2206-11-20T17:46:39.999000Z";
QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n"
+ "\"John\",\"Doe\",\"1970-01-01T00:00:00.000000Z\"\r\n"
+ "\"Jane\",\"Doe\",\"2206-11-20T17:46:39.999000Z\"\r\n",
+ "\"Jane\",\"Doe\",\""+ upperBound + "\"\r\n",
"select firstname,lastname,timestamp from " + topicName,
httpPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public void testBoundaries_auto() {
.setTimeOfDay(17, 46, 39, 999)
.build().getTime();

assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime() / 1000));
assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime() / 1000));

assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime()));
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime()));

Expand All @@ -39,6 +42,9 @@ public void testBoundaries_auto() {

@Test
public void testBoundaries_explicit() {
assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(TimeUnit.SECONDS, 0));
assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(TimeUnit.SECONDS, Long.MAX_VALUE));

assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, 0));
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, Long.MAX_VALUE));

Expand All @@ -51,7 +57,7 @@ public void testBoundaries_explicit() {

@Test
public void testSlack() {
assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, 1712188800));
assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, 1712188800));
}

}

0 comments on commit f0350f5

Please sign in to comment.