Commit e9d0121: add TU (unit tests)
adriencalime committed Sep 6, 2023 · 1 parent c562da1
Showing 14 changed files with 322 additions and 13 deletions.
@@ -24,7 +24,7 @@
* @param <V> The model used as the Avro value of the topic.
*/
@AllArgsConstructor(access = AccessLevel.PUBLIC)
-public final class TopicWithSerde<K, V> {
+public class TopicWithSerde<K, V> {
/**
* Name of the topic
*/
@@ -1,20 +1,48 @@
package com.michelin.kstreamplify;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
-import com.michelin.kstreamplify.initializer.KafkaStreamsStarterTest;
+import com.michelin.kstreamplify.initializer.KafkaStreamsStarterTopologyTest;
import com.michelin.kstreamplify.model.TopologyExposeJsonModel;
import com.michelin.kstreamplify.services.ConvertTopology;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertNotNull;

public class TopologyTest {

protected TopologyTestDriver testDriver;
@Test
public void convertTopologyForRestTest() {

Properties properties = new Properties();
properties.setProperty("application.id", "test");
properties.setProperty("bootstrap.servers", "mock:1234");
properties.setProperty("state.dir", "/tmp/kafka-streams");
KafkaStreamsExecutionContext.registerProperties(properties);
KafkaStreamsExecutionContext.setDlqTopicName("DLQ_TOPIC");
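// A "mock://" URL gives this test class its own in-memory mock Schema Registry scope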
KafkaStreamsExecutionContext.setSerdesConfig(Collections.singletonMap("schema.registry.url", "mock://" + this.getClass().getName()));


StreamsBuilder streamsBuilder = new StreamsBuilder();
KafkaStreamsStarterTopologyTest kafkaStreamsStarterTopologyTest = new KafkaStreamsStarterTopologyTest();
kafkaStreamsStarterTopologyTest.topology(streamsBuilder);
KafkaStreamsInitializer kafkaStreamsInitializer = new KafkaStreamsInitializer();
-kafkaStreamsInitializer.init(new KafkaStreamsStarterTest());
+kafkaStreamsInitializer.init(kafkaStreamsStarterTopologyTest);
TopologyExposeJsonModel topology = ConvertTopology.convertTopologyForRest("STREAM", kafkaStreamsInitializer.getTopology());

testDriver = new TopologyTestDriver(streamsBuilder.build(), properties, this.getInitialWallClockTime());

assertNotNull(topology);
testDriver.advanceWallClockTime(Duration.ofDays(1));
testDriver.close();
}
private Instant getInitialWallClockTime() {
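// Fixed wall-clock start: 2020-01-01T00:00:00Z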
return Instant.ofEpochMilli(1577836800000L);
}
}
@@ -1,6 +1,6 @@
package com.michelin.kstreamplify.converter;

-import com.michelin.kstreamplify.avro.KafkaTest;
+import com.michelin.kstreamplify.avro.KafkaTestAvro;
import com.michelin.kstreamplify.converter.JsonToAvroConverter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@@ -14,7 +14,7 @@ class JsonToAvroConverterTest {

@Test
void shouldConvertJsonToAvro() {
-KafkaTest kafkaTest = (KafkaTest) JsonToAvroConverter.jsonToAvro(JSON, KafkaTest.getClassSchema());
+KafkaTestAvro kafkaTest = (KafkaTestAvro) JsonToAvroConverter.jsonToAvro(JSON, KafkaTestAvro.getClassSchema());
assertEquals("val1", kafkaTest.getMembersString().get("key1"));
assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField());
assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField());
@@ -0,0 +1,50 @@
package com.michelin.kstreamplify.deduplication;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.time.Duration;

import static org.mockito.Mockito.when;

class DedupKeyValueProcessorTest {

@Mock
private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;

@Mock
private WindowStore<String, String> windowStore;

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);
}

@Test
void testProcessNewRecord() {
String key = "some-key";
KafkaError value = new KafkaError();

Record<String, KafkaError> record = new Record<>(key, value, 0);

DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName", Duration.ZERO);
dedupKeyValueProcessor.init(context);
dedupKeyValueProcessor.process(record);

// verify(windowStore).put(eq(key), any());
}

}
@@ -0,0 +1,90 @@
package com.michelin.kstreamplify.deduplication;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DedupWithPredicateProcessorTest {

private DedupWithPredicateProcessor<String, KafkaError> processor;
private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
private TimestampedKeyValueStore<String, KafkaError> store;

@BeforeEach
public void setUp() {
// Initialize mock objects and the processor
context = mock(ProcessorContext.class);
store = mock(TimestampedKeyValueStore.class);

// Create an instance of DedupWithPredicateProcessor for testing
processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);

// Stub the context.getStateStore method to return the mock store
when(context.getStateStore("testStore")).thenReturn(store);

processor.init(context);
}

@Test
public void testProcessFirstTime() {
// Create a test record
Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);

// Stub store.get to return null, indicating it's the first time
when(store.get("key")).thenReturn(null);

// Call the process method
processor.process(record);

// Verify that the record is stored in the store and forwarded
verify(store).put(eq("key"), any());
verify(context).forward(any());
}

@Test
public void testProcessDuplicate() {
// Create a test record
Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);

// Stub store.get to return a value, indicating a duplicate
when(store.get("key")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L));

// Call the process method
processor.process(record);

// Verify that the record is not stored again and not forwarded
verify(store, never()).put(any(), any());
verify(context, never()).forward(any());
}

// Add more test cases as needed

// Example: Test error handling in process method
@Test
public void testProcessError() {
// Create a test record that will trigger an exception
Record<String, KafkaError> record = new Record<>("key", null, 0L);

// Call the process method
processor.process(record);

// Verify that an error message is forwarded
// verify(context).forward(argThat(result -> result.isFailure() && result.getErrorMessage().contains("Couldn't figure out")));
}
}
@@ -0,0 +1,9 @@
package com.michelin.kstreamplify.deduplication;

import org.apache.avro.specific.SpecificRecord;

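/** Stub key extractor for the deduplication tests: it maps every record to the same empty key. */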
public class TestKeyExtractor {
public static <V extends SpecificRecord> String extract(V v) {
return "";
}
}
@@ -10,7 +10,7 @@
import java.io.IOException;
import java.time.Duration;

-public class KafkaStreamsStarterTest implements KafkaStreamsStarter{
+public class KafkaStreamsStarterTopologyTest implements KafkaStreamsStarter {


@Override
@@ -20,14 +20,21 @@ public void topology(StreamsBuilder streamsBuilder) {

DeduplicationUtils.deduplicateKeys(streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO);
DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO);
-DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ZERO, null);
+DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null);

-var enrichedStreams = streams.mapValues(KafkaStreamsStarterTest::enrichValue);
+var enrichedStreams = streams.mapValues(KafkaStreamsStarterTopologyTest::enrichValue);
var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterTopologyTest::enrichValue2);
var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams);
TopologyErrorHandler.catchErrors(enrichedStreams2, true);
TopicWithSerdesTest.outputTopicWithSerdes().produce(processingResults);

}

@Override
public String dlqTopic() {
return "dlqTopicUnitTests";
}

private static ProcessingResult<String,String> enrichValue(KafkaError input) {
if(input != null) {
String output = "output field";
@@ -36,4 +43,13 @@ private static ProcessingResult<String,String> enrichValue(KafkaError input) {
return ProcessingResult.fail(new IOException("an exception occurred"), "output error");
}
}

private static ProcessingResult<String,String> enrichValue2(KafkaError input) {
if(input != null) {
String output = "output field 2";
return ProcessingResult.success(output);
} else {
return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2");
}
}
}
@@ -0,0 +1,33 @@
package com.michelin.kstreamplify.model;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

public class DlqTopicTest {

@Mock
private DlqTopic dlqTopicMock;

@BeforeEach
public void setUp() {
MockitoAnnotations.openMocks(this);
}

@Test
public void testDlqTopicName() {
DlqTopic dlqTopic = DlqTopic.builder()
.name("TestTopic")
.build();

when(dlqTopicMock.getName()).thenReturn("TestTopic");

assertEquals("TestTopic", dlqTopic.getName());
assertEquals("TestTopic", dlqTopicMock.getName());

// Exercise the generated builder's toString for coverage
DlqTopic.builder().toString();
}
}
@@ -2,6 +2,7 @@

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
import com.michelin.kstreamplify.error.DlqExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -14,6 +15,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -67,6 +69,7 @@ private DlqDeserializationExceptionHandler initHandler() {
DlqDeserializationExceptionHandler handler = mock(DlqDeserializationExceptionHandler.class);
when(handler.getProducer()).thenReturn(kafkaProducer);
when(handler.handle(any(),any(),any())).thenCallRealMethod();
doCallRealMethod().when(handler).configure(any());
handler.configure(new HashMap<>());
return handler;
}
@@ -107,4 +110,18 @@ public void handleShouldReturnFailBecauseOfException() {
assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
ctx.close();
}


@Test
public void testConfigure() {
var handler = initHandler();

Map<String, Object> configs = new HashMap<>();
when(handler.getProducer()).thenReturn(null);

// configure() should complete without throwing while DlqExceptionHandler's static methods are mocked
try (var ignored = mockStatic(DlqExceptionHandler.class)) {
handler.configure(configs);
}
}

}
@@ -7,6 +7,7 @@
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
import com.michelin.kstreamplify.error.DlqExceptionHandler;
import com.michelin.kstreamplify.error.DlqProductionExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -20,7 +21,9 @@
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -76,6 +79,7 @@ private DlqProductionExceptionHandler initHandler() {
DlqProductionExceptionHandler handler = mock(DlqProductionExceptionHandler.class);
when(handler.getProducer()).thenReturn(kafkaProducer);
when(handler.handle(any(),any())).thenCallRealMethod();
doCallRealMethod().when(handler).configure(any());
handler.configure(new HashMap<>());
return handler;
}
@@ -116,5 +120,16 @@ public void handleShouldReturnContinueBecauseOfException() {
assertEquals(ProductionExceptionHandlerResponse.CONTINUE, response);
ctx.close();
}

@Test
public void testConfigure() {
var handler = initHandler();

Map<String, Object> configs = new HashMap<>();
when(handler.getProducer()).thenReturn(null);
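// configure() should complete without throwing while DlqExceptionHandler's static methods are mocked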
try (var ignored = mockStatic(DlqExceptionHandler.class)) {
handler.configure(configs);
}
}
}
