Skip to content

Commit

Permalink
Merge branch 'main' into 48-getting-started-with-a-gif-animation
Browse files Browse the repository at this point in the history
# Conflicts:
#	kstreamplify-core-test/src/test/java/com/michelin/kstreamplify/TopologyErrorHandlerTest.java
#	kstreamplify-core/src/main/java/com/michelin/kstreamplify/context/KafkaStreamsExecutionContext.java
#	kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandler.java
#	kstreamplify-core/src/main/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandler.java
#	kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java
#	kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsInitializerTest.java
#	kstreamplify-core/src/test/java/com/michelin/kstreamplify/properties/RocksDbConfigTest.java
#	pom.xml
  • Loading branch information
Loïc Greffier committed Sep 26, 2023
2 parents 88e9efd + ff82273 commit 606e9ad
Show file tree
Hide file tree
Showing 29 changed files with 1,197 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.michelin.kstreamplify.error.TopologyErrorHandler;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import com.michelin.kstreamplify.utils.SerdesUtils;
import com.michelin.kstreamplify.utils.TopicWithSerde;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -136,4 +137,19 @@ void shouldContinueWhenProcessingValueIsInvalidAvro() {
assertEquals(1, resultDlq.size());
assertEquals(0, resultOutput.size());
}

@Test
void shouldCreateInputAndOutputTopicsWithSerde() {
TestInputTopic<String, String> inputTopic = createInputTestTopic(new TopicWithSerde<>("INPUT_TOPIC",
"APP_NAME", Serdes.String(), Serdes.String()));

assertEquals("TestInputTopic[topic='INPUT_TOPIC', keySerializer=StringSerializer, "
+ "valueSerializer=StringSerializer]", inputTopic.toString());

TestOutputTopic<String, String> outputTopic = createOutputTestTopic(new TopicWithSerde<>("OUTPUT_TOPIC",
"APP_NAME", Serdes.String(), Serdes.String()));

assertEquals("TestOutputTopic[topic='OUTPUT_TOPIC', keyDeserializer=StringDeserializer, "
+ "valueDeserializer=StringDeserializer, size=0]", outputTopic.toString());
}
}
14 changes: 14 additions & 0 deletions kstreamplify-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@

