diff --git a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java index ea0d689..1fb6a5f 100644 --- a/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java +++ b/connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java @@ -70,7 +70,7 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig { public static final String TIMESTAMP_FORMAT = "timestamp.string.format"; private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields"; - private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'"; + private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ"; public QuestDBSinkConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index c1f7d1c..cb47e40 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -796,6 +796,37 @@ public void testParsingStringTimestamp() { "select * from " + topicName); } + @Test + public void testParsingStringTimestamp_defaultPattern() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName); + props.put("value.converter.schemas.enable", "false"); + props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put(QuestDBSinkConnectorConfig.TIMESTAMP_STRING_FIELDS, "born,death"); + + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + QuestDBUtils.assertSql(questDBContainer, + "{\"ddl\":\"OK\"}\n", + "create table " + topicName + " (firstname string, lastname string, death timestamp, born timestamp) timestamp(born)", + QuestDBUtils.Endpoint.EXEC); + + String birthTimestamp = "1985-08-02T16:41:55.402095Z"; + String deadTimestamp = "2023-08-02T16:41:55.402095Z"; + connect.kafka().produce(topicName, "foo", + "{\"firstname\":\"John\"" + + ",\"lastname\":\"Doe\"" + + ",\"death\":\"" + deadTimestamp + "\"" + + ",\"born\":\"" + birthTimestamp + "\"}" + ); + + QuestDBUtils.assertSqlEventually(questDBContainer, "\"firstname\",\"lastname\",\"death\",\"born\"\r\n" + + "\"John\",\"Doe\",\"2023-08-02T16:41:55.402095Z\",\"1985-08-02T16:41:55.402095Z\"\r\n", + "select * from " + topicName); + } + @Test public void testCustomPrefixWithPrimitiveKeyAndValues() { connect.kafka().createTopic(topicName, 1);