diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
index ce7464a3..3af13ac7 100644
--- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
+++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
@@ -7,6 +7,7 @@
 import com.michelin.kstreamplify.error.TopologyErrorHandler;
 import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
 import com.michelin.kstreamplify.utils.SerdesUtils;
+import com.michelin.kstreamplify.utils.TopicWithSerde;
 import java.util.List;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() {
         assertEquals(1, resultDlq.size());
         assertEquals(0, resultOutput.size());
     }
+
+    @Test
+    void shouldCreateInputAndOutputTopicsWithSerde() {
+        TestInputTopic<String, String> inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC",
+            "APP_NAME", Serdes.String(), Serdes.String()));
+
+        assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, "
+            + "valueSerializer=StringSerializer]", inputTopic.toString());
+
+        TestOutputTopic<String, String> outputTopic = createOutputTestTopic(new TopicWithSerde<>("OUTPUT_TOPIC",
+            "APP_NAME", Serdes.String(), Serdes.String()));
+
+        assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, "
+            + "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString());
+    }
 }
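Aside (illustrative, not part of the patch): the new helpers take a TopicWithSerde directly, so a test extending the core-test base class can build typed test topics without repeating serde setup. A minimal sketch — the pipeInput/readValuesToList calls and the surrounding test class are assumptions, not code from this diff:

```java
// Sketch only: assumes a test class extending the kstreamplify core-test base class,
// which exposes createInputTestTopic()/createOutputTestTopic() as used above.
TestInputTopic<String, String> input = createInputTestTopic(
    new TopicWithSerde<>("INPUT_TOPIC", "APP_NAME", Serdes.String(), Serdes.String()));
TestOutputTopic<String, String> output = createOutputTestTopic(
    new TopicWithSerde<>("OUTPUT_TOPIC", "APP_NAME", Serdes.String(), Serdes.String()));

input.pipeInput("key", "value");                  // drive the topology under test
List<String> results = output.readValuesToList(); // then assert on what it produced
```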
diff --git a/kstreamplify-core/pom.xml b/kstreamplify-core/pom.xml
index da09e5ea..a665a0e6 100644
--- a/kstreamplify-core/pom.xml
+++ b/kstreamplify-core/pom.xml
@@ -11,6 +11,20 @@
     <artifactId>kstreamplify-core</artifactId>
 
+    <dependencies>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+    </dependencies>
+
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
index b9a5268c..6371ea36 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
@@ -6,7 +6,9 @@
 import java.util.Map;
 import java.util.Properties;
+import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -16,6 +18,7 @@
  * The class to represent the context of the KStream.
  */
 @Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class KafkaStreamsExecutionContext {
 
     /**
@@ -36,6 +39,7 @@ public class KafkaStreamsExecutionContext {
      * The properties of the stream execution context.
      */
     @Getter
+    @Setter
     private static Properties properties;
 
     /**
@@ -51,9 +55,6 @@ public class KafkaStreamsExecutionContext {
     @Getter
     private static String prefix;
 
-    private KafkaStreamsExecutionContext() {
-    }
-
     /**
      * Register KStream properties.
      *
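Aside (illustrative, not part of the patch): the generated setter mainly serves the tests, which reset the static properties between cases; registerProperties itself still applies the topic prefix to application.id. A short sketch of that behavior, mirroring KafkaStreamsExecutionContextTest added further down in this diff:

```java
// Mirrors KafkaStreamsExecutionContextTest below; "prefix.self" is the self-prefix key.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(props);

// The prefix is memorized and prepended to the application.id.
assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertEquals("abc.appId",
    KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
```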
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
index a02eda5e..b3fda449 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
@@ -24,6 +24,8 @@ public class DlqDeserializationExceptionHandler extends DlqExceptionHandler
 
     /**
      * Constructor.
+     *
+     * @param producer A Kafka producer.
      */
     public DlqDeserializationExceptionHandler(Producer<byte[], KafkaError> producer) {
         DlqExceptionHandler.producer = producer;
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
index 3181cfd5..a75e6807 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
@@ -22,6 +22,8 @@ public class DlqProductionExceptionHandler extends DlqExceptionHandler
 
     /**
      * Constructor.
+     *
+     * @param producer A Kafka producer
      */
     public DlqProductionExceptionHandler(Producer<byte[], KafkaError> producer) {
         DlqExceptionHandler.producer = producer;
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
index a7c311c0..be4f238b 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializer.java
@@ -48,7 +48,7 @@ public class KafkaStreamsInitializer {
     /**
      * The application properties.
      */
-    protected Properties properties;
+    protected Properties properties = new Properties();
 
     /**
      * The DLQ topic.
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
index 9a853a89..2adce2d3 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
@@ -57,7 +57,6 @@ public class TopicWithSerde<K, V> {
     @Getter
     private final Serde<V> valueSerde;
 
-
     /**
     * <p>Additional constructor which uses default parameter "self" for prefixPropertyKey.</p>
     *
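Aside (illustrative, not part of the patch): both DLQ handlers plug into the stock Kafka Streams error-handling hooks. A hedged sketch of the explicit wiring — kstreamplify may also register these for you, so this only shows the standard StreamsConfig keys involved:

```java
// Standard Kafka Streams configuration keys; the handler classes are the ones patched above.
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DlqDeserializationExceptionHandler.class.getName());
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DlqProductionExceptionHandler.class.getName());
```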
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java
new file mode 100644
index 00000000..783ed928
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContextTest.java
@@ -0,0 +1,59 @@
+package com.michelin.kstreamplify.context;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Properties;
+import org.apache.kafka.streams.StreamsConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class KafkaStreamsExecutionContextTest {
+
+    @BeforeEach
+    void setUp() {
+        KafkaStreamsExecutionContext.setProperties(null);
+    }
+
+    @Test
+    void shouldNotRegisterPropertiesWhenNull() {
+        KafkaStreamsExecutionContext.registerProperties(null);
+        assertNull(KafkaStreamsExecutionContext.getProperties());
+    }
+
+    @Test
+    void shouldAddPrefixToAppId() {
+        Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        properties.put("prefix.self", "abc.");
+
+        KafkaStreamsExecutionContext.registerProperties(properties);
+
+        assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
+        assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties()
+            .get(StreamsConfig.APPLICATION_ID_CONFIG));
+    }
+
+    @Test
+    void shouldNotAddPrefixToAppIdIfNoPrefix() {
+        Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+
+        KafkaStreamsExecutionContext.registerProperties(properties);
+
+        assertEquals("", KafkaStreamsExecutionContext.getPrefix());
+        assertEquals("appId", KafkaStreamsExecutionContext.getProperties()
+            .get(StreamsConfig.APPLICATION_ID_CONFIG));
+    }
+
+    @Test
+    void shouldNotAddPrefixToAppIdIfNotAppId() {
+        Properties properties = new Properties();
+        properties.put("prefix.self", "abc.");
+
+        KafkaStreamsExecutionContext.registerProperties(properties);
+
+        assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
+        assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
index 2103a0a9..a80bd38f 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
@@ -34,7 +34,7 @@ class DedupWithPredicateProcessorTest {
     private TimestampedKeyValueStore<String, KafkaError> store;
 
     @BeforeEach
-    public void setUp() {
+    void setUp() {
         // Create an instance of DedupWithPredicateProcessor for testing
         processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1),
             TestKeyExtractor::extract);
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java
new file mode 100644
index 00000000..52a6af0d
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java
@@ -0,0 +1,140 @@
+package com.michelin.kstreamplify.error;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DlqDeserializationExceptionHandlerTest {
+    @Mock
+    private ConsumerRecord<byte[], byte[]> record;
+
+    @Mock
+    private ProcessorContext processorContext;
+
+    private Producer<byte[], KafkaError> producer;
+
+    private DlqDeserializationExceptionHandler handler;
+
+    @BeforeEach
+    void setUp() {
+        Serializer<KafkaError> serializer = (Serializer) new KafkaAvroSerializer();
+        serializer.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"), false);
+        producer = new MockProducer<>(true, new ByteArraySerializer(), serializer);
+
+        KafkaStreamsExecutionContext.setDlqTopicName(null);
+    }
+
+    @Test
+    void shouldReturnFailIfNoDlq() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new RuntimeException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnFailOnExceptionDuringHandle() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new KafkaException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnKafkaException() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        when(record.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8));
+        when(record.topic()).thenReturn("topic");
+
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new KafkaException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldConfigure() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        handler = new DlqDeserializationExceptionHandler();
+        handler.configure(configs);
+
+        assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer);
+    }
+
+    @Test
+    void shouldEnrichWithException() {
+        KafkaError.Builder kafkaError = KafkaError.newBuilder()
+            .setTopic("topic")
+            .setStack("stack")
+            .setPartition(0)
+            .setOffset(0)
+            .setCause("cause")
+            .setValue("value");
+
+        handler = new DlqDeserializationExceptionHandler();
+        KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
+            new RuntimeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8),
+            "value".getBytes(StandardCharsets.UTF_8));
+
+        KafkaError error = enrichedBuilder.build();
+        assertEquals("Unknown cause", error.getCause());
+        assertNull(error.getContextMessage());
+    }
+
+    @Test
+    void shouldEnrichWithRecordTooLargeException() {
+        KafkaError.Builder kafkaError = KafkaError.newBuilder()
+            .setTopic("topic")
+            .setStack("stack")
+            .setPartition(0)
+            .setOffset(0)
+            .setCause("cause")
+            .setValue("value");
+
+        handler = new DlqDeserializationExceptionHandler();
+        KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
+            new RecordTooLargeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8),
+            "value".getBytes(StandardCharsets.UTF_8));
+
+        KafkaError error = enrichedBuilder.build();
+        assertEquals("Unknown cause", error.getCause());
+        assertEquals("The record is too large to be set as value (5 bytes). "
+            + "The key will be used instead", error.getValue());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java
new file mode 100644
index 00000000..47309059
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqExceptionHandlerTest.java
@@ -0,0 +1,23 @@
+package com.michelin.kstreamplify.error;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class DlqExceptionHandlerTest {
+
+    @Test
+    void shouldInstantiateProducer() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        DlqExceptionHandler.instantiateProducer("test-client", configs);
+
+        assertNotNull(DlqExceptionHandler.getProducer());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java
new file mode 100644
index 00000000..25e3d215
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java
@@ -0,0 +1,108 @@
+package com.michelin.kstreamplify.error;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
+import com.michelin.kstreamplify.error.DlqProductionExceptionHandler;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DlqProductionExceptionHandlerTest {
+    @Mock
+    private ProducerRecord<byte[], byte[]> record;
+
+    private Producer<byte[], KafkaError> producer;
+
+    private DlqProductionExceptionHandler handler;
+
+    @BeforeEach
+    void setUp() {
+        Serializer<KafkaError> serializer = (Serializer) new KafkaAvroSerializer();
+        serializer.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"), false);
+        producer = new MockProducer<>(true, new ByteArraySerializer(), serializer);
+
+        KafkaStreamsExecutionContext.setDlqTopicName(null);
+    }
+
+    @Test
+    void shouldReturnFailIfNoDlq() {
+        handler = new DlqProductionExceptionHandler(producer);
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new RuntimeException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnExceptionDuringHandle() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new KafkaException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnKafkaException() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        when(record.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8));
+        when(record.topic()).thenReturn("topic");
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new KafkaException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldReturnFailOnRetriableException() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new RetriableCommitFailedException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldConfigure() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        handler = new DlqProductionExceptionHandler();
+        handler.configure(configs);
+
+        assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer);
+    }
+}
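Aside (illustrative, not part of the patch): because both handler tests inject a MockProducer, a follow-up assertion could also verify that the KafkaError actually reached the mock DLQ — history() is part of MockProducer's public API. Sketch, reusing the serializer from the setUp above:

```java
// After a handle(...) call that returned CONTINUE, the DLQ record should sit in the mock.
MockProducer<byte[], KafkaError> mockProducer =
    new MockProducer<>(true, new ByteArraySerializer(), serializer);
DlqDeserializationExceptionHandler handler = new DlqDeserializationExceptionHandler(mockProducer);

// ... trigger handler.handle(processorContext, record, exception) as in the tests above ...

assertEquals(1, mockProducer.history().size());            // one KafkaError forwarded
assertEquals("DlqTopic", mockProducer.history().get(0).topic());
```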
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java
new file mode 100644
index 00000000..918116cc
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/GenericErrorProcessorTest.java
@@ -0,0 +1,53 @@
+package com.michelin.kstreamplify.error;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.GenericErrorProcessor;
+import com.michelin.kstreamplify.error.ProcessingError;
+import java.util.Optional;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class GenericErrorProcessorTest {
+    private final GenericErrorProcessor<String> errorProcessor = new GenericErrorProcessor<>();
+
+    @Mock
+    private FixedKeyProcessorContext<String, KafkaError> mockContext;
+
+    @Mock
+    private FixedKeyRecord<String, ProcessingError<String>> mockRecord;
+
+    @Mock
+    private RecordMetadata mockRecordMetadata;
+
+    @Test
+    void shouldProcessError() {
+        when(mockRecord.value())
+            .thenReturn(new ProcessingError<>(new RuntimeException("Exception..."), "Context message", "Record"));
+
+        // Given a mock RecordMetadata
+        when(mockRecordMetadata.offset()).thenReturn(10L);
+        when(mockRecordMetadata.partition()).thenReturn(0);
+        when(mockRecordMetadata.topic()).thenReturn("test-topic");
+
+        // Given that the context has a recordMetadata
+        when(mockContext.recordMetadata()).thenReturn(Optional.of(mockRecordMetadata));
+
+        // When processing the record
+        errorProcessor.init(mockContext);
+        errorProcessor.process(mockRecord);
+
+        verify(mockContext).forward(any());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java
new file mode 100644
index 00000000..4fb66504
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingErrorTest.java
@@ -0,0 +1,64 @@
+package com.michelin.kstreamplify.error;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingError;
+import org.junit.jupiter.api.Test;
+
+class ProcessingErrorTest {
+
+    @Test
+    void shouldCreateProcessingErrorFromStringRecord() {
+        String contextMessage = "Some context message";
+        Exception exception = new Exception("Test Exception");
+        String kafkaRecord = "Sample Kafka Record";
+
+        ProcessingError<String> processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord);
+
+        // Assert
+        assertEquals(exception, processingError.getException());
+        assertEquals(contextMessage, processingError.getContextMessage());
+        assertEquals(kafkaRecord, processingError.getKafkaRecord());
+    }
+
+    @Test
+    void shouldCreateProcessingErrorWithNoContextMessage() {
+        Exception exception = new Exception("Test Exception");
+        String kafkaRecord = "Sample Kafka Record";
+
+        ProcessingError<String> processingError = new ProcessingError<>(exception, kafkaRecord);
+
+        // Assert
+        assertEquals(exception, processingError.getException());
+        assertEquals("No context message", processingError.getContextMessage());
+        assertEquals(kafkaRecord, processingError.getKafkaRecord());
+    }
+
+    @Test
+    void shouldCreateProcessingErrorFromAvroRecord() {
+        String contextMessage = "Some context message";
+        Exception exception = new Exception("Test Exception");
+        KafkaError kafkaRecord = KafkaError.newBuilder()
+            .setCause("Cause")
+            .setOffset(1L)
+            .setPartition(1)
+            .setTopic("Topic")
+            .setValue("Value")
+            .build();
+
+        ProcessingError<KafkaError> processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord);
+
+        assertEquals(exception, processingError.getException());
+        assertEquals(contextMessage, processingError.getContextMessage());
+        assertEquals("""
+            {
+              "partition": 1,
+              "offset": 1,
+              "cause": "Cause",
+              "topic": "Topic",
+              "value": "Value"
+            }""", processingError.getKafkaRecord());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java
new file mode 100644
index 00000000..719e2909
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/ProcessingResultTest.java
@@ -0,0 +1,90 @@
+package com.michelin.kstreamplify.error;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.michelin.kstreamplify.error.ProcessingResult;
+import org.apache.kafka.streams.processor.api.Record;
+import org.junit.jupiter.api.Test;
+
+class ProcessingResultTest {
+
+    @Test
+    void shouldCreateProcessingResultSuccess() {
+        String successValue = "Success";
+        ProcessingResult<String, String> result = ProcessingResult.success(successValue);
+
+        assertTrue(result.isValid());
+        assertEquals(successValue, result.getValue());
+        assertNull(result.getError());
+    }
+
+    @Test
+    void shouldCreateWrappedProcessingResult() {
+        String value = "Value";
+        long timestamp = System.currentTimeMillis();
+
+        Record<String, String> record = new Record<>("key", value, timestamp);
+        Record<String, ProcessingResult<String, String>> wrappedRecord = ProcessingResult.wrapRecordSuccess(record);
+
+        assertEquals(record.key(), wrappedRecord.key());
+        assertNotNull(wrappedRecord.value());
+        assertTrue(wrappedRecord.value().isValid());
+        assertEquals(value, wrappedRecord.value().getValue());
+        assertNull(wrappedRecord.value().getError());
+        assertEquals(record.timestamp(), wrappedRecord.timestamp());
+    }
+
+    @Test
+    void shouldCreateFailedProcessingResult() {
+        String failedRecordValue = "Failed Value";
+        Exception exception = new Exception("Test Exception");
+
+        ProcessingResult<String, String> result = ProcessingResult.fail(exception, failedRecordValue);
+
+        assertFalse(result.isValid());
+        assertNull(result.getValue());
+        assertNotNull(result.getError());
+        assertEquals(exception, result.getError().getException());
+        assertEquals(failedRecordValue, result.getError().getKafkaRecord());
+        assertEquals("No context message", result.getError().getContextMessage());
+    }
+
+    @Test
+    void shouldCreateWrappedFailedProcessingResult() {
+        String key = "key";
+        String failedValue = "value";
+        long timestamp = System.currentTimeMillis();
+        Exception exception = new Exception("Test Exception");
+
+        Record<String, String> record = new Record<>(key, failedValue, timestamp);
+
+        Record<String, ProcessingResult<String, String>> wrappedRecord =
+            ProcessingResult.wrapRecordFailure(exception, record);
+
+        assertEquals(record.key(), wrappedRecord.key());
+        assertNotNull(wrappedRecord.value());
+        assertFalse(wrappedRecord.value().isValid());
+        assertNull(wrappedRecord.value().getValue());
+        assertNotNull(wrappedRecord.value().getError());
+        assertEquals(exception, wrappedRecord.value().getError().getException());
+        assertEquals(failedValue, wrappedRecord.value().getError().getKafkaRecord());
+        assertEquals("No context message", wrappedRecord.value().getError().getContextMessage());
+        assertEquals(record.timestamp(), wrappedRecord.timestamp());
+    }
+
+    @Test
+    void shouldProcessingResultBeValid() {
+        ProcessingResult<String, String> validResult = ProcessingResult.success("Value");
+        ProcessingResult<String, Integer> invalidResult1 = ProcessingResult.fail(new Exception(), 42);
+        ProcessingResult<String, String> invalidResult2 = new ProcessingResult<>(null);
+
+        assertTrue(validResult.isValid());
+        assertFalse(invalidResult1.isValid());
+        assertFalse(invalidResult2.isValid());
+    }
+}
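Aside (illustrative, not part of the patch): the wrap methods tested above are the building blocks of kstreamplify's error routing — a processor wraps each per-record outcome so that TopologyErrorHandler can later split valid results from error branches. A minimal sketch of that pattern:

```java
// Sketch of the pattern under test; the surrounding processor class is assumed.
Record<String, String> in = new Record<>("key", "value", System.currentTimeMillis());

Record<String, ProcessingResult<String, String>> out;
try {
    out = ProcessingResult.wrapRecordSuccess(in);    // valid branch
} catch (Exception e) {
    out = ProcessingResult.wrapRecordFailure(e, in); // error branch, later routed to the DLQ
}
```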
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
index ca82f484..b1aee7c4 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
@@ -5,21 +5,12 @@
 import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
-import com.michelin.kstreamplify.model.RestServiceResponse;
 import com.michelin.kstreamplify.properties.PropertiesUtils;
-import com.michelin.kstreamplify.services.ProbeService;
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import java.util.Properties;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integrations/KafkaStreamsInitializerIntegrationTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integrations/KafkaStreamsInitializerIntegrationTest.java
new file mode 100644
index 00000000..b4cabb1e
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/integrations/KafkaStreamsInitializerIntegrationTest.java
@@ -0,0 +1,162 @@
+package com.michelin.kstreamplify.integrations;
+
+import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
+import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+@Testcontainers
+class KafkaStreamsInitializerIntegrationTest {
+    private final KafkaStreamsInitializer initializer = new KafkaStreamInitializerImpl();
+
+    private final HttpClient httpClient = HttpClient.newBuilder().build();
+
+    @Container
+    static KafkaContainer kafka = new KafkaContainer(DockerImageName
+        .parse("confluentinc/cp-kafka:7.4.0"))
+        .withKraft();
+
+    @BeforeAll
+    static void setUp() {
+        createTopics("inputTopic", "outputTopic");
+    }
+
+    @Test
+    void shouldInitAndRun() throws InterruptedException, IOException {
+        initializer.init(new KafkaStreamsStarterImpl());
+
+        waitingForKafkaStreamsToRun();
+
+        assertEquals(KafkaStreams.State.RUNNING, initializer.getKafkaStreams().state());
+
+        List<StreamsMetadata> streamsMetadata =
+            new ArrayList<>(initializer.getKafkaStreams().metadataForAllStreamsClients());
+
+        // Assert Kafka Streams initialization
+        assertEquals("localhost", streamsMetadata.get(0).hostInfo().host());
+        assertEquals(8080, streamsMetadata.get(0).hostInfo().port());
+        assertTrue(streamsMetadata.get(0).stateStoreNames().isEmpty());
+
+        List<TopicPartition> topicPartitions = streamsMetadata.get(0).topicPartitions().stream().toList();
+
+        assertEquals("inputTopic", topicPartitions.get(0).topic());
+        assertEquals(0, topicPartitions.get(0).partition());
+
+        assertEquals("dlqTopic", KafkaStreamsExecutionContext.getDlqTopicName());
+        assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
+            KafkaStreamsExecutionContext.getSerdesConfig().get("default.key.serde"));
+        assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
+            KafkaStreamsExecutionContext.getSerdesConfig().get("default.value.serde"));
+
+        assertEquals("localhost:8080",
+            KafkaStreamsExecutionContext.getProperties().get("application.server"));
+
+        // Assert HTTP probes
+        HttpRequest requestReady = HttpRequest.newBuilder()
+            .uri(URI.create("http://localhost:8080/ready"))
+            .GET()
+            .build();
+
+        HttpResponse<Void> responseReady = httpClient.send(requestReady, HttpResponse.BodyHandlers.discarding());
+
+        assertEquals(200, responseReady.statusCode());
+
+        HttpRequest requestLiveness = HttpRequest.newBuilder()
+            .uri(URI.create("http://localhost:8080/liveness"))
+            .GET()
+            .build();
+
+        HttpResponse<Void> responseLiveness = httpClient.send(requestLiveness, HttpResponse.BodyHandlers.discarding());
+
+        assertEquals(200, responseLiveness.statusCode());
+
+        HttpRequest requestTopology = HttpRequest.newBuilder()
+            .uri(URI.create("http://localhost:8080/topology"))
+            .GET()
+            .build();
+
+        HttpResponse<String> responseTopology = httpClient.send(requestTopology, HttpResponse.BodyHandlers.ofString());
+
+        assertEquals(200, responseTopology.statusCode());
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0
+                Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
+                  --> KSTREAM-SINK-0000000001
+                Sink: KSTREAM-SINK-0000000001 (topic: outputTopic)
+                  <-- KSTREAM-SOURCE-0000000000
+
+            """, responseTopology.body());
+    }
+
+    private void waitingForKafkaStreamsToRun() throws InterruptedException {
+        while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
+            log.info("Waiting for Kafka Streams to start...");
+            Thread.sleep(2000);
+        }
+    }
+
+    private static void createTopics(String... topics) {
+        var newTopics = Arrays.stream(topics)
+            .map(topic -> new NewTopic(topic, 1, (short) 1))
+            .toList();
+        try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
+            admin.createTopics(newTopics);
+        }
+    }
+
+    static class KafkaStreamInitializerImpl extends KafkaStreamsInitializer {
+        @Override
+        protected void initProperties() {
+            super.initProperties();
+            KafkaStreamsExecutionContext.getProperties()
+                .setProperty(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+        }
+    }
+
+    @Slf4j
+    static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
+        @Override
+        public void topology(StreamsBuilder streamsBuilder) {
+            streamsBuilder
+                .stream("inputTopic")
+                .to("outputTopic");
+        }
+
+        @Override
+        public String dlqTopic() {
+            return "dlqTopic";
+        }
+
+        @Override
+        public void onStart(KafkaStreams kafkaStreams) {
+            log.info("Starting Kafka Streams from integration tests!");
+        }
+    }
+}
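Aside (illustrative, not part of the patch): waitingForKafkaStreamsToRun() loops forever if the stream never reaches RUNNING, which can hang CI. A bounded variant, shown only as a sketch — the 60-second budget is an assumption:

```java
// Fails the test instead of blocking the build indefinitely.
private void waitForRunningState() throws InterruptedException {
    long deadline = System.currentTimeMillis() + 60_000;
    while (initializer.getKafkaStreams().state() != KafkaStreams.State.RUNNING) {
        if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("Kafka Streams did not reach RUNNING within 60 s");
        }
        log.info("Waiting for Kafka Streams to start...");
        Thread.sleep(500);
    }
}
```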
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java
new file mode 100644
index 00000000..7e61a16d
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/PropertiesUtilsTest.java
@@ -0,0 +1,28 @@
+package com.michelin.kstreamplify.properties;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Properties;
+import org.junit.jupiter.api.Test;
+
+class PropertiesUtilsTest {
+
+    @Test
+    void shouldLoadProperties() {
+        Properties properties = PropertiesUtils.loadProperties();
+
+        assertTrue(properties.containsKey("server.port"));
+        assertTrue(properties.containsValue(8080));
+
+        assertTrue(properties.containsKey("kafka.properties.application.id"));
+        assertTrue(properties.containsValue("appId"));
+    }
+
+    @Test
+    void shouldLoadKafkaProperties() {
+        Properties properties = PropertiesUtils.loadKafkaProperties(PropertiesUtils.loadProperties());
+
+        assertTrue(properties.containsKey("application.id"));
+        assertTrue(properties.containsValue("appId"));
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
index 64d68a0c..7c908973 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
@@ -11,12 +11,15 @@
 import java.util.Properties;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompressionType;
 import org.rocksdb.Options;
 
+@ExtendWith(MockitoExtension.class)
 class RocksDbConfigTest {
 
     @Mock
@@ -24,7 +27,6 @@ class RocksDbConfigTest {
 
     @BeforeEach
     void setUp() {
-        MockitoAnnotations.openMocks(this);
         when(options.tableFormatConfig()).thenReturn(new BlockBasedTableConfig());
     }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java
new file mode 100644
index 00000000..78ab4bc5
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DefaultProbeControllerTest.java
@@ -0,0 +1,17 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
+import org.junit.jupiter.api.Test;
+
+class DefaultProbeControllerTest {
+    @Test
+    void shouldCreateServerWithDefaultHostAndPort() {
+        DefaultProbeController controller = new DefaultProbeController(new KafkaStreamsInitializer());
+
+        assertNotNull(controller.server.getAddress().getAddress().getHostName());
+        assertNotEquals(0, controller.server.getAddress().getPort());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java
new file mode 100644
index 00000000..e068e03c
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java
@@ -0,0 +1,101 @@
+package com.michelin.kstreamplify.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
+import org.junit.jupiter.api.Test;
+
+class TopicWithSerdesTest {
+
+    @Test
+    void shouldCreateTopicWithSerde() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName());
+        assertEquals("INPUT_TOPIC", topicWithSerde.toString());
+    }
+
+    @Test
+    void shouldCreateTopicWithSerdeWithPrefix() {
+        Properties properties = new Properties();
+        properties.put("prefix.self", "abc.");
+
+        KafkaStreamsExecutionContext.registerProperties(properties);
+
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName());
+        assertEquals("abc.INPUT_TOPIC", topicWithSerde.toString());
+    }
+
+    @Test
+    void shouldCreateStream() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        topicWithSerde.stream(streamsBuilder);
+
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0
+                Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
+                  --> none
+
+            """, streamsBuilder.build().describe().toString());
+    }
+
+    @Test
+    void shouldCreateTable() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        topicWithSerde.table(streamsBuilder, "myStore");
+
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0
+                Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
+                  --> KTABLE-SOURCE-0000000001
+                Processor: KTABLE-SOURCE-0000000001 (stores: [myStore])
+                  --> none
+                  <-- KSTREAM-SOURCE-0000000000
+
+            """, streamsBuilder.build().describe().toString());
+    }
+
+    @Test
+    void shouldCreateGlobalKtable() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        topicWithSerde.globalTable(streamsBuilder, "myStore");
+
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0 for global store (will not generate tasks)
+                Source: KSTREAM-SOURCE-0000000000 (topics: [INPUT_TOPIC])
+                  --> KTABLE-SOURCE-0000000001
+                Processor: KTABLE-SOURCE-0000000001 (stores: [myStore])
+                  --> none
+                  <-- KSTREAM-SOURCE-0000000000
+            """, streamsBuilder.build().describe().toString());
+    }
+}
diff --git a/kstreamplify-core/src/test/resources/application.yml b/kstreamplify-core/src/test/resources/application.yml
new file mode 100644
index 00000000..508dbd3f
--- /dev/null
+++ b/kstreamplify-core/src/test/resources/application.yml
@@ -0,0 +1,7 @@
+server:
+  port: 8080
+kafka:
+  properties:
+    application.id: appId
+    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
+    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
\ No newline at end of file
diff --git a/kstreamplify-core/src/test/resources/logback-test.xml b/kstreamplify-core/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..3865b14e
--- /dev/null
+++ b/kstreamplify-core/src/test/resources/logback-test.xml
@@ -0,0 +1,11 @@
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="info">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/kstreamplify-spring-boot/pom.xml b/kstreamplify-spring-boot/pom.xml
index 1d3b1542..5cbb314e 100644
--- a/kstreamplify-spring-boot/pom.xml
+++ b/kstreamplify-spring-boot/pom.xml
@@ -24,6 +24,12 @@
             <version>${spring-boot.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-logging</artifactId>
+            <version>${spring-boot.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <version>${spring-boot.version}</version>
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-testcontainers</artifactId>
+            <version>${spring-boot.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.michelin</groupId>
             <artifactId>kstreamplify-core</artifactId>
diff --git a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java
index 04057298..79323374 100644
--- a/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java
+++ b/kstreamplify-spring-boot/src/main/java/com/michelin/kstreamplify/properties/KafkaProperties.java
@@ -19,7 +19,7 @@ public class KafkaProperties {
     /**
      * The Kafka properties.
      */
-    private final Map<String, String> properties = new HashMap<>();
+    private Map<String, String> properties = new HashMap<>();
 
     /**
      * Return the Kafka properties as {@link java.util.Properties}.
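Aside (illustrative, not part of the patch): dropping final is what lets Spring — and KafkaPropertiesTest below — inject the map through a setter; asProperties() then hands the flattened kafka.properties.* entries to the streams runtime. A short sketch of the round trip:

```java
// Mirrors KafkaPropertiesTest below: entries set on the bean come back as java.util.Properties.
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.setProperties(Map.of("application.id", "appId"));

Properties streamsProperties = kafkaProperties.asProperties();
assertEquals("appId", streamsProperties.get("application.id"));
```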
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java
new file mode 100644
index 00000000..135a9ef7
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/integrations/SpringKafkaStreamsInitializerIntegrationTest.java
@@ -0,0 +1,159 @@
+package com.michelin.kstreamplify.integrations;
+
+import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.DEFINED_PORT;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
+import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.http.ResponseEntity;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+@Testcontainers
+@SpringBootTest(webEnvironment = DEFINED_PORT)
+class SpringKafkaStreamsInitializerIntegrationTest {
+    @Autowired
+    private KafkaStreamsInitializer initializer;
+
+    @Autowired
+    private TestRestTemplate restTemplate;
+
+    @Container
+    static KafkaContainer kafka = new KafkaContainer(DockerImageName
+        .parse("confluentinc/cp-kafka:7.4.0"))
+        .withKraft();
+
+    @DynamicPropertySource
+    static void kafkaProperties(DynamicPropertyRegistry registry) {
+        registry.add("kafka.properties." + BOOTSTRAP_SERVERS_CONFIG,
+            kafka::getBootstrapServers);
+    }
+
+    @BeforeAll
+    static void setUp() {
+        createTopics("inputTopic", "outputTopic");
+    }
+
+    @Test
+    void shouldInitAndRun() throws InterruptedException, IOException {
+        waitingForKafkaStreamsToRun();
+
+        assertEquals(KafkaStreams.State.RUNNING, initializer.getKafkaStreams().state());
+
+        List<StreamsMetadata> streamsMetadata =
+            new ArrayList<>(initializer.getKafkaStreams().metadataForAllStreamsClients());
+
+        // Assert Kafka Streams initialization
+        assertEquals("localhost", streamsMetadata.get(0).hostInfo().host());
+        assertEquals(8081, streamsMetadata.get(0).hostInfo().port());
+        assertTrue(streamsMetadata.get(0).stateStoreNames().isEmpty());
+
+        List<TopicPartition> topicPartitions = streamsMetadata.get(0).topicPartitions().stream().toList();
+
+        assertEquals("inputTopic", topicPartitions.get(0).topic());
+        assertEquals(0, topicPartitions.get(0).partition());
+
+        assertEquals("dlqTopic", KafkaStreamsExecutionContext.getDlqTopicName());
+        assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
+            KafkaStreamsExecutionContext.getSerdesConfig().get("default.key.serde"));
+        assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
+            KafkaStreamsExecutionContext.getSerdesConfig().get("default.value.serde"));
+
+        assertEquals("localhost:8081",
+            KafkaStreamsExecutionContext.getProperties().get("application.server"));
+
+        // Assert HTTP probes
+
+        ResponseEntity<Void> responseReady = restTemplate
+            .getForEntity("http://localhost:8081/ready", Void.class);
+
+        assertEquals(200, responseReady.getStatusCode().value());
+
+        ResponseEntity<Void> responseLiveness = restTemplate
+            .getForEntity("http://localhost:8081/liveness", Void.class);
+
+        assertEquals(200, responseLiveness.getStatusCode().value());
+
+        ResponseEntity<String> responseTopology = restTemplate
+            .getForEntity("http://localhost:8081/topology", String.class);
+
+        assertEquals(200, responseTopology.getStatusCode().value());
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0
+                Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
+                  --> KSTREAM-SINK-0000000001
+                Sink: KSTREAM-SINK-0000000001 (topic: outputTopic)
+                  <-- KSTREAM-SOURCE-0000000000
+
+            """, responseTopology.getBody());
+    }
+
+    private void waitingForKafkaStreamsToRun() throws InterruptedException {
+        while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
+            log.info("Waiting for Kafka Streams to start...");
+            Thread.sleep(2000);
+        }
+    }
+
+    private static void createTopics(String... topics) {
+        var newTopics = Arrays.stream(topics)
+            .map(topic -> new NewTopic(topic, 1, (short) 1))
+            .toList();
+        try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
+            admin.createTopics(newTopics);
+        }
+    }
+
+    @Slf4j
+    @SpringBootApplication
+    static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
+        public static void main(String[] args) {
+            SpringApplication.run(KafkaStreamsStarterImpl.class, args);
+        }
+
+        @Override
+        public void topology(StreamsBuilder streamsBuilder) {
+            streamsBuilder
+                .stream("inputTopic")
+                .to("outputTopic");
+        }
+
+        @Override
+        public String dlqTopic() {
+            return "dlqTopic";
+        }
+
+        @Override
+        public void onStart(KafkaStreams kafkaStreams) {
+            log.info("Starting Kafka Streams from integration tests!");
+        }
+    }
+}
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java
new file mode 100644
index 00000000..c4c0e5b0
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/properties/KafkaPropertiesTest.java
@@ -0,0 +1,25 @@
+package com.michelin.kstreamplify.properties;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class KafkaPropertiesTest {
+
+    private final KafkaProperties kafkaProperties = new KafkaProperties();
+
+    @Test
+    void shouldLoadProperties() {
+        Map<String, String> props = Map.of(
+            "application.id", "appId"
+        );
+
+        kafkaProperties.setProperties(props);
+
+        assertTrue(kafkaProperties.getProperties().containsKey("application.id"));
+        assertTrue(kafkaProperties.getProperties().containsValue("appId"));
+        assertTrue(kafkaProperties.asProperties().containsKey("application.id"));
+        assertTrue(kafkaProperties.asProperties().containsValue("appId"));
+    }
+}
diff --git a/kstreamplify-spring-boot/src/test/resources/application.yml b/kstreamplify-spring-boot/src/test/resources/application.yml
new file mode 100644
index 00000000..3ffd8327
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/resources/application.yml
@@ -0,0 +1,7 @@
+server:
+  port: 8081
+kafka:
+  properties:
+    application.id: appId
+    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
+    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6de3f0de..afe64be0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
             <name>The Apache License, Version 2.0</name>
-            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         </license>
 
@@ -111,26 +111,29 @@
         <module>kstreamplify-core-test</module>
     </modules>
 
-    <properties>
-        <avro.version>1.11.2</avro.version>
-        <commons-io.version>2.13.0</commons-io.version>
-        <commons-lang3.version>3.13.0</commons-lang3.version>
-        <gson.version>2.10.1</gson.version>
-        <java.version>17</java.version>
-        <junit.version>5.10.0</junit.version>
-        <kafka.version>3.4.0</kafka.version>
-        <kafka-streams-avro-serde.version>7.5.0</kafka-streams-avro-serde.version>
-        <lombok.version>1.18.28</lombok.version>
-        <maven.compiler.source>17</maven.compiler.source>
-        <maven.compiler.target>17</maven.compiler.target>
-        <mockito.version>5.4.0</mockito.version>
-        <jacoco.version>0.8.10</jacoco.version>
-        <spring-boot.version>3.1.3</spring-boot.version>
-        <sonar.organization>michelin</sonar.organization>
-        <sonar.projectKey>michelin_kstreamplify</sonar.projectKey>
-        <sonar.projectName>${project.artifactId}</sonar.projectName>
-        <sonar.host.url>https://sonarcloud.io</sonar.host.url>
-    </properties>
+    <properties>
+        <avro.version>1.11.3</avro.version>
+        <commons-io.version>2.13.0</commons-io.version>
+        <commons-lang3.version>3.13.0</commons-lang3.version>
+        <gson.version>2.10.1</gson.version>
+        <java.version>17</java.version>
+        <junit.version>5.10.0</junit.version>
+        <kafka.version>3.4.0</kafka.version>
+        <kafka-streams-avro-serde.version>7.5.0</kafka-streams-avro-serde.version>
+        <logback.version>1.4.11</logback.version>
+        <lombok.version>1.18.30</lombok.version>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <mockito.version>5.5.0</mockito.version>
+        <jacoco.version>0.8.10</jacoco.version>
+        <slf4j.version>2.0.9</slf4j.version>
+        <spring-boot.version>3.1.4</spring-boot.version>
+        <testcontainers.version>1.19.0</testcontainers.version>
+        <sonar.organization>michelin</sonar.organization>
+        <sonar.projectKey>michelin_kstreamplify</sonar.projectKey>
+        <sonar.projectName>${project.artifactId}</sonar.projectName>
+        <sonar.host.url>https://sonarcloud.io</sonar.host.url>
+    </properties>
@@ -182,37 +185,70 @@
             <version>${commons-lang3.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>${mockito.version}</version>
-            <scope>test</scope>
-        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
 
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-junit-jupiter</artifactId>
-            <version>${mockito.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
 
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.3.0</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <phase>verify</phase>
-                        <goals>
-                            <goal>jar-no-fork</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>jar-no-fork</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>