From 8e6f00c6fa0dbd40ae5765b524f4b847c280b57e Mon Sep 17 00:00:00 2001 From: thcai Date: Thu, 25 Jul 2024 17:07:07 +0200 Subject: [PATCH] filter feature on connector --- .../controller/ConnectorController.java | 7 +- .../michelin/ns4kafka/service/AclService.java | 2 +- .../ns4kafka/service/ConnectorService.java | 37 +- .../controller/ConnectorControllerTest.java | 40 +- .../ns4kafka/service/AclServiceTest.java | 32 +- .../service/ConnectorServiceTest.java | 588 ++++++++++-------- 6 files changed, 386 insertions(+), 320 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java index 058bc97d..99c597da 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/ConnectorController.java @@ -47,14 +47,15 @@ public class ConnectorController extends NamespacedResourceController { ResourceQuotaService resourceQuotaService; /** - * List connectors by namespace. + * List connectors by namespace, filtered by name parameter. * * @param namespace The namespace + * @param name The name parameter * @return A list of connectors */ @Get - public List list(String namespace) { - return connectorService.findAllForNamespace(getNamespace(namespace)); + public List list(String namespace, @QueryValue(defaultValue = "*") String name) { + return connectorService.findAllForNamespace(getNamespace(namespace), name); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 23de59b4..cfb96eca 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -374,4 +374,4 @@ public boolean isAnyAclOfResource(List acls, String resource case LITERAL -> resourceName.equals(acl.getSpec().getResource()); }); } -} \ No newline at end of file +} diff --git a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java index 0af76f55..a96288cc 100644 --- a/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/service/ConnectorService.java @@ -13,6 +13,7 @@ import com.michelin.ns4kafka.service.client.connect.entities.ConnectorSpecs; import com.michelin.ns4kafka.service.executor.ConnectorAsyncExecutor; import com.michelin.ns4kafka.util.FormatErrorUtils; +import com.michelin.ns4kafka.util.RegexUtils; import io.micronaut.context.ApplicationContext; import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpResponse; @@ -57,23 +58,29 @@ public class ConnectorService { * @return A list of connectors */ public List findAllForNamespace(Namespace namespace) { - List acls = aclService.findAllGrantedToNamespace(namespace); + List acls = aclService + .findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.CONNECT); return connectorRepository.findAllForCluster(namespace.getMetadata().getCluster()) .stream() - .filter(connector -> acls.stream().anyMatch(accessControlEntry -> { - if (accessControlEntry.getSpec().getPermission() != AccessControlEntry.Permission.OWNER) { - return false; - } - if (accessControlEntry.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT) { - return switch (accessControlEntry.getSpec().getResourcePatternType()) { - case PREFIXED -> connector.getMetadata().getName() - .startsWith(accessControlEntry.getSpec().getResource()); - case LITERAL -> - connector.getMetadata().getName().equals(accessControlEntry.getSpec().getResource()); - }; - } - return false; - })) + .filter(connector -> aclService.isAnyAclOfResource(acls, connector.getMetadata().getName())) + .toList(); + } + + /** + * Find all connectors by given namespace, filtered by name parameter. + * + * @param namespace The namespace + * @param name The name filter + * @return A list of connectors + */ + public List findAllForNamespace(Namespace namespace, String name) { + List acls = aclService + .findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.CONNECT); + List nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name)); + return connectorRepository.findAllForCluster(namespace.getMetadata().getCluster()) + .stream() + .filter(connector -> aclService.isAnyAclOfResource(acls, connector.getMetadata().getName()) + && RegexUtils.filterByPattern(connector.getMetadata().getName(), nameFilterPatterns)) .toList(); } diff --git a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java index 726dd616..ce9a0478 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/ConnectorControllerTest.java @@ -68,13 +68,10 @@ void listEmptyConnectors() { .build()) .build(); - when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); - when(connectorService.findAllForNamespace(ns)) - .thenReturn(List.of()); + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.findAllForNamespace(ns, "*")).thenReturn(List.of()); - List actual = connectorController.list("test"); - assertTrue(actual.isEmpty()); + assertTrue(connectorController.list("test", "*").isEmpty()); } @Test @@ -86,15 +83,30 @@ void listMultipleConnectors() { .build()) .build(); - when(namespaceService.findByName("test")) - .thenReturn(Optional.of(ns)); - when(connectorService.findAllForNamespace(ns)) - .thenReturn(List.of( - Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(), - Connector.builder().metadata(Metadata.builder().name("connect2").build()).build())); + Connector connector1 = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); + Connector connector2 = Connector.builder().metadata(Metadata.builder().name("connect2").build()).build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.findAllForNamespace(ns, "*")).thenReturn(List.of(connector1, connector2)); + + assertEquals(List.of(connector1, connector2), connectorController.list("test", "*")); + } + + @Test + void listConnectorWithNameParameter() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + Connector connector = Connector.builder().metadata(Metadata.builder().name("connect1").build()).build(); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(ns)); + when(connectorService.findAllForNamespace(ns, "connect1")).thenReturn(List.of(connector)); - List actual = connectorController.list("test"); - assertEquals(2, actual.size()); + assertEquals(List.of(connector), connectorController.list("test", "connect1")); } @Test diff --git a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java index 64e52630..e926534e 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java @@ -1017,24 +1017,24 @@ void isPrefixedAclOfResource() { @Test void isLiteralAclOfResource() { AccessControlEntry acl1 = AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .permission(AccessControlEntry.Permission.OWNER) - .resource("abc.topic1") - .grantedTo("namespace") - .build()) - .build(); + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc.topic1") + .grantedTo("namespace") + .build()) + .build(); AccessControlEntry acl2 = AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .permission(AccessControlEntry.Permission.OWNER) - .resource("abc-topic1") - .grantedTo("namespace") - .build()) - .build(); + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc-topic1") + .grantedTo("namespace") + .build()) + .build(); List acls = List.of(acl1, acl2); assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1")); diff --git a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java index 54efe431..42035b71 100644 --- a/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/ConnectorServiceTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.service; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -34,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentMatchers; @@ -66,23 +66,17 @@ class ConnectorServiceTest { ConnectClusterService connectClusterService; @Test - void findByNamespaceNone() { + void listNoConnector() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("namespace") .cluster("local") .build()) - .spec(NamespaceSpec.builder() - .connectClusters(List.of("local-name")) - .build()) .build(); - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of()); + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of()); - List actual = connectorService.findAllForNamespace(ns); - - assertTrue(actual.isEmpty()); + assertTrue(connectorService.findAllForNamespace(ns).isEmpty()); } @Test @@ -92,113 +86,210 @@ void findByNamespaceMultiple() { .name("namespace") .cluster("local") .build()) - .spec(NamespaceSpec.builder() - .connectClusters(List.of("local-name")) - .build()) .build(); - Connector c1 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect1").build()) - .build(); - Connector c2 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect2").build()) - .build(); - Connector c3 = Connector.builder() - .metadata(Metadata.builder().name("other-connect1").build()) - .build(); - Connector c4 = Connector.builder() - .metadata(Metadata.builder().name("other-connect2").build()) - .build(); - Connector c5 = Connector.builder() - .metadata(Metadata.builder().name("ns2-connect1").build()) + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("other-connect1").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("other-connect2").build()).build(); + Connector c5 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("other-connect1") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3, c4, c5)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect2")).thenReturn(false); + when(aclService.isAnyAclOfResource(acls, "ns2-connect1")).thenReturn(false); + + assertEquals(List.of(c1, c2, c3), connectorService.findAllForNamespace(ns)); + } + + @Test + void listConnectorWithoutParameter() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace") + .cluster("local") + .build()) .build(); - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("other-connect1") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.READ) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns2-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.WRITE) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns3-") - .build()) - .build() - )); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("other-connect1").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("other-connect2").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of(c1, c2, c3, c4, c5)); + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("other-connect1") + .build()) + .build() + ); - List actual = connectorService.findAllForNamespace(ns); + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); - assertEquals(3, actual.size()); - // contains - assertTrue(actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("ns-connect1"))); - assertTrue(actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("ns-connect2"))); - assertTrue(actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("other-connect1"))); - // doesn't contain - Assertions.assertFalse( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("other-connect2"))); - Assertions.assertFalse( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("ns2-connect1"))); - Assertions.assertFalse( - actual.stream().anyMatch(connector -> connector.getMetadata().getName().equals("ns3-connect1"))); + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3, c4)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect2")).thenReturn(false); + when(aclService.isAnyAclOfResource(acls, "ns2-connect1")).thenReturn(false); + + assertEquals(List.of(c1, c2), connectorService.findAllForNamespace(ns, "*")); } @Test - void findByNameNotFound() { + void listConnectorWithNameParameter() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("namespace") .cluster("local") .build()) - .spec(NamespaceSpec.builder() - .connectClusters(List.of("local-name")) + .build(); + + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("other-connect1").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("other-connect2").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("other-connect1") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3, c4)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect2")).thenReturn(false); + when(aclService.isAnyAclOfResource(acls, "ns2-connect1")).thenReturn(false); + + assertEquals(List.of(c1), connectorService.findAllForNamespace(ns, "ns-connect1")); + assertEquals(List.of(c2), connectorService.findAllForNamespace(ns, "other-connect1")); + assertTrue(connectorService.findAllForNamespace(ns, "ns2-connect1").isEmpty()); + assertTrue(connectorService.findAllForNamespace(ns, "ns4-connect1").isEmpty()); + } + + @Test + void listConnectorWithWildcardNameParameter() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace") + .cluster("local") .build()) .build(); - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of()); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("other-connect1").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("other-connect2").build()).build(); + Connector c5 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); - Optional actual = connectorService.findByName(ns, "ns-connect1"); + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("other-") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3, c4, c5)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns2-connect1")).thenReturn(false); + + assertEquals(List.of(c1, c2), connectorService.findAllForNamespace(ns, "ns-connect?")); + assertEquals(List.of(c1, c3), connectorService.findAllForNamespace(ns, "*-connect1")); + assertEquals(List.of(c1, c2, c3, c4), connectorService.findAllForNamespace(ns, "*-connect?")); + assertTrue(connectorService.findAllForNamespace(ns, "ns2-*").isEmpty()); + assertTrue(connectorService.findAllForNamespace(ns, "ns*4-connect?").isEmpty()); + } + + @Test + void findByNameNotFound() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder() + .name("namespace") + .cluster("local") + .build()) + .build(); + + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of()); - assertTrue(actual.isEmpty()); + assertTrue(connectorService.findByName(ns, "ns-connect1").isEmpty()); } @Test @@ -212,49 +303,12 @@ void findByNameFound() { .connectClusters(List.of("local-name")) .build()) .build(); - Connector c1 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect1").build()) - .build(); - Connector c2 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect2").build()) - .build(); - Connector c3 = Connector.builder() - .metadata(Metadata.builder().name("other-connect1").build()) - .build(); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("other-connect1").build()).build(); - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("other-connect1") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resource("ns-") - .build()) - .build() - )); - - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of(AccessControlEntry.builder() + List acls = List.of( + AccessControlEntry.builder() .spec(AccessControlEntry.AccessControlEntrySpec.builder() .permission(AccessControlEntry.Permission.OWNER) .grantedTo("namespace") @@ -262,10 +316,24 @@ void findByNameFound() { .resourceType(AccessControlEntry.ResourceType.CONNECT) .resource("ns-") .build()) - .build())); + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("other-connect1") + .build()) + .build() + ); - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of(c1, c2, c3)); + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "other-connect1")).thenReturn(true); Optional actual = connectorService.findByName(ns, "ns-connect1"); @@ -361,7 +429,7 @@ void validateLocallyInvalidConnectCluster() { assertEquals(1, response.size()); assertEquals( "Invalid value \"wrong\" for field \"connectCluster\": value must be one of \"local-name\".", - response.get(0)); + response.getFirst()); }) .verifyComplete(); } @@ -390,7 +458,7 @@ void validateLocallyNoClassName() { .consumeNextWith(response -> { assertEquals(1, response.size()); assertEquals("Invalid empty value for field \"connector.class\": value must not be null.", - response.get(0)); + response.getFirst()); }) .verifyComplete(); } @@ -424,7 +492,7 @@ void validateLocallyInvalidClassName() { "Invalid value \"org.apache.kafka.connect.file.FileStreamSinkConnector\" " + "for field \"connector.class\": failed to find any class that implements connector and " + "which name matches org.apache.kafka.connect.file.FileStreamSinkConnector.", - response.get(0)); + response.getFirst()); }) .verifyComplete(); } @@ -463,7 +531,7 @@ void validateLocallyValidationErrors() { .consumeNextWith(response -> { assertEquals(1, response.size()); assertEquals("Invalid empty value for field \"missing.field\": value must not be null.", - response.get(0)); + response.getFirst()); }) .verifyComplete(); } @@ -678,7 +746,7 @@ void validateRemotelyErrors() { StepVerifier.create(connectorService.validateRemotely(ns, connector)) .consumeNextWith(response -> { assertEquals(1, response.size()); - assertEquals("Invalid \"connect1\": error_message.", response.get(0)); + assertEquals("Invalid \"connect1\": error_message.", response.getFirst()); }) .verifyComplete(); } @@ -737,21 +805,31 @@ void listUnsynchronizedNoExistingConnectors() { .metadata(Metadata.builder().name("ns-connect-cluster").build()) .build(); - Connector c1 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect1").build()) - .build(); - Connector c2 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect2").build()) - .build(); - Connector c3 = Connector.builder() - .metadata(Metadata.builder().name("ns1-connect1").build()) - .build(); - Connector c4 = Connector.builder() - .metadata(Metadata.builder().name("ns2-connect1").build()) - .build(); - Connector c5 = Connector.builder() - .metadata(Metadata.builder().name("ns1-connect2").build()) - .build(); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("ns1-connect1").build()).build(); + Connector c5 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("ns1-connect2").build()).build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns1-connect1") + .build()) + .build()); when(connectClusterService.findAllByNamespaceWithWritePermission(ns)) .thenReturn(List.of(connectCluster)); @@ -777,30 +855,11 @@ void listUnsynchronizedNoExistingConnectors() { "ns2-connect1")) .thenReturn(false); - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns1-connect1") - .build()) - .build())); + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); // no connects exists into Ns4Kafka - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of()); + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of()); StepVerifier.create(connectorService.listUnsynchronizedConnectors(ns)) .consumeNextWith(connector -> assertEquals("ns-connect1", connector.getMetadata().getName())) @@ -826,26 +885,45 @@ void listUnsynchronizedAllExistingConnectors() { when(applicationContext.getBean(ConnectorAsyncExecutor.class, Qualifiers.byName(ns.getMetadata().getCluster()))).thenReturn(connectorAsyncExecutor); - ConnectCluster connectCluster = ConnectCluster.builder() .metadata(Metadata.builder().name("ns-connect-cluster").build()) .build(); - Connector c1 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect1").build()) - .build(); - Connector c2 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect2").build()) - .build(); - Connector c3 = Connector.builder() - .metadata(Metadata.builder().name("ns1-connect1").build()) - .build(); - Connector c4 = Connector.builder() - .metadata(Metadata.builder().name("ns2-connect1").build()) - .build(); - Connector c5 = Connector.builder() - .metadata(Metadata.builder().name("ns1-connect2").build()) - .build(); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("ns1-connect1").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); + Connector c5 = Connector.builder().metadata(Metadata.builder().name("ns1-connect2").build()).build(); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns1-connect1") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns1-connect2") + .build()) + .build() + ); when(connectClusterService.findAllByNamespaceWithWritePermission(ns)) .thenReturn(List.of(connectCluster)); @@ -873,36 +951,13 @@ void listUnsynchronizedAllExistingConnectors() { "ns2-connect1")) .thenReturn(false); - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns1-connect1") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns1-connect2") - .build()) - .build() - )); + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns1-connect1")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns1-connect2")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "ns2-connect1")).thenReturn(false); StepVerifier.create(connectorService.listUnsynchronizedConnectors(ns)) .verifyComplete(); @@ -926,26 +981,37 @@ void listUnsynchronizedPartialExistingConnectors() { Qualifiers.byName(ns.getMetadata().getCluster()))).thenReturn(connectorAsyncExecutor); // list of existing broker connectors - Connector c1 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect1").build()) - .build(); - Connector c2 = Connector.builder() - .metadata(Metadata.builder().name("ns-connect2").build()) - .build(); - Connector c3 = Connector.builder() - .metadata(Metadata.builder().name("ns1-connect1").build()) - .build(); - Connector c4 = Connector.builder() - .metadata(Metadata.builder().name("ns2-connect1").build()) - .build(); + Connector c1 = Connector.builder().metadata(Metadata.builder().name("ns-connect1").build()).build(); + Connector c2 = Connector.builder().metadata(Metadata.builder().name("ns-connect2").build()).build(); + Connector c3 = Connector.builder().metadata(Metadata.builder().name("ns1-connect1").build()).build(); + Connector c4 = Connector.builder().metadata(Metadata.builder().name("ns2-connect1").build()).build(); - when(connectorAsyncExecutor.collectBrokerConnectors("local-name")).thenReturn(Flux.fromIterable(List.of( - c1, c2, c3, c4))); + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns-") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resource("ns1-connect1") + .build()) + .build() + ); - // list of existing broker connects - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of(c1, c2, c3, c4)); + when(connectorAsyncExecutor.collectBrokerConnectors("local-name")) + .thenReturn(Flux.fromIterable(List.of(c1, c2, c3, c4))); + // list of existing broker connects + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1, c2, c3, c4)); // list of existing Ns4Kafka access control entries when(aclService.isNamespaceOwnerOfResource("namespace", AccessControlEntry.ResourceType.CONNECT, @@ -961,30 +1027,10 @@ void listUnsynchronizedPartialExistingConnectors() { "ns2-connect1")) .thenReturn(false); - when(aclService.findAllGrantedToNamespace(ns)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns1-connect1") - .build()) - .build() - )); - - when(connectorRepository.findAllForCluster("local")) - .thenReturn(List.of(c1)); + when(aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)) + .thenReturn(acls); + when(connectorRepository.findAllForCluster("local")).thenReturn(List.of(c1)); + when(aclService.isAnyAclOfResource(acls, "ns-connect1")).thenReturn(true); StepVerifier.create(connectorService.listUnsynchronizedConnectors(ns)) .consumeNextWith(connector -> assertEquals("ns-connect2", connector.getMetadata().getName()))