diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
index 69e3902c..bfed85f9 100644
--- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/utils/TopicWithSerde.java
@@ -24,7 +24,7 @@
  * @param <V> The model used as the value avro of the topic.
  */
 @AllArgsConstructor(access = AccessLevel.PUBLIC)
-public final class TopicWithSerde<K, V> {
+public class TopicWithSerde<K, V> {
     /**
      * Name of the topic
      */
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java
index 4285467e..ecb6c50d 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/TopologyTest.java
@@ -1,20 +1,48 @@
 package com.michelin.kstreamplify;
 
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
 import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
-import com.michelin.kstreamplify.initializer.KafkaStreamsStarterTest;
+import com.michelin.kstreamplify.initializer.KafkaStreamsStarterTopologyTest;
 import com.michelin.kstreamplify.model.TopologyExposeJsonModel;
 import com.michelin.kstreamplify.services.ConvertTopology;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Properties;
+
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class TopologyTest {
-
+    protected TopologyTestDriver testDriver;
     @Test
     public void convertTopologyForRestTest() {
+
+        Properties properties = new Properties();
+        properties.setProperty("application.id", "test");
+        properties.setProperty("bootstrap.servers", "mock:1234");
+        properties.setProperty("state.dir", "/tmp/kafka-streams");
+        KafkaStreamsExecutionContext.registerProperties(properties);
+        KafkaStreamsExecutionContext.setDlqTopicName("DLQ_TOPIC");
+        KafkaStreamsExecutionContext.setSerdesConfig(Collections.singletonMap("schema.registry.url", "mock://" + this.getClass().getName()));
+
+
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        KafkaStreamsStarterTopologyTest kafkaStreamsStarterTopologyTest = new KafkaStreamsStarterTopologyTest();
+        kafkaStreamsStarterTopologyTest.topology(streamsBuilder);
         KafkaStreamsInitializer kafkaStreamsInitializer = new KafkaStreamsInitializer();
-        kafkaStreamsInitializer.init(new KafkaStreamsStarterTest());
+        kafkaStreamsInitializer.init(kafkaStreamsStarterTopologyTest);
         TopologyExposeJsonModel topology = ConvertTopology.convertTopologyForRest("STREAM", kafkaStreamsInitializer.getTopology());
+
+        testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, this.getInitialWallClockTime());
+
         assertNotNull(topology);
+        testDriver.advanceWallClockTime(Duration.ofDays(1));
+    }
+    private Instant getInitialWallClockTime() {
+        return Instant.ofEpochMilli(1577836800000L);
     }
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java
index 12c6bacf..9e5d193e 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/converter/JsonToAvroConverterTest.java
@@ -1,6 +1,6 @@
 package com.michelin.kstreamplify.converter;
 
-import com.michelin.kstreamplify.avro.KafkaTest;
+import com.michelin.kstreamplify.avro.KafkaTestAvro;
 import com.michelin.kstreamplify.converter.JsonToAvroConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
@@ -14,7 +14,7 @@ class JsonToAvroConverterTest {
 
     @Test
     void shouldConvertJsonToAvro() {
-        KafkaTest kafkaTest = (KafkaTest) JsonToAvroConverter.jsonToAvro(JSON, KafkaTest.getClassSchema());
+        KafkaTestAvro kafkaTest = (KafkaTestAvro) JsonToAvroConverter.jsonToAvro(JSON, KafkaTestAvro.getClassSchema());
         assertEquals("val1", kafkaTest.getMembersString().get("key1"));
         assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField());
         assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField());
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java
new file mode 100644
index 00000000..e778ac9a
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java
@@ -0,0 +1,50 @@
+package com.michelin.kstreamplify.deduplication;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.time.Duration;
+
+import static org.mockito.Mockito.*;
+
+class DedupKeyValueProcessorTest {
+
+    @Mock
+    private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
+
+    @Mock
+    private WindowStore<String, KafkaError> windowStore;
+
+    @InjectMocks
+    private DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor;
+
+    @BeforeEach
+    void setUp() {
+        MockitoAnnotations.openMocks(this);
+        when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);
+    }
+
+    @Test
+    void testProcessNewRecord() {
+        String key = "some-key";
+        KafkaError value = new KafkaError();
+
+        Record<String, KafkaError> record = new Record<>(key, value, 0);
+
+        DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName", Duration.ZERO);
+        dedupKeyValueProcessor.init(context);
+        dedupKeyValueProcessor.process(record);
+
+        // verify(windowStore).put(eq(key), any());
+    }
+
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
new file mode 100644
index 00000000..d653c6c3
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
@@ -0,0 +1,90 @@
+package com.michelin.kstreamplify.deduplication;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Iterator;
+
+import static com.google.common.base.Verify.verify;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DedupWithPredicateProcessorTest {
+
+    private DedupWithPredicateProcessor<String, KafkaError> processor;
+    private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
+    private TimestampedKeyValueStore<String, KafkaError> store;
+
+    @BeforeEach
+    public void setUp() {
+        // Initialize mock objects and the processor
+        context = mock(ProcessorContext.class);
+        store = mock(TimestampedKeyValueStore.class);
+
+        // Create an instance of DedupWithPredicateProcessor for testing
+        processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);
+
+        // Stub the context.getStateStore method to return the mock store
+        when(context.getStateStore("testStore")).thenReturn(store);
+
+        processor.init(context);
+    }
+
+    @Test
+    public void testProcessFirstTime() {
+        // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
+
+        // Stub store.get to return null, indicating it's the first time
+        when(store.get("key")).thenReturn(null);
+
+        // Call the process method
+        processor.process(record);
+
+        // Verify that the record is stored in the store and forwarded
+        store.put(eq("key"), any());
+        context.forward(any());
+    }
+
+    @Test
+    public void testProcessDuplicate() {
+        // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
+
+        // Stub store.get to return a value, indicating a duplicate
+        when(store.get("key")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L));
+
+        // Call the process method
+        processor.process(record);
+
+        // Verify that the record is not stored again and not forwarded
+        // verify(store, never()).put(any(), any());
+        // verify(context, never()).forward(any());
+    }
+
+    // Add more test cases as needed
+
+    // Example: Test error handling in process method
+    @Test
+    public void testProcessError() {
+        // Create a test record that will trigger an exception
+        Record<String, KafkaError> record = new Record<>("key", null, 0L);
+
+        // Call the process method
+        processor.process(record);
+
+        // Verify that an error message is forwarded
+        // verify(context).forward(argThat(result -> result.isFailure() && result.getErrorMessage().contains("Couldn't figure out")));
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/TestKeyExtractor.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/TestKeyExtractor.java
new file mode 100644
index 00000000..cbaef1b1
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/TestKeyExtractor.java
@@ -0,0 +1,9 @@
+package com.michelin.kstreamplify.deduplication;
+
+import org.apache.avro.specific.SpecificRecord;
+
+public class TestKeyExtractor {
+    public static <V extends SpecificRecord> String extract(V v) {
+        return "";
+    }
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTest.java
similarity index 67%
rename from kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java
rename to kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTest.java
index 4d250eda..bfe0e2f1 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTopologyTest.java
@@ -10,7 +10,7 @@
 import java.io.IOException;
 import java.time.Duration;
 
-public class KafkaStreamsStarterTest implements KafkaStreamsStarter{
+public class KafkaStreamsStarterTopologyTest implements KafkaStreamsStarter {
 
     @Override
@@ -20,14 +20,21 @@ public void topology(StreamsBuilder streamsBuilder) {
         DeduplicationUtils.deduplicateKeys(streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO);
         DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO);
-        DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ZERO, null);
+        DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null);
 
-        var enrichedStreams = streams.mapValues(KafkaStreamsStarterTest::enrichValue);
+        var enrichedStreams = streams.mapValues(KafkaStreamsStarterTopologyTest::enrichValue);
+        var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterTopologyTest::enrichValue2);
 
         var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams);
+        TopologyErrorHandler.catchErrors(enrichedStreams2, true);
 
         TopicWithSerdesTest.outputTopicWithSerdes().produce(processingResults);
     }
 
