Skip to content

Commit

Permalink
Add support for starting Kafka Connect
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison committed Jan 6, 2025
1 parent 0f2e3f3 commit 2764904
Show file tree
Hide file tree
Showing 8 changed files with 693 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,8 @@
<avoidCallsTo>org.slf4j</avoidCallsTo>
<avoidCallsTo>org.apache.commons.logging</avoidCallsTo>
</avoidCallsTo>
<mutationThreshold>91</mutationThreshold>
<coverageThreshold>71</coverageThreshold>
<mutationThreshold>88</mutationThreshold>
<coverageThreshold>74</coverageThreshold>
<verbose>true</verbose>
</configuration>
</plugin>
Expand Down
238 changes: 238 additions & 0 deletions src/main/java/io/strimzi/test/container/StrimziConnectCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import com.groupcdg.pitest.annotations.DoNotMutate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* A Kafka Connect cluster using the latest image from quay.io/strimzi/kafka with the given version.
* Kafka Connect is started in distributed mode. Users must use the exposed REST API to start, stop and manage connectors.
*/
public class StrimziConnectCluster {

private static final String NETWORK_ALIAS_PREFIX = "connect-";
private static final int CONNECT_PORT = 8083;
private static final int INTER_WORKER_PORT = 8084;

private final StrimziKafkaCluster kafkaCluster;
private final Map<String, String> additionalConnectConfiguration;
private final String kafkaVersion;
private final boolean includeFileConnectors;
private final String groupId;
private final List<StrimziConnectContainer> workers;

public StrimziConnectCluster(StrimziConnectClusterBuilder builder) {
kafkaCluster = builder.kafkaCluster;
additionalConnectConfiguration = builder.additionalConnectConfiguration;
kafkaVersion = builder.kafkaVersion == null
? KafkaVersionService.getInstance().latestRelease().getVersion()
: builder.kafkaVersion;
includeFileConnectors = builder.includeFileConnectors;
groupId = builder.groupId;

String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion);

workers = new ArrayList<>();
for (int i = 0; i < builder.workersNum; i++) {
String host = NETWORK_ALIAS_PREFIX + i;
Properties configs = buildConfigs(host);
StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs)
.withNetwork(kafkaCluster.getNetwork())
.withNetworkAliases(host)
.withExposedPorts(CONNECT_PORT)
.withEnv("LOG_DIR", "/tmp")
.waitForRunning()
.waitingFor(Wait.forHttp("/").forStatusCode(200));
workers.add(worker);
}
}

private Properties buildConfigs(String host) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaCluster.getNetworkBootstrapServers());
properties.setProperty("group.id", groupId);
properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("offset.storage.topic", "connect-offsets");
properties.setProperty("offset.storage.replication.factor", "-1");
properties.setProperty("config.storage.topic", "connect-configs");
properties.setProperty("config.storage.replication.factor", "-1");
properties.setProperty("status.storage.topic", "connect-status");
properties.setProperty("status.storage.replication.factor", "-1");
properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT);
properties.putAll(additionalConnectConfiguration);
if (includeFileConnectors) {
String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar";
if (properties.containsKey("plugin.path")) {
String pluginPath = properties.getProperty("plugin.path");
properties.setProperty("plugin.path", pluginPath + "," + connectFileJar);
} else {
properties.setProperty("plugin.path", connectFileJar);
}
}
return properties;
}

/**
* Get the workers of this Kafka Connect cluster.
*
* @return collection of GenericContainer containers
*/
@DoNotMutate
public Collection<GenericContainer<?>> getWorkers() {
return new ArrayList<>(workers);
}

/**
* Start the Kafka Connect cluster.
* This starts all the workers and waits for them to all be healthy and ready to be used.
*/
@DoNotMutate
public void start() {
for (StrimziConnectContainer worker : workers) {
worker.start();
}
}

/**
* Stop the Kafka Connect cluster.
*/
@DoNotMutate
public void stop() {
workers.forEach(StrimziConnectContainer::stop);
}

