diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java index b0e8d489..794986e7 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java @@ -135,9 +135,11 @@ public Mono> apply(String namespace, @Body @Valid C * @param connectCluster The current connect cluster name to delete * @param dryrun Run in dry mode or not * @return A HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{connectCluster}{?dryrun}") + @Deprecated(since = "1.13.0") public HttpResponse delete(String namespace, String connectCluster, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); @@ -166,20 +168,66 @@ public HttpResponse delete(String namespace, String connectCluster, return HttpResponse.noContent(); } - ConnectCluster connectClusterToDelete = optionalConnectCluster.get(); + performDeletion(optionalConnectCluster.get()); + return HttpResponse.noContent(); + } - sendEventLog( - connectClusterToDelete, - ApplyStatus.deleted, - connectClusterToDelete.getSpec(), - null, - EMPTY_STRING - ); + /** + * Delete Kafka Connect clusters. + * + * @param namespace The current namespace + * @param name The name parameter + * @param dryrun Run in dry mode or not + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); - connectClusterService.delete(connectClusterToDelete); + List connectClusters = connectClusterService.findByWildcardNameWithOwnerPermission(ns, name); + + List validationErrors = new ArrayList<>(); + connectClusters.forEach(cc -> { + List connectors = connectorService.findAllByConnectCluster(ns, cc.getMetadata().getName()); + if (!connectors.isEmpty()) { + validationErrors.add(invalidConnectClusterDeleteOperation(cc.getMetadata().getName(), connectors)); + } + }); + + if (!validationErrors.isEmpty()) { + throw new ResourceValidationException(CONNECT_CLUSTER, name, validationErrors); + } + + if (connectClusters.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + + connectClusters.forEach(this::performDeletion); return HttpResponse.noContent(); } + /** + * Perform the deletion of the connectCluster and send an event log. + * + * @param connectCluster The connectCluster to delete + */ + private void performDeletion(ConnectCluster connectCluster) { + sendEventLog( + connectCluster, + ApplyStatus.deleted, + connectCluster.getSpec(), + null, + EMPTY_STRING + ); + connectClusterService.delete(connectCluster); + } + /** * List vault Kafka Connect clusters by namespace. * diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java index afbb935e..fb622c61 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java @@ -82,6 +82,7 @@ public Flux findAll(boolean all) { if (all) { results.addAll(managedClusterProperties .stream() + .filter(cluster -> cluster.getConnects() != null) .map(config -> config.getConnects().entrySet() .stream() .map(entry -> diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java index cf08a9f2..9bcbae45 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java @@ -159,6 +159,7 @@ void shouldGetConnectCluster() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectClusterWhenNotOwner() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -177,6 +178,7 @@ void shouldNotDeleteConnectClusterWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectClusterWhenNotFound() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -197,6 +199,7 @@ void shouldNotDeleteConnectClusterWhenNotFound() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnectCluster() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -229,6 +232,7 @@ void shouldDeleteConnectCluster() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnectClusterInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -259,6 +263,7 @@ void shouldDeleteConnectClusterInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectClusterWithConnectorsAssociated() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -290,6 +295,140 @@ void shouldNotDeleteConnectClusterWithConnectorsAssociated() { result.getValidationErrors().getFirst()); } + @Test + void shouldDeleteConnectClusters() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + ConnectCluster connectCluster1 = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster1") + .build()) + .build(); + + ConnectCluster connectCluster2 = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findAllByConnectCluster(ns, "connect-cluster1")) + .thenReturn(List.of()); + when(connectorService.findAllByConnectCluster(ns, "connect-cluster2")) + .thenReturn(List.of()); + when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*")) + .thenReturn(List.of(connectCluster1, connectCluster2)); + doNothing().when(connectClusterService).delete(connectCluster1); + doNothing().when(connectClusterService).delete(connectCluster2); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + HttpResponse actual = connectClusterController.bulkDelete("test", "connect-cluster*", false); + assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); + } + + @Test + void shouldDeleteConnectClustersInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + ConnectCluster connectCluster = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findAllByConnectCluster(ns, "connect-cluster")) + .thenReturn(List.of()); + when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*")) + .thenReturn(List.of(connectCluster)); + + HttpResponse actual = connectClusterController.bulkDelete("test", "connect-cluster*", true); + assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); + + verify(connectClusterService, never()).delete(any()); + } + + @Test + void shouldNotDeleteConnectClustersWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*")) + .thenReturn(List.of()); + + HttpResponse actual = connectClusterController.bulkDelete("test", "connect-cluster*", false); + assertEquals(HttpStatus.NOT_FOUND, actual.getStatus()); + } + + @Test + void shouldNotDeleteConnectClustersWithConnectorsAssociated() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); + + ConnectCluster connectCluster1 = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster1") + .build()) + .build(); + + ConnectCluster connectCluster2 = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findAllByConnectCluster(ns, "connect-cluster1")) + .thenReturn(List.of()); + when(connectorService.findAllByConnectCluster(ns, "connect-cluster2")) + .thenReturn(List.of()); + when(connectClusterService.findByWildcardNameWithOwnerPermission(ns, "connect-cluster*")) + .thenReturn(List.of(connectCluster1, connectCluster2)); + + when(connectorService.findAllByConnectCluster(ns, "connect-cluster2")) + .thenReturn(List.of(connector)); + + ResourceValidationException result = assertThrows(ResourceValidationException.class, + () -> connectClusterController.bulkDelete("test", "connect-cluster*", false)); + + assertEquals(1, result.getValidationErrors().size()); + assertEquals( + "Invalid \"delete\" operation: The Kafka Connect \"connect-cluster2\" has 1 deployed connector(s): " + + "connect1. Please remove the associated connector(s) before deleting it.", + result.getValidationErrors().getFirst()); + } + @Test void shouldCreateConnectCluster() { Namespace ns = Namespace.builder() diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectClusterIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectClusterIntegrationTest.java new file mode 100644 index 00000000..426523ae --- /dev/null +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectClusterIntegrationTest.java @@ -0,0 +1,247 @@ +package com.michelin.ns4kafka.integration; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.michelin.ns4kafka.integration.TopicIntegrationTest.BearerAccessRefreshToken; +import com.michelin.ns4kafka.integration.container.KafkaConnectIntegrationTest; +import com.michelin.ns4kafka.model.AccessControlEntry; +import com.michelin.ns4kafka.model.AccessControlEntry.AccessControlEntrySpec; +import com.michelin.ns4kafka.model.AccessControlEntry.Permission; +import com.michelin.ns4kafka.model.AccessControlEntry.ResourcePatternType; +import com.michelin.ns4kafka.model.AccessControlEntry.ResourceType; +import com.michelin.ns4kafka.model.Metadata; +import com.michelin.ns4kafka.model.Namespace; +import com.michelin.ns4kafka.model.Namespace.NamespaceSpec; +import com.michelin.ns4kafka.model.RoleBinding; +import com.michelin.ns4kafka.model.RoleBinding.Role; +import com.michelin.ns4kafka.model.RoleBinding.RoleBindingSpec; +import com.michelin.ns4kafka.model.RoleBinding.Subject; +import com.michelin.ns4kafka.model.RoleBinding.SubjectType; +import com.michelin.ns4kafka.model.RoleBinding.Verb; +import com.michelin.ns4kafka.model.connect.cluster.ConnectCluster; +import com.michelin.ns4kafka.model.connect.cluster.VaultResponse; +import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; +import com.michelin.ns4kafka.service.executor.TopicAsyncExecutor; +import com.michelin.ns4kafka.validation.ConnectValidator; +import com.michelin.ns4kafka.validation.TopicValidator; +import io.micronaut.context.ApplicationContext; +import io.micronaut.core.type.Argument; +import io.micronaut.http.HttpMethod; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.client.HttpClient; +import io.micronaut.http.client.annotation.Client; +import io.micronaut.security.authentication.UsernamePasswordCredentials; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +@Slf4j +@MicronautTest +class ConnectClusterIntegrationTest extends KafkaConnectIntegrationTest { + @Inject + private ApplicationContext applicationContext; + + @Inject + @Client("/") + private HttpClient ns4KafkaClient; + + @Inject + private List topicAsyncExecutorList; + + @Inject + private List connectorAsyncExecutorList; + + private String token; + + @BeforeAll + void init() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("ns1") + .cluster("test-cluster") + .build()) + .spec(NamespaceSpec.builder() + .kafkaUser("user1") + .connectClusters(List.of("test-connect")) + .topicValidator(TopicValidator.makeDefaultOneBroker()) + .connectValidator(ConnectValidator.builder() + .validationConstraints(Map.of()) + .sinkValidationConstraints(Map.of()) + .classValidationConstraints(Map.of()) + .build()) + .build()) + .build(); + + RoleBinding roleBinding = RoleBinding.builder() + .metadata(Metadata.builder() + .name("ns1-rb") + .namespace("ns1") + .build()) + .spec(RoleBindingSpec.builder() + .role(Role.builder() + .resourceTypes(List.of( + "topics", + "acls", + "connect-clusters", + "connect-clusters/vaults", + "connectors")) + .verbs(List.of(Verb.POST, Verb.GET, Verb.PUT, Verb.DELETE)) + .build()) + .subject(Subject.builder() + .subjectName("group1") + .subjectType(SubjectType.GROUP) + .build()) + .build()) + .build(); + + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials("admin", "admin"); + HttpResponse response = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest.POST("/login", credentials), BearerAccessRefreshToken.class); + + token = response.getBody().get().getAccessToken(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces") + .bearerAuth(token) + .body(namespace)); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/role-bindings") + .bearerAuth(token) + .body(roleBinding)); + + AccessControlEntry aclConnect = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-acl") + .namespace("ns1") + .build()) + .spec(AccessControlEntrySpec.builder() + .resourceType(ResourceType.CONNECT) + .resource("ns1-") + .resourcePatternType(ResourcePatternType.PREFIXED) + .permission(Permission.OWNER) + .grantedTo("ns1") + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/acls") + .bearerAuth(token) + .body(aclConnect)); + + AccessControlEntry aclConnectCluster = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-acl-cc") + .namespace("ns1") + .build()) + .spec(AccessControlEntrySpec.builder() + .resourceType(ResourceType.CONNECT_CLUSTER) + .resource("ns1-") + .resourcePatternType(ResourcePatternType.PREFIXED) + .permission(Permission.OWNER) + .grantedTo("ns1") + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/acls") + .bearerAuth(token) + .body(aclConnectCluster)); + + AccessControlEntry aclTopic = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-acl-topic") + .namespace("ns1") + .build()) + .spec(AccessControlEntrySpec.builder() + .resourceType(ResourceType.TOPIC) + .resource("ns1-") + .resourcePatternType(ResourcePatternType.PREFIXED) + .permission(Permission.OWNER) + .grantedTo("ns1") + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/acls") + .bearerAuth(token) + .body(aclTopic)); + } + + @Test + void shouldCreateConnectCluster() { + ConnectCluster connectCluster = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("ns1-connectCluster") + .namespace("ns1") + .build()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .url(getConnectUrl()) + .username("") + .password("") + .aes256Key("aes256Key") + .aes256Salt("aes256Salt") + .aes256Format("%s") + .build()) + .build(); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/connect-clusters") + .bearerAuth(token) + .body(connectCluster)); + + HttpResponse> connectClusters = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/connect-clusters") + .bearerAuth(token), Argument.listOf(ConnectCluster.class)); + + assertEquals(1, connectClusters.getBody().get().size()); + assertEquals("ns1-connectCluster", connectClusters.getBody().get().get(0).getMetadata().getName()); + + List passwordsToVault = List.of("password1", "password2"); + + HttpResponse> vaultResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/connect-clusters/ns1-connectCluster/vaults") + .bearerAuth(token) + .body(passwordsToVault), Argument.listOf(VaultResponse.class)); + + assertEquals(2, vaultResponse.getBody().get().size()); + assertEquals("password1", vaultResponse.getBody().get().get(0).getSpec().getClearText()); + assertEquals("password2", vaultResponse.getBody().get().get(1).getSpec().getClearText()); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/connect-clusters?name=ns1*") + .bearerAuth(token)); + + connectClusters = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/connect-clusters") + .bearerAuth(token), Argument.listOf(ConnectCluster.class)); + + assertEquals(0, connectClusters.getBody().get().size()); + } +} diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java index 536f05c1..127e1c13 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java @@ -128,6 +128,38 @@ void shouldFindAllConnectClustersIncludingThoseDeclaredInNs4KafkaConfig() { .verifyComplete(); } + @Test + void shouldFindAllConnectClustersIncludingThoseDeclaredInNs4KafkaConfigEvenIfNull() { + ConnectCluster connectCluster = ConnectCluster.builder() + .metadata(Metadata.builder() + .name("connect-cluster") + .build()) + .spec(ConnectCluster.ConnectClusterSpec.builder() + .url("https://after") + .build()) + .build(); + + when(connectClusterRepository.findAll()) + .thenReturn(new ArrayList<>(List.of(connectCluster))); + + ManagedClusterProperties kafka = new ManagedClusterProperties("local"); + kafka.setConnects(null); + + when(managedClusterPropertiesList.stream()) + .thenReturn(Stream.of(kafka)); + when(kafkaConnectClient.version(any(), any())) + .thenReturn(Mono.just(HttpResponse.ok())) + .thenReturn(Mono.error(new Exception("error"))); + + StepVerifier.create(connectClusterService.findAll(true)) + .consumeNextWith(result -> { + assertEquals("connect-cluster", result.getMetadata().getName()); + assertEquals(ConnectCluster.Status.HEALTHY, result.getSpec().getStatus()); + assertNull(result.getSpec().getStatusMessage()); + }) + .verifyComplete(); + } + @Test void shouldFindAllConnectClustersForNamespaceWithOwnership() { Namespace namespace = Namespace.builder()