diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java index ce7464a3..3af13ac7 100644 --- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java +++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java @@ -7,6 +7,7 @@ import com.michelin.kstreamplify.error.TopologyErrorHandler; import com.michelin.kstreamplify.initializer.KafkaStreamsStarter; import com.michelin.kstreamplify.utils.SerdesUtils; +import com.michelin.kstreamplify.utils.TopicWithSerde; import java.util.List; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; @@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() { assertEquals(1, resultDlq.size()); assertEquals(0, resultOutput.size()); } + + @Test + void shouldCreateInputAndOutputTopicsWithSerde() { + TestInputTopic inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC", + "APP_NAME", Serdes.String(), Serdes.String())); + + assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, " + + "valueSerializer=StringSerializer]", inputTopic.toString()); + + TestOutputTopic outputTopic = createOutputTestTopic(new TopicWithSerde<>("OUTPUT_TOPIC", + "APP_NAME", Serdes.String(), Serdes.String())); + + assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, " + + "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString()); + } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java index b9a5268c..6371ea36 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java @@ -6,7 +6,9 @@ import java.util.Map; import java.util.Properties; +import lombok.AccessLevel; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -16,6 +18,7 @@ * The class to represent the context of the KStream. */ @Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class KafkaStreamsExecutionContext { /** @@ -36,6 +39,7 @@ public class KafkaStreamsExecutionContext { * The properties of the stream execution context. */ @Getter + @Setter private static Properties properties; /** @@ -51,9 +55,6 @@ public class KafkaStreamsExecutionContext { @Getter private static String prefix; - private KafkaStreamsExecutionContext() { - } - /** * Register KStream properties. * diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java index a7c311c0..be4f238b 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java @@ -48,7 +48,7 @@ public class KafkaStreamsInitializer { /** * The application properties. */ - protected Properties properties; + protected Properties properties = new Properties(); /** * The DLQ topic. diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java index 9a853a89..2adce2d3 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java @@ -57,7 +57,6 @@ public class TopicWithSerde { @Getter private final Serde valueSerde; - /** *

Additional constructor which uses default parameter "self" for prefixPropertyKey.

* diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java new file mode 100644 index 00000000..783ed928 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java @@ -0,0 +1,59 @@ +package com.michelin.kstreamplify.context; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Properties; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class KafkaStreamsExecutionContextTest { + + @BeforeEach + void setUp() { + KafkaStreamsExecutionContext.setProperties(null); + } + + @Test + void shouldNotRegisterPropertiesWhenNull() { + KafkaStreamsExecutionContext.registerProperties(null); + assertNull(KafkaStreamsExecutionContext.getProperties()); + } + + @Test + void shouldAddPrefixToAppId() { + Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); + assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties() + .get(StreamsConfig.APPLICATION_ID_CONFIG)); + } + + @Test + void shouldNotAddPrefixToAppIdIfNoPrefix() { + Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("", KafkaStreamsExecutionContext.getPrefix()); + assertEquals("appId", KafkaStreamsExecutionContext.getProperties() + .get(StreamsConfig.APPLICATION_ID_CONFIG)); + } + + @Test + void shouldNotAddPrefixToAppIdIfNotAppId() { + Properties properties = new Properties(); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix()); + assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG)); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 2103a0a9..a80bd38f 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -34,7 +34,7 @@ class DedupWithPredicateProcessorTest { private TimestampedKeyValueStore store; @BeforeEach - public void setUp() { + void setUp() { // Create an instance of DedupWithPredicateProcessor for testing processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java similarity index 68% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java index cf0b669c..52a6af0d 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java @@ -1,13 +1,12 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; -import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler; -import com.michelin.kstreamplify.error.DlqExceptionHandler; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import java.nio.charset.StandardCharsets; @@ -18,6 +17,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; @@ -96,4 +96,45 @@ void shouldConfigure() { assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer); } + + @Test + void shouldEnrichWithException() { + KafkaError.Builder kafkaError = KafkaError.newBuilder() + .setTopic("topic") + .setStack("stack") + .setPartition(0) + .setOffset(0) + .setCause("cause") + .setValue("value"); + + handler = new DlqDeserializationExceptionHandler(); + KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError, + new RuntimeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8), + "value".getBytes(StandardCharsets.UTF_8)); + + KafkaError error = enrichedBuilder.build(); + assertEquals("Unknown cause", error.getCause()); + assertNull(error.getContextMessage()); + } + + @Test + void shouldEnrichWithRecordTooLargeException() { + KafkaError.Builder kafkaError = KafkaError.newBuilder() + .setTopic("topic") + .setStack("stack") + .setPartition(0) + .setOffset(0) + .setCause("cause") + .setValue("value"); + + handler = new DlqDeserializationExceptionHandler(); + KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError, + new RecordTooLargeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8), + "value".getBytes(StandardCharsets.UTF_8)); + + KafkaError error = enrichedBuilder.build(); + assertEquals("Unknown cause", error.getCause()); + assertEquals("The record is too large to be set as value (5 bytes). " + + "The key will be used instead", error.getValue()); + } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java similarity index 94% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java index 13dbcd51..47309059 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java similarity index 99% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java index f526f509..25e3d215 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java similarity index 97% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java index 56cb6010..918116cc 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java similarity index 98% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java index 8165388f..4fb66504 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java similarity index 98% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java index 5e470924..719e2909 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.rest; +package com.michelin.kstreamplify.error; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java index ca82f484..b1aee7c4 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java @@ -5,21 +5,12 @@ import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; -import com.michelin.kstreamplify.model.RestServiceResponse; import com.michelin.kstreamplify.properties.PropertiesUtils; -import com.michelin.kstreamplify.services.ProbeService; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.util.Properties; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java new file mode 100644 index 00000000..7e61a16d --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java @@ -0,0 +1,28 @@ +package com.michelin.kstreamplify.properties; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; +import org.junit.jupiter.api.Test; + +class PropertiesUtilsTest { + + @Test + void shouldLoadProperties() { + Properties properties = PropertiesUtils.loadProperties(); + + assertTrue(properties.containsKey("server.port")); + assertTrue(properties.containsValue(8080)); + + assertTrue(properties.containsKey("kafka.properties.application.id")); + assertTrue(properties.containsValue("appId")); + } + + @Test + void shouldLoadKafkaProperties() { + Properties properties = PropertiesUtils.loadKafkaProperties(PropertiesUtils.loadProperties()); + + assertTrue(properties.containsKey("application.id")); + assertTrue(properties.containsValue("appId")); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java index 64d68a0c..7c908973 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java @@ -11,12 +11,15 @@ import java.util.Properties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.CompressionType; import org.rocksdb.Options; +@ExtendWith(MockitoExtension.class) class RocksDbConfigTest { @Mock @@ -24,7 +27,6 @@ class RocksDbConfigTest { @BeforeEach void setUp() { - MockitoAnnotations.openMocks(this); when(options.tableFormatConfig()).thenReturn(new BlockBasedTableConfig()); } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java new file mode 100644 index 00000000..78ab4bc5 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java @@ -0,0 +1,17 @@ +package com.michelin.kstreamplify.rest; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer; +import org.junit.jupiter.api.Test; + +class DefaultProbeControllerTest { + @Test + void shouldCreateServerWithDefaultHostAndPort() { + DefaultProbeController controller = new DefaultProbeController(new KafkaStreamsInitializer()); + + assertNotNull(controller.server.getAddress().getAddress().getHostName()); + assertNotEquals(0, controller.server.getAddress().getPort()); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java new file mode 100644 index 00000000..e068e03c --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java @@ -0,0 +1,101 @@ +package com.michelin.kstreamplify.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.junit.jupiter.api.Test; + +class TopicWithSerdesTest { + + @Test + void shouldCreateTopicWithSerde() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); + assertEquals("INPUT_TOPIC", topicWithSerde.toString()); + } + + @Test + void shouldCreateTopicWithSerdeWithPrefix() { + Properties properties = new Properties(); + properties.put("prefix.self", "abc."); + + KafkaStreamsExecutionContext.registerProperties(properties); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); + assertEquals("abc.INPUT_TOPIC", topicWithSerde.toString()); + } + + @Test + void shouldCreateStream() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.stream(streamsBuilder); + + assertEquals(""" + Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> none + + """, streamsBuilder.build().describe().toString()); + } + + @Test + void shouldCreateTable() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.table(streamsBuilder, "myStore"); + + assertEquals(""" + Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> KTABLE-SOURCE-0000000001 + Processor: KTABLE-SOURCE-0000000001 (stores: [myStore]) + --> none + <-- KSTREAM-SOURCE-0000000000 + + """, streamsBuilder.build().describe().toString()); + } + + @Test + void shouldCreateGlobalKtable() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + + TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", + Serdes.String(), Serdes.String()); + + StreamsBuilder streamsBuilder = new StreamsBuilder(); + topicWithSerde.globalTable(streamsBuilder, "myStore"); + + assertEquals(""" + Topologies: + Sub-topology: 0 for global store (will not generate tasks) + Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC]) + --> KTABLE-SOURCE-0000000001 + Processor: KTABLE-SOURCE-0000000001 (stores: [myStore]) + --> none + <-- KSTREAM-SOURCE-0000000000 + """, streamsBuilder.build().describe().toString()); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java deleted file mode 100644 index cc4a4229..00000000 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.michelin.kstreamplify.utils; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import org.apache.kafka.common.serialization.Serdes; -import org.junit.jupiter.api.Test; - -class TopicWithSerdesUnitTest { - - @Test - void shouldCreateTopicWithSerde() { - TopicWithSerde topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC", - Serdes.String(), Serdes.String()); - - assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName()); - } -} diff --git a/kstreamplify-core/src/test/resources/application.yml b/kstreamplify-core/src/test/resources/application.yml new file mode 100644 index 00000000..434274e2 --- /dev/null +++ b/kstreamplify-core/src/test/resources/application.yml @@ -0,0 +1,5 @@ +server: + port: 8080 +kafka: + properties: + application.id: appId