Update unit tests

Loïc Greffier committed Sep 18, 2023
1 parent db33ceb commit ddc683c
Showing 20 changed files with 168 additions and 228 deletions.
========================================
@@ -101,7 +101,8 @@ void generalTearDown() throws IOException {
      * @return The corresponding TestInputTopic
      */
     protected <K, V> TestInputTopic<K, V> createInputTestTopic(TopicWithSerde<K, V> topicWithSerde) {
-        return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(), topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
+        return this.testDriver.createInputTopic(topicWithSerde.getUnPrefixedName(),
+            topicWithSerde.getKeySerde().serializer(), topicWithSerde.getValueSerde().serializer());
     }
 
     /**
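Reviewer note: tests built on this helper only need a topic descriptor to start piping records. A minimal usage sketch, assuming a hypothetical TopicWithSerde<String, String> constant named INPUT_TOPIC declared by the test class (the constant is illustrative, not part of this commit):

    // Hypothetical usage of the helper above; INPUT_TOPIC is an illustrative TopicWithSerde constant
    TestInputTopic<String, String> inputTopic = createInputTestTopic(INPUT_TOPIC);
    inputTopic.pipeInput("key", "value"); // feeds the record through the TopologyTestDriver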
========================================
@@ -79,7 +79,6 @@ public void init(ProcessorContext<K, ProcessingResult<V, V>> context) {
     @Override
     public void process(Record<K, V> record) {
         try {
-
             String identifier = deduplicationKeyExtractor.apply(record.value());
             // Look up the identifier in the state store; a non-null hit signals a duplicate
             if (dedupTimestampedStore.get(identifier) == null) {

@@ -88,7 +87,7 @@ public void process(Record<K, V> record) {
                 processorContext.forward(ProcessingResult.wrapRecordSuccess(record));
             }
         } catch (Exception e) {
-            processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occured during deduplication transform"));
+            processorContext.forward(ProcessingResult.wrapRecordFailure(e, record, "Couldn't figure out what to do with the current payload: An unlikely error occurred during deduplication transform"));
         }
     }
 }
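Reviewer note: the deduplicationKeyExtractor applied above is effectively a Function<V, String> handed to the processor when the deduplication topology is built. A minimal sketch of one, assuming a hypothetical Avro value type MyAvroValue whose getId() returns a String (neither name comes from this commit):

    // Hypothetical extractor: any stable, unique business field works as the deduplication key
    Function<MyAvroValue, String> deduplicationKeyExtractor = value -> value.getId();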

This file was deleted.

========================================
@@ -6,6 +6,7 @@
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 @Slf4j
 class JsonToAvroConverterTest {

@@ -19,7 +20,7 @@ void shouldConvertJsonToAvro() {
         assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField());
         assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField());
         assertEquals("subTest", kafkaTest.getSplit().get(0).getSubField());
-        assertEquals(false, kafkaTest.getBooleanField());
+        assertFalse(kafkaTest.getBooleanField());
         assertEquals("1.0000", kafkaTest.getMembers().get("key1").getMapQuantityField().toString());
         assertEquals("10.0000", kafkaTest.getQuantityField().toString());
         assertEquals("test", kafkaTest.getStringField());
========================================
@@ -7,14 +7,17 @@
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 import java.time.Duration;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import static org.mockito.Mockito.*;
 
+@ExtendWith(MockitoExtension.class)
 class DedupKeyProcessorTest {
 
     @Mock

@@ -26,19 +29,14 @@ class DedupKeyProcessorTest {
     @InjectMocks
     private DedupKeyProcessor<KafkaError> dedupKeyProcessor;
 
-    @BeforeEach
-    void setUp() {
-        MockitoAnnotations.openMocks(this);
-        when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore);
-    }
-
     @Test
-    void testProcessNewRecord() {
+    void shouldProcessNewRecord() {
         String key = "some-key";
         KafkaError value = new KafkaError();
 
         Record<String, KafkaError> record = new Record<>(key, value, 0);
 
+        when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore);
         when(dedupTimestampedStore.get(key)).thenReturn(null);
 
         DedupKeyProcessor<KafkaError> dedupKeyProcessor = new DedupKeyProcessor<>("dedupStoreName", Duration.ZERO);

@@ -47,5 +45,4 @@ void testProcessNewRecord() {
 
         verify(dedupTimestampedStore).put(eq(key), any());
     }
-
 }
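Reviewer note on the migration pattern visible here (the two test classes below follow it as well): @ExtendWith(MockitoExtension.class) runs Mockito with strict stubbing, so a when(...) declared in a shared @BeforeEach but never hit by some test fails the run with UnnecessaryStubbingException. Declaring each stub inside the test that consumes it, as this commit does, keeps every stubbing exercised. The resulting shape, roughly (class and store names are illustrative):

    @ExtendWith(MockitoExtension.class)
    class SomeProcessorTest {
        @Mock
        private KeyValueStore<String, String> store; // illustrative mock

        @Test
        void shouldDoSomething() {
            when(store.get("key")).thenReturn(null); // stub lives next to the test that uses it
            // ... exercise the processor and verify interactions
        }
    }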
========================================
@@ -8,44 +8,42 @@
 import org.apache.kafka.streams.state.WindowStore;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
 
 import java.time.Duration;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.*;
 
+@ExtendWith(MockitoExtension.class)
 class DedupKeyValueProcessorTest {
 
     @Mock
     private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
 
     @Mock
-    private WindowStore<String, String> windowStore;
+    private WindowStore<String, KafkaError> windowStore;
 
     @InjectMocks
     private DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor;
 
-    @BeforeEach
-    void setUp() {
-        MockitoAnnotations.openMocks(this);
-        when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);
-    }
-
     @Test
-    void testProcessNewRecord() {
+    void shouldProcessNewRecord() {
         String key = "some-key";
         KafkaError value = new KafkaError();
 
         Record<String, KafkaError> record = new Record<>(key, value, 0);
 
+        when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);
+
         DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName", Duration.ZERO);
         dedupKeyValueProcessor.init(context);
         dedupKeyValueProcessor.process(record);
 
+        assertNotNull(record);
         verify(windowStore).put(record.key(), record.value(), record.timestamp());
     }
-
 }
========================================
@@ -2,6 +2,7 @@
 
 import com.michelin.kstreamplify.avro.KafkaError;
 import com.michelin.kstreamplify.error.ProcessingResult;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -13,26 +14,34 @@
 
 import java.time.Duration;
 import java.util.Iterator;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
 
-import static com.google.common.base.Verify.verify;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@ExtendWith(MockitoExtension.class)
 class DedupWithPredicateProcessorTest {
 
     private DedupWithPredicateProcessor<String, KafkaError> processor;
 
+    @Mock
     private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;
 
+    @Mock
     private TimestampedKeyValueStore<String, KafkaError> store;
 
     @BeforeEach
     public void setUp() {
-        // Initialize mock objects and the processor
-        context = mock(ProcessorContext.class);
-        store = mock(TimestampedKeyValueStore.class);
-
         // Create an instance of DedupWithPredicateProcessor for testing
         processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);
 
@@ -43,46 +52,52 @@ public void setUp() {
     }
 
     @Test
-    void testProcessFirstTime() {
+    void shouldProcessFirstTime() {
         // Create a test record
         Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
 
-        // Stub store.get to return null, indicating it's the first time
-        when(store.get("key")).thenReturn(null);
+        when(store.get(any())).thenReturn(null);
 
         // Call the process method
         processor.process(record);
 
-        // Verify that the record is stored in the store and forwarded
-        store.put(eq("key"), any());
-        context.forward(any());
-
+        assertNotNull(record);
+        verify(store).put(eq(""), argThat(arg -> arg.value().equals(record.value())));
+        verify(context).forward(argThat(arg -> arg.value().getValue().equals(record.value())));
     }
 
     @Test
-    void testProcessDuplicate() {
+    void shouldProcessDuplicate() {
         // Create a test record
         Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
 
-        // Stub store.get to return a value, indicating a duplicate
-        when(store.get("key")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L));
+        when(store.get("")).thenReturn(ValueAndTimestamp.make(new KafkaError(), 0L));
 
-        // Call the process method
         processor.process(record);
 
+        assertNotNull(record);
+        verify(store, never()).put(any(), any());
+        verify(context, never()).forward(any());
     }
 
-    // Example: Test error handling in process method
     @Test
-    void testProcessError() {
-        // Create a test record that will trigger an exception
-        Record<String, KafkaError> record = new Record<>("key", null, 0L);
+    void shouldThrowException() {
         // Create a test record
+        Record<String, KafkaError> record = new Record<>("key", new KafkaError(), 0L);
 
+        when(store.get(any())).thenReturn(null);
+        doThrow(new RuntimeException("Exception...")).when(store).put(any(), any());
 
         // Call the process method
         processor.process(record);
 
+        assertNotNull(record);
+        verify(context).forward(argThat(arg -> arg.value().getError().getContextMessage().equals("Couldn't figure out what to do with the current payload: An unlikely error occurred during deduplication transform")));
     }
 
+    public static class TestKeyExtractor {
+        public static <V extends SpecificRecord> String extract(V v) {
+            return "";
+        }
+    }
 }
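Reviewer note when reading the new assertions: TestKeyExtractor.extract always returns an empty string, so every record deduplicates under the key "", which is why the store interactions are stubbed and verified against "" rather than against the record key.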
========================================
@@ -0,0 +1,90 @@
+package com.michelin.kstreamplify.initializer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.michelin.kstreamplify.avro.KafkaError;
+import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
+import com.michelin.kstreamplify.deduplication.DeduplicationUtils;
+import com.michelin.kstreamplify.error.ProcessingResult;
+import com.michelin.kstreamplify.error.TopologyErrorHandler;
+import com.michelin.kstreamplify.utils.TopicWithSerdesTestHelper;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Getter;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.junit.jupiter.api.Test;
+
+class KafkaStreamsStarterTest {
+
+    @Test
+    void shouldInstantiateKafkaStreamsStarter() {
+        KafkaStreamsExecutionContext.registerProperties(new Properties());
+        KafkaStreamsExecutionContext.setSerdesConfig(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"));
+
+        StreamsBuilder builder = new StreamsBuilder();
+        KafkaStreamsStarterImpl starter = new KafkaStreamsStarterImpl();
+        starter.topology(builder);
+
+        assertNotNull(builder.build().describe());
+        assertEquals("dlqTopicUnitTests", starter.dlqTopic());
+
+        starter.onStart(null);
+        assertTrue(starter.isStarted());
+    }
+
+    @Getter
+    static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
+        private boolean started;
+
+        @Override
+        public void topology(StreamsBuilder streamsBuilder) {
+            var streams = TopicWithSerdesTestHelper.inputTopicWithSerdes().stream(streamsBuilder);
+
+            DeduplicationUtils.deduplicateKeys(streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO);
+            DeduplicationUtils.deduplicateKeyValues(streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO);
+            DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null);
+
+            var enrichedStreams = streams.mapValues(KafkaStreamsStarterImpl::enrichValue);
+            var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterImpl::enrichValue2);
+            var processingResults = TopologyErrorHandler.catchErrors(enrichedStreams);
+            TopologyErrorHandler.catchErrors(enrichedStreams2, true);
+            TopicWithSerdesTestHelper.outputTopicWithSerdes().produce(processingResults);
+
+        }
+
+        @Override
+        public String dlqTopic() {
+            return "dlqTopicUnitTests";
+        }
+
+        @Override
+        public void onStart(KafkaStreams kafkaStreams) {
+            started = true;
+        }
+
+        private static ProcessingResult<String, String> enrichValue(KafkaError input) {
+            if (input != null) {
+                String output = "output field";
+                return ProcessingResult.success(output);
+            } else {
+                return ProcessingResult.fail(new IOException("an exception occurred"), "output error");
+            }
+        }
+
+        private static ProcessingResult<String, String> enrichValue2(KafkaError input) {
+            if (input != null) {
+                String output = "output field 2";
+                return ProcessingResult.success(output);
+            } else {
+                return ProcessingResult.fail(new IOException("an exception occurred"), "output error 2");
+            }
+        }
+    }
+}
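Reviewer note on this new test file: a schema.registry.url beginning with "mock://" makes the Confluent serializer layer resolve an in-memory mock Schema Registry instead of contacting a real one, so the topology can be built and described without any external service.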