diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index 6498f59..6f26018 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -996,6 +996,46 @@ public void testDoNotIncludeKey(boolean useHttp) { httpPort); } + @Test + public void testExtractKafkaIngestionTimestampAsField() { + connect.kafka().createTopic(topicName, 1); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true); + props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "birth"); // the field is injected via InsertField SMT + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put("transforms", "InsertField"); + props.put("transforms.InsertField.type", "org.apache.kafka.connect.transforms.InsertField$Value"); + props.put("transforms.InsertField.timestamp.field", "birth"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + // note: there is no birth field in the message payload + Schema schema = SchemaBuilder.struct().name("com.example.Person") + .field("firstname", Schema.STRING_SCHEMA) + .field("lastname", Schema.STRING_SCHEMA) + .build(); + Struct struct = new Struct(schema) + .put("firstname", "John") + .put("lastname", "Doe"); + + Map prodProps = new HashMap<>(); + try (KafkaProducer producer = connect.kafka().createProducer(prodProps)) { + java.util.Date birth = new Calendar.Builder() + .setTimeZone(TimeZone.getTimeZone("UTC")) + .setDate(2022, 9, 23) // note: month is 0-based + .setTimeOfDay(13, 53, 59, 123) + .build().getTime(); + long kafkaTimestamp = birth.getTime(); + ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, kafkaTimestamp, "key".getBytes(), new String(converter.fromConnectData(topicName, schema, struct)).getBytes()); + producer.send(producerRecord); + } + + QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"timestamp\"\r\n" + + "\"John\",\"Doe\",\"2022-10-23T13:53:59.123000Z\"\r\n", + "select * from " + topicName, + httpPort); + } + + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testJsonNoSchema(boolean useHttp) {