diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index 91718f0..be12cfe 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -38,7 +38,7 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { private static final String TIMESTAMP_STRING_FIELDS_DOC = "Comma-separated list of string fields that should be parsed as timestamps."; public static final String TIMESTAMP_UNITS_CONFIG = "timestamp.units"; - private static final String TIMESTAMP_UNITS_DOC = "Units of designated timestamp field. Possible values: auto, millis, micros, nanos"; + private static final String TIMESTAMP_UNITS_DOC = "Units of designated timestamp field. Possible values: auto, seconds, millis, micros, nanos"; public static final String INCLUDE_KEY_CONFIG = "include.key"; private static final String INCLUDE_KEY_DOC = "Include key in the table"; @@ -97,7 +97,7 @@ public static ConfigDef conf() { .define(USERNAME, Type.STRING, "admin", Importance.MEDIUM, USERNAME_DOC) .define(TOKEN, Type.PASSWORD, null, Importance.MEDIUM, TOKEN_DOC) .define(TLS, Type.BOOLEAN, false, Importance.MEDIUM, TLS_DOC) - .define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC, null, -1, ConfigDef.Width.NONE, TIMESTAMP_UNITS_CONFIG, Collections.emptyList(), TimestampUnitsRecommender.INSTANCE) + .define(TIMESTAMP_UNITS_CONFIG, Type.STRING, "auto", ConfigDef.ValidString.in("auto", "seconds", "millis", "micros", "nanos"), Importance.LOW, TIMESTAMP_UNITS_DOC, null, -1, ConfigDef.Width.NONE, TIMESTAMP_UNITS_CONFIG, Collections.emptyList(), TimestampUnitsRecommender.INSTANCE) .define(RETRY_BACKOFF_MS, Type.LONG, 3_000, Importance.LOW, RETRY_BACKOFF_MS_DOC) .define(MAX_RETRIES, Type.INT, 10, Importance.LOW, MAX_RETRIES_DOC) .define(TIMESTAMP_FORMAT, Type.STRING, DEFAULT_TIMESTAMP_FORMAT, TimestampFormatValidator.INSTANCE, Importance.MEDIUM, TIMESTAMP_FORMAT_DOC) @@ -178,6 +178,8 @@ public boolean isTls() { public TimeUnit getTimestampUnitsOrNull() { String configured = getString(TIMESTAMP_UNITS_CONFIG); switch (configured) { + case "seconds": + return TimeUnit.SECONDS; case "millis": return TimeUnit.MILLISECONDS; case "micros": @@ -187,7 +189,7 @@ public TimeUnit getTimestampUnitsOrNull() { case "auto": return null; default: - throw new ConnectException("Unknown timestamp units mode: " + configured + ". Possible values: auto, millis, micros, nanos"); + throw new ConnectException("Unknown timestamp units mode: " + configured + ". Possible values: auto, seconds, millis, micros, nanos"); } } diff --git a/connector/src/main/java/io/questdb/kafka/TimestampHelper.java b/connector/src/main/java/io/questdb/kafka/TimestampHelper.java index 8650c2f..bb86ebe 100644 --- a/connector/src/main/java/io/questdb/kafka/TimestampHelper.java +++ b/connector/src/main/java/io/questdb/kafka/TimestampHelper.java @@ -7,7 +7,9 @@ private TimestampHelper() { } static TimeUnit guessTimestampUnits(long timestamp) { - if (timestamp < 10000000000000L) { // 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros + if (timestamp < 10000000000L) { + return TimeUnit.SECONDS; + } else if (timestamp < 10000000000000L) { // 11/20/2286, 5:46:40 PM in millis and 4/26/1970, 5:46:40 PM in micros return TimeUnit.MILLISECONDS; } else if (timestamp < 10000000000000000L) { return TimeUnit.MICROSECONDS; diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java index b253a5e..b5da6ae 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorConfigTest.java @@ -72,7 +72,7 @@ public void testTimeunitsValidator() { configKey.validator.ensureValid(QuestDBSinkConnectorConfig.TIMESTAMP_UNITS_CONFIG, "foo"); fail("Expected ConfigException"); } catch (ConfigException e) { - assertEquals("Invalid value foo for configuration timestamp.units: String must be one of: auto, millis, micros, nanos", e.getMessage()); + assertEquals("Invalid value foo for configuration timestamp.units: String must be one of: auto, seconds, millis, micros, nanos", e.getMessage()); } } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 3593b84..f774b05 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.io.CleanupMode; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.LoggerFactory; import org.testcontainers.containers.FixedHostPortGenericContainer; @@ -40,6 +41,7 @@ import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.hamcrest.CoreMatchers.containsString; @@ -301,7 +303,6 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw questDBContainer = newQuestDbConnector(); for (int i = 0; i < 50; i++) { connect.kafka().produce(topicName, "key3", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":" + i + "}"); - Thread.sleep(100); } QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\"\r\n" @@ -312,6 +313,33 @@ public void testRetrying_recoversFromInfrastructureIssues(boolean useHttp) throw ); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testJsonNestedLongTimestampInSeconds(boolean useHttp) { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put("value.converter.schemas.enable", "false"); + props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "timeseriesElement_observationDateTime"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + connect.kafka().produce(topicName, "key", + "{\"timeseriesElement\":{\n" + + "\"open\":65994.204593157,\n" + + "\"close\":66213.0396203394,\n" + + "\"low\":65106.5637640695,\n" + + "\"high\":66467.4682495325,\n" + + "\"observationDateTime\":1712185812,\n" + + "\"volume\":104734.408713828\n" + + "}}" + ); + + QuestDBUtils.assertSqlEventually( + "\"key\",\"timeseriesElement_volume\",\"timeseriesElement_high\",\"timeseriesElement_low\",\"timeseriesElement_close\",\"timeseriesElement_open\",\"timestamp\"\r\n" + + "\"key\",104734.408713828,66467.4682495325,65106.5637640695,66213.0396203394,65994.204593157,\"2024-04-03T23:10:12.000000Z\"\r\n", + "select * from " + topicName, + httpPort); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testEmptyCollection_wontFailTheConnector(boolean useHttp) { @@ -550,24 +578,17 @@ public void testTimestampUnitResolution_auto(boolean useHttp) { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testTimestampUnitResolution_millis(boolean useHttp) { - testTimestampUnitResolution0("millis", useHttp); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testTimestampUnitResolution_micros(boolean useHttp) { - testTimestampUnitResolution0("micros", useHttp); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testTimestampUnitResolution_nanos(boolean useHttp) { - testTimestampUnitResolution0("nanos", useHttp); - } - - private void testTimestampUnitResolution0(String mode, boolean useHttp) { + @CsvSource({ + "seconds, true", + "seconds, false", + "millis, true", + "millis, false", + "micros, true", + "micros, false", + "nanos, true", + "nanos, false", + }) + public void testTimestampUnitResolution0(String mode, boolean useHttp) { TimeUnit unit; switch (mode) { case "nanos": @@ -579,6 +600,9 @@ private void testTimestampUnitResolution0(String mode, boolean useHttp) { case "millis": unit = TimeUnit.MILLISECONDS; break; + case "seconds": + unit = TimeUnit.SECONDS; + break; default: throw new IllegalArgumentException("Unknown mode: " + mode); } @@ -601,9 +625,11 @@ private void testTimestampUnitResolution0(String mode, boolean useHttp) { connect.kafka().produce(topicName, "foo", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"birth\":0}"); connect.kafka().produce(topicName, "bar", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"birth\":" + birthTarget + "}"); + + String upperBound = unit == SECONDS ? "2206-11-20T17:46:39.000000Z" : "2206-11-20T17:46:39.999000Z"; QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + "\"John\",\"Doe\",\"1970-01-01T00:00:00.000000Z\"\r\n" - + "\"Jane\",\"Doe\",\"2206-11-20T17:46:39.999000Z\"\r\n", + + "\"Jane\",\"Doe\",\""+ upperBound + "\"\r\n", "select firstname,lastname,timestamp from " + topicName, httpPort); } diff --git a/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java b/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java index 7f60b9f..53a56ad 100644 --- a/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java +++ b/connector/src/test/java/io/questdb/kafka/TimestampHelperTest.java @@ -27,6 +27,9 @@ public void testBoundaries_auto() { .setTimeOfDay(17, 46, 39, 999) .build().getTime(); + assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime() / 1000)); + assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime() / 1000)); + assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, lowerBound.getTime())); assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, upperBound.getTime())); @@ -39,6 +42,9 @@ public void testBoundaries_auto() { @Test public void testBoundaries_explicit() { + assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(TimeUnit.SECONDS, 0)); + assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(TimeUnit.SECONDS, Long.MAX_VALUE)); + assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, 0)); assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(TimeUnit.MILLISECONDS, Long.MAX_VALUE)); @@ -51,7 +57,7 @@ public void testBoundaries_explicit() { @Test public void testSlack() { - assertEquals(TimeUnit.MILLISECONDS, TimestampHelper.getTimestampUnits(null, 1712188800)); + assertEquals(TimeUnit.SECONDS, TimestampHelper.getTimestampUnits(null, 1712188800)); } } \ No newline at end of file