Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate integration tests to KRaft broker #434

Merged
merged 1 commit into the base branch from the source branch
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .docker/config/broker/kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin" password="admin"
user_admin="admin";
};

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin" password="admin";
};
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ dependencies {
runtimeOnly("ch.qos.logback:logback-classic")

testImplementation("org.mockito:mockito-core")
testImplementation("org.testcontainers:junit-jupiter")
testImplementation("org.testcontainers:testcontainers")
testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:junit-jupiter:1.20.0")
testImplementation("org.testcontainers:testcontainers:1.20.0")
testImplementation("org.testcontainers:kafka:1.20.0")
testImplementation("org.mockito:mockito-junit-jupiter:5.12.0")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.10.3")
testImplementation("io.projectreactor:reactor-test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,30 @@
*/
@Slf4j
public abstract class KafkaStore<T> {
private final Map<String, T> store;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ReentrantLock offsetUpdateLock;
private final Condition offsetReachedThreshold;
@Inject
ApplicationContext applicationContext;

@Inject
AdminClient adminClient;

@Inject
KafkaStoreProperties kafkaStoreProperties;

@Inject
@Named(TaskExecutors.SCHEDULED)
TaskScheduler taskScheduler;

@Property(name = "ns4kafka.store.kafka.init-timeout")
int initTimeout;

private final Map<String, T> store;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final ReentrantLock offsetUpdateLock;
private final Condition offsetReachedThreshold;
String kafkaTopic;
Producer<String, T> kafkaProducer;
long offsetInSchemasTopic = -1;
long lastWrittenOffset = -1;
@Property(name = "ns4kafka.store.kafka.init-timeout")
int initTimeout;

KafkaStore(String kafkaTopic, Producer<String, T> kafkaProducer) {
this.kafkaTopic = kafkaTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,19 @@ public void synchronizeTopics() {

Map<ConfigResource, Collection<AlterConfigOp>> updateTopics = checkTopics.stream()
.map(topic -> {
Map<String, String> actualConf =
brokerTopics.get(topic.getMetadata().getName()).getSpec().getConfigs();
Map<String, String> expectedConf =
topic.getSpec().getConfigs() == null ? Map.of() : topic.getSpec().getConfigs();
Map<String, String> actualConf = brokerTopics.get(topic.getMetadata().getName())
.getSpec()
.getConfigs();

Map<String, String> expectedConf = topic.getSpec().getConfigs() == null
? Map.of() : topic.getSpec().getConfigs();

Collection<AlterConfigOp> topicConfigChanges = computeConfigChanges(expectedConf, actualConf);
if (!topicConfigChanges.isEmpty()) {
ConfigResource cr =
new ConfigResource(ConfigResource.Type.TOPIC, topic.getMetadata().getName());
ConfigResource cr = new ConfigResource(
ConfigResource.Type.TOPIC,
topic.getMetadata().getName()
);
return Map.entry(cr, topicConfigChanges);
}
return null;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken;
import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest;
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec;
import com.michelin.ns4kafka.model.AccessControlEntry.Permission;
Expand All @@ -21,7 +22,6 @@
import com.michelin.ns4kafka.model.RoleBinding.Verb;
import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.annotation.Property;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand All @@ -47,8 +47,7 @@
import org.junit.jupiter.api.Test;

@MicronautTest
@Property(name = "micronaut.security.gitlab.enabled", value = "false")
class AclIntegrationTest extends AbstractIntegrationTest {
class AclIntegrationTest extends KafkaIntegrationTest {
@Inject
@Client("/")
HttpClient ns4KafkaClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.michelin.ns4kafka.controller.ApiResourcesController;
import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest;
import com.michelin.ns4kafka.model.Metadata;
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.model.RoleBinding;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.annotation.Property;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
Expand All @@ -21,8 +21,7 @@
import org.junit.jupiter.api.Test;

@MicronautTest
@Property(name = "micronaut.security.gitlab.enabled", value = "false")
class ApiResourcesIntegrationTest extends AbstractIntegrationTest {
class ApiResourcesIntegrationTest extends KafkaIntegrationTest {
@Inject
@Client("/")
HttpClient ns4KafkaClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken;
import com.michelin.ns4kafka.integration.container.KafkaConnectIntegrationTest;
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec;
import com.michelin.ns4kafka.model.AccessControlEntry.Permission;
Expand All @@ -32,7 +33,6 @@
import com.michelin.ns4kafka.validation.ConnectValidator;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Property;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand All @@ -53,8 +53,7 @@

@Slf4j
@MicronautTest
@Property(name = "micronaut.security.gitlab.enabled", value = "false")
class ConnectIntegrationTest extends AbstractIntegrationConnectTest {
class ConnectorIntegrationTest extends KafkaConnectIntegrationTest {
@Inject
private ApplicationContext applicationContext;

Expand All @@ -75,7 +74,7 @@ class ConnectIntegrationTest extends AbstractIntegrationConnectTest {
@BeforeAll
void init() {
// Create HTTP client as bean to load client configuration from application.yml
connectClient = applicationContext.createBean(HttpClient.class, connectContainer.getUrl());
connectClient = applicationContext.createBean(HttpClient.class, getConnectUrl());

Namespace namespace = Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -181,7 +180,7 @@ void shouldGetConnectClusterVersion() {
.toBlocking()
.retrieve(HttpRequest.GET("/"), ServerInfo.class);

assertEquals("7.4.1-ccs", actual.version());
assertEquals("7.7.0-ccs", actual.version());
}

@Test
Expand Down Expand Up @@ -470,7 +469,7 @@ void shouldRestartConnector() throws InterruptedException {
assertEquals(HttpStatus.OK, restartResponse.status());

waitForConnectorAndTasksToBeInState("ns1-co1", Connector.TaskState.RUNNING);

ConnectorStateInfo actual = connectClient
.toBlocking()
.retrieve(HttpRequest.GET("/connectors/ns1-co1/status"), ConnectorStateInfo.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken;
import com.michelin.ns4kafka.integration.container.KafkaIntegrationTest;
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec;
import com.michelin.ns4kafka.model.AccessControlEntry.Permission;
Expand All @@ -23,7 +24,6 @@
import com.michelin.ns4kafka.model.Topic;
import com.michelin.ns4kafka.model.Topic.TopicSpec;
import com.michelin.ns4kafka.validation.TopicValidator;
import io.micronaut.context.annotation.Property;
import io.micronaut.http.HttpMethod;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
Expand All @@ -41,8 +41,7 @@
import org.junit.jupiter.api.Test;

@MicronautTest
@Property(name = "micronaut.security.gitlab.enabled", value = "false")
class ExceptionHandlerIntegrationTest extends AbstractIntegrationTest {
class ExceptionHandlerIntegrationTest extends KafkaIntegrationTest {
@Inject
@Client("/")
HttpClient ns4KafkaClient;
Expand Down
Loading