Add support for starting Kafka Connect
Signed-off-by: Mickael Maison <[email protected]>
mimaison committed Dec 18, 2024
1 parent b697d95 commit 3ba3936
Showing 6 changed files with 616 additions and 7 deletions.
230 changes: 230 additions & 0 deletions src/main/java/io/strimzi/test/container/StrimziConnectCluster.java
@@ -0,0 +1,230 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* A Kafka Connect cluster that uses the latest image from quay.io/strimzi/kafka for the given Kafka version.
* Kafka Connect is started in distributed mode. Users must use the exposed REST API to start, stop and manage connectors.
*/
public class StrimziConnectCluster {

private static final String NETWORK_ALIAS_PREFIX = "connect-";
private static final int CONNECT_PORT = 8083;
private static final int INTER_WORKER_PORT = 8084;

private final StrimziKafkaCluster kafkaCluster;
private final Map<String, String> additionalConnectConfiguration;
private final String kafkaVersion;
private final boolean includeFileConnectors;
private final String groupId;
private final List<StrimziConnectContainer> workers;

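/**
* Create a Kafka Connect cluster from the given builder.
*
* @param builder the {@link StrimziConnectClusterBuilder} holding the cluster configuration
*/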
public StrimziConnectCluster(StrimziConnectClusterBuilder builder) {
kafkaCluster = builder.kafkaCluster;
additionalConnectConfiguration = builder.additionalConnectConfiguration;
kafkaVersion = builder.kafkaVersion == null
? KafkaVersionService.getInstance().latestRelease().getVersion()
: builder.kafkaVersion;
includeFileConnectors = builder.includeFileConnectors;
groupId = builder.groupId;

String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion);

workers = new ArrayList<>();
for (int i = 0; i < builder.workersNum; i++) {
String host = NETWORK_ALIAS_PREFIX + i;
Properties configs = buildConfigs(host);
StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs)
.withNetwork(kafkaCluster.getNetwork())
.withNetworkAliases(host)
.withExposedPorts(CONNECT_PORT)
.withEnv("LOG_DIR", "/tmp")
.waitForRunning()
.waitingFor(Wait.forHttp("/").forStatusCode(200));
workers.add(worker);
}
}

private Properties buildConfigs(String host) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaCluster.getNetworkBootstrapServers());
properties.setProperty("group.id", groupId);
properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("offset.storage.topic", "connect-offsets");
properties.setProperty("offset.storage.replication.factor", "-1");
properties.setProperty("config.storage.topic", "connect-configs");
properties.setProperty("config.storage.replication.factor", "-1");
properties.setProperty("status.storage.topic", "connect-status");
properties.setProperty("status.storage.replication.factor", "-1");
properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT);
properties.putAll(additionalConnectConfiguration);
if (includeFileConnectors) {
String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar";
if (properties.containsKey("plugin.path")) {
String pluginPath = properties.getProperty("plugin.path");
properties.setProperty("plugin.path", pluginPath + "," + connectFileJar);
} else {
properties.setProperty("plugin.path", connectFileJar);
}
}
return properties;
}

/**
* Get the workers of this Kafka Connect cluster.
*
* @return collection of GenericContainer containers
*/
public Collection<GenericContainer<?>> getWorkers() {
return new ArrayList<>(workers);
}

/**
* Start the Kafka Connect cluster.
* This starts all the workers and waits until they are all healthy and ready to use.
*/
public void start() {
for (StrimziConnectContainer worker : workers) {
worker.start();
}
}

/**
* Stop the Kafka Connect cluster.
*/
public void stop() {
workers.forEach(StrimziConnectContainer::stop);
}

/**
* Return the REST API endpoint of one of the available workers.
*
* @return the REST API endpoint
*/
public String getRestEndpoint() {
for (StrimziConnectContainer worker : workers) {
if (worker.isRunning()) {
return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT);
}
}
throw new IllegalStateException("No workers are running and healthy");
}

/**
* Builder class for {@code StrimziConnectCluster}.
* <p>
* Use this builder to create instances of {@code StrimziConnectCluster}.
* You must at least call {@link #withKafkaCluster(StrimziKafkaCluster)} and
* {@link #withGroupId(String)} before calling {@link #build()}.
* </p>
*/
public static class StrimziConnectClusterBuilder {

private Map<String, String> additionalConnectConfiguration = new HashMap<>();
private boolean includeFileConnectors = true;
private int workersNum = 1;
private String kafkaVersion;
private StrimziKafkaCluster kafkaCluster;
private String groupId;

/**
* Set the Kafka cluster the Kafka Connect cluster will connect to.
*
* @param kafkaCluster the {@link StrimziKafkaCluster} instance
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) {
this.kafkaCluster = kafkaCluster;
return this;
}

/**
* Set the number of Kafka Connect workers in the cluster.
* If not called, the cluster has a single worker.
*
* @param workersNum the number of Kafka Connect workers
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) {
this.workersNum = workersNum;
return this;
}

/**
* Add additional Kafka Connect configuration parameters.
* These configurations are applied to all workers in the cluster.
*
* @param additionalConnectConfiguration a map of additional Kafka Connect configuration options
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map<String, String> additionalConnectConfiguration) {
this.additionalConnectConfiguration = additionalConnectConfiguration;
return this;
}

/**
* Specify the Kafka version to be used for the Connect workers in the cluster.
* If not called, the latest Kafka version available from {@link KafkaVersionService} will be used.
*
* @param kafkaVersion the desired Kafka version for the Connect cluster
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) {
this.kafkaVersion = kafkaVersion;
return this;
}

/**
* Disable the FileStreams connectors.
* If not called, the FileStreams connectors are added to plugin.path.
*
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withoutFileConnectors() {
this.includeFileConnectors = false;
return this;
}

/**
* Specify the group.id of the Connect cluster.
*
* @param groupId the group id
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withGroupId(String groupId) {
this.groupId = groupId;
return this;
}

/**
* Build and return a {@code StrimziConnectCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziConnectCluster}
*/
public StrimziConnectCluster build() {
if (kafkaCluster == null) {
throw new IllegalArgumentException("A Kafka cluster must be specified");
}
if (groupId == null) {
throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified");
}
if (workersNum <= 0) {
throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0");
}
return new StrimziConnectCluster(this);
}
}
}
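A minimal usage sketch of the new class (an editorial illustration, not part of this commit): the builder and start() entry points of StrimziKafkaCluster are assumed from the existing API shown further down, and the connector name, file path, and topic are placeholders.

import io.strimzi.test.container.StrimziConnectCluster;
import io.strimzi.test.container.StrimziKafkaCluster;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectClusterExample {
    public static void main(String[] args) throws Exception {
        // Kafka cluster the Connect workers will reach over the shared Docker network
        StrimziKafkaCluster kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
                .withNumberOfBrokers(1)
                .build();
        kafka.start();

        // Single-worker Connect cluster running in distributed mode
        StrimziConnectCluster connect = new StrimziConnectCluster.StrimziConnectClusterBuilder()
                .withKafkaCluster(kafka)
                .withGroupId("my-connect-group")
                .build();
        connect.start();

        // Connectors are managed through the REST API exposed by the workers,
        // here registering a FileStreamSource connector (on plugin.path by default)
        String connectorJson = "{\"name\":\"file-source\",\"config\":{"
                + "\"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\","
                + "\"file\":\"/tmp/input.txt\",\"topic\":\"my-topic\"}}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(connect.getRestEndpoint() + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());

        connect.stop();
        kafka.stop();
    }
}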
66 changes: 66 additions & 0 deletions src/main/java/io/strimzi/test/container/StrimziConnectContainer.java
@@ -0,0 +1,66 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class StrimziConnectContainer extends GenericContainer<StrimziConnectContainer> {

private static final String STARTER_SCRIPT = "/start_connect.sh";
private static final String CONFIG_FILE = "/opt/kafka/config/connect.properties";

private final StrimziKafkaCluster kafkaCluster;
private final Properties configs;

public StrimziConnectContainer(String imageName, StrimziKafkaCluster kafkaCluster, Properties configs) {
super(imageName);
this.kafkaCluster = kafkaCluster;
this.configs = configs;
}

@Override
protected void doStart() {
super.setNetwork(kafkaCluster.getNetwork());
super.setCommand("sh", "-c", "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
super.doStart();
}

@Override
protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) {
super.containerIsStarting(containerInfo, reused);

// Write configs to a file in the container
StringWriter writer = new StringWriter();
try {
configs.store(writer, null);
} catch (IOException e) {
throw new UncheckedIOException("Failed to build configuration file", e);
}
copyFileToContainer(
Transferable.of(writer.toString().getBytes(StandardCharsets.UTF_8)),
CONFIG_FILE);

// Write starter script to a file in the container
String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE;
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
STARTER_SCRIPT
);
}

public StrimziConnectContainer waitForRunning() {
super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1));
return this;
}
}
24 changes: 17 additions & 7 deletions src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java
@@ -162,7 +162,7 @@ public static class StrimziKafkaClusterBuilder {
* Sets the number of Kafka brokers in the cluster.
*
* @param brokersNum the number of Kafka brokers
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
this.brokersNum = brokersNum;
@@ -174,7 +174,7 @@ public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
* If not provided, it defaults to the number of brokers.
*
* @param internalTopicReplicationFactor the replication factor for internal topics
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
this.internalTopicReplicationFactor = internalTopicReplicationFactor;
@@ -186,7 +186,7 @@ public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int interna
* These configurations are applied to all brokers in the cluster.
*
* @param additionalKafkaConfiguration a map of additional Kafka configuration options
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
if (additionalKafkaConfiguration != null) {
@@ -199,7 +199,7 @@ public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, S
* Sets a {@code ToxiproxyContainer} to simulate network conditions such as latency or disconnection.
*
* @param proxyContainer the proxy container for simulating network conditions
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
this.proxyContainer = proxyContainer;
@@ -210,7 +210,7 @@ public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyCon
* Enables a shared Docker network for the Kafka cluster.
* This allows the Kafka cluster to interact with other containers on the same network.
*
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withSharedNetwork() {
this.enableSharedNetwork = true;
@@ -222,7 +222,7 @@ public StrimziKafkaClusterBuilder withSharedNetwork() {
* If no version is provided, the latest Kafka version available from {@link KafkaVersionService} will be used.
*
* @param kafkaVersion the desired Kafka version for the cluster
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
this.kafkaVersion = kafkaVersion;
@@ -235,7 +235,7 @@ public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
* KRaft mode allows Kafka to operate without ZooKeeper.
* </p>
*
* @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziKafkaClusterBuilder withKraft() {
this.enableKRaft = true;
@@ -263,6 +263,16 @@ public Collection<KafkaContainer> getBrokers() {
return this.brokers;
}

/**
* Get the bootstrap servers that containers on the same network should use to connect.
* @return a comma separated list of Kafka bootstrap servers
*/
public String getNetworkBootstrapServers() {
return brokers.stream()
.map(broker -> ((StrimziKafkaContainer) broker).getNetworkBootstrapServers())
.collect(Collectors.joining(","));
}

@Override
@DoNotMutate
public boolean hasKraftOrExternalZooKeeperConfigured() {
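A short sketch of the distinction the new accessor introduces (an editorial illustration: the builder, start() and the cast to StrimziKafkaContainer are assumed from the surrounding API; the printed addresses are illustrative).

StrimziKafkaCluster kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
        .withNumberOfBrokers(3)
        .build();
kafka.start();

// For clients running in other containers on the same Docker network,
// e.g. the Connect workers started by StrimziConnectCluster above
System.out.println("inside the network: " + kafka.getNetworkBootstrapServers());

// For clients running on the host (the test JVM), using the mapped ports
for (KafkaContainer broker : kafka.getBrokers()) {
    System.out.println("from the host: " + ((StrimziKafkaContainer) broker).getBootstrapServers());
}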
src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
@@ -612,6 +612,10 @@ public String getBootstrapServers() {
return bootstrapServersProvider.apply(this);
}

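/**
* Get the bootstrap server address that containers on the same Docker network should use to connect to this broker.
* @return the network alias and the internal listener port of this broker
*/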
public String getNetworkBootstrapServers() {
return NETWORK_ALIAS_PREFIX + brokerId + ":" + INTER_BROKER_LISTENER_PORT;
}

/**
* Get the cluster id. This is only supported for KRaft containers.
* @return The cluster id.