Skip to content

Commit

Permalink
Fix restart operation not restarting all connector tasks (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier authored Sep 30, 2024
1 parent 5286f98 commit e0afe72
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 26 deletions.
22 changes: 8 additions & 14 deletions src/main/java/com/michelin/ns4kafka/service/ConnectorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,14 @@ public Flux<Connector> listUnsynchronizedConnectors(Namespace namespace) {
public Mono<HttpResponse<Void>> restart(Namespace namespace, Connector connector) {
return kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(),
connector.getMetadata().getName())
.flatMap(status -> {
Flux<HttpResponse<Void>> responses = Flux.fromIterable(status.tasks())
.flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId()))
.map(response -> {
log.info("Success restarting connector [{}] on namespace [{}] connect [{}]",
connector.getMetadata().getName(),
namespace.getMetadata().getName(),
connector.getSpec().getConnectCluster());
return HttpResponse.ok();
});

return Mono.from(responses);
});
.flatMap(status -> Flux.fromIterable(status.tasks())
.flatMap(task -> kafkaConnectClient.restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), task.getId()))
.doOnNext(restart -> log.info("Success restarting connector [{}] on namespace [{}] connect [{}]",
connector.getMetadata().getName(),
namespace.getMetadata().getName(),
connector.getSpec().getConnectCluster()))
.then(Mono.just(HttpResponse.ok())));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@
@Singleton
public class KafkaConnectClient {
private static final String CONNECTORS = "/connectors/";

@Inject
ConnectClusterRepository connectClusterRepository;

@Inject
@Client(id = "kafka-connect")
private HttpClient httpClient;

@Inject
private List<ManagedClusterProperties> managedClusterProperties;

@Inject
private SecurityProperties securityProperties;

Expand Down Expand Up @@ -222,7 +226,8 @@ public Mono<HttpResponse<Void>> resume(String kafkaCluster, String connectCluste
* @return The Kafka Connect configuration
*/
public KafkaConnectClient.KafkaConnectHttpConfig getKafkaConnectConfig(String kafkaCluster, String connectCluster) {
Optional<ManagedClusterProperties> config = managedClusterProperties.stream()
Optional<ManagedClusterProperties> config = managedClusterProperties
.stream()
.filter(kafkaAsyncExecutorConfig -> kafkaAsyncExecutorConfig.getName().equals(kafkaCluster))
.findFirst();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public ConnectorState(@JsonProperty("state") String state, @JsonProperty("worker
public static class TaskState extends AbstractState implements Comparable<TaskState> {
private final int id;

public TaskState(@JsonProperty("id") int id, @JsonProperty("state") String state,
public TaskState(@JsonProperty("id") int id,
@JsonProperty("state") String state,
@JsonProperty("worker_id") String worker,
@JsonProperty("msg") String msg) {
super(state, worker, msg);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package com.michelin.ns4kafka.service;

import static com.michelin.ns4kafka.service.client.connect.entities.ConnectorType.SOURCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -21,6 +25,7 @@
import com.michelin.ns4kafka.service.client.connect.entities.ConfigKeyInfo;
import com.michelin.ns4kafka.service.client.connect.entities.ConfigValueInfo;
import com.michelin.ns4kafka.service.client.connect.entities.ConnectorPluginInfo;
import com.michelin.ns4kafka.service.client.connect.entities.ConnectorStateInfo;
import com.michelin.ns4kafka.service.client.connect.entities.ConnectorType;
import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor;
import com.michelin.ns4kafka.validation.ConnectValidator;
Expand All @@ -35,7 +40,6 @@
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down Expand Up @@ -868,11 +872,7 @@ void shouldNotValidateRemotelyWhenErrorHappens() {
List.of(new ConfigInfo(new ConfigKeyInfo(null, null, false, null, null, null, null, 0, null, null, null),
new ConfigValueInfo(null, null, null, List.of("error_message"), true))));

when(kafkaConnectClient.validate(
ArgumentMatchers.eq("local"),
ArgumentMatchers.eq("local-name"),
ArgumentMatchers.any(),
ArgumentMatchers.any()))
when(kafkaConnectClient.validate(eq("local"), eq("local-name"), any(), any()))
.thenReturn(Mono.just(configInfos));

StepVerifier.create(connectorService.validateRemotely(ns, connector))
Expand Down Expand Up @@ -908,10 +908,10 @@ void shouldValidateRemotely() {
ConfigInfos configInfos = new ConfigInfos("name", 1, List.of(), List.of());

when(kafkaConnectClient.validate(
ArgumentMatchers.eq("local"),
ArgumentMatchers.eq("local-name"),
ArgumentMatchers.any(),
ArgumentMatchers.any()))
eq("local"),
eq("local-name"),
any(),
any()))
.thenReturn(Mono.just(configInfos));

StepVerifier.create(connectorService.validateRemotely(ns, connector))
Expand Down Expand Up @@ -1305,4 +1305,49 @@ void shouldNotDeleteConnectorWhenConnectClusterReturnsError() {

verify(connectorRepository, never()).delete(connector);
}

@Test
void shouldRestartAllTasksOfConnector() {
Namespace namespace = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace")
.cluster("local")
.build())
.build();

Connector connector = Connector.builder()
.metadata(Metadata.builder()
.name("ns-connect1")
.build())
.spec(Connector.ConnectorSpec.builder()
.connectCluster("local-name")
.build())
.build();

when(kafkaConnectClient.status(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster(),
connector.getMetadata().getName()))
.thenReturn(Mono.just(new ConnectorStateInfo(
"connector",
new ConnectorStateInfo.ConnectorState("RUNNING", "worker", "message"),
List.of(
new ConnectorStateInfo.TaskState(0, "RUNNING", "worker", "message"),
new ConnectorStateInfo.TaskState(1, "RUNNING", "worker", "message"),
new ConnectorStateInfo.TaskState(2, "RUNNING", "worker", "message")
),
SOURCE)));

when(kafkaConnectClient.restart(any(), any(), any(), anyInt()))
.thenReturn(Mono.just(HttpResponse.ok()));

StepVerifier.create(connectorService.restart(namespace, connector))
.consumeNextWith(response -> assertEquals(HttpStatus.OK, response.getStatus()))
.verifyComplete();

verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 0);
verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 1);
verify(kafkaConnectClient).restart(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster(), connector.getMetadata().getName(), 2);
}
}

0 comments on commit e0afe72

Please sign in to comment.