From 10754638e0dd09f5b4e51f0945609e313d0665d7 Mon Sep 17 00:00:00 2001 From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:26:12 +0200 Subject: [PATCH 1/5] Handle wildcard parameter in ACL deletion API (#458) * Handle wildcard parameter in ACL deletion API --- .../controller/acl/AclController.java | 66 +++++- .../michelin/ns4kafka/service/AclService.java | 32 ++- .../ns4kafka/util/FormatErrorUtils.java | 11 + .../controller/AclControllerTest.java | 203 +++++++++++++++++- .../ns4kafka/service/AclServiceTest.java | 79 ++++++- 5 files changed, 374 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java index 6b6bb820..f0b24ec9 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java @@ -3,6 +3,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidAclDeleteOnlyAdmin; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound; +import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSelfAssignedAclDelete; import static com.michelin.ns4kafka.util.enumation.Kind.ACCESS_CONTROL_ENTRY; import static io.micronaut.core.util.StringUtils.EMPTY_STRING; @@ -58,7 +59,7 @@ public List list(String namespace, .stream() .sorted(Comparator.comparing((AccessControlEntry acl) -> acl.getMetadata().getNamespace())) .toList(); - case GRANTOR -> aclService.findAllGrantedByNamespaceByWildcardName(ns, name) + case GRANTOR -> aclService.findAllGrantedByNamespaceToOthersByWildcardName(ns, name) .stream() .sorted(Comparator.comparing(acl -> acl.getSpec().getGrantedTo())) .toList(); @@ -77,7 +78,7 @@ public List list(String namespace, * @param namespace The name * @param acl The ACL name * @return The ACL - * @deprecated use list(String, Optional ALL, String name) instead. + * @deprecated use {@link #list(String, Optional, String)} instead. */ @Get("/{acl}") @Deprecated(since = "1.12.0") @@ -94,7 +95,7 @@ public Optional get(String namespace, String acl) { * @param authentication The authentication entity * @param namespace The namespace * @param accessControlEntry The ACL - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response */ @Post("{?dryrun}") @@ -152,17 +153,74 @@ public HttpResponse apply(Authentication authentication, Str return formatHttpResponse(aclService.create(accessControlEntry), status); } + /** + * Delete an ACL. + * + * @param authentication The authentication entity + * @param namespace The namespace + * @param name The ACL name parameter + * @param dryrun Is dry run mode or not? + * @return An HTTP response + */ + @Delete + @Status(HttpStatus.NO_CONTENT) + public HttpResponse bulkDelete(Authentication authentication, String namespace, + @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + + Namespace ns = getNamespace(namespace); + List acls = aclService.findAllGrantedByNamespaceByWildcardName(ns, name); + List selfAssignedAcls = acls + .stream() + .filter(acl -> namespace.equals(acl.getSpec().getGrantedTo())) + .toList(); + boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN); + + if (acls.isEmpty()) { + return HttpResponse.notFound(); + } + + // If non-admin tries to delete at least one self-assigned ACL, throw validation error + if (!isAdmin && !selfAssignedAcls.isEmpty()) { + List selfAssignedAclsNames = selfAssignedAcls + .stream() + .map(acl -> acl.getMetadata().getName()) + .toList(); + throw new ResourceValidationException(ACCESS_CONTROL_ENTRY, name, + invalidSelfAssignedAclDelete(name, String.join(", ", selfAssignedAclsNames)) + ); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + + acls.forEach(acl -> { + sendEventLog( + acl, + ApplyStatus.deleted, + acl.getSpec(), + null, + EMPTY_STRING); + aclService.delete(acl); + }); + + return HttpResponse.noContent(); + } + /** * Delete an ACL. * * @param authentication The authentication entity * @param namespace The namespace * @param name The ACL name - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response + * @deprecated use {@link #bulkDelete(Authentication, String, String, boolean)} instead. */ @Delete("/{name}{?dryrun}") @Status(HttpStatus.NO_CONTENT) + @Deprecated(since = "1.13.0") public HttpResponse delete(Authentication authentication, String namespace, String name, @QueryValue(defaultValue = "false") boolean dryrun) { AccessControlEntry accessControlEntry = aclService diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 2d3cfe0b..d476f6f9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -263,12 +263,25 @@ public List findAllGrantedToNamespace(Namespace namespace) { } /** - * Find all ACLs that a given namespace granted to other namespaces. + * Find all ACLs granted by a given namespace. * * @param namespace The namespace * @return A list of ACLs */ public List findAllGrantedByNamespace(Namespace namespace) { + return accessControlEntryRepository.findAll() + .stream() + .filter(acl -> acl.getMetadata().getNamespace().equals(namespace.getMetadata().getName())) + .toList(); + } + + /** + * Find all ACLs that a given namespace granted to other namespaces. + * + * @param namespace The namespace + * @return A list of ACLs + */ + public List findAllGrantedByNamespaceToOthers(Namespace namespace) { return accessControlEntryRepository.findAll() .stream() .filter(acl -> acl.getMetadata().getNamespace().equals(namespace.getMetadata().getName())) @@ -308,7 +321,7 @@ public List findAllGrantedToNamespaceByWildcardName(Namespac } /** - * Find all ACLs that a given namespace granted to other namespaces, filtered by name parameter. + * Find all ACLs granted by a given namespace, filtered by name parameter. * * @param namespace The namespace * @param name The name parameter @@ -322,6 +335,21 @@ public List findAllGrantedByNamespaceByWildcardName(Namespac .toList(); } + /** + * Find all ACLs that a given namespace granted to other namespaces, filtered by name parameter. + * + * @param namespace The namespace + * @param name The name parameter + * @return A list of ACLs + */ + public List findAllGrantedByNamespaceToOthersByWildcardName(Namespace namespace, String name) { + List nameFilterPatterns = RegexUtils.convertWildcardStringsToRegex(List.of(name)); + return findAllGrantedByNamespaceToOthers(namespace) + .stream() + .filter(acl -> RegexUtils.isResourceCoveredByRegex(acl.getMetadata().getName(), nameFilterPatterns)) + .toList(); + } + /** * Find all ACLs that a given namespace granted to other namespaces, filtered by name parameter. * diff --git a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java index 7b118bcd..9d22c087 100644 --- a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java +++ b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java @@ -50,6 +50,17 @@ public static String invalidAclDeleteOnlyAdmin(String invalidAclName) { return String.format(INVALID_FIELD, invalidAclName, FIELD_NAME, "only administrators can delete this ACL"); } + /** + * Invalid self-assigned ACL delete, authorized only by admin. + * + * @param invalidAclName the invalid ACL name + * @return the error message + */ + public static String invalidSelfAssignedAclDelete(String invalidAclName, String acls) { + return String.format(INVALID_FIELD, invalidAclName, FIELD_NAME, + "only administrators can delete the following self-assigned ACLs: " + acls); + } + /** * Invalid ACL field. * diff --git a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java index 05787420..c68bb3b4 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java @@ -135,7 +135,7 @@ void shouldListAclsWithoutNameParameter() { when(aclService.findAllGrantedToNamespaceByWildcardName(namespace, "*")).thenReturn( List.of(aceTopicPrefixedOwnerAdminToTest, aceConnectPrefixedOwnerAdminToTest, aceTopicPrefixedReadNamespaceOtherToTest, aceTopicPrefixedReadAdminToAll)); - when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "*")) + when(aclService.findAllGrantedByNamespaceToOthersByWildcardName(namespace, "*")) .thenReturn(List.of(aceTopicPrefixedReadTestToNamespaceOther)); when(aclService.findAllRelatedToNamespaceByWildcardName(namespace, "*")).thenReturn( List.of(aceTopicPrefixedReadTestToNamespaceOther, aceTopicPrefixedOwnerAdminToTest, @@ -223,7 +223,7 @@ void shouldListAclsWithNameParameter() { .thenReturn(List.of(aclGrantedToNamespace)); when(aclService.findAllGrantedToNamespaceByWildcardName(namespace, "ownerAcl")) .thenReturn(List.of()); - when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "aclGrantedByNamespace")) + when(aclService.findAllGrantedByNamespaceToOthersByWildcardName(namespace, "aclGrantedByNamespace")) .thenReturn(List.of(aclGrantedByNamespace)); when(aclService.findAllGrantedToNamespaceByWildcardName(namespace, "ownerAcl")) .thenReturn(List.of()); @@ -706,7 +706,8 @@ void shouldApplyAclInDryRunMode() { } @Test - void shouldDeleteAclFailWhenNotFound() { + @SuppressWarnings("deprecation") + void shouldNotDeleteAclWhenNotFound() { Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); when(aclService.findByName("test", "ace1")).thenReturn(Optional.empty()); @@ -719,7 +720,8 @@ void shouldDeleteAclFailWhenNotFound() { } @Test - void shouldDeleteSelfAssignedAclFailWhenNotAdmin() { + @SuppressWarnings("deprecation") + void shouldNotDeleteSelfAssignedAclWhenNotAdmin() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") @@ -746,7 +748,8 @@ void shouldDeleteSelfAssignedAclFailWhenNotAdmin() { } @Test - void shouldDeleteSelfAssignedAclWithSuccessAsAdmin() { + @SuppressWarnings("deprecation") + void shouldDeleteSelfAssignedAclAsAdmin() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") @@ -772,7 +775,8 @@ void shouldDeleteSelfAssignedAclWithSuccessAsAdmin() { } @Test - void shouldDeleteAclWithSuccess() { + @SuppressWarnings("deprecation") + void shouldDeleteAcl() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") @@ -801,7 +805,8 @@ void shouldDeleteAclWithSuccess() { } @Test - void shouldDeleteInDryRunMode() { + @SuppressWarnings("deprecation") + void shouldNotDeleteInDryRunMode() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") @@ -824,4 +829,188 @@ void shouldDeleteInDryRunMode() { verify(aclService, never()).delete(any()); assertEquals(HttpStatus.NO_CONTENT, actual.status()); } + + @Test + void shouldNotBulkDeleteAclWhenNotFound() { + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "ace1")).thenReturn(List.of()); + + HttpResponse actual = accessControlListController.bulkDelete(authentication, "test", "ace1", false); + assertEquals(HttpStatus.NOT_FOUND, actual.status()); + } + + @Test + void shouldNotBulkDeleteSelfAssignedAclWhenNotAdmin() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + AccessControlEntry acl1 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace1") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("prefix") + .grantedTo("test").build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace2") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("other").build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "ace*")).thenReturn(List.of(acl1, acl2)); + + ResourceValidationException actual = assertThrows(ResourceValidationException.class, + () -> accessControlListController.bulkDelete(authentication, "test", "ace*", false)); + + assertEquals("Invalid value \"ace*\" for field \"name\":" + + " only administrators can delete the following self-assigned ACLs: ace1.", + actual.getValidationErrors().getFirst()); + } + + @Test + void shouldBulkDeleteSelfAssignedAclAsAdmin() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace1") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("prefix") + .grantedTo("test").build()) + .build(); + + Authentication authentication = Authentication.build("user", List.of("isAdmin()"), + Map.of("roles", List.of("isAdmin()"))); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "ace1")) + .thenReturn(List.of(accessControlEntry)); + + HttpResponse actual = accessControlListController.bulkDelete(authentication, "test", "ace1", false); + + assertEquals(HttpStatus.NO_CONTENT, actual.status()); + } + + @Test + void shouldBulkDeleteAcl() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + AccessControlEntry acl1 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace1") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("namespace-other") + .build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace2") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.WRITE) + .resource("prefix") + .grantedTo("namespace-other2") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "ace*")).thenReturn(List.of(acl1, acl2)); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + HttpResponse actual = accessControlListController.bulkDelete(authentication, "test", "ace*", false); + + assertEquals(HttpStatus.NO_CONTENT, actual.status()); + } + + @Test + void shouldNotBulkDeleteInDryRunMode() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ace1") + .namespace("test") + .cluster("local") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("namespace-other") + .build()).build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(aclService.findAllGrantedByNamespaceByWildcardName(namespace, "ace1")) + .thenReturn(List.of(accessControlEntry)); + HttpResponse actual = accessControlListController.bulkDelete(authentication, "test", "ace1", true); + + verify(aclService, never()).delete(any()); + assertEquals(HttpStatus.NO_CONTENT, actual.status()); + } } diff --git a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java index fdc41c3c..54045e9a 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java @@ -1022,7 +1022,7 @@ void shouldFindAclGrantedByNamespaceByWildcardName() { AccessControlEntry acl2 = AccessControlEntry.builder() .metadata(Metadata.builder() - .name("acl-ns1-read-to-ns2") + .name("ns1-read-ns2") .namespace("namespace1") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() @@ -1066,12 +1066,83 @@ void shouldFindAclGrantedByNamespaceByWildcardName() { when(accessControlEntryRepository.findAll()).thenReturn(List.of(acl1, acl2, acl3, acl4, acl5)); - assertEquals(List.of(acl2, acl3), aclService.findAllGrantedByNamespaceByWildcardName(ns, "*")); - assertEquals(List.of(acl2), aclService.findAllGrantedByNamespaceByWildcardName(ns, "acl-ns1-read-to-ns2")); - assertEquals(List.of(acl2, acl3), aclService.findAllGrantedByNamespaceByWildcardName(ns, "*-to-ns2")); + assertEquals(List.of(acl1, acl2, acl3), aclService.findAllGrantedByNamespaceByWildcardName(ns, "*")); + assertEquals(List.of(acl2), aclService.findAllGrantedByNamespaceByWildcardName(ns, "ns1-read-ns2")); + assertEquals(List.of(acl3), aclService.findAllGrantedByNamespaceByWildcardName(ns, "*-to-ns2")); assertTrue(aclService.findAllGrantedByNamespaceByWildcardName(ns, "not-found").isEmpty()); } + @Test + void shouldFindAclGrantedByNamespaceToOthersByWildcardName() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace1") + .build()) + .build(); + + AccessControlEntry acl1 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-acl-topic") + .namespace("namespace1") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace1").build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-read-ns2") + .namespace("namespace1") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("namespace2").build()) + .build(); + + AccessControlEntry acl3 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns1-connect-write-to-ns2") + .namespace("namespace1") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .permission(AccessControlEntry.Permission.WRITE) + .grantedTo("namespace2").build()) + .build(); + + AccessControlEntry acl4 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns2-acl-topic") + .namespace("namespace2") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace2").build()) + .build(); + + AccessControlEntry acl5 = AccessControlEntry.builder() + .metadata(Metadata.builder() + .name("ns3-read-topic-all") + .namespace("namespace3") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("*").build()) + .build(); + + when(accessControlEntryRepository.findAll()).thenReturn(List.of(acl1, acl2, acl3, acl4, acl5)); + + assertEquals(List.of(acl2, acl3), aclService.findAllGrantedByNamespaceToOthersByWildcardName(ns, "*")); + assertEquals(List.of(acl2), aclService.findAllGrantedByNamespaceToOthersByWildcardName(ns, "ns1-read-ns2")); + assertEquals(List.of(acl3), aclService.findAllGrantedByNamespaceToOthersByWildcardName(ns, "*-to-ns2")); + assertTrue(aclService.findAllGrantedByNamespaceToOthersByWildcardName(ns, "not-found").isEmpty()); + } + @Test void shouldFindAclRelatedToNamespaceByWildcardName() { AccessControlEntry acl1 = AccessControlEntry.builder() From c50b6fde8db960070ae2411b506b9f0a66a05ca5 Mon Sep 17 00:00:00 2001 From: adriencalime <110117127+adriencalime@users.noreply.github.com> Date: Wed, 9 Oct 2024 09:39:49 +0200 Subject: [PATCH 2/5] Handle wildcard parameter in Kstream deletion API (#455) * Handle wildcard parameter in Kstream deletion API --- .../ns4kafka/controller/StreamController.java | 55 ++++++- .../controller/StreamControllerTest.java | 143 ++++++++++++++++++ .../integration/StreamIntegrationTest.java | 15 ++ 3 files changed, 210 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java index d19872c0..3ae71890 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/StreamController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/StreamController.java @@ -40,7 +40,7 @@ public class StreamController extends NamespacedResourceController { * List Kafka Streams by namespace, filtered by name parameter. * * @param namespace The namespace - * @param name The name parameter + * @param name The name parameter * @return A list of Kafka Streams */ @Get @@ -67,7 +67,7 @@ Optional get(String namespace, String stream) { * * @param namespace The namespace * @param stream The Kafka Stream - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response */ @Post("/{?dryrun}") @@ -110,11 +110,13 @@ HttpResponse apply(String namespace, @Body @Valid KafkaStream strea * * @param namespace The namespace * @param stream The Kafka Streams - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{stream}{?dryrun}") + @Deprecated(since = "1.13.0") HttpResponse delete(String namespace, String stream, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); if (!streamService.isNamespaceOwnerOfKafkaStream(ns, stream)) { @@ -144,4 +146,51 @@ HttpResponse delete(String namespace, String stream, @QueryValue(defaultVa streamService.delete(ns, optionalStream.get()); return HttpResponse.noContent(); } + + /** + * Delete a Kafka Streams. + * + * @param namespace The namespace + * @param name The name parameter + * @param dryrun Is dry run mode or not? + * @return An HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + List kafkaStreams = streamService.findByWildcardName(ns, name); + + List validationErrors = kafkaStreams.stream() + .filter(kafkaStream -> + !streamService.isNamespaceOwnerOfKafkaStream(ns, kafkaStream.getMetadata().getName())) + .map(kafkaStream -> invalidOwner(kafkaStream.getMetadata().getName())) + .toList(); + + if (!validationErrors.isEmpty()) { + throw new ResourceValidationException(KAFKA_STREAM, name, validationErrors); + } + + if (kafkaStreams.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + kafkaStreams.forEach(kafkaStream -> { + sendEventLog( + kafkaStream, + ApplyStatus.deleted, + kafkaStream.getMetadata(), + null, + EMPTY_STRING + ); + streamService.delete(ns, kafkaStream); + }); + + return HttpResponse.noContent(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java index 71d8c649..71a78309 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/StreamControllerTest.java @@ -327,6 +327,7 @@ void shouldNotCreateStreamsWhenValidationErrors() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreams() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -359,6 +360,7 @@ void shouldDeleteStreams() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteStreamsInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -388,6 +390,7 @@ void shouldDeleteStreamsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotFound() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -411,6 +414,7 @@ void shouldNotDeleteStreamsWhenNotFound() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteStreamsWhenNotOwner() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -428,4 +432,143 @@ void shouldNotDeleteStreamsWhenNotOwner() { assertThrows(ResourceValidationException.class, () -> streamController.delete("test", "test_stream1", false)); verify(streamService, never()).delete(any(), any()); } + + @Test + void shouldDeleteMultipleStreams() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + doNothing().when(streamService).delete(ns, stream1); + doNothing().when(streamService).delete(ns, stream2); + var response = streamController.bulkDelete("test", "test_stream*", false); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldDeleteMultipleStreamsInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(true); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + var response = streamController.bulkDelete("test", "test_stream*", true); + verify(streamService, never()).delete(any(), any()); + assertEquals(HttpStatus.NO_CONTENT, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of()); + + var response = streamController.bulkDelete("test", "test_stream*", false); + verify(streamService, never()).delete(any(), any()); + + assertEquals(HttpStatus.NOT_FOUND, response.getStatus()); + } + + @Test + void shouldNotDeleteMultipleStreamsWhenNotOwner() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + KafkaStream stream1 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream1") + .build()) + .build(); + + KafkaStream stream2 = KafkaStream.builder() + .metadata(Metadata.builder() + .name("test_stream2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream1")) + .thenReturn(true); + + when(streamService.isNamespaceOwnerOfKafkaStream(ns, "test_stream2")) + .thenReturn(false); + + when(streamService.findByWildcardName(ns, "test_stream*")) + .thenReturn(List.of(stream1, stream2)); + + assertThrows(ResourceValidationException.class, () -> + streamController.bulkDelete("test", "test_stream*", false)); + verify(streamService, never()).delete(any(), any()); + } } \ No newline at end of file diff --git a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java index 3faf89d7..5d827537 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java @@ -16,6 +16,7 @@ import com.michelin.ns4kafka.model.Namespace.NamespaceSpec; import com.michelin.ns4kafka.service.executor.AccessControlEntryAsyncExecutor; import com.michelin.ns4kafka.validation.TopicValidator; +import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -163,5 +164,19 @@ void shouldVerifyCreationOfAcls() throws InterruptedException, ExecutionExceptio assertEquals(1, aclTransactionalId.size()); assertTrue(aclTransactionalId.stream().findFirst().isPresent()); assertEquals(AclOperation.WRITE, aclTransactionalId.stream().findFirst().get().entry().operation()); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/nskafkastream/streams?name=kstream*") + .bearerAuth(token)); + + HttpResponse> streams = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/nskafkastream/streams") + .bearerAuth(token), Argument.listOf(KafkaStream.class)); + + assertEquals(0, streams.getBody().get().size()); } } From a0c6b8afaceb480599d571678aa3b06ad549649e Mon Sep 17 00:00:00 2001 From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:48:37 +0200 Subject: [PATCH 3/5] Handle wildcard parameter in Quota deletion API (#465) * Handle wildcard parameter in Quota deletion API --- .../quota/ResourceQuotaController.java | 44 +++++++++++++- .../ResourceQuotaControllerTest.java | 58 +++++++++++++++++-- 2 files changed, 95 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java b/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java index 8f8957d8..25a3a707 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/quota/ResourceQuotaController.java @@ -42,6 +42,7 @@ public class ResourceQuotaController extends NamespacedResourceController { * List quotas by namespace. * * @param namespace The namespace + * @param name The name parameter * @return A list of quotas */ @Get @@ -59,7 +60,7 @@ public List list(String namespace, @QueryValue(defaultVal * @param namespace The name * @param quota The quota name * @return A quota - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{quota}") @Deprecated(since = "1.12.0") @@ -115,16 +116,55 @@ public HttpResponse apply(String namespace, @Body @Valid Resource return formatHttpResponse(resourceQuotaService.create(quota), status); } + /** + * Delete quotas. + * + * @param namespace The namespace + * @param name The name parameter + * @param dryrun Is dry run mode or not? + * @return An HTTP response + */ + @Delete + @Status(HttpStatus.NO_CONTENT) + public HttpResponse bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + + List resourceQuotas = resourceQuotaService.findByWildcardName(namespace, name); + + if (resourceQuotas.isEmpty()) { + return HttpResponse.notFound(); + } + + if (dryrun) { + return HttpResponse.noContent(); + } + + resourceQuotas.forEach(resourceQuota -> { + sendEventLog( + resourceQuota, + ApplyStatus.deleted, + resourceQuota.getSpec(), + null, + EMPTY_STRING + ); + resourceQuotaService.delete(resourceQuota); + }); + + return HttpResponse.noContent(); + } + /** * Delete a quota. * * @param namespace The namespace * @param name The resource quota - * @param dryrun Is dry run mode or not ? + * @param dryrun Is dry run mode or not? * @return An HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Delete("/{name}{?dryrun}") @Status(HttpStatus.NO_CONTENT) + @Deprecated(since = "1.13.0") public HttpResponse delete(String namespace, String name, @QueryValue(defaultValue = "false") boolean dryrun) { Optional resourceQuota = resourceQuotaService.findByName(namespace, name); diff --git a/src/test/java/com/michelin/ns4kafka/controller/ResourceQuotaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ResourceQuotaControllerTest.java index ed161303..ad426b39 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ResourceQuotaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ResourceQuotaControllerTest.java @@ -321,7 +321,8 @@ void shouldApplyUpdated() { } @Test - void shouldDeleteWhenNotFound() { + @SuppressWarnings("deprecation") + void shouldNotDeleteQuotaWhenNotFound() { when(resourceQuotaService.findByName("test", "quota")).thenReturn(Optional.empty()); HttpResponse actual = resourceQuotaController.delete("test", "quota", false); assertEquals(HttpStatus.NOT_FOUND, actual.getStatus()); @@ -329,11 +330,12 @@ void shouldDeleteWhenNotFound() { } @Test - void shouldDeleteWhenDryRun() { + @SuppressWarnings("deprecation") + void shouldNotDeleteQuotaWhenDryRun() { ResourceQuota resourceQuota = ResourceQuota.builder() .metadata(Metadata.builder() .cluster("local") - .name("created-quota") + .name("quota") .build()) .spec(Map.of("count/topics", "3")) .build(); @@ -345,11 +347,12 @@ void shouldDeleteWhenDryRun() { } @Test - void shouldDelete() { + @SuppressWarnings("deprecation") + void shouldDeleteQuota() { ResourceQuota resourceQuota = ResourceQuota.builder() .metadata(Metadata.builder() .cluster("local") - .name("created-quota") + .name("quota") .build()) .spec(Map.of("count/topics", "3")) .build(); @@ -364,4 +367,49 @@ void shouldDelete() { assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); verify(resourceQuotaService).delete(resourceQuota); } + + @Test + void shouldNotBulkDeleteQuotaWhenNotFound() { + when(resourceQuotaService.findByWildcardName("test", "quota*")).thenReturn(List.of()); + HttpResponse actual = resourceQuotaController.bulkDelete("test", "quota*", false); + assertEquals(HttpStatus.NOT_FOUND, actual.getStatus()); + verify(resourceQuotaService, never()).delete(ArgumentMatchers.any()); + } + + @Test + void shouldNotBulkDeleteQuotaWhenDryRun() { + ResourceQuota resourceQuota1 = ResourceQuota.builder() + .metadata(Metadata.builder() + .cluster("local") + .name("quota1") + .build()) + .spec(Map.of("count/topics", "3")) + .build(); + + when(resourceQuotaService.findByWildcardName("test", "quota*")).thenReturn(List.of(resourceQuota1)); + HttpResponse actual = resourceQuotaController.bulkDelete("test", "quota*", true); + assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); + verify(resourceQuotaService, never()).delete(ArgumentMatchers.any()); + } + + @Test + void shouldBulkDeleteQuota() { + ResourceQuota resourceQuota = ResourceQuota.builder() + .metadata(Metadata.builder() + .cluster("local") + .name("quota") + .build()) + .spec(Map.of("count/topics", "3")) + .build(); + + when(resourceQuotaService.findByWildcardName("test", "quota*")).thenReturn(List.of(resourceQuota)); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + doNothing().when(resourceQuotaService).delete(resourceQuota); + + HttpResponse actual = resourceQuotaController.bulkDelete("test", "quota*", false); + assertEquals(HttpStatus.NO_CONTENT, actual.getStatus()); + verify(resourceQuotaService).delete(resourceQuota); + } } From 18ff01ff124dc614cacc2958c6487114e067ac23 Mon Sep 17 00:00:00 2001 From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com> Date: Fri, 11 Oct 2024 19:48:21 +0200 Subject: [PATCH 4/5] Handle wildcard parameter in Schema deletion API (#446) * feature * fix return type & unit tests * add unit tests + remove unused import * Add integration test * Fix test + fix javadoc for depreciation * Fix test * fix javadoc --- .../ns4kafka/controller/SchemaController.java | 61 +++++- .../ns4kafka/service/SchemaService.java | 4 +- .../client/schema/SchemaRegistryClient.java | 2 +- .../controller/SchemaControllerTest.java | 181 ++++++++++++++++++ .../integration/SchemaIntegrationTest.java | 136 ++++++++++--- 5 files changed, 356 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 5538c264..5c3c895f 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -62,7 +62,7 @@ public Flux list(String namespace, @QueryValue(defaultValue = "*") S * @param namespace The namespace * @param subject The subject * @return A schema - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{subject}") @Deprecated(since = "1.12.0") @@ -146,16 +146,71 @@ public Mono> apply(String namespace, @Valid @Body Schema sc } /** - * Delete all schema versions under the given subject, or a specific version of the schema if specified. + * Delete all schema versions or a specific schema version if specified, under all given subjects. + * + * @param namespace The namespace + * @param name The subject name parameter + * @param versionOptional The version of the schemas to delete + * @param dryrun Run in dry mode or not? + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, + @QueryValue(defaultValue = "*") String name, + @QueryValue("version") Optional versionOptional, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + return schemaService.findByWildcardName(ns, name) + .flatMap(schema -> versionOptional + .map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version)) + .orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName())) + .map(Optional::of) + .defaultIfEmpty(Optional.empty())) + .collectList() + .flatMap(schemas -> { + if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) { + return Mono.just(HttpResponse.notFound()); + } + + if (dryrun) { + return Mono.just(HttpResponse.noContent()); + } + + return Flux.fromIterable(schemas) + .map(Optional::get) + .flatMap(schema -> (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : + schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) + .flatMap(deletedVersionIds -> { + sendEventLog( + schema, + ApplyStatus.deleted, + schema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)) + .orElse(EMPTY_STRING) + ); + return Mono.just(HttpResponse.noContent()); + })) + .then(Mono.just(HttpResponse.noContent())); + }); + } + + /** + * Delete all schema versions or a specific schema version if specified, under the given subject. * * @param namespace The namespace * @param subject The subject * @param versionOptional The version of the schema to delete - * @param dryrun Run in dry mode or not + * @param dryrun Run in dry mode or not? * @return A HTTP response + * @deprecated use {@link #bulkDelete(String, String, Optional, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, @PathVariable String subject, @QueryValue("version") Optional versionOptional, diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index 597abc25..f659c5cd 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -230,7 +230,7 @@ public Mono register(Namespace namespace, Schema schema) { * * @param namespace The namespace * @param subject The subject to delete - * @return The list of deleted versions + * @return The list of deleted schema versions */ public Mono deleteAllVersions(Namespace namespace, String subject) { return schemaRegistryClient @@ -246,7 +246,7 @@ public Mono deleteAllVersions(Namespace namespace, String subject) { * @param namespace The namespace * @param subject The subject * @param version The version of the schema to delete - * @return The latest subject after deletion + * @return The deleted schema version */ public Mono deleteVersion(Namespace namespace, String subject, String version) { return schemaRegistryClient diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 0060a20b..288d5ab9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -139,7 +139,7 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea } /** - * Delete a subject. + * Delete schema version under a subject. * * @param kafkaCluster The Kafka cluster * @param subject The subject diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index 7c0097e6..6801019a 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -442,6 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -465,6 +466,7 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteOneSchemaVersionWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -488,6 +490,7 @@ void shouldNotDeleteOneSchemaVersionWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteAllSchemaVersions() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -514,6 +517,7 @@ void shouldDeleteAllSchemaVersions() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteSchemaVersion() { Namespace namespace = buildNamespace(); Schema schema1 = buildSchema(); @@ -535,6 +539,7 @@ void shouldDeleteSchemaVersion() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenEmpty() { Namespace namespace = buildNamespace(); @@ -553,6 +558,7 @@ void shouldNotDeleteAllSchemaVersionsWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionWhenEmpty() { Namespace namespace = buildNamespace(); @@ -571,6 +577,7 @@ void shouldNotDeleteSchemaVersionWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -590,6 +597,7 @@ void shouldNotDeleteAllSchemaVersionsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -608,6 +616,171 @@ void shouldNotDeleteSchemaVersionInDryRunMode() { verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); } + @Test + void shouldBulkDeleteAllSchemaVersions() { + Namespace namespace = buildNamespace(); + Schema schema1 = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema1)); + when(schemaService.deleteAllVersions(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(new Integer[1])); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + @Test + void shouldBulkDeleteSchemaVersion() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(1)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject2-value")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject2-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject2-value", "1")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject2-value", "1"); + } + + @Test + void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + private Namespace buildNamespace() { return Namespace.builder() .metadata(Metadata.builder() @@ -669,4 +842,12 @@ private SchemaList buildSchemaList() { .build()) .build(); } + + private SchemaList buildSchemaList2() { + return SchemaList.builder() + .metadata(Metadata.builder() + .name("prefix.subject2-value") + .build()) + .build(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index f829f208..56506c01 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.integration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -768,7 +769,7 @@ void shouldRegisterSameSchemaTwice() { } @Test - void shouldDeleteSchema() { + void shouldDeleteSchemaVersion() { Schema schemaV1 = Schema.builder() .metadata(Metadata.builder() .name("ns1-subject4-value") @@ -861,61 +862,152 @@ void shouldDeleteSchema() { var deleteLatestVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=latest") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=latest") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteLatestVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterLatestVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterLatestVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 is returned by ns4kafka assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().get().containsAll(List.of("1", "2", "3"))); // Delete old schema version var deleteOldVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=1") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=1") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteOldVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterOldVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterOldVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 as returned schema assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().get().containsAll(List.of("2", "3"))); // Delete all remaining schema versions var deleteAllVersionsResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteAllVersionsResponse.getStatus()); // Get all schemas - var getSchemaAfterAllVersionsDeletionResponse = ns4KafkaClient + var getSchemaAfterAllVersionsDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas") - .bearerAuth(token), Argument.listOf(SchemaList.class)); + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); + assertFalse(getSchemaAfterAllVersionsDeletionResponse.getBody().get().contains("ns1-subject4-value")); + } + + @Test + void shouldBulkDeleteSchemas() { + Schema schema1 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + Schema schema2 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-key") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + Schema schema3 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject6-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + // Register all schemas + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema1), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema2), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema3), Schema.class); + + var getSchemasBeforeDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemasBeforeDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasBeforeDeletionResponse.getBody().get() + .containsAll(List.of("ns1-subject5-value", "ns1-subject5-key", "ns1-subject6-value"))); + + // Delete schema with wildcard + var deleteResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject5-*") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteResponse.getStatus()); + + var getSchemasAfterDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isPresent()); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get() + assertTrue(getSchemasAfterDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasAfterDeletionResponse.getBody().get() .stream() - .noneMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject4-value"))); + .noneMatch(subject -> List.of("ns1-subject5-key", "ns1-subject5-value").contains(subject))); + assertTrue(getSchemasAfterDeletionResponse.getBody().get().contains("ns1-subject6-value")); } } From be5f1be9e46bfeca2487e86c5d1cba2e55ae7cfe Mon Sep 17 00:00:00 2001 From: adriencalime <110117127+adriencalime@users.noreply.github.com> Date: Fri, 11 Oct 2024 20:21:18 +0200 Subject: [PATCH 5/5] Handle wildcard parameter in Connector deletion API (#451) * delete connectors api with wildcard * add delete integration test for connectors * Improve javadoc & remove extra indents * Increase sleep time --------- Co-authored-by: thcai --- .../controller/ConnectorController.java | 47 +++++- .../controller/ConnectorControllerTest.java | 136 ++++++++++++++++++ .../integration/ConnectorIntegrationTest.java | 17 ++- 3 files changed, 197 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java index 787bc3cb..fd41b93f 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java @@ -12,7 +12,6 @@ import com.michelin.ns4kafka.service.ResourceQuotaService; import com.michelin.ns4kafka.util.enumation.ApplyStatus; import com.michelin.ns4kafka.util.exception.ResourceValidationException; -import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MutableHttpResponse; @@ -66,7 +65,7 @@ public List list(String namespace, @QueryValue(defaultValue = "*") St * @param namespace The namespace * @param connector The name * @return A connector - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{connector}") @Deprecated(since = "1.12.0") @@ -165,9 +164,11 @@ public Mono> apply(String namespace, @Valid @Body Connec * @param connector The current connector name to delete * @param dryrun Run in dry mode or not * @return A HTTP response + * @deprecated use {@link #bulkDelete(String, String, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{connector}{?dryrun}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, String connector, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); @@ -201,6 +202,48 @@ public Mono> delete(String namespace, String connector, .map(httpResponse -> HttpResponse.noContent()); } + /** + * Delete connectors. + * + * @param namespace The current namespace + * @param name The name parameter + * @param dryrun Run in dry mode or not + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + List connectors = connectorService.findByWildcardName(ns, name); + + // Validate ownership + List validationErrors = connectors.stream() + .filter(connector -> !connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) + .map(connector -> invalidOwner(connector.getMetadata().getName())) + .toList(); + + if (!validationErrors.isEmpty()) { + return Mono.error(new ResourceValidationException(CONNECTOR, name, validationErrors)); + } + + if (connectors.isEmpty()) { + return Mono.just(HttpResponse.notFound()); + } + + if (dryrun) { + return Mono.just(HttpResponse.noContent()); + } + + return Flux.fromIterable(connectors) + .flatMap(connector -> { + sendEventLog(connector, ApplyStatus.deleted, connector.getSpec(), null, EMPTY_STRING); + return connectorService.delete(ns, connector); + }) + .then(Mono.just(HttpResponse.noContent())); + } + /** * Change the state of a connector. * diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java index 3e0a4cef..6194e702 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java @@ -156,6 +156,7 @@ void shouldGetConnector() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectorWhenNotOwned() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -180,6 +181,7 @@ void shouldNotDeleteConnectorWhenNotOwned() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnector() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -207,6 +209,7 @@ void shouldDeleteConnector() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteConnectorInDryRunMode() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -236,6 +239,7 @@ void shouldDeleteConnectorInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteConnectorWhenNotFound() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() @@ -258,6 +262,138 @@ void shouldNotDeleteConnectorWhenNotFound() { verify(connectorService, never()).delete(any(), any()); } + @Test + void shouldDeleteConnectors() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); + Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build(); + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(true); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + when(connectorService.delete(ns, connector1)) + .thenReturn(Mono.just(HttpResponse.noContent())); + when(connectorService.delete(ns, connector2)) + .thenReturn(Mono.just(HttpResponse.noContent())); + when(securityService.username()).thenReturn(Optional.of("test-user")); + when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); + doNothing().when(applicationEventPublisher).publishEvent(any()); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + } + + @Test + void shouldNotDeleteConnectorsWhenNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of()); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).delete(any(), any()); + } + + @Test + void shouldDeleteConnectorsInDryRunMode() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); + + Connector connector2 = Connector.builder() + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(true); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(connectorService, never()).delete(any(), any()); + } + + @Test + void shouldNotDeleteConnectorsWhenNotOwned() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector1 = Connector.builder() + .metadata(Metadata.builder() + .name("connect1") + .build()) + .build(); + + Connector connector2 = Connector.builder() + .metadata(Metadata.builder() + .name("connect2") + .build()) + .build(); + + when(connectorService.findByWildcardName(ns, "connect*")) + .thenReturn(List.of(connector1, connector2)); + + when(namespaceService.findByName("test")) + .thenReturn(Optional.of(ns)); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect1")) + .thenReturn(false); + when(connectorService.isNamespaceOwnerOfConnect(ns, "connect2")) + .thenReturn(true); + + StepVerifier.create(connectorController.bulkDelete("test", "connect*", false)) + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals( + "Invalid value \"connect1\" for field \"name\": namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); + } + @Test void shouldNotCreateConnectorWhenNotOwner() { Connector connector = Connector.builder() diff --git a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java index 55c456c8..1400570e 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/ConnectorIntegrationTest.java @@ -33,6 +33,7 @@ import com.michelin.ns4kafka.validation.ConnectValidator; import com.michelin.ns4kafka.validation.TopicValidator; import io.micronaut.context.ApplicationContext; +import io.micronaut.core.type.Argument; import io.micronaut.http.HttpMethod; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; @@ -323,6 +324,20 @@ void shouldDeployConnectors() throws InterruptedException { assertTrue(actualConnectorWithFillParameter.config().containsKey("file")); assertEquals("test", actualConnectorWithFillParameter.config().get("file")); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/connectors?name=ns1*") + .bearerAuth(token)); + + HttpResponse> connectors = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/connectors") + .bearerAuth(token), Argument.listOf(Connector.class)); + + assertEquals(0, connectors.getBody().get().size()); } @Test @@ -610,7 +625,7 @@ private void forceConnectorSynchronization() throws InterruptedException { .blockLast(); // Wait for Kafka Connect to deploy and update connectors - Thread.sleep(3000); + Thread.sleep(4000); } private void waitForConnectorAndTasksToBeInState(String connector, Connector.TaskState state)