From b42607c13ac436971d1444f42ee3568b340ea648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20GREFFIER?= Date: Tue, 27 Aug 2024 13:25:36 +0200 Subject: [PATCH] Migrate integration tests to Kraft (#434) --- .docker/config/broker/kafka_server_jaas.conf | 6 -- build.gradle | 6 +- .../ns4kafka/repository/kafka/KafkaStore.java | 17 +-- .../service/executor/TopicAsyncExecutor.java | 17 +-- .../AbstractIntegrationConnectTest.java | 43 -------- ...AbstractIntegrationSchemaRegistryTest.java | 45 -------- .../integration/AbstractIntegrationTest.java | 86 --------------- .../integration/AclIntegrationTest.java | 5 +- .../ApiResourcesIntegrationTest.java | 5 +- ...est.java => ConnectorIntegrationTest.java} | 11 +- .../ExceptionHandlerIntegrationTest.java | 5 +- .../integration/LoginIntegrationTest.java | 5 +- .../integration/NamespaceIntegrationTest.java | 5 +- .../integration/SchemaIntegrationTest.java | 7 +- .../integration/StreamIntegrationTest.java | 21 ++-- .../integration/TopicIntegrationTest.java | 27 +++-- .../integration/UserIntegrationTest.java | 5 +- .../KafkaConnectIntegrationTest.java | 57 ++++++++++ .../container/KafkaIntegrationTest.java | 89 +++++++++++++++ .../SchemaRegistryIntegrationTest.java | 47 ++++++++ .../testcontainers/KafkaConnectContainer.java | 102 ------------------ .../SchemaRegistryContainer.java | 45 -------- src/test/resources/application-test.yml | 23 ++-- 23 files changed, 279 insertions(+), 400 deletions(-) delete mode 100644 src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java delete mode 100644 src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationSchemaRegistryTest.java delete mode 100644 src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationTest.java rename src/test/java/com/michelin/ns4kafka/integration/{ConnectIntegrationTest.java => ConnectorIntegrationTest.java} (98%) create mode 100644 src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java create mode 100644 src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java create mode 100644 src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java delete mode 100644 src/test/java/com/michelin/ns4kafka/testcontainers/KafkaConnectContainer.java delete mode 100644 src/test/java/com/michelin/ns4kafka/testcontainers/SchemaRegistryContainer.java diff --git a/.docker/config/broker/kafka_server_jaas.conf b/.docker/config/broker/kafka_server_jaas.conf index e7c82f3f..3158200a 100644 --- a/.docker/config/broker/kafka_server_jaas.conf +++ b/.docker/config/broker/kafka_server_jaas.conf @@ -1,10 +1,4 @@ KafkaServer { - org.apache.kafka.common.security.scram.ScramLoginModule required - username="admin" password="admin" - user_admin="admin"; -}; - -KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; }; \ No newline at end of file diff --git a/build.gradle b/build.gradle index 53babf7c..038497b3 100644 --- a/build.gradle +++ b/build.gradle @@ -46,9 +46,9 @@ dependencies { runtimeOnly("ch.qos.logback:logback-classic") testImplementation("org.mockito:mockito-core") - testImplementation("org.testcontainers:junit-jupiter") - testImplementation("org.testcontainers:testcontainers") - testImplementation("org.testcontainers:kafka") + testImplementation("org.testcontainers:junit-jupiter:1.20.0") + testImplementation("org.testcontainers:testcontainers:1.20.0") + testImplementation("org.testcontainers:kafka:1.20.0") testImplementation("org.mockito:mockito-junit-jupiter:5.12.0") testImplementation("org.junit.jupiter:junit-jupiter-params:5.11.0") testImplementation("io.projectreactor:reactor-test") diff --git a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaStore.java b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaStore.java index 07ebeca7..8dd2aa72 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaStore.java +++ b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaStore.java @@ -41,25 +41,30 @@ */ @Slf4j public abstract class KafkaStore { - private final Map store; - private final AtomicBoolean initialized = new AtomicBoolean(false); - private final ReentrantLock offsetUpdateLock; - private final Condition offsetReachedThreshold; @Inject ApplicationContext applicationContext; + @Inject AdminClient adminClient; + @Inject KafkaStoreProperties kafkaStoreProperties; + @Inject @Named(TaskExecutors.SCHEDULED) TaskScheduler taskScheduler; + + @Property(name = "ns4kafka.store.kafka.init-timeout") + int initTimeout; + + private final Map store; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final ReentrantLock offsetUpdateLock; + private final Condition offsetReachedThreshold; String kafkaTopic; Producer kafkaProducer; long offsetInSchemasTopic = -1; long lastWrittenOffset = -1; - @Property(name = "ns4kafka.store.kafka.init-timeout") - int initTimeout; KafkaStore(String kafkaTopic, Producer kafkaProducer) { this.kafkaTopic = kafkaTopic; diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java index a0c9f54a..663fb8ea 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/TopicAsyncExecutor.java @@ -98,14 +98,19 @@ public void synchronizeTopics() { Map> updateTopics = checkTopics.stream() .map(topic -> { - Map actualConf = - brokerTopics.get(topic.getMetadata().getName()).getSpec().getConfigs(); - Map expectedConf = - topic.getSpec().getConfigs() == null ? Map.of() : topic.getSpec().getConfigs(); + Map actualConf = brokerTopics.get(topic.getMetadata().getName()) + .getSpec() + .getConfigs(); + + Map expectedConf = topic.getSpec().getConfigs() == null + ? Map.of() : topic.getSpec().getConfigs(); + Collection topicConfigChanges = computeConfigChanges(expectedConf, actualConf); if (!topicConfigChanges.isEmpty()) { - ConfigResource cr = - new ConfigResource(ConfigResource.Type.TOPIC, topic.getMetadata().getName()); + ConfigResource cr = new ConfigResource( + ConfigResource.Type.TOPIC, + topic.getMetadata().getName() + ); return Map.entry(cr, topicConfigChanges); } return null; diff --git a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java b/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java deleted file mode 100644 index 9e161333..00000000 --- a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationConnectTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.michelin.ns4kafka.integration; - -import com.michelin.ns4kafka.testcontainers.KafkaConnectContainer; -import io.micronaut.core.annotation.NonNull; -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.TestInstance; -import org.testcontainers.utility.DockerImageName; - -/** - * Kafka Connect integration test. - */ -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class AbstractIntegrationConnectTest extends AbstractIntegrationTest { - public KafkaConnectContainer connectContainer; - - /** - * Starts the Kafka Connect container. - * - * @return Properties enriched with the Kafka Connect URL - */ - @NonNull - @Override - public Map getProperties() { - Map brokerProps = super.getProperties(); - if (connectContainer == null || !connectContainer.isRunning()) { - connectContainer = - new KafkaConnectContainer(DockerImageName.parse("confluentinc/cp-kafka-connect:" + CONFLUENT_VERSION), - "kafka:9092") - .withEnv("CONNECT_SASL_MECHANISM", "PLAIN") - .withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT") - .withEnv("CONNECT_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule " - + "required username=\"admin\" password=\"admin\";") - .withNetwork(network); - connectContainer.start(); - } - - Map properties = new HashMap<>(brokerProps); - properties.put("ns4kafka.managed-clusters.test-cluster.connects.test-connect.url", connectContainer.getUrl()); - return properties; - } -} diff --git a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationSchemaRegistryTest.java b/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationSchemaRegistryTest.java deleted file mode 100644 index d8496fff..00000000 --- a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationSchemaRegistryTest.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.michelin.ns4kafka.integration; - -import com.michelin.ns4kafka.testcontainers.SchemaRegistryContainer; -import io.micronaut.core.annotation.NonNull; -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.TestInstance; -import org.testcontainers.utility.DockerImageName; - -/** - * Schema Registry integration test. - */ -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class AbstractIntegrationSchemaRegistryTest extends AbstractIntegrationTest { - public static final String CONFLUENT_REGISTRY_VERSION = "7.4.1"; - public SchemaRegistryContainer schemaRegistryContainer; - - /** - * Starts the Schema registry container. - * - * @return Properties enriched with the Schema Registry URL - */ - @NonNull - @Override - public Map getProperties() { - Map brokerProps = super.getProperties(); - if (schemaRegistryContainer == null || !schemaRegistryContainer.isRunning()) { - schemaRegistryContainer = new SchemaRegistryContainer( - DockerImageName.parse("confluentinc/cp-schema-registry:" + CONFLUENT_REGISTRY_VERSION), - "kafka:9092") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM", "PLAIN") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "SASL_PLAINTEXT") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG", - "org.apache.kafka.common.security.plain.PlainLoginModule " - + "required username=\"admin\" password=\"admin\";") - .withNetwork(network); - - schemaRegistryContainer.start(); - } - - Map properties = new HashMap<>(brokerProps); - properties.put("ns4kafka.managed-clusters.test-cluster.schemaRegistry.url", schemaRegistryContainer.getUrl()); - return properties; - } -} diff --git a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationTest.java deleted file mode 100644 index f88646f8..00000000 --- a/src/test/java/com/michelin/ns4kafka/integration/AbstractIntegrationTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.michelin.ns4kafka.integration; - -import io.micronaut.core.annotation.NonNull; -import io.micronaut.test.support.TestPropertyProvider; -import java.util.Map; -import org.apache.kafka.clients.admin.Admin; -import org.junit.jupiter.api.TestInstance; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.utility.DockerImageName; - -/** - * Abstract integration test. - */ -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class AbstractIntegrationTest implements TestPropertyProvider { - public static final String CONFLUENT_VERSION = "7.4.1"; - - public KafkaContainer kafka; - public Network network; - private Admin adminClient; - - @NonNull - @Override - public Map getProperties() { - if (kafka == null || !kafka.isRunning()) { - network = Network.newNetwork(); - kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:" + CONFLUENT_VERSION)) - .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT") - .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") - .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN") - .withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN") - .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getJaasConfig()) - .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getJaasConfig()) - .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") - .withEnv("KAFKA_SUPER_USERS", "User:admin") - .withNetworkAliases("kafka") - .withNetwork(network); - kafka.start(); - } - - return Map.of( - "kafka.bootstrap.servers", kafka.getHost() + ":" + kafka.getMappedPort(9093), - "kafka.sasl.mechanism", "PLAIN", - "kafka.security.protocol", "SASL_PLAINTEXT", - "kafka.sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";", - "ns4kafka.managed-clusters.test-cluster.config.bootstrap.servers", - kafka.getHost() + ":" + kafka.getMappedPort(9093), - "ns4kafka.managed-clusters.test-cluster.config.sasl.mechanism", "PLAIN", - "ns4kafka.managed-clusters.test-cluster.config.security.protocol", "SASL_PLAINTEXT", - "ns4kafka.managed-clusters.test-cluster.config.sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";" - ); - } - - /** - * Get the JAAS config. - * - * @return The JAAS config - */ - private static String getJaasConfig() { - return "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"admin\" password=\"admin\" " - + "user_admin=\"admin\" " - + "user_client=\"client\";"; - } - - /** - * Getter for admin client service. - * - * @return The admin client - */ - public Admin getAdminClient() { - if (adminClient == null) { - adminClient = Admin.create(Map.of( - "bootstrap.servers", kafka.getHost() + ":" + kafka.getMappedPort(9093), - "sasl.mechanism", "PLAIN", - "security.protocol", "SASL_PLAINTEXT", - "sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule " - + "required username=\"admin\" password=\"admin\";")); - } - return adminClient; - } -} diff --git a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java index 30661dff..727b1396 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; import com.michelin.ns4kafka.model.AccessControlEntry.Permission; @@ -21,7 +22,6 @@ import com.michelin.ns4kafka.model.RoleBinding.Verb; import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -47,8 +47,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class AclIntegrationTest extends AbstractIntegrationTest { +class AclIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/ApiResourcesIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ApiResourcesIntegrationTest.java index 302b0861..b2d6942a 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ApiResourcesIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ApiResourcesIntegrationTest.java @@ -3,11 +3,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.michelin.ns4kafka.controller.ApiResourcesController; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.Metadata; import com.michelin.ns4kafka.model.Namespace; import com.michelin.ns4kafka.model.RoleBinding; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; @@ -21,8 +21,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class ApiResourcesIntegrationTest extends AbstractIntegrationTest { +class ApiResourcesIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java similarity index 98% rename from src/test/java/com/michelin/ns4kafka/integration/ConnectIntegrationTest.java rename to src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index c6383869..55c456c8 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.KafkaConnectIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; import com.michelin.ns4kafka.model.AccessControlEntry.Permission; @@ -32,7 +33,6 @@ import com.michelin.ns4kafka.validation.ConnectValidator; import com.michelin.ns4kafka.validation.TopicValidator; import io.micronaut.context.ApplicationContext; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -53,8 +53,7 @@ @Slf4j @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class ConnectIntegrationTest extends AbstractIntegrationConnectTest { +class ConnectorIntegrationTest extends KafkaConnectIntegrationTest { @Inject private ApplicationContext applicationContext; @@ -75,7 +74,7 @@ class ConnectIntegrationTest extends AbstractIntegrationConnectTest { @BeforeAll void init() { // Create HTTP client as bean to load client configuration from application.yml - connectClient = applicationContext.createBean(HttpClient.class, connectContainer.getUrl()); + connectClient = applicationContext.createBean(HttpClient.class, getConnectUrl()); Namespace namespace = Namespace.builder() .metadata(Metadata.builder() @@ -181,7 +180,7 @@ void shouldGetConnectClusterVersion() { .toBlocking() .retrieve(HttpRequest.GET("/"), ServerInfo.class); - assertEquals("7.4.1-ccs", actual.version()); + assertEquals("7.7.0-ccs", actual.version()); } @Test @@ -470,7 +469,7 @@ void shouldRestartConnector() throws InterruptedException { assertEquals(HttpStatus.OK, restartResponse.status()); waitForConnectorAndTasksToBeInState("ns1-co1", Connector.TaskState.RUNNING); - + ConnectorStateInfo actual = connectClient .toBlocking() .retrieve(HttpRequest.GET("/connectors/ns1-co1/status"), ConnectorStateInfo.class); diff --git a/src/test/java/com/michelin/ns4kafka/integration/ExceptionHandlerIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ExceptionHandlerIntegrationTest.java index c92a7556..6b091ab2 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ExceptionHandlerIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ExceptionHandlerIntegrationTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; import com.michelin.ns4kafka.model.AccessControlEntry.Permission; @@ -23,7 +24,6 @@ import com.michelin.ns4kafka.model.Topic; import com.michelin.ns4kafka.model.Topic.TopicSpec; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -41,8 +41,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class ExceptionHandlerIntegrationTest extends AbstractIntegrationTest { +class ExceptionHandlerIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/LoginIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/LoginIntegrationTest.java index 0b702d30..afcb4314 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/LoginIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/LoginIntegrationTest.java @@ -2,7 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import io.micronaut.context.annotation.Property; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; @@ -14,8 +14,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class LoginIntegrationTest extends AbstractIntegrationTest { +class LoginIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/NamespaceIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/NamespaceIntegrationTest.java index 7460d6f7..1bc60111 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/NamespaceIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/NamespaceIntegrationTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.Metadata; import com.michelin.ns4kafka.model.Namespace; @@ -11,7 +12,6 @@ import com.michelin.ns4kafka.model.Status; import com.michelin.ns4kafka.model.Topic; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -29,8 +29,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class NamespaceIntegrationTest extends AbstractIntegrationTest { +class NamespaceIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index e79a8b2b..18ea97f1 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.SchemaRegistryIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.Metadata; import com.michelin.ns4kafka.model.Namespace; @@ -18,7 +19,6 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import io.micronaut.context.ApplicationContext; -import io.micronaut.context.annotation.Property; import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; @@ -36,8 +36,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class SchemaIntegrationTest extends AbstractIntegrationSchemaRegistryTest { +class SchemaIntegrationTest extends SchemaRegistryIntegrationTest { @Inject private ApplicationContext applicationContext; @@ -52,7 +51,7 @@ class SchemaIntegrationTest extends AbstractIntegrationSchemaRegistryTest { @BeforeAll void init() { // Create HTTP client as bean to load client configuration from application.yml - schemaRegistryClient = applicationContext.createBean(HttpClient.class, schemaRegistryContainer.getUrl()); + schemaRegistryClient = applicationContext.createBean(HttpClient.class, getSchemaRegistryUrl()); Namespace namespace = Namespace.builder() .metadata(Metadata.builder() diff --git a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java index 63770089..3faf89d7 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; import com.michelin.ns4kafka.model.AccessControlEntry.Permission; @@ -15,7 +16,6 @@ import com.michelin.ns4kafka.model.Namespace.NamespaceSpec; import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -36,8 +36,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class StreamIntegrationTest extends AbstractIntegrationTest { +class StreamIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; @@ -144,14 +143,14 @@ void shouldVerifyCreationOfAcls() throws InterruptedException, ExecutionExceptio .get(); var aclTransactionalId = kafkaClient.describeAcls( - new AclBindingFilter( - new ResourcePatternFilter( - org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID, - stream.getMetadata().getName(), - PatternType.PREFIXED - ), - AccessControlEntryFilter.ANY - )) + new AclBindingFilter( + new ResourcePatternFilter( + org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID, + stream.getMetadata().getName(), + PatternType.PREFIXED + ), + AccessControlEntryFilter.ANY + )) .values() .get(); diff --git a/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java index e8e42d4f..f1c7f4c0 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/TopicIntegrationTest.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.michelin.ns4kafka.controller.AkhqClaimProviderController; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.AccessControlEntry; import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; import com.michelin.ns4kafka.model.AccessControlEntry.Permission; @@ -28,7 +29,6 @@ import com.michelin.ns4kafka.model.Topic.TopicSpec; import com.michelin.ns4kafka.service.executor.TopicAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; @@ -56,8 +56,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class TopicIntegrationTest extends AbstractIntegrationTest { +class TopicIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; @@ -262,7 +261,7 @@ void shouldCreateTopic() throws InterruptedException, ExecutionException { assertEquals("created", response.header("X-Ns4kafka-Result")); - topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + forceTopicSynchronization(); Admin kafkaClient = getAdminClient(); @@ -318,7 +317,7 @@ void shouldUpdateTopic() throws InterruptedException, ExecutionException { assertEquals("created", response.header("X-Ns4kafka-Result")); - topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + forceTopicSynchronization(); response = ns4KafkaClient .toBlocking() @@ -353,8 +352,7 @@ void shouldUpdateTopic() throws InterruptedException, ExecutionException { assertEquals("changed", response.header("X-Ns4kafka-Result")); - // Force Topic Sync - topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + forceTopicSynchronization(); Admin kafkaClient = getAdminClient(); @@ -495,7 +493,7 @@ void shouldUpdateTopicWithNoChange() { } @Test - void shouldDeleteRecords() { + void shouldDeleteRecords() throws InterruptedException { Topic topicToDelete = Topic.builder() .metadata(Metadata.builder() .name("ns1-topicToDelete") @@ -519,7 +517,7 @@ void shouldDeleteRecords() { assertEquals("created", response.header("X-Ns4kafka-Result")); - topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + forceTopicSynchronization(); List deleteRecordsResponse = ns4KafkaClient .toBlocking() @@ -564,7 +562,7 @@ void shouldDeleteRecords() { } @Test - void shouldDeleteRecordsOnCompactTopic() { + void shouldDeleteRecordsOnCompactTopic() throws InterruptedException { Topic topicToDelete = Topic.builder() .metadata(Metadata.builder() .name("ns1-compactTopicToDelete") @@ -589,7 +587,7 @@ void shouldDeleteRecordsOnCompactTopic() { assertEquals("created", response.header("X-Ns4kafka-Result")); - topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + forceTopicSynchronization(); HttpClientResponseException exception = assertThrows(HttpClientResponseException.class, () -> ns4KafkaClient @@ -601,6 +599,13 @@ void shouldDeleteRecordsOnCompactTopic() { assertEquals(HttpStatus.UNPROCESSABLE_ENTITY, exception.getStatus()); } + private void forceTopicSynchronization() throws InterruptedException { + topicAsyncExecutorList.forEach(TopicAsyncExecutor::run); + + // Wait for topics to be updated in Kafka broker + Thread.sleep(2000); + } + @Data @NoArgsConstructor @AllArgsConstructor diff --git a/src/test/java/com/michelin/ns4kafka/integration/UserIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/UserIntegrationTest.java index c3f5c7f6..e78c227f 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/UserIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/UserIntegrationTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest; import com.michelin.ns4kafka.model.KafkaUserResetPassword; import com.michelin.ns4kafka.model.Metadata; import com.michelin.ns4kafka.model.Namespace; @@ -12,7 +13,6 @@ import com.michelin.ns4kafka.model.quota.ResourceQuota; import com.michelin.ns4kafka.service.executor.UserAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; -import io.micronaut.context.annotation.Property; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -35,8 +35,7 @@ import org.junit.jupiter.api.Test; @MicronautTest -@Property(name = "micronaut.security.gitlab.enabled", value = "false") -class UserIntegrationTest extends AbstractIntegrationTest { +class UserIntegrationTest extends KafkaIntegrationTest { @Inject @Client("/") HttpClient ns4KafkaClient; diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java new file mode 100644 index 00000000..37499576 --- /dev/null +++ b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaConnectIntegrationTest.java @@ -0,0 +1,57 @@ +package com.michelin.ns4kafka.integration.container; + +import io.micronaut.core.annotation.NonNull; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +/** + * Base class for Kafka Connect integration tests. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class KafkaConnectIntegrationTest extends KafkaIntegrationTest { + private final GenericContainer connect = new GenericContainer<>(DockerImageName + .parse("confluentinc/cp-kafka-connect:" + CONFLUENT_PLATFORM_VERSION)) + .withNetwork(NETWORK) + .withNetworkAliases("connect") + .withExposedPorts(8083) + .withEnv("CONNECT_BOOTSTRAP_SERVERS", "broker:9092") + .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect") + .withEnv("CONNECT_GROUP_ID", "compose-connect-group") + .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "docker-connect-configs") + .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "docker-connect-offsets") + .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "docker-connect-status") + .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1") + .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter") + .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter") + .withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/filestream-connectors") + .withEnv("CONNECT_LOG4J_LOGGERS", "org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR") + .withEnv("CONNECT_SASL_MECHANISM", "PLAIN") + .withEnv("CONNECT_SECURITY_PROTOCOL", "SASL_PLAINTEXT") + .withEnv("CONNECT_SASL_JAAS_CONFIG", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";") + .waitingFor(Wait.forHttp("/").forStatusCode(200)); + + @NonNull + @Override + public Map getProperties() { + Map brokerProperties = super.getProperties(); + + if (!connect.isRunning()) { + connect.start(); + } + + Map properties = new HashMap<>(brokerProperties); + properties.put("ns4kafka.managed-clusters.test-cluster.connects.test-connect.url", getConnectUrl()); + return properties; + } + + protected String getConnectUrl() { + return "http://" + connect.getHost() + ":" + connect.getFirstMappedPort(); + } +} diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java new file mode 100644 index 00000000..1f47c960 --- /dev/null +++ b/src/test/java/com/michelin/ns4kafka/integration/container/KafkaIntegrationTest.java @@ -0,0 +1,89 @@ +package com.michelin.ns4kafka.integration.container; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + +import io.micronaut.core.annotation.NonNull; +import io.micronaut.test.support.TestPropertyProvider; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.clients.admin.Admin; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; + +/** + * Base class for Kafka integration tests. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class KafkaIntegrationTest implements TestPropertyProvider { + protected static final String CONFLUENT_PLATFORM_VERSION = "7.7.0"; + protected static final Network NETWORK = Network.newNetwork(); + private Admin adminClient; + + protected final KafkaContainer broker = new KafkaContainer(DockerImageName + .parse("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION)) + .withNetwork(NETWORK) + .withNetworkAliases("broker") + .withCreateContainerCmdModifier(modifier -> { + List commands = new ArrayList<>(Arrays.asList(Objects.requireNonNull(modifier.getCmd()))); + // Override the security protocol for CONTROLLER listener to SASL_PLAINTEXT defined by + // KafkaContainer#withRaft. + commands.set(1, "export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT," + + "PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT; " + commands.get(1)); + modifier.withCmd(commands); + }) + .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN") + .withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN") + .withEnv("KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL", "PLAIN") + .withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) + .withEnv("KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) + .withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getSaslPlainJaasConfig()) + .withEnv("KAFKA_SUPER_USERS", "User:admin") + .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "org.apache.kafka.metadata.authorizer.StandardAuthorizer") + .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "false") + .withKraft(); + + @NonNull + @Override + public Map getProperties() { + if (!broker.isRunning()) { + broker.start(); + } + + return Map.of( + "kafka." + BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers(), + "ns4kafka.managed-clusters.test-cluster.config." + BOOTSTRAP_SERVERS_CONFIG, broker.getBootstrapServers() + ); + } + + /** + * Get the JAAS config. + * + * @return The JAAS config + */ + private static String getSaslPlainJaasConfig() { + return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\"" + + "user_admin=\"admin\";"; + } + + /** + * Getter for admin client service. + * + * @return The admin client + */ + public Admin getAdminClient() { + if (adminClient == null) { + adminClient = Admin.create(Map.of( + "bootstrap.servers", broker.getBootstrapServers(), + "sasl.mechanism", "PLAIN", + "security.protocol", "SASL_PLAINTEXT", + "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule " + + "required username=\"admin\" password=\"admin\";")); + } + return adminClient; + } +} diff --git a/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java new file mode 100644 index 00000000..bb610891 --- /dev/null +++ b/src/test/java/com/michelin/ns4kafka/integration/container/SchemaRegistryIntegrationTest.java @@ -0,0 +1,47 @@ +package com.michelin.ns4kafka.integration.container; + +import io.micronaut.core.annotation.NonNull; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.TestInstance; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +/** + * Base class for Schema Registry integration tests. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class SchemaRegistryIntegrationTest extends KafkaIntegrationTest { + private final GenericContainer schemaRegistry = new GenericContainer<>(DockerImageName + .parse("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION)) + .withNetwork(NETWORK) + .withNetworkAliases("schema-registry") + .withExposedPorts(8081) + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry") + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM", "PLAIN") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "SASL_PLAINTEXT") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG", + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";") + .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); + + @NonNull + @Override + public Map getProperties() { + Map brokerProperties = super.getProperties(); + + if (!schemaRegistry.isRunning()) { + schemaRegistry.start(); + } + + Map properties = new HashMap<>(brokerProperties); + properties.put("ns4kafka.managed-clusters.test-cluster.schemaRegistry.url", getSchemaRegistryUrl()); + return properties; + } + + protected String getSchemaRegistryUrl() { + return "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getFirstMappedPort(); + } +} diff --git a/src/test/java/com/michelin/ns4kafka/testcontainers/KafkaConnectContainer.java b/src/test/java/com/michelin/ns4kafka/testcontainers/KafkaConnectContainer.java deleted file mode 100644 index 0822022a..00000000 --- a/src/test/java/com/michelin/ns4kafka/testcontainers/KafkaConnectContainer.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.michelin.ns4kafka.testcontainers; - -import static java.lang.String.format; - -import java.time.Duration; -import java.util.UUID; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -/** - * This file is a slight adaptation of the KafkaConnectContainer code. - * Available on ydespreaux's GitHub account. - * - * @see KafkaConnectContainer.java - */ -public class KafkaConnectContainer extends GenericContainer { - public static final int CONNECT_REST_PORT_INTERNAL = 8083; - public static final String GROUP_ID_CONFIG = "CONNECT_GROUP_ID"; - public static final String OFFSET_STORAGE_TOPIC_CONFIG = "CONNECT_OFFSET_STORAGE_TOPIC"; - public static final String OFFSET_STORAGE_PARTITIONS_CONFIG = "CONNECT_OFFSET_STORAGE_PARTITIONS"; - public static final String CONFIG_STORAGE_TOPIC_CONFIG = "CONNECT_CONFIG_STORAGE_TOPIC"; - public static final String STATUS_STORAGE_TOPIC_CONFIG = "CONNECT_STATUS_STORAGE_TOPIC"; - public static final String STATUS_STORAGE_PARTITIONS_CONFIG = "CONNECT_STATUS_STORAGE_PARTITIONS"; - public static final String KEY_CONVERTER_CONFIG = "CONNECT_KEY_CONVERTER"; - public static final String VALUE_CONVERTER_CONFIG = "CONNECT_VALUE_CONVERTER"; - private static final String PLUGIN_PATH_CONTAINER = "/usr/share/java,/usr/share/filestream-connectors"; - private static final String GROUP_ID_DEFAULT_VALUE = "kafka-connect-group"; - private static final String OFFSET_STORAGE_TOPIC_DEFAULT_VALUE = "connect-offsets"; - private static final Integer OFFSET_STORAGE_PARTITIONS_DEFAULT_VALUE = 3; - private static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG = "CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR"; - private static final Integer OFFSET_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE = 1; - private static final String CONFIG_STORAGE_TOPIC_DEFAULT_VALUE = "connect-config"; - private static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG = "CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR"; - private static final Integer CONFIG_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE = 1; - private static final String STATUS_STORAGE_TOPIC_DEFAULT_VALUE = "connect-status"; - private static final Integer STATUS_STORAGE_PARTITIONS_DEFAULT_VALUE = 3; - private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = "CONNECT_STATUS_STORAGE_REPLICATION_FACTOR"; - private static final Integer STATUS_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE = 1; - private static final String KEY_CONVERTER_DEFAULT_VALUE = "org.apache.kafka.connect.json.JsonConverter"; - private static final String VALUE_CONVERTER_DEFAULT_VALUE = "org.apache.kafka.connect.json.JsonConverter"; - private static final String INTERNAL_KEY_CONVERTER_CONFIG = "CONNECT_INTERNAL_KEY_CONVERTER"; - private static final String INTERNAL_KEY_CONVERTER_DEFAULT_VALUE = "org.apache.kafka.connect.json.JsonConverter"; - private static final String INTERNAL_VALUE_CONVERTER_CONFIG = "CONNECT_INTERNAL_VALUE_CONVERTER"; - private static final String INTERNAL_VALUE_CONVERTER_DEFAULT_VALUE = "org.apache.kafka.connect.json.JsonConverter"; - private final String bootstrapServers; - - /** - * Constructor. - * - * @param dockerImageName The docker image name - * @param bootstrapServers The bootstrap servers - */ - public KafkaConnectContainer(DockerImageName dockerImageName, String bootstrapServers) { - super(dockerImageName); - this.bootstrapServers = bootstrapServers; - this.withEnv(GROUP_ID_CONFIG, GROUP_ID_DEFAULT_VALUE) - .withEnv(KEY_CONVERTER_CONFIG, KEY_CONVERTER_DEFAULT_VALUE) - .withEnv(VALUE_CONVERTER_CONFIG, VALUE_CONVERTER_DEFAULT_VALUE) - .withEnv(OFFSET_STORAGE_TOPIC_CONFIG, OFFSET_STORAGE_TOPIC_DEFAULT_VALUE) - .withEnv(OFFSET_STORAGE_PARTITIONS_CONFIG, String.valueOf(OFFSET_STORAGE_PARTITIONS_DEFAULT_VALUE)) - .withEnv(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, - String.valueOf(OFFSET_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE)) - .withEnv(CONFIG_STORAGE_TOPIC_CONFIG, CONFIG_STORAGE_TOPIC_DEFAULT_VALUE) - .withEnv(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, - String.valueOf(CONFIG_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE)) - .withEnv(STATUS_STORAGE_TOPIC_CONFIG, STATUS_STORAGE_TOPIC_DEFAULT_VALUE) - .withEnv(STATUS_STORAGE_PARTITIONS_CONFIG, String.valueOf(STATUS_STORAGE_PARTITIONS_DEFAULT_VALUE)) - .withEnv(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, - String.valueOf(STATUS_STORAGE_REPLICATION_FACTOR_DEFAULT_VALUE)) - .withEnv(INTERNAL_KEY_CONVERTER_CONFIG, INTERNAL_KEY_CONVERTER_DEFAULT_VALUE) - .withEnv(INTERNAL_VALUE_CONVERTER_CONFIG, INTERNAL_VALUE_CONVERTER_DEFAULT_VALUE); - - withExposedPorts(CONNECT_REST_PORT_INTERNAL); - withNetworkAliases("kafka-connect"); - waitingFor(Wait.forHttp("/").withStartupTimeout(Duration.ofSeconds(120L))); - } - - /** - * Configure the container. - */ - @Override - protected void configure() { - super.configure(); - this.withEnv("CONNECT_BOOTSTRAP_SERVERS", this.bootstrapServers) - .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect") - .withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER) - .withEnv("CONNECT_LOG4J_LOGGERS", "org.reflections=ERROR") - .withEnv("CONNECT_REST_PORT", String.valueOf(CONNECT_REST_PORT_INTERNAL)) - .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName( - "testcontainsers-kafka-connect-" + UUID.randomUUID())); - } - - /** - * Get the url of Kafka Connect. - * - * @return The URL - */ - public String getUrl() { - return format("http://%s:%d", this.getHost(), getMappedPort(CONNECT_REST_PORT_INTERNAL)); - } -} diff --git a/src/test/java/com/michelin/ns4kafka/testcontainers/SchemaRegistryContainer.java b/src/test/java/com/michelin/ns4kafka/testcontainers/SchemaRegistryContainer.java deleted file mode 100644 index 488f859a..00000000 --- a/src/test/java/com/michelin/ns4kafka/testcontainers/SchemaRegistryContainer.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.michelin.ns4kafka.testcontainers; - - -import static java.lang.String.format; - -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -/** - * This class is a testcontainers implementation for the - * https://docs.confluent.io/current/schema-registry/index.html - * Docker container. - */ -public class SchemaRegistryContainer extends GenericContainer { - private static final int SCHEMA_REGISTRY_INTERNAL_PORT = 8081; - - /** - * Constructor. - * - * @param dockerImageName The Docker image name of the Schema Registry - * @param bootstrapServers The bootstrap servers URL - */ - public SchemaRegistryContainer(DockerImageName dockerImageName, String bootstrapServers) { - super(dockerImageName); - - this - .withEnv("SCHEMA_REGISTRY_HOST_NAME", "localhost") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", bootstrapServers); - - withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT); - withNetworkAliases("schema-registry"); - waitingFor(Wait.forHttp("/subjects")); - } - - /** - * Get the current URL of the schema registry. - * - * @return The current URL of the schema registry - */ - public String getUrl() { - return format("http://%s:%d", this.getHost(), this.getMappedPort(SCHEMA_REGISTRY_INTERNAL_PORT)); - } -} - diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index 7a328ccd..48e9fcf3 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -1,7 +1,13 @@ +micronaut: + security: + gitlab: + enabled: false + kafka: - embedded: - enabled: false - bootstrap.servers: "localhost:9092" + bootstrap.servers: "localhost:9092" # Replaced by Testcontainers + sasl.mechanism: "PLAIN" + security.protocol: "SASL_PLAINTEXT" + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";" ns4kafka: security: @@ -22,10 +28,11 @@ ns4kafka: manage-acls: true manage-topics: true manage-connectors: true - connectsS: - test-connect: "toto" + config: + bootstrap.servers: "localhost:9092" # Replaced by Testcontainers + sasl.mechanism: "PLAIN" + security.protocol: "SASL_PLAINTEXT" + sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";" connects: test-connect: - url: "localhost:8083" - config: - bootstrap.servers: "localhost:9092" + url: "localhost:8083" # Replaced by Testcontainers \ No newline at end of file