From 85591e4fde27d1b61cfa9b892145d5e27c370da6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Mon, 9 Dec 2024 09:49:07 +0100 Subject: [PATCH] Remove deprecated usage of KafkaContainer (#492) --- .../KafkaConnectIntegrationTest.java | 8 ++++--- .../container/KafkaIntegrationTest.java | 24 +++++++------------ .../SchemaRegistryIntegrationTest.java | 2 +- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java index 034426a2..634cddc1 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java @@ -19,7 +19,7 @@ public abstract class KafkaConnectIntegrationTest extends KafkaIntegrationTest { .withNetwork(NETWORK) .withNetworkAliases("connect") .withExposedPorts(8083) - .withEnv("CONNECT_BOOTSTRAP_SERVERS", "broker:9092") + .withEnv("CONNECT_BOOTSTRAP_SERVERS", "broker:9093") .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect") .withEnv("CONNECT_GROUP_ID", "compose-connect-group-" + UUID.randomUUID()) .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "docker-connect-configs") @@ -34,8 +34,10 @@ public abstract class KafkaConnectIntegrationTest extends KafkaIntegrationTest { .withEnv("CONNECT_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR") .withEnv("CONNECT_SASL_MECHANISM", "PLAIN") .withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT") - .withEnv("CONNECT_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";") + .withEnv( + "CONNECT_SASL_JAAS_CONFIG", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";" + ) .waitingFor(Wait.forHttp("/").forStatusCode(200)); @NonNull diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java index 1f47c960..71f86c9d 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java @@ -4,15 +4,11 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.test.support.TestPropertyProvider; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.Objects; import org.apache.kafka.clients.admin.Admin; import org.junit.jupiter.api.TestInstance; -import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; +import org.testcontainers.kafka.ConfluentKafkaContainer; import org.testcontainers.utility.DockerImageName; /** @@ -24,33 +20,29 @@ public abstract class KafkaIntegrationTest implements TestPropertyProvider { protected static final Network NETWORK = Network.newNetwork(); private Admin adminClient; - protected final KafkaContainer broker = new KafkaContainer(DockerImageName + protected final ConfluentKafkaContainer broker = new ConfluentKafkaContainer(DockerImageName .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) .withNetwork(NETWORK) .withNetworkAliases("broker") - .withCreateContainerCmdModifier(modifier -> { - List commands = new ArrayList<>(Arrays.asList(Objects.requireNonNull(modifier.getCmd()))); - // Override the security protocol for CONTROLLER listener to SASL_PLAINTEXT defined by - // KafkaContainer#withRaft. - commands.set(1, "export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT," - + "PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT; " + commands.get(1)); - modifier.withCmd(commands); - }) .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN") .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") .withEnv("KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL", "PLAIN") + .withEnv( + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", + "BROKER:SASL_PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT" + ) .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) .withEnv("KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) .withEnv("KAFKA_SUPER_USERS", "User:admin") .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "org.apache.kafka.metadata.authorizer.StandardAuthorizer") - .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false") - .withKraft(); + .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false"); @NonNull @Override public Map getProperties() { if (!broker.isRunning()) { + broker.withLogConsumer(outputFrame -> System.out.print(outputFrame.getUtf8String())); broker.start(); } diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java index bb610891..6d5c2362 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java @@ -20,7 +20,7 @@ public abstract class SchemaRegistryIntegrationTest extends KafkaIntegrationTest .withExposedPorts(8081) .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9093") .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM", "PLAIN") .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "SASL_PLAINTEXT") .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG",