diff --git a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java index 1496632f..2bd1fb41 100644 --- a/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java +++ b/kstreamplify-core-test/src/main/java/com/michelin/kstreamplify/KafkaStreamsStarterTest.java @@ -101,7 +101,8 @@ void generalTearDown() throws IOException { * @return The corresponding TestInputTopic */ protected TestInputTopic createInputTestTopic(TopicWithSerde topicWithSerde) { - return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer()); + return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), + topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer()); } /** diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 0a0a8998..845e6041 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -79,7 +79,6 @@ public void init(ProcessorContext> context) { @Override public void process(Record record) { try { - String identifier = deduplicationKeyExtractor.apply(record.value()); // Retrieve the matching identifier in the statestore and return null if found it (signaling a duplicate) if (dedupTimestampedStore.get(identifier) == null) { @@ -88,7 +87,7 @@ public void process(Record record) { processorContext.forward(ProcessingResult.wrapRecordSuccess(record)); } } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occured during deduplication transform")); + processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occurred during deduplication transform")); } } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java deleted file mode 100644 index 0d9874da..00000000 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.michelin.kstreamplify; - -import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; -import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer; -import com.michelin.kstreamplify.initializer.KafkaStreamsStarterTopologyTestHelper; -import com.michelin.kstreamplify.model.TopologyExposeJsonModel; -import com.michelin.kstreamplify.services.ConvertTopology; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TopologyTestDriver; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.Collections; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -class TopologyTest { - protected TopologyTestDriver testDriver; - @Test - void convertTopologyForRestTest() { - - Properties properties = new Properties(); - properties.setProperty("application.id", "test"); - properties.setProperty("bootstrap.servers", "mock:1234"); - properties.setProperty("state.dir", "/tmp/kafka-streams"); - KafkaStreamsExecutionContext.registerProperties(properties); - KafkaStreamsExecutionContext.setDlqTopicName("DLQ_TOPIC"); - KafkaStreamsExecutionContext.setSerdesConfig(Collections.singletonMap("schema.registry.url", "mock://" + this.getClass().getName())); - - - StreamsBuilder streamsBuilder = new StreamsBuilder(); - KafkaStreamsStarterTopologyTestHelper kafkaStreamsStarterTopologyTest = new KafkaStreamsStarterTopologyTestHelper(); - kafkaStreamsStarterTopologyTest.topology(streamsBuilder); - KafkaStreamsInitializer kafkaStreamsInitializer = new KafkaStreamsInitializer(); - kafkaStreamsInitializer.init(kafkaStreamsStarterTopologyTest); - TopologyExposeJsonModel topology = ConvertTopology.convertTopologyForRest("STREAM", kafkaStreamsInitializer.getTopology()); - - testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, this.getInitialWallClockTime()); - - assertNotNull(topology); - testDriver.advanceWallClockTime(Duration.ofDays(1)); - } - private Instant getInitialWallClockTime() { - return Instant.ofEpochMilli(1577836800000L); - } -} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java index 9e5d193e..1f0bace3 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java @@ -6,6 +6,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; @Slf4j class JsonToAvroConverterTest { @@ -19,7 +20,7 @@ void shouldConvertJsonToAvro() { assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField()); assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField()); assertEquals("subTest", kafkaTest.getSplit().get(0).getSubField()); - assertEquals(false, kafkaTest.getBooleanField()); + assertFalse(kafkaTest.getBooleanField()); assertEquals("1.0000", kafkaTest.getMembers().get("key1").getMapQuantityField().toString()); assertEquals("10.0000", kafkaTest.getQuantityField().toString()); assertEquals("test", kafkaTest.getStringField()); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java index 24a2a618..71723a56 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java @@ -7,14 +7,17 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.time.Duration; +import org.mockito.junit.jupiter.MockitoExtension; import static org.mockito.Mockito.*; +@ExtendWith(MockitoExtension.class) class DedupKeyProcessorTest { @Mock @@ -26,19 +29,14 @@ class DedupKeyProcessorTest { @InjectMocks private DedupKeyProcessor dedupKeyProcessor; - @BeforeEach - void setUp() { - MockitoAnnotations.openMocks(this); - when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore); - } - @Test - void testProcessNewRecord() { + void shouldProcessNewRecord() { String key = "some-key"; KafkaError value = new KafkaError(); Record record = new Record<>(key, value, 0); + when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore); when(dedupTimestampedStore.get(key)).thenReturn(null); DedupKeyProcessor dedupKeyProcessor = new DedupKeyProcessor<>("dedupStoreName",Duration.ZERO); @@ -47,5 +45,4 @@ void testProcessNewRecord() { verify(dedupTimestampedStore).put(eq(key), any()); } - } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java index 37100d36..83cec08a 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java @@ -8,44 +8,42 @@ import org.apache.kafka.streams.state.WindowStore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.time.Duration; +import org.mockito.junit.jupiter.MockitoExtension; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.*; +@ExtendWith(MockitoExtension.class) class DedupKeyValueProcessorTest { @Mock private ProcessorContext> context; @Mock - private WindowStore windowStore; + private WindowStore windowStore; @InjectMocks private DedupKeyValueProcessor dedupKeyValueProcessor; - @BeforeEach - void setUp() { - MockitoAnnotations.openMocks(this); - when(context.getStateStore("dedupStoreName")).thenReturn(windowStore); - } - @Test - void testProcessNewRecord() { + void shouldProcessNewRecord() { String key = "some-key"; KafkaError value = new KafkaError(); Record record = new Record<>(key, value, 0); + when(context.getStateStore("dedupStoreName")).thenReturn(windowStore); + DedupKeyValueProcessor dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName",Duration.ZERO); dedupKeyValueProcessor.init(context); dedupKeyValueProcessor.process(record); - assertNotNull(record); + verify(windowStore).put(record.key(), record.value(), record.timestamp()); } - } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 608c153a..687a44d9 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -2,6 +2,7 @@ import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -13,26 +14,34 @@ import java.time.Duration; import java.util.Iterator; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; -import static com.google.common.base.Verify.verify; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class DedupWithPredicateProcessorTest { private DedupWithPredicateProcessor processor; + + @Mock private ProcessorContext> context; + + @Mock private TimestampedKeyValueStore store; @BeforeEach public void setUp() { - // Initialize mock objects and the processor - context = mock(ProcessorContext.class); - store = mock(TimestampedKeyValueStore.class); - // Create an instance of DedupWithPredicateProcessor for testing processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract); @@ -43,46 +52,52 @@ public void setUp() { } @Test - void testProcessFirstTime() { + void shouldProcessFirstTime() { // Create a test record Record record = new Record<>("key", new KafkaError(), 0L); // Stub store.get to return null, indicating it's the first time - when(store.get("key")).thenReturn(null); + when(store.get(any())).thenReturn(null); // Call the process method processor.process(record); - // Verify that the record is stored in the store and forwarded - store.put(eq("key"), any()); - context.forward(any()); - - assertNotNull(record); + verify(store).put(eq(""), argThat(arg -> arg.value().equals(record.value()))); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value()))); } @Test - void testProcessDuplicate() { + void shouldProcessDuplicate() { // Create a test record Record record = new Record<>("key", new KafkaError(), 0L); // Stub store.get to return a value, indicating a duplicate - when(store.get("key")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L)); + when(store.get("")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L)); // Call the process method processor.process(record); - assertNotNull(record); + verify(store, never()).put(any(), any()); + verify(context, never()).forward(any()); } - // Example: Test error handling in process method @Test - void testProcessError() { - // Create a test record that will trigger an exception - Record record = new Record<>("key", null, 0L); + void shouldThrowException() { + // Create a test record + Record record = new Record<>("key", new KafkaError(), 0L); + + when(store.get(any())).thenReturn(null); + doThrow(new RuntimeException("Exception...")).when(store).put(any(), any()); // Call the process method processor.process(record); - assertNotNull(record); + verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage().equals("Couldn't figure out what to do with the current payload: An unlikely error occurred during deduplication transform"))); + } + + public static class TestKeyExtractor { + public static String extract(V v) { + return ""; + } } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java new file mode 100644 index 00000000..9ab2233a --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java @@ -0,0 +1,90 @@ +package com.michelin.kstreamplify.initializer; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; +import com.michelin.kstreamplify.deduplication.DeduplicationUtils; +import com.michelin.kstreamplify.error.ProcessingResult; +import com.michelin.kstreamplify.error.TopologyErrorHandler; +import com.michelin.kstreamplify.utils.TopicWithSerdesTestHelper; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import lombok.Getter; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.junit.jupiter.api.Test; + +class KafkaStreamsStarterTest { + + @Test + void shouldInstantiateKafkaStreamsStarter() { + KafkaStreamsExecutionContext.registerProperties(new Properties()); + KafkaStreamsExecutionContext.setSerdesConfig(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")); + + StreamsBuilder builder = new StreamsBuilder(); + KafkaStreamsStarterImpl starter = new KafkaStreamsStarterImpl(); + starter.topology(builder); + + assertNotNull(builder.build().describe()); + assertEquals("dlqTopicUnitTests", starter.dlqTopic()); + + starter.onStart(null); + assertTrue(starter.isStarted()); + } + + @Getter + static class KafkaStreamsStarterImpl extends KafkaStreamsStarter { + private boolean started; + + @Override + public void topology(StreamsBuilder streamsBuilder) { + var streams = TopicWithSerdesTestHelper.inputTopicWithSerdes().stream(streamsBuilder); + + DeduplicationUtils.deduplicateKeys(streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO); + DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO); + DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null); + + var enrichedStreams = streams.mapValues(KafkaStreamsStarterImpl::enrichValue); + var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterImpl::enrichValue2); + var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams); + TopologyErrorHandler.catchErrors(enrichedStreams2, true); + TopicWithSerdesTestHelper.outputTopicWithSerdes().produce(processingResults); + + } + + @Override + public String dlqTopic() { + return "dlqTopicUnitTests"; + } + + @Override + public void onStart(KafkaStreams kafkaStreams) { + started = true; + } + + private static ProcessingResult enrichValue(KafkaError input) { + if (input != null) { + String output = "output field"; + return ProcessingResult.success(output); + } else { + return ProcessingResult.fail(new IOException("an exception occurred"), "output error"); + } + } + + private static ProcessingResult enrichValue2(KafkaError input) { + if (input != null) { + String output = "output field 2"; + return ProcessingResult.success(output); + } else { + return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2"); + } + } + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTestHelper.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTestHelper.java deleted file mode 100644 index e6543ad9..00000000 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTestHelper.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.michelin.kstreamplify.initializer; - -import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.deduplication.DeduplicationUtils; -import com.michelin.kstreamplify.error.ProcessingResult; -import com.michelin.kstreamplify.error.TopologyErrorHandler; -import com.michelin.kstreamplify.utils.TopicWithSerdesTestHelper; -import org.apache.kafka.streams.StreamsBuilder; - -import java.io.IOException; -import java.time.Duration; - -public class KafkaStreamsStarterTopologyTestHelper extends KafkaStreamsStarter { - - - @Override - public void topology(StreamsBuilder streamsBuilder) { - - var streams = TopicWithSerdesTestHelper.inputTopicWithSerdes().stream(streamsBuilder); - - DeduplicationUtils.deduplicateKeys(streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null); - - var enrichedStreams = streams.mapValues(KafkaStreamsStarterTopologyTestHelper::enrichValue); - var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterTopologyTestHelper::enrichValue2); - var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams); - TopologyErrorHandler.catchErrors(enrichedStreams2, true); - TopicWithSerdesTestHelper.outputTopicWithSerdes().produce(processingResults); - - } - - @Override - public String dlqTopic() { - return "dlqTopicUnitTests"; - } - - private static ProcessingResult enrichValue(KafkaError input) { - if(input != null) { - String output = "output field"; - return ProcessingResult.success(output); - } else { - return ProcessingResult.fail(new IOException("an exception occurred"), "output error"); - } - } - - private static ProcessingResult enrichValue2(KafkaError input) { - if(input != null) { - String output = "output field 2"; - return ProcessingResult.success(output); - } else { - return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2"); - } - } -} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java deleted file mode 100644 index a6f2130c..00000000 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.michelin.kstreamplify.model; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.when; - -class DlqTopicTest { - - @Mock - private DlqTopic dlqTopicMock; - - @BeforeEach - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - void testDlqTopicName() { - DlqTopic dlqTopic = DlqTopic.builder() - .name("TestTopic") - .build(); - - when(dlqTopicMock.getName()).thenReturn("TestTopic"); - - assertEquals("TestTopic", dlqTopic.getName()); - - dlqTopic.builder().toString(); - } -} \ No newline at end of file diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDBConfigTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java similarity index 99% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDBConfigTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java index b2c20fa7..b4c7935f 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDBConfigTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java @@ -15,7 +15,7 @@ import static org.mockito.Mockito.*; -class RocksDBConfigTest { +class RocksDbConfigTest { @Mock private Options options; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java similarity index 99% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java index 3fcb5172..029946b5 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqDeserializationExceptionHandlerTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java similarity index 96% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java index 4aefc9b1..7c9a972c 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqExceptionHandlerTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java similarity index 90% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java index 056e9cc6..b68b39c7 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/DlqProductionExceptionHandlerTest.java @@ -1,28 +1,19 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; -import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; -import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler; import com.michelin.kstreamplify.error.DlqExceptionHandler; import com.michelin.kstreamplify.error.DlqProductionExceptionHandler; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RetriableException; -import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; import org.apache.kafka.streams.processor.ProcessorContext; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.mockito.MockedStatic; -import org.mockito.MockitoAnnotations; import java.io.IOException; import java.nio.charset.StandardCharsets; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/GenericErrorProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java similarity index 98% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/GenericErrorProcessorTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java index f0ad0d9d..b817b206 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/GenericErrorProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/GenericErrorProcessorTest.java @@ -1,4 +1,4 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.*; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingErrorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java similarity index 93% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingErrorTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java index d2d953b9..9d32fd22 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingErrorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingErrorTest.java @@ -1,11 +1,9 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; -import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext; import com.michelin.kstreamplify.converter.AvroToJsonConverter; import com.michelin.kstreamplify.error.ProcessingError; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; -import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingResultTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java similarity index 95% rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingResultTest.java rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java index 3e45c6fa..872d4352 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/ProcessingResultTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/rest/ProcessingResultTest.java @@ -1,13 +1,10 @@ -package com.michelin.kstreamplify.test; +package com.michelin.kstreamplify.rest; -import com.michelin.kstreamplify.error.ProcessingError; import com.michelin.kstreamplify.error.ProcessingResult; import org.apache.kafka.streams.processor.api.Record; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; class ProcessingResultTest { diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java index c568eec3..ee166c25 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/services/ProbeServiceTest.java @@ -8,105 +8,86 @@ import org.apache.kafka.streams.processor.internals.StreamThread; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.net.HttpURLConnection; import java.util.Properties; import java.util.Set; +import org.mockito.junit.jupiter.MockitoExtension; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +@ExtendWith(MockitoExtension.class) class ProbeServiceTest { - @Mock private KafkaStreamsInitializer kafkaStreamsInitializer; @Mock private KafkaStreams kafkaStreams; - @Mock - private StreamThread streamThread; - - @BeforeEach - void setUp() { - MockitoAnnotations.openMocks(this); - when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); - } - @Test - void testReadinessProbeWithRunningStreams() { - // Arrange + void shouldGetReadinessProbeWithRunningStreams() { KafkaStreamsExecutionContext.registerProperties(new Properties()); + + when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); - // Act RestServiceResponse response = ProbeService.readinessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); } @Test void testReadinessProbeWithNonRunningStreams() { - // Arrange KafkaStreamsExecutionContext.registerProperties(new Properties()); + + when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING); - // Act RestServiceResponse response = ProbeService.readinessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_UNAVAILABLE, response.getStatus()); } @Test void testReadinessProbeWithNullKafkaStreams() { - // Arrange when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(null); - // Act RestServiceResponse response = ProbeService.readinessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, response.getStatus()); } @Test void testLivenessProbeWithRunningStreams() { - // Arrange + when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.RUNNING); - // Act RestServiceResponse response = ProbeService.livenessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_OK, response.getStatus()); } @Test void testLivenessProbeWithNonRunningStreams() { - // Arrange + when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(kafkaStreams); when(kafkaStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING); - // Act RestServiceResponse response = ProbeService.livenessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, response.getStatus()); } @Test void testLivenessProbeWithNullKafkaStreams() { - // Arrange when(kafkaStreamsInitializer.getKafkaStreams()).thenReturn(null); - // Act RestServiceResponse response = ProbeService.livenessProbe(kafkaStreamsInitializer); - // Assert assertEquals(HttpURLConnection.HTTP_NO_CONTENT, response.getStatus()); } diff --git a/lombok.config b/lombok.config new file mode 100644 index 00000000..df71bb6a --- /dev/null +++ b/lombok.config @@ -0,0 +1,2 @@ +config.stopBubbling = true +lombok.addLombokGeneratedAnnotation = true diff --git a/pom.xml b/pom.xml index 781e87c5..98edaf2c 100644 --- a/pom.xml +++ b/pom.xml @@ -189,6 +189,12 @@ test + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test +