From c21c8ce24e9ec2d27d98c93915ec9f259c2fb796 Mon Sep 17 00:00:00 2001 From: thcai Date: Tue, 30 Jul 2024 10:08:31 +0200 Subject: [PATCH] connect cluster filter --- .../connect/ConnectClusterController.java | 7 +- .../michelin/ns4kafka/service/AclService.java | 32 +++ .../service/ConnectClusterService.java | 54 ++-- .../ConnectClusterControllerTest.java | 68 +++-- .../ns4kafka/service/AclServiceTest.java | 111 ++++++++ .../service/ConnectClusterServiceTest.java | 268 ++++++++++++++---- 6 files changed, 427 insertions(+), 113 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java index 84590d1f..9f66ec7e 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/connect/ConnectClusterController.java @@ -50,14 +50,15 @@ public class ConnectClusterController extends NamespacedResourceController { ConnectorService connectorService; /** - * List Kafka Connect clusters by namespace. + * List Kafka Connect clusters by namespace, filtered by name parameter. * * @param namespace The namespace + * @param name The name parameter * @return A list of Kafka Connect clusters */ @Get - public List list(String namespace) { - return connectClusterService.findAllByNamespaceOwner(getNamespace(namespace)); + public List list(String namespace, @QueryValue(defaultValue = "*") String name) { + return connectClusterService.findAllByNamespaceOwner(getNamespace(namespace), name); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 37abd2e4..f50b304c 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -262,6 +262,24 @@ public List findAllGrantedToNamespace(Namespace namespace) { .toList(); } + /** + * Find all owner-ACLs on a resource for a given namespace. + * + * @param namespace The namespace + * @param resourceType The resource + * @return A list of ACLs + */ + public List findResourceOwnerGrantedToNamespace(Namespace namespace, + AccessControlEntry.ResourceType resourceType) { + return accessControlEntryRepository.findAll() + .stream() + .filter(accessControlEntry -> + accessControlEntry.getSpec().getGrantedTo().equals(namespace.getMetadata().getName()) + && accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER + && accessControlEntry.getSpec().getResourceType() == resourceType) + .toList(); + } + /** * Find all public granted ACLs. * @@ -340,4 +358,18 @@ public boolean isNamespaceOwnerOfResource(String namespace, AccessControlEntry.R public Optional findByName(String namespace, String name) { return accessControlEntryRepository.findByName(namespace, name); } + + /** + * Check if there is any ACL concerning the given resource. + * + * @param acls The OWNER ACL list on resource + * @param resourceName The resource name to check ACL against + * @return true if there is any OWNER ACL concerning the given resource, false otherwise + */ + public boolean isAnyAclOfResource(List acls, String resourceName) { + return acls.stream().anyMatch(acl -> switch (acl.getSpec().getResourcePatternType()) { + case PREFIXED -> resourceName.startsWith(acl.getSpec().getResource()); + case LITERAL -> resourceName.equals(acl.getSpec().getResource()); + }); + } } diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java index 51184020..834e5786 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectClusterService.java @@ -18,6 +18,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo; import com.michelin.ns4kafka.util.EncryptionUtils; import com.michelin.ns4kafka.util.FormatErrorUtils; +import com.michelin.ns4kafka.util.RegexUtils; import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpRequest; import io.micronaut.http.MutableHttpRequest; @@ -73,7 +74,7 @@ public class ConnectClusterService { * Find all self deployed Connect clusters. * * @param all Include hard-declared Connect clusters - * @return A list of Connect clusters + * @return A list of Connect clusters */ public Flux findAll(boolean all) { List results = connectClusterRepository.findAll(); @@ -116,26 +117,27 @@ public Flux findAll(boolean all) { } /** - * Find all self deployed Connect clusters for a given namespace with a given list of permissions. + * Find all self deployed Connect clusters of a given namespace, with a given list of permissions, + * filtered by name parameter. * * @param namespace The namespace + * @param name The name filter * @param permissions The list of permission to filter on * @return A list of Connect clusters */ - public List findAllByNamespace(Namespace namespace, + public List findAllByNamespace(Namespace namespace, String name, List permissions) { - List acls = aclService.findAllGrantedToNamespace(namespace).stream() - .filter(acl -> permissions.contains(acl.getSpec().getPermission())) - .filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT_CLUSTER).toList(); + List acls = aclService.findAllGrantedToNamespace(namespace) + .stream() + .filter(acl -> permissions.contains(acl.getSpec().getPermission()) + && acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .toList(); + List nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name)); return connectClusterRepository.findAllForCluster(namespace.getMetadata().getCluster()) .stream() - .filter(connector -> acls.stream().anyMatch(accessControlEntry -> switch (accessControlEntry.getSpec() - .getResourcePatternType()) { - case PREFIXED -> connector.getMetadata().getName() - .startsWith(accessControlEntry.getSpec().getResource()); - case LITERAL -> connector.getMetadata().getName().equals(accessControlEntry.getSpec().getResource()); - })) + .filter(cc -> aclService.isAnyAclOfResource(acls, cc.getMetadata().getName()) + && RegexUtils.filterByPattern(cc.getMetadata().getName(), nameFilterPatterns)) .toList(); } @@ -146,7 +148,18 @@ public List findAllByNamespace(Namespace namespace, * @return The list of owned Connect cluster */ public List findAllByNamespaceOwner(Namespace namespace) { - return findAllByNamespace(namespace, List.of(AccessControlEntry.Permission.OWNER)) + return findAllByNamespaceOwner(namespace, "*"); + } + + /** + * Find all self deployed Connect clusters whose namespace is owner, filtered by name parameter. + * + * @param namespace The namespace + * @param name The name filter + * @return The list of owned Connect cluster + */ + public List findAllByNamespaceOwner(Namespace namespace, String name) { + return findAllByNamespace(namespace, name, List.of(AccessControlEntry.Permission.OWNER)) .stream() .map(connectCluster -> { var builder = ConnectCluster.ConnectClusterSpec.builder() @@ -185,8 +198,8 @@ public List findAllByNamespaceOwner(Namespace namespace) { */ public List findAllByNamespaceWrite(Namespace namespace) { return Stream.concat( - this.findAllByNamespaceOwner(namespace).stream(), - this.findAllByNamespace(namespace, List.of(AccessControlEntry.Permission.WRITE)).stream() + this.findAllByNamespaceOwner(namespace, "*").stream(), + this.findAllByNamespace(namespace, "*", List.of(AccessControlEntry.Permission.WRITE)).stream() .map(connectCluster -> ConnectCluster.builder() .metadata(connectCluster.getMetadata()) .spec(ConnectCluster.ConnectClusterSpec.builder() @@ -209,10 +222,7 @@ public List findAllByNamespaceWrite(Namespace namespace) { * @return An optional connect worker */ public Optional findByNamespaceAndNameOwner(Namespace namespace, String connectClusterName) { - return findAllByNamespaceOwner(namespace) - .stream() - .filter(connectCluster -> connectCluster.getMetadata().getName().equals(connectClusterName)) - .findFirst(); + return findAllByNamespaceOwner(namespace, connectClusterName).stream().findFirst(); } /** @@ -296,7 +306,7 @@ public Mono> validateConnectClusterCreation(ConnectCluster connectC public List validateConnectClusterVault(final Namespace namespace, final String connectCluster) { final var errors = new ArrayList(); - final List kafkaConnects = findAllByNamespace(namespace, + final List kafkaConnects = findAllByNamespace(namespace, "*", List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE)); if (kafkaConnects.isEmpty()) { @@ -373,7 +383,7 @@ public boolean isNamespaceAllowedForConnectCluster(Namespace namespace, String c */ public List vaultPassword(final Namespace namespace, final String connectCluster, final List passwords) { - final Optional kafkaConnect = findAllByNamespace(namespace, + final Optional kafkaConnect = findAllByNamespace(namespace, "*", List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE)) .stream() .filter(cc -> @@ -410,6 +420,4 @@ public List vaultPassword(final Namespace namespace, final String .build()) .toList(); } - - } diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java index 8979d994..1549ac99 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectClusterControllerTest.java @@ -62,7 +62,7 @@ class ConnectClusterControllerTest { ApplicationEventPublisher applicationEventPublisher; @Test - void listEmptyConnectClusters() { + void listEmptyConnectCluster() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("test") @@ -70,17 +70,14 @@ void listEmptyConnectClusters() { .build()) .build(); - when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); - when(connectClusterService.findAllByNamespaceOwner(ns)) - .thenReturn(List.of()); + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectClusterService.findAllByNamespaceOwner(ns, "*")).thenReturn(List.of()); - List actual = connectClusterController.list("test"); - assertTrue(actual.isEmpty()); + assertTrue(connectClusterController.list("test", "*").isEmpty()); } @Test - void listMultipleConnectClusters() { + void listConnectClusterWithoutNameParameter() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("test") @@ -88,21 +85,34 @@ void listMultipleConnectClusters() { .build()) .build(); - when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); - when(connectClusterService.findAllByNamespaceOwner(ns)) - .thenReturn(List.of( - ConnectCluster.builder() - .metadata(Metadata.builder().name("connect-cluster") - .build()) - .build(), - ConnectCluster.builder() - .metadata(Metadata.builder().name("connect-cluster2") - .build()) - .build())); + List ccs = List.of( + ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster").build()).build(), + ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster2").build()).build() + ); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectClusterService.findAllByNamespaceOwner(ns, "*")).thenReturn(ccs); + + assertEquals(ccs, connectClusterController.list("test", "*")); + } + + @Test + void listConnectClusterWithNameParameter() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + List ccs = List.of( + ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster").build()).build() + ); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectClusterService.findAllByNamespaceOwner(ns, "connect-cluster")).thenReturn(ccs); - List actual = connectClusterController.list("test"); - assertEquals(2, actual.size()); + assertEquals(ccs, connectClusterController.list("test", "connect-cluster")); } @Test @@ -269,7 +279,7 @@ void deleteConnectClusterWithConnectors() { assertEquals( "Invalid \"delete\" operation: The Kafka Connect \"connect-cluster\" has 1 deployed connector(s): " + "connect1. Please remove the associated connector(s) before deleting it.", - result.getValidationErrors().get(0)); + result.getValidationErrors().getFirst()); } @Test @@ -335,7 +345,7 @@ void createNewConnectClusterNotOwner() { assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); assertEquals( "Invalid value \"connect-cluster\" for field \"name\": namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().get(0)); + ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); } @@ -366,7 +376,7 @@ void createNewConnectClusterValidationError() { .consumeErrorWith(error -> { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Error occurred", ((ResourceValidationException) error).getValidationErrors().get(0)); + assertEquals("Error occurred", ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); } @@ -559,7 +569,7 @@ void vaultOnNonAllowedConnectCluster() { assertEquals(1, result.getValidationErrors().size()); assertEquals("Invalid value \"connect-cluster-na\" for field \"connect-cluster\": " + "namespace is not allowed to use this Kafka Connect.", - result.getValidationErrors().get(0)); + result.getValidationErrors().getFirst()); } @Test @@ -584,7 +594,7 @@ void vaultOnNotValidAes256ConnectCluster() { ResourceValidationException result = assertThrows(ResourceValidationException.class, () -> connectClusterController.vaultPassword("test", connectClusterName, secrets)); assertEquals(1, result.getValidationErrors().size()); - assertEquals("Error config.", result.getValidationErrors().get(0)); + assertEquals("Error config.", result.getValidationErrors().getFirst()); } @Test @@ -614,7 +624,7 @@ void vaultOnValidAes256ConnectCluster() { final List actual = connectClusterController.vaultPassword("test", connectClusterName, List.of("secret")); - assertEquals("secret", actual.get(0).getSpec().getClearText()); - assertEquals("encryptedSecret", actual.get(0).getSpec().getEncrypted()); + assertEquals("secret", actual.getFirst().getSpec().getClearText()); + assertEquals("encryptedSecret", actual.getFirst().getSpec().getEncrypted()); } } diff --git a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java index 0bc76966..e926534e 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java @@ -932,4 +932,115 @@ void shouldNotCollideIfDifferentResource() { assertFalse(aclService.topicAclsCollide(aceTopicPrefixedOwner, aceConnectLiteralOwner)); assertFalse(aclService.topicAclsCollide(aceConnectLiteralOwner, aceTopicPrefixedOwner)); } + + @Test + void findResourceOwnerAclGrantedToNamespace() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder().name("namespace1").build()).build(); + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl3 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl4 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace2").build()) + .build(); + AccessControlEntry acl5 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("*").build()) + .build(); + AccessControlEntry acl6 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.GROUP) + .permission(AccessControlEntry.Permission.WRITE) + .grantedTo("namespace1").build()) + .build(); + + when(accessControlEntryRepository.findAll()).thenReturn(List.of(acl1, acl2, acl3, acl4, acl5, acl6)); + + assertEquals(List.of(acl1), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.TOPIC)); + assertEquals(List.of(acl3), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)); + assertEquals(List.of(), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.GROUP)); + } + + @Test + void isPrefixedAclOfResource() { + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc.") + .grantedTo("namespace") + .build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc_") + .grantedTo("namespace") + .build()) + .build(); + List acls = List.of(acl1, acl2); + + assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1")); + assertFalse(aclService.isAnyAclOfResource(acls, "topic1-abc")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc-topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc_topic1")); + } + + @Test + void isLiteralAclOfResource() { + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc.topic1") + .grantedTo("namespace") + .build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc-topic1") + .grantedTo("namespace") + .build()) + .build(); + List acls = List.of(acl1, acl2); + + assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc.topic12")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc_topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc-topic1")); + } } diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java index 1b2f6195..e21c0180 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectClusterServiceTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.service; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -29,7 +30,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Stream; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -208,21 +208,154 @@ void findAllForNamespace() { .build()) .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix2.connect-two"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix3.connect-cluster"))).thenReturn(false); + when(aclService.isAnyAclOfResource(any(), eq("not-owner"))).thenReturn(false); - List actual = - connectClusterService.findAllByNamespace(namespace, List.of(AccessControlEntry.Permission.OWNER)); + assertEquals(List.of(connectCluster, connectClusterTwo), + connectClusterService.findAllByNamespace(namespace, "*", List.of(AccessControlEntry.Permission.OWNER))); + } - assertEquals(2, actual.size()); + @Test + void listConnectClusterWithNameParameter() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("myNamespace") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder().build()) + .build(); + + ConnectCluster cc1 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix.connect").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); + + ConnectCluster cc2 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix2.connect-two").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix2.connect-two") + .build()) + .build() + ); + + when(connectClusterRepository.findAllForCluster("local")).thenReturn(List.of(cc1, cc2)); + when(aclService.findAllGrantedToNamespace(namespace)).thenReturn(acls); + when(aclService.isAnyAclOfResource(acls, "prefix.connect")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "prefix2.connect-two")).thenReturn(true); + + assertEquals(List.of(cc1), connectClusterService.findAllByNamespace(namespace, "prefix.connect", + List.of(AccessControlEntry.Permission.OWNER))); + assertTrue(connectClusterService.findAllByNamespace(namespace, "not-owner", + List.of(AccessControlEntry.Permission.OWNER)).isEmpty()); + } + + @Test + void listConnectClusterWithWildcardNameParameter() { + Namespace namespace = Namespace.builder() + .metadata(Metadata.builder() + .name("myNamespace") + .cluster("local") + .build()) + .spec(Namespace.NamespaceSpec.builder().build()) + .build(); + + ConnectCluster cc1 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix.cc1").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); - assertTrue( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("prefix.connect-cluster"))); - assertTrue( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("prefix2.connect-two"))); + ConnectCluster cc2 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix.cc2").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); - Assertions.assertFalse( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("not-owner"))); - Assertions.assertFalse( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("prefix3.connect-cluster"))); + ConnectCluster cc3 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix2.connect-two").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); + + ConnectCluster cc4 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix3.connect1").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); + + ConnectCluster cc5 = ConnectCluster.builder() + .metadata(Metadata.builder().name("prefix3.connect2").build()) + .spec(ConnectCluster.ConnectClusterSpec.builder().url("https://after").build()) + .build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix3.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix2.connect-two") + .build()) + .build() + ); + + when(connectClusterRepository.findAllForCluster("local")).thenReturn(List.of(cc1, cc2, cc3, cc4, cc5)); + when(aclService.findAllGrantedToNamespace(namespace)).thenReturn(acls); + when(aclService.isAnyAclOfResource(any(), eq("prefix.cc1"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix.cc2"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix2.connect-two"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix3.connect1"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix3.connect2"))).thenReturn(true); + + assertEquals(List.of(cc1, cc2, cc3, cc4, cc5), connectClusterService.findAllByNamespace(namespace, "*", + List.of(AccessControlEntry.Permission.OWNER))); + assertEquals(List.of(cc1, cc2), connectClusterService.findAllByNamespace(namespace, "prefix.*", + List.of(AccessControlEntry.Permission.OWNER))); + assertEquals(List.of(cc4, cc5), connectClusterService.findAllByNamespace(namespace, "prefix?.connect?", + List.of(AccessControlEntry.Permission.OWNER))); + assertEquals(List.of(cc2, cc5), connectClusterService.findAllByNamespace(namespace, "*2", + List.of(AccessControlEntry.Permission.OWNER))); + assertEquals(List.of(cc3), connectClusterService.findAllByNamespace(namespace, "prefix*.*-two", + List.of(AccessControlEntry.Permission.OWNER))); + assertTrue(connectClusterService.findAllByNamespace(namespace, "*-three", + List.of(AccessControlEntry.Permission.OWNER)).isEmpty()); + assertTrue(connectClusterService.findAllByNamespace(namespace, "prefix?.cc?", + List.of(AccessControlEntry.Permission.OWNER)).isEmpty()); } @Test @@ -265,6 +398,8 @@ void findByNamespaceAndName() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + Optional actual = connectClusterService.findByNamespaceAndNameOwner(namespace, "prefix.connect-cluster"); @@ -313,6 +448,8 @@ void findByNamespaceAndNameUnhealthy() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + Optional actual = connectClusterService.findByNamespaceAndNameOwner(namespace, "prefix.connect-cluster"); @@ -343,9 +480,6 @@ void findByNamespaceAndNameEmpty() { .build()) .build(); - when(kafkaConnectClient.version("local", "prefix.connect-cluster")) - .thenReturn(Mono.just(HttpResponse.ok())); - when(connectClusterRepository.findAllForCluster("local")) .thenReturn(List.of(connectCluster)); @@ -362,10 +496,9 @@ void findByNamespaceAndNameEmpty() { .build() )); - Optional actual = - connectClusterService.findByNamespaceAndNameOwner(namespace, "does-not-exist"); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); - assertTrue(actual.isEmpty()); + assertTrue(connectClusterService.findByNamespaceAndNameOwner(namespace, "does-not-exist").isEmpty()); } @Test @@ -430,7 +563,7 @@ void validateConnectClusterCreationAlreadyDefined() { assertEquals( "Invalid value \"test-connect\" for field \"name\": a Kafka Connect is already defined " + "globally with this name. Please provide a different name.", - errors.get(0)); + errors.getFirst()); }) .verifyComplete(); } @@ -454,7 +587,7 @@ void validateConnectClusterCreationDown() { StepVerifier.create(connectClusterService.validateConnectClusterCreation(connectCluster)) .consumeNextWith(errors -> { assertEquals(1L, errors.size()); - assertEquals("Invalid \"test-connect\": the Kafka Connect is not healthy (error).", errors.get(0)); + assertEquals("Invalid \"test-connect\": the Kafka Connect is not healthy (error).", errors.getFirst()); }) .verifyComplete(); } @@ -475,7 +608,7 @@ void validateConnectClusterCreationMalformedUrl() { .consumeNextWith(errors -> { assertEquals(1L, errors.size()); assertEquals("Invalid value \"malformed-url\" for field \"url\": malformed URL.", - errors.get(0)); + errors.getFirst()); }) .verifyComplete(); } @@ -505,7 +638,7 @@ void validateConnectClusterCreationBadAes256MissingSalt() { assertEquals(1L, errors.size()); assertEquals("Invalid empty value for fields \"aes256Key, aes256Salt\": " + "AES key and salt are required to activate encryption.", - errors.get(0)); + errors.getFirst()); }) .verifyComplete(); } @@ -535,7 +668,7 @@ void validateConnectClusterCreationBadAes256MissingKey() { assertEquals(1L, errors.size()); assertEquals("Invalid empty value for fields \"aes256Key, aes256Salt\": " + "AES key and salt are required to activate encryption.", - errors.get(0)); + errors.getFirst()); }) .verifyComplete(); } @@ -607,7 +740,7 @@ void validateConnectClusterVaultNoClusterAvailable() { assertEquals(1L, errors.size()); assertEquals("Invalid value \"prefix.fake-connect-cluster\" for field \"name\": resource not found.", - errors.get(0)); + errors.getFirst()); } @Test @@ -676,12 +809,16 @@ void validateConnectClusterVaultNoClusterAvailableWithAes256() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix1.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix2.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix3.connect-cluster"))).thenReturn(true); + List errors = connectClusterService.validateConnectClusterVault(namespace, "prefix1.fake-connect-cluster"); assertEquals(1L, errors.size()); assertEquals("Invalid empty value for fields \"aes256Key, aes256Salt\": " - + "AES key and salt are required to activate encryption.", errors.get(0)); + + "AES key and salt are required to activate encryption.", errors.getFirst()); } /** @@ -756,12 +893,16 @@ void validateConnectClusterVaultClusterNotAvailable() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix1.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix2.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("prefix3.connect-cluster"))).thenReturn(true); + List errors = connectClusterService.validateConnectClusterVault(namespace, "prefix1.fake-connect-cluster"); assertEquals(1L, errors.size()); assertEquals("Invalid value \"prefix1.fake-connect-cluster\" for field \"name\": " - + "value must be one of \"prefix2.connect-cluster, prefix3.connect-cluster\".", errors.get(0)); + + "value must be one of \"prefix2.connect-cluster, prefix3.connect-cluster\".", errors.getFirst()); } @Test @@ -800,6 +941,8 @@ void validateConnectClusterVault() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + List errors = connectClusterService.validateConnectClusterVault(namespace, "prefix.connect-cluster"); assertEquals(0L, errors.size()); @@ -843,7 +986,7 @@ void vaultPasswordNoConnectClusterWithAes256Config() { List actual = connectClusterService.vaultPassword(namespace, "prefix.connect-cluster", List.of("secret")); - assertEquals("secret", actual.get(0).getSpec().getEncrypted()); + assertEquals("secret", actual.getFirst().getSpec().getEncrypted()); } /** @@ -885,42 +1028,44 @@ void findAllByNamespaceWriteAsOwner() { .build()) .build(); - when(kafkaConnectClient.version(any(), any())) - .thenReturn(Mono.just(HttpResponse.ok())); + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.WRITE) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("prefix.") + .build()) + .build(); - when(connectClusterRepository.findAllForCluster("local")) - .thenReturn(List.of(connectCluster, connectClusterOwner)); + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) + .resource("owner.") + .build()) + .build(); - when(aclService.findAllGrantedToNamespace(namespace)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("prefix.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT_CLUSTER) - .resource("owner.") - .build()) - .build() - )); + when(aclService.findAllGrantedToNamespace(namespace)).thenReturn(List.of(acl1, acl2)); + when(connectClusterRepository.findAllForCluster("local")) + .thenReturn(List.of(connectCluster, connectClusterOwner)); + when(aclService.isAnyAclOfResource(List.of(acl2), "owner.connect-cluster")).thenReturn(true); + when(aclService.isAnyAclOfResource(List.of(acl2), "prefix.connect-cluster")).thenReturn(false); when(securityProperties.getAes256EncryptionKey()).thenReturn(encryptKey); + when(kafkaConnectClient.version(any(), any())).thenReturn(Mono.just(HttpResponse.ok())); + when(aclService.isAnyAclOfResource(List.of(acl1), "prefix.connect-cluster")).thenReturn(true); + when(aclService.isAnyAclOfResource(List.of(acl1), "owner.connect-cluster")).thenReturn(false); + List actual = connectClusterService.findAllByNamespaceWrite(namespace); assertEquals(2, actual.size()); // 1rts is for owner with decrypted values - assertEquals("password", actual.get(0).getSpec().getPassword()); - assertEquals("aes256Key", actual.get(0).getSpec().getAes256Key()); - assertEquals("aes256Salt", actual.get(0).getSpec().getAes256Salt()); + assertEquals("password", actual.getFirst().getSpec().getPassword()); + assertEquals("aes256Key", actual.getFirst().getSpec().getAes256Key()); + assertEquals("aes256Salt", actual.getFirst().getSpec().getAes256Salt()); // second is only for write with wildcards assertEquals("*****", actual.get(1).getSpec().getPassword()); @@ -1020,6 +1165,8 @@ void shouldNamespaceAllowedForConnectCluster() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + boolean actual = connectClusterService.isNamespaceAllowedForConnectCluster(namespace, "prefix.connect-cluster"); assertTrue(actual); @@ -1082,10 +1229,13 @@ void shouldNamespaceNotAllowedForConnectCluster() { .build() )); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); + when(aclService.isAnyAclOfResource(any(), eq("owner.connect-cluster"))).thenReturn(true); + boolean actual = connectClusterService.isNamespaceAllowedForConnectCluster(namespace, "not-allowed-prefix.connect-cluster"); - Assertions.assertFalse(actual); + assertFalse(actual); } /** @@ -1130,11 +1280,12 @@ void vaultPasswordWithoutFormat() { )); when(securityProperties.getAes256EncryptionKey()).thenReturn("changeitchangeitchangeitchangeit"); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); List actual = connectClusterService.vaultPassword(namespace, "prefix.connect-cluster", List.of("secret")); - assertTrue(actual.get(0).getSpec().getEncrypted().matches("^\\$\\{aes256\\:.*\\}")); + assertTrue(actual.getFirst().getSpec().getEncrypted().matches("^\\$\\{aes256\\:.*\\}")); } /** @@ -1180,10 +1331,11 @@ void vaultPasswordWithFormat() { )); when(securityProperties.getAes256EncryptionKey()).thenReturn("changeitchangeitchangeitchangeit"); + when(aclService.isAnyAclOfResource(any(), eq("prefix.connect-cluster"))).thenReturn(true); List actual = connectClusterService.vaultPassword(namespace, "prefix.connect-cluster", List.of("secret")); - Assertions.assertFalse(actual.get(0).getSpec().getEncrypted().matches("^\\$\\{aes256\\:.*\\}")); + assertFalse(actual.getFirst().getSpec().getEncrypted().matches("^\\$\\{aes256\\:.*\\}")); } }