From 3ba39361dedbce2d4cb6a9f6ea381ccd143f66d2 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 25 Nov 2024 17:10:51 +0100 Subject: [PATCH] Add support for starting Kafka Connect Signed-off-by: Mickael Maison --- .../test/container/StrimziConnectCluster.java | 230 +++++++++++++++++ .../container/StrimziConnectContainer.java | 66 +++++ .../test/container/StrimziKafkaCluster.java | 24 +- .../test/container/StrimziKafkaContainer.java | 4 + .../container/StrimziConnectClusterIT.java | 232 ++++++++++++++++++ .../container/StrimziConnectClusterTest.java | 67 +++++ 6 files changed, 616 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/strimzi/test/container/StrimziConnectCluster.java create mode 100644 src/main/java/io/strimzi/test/container/StrimziConnectContainer.java create mode 100644 src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java create mode 100644 src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java new file mode 100644 index 0000000..10ded43 --- /dev/null +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -0,0 +1,230 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Kafka Connect cluster using the latest image from quay.io/strimzi/kafka with the given version. + * Kafka Connect is started in distributed mode. Users must use the exposed REST API to start, stop and manage connectors. 
+ */ +public class StrimziConnectCluster { + + private static final String NETWORK_ALIAS_PREFIX = "connect-"; + private static final int CONNECT_PORT = 8083; + private static final int INTER_WORKER_PORT = 8084; + + private final StrimziKafkaCluster kafkaCluster; + private final Map additionalConnectConfiguration; + private final String kafkaVersion; + private final boolean includeFileConnectors; + private final String groupId; + private final List workers; + + public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { + kafkaCluster = builder.kafkaCluster; + additionalConnectConfiguration = builder.additionalConnectConfiguration; + kafkaVersion = builder.kafkaVersion == null + ? KafkaVersionService.getInstance().latestRelease().getVersion() + : builder.kafkaVersion; + includeFileConnectors = builder.includeFileConnectors; + groupId = builder.groupId; + + String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); + + workers = new ArrayList<>(); + for (int i = 0; i < builder.workersNum; i++) { + String host = NETWORK_ALIAS_PREFIX + i; + Properties configs = buildConfigs(host); + StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs) + .withNetwork(kafkaCluster.getNetwork()) + .withNetworkAliases(host) + .withExposedPorts(CONNECT_PORT) + .withEnv("LOG_DIR", "/tmp") + .waitForRunning() + .waitingFor(Wait.forHttp("/").forStatusCode(200)); + workers.add(worker); + } + } + + private Properties buildConfigs(String host) { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", kafkaCluster.getNetworkBootstrapServers()); + properties.setProperty("group.id", groupId); + properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter"); + properties.setProperty("offset.storage.topic", "connect-offsets"); + 
properties.setProperty("offset.storage.replication.factor", "-1"); + properties.setProperty("config.storage.topic", "connect-configs"); + properties.setProperty("config.storage.replication.factor", "-1"); + properties.setProperty("status.storage.topic", "connect-status"); + properties.setProperty("status.storage.replication.factor", "-1"); + properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT); + properties.putAll(additionalConnectConfiguration); + if (includeFileConnectors) { + String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"; + if (properties.containsKey("plugin.path")) { + String pluginPath = properties.getProperty("plugin.path"); + properties.setProperty("plugin.path", pluginPath + "," + connectFileJar); + } else { + properties.setProperty("plugin.path", connectFileJar); + } + } + return properties; + } + + /** + * Get the workers of this Kafka Connect cluster. + * + * @return collection of GenericContainer containers + */ + public Collection> getWorkers() { + return new ArrayList<>(workers); + } + + /** + * Start the Kafka Connect cluster. + * This starts all the workers and waits for them to all be healthy and ready to be used. + */ + public void start() { + for (StrimziConnectContainer worker : workers) { + worker.start(); + } + } + + /** + * Stop the Kafka Connect cluster. + */ + public void stop() { + workers.forEach(StrimziConnectContainer::stop); + } + + /** + * Return the REST API endpoint of one of the available workers. + * + * @return the REST API endpoint + */ + public String getRestEndpoint() { + for (StrimziConnectContainer worker : workers) { + if (worker.isRunning()) { + return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT); + } + } + throw new IllegalStateException("No workers are running and healthy"); + } + + /** + * Builder class for {@code StrimziConnectCluster}. + *

+ * Use this builder to create instances of {@code StrimziConnectCluster}. + * You must at least call {@link #withKafkaCluster(StrimziKafkaCluster)}, and + * {@link #withGroupId(String)} before calling {@link #build()}. + *

+ */ + public static class StrimziConnectClusterBuilder { + + private Map additionalConnectConfiguration = new HashMap<>(); + private boolean includeFileConnectors = true; + private int workersNum = 1; + private String kafkaVersion; + private StrimziKafkaCluster kafkaCluster; + private String groupId; + + /** + * Set the Kafka cluster the Kafka Connect cluster will use. + * + * @param kafkaCluster the {@link StrimziKafkaCluster} instance + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) { + this.kafkaCluster = kafkaCluster; + return this; + } + + /** + * Set the number of Kafka Connect workers in the cluster. + * If not called, the cluster has a single worker. + * + * @param workersNum the number of Kafka Connect workers + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) { + this.workersNum = workersNum; + return this; + } + + /** + * Add additional Kafka Connect configuration parameters. + * These configurations are applied to all workers in the cluster. + * + * @param additionalConnectConfiguration a map of additional Kafka Connect configuration options + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map additionalConnectConfiguration) { + this.additionalConnectConfiguration = additionalConnectConfiguration; + return this; + } + + /** + * Specify the Kafka version to be used for the Connect workers in the cluster. + * If not called, the latest Kafka version available from {@link KafkaVersionService} will be used. 
+ * + * @param kafkaVersion the desired Kafka version for the Connect cluster + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) { + this.kafkaVersion = kafkaVersion; + return this; + } + + /** + * Disable the FileStreams connectors. + * If not called, the FileStreams connectors are added to plugin.path. + * + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withoutFileConnectors() { + this.includeFileConnectors = false; + return this; + } + + /** + * Specify the group.id of the Connect cluster. + * + * @param groupId the group id + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withGroupId(String groupId) { + this.groupId = groupId; + return this; + } + + /** + * Build and return a {@code StrimziConnectCluster} instance based on the provided configurations. + * + * @return a new instance of {@code StrimziConnectCluster} + */ + public StrimziConnectCluster build() { + if (kafkaCluster == null) { + throw new IllegalArgumentException("A Kafka cluster must be specified"); + } + if (groupId == null) { + throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified"); + } + if (workersNum <= 0) { + throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0"); + } + return new StrimziConnectCluster(this); + } + } +} diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java new file mode 100644 index 0000000..9046dc4 --- /dev/null +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -0,0 +1,66 @@ +/* + * Copyright Strimzi authors. 
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +class StrimziConnectContainer extends GenericContainer { + + private static final String STARTER_SCRIPT = "/start_connect.sh"; + private static final String CONFIG_FILE = "/opt/kafka/config/connect.properties"; + + private final StrimziKafkaCluster kafkaCluster; + private final Properties configs; + + public StrimziConnectContainer(String imageName, StrimziKafkaCluster kafkaCluster, Properties configs) { + super(imageName); + this.kafkaCluster = kafkaCluster; + this.configs = configs; + } + + @Override + protected void doStart() { + super.setNetwork(kafkaCluster.getNetwork()); + super.setCommand("sh", "-c", "while [ ! 
-x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); + super.doStart(); + } + + @Override + protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) { + super.containerIsStarting(containerInfo, reused); + + // Write configs to a file in the container + StringWriter writer = new StringWriter(); + try { + configs.store(writer, null); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build configuration file", e); + } + copyFileToContainer( + Transferable.of(writer.toString().getBytes(StandardCharsets.UTF_8)), + CONFIG_FILE); + + // Write starter script to a file in the container + String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE; + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), + STARTER_SCRIPT + ); + } + + public StrimziConnectContainer waitForRunning() { + super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)); + return this; + } +} diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 4a73bc4..fac8566 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -162,7 +162,7 @@ public static class StrimziKafkaClusterBuilder { * Sets the number of Kafka brokers in the cluster. * * @param brokersNum the number of Kafka brokers - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) { this.brokersNum = brokersNum; @@ -174,7 +174,7 @@ public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) { * If not provided, it defaults to the number of brokers. 
* * @param internalTopicReplicationFactor the replication factor for internal topics - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) { this.internalTopicReplicationFactor = internalTopicReplicationFactor; @@ -186,7 +186,7 @@ public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int interna * These configurations are applied to all brokers in the cluster. * * @param additionalKafkaConfiguration a map of additional Kafka configuration options - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map additionalKafkaConfiguration) { if (additionalKafkaConfiguration != null) { @@ -199,7 +199,7 @@ public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map * - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withKraft() { this.enableKRaft = true; @@ -263,6 +263,16 @@ public Collection getBrokers() { return this.brokers; } + /** + * Get the bootstrap servers that containers on the same network should use to connect + * @return a comma separated list of Kafka bootstrap servers + */ + public String getNetworkBootstrapServers() { + return brokers.stream() + .map(broker -> ((StrimziKafkaContainer) broker).getNetworkBootstrapServers()) + .collect(Collectors.joining(",")); + } + @Override @DoNotMutate public boolean hasKraftOrExternalZooKeeperConfigured() { diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java 
b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 0783904..19e924c 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -612,6 +612,10 @@ public String getBootstrapServers() { return bootstrapServersProvider.apply(this); } + public String getNetworkBootstrapServers() { + return NETWORK_ALIAS_PREFIX + brokerId + ":" + INTER_BROKER_LISTENER_PORT; + } + /** * Get the cluster id. This is only supported for KRaft containers. * @return The cluster id. diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java new file mode 100644 index 0000000..f0280b3 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java @@ -0,0 +1,232 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). 
+ */ +package io.strimzi.test.container; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +public class StrimziConnectClusterIT { + + public static final Set MIRROR_MAKER_CONNECTORS = Set.of( + "MirrorSourceConnector", + "MirrorCheckpointConnector", + "MirrorHeartbeatConnector"); + + public static final Set FILE_CONNECTORS = Set.of( + "FileStreamSinkConnector", + "FileStreamSourceConnector"); + + private StrimziKafkaCluster kafkaCluster; + private StrimziConnectCluster connectCluster; + + @BeforeEach + public void setUp() { + kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withSharedNetwork() + .withNumberOfBrokers(1) + .withKraft() + .build(); + kafkaCluster.start(); + } + + @AfterEach + public void tearDown() { + kafkaCluster.stop(); + connectCluster.stop(); + } + + @Test + public void testBasicCluster() throws Exception { + connectCluster = new 
StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + String connectors = httpGet("/connector-plugins"); + for (String connector : MIRROR_MAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + } + + @Test + public void testDisableFileConnectors() throws Exception { + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .withoutFileConnectors() + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + String connectors = httpGet("/connector-plugins"); + for (String connector : MIRROR_MAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, not(containsString(connector))); + } + } + + @Test + public void testKafkaVersion() throws Exception { + String version = "3.8.1"; + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .withKafkaVersion(version) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + assertThat(root, containsString(version)); + } + + @Test + public void testOverrideConfigs() throws Exception { + String offsetTopic = "custom-offset-topic"; + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + 
.withAdditionalConnectConfiguration(Map.of("offset.storage.topic", offsetTopic)) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + Set topics = admin.listTopics().names().get(); + assertThat(topics, hasItem(offsetTopic)); + } + } + + @Test + public void testRunConnector() throws Exception { + String topic = "topic-to-export"; + String file = "/tmp/sink.out"; + String connectorName = "file-sink"; + List records = new ArrayList<>(); + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + admin.createTopics(List.of(new NewTopic(topic, 1, (short) -1))); + } + try (KafkaProducer producer = new KafkaProducer<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName() + ))) { + for (int i = 0; i < 5; i++) { + String value = "record" + i; + producer.send(new ProducerRecord<>(topic, value)); + records.add(value); + } + } + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .build(); + connectCluster.start(); + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + + String connectorConfig = + "{\n" + + " \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n" + + " \"topics\": \"" + topic + "\",\n" + + " \"file\": \"" + file + "\"\n" + + "}"; + String config = httpPut("/connectors/" + connectorName + "/config", connectorConfig); + assertThat(config, 
containsString("\"name\":\"" + connectorName + "\"")); + assertThat(config, containsString("\"type\":\"sink\"")); + String connectors = httpGet("/connectors"); + assertThat(connectors, containsString(connectorName)); + + GenericContainer worker = connectCluster.getWorkers().iterator().next(); + + for (String record : records) { + Utils.waitFor("Checking " + record + " is in " + file, + Duration.ofSeconds(5), + Duration.ofMinutes(1), + () -> { + try { + Container.ExecResult result = worker.execInContainer("sh", "-c", "cat " + file); + return result.getStdout().contains(record); + } catch (Exception exc) { + return false; + } + } + ); + } + } + + public String httpGet(String path) throws Exception { + HttpClient httpClient = HttpClient.newHttpClient(); + URI uri = new URI(connectCluster.getRestEndpoint() + path); + HttpRequest request = HttpRequest.newBuilder() + .GET() + .uri(uri) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(HttpURLConnection.HTTP_OK)); + return response.body(); + } + + public String httpPut(String path, String body) throws Exception { + HttpClient httpClient = HttpClient.newHttpClient(); + URI uri = new URI(connectCluster.getRestEndpoint() + path); + HttpRequest request = HttpRequest.newBuilder() + .PUT(HttpRequest.BodyPublishers.ofString(body)) + .setHeader("Content-Type", "application/json") + .uri(uri) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(HttpURLConnection.HTTP_CREATED)); + return response.body(); + } + + String getClusterId() throws Exception { + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + return admin.describeCluster().clusterId().get(); + } + } +} diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java 
b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java new file mode 100644 index 0000000..af02647 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java @@ -0,0 +1,67 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StrimziConnectClusterTest { + + @Test + void testConnectClusterNegativeOrZeroNumberOfWorkers() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withNumberOfWorkers(0) + .build() + ); + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withNumberOfWorkers(-1) + .build() + ); + } + + @Test + void testConnectClusterWithoutBootstrapServers() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("groupId") + .build() + ); + } + + @Test + void testConnectClusterWithoutGroupId() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .build() + ); + } + + @Test + void testConnectCluster() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .build(); + } + + @Test + void testGetRestApiEndpointThrowsBeforeStart() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + 
.withGroupId("groupId") + .build(); + assertThrows(IllegalStateException.class, cluster::getRestEndpoint); + } +}