Add unit tests coverage (#62)
* add unit tests

* fix conflicts

* add unit tests

* add unit tests

* remove code smells

* remove code smells

* remove code smells

* remove code smells

* Update unit tests

* Fix lint

* Add tests for kafka Streams initializer

---------

Co-authored-by: Loïc Greffier <[email protected]>
adriencalime and Loïc Greffier committed Sep 26, 2023
1 parent 8ba101f commit d2a2954
Showing 28 changed files with 1,378 additions and 47 deletions.
@@ -86,7 +86,6 @@ void setUp() {

@Test
void shouldContinueWhenProcessingValueIsValid() {

stringInputTopic.pipeInput("key", "message");

var resultDlq = dlqTopic.readValuesToList();
@@ -109,7 +108,6 @@ void shouldSendExceptionToDlqWhenProcessingValueIsInvalid() {

@Test
void shouldContinueWhenProcessingValueIsValidAvro() {

KafkaError avroModel = KafkaError.newBuilder()
.setTopic("topic")
.setStack("stack")
@@ -130,7 +128,6 @@ void shouldContinueWhenProcessingValueIsValidAvro() {

@Test
void shouldContinueWhenProcessingValueIsInvalidAvro() {

avroInputTopic.pipeInput("key", null);

List<KafkaError> resultDlq = dlqTopic.readValuesToList();
JsonToAvroConverter.java
@@ -12,6 +12,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -21,6 +23,7 @@
/**
* The class to convert Json to Avro.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class JsonToAvroConverter {
/**
* Convert a file in json to avro.
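
Lombok's @NoArgsConstructor(access = AccessLevel.PRIVATE) on the converter resolves the "utility classes should hide their public constructor" code smell mentioned in the commit message. A minimal sketch of what the annotation expands to (my illustration, not part of the diff):

    // Hand-written equivalent of the Lombok annotation:
    public class JsonToAvroConverter {
        // All members are static; a private constructor prevents instantiation.
        private JsonToAvroConverter() {
        }
    }
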
DlqDeserializationExceptionHandler.java
@@ -3,9 +3,11 @@
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import java.util.Map;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -15,10 +17,18 @@
* The class managing deserialization exceptions.
*/
@Slf4j
+@NoArgsConstructor
public class DlqDeserializationExceptionHandler extends DlqExceptionHandler
implements DeserializationExceptionHandler {
private static final Object GUARD = new Object();

+/**
+ * Constructor.
+ */
+public DlqDeserializationExceptionHandler(Producer<byte[], KafkaError> producer) {
+    DlqExceptionHandler.producer = producer;
+}

/**
* Manage deserialization exceptions.
*
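
The new constructor lets tests hand the handler a fake producer instead of calling instantiateProducer against a real broker; the same constructor is added to DlqProductionExceptionHandler below. A hedged sketch of the intended test wiring, using Kafka's built-in MockProducer (imports and assertions omitted; this usage is assumed, not shown in the commit):

    @Test
    void shouldInjectMockProducer() {
        // MockProducer records sends in memory, so no broker is needed.
        MockProducer<byte[], KafkaError> mockProducer = new MockProducer<>();
        DlqDeserializationExceptionHandler handler =
                new DlqDeserializationExceptionHandler(mockProducer);
        // ... trigger a deserialization error, then assert on mockProducer.history()
    }
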
DlqExceptionHandler.java
@@ -7,8 +7,10 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -21,15 +23,16 @@ public abstract class DlqExceptionHandler {
/**
* The DLQ producer.
*/
-protected static KafkaProducer<byte[], KafkaError> producer;
+@Getter
+protected static Producer<byte[], KafkaError> producer;

/**
* Create a producer.
*
* @param clientId The producer client id
* @param configs The producer configs
*/
-protected static void instantiateProducer(String clientId, Map<String, ?> configs) {
+public static void instantiateProducer(String clientId, Map<String, ?> configs) {
Properties properties = new Properties();
properties.putAll(configs);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -49,9 +52,9 @@ protected static void instantiateProducer(String clientId, Map<String, ?> configs) {
* @param value the record value
* @return the error enriched by the exception
*/
protected KafkaError.Builder enrichWithException(KafkaError.Builder builder,
Exception exception, byte[] key,
byte[] value) {
public KafkaError.Builder enrichWithException(KafkaError.Builder builder,
Exception exception, byte[] key,
byte[] value) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
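
Three changes serve testability here: the field and instantiateProducer are widened from the concrete KafkaProducer to the Producer interface, instantiateProducer and enrichWithException become public, and @Getter exposes the static producer. A hedged sketch of the resulting API (config values are placeholders; Lombok generates a static getProducer() for the static field):

    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    DlqExceptionHandler.instantiateProducer("my-app-dlq", configs);
    Producer<byte[], KafkaError> producer = DlqExceptionHandler.getProducer();
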
DlqProductionExceptionHandler.java
@@ -3,8 +3,10 @@
import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import java.util.Map;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -13,10 +15,18 @@
* The class managing DLQ production exceptions.
*/
@Slf4j
+@NoArgsConstructor
public class DlqProductionExceptionHandler extends DlqExceptionHandler
implements ProductionExceptionHandler {
private static final Object GUARD = new Object();

+/**
+ * Constructor.
+ */
+public DlqProductionExceptionHandler(Producer<byte[], KafkaError> producer) {
+    DlqExceptionHandler.producer = producer;
+}

/**
* Manage production exceptions.
*
ProcessingResult.java
@@ -26,7 +26,7 @@ public class ProcessingResult<V, V2> {
*
* @param value The success value
*/
-private ProcessingResult(V value) {
+public ProcessingResult(V value) {
this.value = value;
}

@@ -35,7 +35,7 @@ private ProcessingResult(V value) {
*
* @param error the ProcessingError containing the error details
*/
-private ProcessingResult(ProcessingError<V2> error) {
+public ProcessingResult(ProcessingError<V2> error) {
this.error = error;
}

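
With the constructors now public, tests can build success and error results directly instead of going through the class's static factory methods, e.g. (illustration only; the wrapped types are arbitrary):

    ProcessingResult<String, String> success = new ProcessingResult<>("ok");
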
TopicWithSerde.java
@@ -24,7 +24,7 @@
* @param <V> The model used as the Avro value of the topic.
*/
@AllArgsConstructor(access = AccessLevel.PUBLIC)
-public final class TopicWithSerde<K, V> {
+public class TopicWithSerde<K, V> {
/**
* Name of the topic.
*/
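
Dropping the final modifier makes TopicWithSerde extendable and, in particular, mockable in unit tests, e.g. with Mockito (assumed usage, not shown in this commit):

    @SuppressWarnings("unchecked")
    TopicWithSerde<String, KafkaError> topic = Mockito.mock(TopicWithSerde.class);
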
@@ -38,9 +38,11 @@ public static <K, V> V get(WindowStore<K, V> stateStore, K key, int retentionDays) {
var resultIterator =
stateStore.backwardFetch(key, Instant.now().minus(Duration.ofDays(retentionDays)),
Instant.now());

if (resultIterator != null && resultIterator.hasNext()) {
return resultIterator.next().value;
}

return null;
}
}
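
The helper scans the window store backwards from now over the retention window and returns the most recent value, or null if none is found. A hedged call-site sketch (the utility class name WindowStateStoreUtils and the windowStore variable are assumptions based on the signature above):

    // Most recent value stored under "key" within the past 7 days:
    KafkaError latest = WindowStateStoreUtils.get(windowStore, "key", 7);
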
JsonToAvroConverterTest.java
@@ -0,0 +1,33 @@
package com.michelin.kstreamplify.converter;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import com.michelin.kstreamplify.avro.KafkaTestAvro;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

@Slf4j
class JsonToAvroConverterTest {

private static final String JSON =
"{\"membersString\":{\"key1\":\"val1\"},\"split\":[{\"subSplit\":"
+ "[{\"subSubIntField\":8,\"subSubField\":\"subSubTest\"}],\"subField\":\"subTest\"}],"
+ "\"booleanField\":false,\"members\":{\"key1\":{\"mapQuantityField\":1}},"
+ "\"quantityField\":10,\"stringField\":\"test\",\"listString\":[\"val1\",\"val2\"]}";

@Test
void shouldConvertJsonToAvro() {
KafkaTestAvro kafkaTest = (KafkaTestAvro) JsonToAvroConverter.jsonToAvro(JSON, KafkaTestAvro.getClassSchema());
assertEquals("val1", kafkaTest.getMembersString().get("key1"));
assertEquals(8, kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubIntField());
assertEquals("subSubTest", kafkaTest.getSplit().get(0).getSubSplit().get(0).getSubSubField());
assertEquals("subTest", kafkaTest.getSplit().get(0).getSubField());
assertFalse(kafkaTest.getBooleanField());
assertEquals("1.0000", kafkaTest.getMembers().get("key1").getMapQuantityField().toString());
assertEquals("10.0000", kafkaTest.getQuantityField().toString());
assertEquals("test", kafkaTest.getStringField());
assertEquals("val1", kafkaTest.getListString().get(0));
assertEquals("val2", kafkaTest.getListString().get(1));
}
}
DedupKeyProcessorTest.java
@@ -0,0 +1,45 @@
package com.michelin.kstreamplify.deduplication;

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class DedupKeyProcessorTest {

@Mock
private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;

@Mock
private TimestampedKeyValueStore<String, String> dedupTimestampedStore;

@Test
void shouldProcessNewRecord() {
String key = "some-key";

when(context.getStateStore("dedupStoreName")).thenReturn(dedupTimestampedStore);
when(dedupTimestampedStore.get(key)).thenReturn(null);

DedupKeyProcessor<KafkaError> dedupKeyProcessor = new DedupKeyProcessor<>("dedupStoreName", Duration.ZERO);
dedupKeyProcessor.init(context);

KafkaError value = new KafkaError();
Record<String, KafkaError> record = new Record<>(key, value, 0);
dedupKeyProcessor.process(record);

verify(dedupTimestampedStore).put(eq(key), any());
}
}
DedupKeyValueProcessorTest.java
@@ -0,0 +1,43 @@
package com.michelin.kstreamplify.deduplication;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class DedupKeyValueProcessorTest {

@Mock
private ProcessorContext<String, ProcessingResult<KafkaError, KafkaError>> context;

@Mock
private WindowStore<String, KafkaError> windowStore;

@Test
void shouldProcessNewRecord() {
String key = "some-key";
KafkaError value = new KafkaError();

Record<String, KafkaError> record = new Record<>(key, value, 0);

when(context.getStateStore("dedupStoreName")).thenReturn(windowStore);

DedupKeyValueProcessor<KafkaError> dedupKeyValueProcessor = new DedupKeyValueProcessor<>("dedupStoreName",
Duration.ZERO);
dedupKeyValueProcessor.init(context);
dedupKeyValueProcessor.process(record);

verify(windowStore).put(record.key(), record.value(), record.timestamp());
}
}