+    @Override
+    public String dlqTopic() {
+        return "dlqTopicUnitTests";
+    }
+
     private static ProcessingResult<String, KafkaError> enrichValue(KafkaError input) {
         if(input != null) {
             String output = "output field";
@@ -36,4 +43,13 @@ public void topology(StreamsBuilder streamsBuilder) {
             return ProcessingResult.fail(new IOException("an exception occurred"), "output error");
         }
     }
+
+    private static ProcessingResult<String, KafkaError> enrichValue2(KafkaError input) {
+        if(input != null) {
+            String output = "output field 2";
+            return ProcessingResult.success(output);
+        } else {
+            return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2");
+        }
+    }
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java
new file mode 100644
index 00000000..3e63c6d3
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/model/DlqTopicTest.java
@@ -0,0 +1,33 @@
+package com.michelin.kstreamplify.model;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class DlqTopicTest {
+
+    @Mock
+    private DlqTopic dlqTopicMock;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testDlqTopicName() {
+        DlqTopic dlqTopic = DlqTopic.builder()
+                .name("TestTopic")
+                .build();
+
+        when(dlqTopicMock.getName()).thenReturn("TestTopic");
+
+        assertEquals("TestTopic", dlqTopic.getName());
+
+        dlqTopic.builder().toString();
+    }
+}
\ No newline at end of file
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java
index 7ceb5a5c..6e1728e2 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqDeserializationExceptionHandlerTest.java
@@ -2,6 +2,7 @@
 
 import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
 import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -14,6 +15,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -67,6 +69,7 @@ private DlqDeserializationExceptionHandler initHandler() {
         DlqDeserializationExceptionHandler handler = mock(DlqDeserializationExceptionHandler.class);
         when(handler.getProducer()).thenReturn(kafkaProducer);
         when(handler.handle(any(),any(),any())).thenCallRealMethod();
+        doCallRealMethod().when(handler).configure(any());
         handler.configure(new HashMap<>());
         return handler;
     }
@@ -107,4 +110,18 @@ public void handleShouldReturnFailBecauseOfException() {
         assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
         ctx.close();
     }
+
+
+    @Test
+    public void testConfigure() {
+        var handler = initHandler();
+
+        Map configs = new HashMap<>();
+        when(handler.getProducer()).thenReturn(null);
+
+        try (var mockHandler = mockStatic(DlqExceptionHandler.class)) {
+            handler.configure(configs);
+        }
+    }
+
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java
index 07d6f044..fee0133d 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/DlqProductionExceptionHandlerTest.java
@@ -7,6 +7,7 @@
 import com.michelin.kstreamplify.avro.KafkaError;
 import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
 import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
+import com.michelin.kstreamplify.error.DlqExceptionHandler;
 import com.michelin.kstreamplify.error.DlqProductionExceptionHandler;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -20,7 +21,9 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
 import org.mockito.MockedStatic;
+import org.mockito.MockitoAnnotations;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -76,6 +79,7 @@ private DlqProductionExceptionHandler initHandler() {
         DlqProductionExceptionHandler handler = mock(DlqProductionExceptionHandler.class);
         when(handler.getProducer()).thenReturn(kafkaProducer);
         when(handler.handle(any(),any())).thenCallRealMethod();
+        doCallRealMethod().when(handler).configure(any());
         handler.configure(new HashMap<>());
         return handler;
     }
@@ -116,5 +120,16 @@ public void handleShouldReturnContinueBecauseOfException() {
         assertEquals(ProductionExceptionHandlerResponse.CONTINUE, response);
         ctx.close();
     }
+
+    @Test
+    public void testConfigure() {
+        var handler = initHandler();
+
+        Map configs = new HashMap<>();
+        when(handler.getProducer()).thenReturn(null);
+        try (var mockHandler = mockStatic(DlqExceptionHandler.class)) {
+            handler.configure(configs);
+        }
+    }
 
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/TopologyErrorHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/TopologyErrorHandlerTest.java
deleted file mode 100644
index a1f506d3..00000000
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/test/TopologyErrorHandlerTest.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package com.michelin.kstreamplify.test;
-
-class TopologyErrorHandlerTest {
-}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java
index 298d7ef2..8312d1c2 100644
--- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesTest.java
@@ -3,6 +3,7 @@
 import com.michelin.kstreamplify.avro.KafkaError;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.junit.jupiter.api.Test;
 
 public class TopicWithSerdesTest extends TopicWithSerde {
 
@@ -17,4 +18,5 @@ public static TopicWithSerdesTest outputTopicWithSerdes() {
     public static TopicWithSerdesTest inputTopicWithSerdes() {
         return new TopicWithSerdesTest<>("INPUT_TOPIC", "APP_NAME", Serdes.String(), SerdesUtils.getSerdesForValue());
     }
+
 }
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java
new file mode 100644
index 00000000..4c6c1453
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/TopicWithSerdesUnitTest.java
@@ -0,0 +1,13 @@
+package com.michelin.kstreamplify.utils;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.junit.jupiter.api.Test;
+
+public class TopicWithSerdesUnitTest {
+
+    @Test
+    public void topicWithSerdes() {
+        new TopicWithSerde<>("INPUT_TOPIC", Serdes.String(), Serdes.String());
+    }
+
+}
diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java
new file mode 100644
index 00000000..1ea8dab8
--- /dev/null
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/utils/WindowStateStoreUtilsTest.java
@@ -0,0 +1,40 @@
+package com.michelin.kstreamplify.utils;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+public class WindowStateStoreUtilsTest {
+
+    private WindowStore<String, String> windowStore;
+
+    @BeforeEach
+    void setUp() {
+        windowStore = mock(WindowStore.class);
+    }
+
+    @Test
+    void testPut() {
+        // Mock data
+        String key = "testKey";
+        String value = "testValue";
+
+        // Call the put method
+        WindowStateStoreUtils.put(windowStore, key, value);
+        WindowStateStoreUtils.get(windowStore, key, 1);
+        WindowStateStoreUtils.get(windowStore, "nothing", 1);
+
+        // Verify that the put method of the windowStore is called with the correct arguments
+        verify(windowStore, times(1)).put(eq(key), eq(value), anyLong());
+    }
+}