diff --git a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
index c30c20b1..ce7464a3 100644
--- a/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
+++ b/kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
@@ -86,7 +86,6 @@ void setUp() {
 
     @Test
     void shouldContinueWhenProcessingValueIsValid() {
-
         stringInputTopic.pipeInput("key", "message");
 
         var resultDlq = dlqTopic.readValuesToList();
@@ -109,7 +108,6 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {
 
     @Test
     void shouldContinueWhenProcessingValueIsValidAvro() {
-
         KafkaError avroModel = KafkaError.newBuilder()
             .setTopic("topic")
             .setStack("stack")
@@ -130,7 +128,6 @@ void shouldContinueWhenProcessingValueIsValidAvro() {
 
     @Test
     void shouldContinueWhenProcessingValueIsInvalidAvro() {
-
         avroInputTopic.pipeInput("key", null);
 
         List<KafkaError> resultDlq = dlqTopic.readValuesToList();
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
index d6fc5bb2..2ae490df 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/converter/JsonToAvroConverter.java
@@ -12,6 +12,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -21,6 +23,7 @@
 /**
  * The class to convert Json to Avro.
  */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class JsonToAvroConverter {
     /**
      * Convert a file in json to avro.
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
index 75abc4e8..a02eda5e 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
@@ -3,9 +3,11 @@
 import com.michelin.kstreamplify.avro.KafkaError;
 import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
 import java.util.Map;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -15,10 +17,18 @@
  * The class managing deserialization exceptions.
  */
 @Slf4j
+@NoArgsConstructor
 public class DlqDeserializationExceptionHandler extends DlqExceptionHandler
     implements DeserializationExceptionHandler {
     private static final Object GUARD = new Object();
 
+    /**
+     * Constructor.
+     */
+    public DlqDeserializationExceptionHandler(Producer<byte[], KafkaError> producer) {
+        DlqExceptionHandler.producer = producer;
+    }
+
     /**
      * Manage deserialization exceptions.
     *
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java
index 84e723b8..15487f97 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqExceptionHandler.java
@@ -7,8 +7,10 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Properties;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -21,7 +23,8 @@ public abstract class DlqExceptionHandler {
     /**
      * The DLQ producer.
      */
-    protected static KafkaProducer<byte[], KafkaError> producer;
+    @Getter
+    protected static Producer<byte[], KafkaError> producer;
 
     /**
      * Create a producer.
      *
@@ -29,7 +32,7 @@
      * @param clientId The producer client id
      * @param configs  The producer configs
      */
-    protected static void instantiateProducer(String clientId, Map<String, ?> configs) {
+    public static void instantiateProducer(String clientId, Map<String, ?> configs) {
         Properties properties = new Properties();
         properties.putAll(configs);
         properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -49,9 +52,9 @@ protected static void instantiateProducer(String clientId, Map<String, ?> config
      * @param value     the record value
      * @return the error enriched by the exception
      */
-    protected KafkaError.Builder enrichWithException(KafkaError.Builder builder,
-                                                     Exception exception, byte[] key,
-                                                     byte[] value) {
+    public KafkaError.Builder enrichWithException(KafkaError.Builder builder,
+                                                  Exception exception, byte[] key,
+                                                  byte[] value) {
         StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw);
         exception.printStackTrace(pw);
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
index 7e376adc..3181cfd5 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
@@ -3,8 +3,10 @@
 import com.michelin.kstreamplify.avro.KafkaError;
 import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
 import java.util.Map;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -13,10 +15,18 @@
  * The class managing DLQ production exceptions.
  */
 @Slf4j
+@NoArgsConstructor
 public class DlqProductionExceptionHandler extends DlqExceptionHandler
     implements ProductionExceptionHandler {
     private static final Object GUARD = new Object();
 
+    /**
+     * Constructor.
+     */
+    public DlqProductionExceptionHandler(Producer<byte[], KafkaError> producer) {
+        DlqExceptionHandler.producer = producer;
+    }
+
     /**
      * Manage production exceptions.
     *
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java
index 858ccc63..0c9a5c5b 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/ProcessingResult.java
@@ -26,7 +26,7 @@ public class ProcessingResult<V, V2> {
      *
      * @param value The success value
      */
-    private ProcessingResult(V value) {
+    public ProcessingResult(V value) {
         this.value = value;
     }
 
@@ -35,7 +35,7 @@ private ProcessingResult(V value) {
      *
     * @param error the ProcessingError containing the
     */
-    private ProcessingResult(ProcessingError<V2> error) {
+    public ProcessingResult(ProcessingError<V2> error) {
        this.error = error;
    }
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
index 3c0bab83..9a853a89 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
@@ -24,7 +24,7 @@
  * @param <V> The model used as the value avro of the topic.
  */
 @AllArgsConstructor(access = AccessLevel.PUBLIC)
-public final class TopicWithSerde<K, V> {
+public class TopicWithSerde<K, V> {
     /**
      * Name of the topic.
      */
diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java
index 2cc7192e..3de9d109 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/WindowStateStoreUtils.java
@@ -38,9 +38,11 @@ public static <K, V> V get(WindowStore<K, V> stateStore, K key, int retentionDay
         var resultIterator =
             stateStore.backwardFetch(key, Instant.now().minus(Duration.ofDays(retentionDays)),
                 Instant.now());
+
         if (resultIterator != null && resultIterator.hasNext()) {
             return resultIterator.next().value;
         }
+
         return null;
     }
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java
new file mode 100644
index 00000000..93ae25a1
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java
@@ -0,0 +1,33 @@
+package com.michelin.kstreamplify.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import com.michelin.kstreamplify.avro.KafkaTestAvro;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+@Slf4j
+class JsonToAvroConverterTest {
+
+    private static final String JSON =
+        "{\"membersString\":{\"key1\":\"val1\"},\"split\":[{\"subSplit\":"
+            + "[{\"subSubIntField\":8,\"subSubField\":\"subSubTest\"}],\"subField\":\"subTest\"}],"
+            + "\"booleanField\":false,\"members\":{\"key1\":{\"mapQuantityField\":1}},"
+            + "\"quantityField\":10,\"stringField\":\"test\",\"listString\":[\"val1\",\"val2\"]}";
+
+    @Test
+    void shouldConvertJsonToAvro() {
+        KafkaTestAvro kafkaTest = (KafkaTestAvro) JsonToAvroConverter.jsonToAvro(JSON, KafkaTestAvro.getClassSchema());
+        assertEquals("val1", kafkaTest.getMembersString().get("key1"));
+        assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField());
+        assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField());
+        assertEquals("subTest", kafkaTest.getSplit().get(0).getSubField());
+        assertFalse(kafkaTest.getBooleanField());
+        assertEquals("1.0000", kafkaTest.getMembers().get("key1").getMapQuantityField().toString());
+        assertEquals("10.0000", kafkaTest.getQuantityField().toString());
+        assertEquals("test", kafkaTest.getStringField());
+        assertEquals("val1", kafkaTest.getListString().get(0));
+        assertEquals("val2", kafkaTest.getListString().get(1));
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java
new file mode 100644
index 00000000..2dad34bd
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java
@@ -0,0 +1,45 @@
+package com.michelin.kstreamplify.deduplication;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import java.time.Duration;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DedupKeyProcessorTest {
+
+    @Mock
+    private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
+
+    @Mock
+    private TimestampedKeyValueStore<String, String> dedupTimestampedStore;
+
+    @Test
+    void shouldProcessNewRecord() {
+        String key = "some-key";
+
+        when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore);
+        when(dedupTimestampedStore.get(key)).thenReturn(null);
+
+        DedupKeyProcessor<KafkaError> dedupKeyProcessor = new DedupKeyProcessor<>("dedupStoreName", Duration.ZERO);
+        dedupKeyProcessor.init(context);
+
+        KafkaError value = new KafkaError();
+        Record<String, KafkaError> record = new Record<>(key, value, 0);
+        dedupKeyProcessor.process(record);
+
+        verify(dedupTimestampedStore).put(eq(key), any());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java
new file mode 100644
index 00000000..faa070d2
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java
@@ -0,0 +1,43 @@
+package com.michelin.kstreamplify.deduplication;
+
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import java.time.Duration;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.WindowStore;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DedupKeyValueProcessorTest {
+
+    @Mock
+    private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
+
+    @Mock
+    private WindowStore<String, KafkaError> windowStore;
+
+    @Test
+    void shouldProcessNewRecord() {
+        String key = "some-key";
+        KafkaError value = new KafkaError();
+
+        Record<String, KafkaError> record = new Record<>(key, value, 0);
+
+        when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);
+
+        DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName",
+            Duration.ZERO);
+        dedupKeyValueProcessor.init(context);
+        dedupKeyValueProcessor.process(record);
+
+        verify(windowStore).put(record.key(), record.value(), record.timestamp());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
new file mode 100644
index 00000000..2103a0a9
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
@@ -0,0 +1,98 @@
+package com.michelin.kstreamplify.deduplication;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import java.time.Duration;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DedupWithPredicateProcessorTest {
+
+    private DedupWithPredicateProcessor<String, KafkaError> processor;
+
+    @Mock
+    private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
+
+    @Mock
+    private TimestampedKeyValueStore<String, KafkaError> store;
+
+    @BeforeEach
+    public void setUp() {
+        // Create an instance of DedupWithPredicateProcessor for testing
+        processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);
+
+        // Stub the context.getStateStore method to return the mock store
+        when(context.getStateStore("testStore")).thenReturn(store);
+
+        processor.init(context);
+    }
+
+    @Test
+    void shouldProcessFirstTime() {
+        // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
+
+        // Stub store.get to return null, indicating it's the first time
+        when(store.get(any())).thenReturn(null);
+
+        // Call the process method
+        processor.process(record);
+
+        verify(store).put(eq(""), argThat(arg -> arg.value().equals(record.value())));
+        verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value())));
+    }
+
+    @Test
+    void shouldProcessDuplicate() {
+        // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
+
+        // Stub store.get to return a value, indicating a duplicate
+        when(store.get("")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L));
+
+        // Call the process method
+        processor.process(record);
+
+        verify(store, never()).put(any(), any());
+        verify(context, never()).forward(any());
+    }
+
+    @Test
+    void shouldThrowException() {
+        // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
+
+        when(store.get(any())).thenReturn(null);
+        doThrow(new RuntimeException("Exception...")).when(store).put(any(), any());
+
+        // Call the process method
+        processor.process(record);
+
+        verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage()
+            .equals("Couldn't figure out what to do with the current payload: "
+                + "An unlikely error occurred during deduplication transform")));
+    }
+
+    public static class TestKeyExtractor {
+        public static <V extends SpecificRecord> String extract(V v) {
+            return "";
+        }
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
new file mode 100644
index 00000000..ca82f484
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
@@ -0,0 +1,81 @@
+package com.michelin.kstreamplify.initializer;
+
+import static com.michelin.kstreamplify.constants.InitializerConstants.SERVER_PORT_PROPERTY;
+import static com.michelin.kstreamplify.constants.PropertyConstants.KAFKA_PROPERTIES_PREFIX;
+import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.model.RestServiceResponse;
+import com.michelin.kstreamplify.properties.PropertiesUtils;
+import com.michelin.kstreamplify.services.ProbeService;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.util.Properties;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class KafkaStreamsInitializerTest {
+
+    private final KafkaStreamsInitializer initializer = new KafkaStreamsInitializer();
+
+    @Test
+    void shouldInitProperties() {
+        try (MockedStatic<PropertiesUtils> propertiesUtilsMockedStatic = mockStatic(PropertiesUtils.class)) {
+            Properties properties = new Properties();
+            properties.put(SERVER_PORT_PROPERTY, 8080);
+            properties.put(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+            properties.put(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + "prefix.self", "abc.");
+
+            propertiesUtilsMockedStatic.when(PropertiesUtils::loadProperties)
+                .thenReturn(properties);
+
+            propertiesUtilsMockedStatic.when(() -> PropertiesUtils.loadKafkaProperties(any())).thenCallRealMethod();
+
+            initializer.initProperties();
+
+            assertNotNull(initializer.getProperties());
+            assertEquals(8080, initializer.getServerPort());
+            assertTrue(initializer.getKafkaProperties().containsKey(StreamsConfig.APPLICATION_ID_CONFIG));
+            assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
+            assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties()
+                .get(StreamsConfig.APPLICATION_ID_CONFIG));
+        }
+    }
+
+    @Test
+    void shouldShutdownClientOnUncaughtException() {
+        try (MockedStatic<PropertiesUtils> propertiesUtilsMockedStatic = mockStatic(PropertiesUtils.class)) {
+            Properties properties = new Properties();
+            properties.put(SERVER_PORT_PROPERTY, 8080);
+            properties.put(KAFKA_PROPERTIES_PREFIX + PROPERTY_SEPARATOR + StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+
+            propertiesUtilsMockedStatic.when(PropertiesUtils::loadProperties)
+                .thenReturn(properties);
+
+            propertiesUtilsMockedStatic.when(() -> PropertiesUtils.loadKafkaProperties(any()))
+                .thenCallRealMethod();
+
+            initializer.initProperties();
+
+            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse response = initializer
+                .onStreamsUncaughtException(new RuntimeException("Test Exception"));
+
+            assertEquals(StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT, response);
+        }
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java
new file mode 100644
index 00000000..ba09af0d
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java
@@ -0,0 +1,122 @@
+package com.michelin.kstreamplify.initializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.deduplication.DeduplicationUtils;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import com.michelin.kstreamplify.error.TopologyErrorHandler;
+import com.michelin.kstreamplify.utils.SerdesUtils;
+import com.michelin.kstreamplify.utils.TopicWithSerde;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Getter;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.junit.jupiter.api.Test;
+
+class KafkaStreamsStarterTest {
+
+    @Test
+    void shouldInstantiateKafkaStreamsStarter() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+        KafkaStreamsExecutionContext.setSerdesConfig(
+            Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"));
+
+        StreamsBuilder builder = new StreamsBuilder();
+        KafkaStreamsStarterImpl starter = new KafkaStreamsStarterImpl();
+        starter.topology(builder);
+
+        assertNotNull(builder.build().describe());
+        assertEquals("dlqTopicUnitTests", starter.dlqTopic());
+
+        starter.onStart(null);
+        assertTrue(starter.isStarted());
+    }
+
+    /**
+     * Kafka Streams Starter implementation used for unit tests purpose.
+     */
+    @Getter
+    static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
+        private boolean started;
+
+        @Override
+        public void topology(StreamsBuilder streamsBuilder) {
+            var streams = TopicWithSerdesTestHelper.inputTopicWithSerdes().stream(streamsBuilder);
+
+            DeduplicationUtils.deduplicateKeys(streamsBuilder, streams,
+                "deduplicateKeysStoreName", "deduplicateKeysRepartitionName",
+                Duration.ZERO);
+            DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams,
+                "deduplicateKeyValuesStoreName",
+                "deduplicateKeyValuesRepartitionName", Duration.ZERO);
+            DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null);
+
+            var enrichedStreams = streams.mapValues(KafkaStreamsStarterImpl::enrichValue);
+            var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterImpl::enrichValue2);
+            var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams);
+            TopologyErrorHandler.catchErrors(enrichedStreams2, true);
+            TopicWithSerdesTestHelper.outputTopicWithSerdes().produce(processingResults);
+
+        }
+
+        @Override
+        public String dlqTopic() {
+            return "dlqTopicUnitTests";
+        }
+
+        @Override
+        public void onStart(KafkaStreams kafkaStreams) {
+            started = true;
+        }
+
+        private static ProcessingResult<String, KafkaError> enrichValue(KafkaError input) {
+            if (input != null) {
+                String output = "output field";
+                return ProcessingResult.success(output);
+            } else {
+                return ProcessingResult.fail(new IOException("an exception occurred"), "output error");
+            }
+        }
+
+        private static ProcessingResult<String, KafkaError> enrichValue2(KafkaError input) {
+            if (input != null) {
+                String output = "output field 2";
+                return ProcessingResult.success(output);
+            } else {
+                return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2");
+            }
+        }
+    }
+
+    /**
+     * Topic with serdes helper used for unit tests purpose.
+     *
+     * @param <K> The key type
+     * @param <V> The value type
+     */
+    public static class TopicWithSerdesTestHelper<K, V> extends TopicWithSerde<K, V> {
+        private TopicWithSerdesTestHelper(String name, String appName, Serde<K> keySerde, Serde<V> valueSerde) {
+            super(name, appName, keySerde, valueSerde);
+        }
+
+        public static TopicWithSerdesTestHelper<String, String> outputTopicWithSerdes() {
+            return new TopicWithSerdesTestHelper<>("OUTPUT_TOPIC", "APP_NAME",
+                Serdes.String(), Serdes.String());
+        }
+
+        public static TopicWithSerdesTestHelper<String, KafkaError> inputTopicWithSerdes() {
+            return new TopicWithSerdesTestHelper<>("INPUT_TOPIC", "APP_NAME",
+                Serdes.String(), SerdesUtils.getSerdesForValue());
+        }
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
new file mode 100644
index 00000000..64d68a0c
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
@@ -0,0 +1,82 @@
+package com.michelin.kstreamplify.properties;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+
+class RocksDbConfigTest {
+
+    @Mock
+    private Options options;
+
+    @BeforeEach
+    void setUp() {
+        MockitoAnnotations.openMocks(this);
+        when(options.tableFormatConfig()).thenReturn(new BlockBasedTableConfig());
+    }
+
+    @Test
+    void testSetConfigWithDefaultValues() {
+        // Arrange
+        Map<String, Object> configs = new HashMap<>();
+        RocksDbConfig rocksDbConfig = new RocksDbConfig();
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        // Act
+        rocksDbConfig.setConfig("storeName", options, configs);
+
+        // Assert
+        verify(options, times(1)).tableFormatConfig();
+        verify(options, times(1)).setTableFormatConfig(any());
+        verify(options, times(1)).setMaxWriteBufferNumber(RocksDbConfig.ROCKSDB_MAX_WRITE_BUFFER_DEFAULT);
+        verify(options, times(1)).setWriteBufferSize(RocksDbConfig.ROCKSDB_WRITE_BUFFER_SIZE_DEFAULT);
+        verify(options, times(1)).setCompressionType(CompressionType.NO_COMPRESSION);
+    }
+
+    @Test
+    void testSetConfigWithCustomValues() {
+        // Arrange
+        long cacheSize = 64 * 1024L * 1024L;
+        long writeBufferSize = 8 * 1024L * 1024L;
+        long blockSize = 8 * 1024L;
+        int maxWriteBuffer = 4;
+        boolean cacheIndexBlock = false;
+        String compressionType = "lz4";
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(RocksDbConfig.ROCKSDB_CACHE_SIZE_CONFIG, String.valueOf(cacheSize));
+        configs.put(RocksDbConfig.ROCKSDB_WRITE_BUFFER_SIZE_CONFIG, String.valueOf(writeBufferSize));
+        configs.put(RocksDbConfig.ROCKSDB_BLOCK_SIZE_CONFIG, String.valueOf(blockSize));
+        configs.put(RocksDbConfig.ROCKSDB_MAX_WRITE_BUFFER_CONFIG, String.valueOf(maxWriteBuffer));
+        configs.put(RocksDbConfig.ROCKSDB_CACHE_INDEX_BLOCK_ENABLED_CONFIG, String.valueOf(cacheIndexBlock));
+        configs.put(RocksDbConfig.ROCKSDB_COMPRESSION_TYPE_CONFIG, compressionType);
+        Properties properties = new Properties();
+        properties.putAll(configs);
+        KafkaStreamsExecutionContext.registerProperties(properties);
+
+        RocksDbConfig rocksDbConfig = new RocksDbConfig();
+
+        // Act
+        rocksDbConfig.setConfig("storeName", options, configs);
+
+        // Assert
+        verify(options, times(1)).tableFormatConfig();
+        verify(options, times(1)).setTableFormatConfig(any());
+        verify(options, times(1)).setMaxWriteBufferNumber(maxWriteBuffer);
+        verify(options, times(1)).setWriteBufferSize(writeBufferSize);
+        verify(options, times(1)).setCompressionType(CompressionType.getCompressionType(compressionType));
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java
new file mode 100644
index 00000000..cf0b669c
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java
@@ -0,0 +1,99 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DlqDeserializationExceptionHandlerTest {
+    @Mock
+    private ConsumerRecord<byte[], byte[]> record;
+
+    @Mock
+    private ProcessorContext processorContext;
+
+    private Producer<byte[], KafkaError> producer;
+
+    private DlqDeserializationExceptionHandler handler;
+
+    @BeforeEach
+    void setUp() {
+        Serializer<KafkaError> serializer = (Serializer) new KafkaAvroSerializer();
+        serializer.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"), false);
+        producer = new MockProducer<>(true, new ByteArraySerializer(), serializer);
+
+        KafkaStreamsExecutionContext.setDlqTopicName(null);
+    }
+
+    @Test
+    void shouldReturnFailIfNoDlq() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new RuntimeException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnFailOnExceptionDuringHandle() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new KafkaException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnKafkaException() {
+        handler = new DlqDeserializationExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        when(record.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8));
+        when(record.topic()).thenReturn("topic");
+
+        DeserializationExceptionHandler.DeserializationHandlerResponse response =
+            handler.handle(processorContext, record, new KafkaException("Exception..."));
+
+        assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldConfigure() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        handler = new DlqDeserializationExceptionHandler();
+        handler.configure(configs);
+
+        assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer);
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java
new file mode 100644
index 00000000..13dbcd51
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java
@@ -0,0 +1,23 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class DlqExceptionHandlerTest {
+
+    @Test
+    void shouldInstantiateProducer() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        DlqExceptionHandler.instantiateProducer("test-client", configs);
+
+        assertNotNull(DlqExceptionHandler.getProducer());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java
new file mode 100644
index 00000000..f526f509
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java
@@ -0,0 +1,108 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
+import com.michelin.kstreamplify.error.DlqProductionExceptionHandler;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DlqProductionExceptionHandlerTest {
+    @Mock
+    private ProducerRecord<byte[], byte[]> record;
+
+    private Producer<byte[], KafkaError> producer;
+
+    private DlqProductionExceptionHandler handler;
+
+    @BeforeEach
+    void setUp() {
+        Serializer<KafkaError> serializer = (Serializer) new KafkaAvroSerializer();
+        serializer.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"), false);
+        producer = new MockProducer<>(true, new ByteArraySerializer(), serializer);
+
+        KafkaStreamsExecutionContext.setDlqTopicName(null);
+    }
+
+    @Test
+    void shouldReturnFailIfNoDlq() {
+        handler = new DlqProductionExceptionHandler(producer);
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new RuntimeException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnExceptionDuringHandle() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new KafkaException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldReturnContinueOnKafkaException() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        when(record.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8));
+        when(record.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8));
+        when(record.topic()).thenReturn("topic");
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new KafkaException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+    }
+
+    @Test
+    void shouldReturnFailOnRetriableException() {
+        handler = new DlqProductionExceptionHandler(producer);
+        KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
+
+        ProductionExceptionHandler.ProductionExceptionHandlerResponse response =
+            handler.handle(record, new RetriableCommitFailedException("Exception..."));
+
+        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+    }
+
+    @Test
+    void shouldConfigure() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("bootstrap.servers", "localhost:9092");
+        configs.put("schema.registry.url", "localhost:8080");
+        configs.put("acks", "all");
+
+        handler = new DlqProductionExceptionHandler();
+        handler.configure(configs);
+
+        assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer);
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java
new file mode 100644
index 00000000..56cb6010
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java
@@ -0,0 +1,53 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.GenericErrorProcessor;
+import com.michelin.kstreamplify.error.ProcessingError;
+import java.util.Optional;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class GenericErrorProcessorTest {
+    private final GenericErrorProcessor<String> errorProcessor = new GenericErrorProcessor<>();
+
+    @Mock
+    private FixedKeyProcessorContext<String, KafkaError> mockContext;
+
+    @Mock
+    private FixedKeyRecord<String, ProcessingError<String>> mockRecord;
+
+    @Mock
+    private RecordMetadata mockRecordMetadata;
+
+    @Test
+    void shouldProcessError() {
+        when(mockRecord.value())
+            .thenReturn(new ProcessingError<>(new RuntimeException("Exception..."), "Context message", "Record"));
+
+        // Given a mock RecordMetadata
+        when(mockRecordMetadata.offset()).thenReturn(10L);
+        when(mockRecordMetadata.partition()).thenReturn(0);
+        when(mockRecordMetadata.topic()).thenReturn("test-topic");
+
+        // Given that the context has a recordMetadata
+        when(mockContext.recordMetadata()).thenReturn(Optional.of(mockRecordMetadata));
+
+        // When processing the record
+        errorProcessor.init(mockContext);
+        errorProcessor.process(mockRecord);
+
+        verify(mockContext).forward(any());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java
new file mode 100644
index 00000000..8165388f
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java
@@ -0,0 +1,64 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingError;
+import org.junit.jupiter.api.Test;
+
+class ProcessingErrorTest {
+
+    @Test
+    void shouldCreateProcessingErrorFromStringRecord() {
+        String contextMessage = "Some context message";
+        Exception exception = new Exception("Test Exception");
+        String kafkaRecord = "Sample Kafka Record";
+
+        ProcessingError<String> processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord);
+
+        // Assert
+        assertEquals(exception, processingError.getException());
+        assertEquals(contextMessage, processingError.getContextMessage());
+        assertEquals(kafkaRecord, processingError.getKafkaRecord());
+    }
+
+    @Test
+    void shouldCreateProcessingErrorWithNoContextMessage() {
+        Exception exception = new Exception("Test Exception");
+        String kafkaRecord = "Sample Kafka Record";
+
+        ProcessingError<String> processingError = new ProcessingError<>(exception, kafkaRecord);
+
+        // Assert
+        assertEquals(exception, processingError.getException());
+        assertEquals("No context message", processingError.getContextMessage());
+        assertEquals(kafkaRecord, processingError.getKafkaRecord());
+    }
+
+    @Test
+    void shouldCreateProcessingErrorFromAvroRecord() {
+        String contextMessage = "Some context message";
+        Exception exception = new Exception("Test Exception");
+        KafkaError kafkaRecord = KafkaError.newBuilder()
+            .setCause("Cause")
+            .setOffset(1L)
+            .setPartition(1)
+            .setTopic("Topic")
+            .setValue("Value")
+            .build();
+
+        ProcessingError<KafkaError> processingError = new ProcessingError<>(exception, contextMessage, kafkaRecord);
+
+        assertEquals(exception, processingError.getException());
+        assertEquals(contextMessage, processingError.getContextMessage());
+        assertEquals("""
+            {
+              "partition": 1,
+              "offset": 1,
+              "cause": "Cause",
+              "topic": "Topic",
+              "value": "Value"
+            }""", processingError.getKafkaRecord());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java
new file mode 100644
index 00000000..5e470924
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java
@@ -0,0 +1,90 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.michelin.kstreamplify.error.ProcessingResult;
+import org.apache.kafka.streams.processor.api.Record;
+import org.junit.jupiter.api.Test;
+
+class ProcessingResultTest {
+
+    @Test
+    void shouldCreateProcessingResultSuccess() {
+        String successValue = "Success";
+        ProcessingResult<String, String> result = ProcessingResult.success(successValue);
+
+        assertTrue(result.isValid());
+        assertEquals(successValue, result.getValue());
+        assertNull(result.getError());
+    }
+
+    @Test
+    void shouldCreateWrappedProcessingResult() {
+        String value = "Value";
+        long timestamp = System.currentTimeMillis();
+
+        Record<String, String> record = new Record<>("key", value, timestamp);
+        Record<String, ProcessingResult<String, String>> wrappedRecord = ProcessingResult.wrapRecordSuccess(record);
+
+        assertEquals(record.key(), wrappedRecord.key());
+        assertNotNull(wrappedRecord.value());
+        assertTrue(wrappedRecord.value().isValid());
+        assertEquals(value, wrappedRecord.value().getValue());
+        assertNull(wrappedRecord.value().getError());
+        assertEquals(record.timestamp(), wrappedRecord.timestamp());
+    }
+
+    @Test
+    void shouldCreateFailedProcessingResult() {
+        String failedRecordValue = "Failed Value";
+        Exception exception = new Exception("Test Exception");
+
+        ProcessingResult<String, String> result = ProcessingResult.fail(exception, failedRecordValue);
+
+        assertFalse(result.isValid());
+        assertNull(result.getValue());
+        assertNotNull(result.getError());
+        assertEquals(exception, result.getError().getException());
+        assertEquals(failedRecordValue, result.getError().getKafkaRecord());
+        assertEquals("No context message", result.getError().getContextMessage());
+    }
+
+    @Test
+    void shouldCreateWrappedFailedProcessingResult() {
+        String key = "key";
+        String failedValue = "value";
+        long timestamp = System.currentTimeMillis();
+        Exception exception = new Exception("Test Exception");
+
+        Record<String, String> record = new Record<>(key, failedValue, timestamp);
+
+        Record<String, ProcessingResult<String, String>> wrappedRecord =
+            ProcessingResult.wrapRecordFailure(exception, record);
+
+        assertEquals(record.key(), wrappedRecord.key());
+        assertNotNull(wrappedRecord.value());
+        assertFalse(wrappedRecord.value().isValid());
+        assertNull(wrappedRecord.value().getValue());
+        assertNotNull(wrappedRecord.value().getError());
+        assertEquals(exception, wrappedRecord.value().getError().getException());
+        assertEquals(failedValue, wrappedRecord.value().getError().getKafkaRecord());
+        assertEquals("No context message", wrappedRecord.value().getError().getContextMessage());
+        assertEquals(record.timestamp(), wrappedRecord.timestamp());
+    }
+
+    @Test
+    void shouldProcessingResultBeValid() {
+        ProcessingResult<String, String> validResult = ProcessingResult.success("Value");
+        ProcessingResult<String, Integer> invalidResult1 = ProcessingResult.fail(new Exception(), 42);
+        ProcessingResult<String, String> invalidResult2 = new ProcessingResult<>(null);
+
+        assertTrue(validResult.isValid());
+        assertFalse(invalidResult1.isValid());
+        assertFalse(invalidResult2.isValid());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java
new file mode 100644
index 00000000..4d6489b3
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java
@@ -0,0 +1,134 @@
+package com.michelin.kstreamplify.services;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
+import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
+import com.michelin.kstreamplify.model.RestServiceResponse;
+import java.net.HttpURLConnection;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ProbeServiceTest {
+    @Mock
+    private KafkaStreamsInitializer kafkaStreamsInitializer;
+
+    @Mock
+    private KafkaStreams kafkaStreams;
+
+    @Test
+    void shouldGetReadinessProbeWithWhenStreamsRunning() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
+        when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
+
+        RestServiceResponse<String> response = ProbeService.readinessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_OK, response.getStatus());
+    }
+
+    @Test
+    void shouldGetReadinessProbeWithWhenStreamsNotRunning() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
+        when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING);
+
+        RestServiceResponse<String> response = ProbeService.readinessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_UNAVAILABLE, response.getStatus());
+    }
+
+    @Test
+    void shouldGetReadinessProbeWithWhenStreamsNull() {
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(null);
+
+        RestServiceResponse<String> response = ProbeService.readinessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, response.getStatus());
+    }
+
+    @Test
+    void shouldGetLivenessProbeWithWhenStreamsRunning() {
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
+        when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING);
+
+        RestServiceResponse<String> response = ProbeService.livenessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_OK, response.getStatus());
+    }
+
+    @Test
+    void shouldGetLivenessProbeWithWhenStreamsNotRunning() {
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams);
+        when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING);
+
+        RestServiceResponse<String> response = ProbeService.livenessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, response.getStatus());
+    }
+
+    @Test
+    void shouldGetLivenessProbeWithWhenStreamsNull() {
+        when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(null);
+
+        RestServiceResponse<String> response = ProbeService.livenessProbe(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_NO_CONTENT, response.getStatus());
+    }
+
+    @Test
+    void shouldExposeTopologyWithNonNullTopology() {
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        KafkaStreamsStarter starter = new KafkaStreamsStarterImpl();
+        starter.topology(streamsBuilder);
+
+        when(kafkaStreamsInitializer.getTopology()).thenReturn(streamsBuilder.build());
+
+        RestServiceResponse<String> response = ProbeService.exposeTopology(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_OK, response.getStatus());
+        assertEquals("""
+            Topologies:
+               Sub-topology: 0
+                Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
+                  --> KSTREAM-SINK-0000000001
+                Sink: KSTREAM-SINK-0000000001 (topic: outputTopic)
+                  <-- KSTREAM-SOURCE-0000000000
+
+            """, response.getBody());
+    }
+
+    @Test
+    void shouldExposeTopologyWithNullTopology() {
+        when(kafkaStreamsInitializer.getTopology()).thenReturn(null);
+
+        RestServiceResponse<String> response = ProbeService.exposeTopology(kafkaStreamsInitializer);
+
+        assertEquals(HttpURLConnection.HTTP_NO_CONTENT, response.getStatus());
+    }
+
+    static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
+        @Override
+        public void topology(StreamsBuilder streamsBuilder) {
+            streamsBuilder
+                .stream("inputTopic")
+                .to("outputTopic");
+        }
+
+        @Override
+        public String dlqTopic() {
+            return "dlqTopic";
+        }
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java
new file mode 100644
index 00000000..cc4a4229
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java
@@ -0,0 +1,18 @@
+package com.michelin.kstreamplify.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.junit.jupiter.api.Test;
+
+class TopicWithSerdesUnitTest {
+
+    @Test
+    void shouldCreateTopicWithSerde() {
+        TopicWithSerde<String, String> topicWithSerde = new TopicWithSerde<>("INPUT_TOPIC",
+            Serdes.String(), Serdes.String());
+
+        assertEquals("INPUT_TOPIC", topicWithSerde.getUnPrefixedName());
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java
new file mode 100644
index 00000000..bc7d9bba
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java
@@ -0,0 +1,53 @@
+package com.michelin.kstreamplify.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class WindowStateStoreUtilsTest {
+    @Mock
+    private WindowStore<String, String> windowStore;
+
+    @Mock
+    private WindowStoreIterator<String> iterator;
+
+    @Test
+    void shouldPutAndGetFromWindowStore() {
+        String value = "testValue";
+
+        when(iterator.hasNext())
+            .thenReturn(true)
+            .thenReturn(false);
+
+        when(iterator.next())
+            .thenReturn(KeyValue.pair(1L, value));
+
+        when(windowStore.backwardFetch(anyString(), any(), any()))
+            .thenReturn(iterator);
+
+        // Call the put method
+        String key = "testKey";
+        WindowStateStoreUtils.put(windowStore, key, value);
+        String result = WindowStateStoreUtils.get(windowStore, key, 1);
+        String nullResult = WindowStateStoreUtils.get(windowStore, "nothing", 1);
+
+        // Verify that the put method of the windowStore is called with the correct arguments
+        assertEquals("testValue", result);
+        assertNull(nullResult);
+        verify(windowStore).put(eq(key), eq(value), anyLong());
+    }
+}
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java
new file mode 100644
index 00000000..5112f68f
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/initializer/SpringKafkaStreamsInitializerTest.java
@@ -0,0 +1,88 @@
+package com.michelin.kstreamplify.initializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.properties.KafkaProperties;
+import java.lang.reflect.Field;
+import java.util.Properties;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.DefaultApplicationArguments;
+import org.springframework.context.ConfigurableApplicationContext;
+
+@ExtendWith(MockitoExtension.class)
+class SpringKafkaStreamsInitializerTest {
+    @Mock
+    private ConfigurableApplicationContext applicationContext;
+
+    @Mock
+    private KafkaStreamsStarter kafkaStreamsStarter;
+
+    @Mock
+    private KafkaProperties kafkaProperties;
+
+    @InjectMocks
+    private SpringKafkaStreamsInitializer initializer;
+
+    @Test
+    void shouldInitProperties() {
+        Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.put("prefix.self", "abc.");
+
+        when(kafkaProperties.asProperties()).thenReturn(properties);
+
+        initializer.initProperties();
+
+        assertEquals(kafkaStreamsStarter, initializer.getKafkaStreamsStarter());
+        assertNotNull(initializer.getKafkaProperties());
+        assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
+        assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties()
+            .get(StreamsConfig.APPLICATION_ID_CONFIG));
+    }
+
+    @Test
+    void shouldCloseSpringBootContextOnUncaughtException() {
+        Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        properties.put("prefix.self", "abc.");
+
+        when(kafkaProperties.asProperties()).thenReturn(properties);
+
+        initializer.initProperties();
+        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse response = initializer
+            .onStreamsUncaughtException(new RuntimeException("Test Exception"));
+
+        assertEquals(StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT, response);
+        verify(applicationContext).close();
+    }
+
+    @Test
+    void shouldCloseSpringBootContextOnChangeState() {
+        initializer.onStateChange(KafkaStreams.State.ERROR, KafkaStreams.State.RUNNING);
+        verify(applicationContext).close();
+    }
+
+    @Test
+    void shouldNotCloseSpringBootContextOnChangeStateNotError() {
+        initializer.onStateChange(KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING);
+        verify(applicationContext, never()).close();
+    }
+}
diff --git a/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/rest/SpringProbeControllerTest.java b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/rest/SpringProbeControllerTest.java
new file mode 100644
index 00000000..bce5a66b
--- /dev/null
+++ b/kstreamplify-spring-boot/src/test/java/com/michelin/kstreamplify/rest/SpringProbeControllerTest.java
@@ -0,0 +1,55 @@
+package com.michelin.kstreamplify.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+
+import com.michelin.kstreamplify.model.RestServiceResponse;
+import com.michelin.kstreamplify.services.ProbeService;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+
+class SpringProbeControllerTest {
+    private final SpringProbeController controller = new SpringProbeController();
+
+    @Test
+    void shouldGetReadinessProbe() {
+        try (MockedStatic<ProbeService> probeService = mockStatic(ProbeService.class)) {
+            probeService.when(() -> ProbeService.readinessProbe(any()))
+                .thenReturn(new RestServiceResponse<>(200, "Ready"));
+
+            ResponseEntity<String> response = controller.readinessProbe();
+
+            assertEquals(HttpStatus.OK, response.getStatusCode());
+            assertEquals("Ready", response.getBody());
+        }
+    }
+
+    @Test
+    void shouldGetLivenessProbe() {
+        try (MockedStatic<ProbeService> probeService = mockStatic(ProbeService.class)) {
+            probeService.when(() -> ProbeService.livenessProbe(any()))
+                .thenReturn(new RestServiceResponse<>(200, "Alive"));
+
+            ResponseEntity<String> response = controller.livenessProbe();
+
+            assertEquals(HttpStatus.OK, response.getStatusCode());
+            assertEquals("Alive", response.getBody());
+        }
+    }
+
+    @Test
+    void shouldGetTopology() {
+        try (MockedStatic<ProbeService> probeService = mockStatic(ProbeService.class)) {
+            probeService.when(() -> ProbeService.exposeTopology(any()))
+                .thenReturn(new RestServiceResponse<>(200, "Topology"));
+
+            ResponseEntity<String> response = controller.exposeTopology();
+
+            assertEquals(HttpStatus.OK, response.getStatusCode());
+            assertEquals("Topology", response.getBody());
+        }
+    }
+}
diff --git a/lombok.config b/lombok.config
new file mode 100644
index 00000000..df71bb6a
--- /dev/null
+++ b/lombok.config
@@ -0,0 +1,2 @@
+config.stopBubbling = true
+lombok.addLombokGeneratedAnnotation = true
diff --git a/pom.xml b/pom.xml
index 6fef361d..6de3f0de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,24 +111,26 @@
         <module>kstreamplify-core-test</module>
     </modules>
 
-    <properties>
-        <avro.version>1.11.2</avro.version>
-        <commons-io.version>2.13.0</commons-io.version>
-        <commons-lang3.version>3.13.0</commons-lang3.version>
-        <gson.version>2.10.1</gson.version>
-        <java.version>17</java.version>
-        <junit.version>5.10.0</junit.version>
-        <kafka.version>3.4.0</kafka.version>
-        <kafka.confluent.version>7.5.0</kafka.confluent.version>
-        <lombok.version>1.18.28</lombok.version>
-        <maven.compiler.source>17</maven.compiler.source>
-        <maven.compiler.target>17</maven.compiler.target>
-        <spring-boot.version>3.1.3</spring-boot.version>
-        <sonar.organization>michelin</sonar.organization>
-        <sonar.projectKey>michelin_kstreamplify</sonar.projectKey>
-        <sonar.projectName>${project.artifactId}</sonar.projectName>
-        <sonar.host.url>https://sonarcloud.io</sonar.host.url>
-    </properties>
+    <properties>
+        <avro.version>1.11.2</avro.version>
+        <commons-io.version>2.13.0</commons-io.version>
+        <commons-lang3.version>3.13.0</commons-lang3.version>
+        <gson.version>2.10.1</gson.version>
+        <java.version>17</java.version>
+        <junit.version>5.10.0</junit.version>
+        <kafka.version>3.4.0</kafka.version>
+        <kafka.confluent.version>7.5.0</kafka.confluent.version>
+        <lombok.version>1.18.28</lombok.version>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <mockito.version>5.4.0</mockito.version>
+        <jacoco.version>0.8.10</jacoco.version>
+        <spring-boot.version>3.1.3</spring-boot.version>
+        <sonar.organization>michelin</sonar.organization>
+        <sonar.projectKey>michelin_kstreamplify</sonar.projectKey>
+        <sonar.projectName>${project.artifactId}</sonar.projectName>
+        <sonar.host.url>https://sonarcloud.io</sonar.host.url>
+    </properties>
 
     <dependencies>
         <dependency>
@@ -180,24 +182,37 @@
             <artifactId>commons-lang3</artifactId>
             <version>${commons-lang3.version}</version>
         </dependency>
-
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.3.0</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <phase>verify</phase>
-                        <goals>
-                            <goal>jar-no-fork</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.3.0</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>jar-no-fork</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -230,7 +245,7 @@
             <plugin>
                 <groupId>org.jacoco</groupId>
                 <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.8.10</version>
+                <version>${jacoco.version}</version>
                 <executions>
                     <execution>
                         <id>jacoco-initialize</id>