Add Kafka integration tests with Testcontainers (#98)
* Add Testcontainers to the core module

* Add Testcontainers to the Spring Boot module
Loïc GREFFIER authored Sep 20, 2023
1 parent e214078 commit 2833299
Showing 12 changed files with 484 additions and 51 deletions.
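
The commit touches both modules: kstreamplify-core gains the JUnit 5 + Testcontainers integration test shown below, while kstreamplify-spring-boot only shows its new spring-boot-testcontainers test dependency in this excerpt. As a rough sketch of how the Spring Boot side is typically wired (the class name, property key, and empty test body here are assumptions, not the commit's actual code), a Testcontainers-backed Spring Boot test could look like:

    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.DynamicPropertyRegistry;
    import org.springframework.test.context.DynamicPropertySource;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    import org.testcontainers.utility.DockerImageName;

    @Testcontainers
    @SpringBootTest
    class SpringBootKafkaStreamsIntegrationTest {

        // Same broker image as the core integration test below.
        @Container
        static KafkaContainer kafka = new KafkaContainer(
                DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft();

        // Assumption: the Spring Boot module binds broker settings under "kafka.properties.*",
        // matching the KafkaProperties class and the application.yml changes in this diff.
        @DynamicPropertySource
        static void kafkaProperties(DynamicPropertyRegistry registry) {
            registry.add("kafka.properties.bootstrap.servers", kafka::getBootstrapServers);
        }

        @Test
        void shouldStartContextAgainstRealBroker() {
            // Starting the application context against the containerized broker is the check here.
        }
    }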
14 changes: 14 additions & 0 deletions kstreamplify-core/pom.xml
@@ -11,6 +11,20 @@

<artifactId>kstreamplify-core</artifactId>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
@@ -24,6 +24,8 @@ public class DlqDeserializationExceptionHandler extends DlqExceptionHandler

/**
* Constructor.
*
* @param producer A Kafka producer.
*/
public DlqDeserializationExceptionHandler(Producer<byte[], KafkaError> producer) {
DlqExceptionHandler.producer = producer;
@@ -22,6 +22,8 @@ public class DlqProductionExceptionHandler extends DlqExceptionHandler

/**
* Constructor.
*
* @param producer A Kafka producer.
*/
public DlqProductionExceptionHandler(Producer<byte[], KafkaError> producer) {
DlqExceptionHandler.producer = producer;
@@ -0,0 +1,162 @@
package com.michelin.kstreamplify.integrations;

import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.michelin.kstreamplify.context.KafkaStreamsExecutionContext;
import com.michelin.kstreamplify.initializer.KafkaStreamsInitializer;
import com.michelin.kstreamplify.initializer.KafkaStreamsStarter;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsMetadata;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Slf4j
@Testcontainers
class KafkaStreamsInitializerIntegrationTest {
private final KafkaStreamsInitializer initializer = new KafkaStreamInitializerImpl();

private final HttpClient httpClient = HttpClient.newBuilder().build();

@Container
static KafkaContainer kafka = new KafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:7.4.0"))
.withKraft();

@BeforeAll
static void setUp() {
createTopics("inputTopic", "outputTopic");
}

@Test
void shouldInitAndRun() throws InterruptedException, IOException {
initializer.init(new KafkaStreamsStarterImpl());

waitingForKafkaStreamsToRun();

assertEquals(KafkaStreams.State.RUNNING, initializer.getKafkaStreams().state());

List<StreamsMetadata> streamsMetadata =
new ArrayList<>(initializer.getKafkaStreams().metadataForAllStreamsClients());

// Assert Kafka Streams initialization
assertEquals("localhost", streamsMetadata.get(0).hostInfo().host());
assertEquals(8080, streamsMetadata.get(0).hostInfo().port());
assertTrue(streamsMetadata.get(0).stateStoreNames().isEmpty());

List<TopicPartition> topicPartitions = streamsMetadata.get(0).topicPartitions().stream().toList();

assertEquals("inputTopic", topicPartitions.get(0).topic());
assertEquals(0, topicPartitions.get(0).partition());

assertEquals("dlqTopic", KafkaStreamsExecutionContext.getDlqTopicName());
assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
KafkaStreamsExecutionContext.getSerdesConfig().get("default.key.serde"));
assertEquals("org.apache.kafka.common.serialization.Serdes$StringSerde",
KafkaStreamsExecutionContext.getSerdesConfig().get("default.value.serde"));

assertEquals("localhost:8080",
KafkaStreamsExecutionContext.getProperties().get("application.server"));

// Assert HTTP probes
HttpRequest requestReady = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/ready"))
.GET()
.build();

HttpResponse<Void> responseReady = httpClient.send(requestReady, HttpResponse.BodyHandlers.discarding());

assertEquals(200, responseReady.statusCode());

HttpRequest requestLiveness = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/liveness"))
.GET()
.build();

HttpResponse<Void> responseLiveness = httpClient.send(requestLiveness, HttpResponse.BodyHandlers.discarding());

assertEquals(200, responseLiveness.statusCode());

HttpRequest requestTopology = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/topology"))
.GET()
.build();

HttpResponse<String> responseTopology = httpClient.send(requestTopology, HttpResponse.BodyHandlers.ofString());

assertEquals(200, responseTopology.statusCode());
assertEquals("""
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
--> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001 (topic: outputTopic)
<-- KSTREAM-SOURCE-0000000000
""", responseTopology.body());
}

private void waitingForKafkaStreamsToRun() throws InterruptedException {
while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
log.info("Waiting for Kafka Streams to start...");
Thread.sleep(2000);
}
}
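
    // Added commentary, not part of this commit: the polling loop above has no upper bound,
    // so a broker that never comes up would stall the build until the surrounding CI timeout.
    // A bounded variant could look like this hypothetical helper:
    private void waitForRunningState(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!initializer.getKafkaStreams().state().equals(KafkaStreams.State.RUNNING)) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Kafka Streams did not reach RUNNING in time");
            }
            log.info("Waiting for Kafka Streams to start...");
            Thread.sleep(500);
        }
    }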

private static void createTopics(String... topics) {
var newTopics = Arrays.stream(topics)
.map(topic -> new NewTopic(topic, 1, (short) 1))
.toList();
try (var admin = AdminClient.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
admin.createTopics(newTopics);
}
}

static class KafkaStreamInitializerImpl extends KafkaStreamsInitializer {
@Override
protected void initProperties() {
super.initProperties();
KafkaStreamsExecutionContext.getProperties()
.setProperty(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
}
}

@Slf4j
static class KafkaStreamsStarterImpl extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
streamsBuilder
.stream("inputTopic")
.to("outputTopic");
}

@Override
public String dlqTopic() {
return "dlqTopic";
}

@Override
public void onStart(KafkaStreams kafkaStreams) {
log.info("Starting Kafka Streams from integration tests!");
}
}
}
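
The test above asserts initialization, stream metadata, and the HTTP probes. A natural extension, not part of this commit, would be to check that records actually flow from inputTopic to outputTopic through the containerized broker. A hedged sketch of such a test method (it assumes the usual Kafka client imports: the org.apache.kafka.clients.producer and consumer packages, StringSerializer/StringDeserializer, java.time.Duration, and java.util.Properties):

    @Test
    void shouldForwardRecordsFromInputToOutputTopic() throws Exception {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Send one record through the pass-through topology defined in KafkaStreamsStarterImpl.
        try (var producer = new KafkaProducer<String, String>(producerProps)) {
            producer.send(new ProducerRecord<>("inputTopic", "key", "value")).get();
        }

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "integration-test");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // A single long poll keeps the sketch short; a real test would poll in a loop with a deadline.
        try (var consumer = new KafkaConsumer<String, String>(consumerProps)) {
            consumer.subscribe(List.of("outputTopic"));
            var records = consumer.poll(Duration.ofSeconds(10));
            assertEquals("value", records.iterator().next().value());
        }
    }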
2 changes: 2 additions & 0 deletions kstreamplify-core/src/test/resources/application.yml
@@ -3,3 +3,5 @@ server:
kafka:
properties:
application.id: appId
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
11 changes: 11 additions & 0 deletions kstreamplify-core/src/test/resources/logback-test.xml
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
13 changes: 13 additions & 0 deletions kstreamplify-spring-boot/pom.xml
@@ -24,13 +24,26 @@
<version>${spring-boot.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
<version>${spring-boot.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<version>${spring-boot.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.michelin</groupId>
<artifactId>kstreamplify-core</artifactId>
@@ -19,7 +19,7 @@ public class KafkaProperties {
/**
* The Kafka properties.
*/
private final Map<String, String> properties = new HashMap<>();
private Map<String, String> properties = new HashMap<>();

/**
* Return the Kafka properties as {@link java.util.Properties}.