Skip to content

Commit

Permalink
Improve unit tests coverage (#97)
Browse files Browse the repository at this point in the history
* Hit Sonar quality gate

* Add Kafka Streams exec context tests

* Add properties utils tests

* Add test on topic with serdes

* Lint

* Improve Dlq serialization tests

* Lint
  • Loading branch information
Loïc GREFFIER authored Sep 19, 2023
1 parent 061a86e commit e214078
Show file tree
Hide file tree
Showing 19 changed files with 284 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() {
assertEquals(1, resultDlq.size());
assertEquals(0, resultOutput.size());
}

@Test
void shouldCreateInputAndOutputTopicsWithSerde() {
    // Input side: the helper should wire a test topic using the serdes carried by TopicWithSerde.
    TopicWithSerde<String, String> input = new TopicWithSerde<>("INPUT_TOPIC",
        "APP_NAME", Serdes.String(), Serdes.String());
    TestInputTopic<String, String> inputTopic = createInputTestTopic(input);

    String expectedInput = "TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, "
        + "valueSerializer=StringSerializer]";
    assertEquals(expectedInput, inputTopic.toString());

    // Output side: same contract, except deserializers are used and the topic starts empty (size=0).
    TopicWithSerde<String, String> output = new TopicWithSerde<>("OUTPUT_TOPIC",
        "APP_NAME", Serdes.String(), Serdes.String());
    TestOutputTopic<String, String> outputTopic = createOutputTestTopic(output);

    String expectedOutput = "TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, "
        + "valueDeserializer=StringDeserializer, size=0]";
    assertEquals(expectedOutput, outputTopic.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import java.util.Map;
import java.util.Properties;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -16,6 +18,7 @@
* The class to represent the context of the KStream.
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KafkaStreamsExecutionContext {

/**
Expand All @@ -36,6 +39,7 @@ public class KafkaStreamsExecutionContext {
* The properties of the stream execution context.
*/
@Getter
@Setter
private static Properties properties;

/**
Expand All @@ -51,9 +55,6 @@ public class KafkaStreamsExecutionContext {
@Getter
private static String prefix;

private KafkaStreamsExecutionContext() {
}

/**
* Register KStream properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class KafkaStreamsInitializer {
/**
* The application properties.
*/
protected Properties properties;
protected Properties properties = new Properties();

/**
* The DLQ topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TopicWithSerde<K, V> {
@Getter
private final Serde<V> valueSerde;


/**
* <p>Additional constructor which uses default parameter "self" for prefixPropertyKey.</p>
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.michelin.kstreamplify.context;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class KafkaStreamsExecutionContextTest {

    @BeforeEach
    void setUp() {
        // The context is static; wipe its properties so no state leaks between test methods.
        KafkaStreamsExecutionContext.setProperties(null);
    }

    @Test
    void shouldNotRegisterPropertiesWhenNull() {
        KafkaStreamsExecutionContext.registerProperties(null);

        // A null input must leave the context untouched.
        assertNull(KafkaStreamsExecutionContext.getProperties());
    }

    @Test
    void shouldAddPrefixToAppId() {
        Properties props = new Properties();
        props.put("prefix.self", "abc.");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

        KafkaStreamsExecutionContext.registerProperties(props);

        // The "prefix.self" value is captured and prepended to the application id.
        assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
        assertEquals("abc.appId",
            KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
    }

    @Test
    void shouldNotAddPrefixToAppIdIfNoPrefix() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

        KafkaStreamsExecutionContext.registerProperties(props);

        // Without a configured prefix the app id must stay as-is and the prefix default to "".
        assertEquals("", KafkaStreamsExecutionContext.getPrefix());
        assertEquals("appId",
            KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
    }

    @Test
    void shouldNotAddPrefixToAppIdIfNotAppId() {
        Properties props = new Properties();
        props.put("prefix.self", "abc.");

        KafkaStreamsExecutionContext.registerProperties(props);

        // The prefix is still recorded even when there is no application id to decorate.
        assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
        assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DedupWithPredicateProcessorTest {
private TimestampedKeyValueStore<String, KafkaError> store;

@BeforeEach
public void setUp() {
void setUp() {
// Create an instance of DedupWithPredicateProcessor for testing
processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler;
import com.michelin.kstreamplify.error.DlqExceptionHandler;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import java.nio.charset.StandardCharsets;
Expand All @@ -18,6 +17,7 @@
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
Expand Down Expand Up @@ -96,4 +96,45 @@ void shouldConfigure() {

assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer<byte[], KafkaError>);
}

@Test
void shouldEnrichWithException() {
    // Builder pre-populated with the failed record's coordinates; enrichWithException
    // should complete it from the thrown exception and the record's key/value bytes.
    KafkaError.Builder kafkaError = KafkaError.newBuilder()
        .setTopic("topic")
        .setStack("stack")
        .setPartition(0)
        .setOffset(0)
        .setCause("cause")
        .setValue("value");

    handler = new DlqDeserializationExceptionHandler();
    KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
        new RuntimeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8),
        "value".getBytes(StandardCharsets.UTF_8));

    KafkaError error = enrichedBuilder.build();
    // NOTE(review): "Unknown cause" is expected even though the RuntimeException carries the
    // message "Exception..." — presumably the handler derives the cause from the exception's
    // *cause* chain (null here), not its message. Confirm against enrichWithException's source.
    assertEquals("Unknown cause", error.getCause());
    assertNull(error.getContextMessage());
}

@Test
void shouldEnrichWithRecordTooLargeException() {
    // Start from a builder already carrying the failed record's coordinates.
    KafkaError.Builder kafkaError = KafkaError.newBuilder()
        .setTopic("topic")
        .setStack("stack")
        .setPartition(0)
        .setOffset(0)
        .setCause("cause")
        .setValue("value");

    byte[] recordKey = "key".getBytes(StandardCharsets.UTF_8);
    byte[] recordValue = "value".getBytes(StandardCharsets.UTF_8);

    handler = new DlqDeserializationExceptionHandler();
    KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
        new RecordTooLargeException("Exception..."), recordKey, recordValue);

    KafkaError error = enrichedBuilder.build();
    assertEquals("Unknown cause", error.getCause());
    // On RecordTooLargeException the oversized payload is not embedded: the value is
    // replaced by an explanatory message quoting the payload size.
    assertEquals("The record is too large to be set as value (5 bytes). "
        + "The key will be used instead", error.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.michelin.kstreamplify.rest;
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@
import static com.michelin.kstreamplify.constants.PropertyConstants.PROPERTY_SEPARATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.model.RestServiceResponse;
import com.michelin.kstreamplify.properties.PropertiesUtils;
import com.michelin.kstreamplify.services.ProbeService;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.michelin.kstreamplify.properties;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Properties;
import org.junit.jupiter.api.Test;

class PropertiesUtilsTest {

    @Test
    void shouldLoadProperties() {
        // Loads the whole application configuration from the test resources.
        Properties props = PropertiesUtils.loadProperties();

        // Server settings should be present...
        assertTrue(props.containsKey("server.port"));
        assertTrue(props.containsValue(8080));

        // ...alongside the flattened, "kafka.properties."-prefixed Kafka settings.
        assertTrue(props.containsKey("kafka.properties.application.id"));
        assertTrue(props.containsValue("appId"));
    }

    @Test
    void shouldLoadKafkaProperties() {
        // Extracts the Kafka subset, stripping the "kafka.properties." prefix from the keys.
        Properties kafkaProps = PropertiesUtils.loadKafkaProperties(PropertiesUtils.loadProperties());

        assertTrue(kafkaProps.containsKey("application.id"));
        assertTrue(kafkaProps.containsValue("appId"));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;

@ExtendWith(MockitoExtension.class)
class RocksDbConfigTest {

@Mock
private Options options;

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
when(options.tableFormatConfig()).thenReturn(new BlockBasedTableConfig());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.michelin.kstreamplify.rest;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import org.junit.jupiter.api.Test;

class DefaultProbeControllerTest {

    @Test
    void shouldCreateServerWithDefaultHostAndPort() {
        // Building the controller with a default initializer should spin up the probe
        // HTTP server bound to a resolvable host and a non-zero port.
        DefaultProbeController probeController = new DefaultProbeController(new KafkaStreamsInitializer());

        assertNotNull(probeController.server.getAddress().getAddress().getHostName());
        assertNotEquals(0, probeController.server.getAddress().getPort());
    }
}
Loading

0 comments on commit e214078

Please sign in to comment.