Skip to content

Commit

Permalink
Remove deprecated usage of KafkaContainer (#492)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier authored Dec 9, 2024
1 parent c51d9f3 commit 85591e4
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public abstract class KafkaConnectIntegrationTest extends KafkaIntegrationTest {
.withNetwork(NETWORK)
.withNetworkAliases("connect")
.withExposedPorts(8083)
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "broker:9092")
.withEnv("CONNECT_BOOTSTRAP_SERVERS", "broker:9093")
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")
.withEnv("CONNECT_GROUP_ID", "compose-connect-group-" + UUID.randomUUID())
.withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "docker-connect-configs")
Expand All @@ -34,8 +34,10 @@ public abstract class KafkaConnectIntegrationTest extends KafkaIntegrationTest {
.withEnv("CONNECT_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR")
.withEnv("CONNECT_SASL_MECHANISM", "PLAIN")
.withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT")
.withEnv("CONNECT_SASL_JAAS_CONFIG",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";")
.withEnv(
"CONNECT_SASL_JAAS_CONFIG",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
)
.waitingFor(Wait.forHttp("/").forStatusCode(200));

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@

import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.support.TestPropertyProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

/**
Expand All @@ -24,33 +20,29 @@ public abstract class KafkaIntegrationTest implements TestPropertyProvider {
protected static final Network NETWORK = Network.newNetwork();
private Admin adminClient;

protected final KafkaContainer broker = new KafkaContainer(DockerImageName
protected final ConfluentKafkaContainer broker = new ConfluentKafkaContainer(DockerImageName
.parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))
.withNetwork(NETWORK)
.withNetworkAliases("broker")
.withCreateContainerCmdModifier(modifier -> {
List<String> commands = new ArrayList<>(Arrays.asList(Objects.requireNonNull(modifier.getCmd())));
// Override the security protocol for CONTROLLER listener to SASL_PLAINTEXT defined by
// KafkaContainer#withRaft.
commands.set(1, "export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT,"
+ "PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT; " + commands.get(1));
modifier.withCmd(commands);
})
.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
.withEnv("KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL", "PLAIN")
.withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"BROKER:SASL_PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT"
)
.withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig())
.withEnv("KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig())
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig())
.withEnv("KAFKA_SUPER_USERS", "User:admin")
.withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "org.apache.kafka.metadata.authorizer.StandardAuthorizer")
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false")
.withKraft();
.withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false");

@NonNull
@Override
public Map<String, String> getProperties() {
if (!broker.isRunning()) {
broker.withLogConsumer(outputFrame -> System.out.print(outputFrame.getUtf8String()));
broker.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class SchemaRegistryIntegrationTest extends KafkaIntegrationTest
.withExposedPorts(8081)
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9093")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM", "PLAIN")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "SASL_PLAINTEXT")
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG",
Expand Down

0 comments on commit 85591e4

Please sign in to comment.