Skip to content

Commit

Permalink
Add test on topic with serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
Loïc Greffier committed Sep 19, 2023
1 parent 4d80b06 commit c74d39e
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() {
assertEquals(1, resultDlq.size());
assertEquals(0, resultOutput.size());
}

@Test
void shouldCreateInputAndOutputTopicsWithSerde() {
TestInputTopic<String, String> inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC",
"APP_NAME", Serdes.String(), Serdes.String()));

assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, "
+ "valueSerializer=StringSerializer]", inputTopic.toString());

TestOutputTopic<String, String> outputTopic = createOutputTestTopic(new TopicWithSerde<>("OUTPUT_TOPIC",
"APP_NAME", Serdes.String(), Serdes.String()));

assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, "
+ "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TopicWithSerde<K, V> {
@Getter
private final Serde<V> valueSerde;


/**
* <p>Additional constructor which uses default parameter "self" for prefixPropertyKey.</p>
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.michelin.kstreamplify.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.Test;

class TopicWithSerdesTest {

@Test
void shouldCreateTopicWithSerde() {
KafkaStreamsExecutionContext.registerProperties(new Properties());

TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
Serdes.String(), Serdes.String());

assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName());
assertEquals("INPUT_TOPIC", topicWithSerde.toString());
}

@Test
void shouldCreateTopicWithSerdeWithPrefix() {
Properties properties = new Properties();
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);

TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
Serdes.String(), Serdes.String());

assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName());
assertEquals("abc.INPUT_TOPIC", topicWithSerde.toString());
}

@Test
void shouldCreateStream() {
KafkaStreamsExecutionContext.registerProperties(new Properties());

TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
Serdes.String(), Serdes.String());

StreamsBuilder streamsBuilder = new StreamsBuilder();
topicWithSerde.stream(streamsBuilder);

assertEquals("""
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
--> none
""", streamsBuilder.build().describe().toString());
}

@Test
void shouldCreateTable() {
KafkaStreamsExecutionContext.registerProperties(new Properties());

TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
Serdes.String(), Serdes.String());

StreamsBuilder streamsBuilder = new StreamsBuilder();
topicWithSerde.table(streamsBuilder, "myStore");

assertEquals("""
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [myStore])
--> none
<-- KSTREAM-SOURCE-0000000000
""", streamsBuilder.build().describe().toString());
}

@Test
void shouldCreateGKTable() {
KafkaStreamsExecutionContext.registerProperties(new Properties());

TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
Serdes.String(), Serdes.String());

StreamsBuilder streamsBuilder = new StreamsBuilder();
topicWithSerde.globalTable(streamsBuilder, "myStore");

assertEquals("""
Topologies:
Sub-topology: 0 for global store (will not generate tasks)
Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
--> KTABLE-SOURCE-0000000001
Processor: KTABLE-SOURCE-0000000001 (stores: [myStore])
--> none
<-- KSTREAM-SOURCE-0000000000
""", streamsBuilder.build().describe().toString());
}
}

This file was deleted.

0 comments on commit c74d39e

Please sign in to comment.