<artifactId>kstreamplify-core</artifactId>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import java.util.Map;
import java.util.Properties;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -16,6 +18,7 @@
* The class to represent the context of the KStream.
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class KafkaStreamsExecutionContext {

/**
Expand All @@ -36,6 +39,7 @@ public class KafkaStreamsExecutionContext {
* The properties of the stream execution context.
*/
@Getter
@Setter
private static Properties properties;

/**
Expand All @@ -51,9 +55,6 @@ public class KafkaStreamsExecutionContext {
@Getter
private static String prefix;

private KafkaStreamsExecutionContext() {
}

/**
* Register KStream properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class DlqDeserializationExceptionHandler extends DlqExceptionHandler

/**
* Constructor.
*
* @param producer A Kafka producer.
*/
public DlqDeserializationExceptionHandler(Producer<byte[], KafkaError> producer) {
DlqExceptionHandler.producer = producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class DlqProductionExceptionHandler extends DlqExceptionHandler

/**
* Constructor.
*
* @param producer A Kafka producer
*/
public DlqProductionExceptionHandler(Producer<byte[], KafkaError> producer) {
DlqExceptionHandler.producer = producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class KafkaStreamsInitializer {
/**
* The application properties.
*/
protected Properties properties;
protected Properties properties = new Properties();

/**
* The DLQ topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class TopicWithSerde<K, V> {
@Getter
private final Serde<V> valueSerde;


/**
* <p>Additional constructor which uses default parameter "self" for prefixPropertyKey.</p>
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.michelin.kstreamplify.context;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class KafkaStreamsExecutionContextTest {

@BeforeEach
void setUp() {
KafkaStreamsExecutionContext.setProperties(null);
}

@Test
void shouldNotRegisterPropertiesWhenNull() {
KafkaStreamsExecutionContext.registerProperties(null);
assertNull(KafkaStreamsExecutionContext.getProperties());
}

@Test
void shouldAddPrefixToAppId() {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);

assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertEquals("abc.appId", KafkaStreamsExecutionContext.getProperties()
.get(StreamsConfig.APPLICATION_ID_CONFIG));
}

@Test
void shouldNotAddPrefixToAppIdIfNoPrefix() {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");

KafkaStreamsExecutionContext.registerProperties(properties);

assertEquals("", KafkaStreamsExecutionContext.getPrefix());
assertEquals("appId", KafkaStreamsExecutionContext.getProperties()
.get(StreamsConfig.APPLICATION_ID_CONFIG));
}

@Test
void shouldNotAddPrefixToAppIdIfNotAppId() {
Properties properties = new Properties();
properties.put("prefix.self", "abc.");

KafkaStreamsExecutionContext.registerProperties(properties);

assertEquals("abc.", KafkaStreamsExecutionContext.getPrefix());
assertNull(KafkaStreamsExecutionContext.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DedupWithPredicateProcessorTest {
private TimestampedKeyValueStore<String, KafkaError> store;

@BeforeEach
public void setUp() {
void setUp() {
// Create an instance of DedupWithPredicateProcessor for testing
processor = new DedupWithPredicateProcessor<>("testStore", Duration.ofHours(1), TestKeyExtractor::extract);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import com.michelin.kstreamplify.avro.KafkaError;
import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class DlqDeserializationExceptionHandlerTest {
@Mock
private ConsumerRecord<byte[], byte[]> record;

@Mock
private ProcessorContext processorContext;

private Producer<byte[], KafkaError> producer;

private DlqDeserializationExceptionHandler handler;

@BeforeEach
void setUp() {
Serializer<KafkaError> serializer = (Serializer) new KafkaAvroSerializer();
serializer.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://"), false);
producer = new MockProducer<>(true, new ByteArraySerializer(), serializer);

KafkaStreamsExecutionContext.setDlqTopicName(null);
}

@Test
void shouldReturnFailIfNoDlq() {
handler = new DlqDeserializationExceptionHandler(producer);

DeserializationExceptionHandler.DeserializationHandlerResponse response =
handler.handle(processorContext, record, new RuntimeException("Exception..."));

assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
}

@Test
void shouldReturnFailOnExceptionDuringHandle() {
handler = new DlqDeserializationExceptionHandler(producer);
KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");
DeserializationExceptionHandler.DeserializationHandlerResponse response =
handler.handle(processorContext, record, new KafkaException("Exception..."));

assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL, response);
}

@Test
void shouldReturnContinueOnKafkaException() {
handler = new DlqDeserializationExceptionHandler(producer);
KafkaStreamsExecutionContext.setDlqTopicName("DlqTopic");

when(record.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8));
when(record.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8));
when(record.topic()).thenReturn("topic");

DeserializationExceptionHandler.DeserializationHandlerResponse response =
handler.handle(processorContext, record, new KafkaException("Exception..."));

assertEquals(DeserializationExceptionHandler.DeserializationHandlerResponse.CONTINUE, response);
}

@Test
void shouldConfigure() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("schema.registry.url", "localhost:8080");
configs.put("acks", "all");

handler = new DlqDeserializationExceptionHandler();
handler.configure(configs);

assertTrue(DlqExceptionHandler.getProducer() instanceof KafkaProducer<byte[], KafkaError>);

Check failure on line 97 in kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqDeserializationExceptionHandlerTest.java

View workflow job for this annotation

GitHub Actions / JUnit Test Report

DlqDeserializationExceptionHandlerTest.shouldConfigure

expected: <true> but was: <false>
Raw output
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
	at com.michelin.kstreamplify.rest.DlqDeserializationExceptionHandlerTest.shouldConfigure(DlqDeserializationExceptionHandlerTest.java:97)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
}

@Test
void shouldEnrichWithException() {
KafkaError.Builder kafkaError = KafkaError.newBuilder()
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value");

handler = new DlqDeserializationExceptionHandler();
KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
new RuntimeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8));

KafkaError error = enrichedBuilder.build();
assertEquals("Unknown cause", error.getCause());
assertNull(error.getContextMessage());
}

@Test
void shouldEnrichWithRecordTooLargeException() {
KafkaError.Builder kafkaError = KafkaError.newBuilder()
.setTopic("topic")
.setStack("stack")
.setPartition(0)
.setOffset(0)
.setCause("cause")
.setValue("value");

handler = new DlqDeserializationExceptionHandler();
KafkaError.Builder enrichedBuilder = handler.enrichWithException(kafkaError,
new RecordTooLargeException("Exception..."), "key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8));

KafkaError error = enrichedBuilder.build();
assertEquals("Unknown cause", error.getCause());
assertEquals("The record is too large to be set as value (5 bytes). "
+ "The key will be used instead", error.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.michelin.kstreamplify.error;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.michelin.kstreamplify.error.DlqExceptionHandler;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

class DlqExceptionHandlerTest {

@Test
void shouldInstantiateProducer() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("schema.registry.url", "localhost:8080");
configs.put("acks", "all");

DlqExceptionHandler.instantiateProducer("test-client", configs);

assertNotNull(DlqExceptionHandler.getProducer());
}
}
Loading

0 comments on commit 606e9ad

Please sign in to comment.