Skip to content

Commit

Permalink
Restore create topic methods for unit tests (#257)
Browse files Browse the repository at this point in the history
* Restore create topic methods for unit tests

* Refacto

* Fix
  • Loading branch information
loicgreffier authored Oct 8, 2024
1 parent 5beb68c commit 3e09dea
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,35 @@ void generalTearDown() throws IOException {
* @param <V> The serializable type of the value
* @return The corresponding TestInputTopic
*/
protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(
topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().serializer(),
topicWithSerde.getValueSerde().serializer()
);
}

/**
* Creates an input test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to crete the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestInputTopic
*
* @deprecated Use {@link #createInputTestTopic(TopicWithSerde)}
*/
@Deprecated(since = "1.1.0")
protected <K, V> TestInputTopic<K, V> createInputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
com.michelin.kstreamplify.utils.TopicWithSerde<K, V> topicWithSerde
) {
return createInputTestTopic(
new TopicWithSerde<>(
topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde(),
topicWithSerde.getValueSerde()
)
);
}

/**
Expand All @@ -147,10 +172,34 @@ protected <K, V> TestInputTopic<K, V> createInputTestTopic(
* @param <V> The serializable type of the value
* @return The corresponding TestOutputTopic
*/
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(
topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().deserializer(),
topicWithSerde.getValueSerde().deserializer()
);
}

/**
* Creates an output test topic on the testDriver using the provided topicWithSerde.
*
* @param topicWithSerde The topic with serde used to crete the test topic
* @param <K> The serializable type of the key
* @param <V> The serializable type of the value
* @return The corresponding TestOutputTopic
*
* @deprecated Use {@link #createOutputTestTopic(TopicWithSerde)}
*/
@Deprecated(since = "1.1.0")
protected <K, V> TestOutputTopic<K, V> createOutputTestTopic(
TopicWithSerde<K, V> topicWithSerde) {
return this.testDriver.createOutputTopic(topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde().deserializer(),
topicWithSerde.getValueSerde().deserializer());
com.michelin.kstreamplify.utils.TopicWithSerde<K, V> topicWithSerde
) {
return createOutputTestTopic(
new TopicWithSerde<>(
topicWithSerde.getUnPrefixedName(),
topicWithSerde.getKeySerde(),
topicWithSerde.getValueSerde()
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,34 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() {
assertEquals(0, resultOutput.size());
}

@Test
@SuppressWarnings("deprecation")
void shouldCreateInputAndOutputTopicsWithDeprecatedSerde() {
TestInputTopic<String, String> inputTopic = createInputTestTopic(
new com.michelin.kstreamplify.utils.TopicWithSerde<>(
"INPUT_TOPIC",
"APP_NAME",
Serdes.String(),
Serdes.String()
)
);

assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, "
+ "valueSerializer=StringSerializer]", inputTopic.toString());

TestOutputTopic<String, String> outputTopic = createOutputTestTopic(
new com.michelin.kstreamplify.utils.TopicWithSerde<>(
"OUTPUT_TOPIC",
"APP_NAME",
Serdes.String(),
Serdes.String()
)
);

assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, "
+ "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString());
}

@Test
void shouldCreateInputAndOutputTopicsWithSerde() {
TestInputTopic<String, String> inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC",
Expand Down

0 comments on commit 3e09dea

Please sign in to comment.