Skip to content

Commit

Permalink
fix: create DataFormat unconditionally
Browse files Browse the repository at this point in the history
since it used to parse string designated timestamps even when
field was explicitly configured to be parsed as a timestamp.
  • Loading branch information
jerrinot committed Feb 12, 2024
1 parent 9239c05 commit 164e80e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void start(Map<String, String> map) {
for (String symbolColumn : timestampStringFields.split(",")) {
stringTimestampColumns.add(symbolColumn.trim());
}
dataFormat = TimestampParserCompiler.compilePattern(config.getTimestampFormat());
} else {
stringTimestampColumns = Collections.emptySet();
}
dataFormat = TimestampParserCompiler.compilePattern(config.getTimestampFormat());

String doubleColumnsConfig = config.getDoubleColumns();
if (doubleColumnsConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,37 @@ public void testPrimitiveKey() {
httpPort);
}

@Test
public void testParsingStringTimestamp_designatedTimestampNotListedExplicitly() {
connect.kafka().createTopic(topicName, 1);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName);
props.put("value.converter.schemas.enable", "false");
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "born");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
props.put(QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSSUUU z");

connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

QuestDBUtils.assertSql(
"{\"ddl\":\"OK\"}",
"create table " + topicName + " (firstname string, lastname string, born timestamp) timestamp(born)",
httpPort,
QuestDBUtils.Endpoint.EXEC);

String birthTimestamp = "1985-08-02 16:41:55.402095 UTC";
connect.kafka().produce(topicName, "foo",
"{\"firstname\":\"John\""
+ ",\"lastname\":\"Doe\""
+ ",\"born\":\"" + birthTimestamp + "\"}"
);

QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"born\"\r\n" +
"\"John\",\"Doe\",\"1985-08-02T16:41:55.402095Z\"\r\n",
"select * from " + topicName,
httpPort);
}

@Test
public void testParsingStringTimestamp() {
connect.kafka().createTopic(topicName, 1);
Expand Down

0 comments on commit 164e80e

Please sign in to comment.