Skip to content

Commit

Permalink
connect cluster filter
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Jul 30, 2024
1 parent 017338b commit c21c8ce
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ public class ConnectClusterController extends NamespacedResourceController {
ConnectorService connectorService;

/**
* List Kafka Connect clusters by namespace.
* List Kafka Connect clusters by namespace, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name parameter
* @return A list of Kafka Connect clusters
*/
@Get
public List<ConnectCluster> list(String namespace) {
return connectClusterService.findAllByNamespaceOwner(getNamespace(namespace));
public List<ConnectCluster> list(String namespace, @QueryValue(defaultValue = "*") String name) {
return connectClusterService.findAllByNamespaceOwner(getNamespace(namespace), name);
}

/**
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/michelin/ns4kafka/service/AclService.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,24 @@ public List<AccessControlEntry> findAllGrantedToNamespace(Namespace namespace) {
.toList();
}

/**
* Find all owner-ACLs on a resource for a given namespace.
*
* @param namespace The namespace
* @param resourceType The resource
* @return A list of ACLs
*/
public List<AccessControlEntry> findResourceOwnerGrantedToNamespace(Namespace namespace,
AccessControlEntry.ResourceType resourceType) {
return accessControlEntryRepository.findAll()
.stream()
.filter(accessControlEntry ->
accessControlEntry.getSpec().getGrantedTo().equals(namespace.getMetadata().getName())
&& accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER
&& accessControlEntry.getSpec().getResourceType() == resourceType)
.toList();
}

/**
* Find all public granted ACLs.
*
Expand Down Expand Up @@ -340,4 +358,18 @@ public boolean isNamespaceOwnerOfResource(String namespace, AccessControlEntry.R
public Optional<AccessControlEntry> findByName(String namespace, String name) {
return accessControlEntryRepository.findByName(namespace, name);
}

/**
* Check if there is any ACL concerning the given resource.
*
* @param acls The OWNER ACL list on resource
* @param resourceName The resource name to check ACL against
* @return true if there is any OWNER ACL concerning the given resource, false otherwise
*/
public boolean isAnyAclOfResource(List<AccessControlEntry> acls, String resourceName) {
return acls.stream().anyMatch(acl -> switch (acl.getSpec().getResourcePatternType()) {
case PREFIXED -> resourceName.startsWith(acl.getSpec().getResource());
case LITERAL -> resourceName.equals(acl.getSpec().getResource());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo;
import com.michelin.ns4kafka.util.EncryptionUtils;
import com.michelin.ns4kafka.util.FormatErrorUtils;
import com.michelin.ns4kafka.util.RegexUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpRequest;
Expand Down Expand Up @@ -73,7 +74,7 @@ public class ConnectClusterService {
* Find all self deployed Connect clusters.
*
* @param all Include hard-declared Connect clusters
* @return A list of Connect clusters
* @return A list of Connect clusters
*/
public Flux<ConnectCluster> findAll(boolean all) {
List<ConnectCluster> results = connectClusterRepository.findAll();
Expand Down Expand Up @@ -116,26 +117,27 @@ public Flux<ConnectCluster> findAll(boolean all) {
}

/**
* Find all self deployed Connect clusters for a given namespace with a given list of permissions.
* Find all self deployed Connect clusters of a given namespace, with a given list of permissions,
* filtered by name parameter.
*
* @param namespace The namespace
* @param name The name filter
* @param permissions The list of permission to filter on
* @return A list of Connect clusters
*/
public List<ConnectCluster> findAllByNamespace(Namespace namespace,
public List<ConnectCluster> findAllByNamespace(Namespace namespace, String name,
List<AccessControlEntry.Permission> permissions) {
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace).stream()
.filter(acl -> permissions.contains(acl.getSpec().getPermission()))
.filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT_CLUSTER).toList();
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace)
.stream()
.filter(acl -> permissions.contains(acl.getSpec().getPermission())
&& acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT_CLUSTER)
.toList();
List<String> nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name));

return connectClusterRepository.findAllForCluster(namespace.getMetadata().getCluster())
.stream()
.filter(connector -> acls.stream().anyMatch(accessControlEntry -> switch (accessControlEntry.getSpec()
.getResourcePatternType()) {
case PREFIXED -> connector.getMetadata().getName()
.startsWith(accessControlEntry.getSpec().getResource());
case LITERAL -> connector.getMetadata().getName().equals(accessControlEntry.getSpec().getResource());
}))
.filter(cc -> aclService.isAnyAclOfResource(acls, cc.getMetadata().getName())
&& RegexUtils.filterByPattern(cc.getMetadata().getName(), nameFilterPatterns))
.toList();
}

Expand All @@ -146,7 +148,18 @@ public List<ConnectCluster> findAllByNamespace(Namespace namespace,
* @return The list of owned Connect cluster
*/
public List<ConnectCluster> findAllByNamespaceOwner(Namespace namespace) {
return findAllByNamespace(namespace, List.of(AccessControlEntry.Permission.OWNER))
return findAllByNamespaceOwner(namespace, "*");
}

/**
* Find all self deployed Connect clusters whose namespace is owner, filtered by name parameter.
*
* @param namespace The namespace
* @param name The name filter
* @return The list of owned Connect cluster
*/
public List<ConnectCluster> findAllByNamespaceOwner(Namespace namespace, String name) {
return findAllByNamespace(namespace, name, List.of(AccessControlEntry.Permission.OWNER))
.stream()
.map(connectCluster -> {
var builder = ConnectCluster.ConnectClusterSpec.builder()
Expand Down Expand Up @@ -185,8 +198,8 @@ public List<ConnectCluster> findAllByNamespaceOwner(Namespace namespace) {
*/
public List<ConnectCluster> findAllByNamespaceWrite(Namespace namespace) {
return Stream.concat(
this.findAllByNamespaceOwner(namespace).stream(),
this.findAllByNamespace(namespace, List.of(AccessControlEntry.Permission.WRITE)).stream()
this.findAllByNamespaceOwner(namespace, "*").stream(),
this.findAllByNamespace(namespace, "*", List.of(AccessControlEntry.Permission.WRITE)).stream()
.map(connectCluster -> ConnectCluster.builder()
.metadata(connectCluster.getMetadata())
.spec(ConnectCluster.ConnectClusterSpec.builder()
Expand All @@ -209,10 +222,7 @@ public List<ConnectCluster> findAllByNamespaceWrite(Namespace namespace) {
* @return An optional connect worker
*/
public Optional<ConnectCluster> findByNamespaceAndNameOwner(Namespace namespace, String connectClusterName) {
return findAllByNamespaceOwner(namespace)
.stream()
.filter(connectCluster -> connectCluster.getMetadata().getName().equals(connectClusterName))
.findFirst();
return findAllByNamespaceOwner(namespace, connectClusterName).stream().findFirst();
}

/**
Expand Down Expand Up @@ -296,7 +306,7 @@ public Mono<List<String>> validateConnectClusterCreation(ConnectCluster connectC
public List<String> validateConnectClusterVault(final Namespace namespace, final String connectCluster) {
final var errors = new ArrayList<String>();

final List<ConnectCluster> kafkaConnects = findAllByNamespace(namespace,
final List<ConnectCluster> kafkaConnects = findAllByNamespace(namespace, "*",
List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE));

if (kafkaConnects.isEmpty()) {
Expand Down Expand Up @@ -373,7 +383,7 @@ public boolean isNamespaceAllowedForConnectCluster(Namespace namespace, String c
*/
public List<VaultResponse> vaultPassword(final Namespace namespace, final String connectCluster,
final List<String> passwords) {
final Optional<ConnectCluster> kafkaConnect = findAllByNamespace(namespace,
final Optional<ConnectCluster> kafkaConnect = findAllByNamespace(namespace, "*",
List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE))
.stream()
.filter(cc ->
Expand Down Expand Up @@ -410,6 +420,4 @@ public List<VaultResponse> vaultPassword(final Namespace namespace, final String
.build())
.toList();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -62,47 +62,57 @@ class ConnectClusterControllerTest {
ApplicationEventPublisher<AuditLog> applicationEventPublisher;

@Test
void listEmptyConnectClusters() {
void listEmptyConnectCluster() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.findAllByNamespaceOwner(ns))
.thenReturn(List.of());
when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(connectClusterService.findAllByNamespaceOwner(ns, "*")).thenReturn(List.of());

List<ConnectCluster> actual = connectClusterController.list("test");
assertTrue(actual.isEmpty());
assertTrue(connectClusterController.list("test", "*").isEmpty());
}

@Test
void listMultipleConnectClusters() {
void listConnectClusterWithoutNameParameter() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.findAllByNamespaceOwner(ns))
.thenReturn(List.of(
ConnectCluster.builder()
.metadata(Metadata.builder().name("connect-cluster")
.build())
.build(),
ConnectCluster.builder()
.metadata(Metadata.builder().name("connect-cluster2")
.build())
.build()));
List<ConnectCluster> ccs = List.of(
ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster").build()).build(),
ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster2").build()).build()
);

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(connectClusterService.findAllByNamespaceOwner(ns, "*")).thenReturn(ccs);

assertEquals(ccs, connectClusterController.list("test", "*"));
}

@Test
void listConnectClusterWithNameParameter() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

List<ConnectCluster> ccs = List.of(
ConnectCluster.builder().metadata(Metadata.builder().name("connect-cluster").build()).build()
);

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(connectClusterService.findAllByNamespaceOwner(ns, "connect-cluster")).thenReturn(ccs);

List<ConnectCluster> actual = connectClusterController.list("test");
assertEquals(2, actual.size());
assertEquals(ccs, connectClusterController.list("test", "connect-cluster"));
}

@Test
Expand Down Expand Up @@ -269,7 +279,7 @@ void deleteConnectClusterWithConnectors() {
assertEquals(
"Invalid \"delete\" operation: The Kafka Connect \"connect-cluster\" has 1 deployed connector(s): "
+ "connect1. Please remove the associated connector(s) before deleting it.",
result.getValidationErrors().get(0));
result.getValidationErrors().getFirst());
}

@Test
Expand Down Expand Up @@ -335,7 +345,7 @@ void createNewConnectClusterNotOwner() {
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals(
"Invalid value \"connect-cluster\" for field \"name\": namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().get(0));
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}
Expand Down Expand Up @@ -366,7 +376,7 @@ void createNewConnectClusterValidationError() {
.consumeErrorWith(error -> {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Error occurred", ((ResourceValidationException) error).getValidationErrors().get(0));
assertEquals("Error occurred", ((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}
Expand Down Expand Up @@ -559,7 +569,7 @@ void vaultOnNonAllowedConnectCluster() {
assertEquals(1, result.getValidationErrors().size());
assertEquals("Invalid value \"connect-cluster-na\" for field \"connect-cluster\": "
+ "namespace is not allowed to use this Kafka Connect.",
result.getValidationErrors().get(0));
result.getValidationErrors().getFirst());
}

@Test
Expand All @@ -584,7 +594,7 @@ void vaultOnNotValidAes256ConnectCluster() {
ResourceValidationException result = assertThrows(ResourceValidationException.class,
() -> connectClusterController.vaultPassword("test", connectClusterName, secrets));
assertEquals(1, result.getValidationErrors().size());
assertEquals("Error config.", result.getValidationErrors().get(0));
assertEquals("Error config.", result.getValidationErrors().getFirst());
}

@Test
Expand Down Expand Up @@ -614,7 +624,7 @@ void vaultOnValidAes256ConnectCluster() {

final List<VaultResponse> actual =
connectClusterController.vaultPassword("test", connectClusterName, List.of("secret"));
assertEquals("secret", actual.get(0).getSpec().getClearText());
assertEquals("encryptedSecret", actual.get(0).getSpec().getEncrypted());
assertEquals("secret", actual.getFirst().getSpec().getClearText());
assertEquals("encryptedSecret", actual.getFirst().getSpec().getEncrypted());
}
}
Loading

0 comments on commit c21c8ce

Please sign in to comment.