diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index ce7464a3..3af13ac7 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -7,6 +7,7 @@ import com.michelin.kstreamplify.error.TopologyErrorHandler; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; import com.michelin.kstreamplify.utils.SerdesUtils; +import com.michelin.kstreamplify.utils.TopicWithSerde; import java.util.List; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; @@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() { assertEquals(1, resultDlq.size()); assertEquals(0, resultOutput.size()); } + + @Test + void shouldCreateInputAndOutputTopicsWithSerde() { + TestInputTopic inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC", + "APP_NAME", Serdes.String(), Serdes.String())); + + assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, " + + "valueSerializer=StringSerializer]", inputTopic.toString()); + + TestOutputTopic outputTopic = createOutputTestTopic(new TopicWithSerde<>("OUTPUT_TOPIC", + "APP_NAME", Serdes.String(), Serdes.String())); + + assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, " + + "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString()); + } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java index 9a853a89..2adce2d3 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java @@ -57,7 +57,6 @@ public class TopicWithSerde { @Getter private final Serde valueSerde; - /** *

Additional constructor which uses default parameter "self" for prefixPropertyKey.

* diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java new file mode 100644 index 00000000..b5d64604 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java @@ -0,0 +1,101 @@ +package com.michelin.kstreamplify.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.jupiter.api.Test; + +class TopicWithSerdesTest { + + @Test + void shouldCreateTopicWithSerde() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); + assertEquals("INPUT_TOPIC", topicWithSerde.toString()); + } + + @Test + void shouldCreateTopicWithSerdeWithPrefix() { + Properties properties = new Properties(); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); + assertEquals("abc.INPUT_TOPIC", topicWithSerde.toString()); + } + + @Test + void shouldCreateStream() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.stream(streamsBuilder); + + assertEquals(""" + Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> none + + """, streamsBuilder.build().describe().toString()); + } + + @Test + void shouldCreateTable() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.table(streamsBuilder, "myStore"); + + assertEquals(""" + Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> KTABLE-SOURCE-0000000001 + Processor: KTABLE-SOURCE-0000000001 (stores: [myStore]) + --> none + <-- KSTREAM-SOURCE-0000000000 + + """, streamsBuilder.build().describe().toString()); + } + + @Test + void shouldCreateGKTable() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.globalTable(streamsBuilder, "myStore"); + + assertEquals(""" + Topologies: + Sub-topology: 0 for global store (will not generate tasks) + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> KTABLE-SOURCE-0000000001 + Processor: KTABLE-SOURCE-0000000001 (stores: [myStore]) + --> none + <-- KSTREAM-SOURCE-0000000000 + """, streamsBuilder.build().describe().toString()); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java deleted file mode 100644 index cc4a4229..00000000 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.michelin.kstreamplify.utils; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import org.apache.kafka.common.serialization.Serdes; -import org.junit.jupiter.api.Test; - -class TopicWithSerdesUnitTest { - - @Test - void shouldCreateTopicWithSerde() { - TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", - Serdes.String(), Serdes.String()); - - assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); - } -}