/**
* Return the REST API endpoint of one of the available workers.
*
* @return the REST API endpoint
*/
@DoNotMutate
public String getRestEndpoint() {
for (StrimziConnectContainer worker : workers) {
if (worker.isRunning()) {
return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT);
}
}
throw new IllegalStateException("No workers are running and healthy");
}

/**
* Builder class for {@code StrimziConnectCluster}.
* <p>
* Use this builder to create instances of {@code StrimziConnectCluster}.
* You must at least call {@link #withKafkaCluster(StrimziKafkaCluster)}, and
* {@link #withGroupId(String)} before calling {@link #build()}.
* </p>
*/
public static class StrimziConnectClusterBuilder {

private Map<String, String> additionalConnectConfiguration = new HashMap<>();
private boolean includeFileConnectors = true;
private int workersNum = 1;
private String kafkaVersion;
private StrimziKafkaCluster kafkaCluster;
private String groupId;

/**
* Set the Kafka cluster the Kafka Connect cluster will use to.
*
* @param kafkaCluster the {@link StrimziKafkaCluster} instance
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) {
this.kafkaCluster = kafkaCluster;
return this;
}

/**
* Set the number of Kafka Connect workers in the cluster.
* If not called, the cluster has a single worker.
*
* @param workersNum the number of Kafka Connect workers
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) {
this.workersNum = workersNum;
return this;
}

/**
* Add additional Kafka Connect configuration parameters.
* These configurations are applied to all workers in the cluster.
*
* @param additionalConnectConfiguration a map of additional Kafka Connect configuration options
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map<String, String> additionalConnectConfiguration) {
this.additionalConnectConfiguration = additionalConnectConfiguration;
return this;
}

/**
* Specify the Kafka version to be used for the Connect workers in the cluster.
* If not called, the latest Kafka version available from {@link KafkaVersionService} will be used.
*
* @param kafkaVersion the desired Kafka version for the Connect cluster
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) {
this.kafkaVersion = kafkaVersion;
return this;
}

/**
* Disable the FileStreams connectors.
* If not called, the FileSteams connectors are added to plugin.path.
*
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withoutFileConnectors() {
this.includeFileConnectors = false;
return this;
}

/**
* Specify the group.id of the Connect cluster.
*
* @param groupId the group id
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withGroupId(String groupId) {
this.groupId = groupId;
return this;
}

/**
* Build and return a {@code StrimziConnectCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziConnectCluster}
*/
public StrimziConnectCluster build() {
if (kafkaCluster == null) {
throw new IllegalArgumentException("A Kafka cluster must be specified");
}
if (groupId == null) {
throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified");
}
if (workersNum <= 0) {
throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0");
}
if (additionalConnectConfiguration == null) {
throw new IllegalArgumentException("The additional configuration must be specified");
}
return new StrimziConnectCluster(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.groupcdg.pitest.annotations.DoNotMutate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class StrimziConnectContainer extends GenericContainer<StrimziConnectContainer> {

private static final String STARTER_SCRIPT = "/start_connect.sh";
private static final String CONFIG_FILE = "/opt/kafka/config/connect.properties";

private final StrimziKafkaCluster kafkaCluster;
private final Properties configs;

public StrimziConnectContainer(String imageName, StrimziKafkaCluster kafkaCluster, Properties configs) {
super(imageName);
this.kafkaCluster = kafkaCluster;
this.configs = configs;
}

@Override
@DoNotMutate
protected void doStart() {
super.setNetwork(kafkaCluster.getNetwork());
super.setCommand("sh", "-c", "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
super.doStart();
}

@Override
@DoNotMutate
protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) {
super.containerIsStarting(containerInfo, reused);

// Write configs to a file in the container
StringWriter writer = new StringWriter();
try {
configs.store(writer, null);
} catch (IOException e) {
throw new UncheckedIOException("Failed to build configuration file", e);
}
copyFileToContainer(
Transferable.of(writer.toString().getBytes(StandardCharsets.UTF_8)),
CONFIG_FILE);

// Write starter script to a file in the container
String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE;
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
STARTER_SCRIPT
);
}

@DoNotMutate
public StrimziConnectContainer waitForRunning() {
super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1));
return this;
}
}
Loading

0 comments on commit 2764904

Please sign in to comment.