diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java index 9b16eb4..cc6acf9 100644 --- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java +++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java @@ -133,10 +133,35 @@ void generalTearDown() throws IOException { * @param <V> The serializable type of the value * @return The corresponding TestInputTopic */ + protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) { + return this.testDriver.createInputTopic( + topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde().serializer(), + topicWithSerde.getValueSerde().serializer() + ); + } + + /** + * Creates an input test topic on the testDriver using the provided topicWithSerde. + * + * @param topicWithSerde The topic with serde used to crete the test topic + * @param <K> The serializable type of the key + * @param <V> The serializable type of the value + * @return The corresponding TestInputTopic + * + * @deprecated Use {@link #createInputTestTopic(TopicWithSerde)} + */ + @Deprecated(since = "1.1.0") protected <K, V> TestInputTopic<K, V> createInputTestTopic( - TopicWithSerde<K, V> topicWithSerde) { - return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), - topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer()); + com.michelin.kstreamplify.utils.TopicWithSerde<K, V> topicWithSerde + ) { + return createInputTestTopic( + new TopicWithSerde<>( + topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde(), + topicWithSerde.getValueSerde() + ) + ); } /** @@ -147,10 +172,34 @@ protected <K, V> TestInputTopic<K, V> createInputTestTopic( * @param <V> The serializable type of the value * @return The corresponding TestOutputTopic */ + protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(TopicWithSerde<K, V> topicWithSerde) { + return this.testDriver.createOutputTopic( + topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde().deserializer(), + topicWithSerde.getValueSerde().deserializer() + ); + } + + /** + * Creates an output test topic on the testDriver using the provided topicWithSerde. + * + * @param topicWithSerde The topic with serde used to crete the test topic + * @param <K> The serializable type of the key + * @param <V> The serializable type of the value + * @return The corresponding TestOutputTopic + * + * @deprecated Use {@link #createOutputTestTopic(TopicWithSerde)} + */ + @Deprecated(since = "1.1.0") protected <K, V> TestOutputTopic<K, V> createOutputTestTopic( - TopicWithSerde<K, V> topicWithSerde) { - return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(), - topicWithSerde.getKeySerde().deserializer(), - topicWithSerde.getValueSerde().deserializer()); + com.michelin.kstreamplify.utils.TopicWithSerde<K, V> topicWithSerde + ) { + return createOutputTestTopic( + new TopicWithSerde<>( + topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde(), + topicWithSerde.getValueSerde() + ) + ); } } diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index c13fb23..e677ac1 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -140,6 +140,34 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() { assertEquals(0, resultOutput.size()); } + @Test + @SuppressWarnings("deprecation") + void shouldCreateInputAndOutputTopicsWithDeprecatedSerde() { + TestInputTopic<String, String> inputTopic = createInputTestTopic( + new com.michelin.kstreamplify.utils.TopicWithSerde<>( + "INPUT_TOPIC", + "APP_NAME", + Serdes.String(), + Serdes.String() + ) + ); + + assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, " + + "valueSerializer=StringSerializer]", inputTopic.toString()); + + TestOutputTopic<String, String> outputTopic = createOutputTestTopic( + new com.michelin.kstreamplify.utils.TopicWithSerde<>( + "OUTPUT_TOPIC", + "APP_NAME", + Serdes.String(), + Serdes.String() + ) + ); + + assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, " + + "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString()); + } + @Test void shouldCreateInputAndOutputTopicsWithSerde() { TestInputTopic<String, String> inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC",