Skip to content

Commit

Permalink
Connect Resource creates its own Consumer Group ACL (#131)
Browse files Browse the repository at this point in the history
* Added connect- Consumer Group ACL in Kafka when Connect ACL is declared

* Increase ACL Test coverage

* Good catch little test
  • Loading branch information
twobeeb authored Dec 1, 2021
1 parent cc7aa6b commit 0313c4a
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import com.michelin.ns4kafka.repositories.NamespaceRepository;
import com.michelin.ns4kafka.repositories.kafka.KafkaStoreException;
import com.michelin.ns4kafka.services.AccessControlEntryService;
import com.michelin.ns4kafka.services.KafkaConnectService;
import com.michelin.ns4kafka.services.StreamService;

import io.micronaut.context.annotation.EachBean;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AclBinding;
Expand All @@ -26,6 +25,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -42,6 +42,8 @@ public class AccessControlEntryAsyncExecutor {
AccessControlEntryService accessControlEntryService;
@Inject
StreamService streamService;
@Inject
KafkaConnectService kafkaConnectService;

public AccessControlEntryAsyncExecutor(KafkaAsyncExecutorConfig kafkaAsyncExecutorConfig) {
this.kafkaAsyncExecutorConfig = kafkaAsyncExecutorConfig;
Expand Down Expand Up @@ -103,21 +105,35 @@ private List<AclBinding> collectNs4KafkaACLs() {
// at some point we want to manage multiple users within a namespace, each having their own ACLs.

List<Namespace> namespaces = namespaceRepository.findAllForCluster(kafkaAsyncExecutorConfig.getName());
List<AclBinding> ns4kafkaACLs = Stream.concat(
namespaces.stream()

// Converts Topic and Group ns4kafka ACL to Topic and Group Kafka AclBindings
Stream<AclBinding> aclBindingFromACLs = namespaces.stream()
.flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace)
.stream()
.filter(accessControlEntry -> accessControlEntry.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC ||
accessControlEntry.getSpec().getResourceType() == AccessControlEntry.ResourceType.GROUP)
//1-N ACE to List<AclBinding>
.flatMap(accessControlEntry ->
buildAclBindingsFromAccessControlEntry(accessControlEntry, namespace.getSpec().getKafkaUser()).stream()))
, namespaces.stream()
buildAclBindingsFromAccessControlEntry(accessControlEntry, namespace.getSpec().getKafkaUser()).stream()));
// Converts KafkaStream Resources to Topic (CREATE/DELETE) AclBindings
Stream<AclBinding> aclBindingFromKStream = namespaces.stream()
.flatMap(namespace -> streamService.findAllForNamespace(namespace)
.stream()
.flatMap(kafkaStream ->
buildAclBindingsFromKafkaStream(kafkaStream, namespace.getSpec().getKafkaUser()).stream()))
).collect(Collectors.toList());
buildAclBindingsFromKafkaStream(kafkaStream, namespace.getSpec().getKafkaUser()).stream()));
// Converts Connect ACL to Group AclBindings (connect-)
Stream<AclBinding> aclBindingFromConnect = namespaces.stream()
.flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace)
.stream()
.filter(accessControlEntry -> accessControlEntry.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT)
.filter(accessControlEntry -> accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
//1-N ACE to List<AclBinding>
.flatMap(accessControlEntry ->
buildAclBindingsFromConnector(accessControlEntry, namespace.getSpec().getKafkaUser()).stream()));

List<AclBinding> ns4kafkaACLs = Stream.of(aclBindingFromACLs, aclBindingFromKStream, aclBindingFromConnect)
.flatMap(Function.identity())
.collect(Collectors.toList());

if (log.isDebugEnabled()) {
log.debug("ACLs found on ns4kafka : " + ns4kafkaACLs.size());
Expand Down Expand Up @@ -209,6 +225,19 @@ private List<AclBinding> buildAclBindingsFromKafkaStream(KafkaStream stream, Str
)
);
}
private List<AclBinding> buildAclBindingsFromConnector(AccessControlEntry acl, String kafkaUser) {
// READ on Consumer Group connect-{prefix}
PatternType patternType = PatternType.fromString(acl.getSpec().getResourcePatternType().toString());
ResourcePattern resourcePattern = new ResourcePattern(ResourceType.GROUP,
"connect-" + acl.getSpec().getResource(),
patternType);
return List.of(
new AclBinding(
resourcePattern,
new org.apache.kafka.common.acl.AccessControlEntry("User:" + kafkaUser, "*", AclOperation.READ, AclPermissionType.ALLOW)
)
);
}

private List<AclOperation> computeAclOperationForOwner(ResourceType resourceType) {
switch (resourceType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.michelin.ns4kafka.integration;

import com.michelin.ns4kafka.integration.TopicTest.BearerAccessRefreshToken;
import com.michelin.ns4kafka.models.*;
import com.michelin.ns4kafka.models.AccessControlEntry;
import com.michelin.ns4kafka.models.AccessControlEntry.AccessControlEntrySpec;
import com.michelin.ns4kafka.models.AccessControlEntry.Permission;
import com.michelin.ns4kafka.models.AccessControlEntry.ResourcePatternType;
import com.michelin.ns4kafka.models.AccessControlEntry.ResourceType;
import com.michelin.ns4kafka.models.Namespace;
import com.michelin.ns4kafka.models.Namespace.NamespaceSpec;
import com.michelin.ns4kafka.models.ObjectMeta;
import com.michelin.ns4kafka.models.RoleBinding;
import com.michelin.ns4kafka.models.RoleBinding.*;
import com.michelin.ns4kafka.services.executors.AccessControlEntryAsyncExecutor;
import com.michelin.ns4kafka.validation.TopicValidator;
Expand All @@ -21,18 +19,18 @@
import io.micronaut.rxjava3.http.client.Rx3HttpClient;
import io.micronaut.security.authentication.UsernamePasswordCredentials;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

@MicronautTest
Expand Down Expand Up @@ -89,7 +87,7 @@ void init(){
}

@Test
void createACL() throws InterruptedException, ExecutionException {
void createTopicReadACL() throws InterruptedException, ExecutionException {

AccessControlEntry aclTopic = AccessControlEntry.builder()
.metadata(ObjectMeta.builder()
Expand All @@ -107,30 +105,178 @@ void createACL() throws InterruptedException, ExecutionException {

client.exchange(HttpRequest.create(HttpMethod.POST,"/api/namespaces/ns1/acls").bearerAuth(token).body(aclTopic)).blockingFirst();

List<Map<String, Object>> aclTopicSaved = client.retrieve(HttpRequest.create(HttpMethod.GET,"/api/namespaces/ns1/acls").bearerAuth(token), List.class).blockingFirst();
System.out.println(aclTopicSaved);

//force ACL Sync
accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);


Admin kafkaClient = getAdminClient();

AclBindingFilter user1Filter = new AclBindingFilter(
ResourcePatternFilter.ANY,
new AccessControlEntryFilter("User:user1", null, AclOperation.ANY, AclPermissionType.ANY));
Collection<AclBinding> results = kafkaClient.describeAcls(user1Filter).values().get();

AclBinding expected = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "ns1-", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.READ, AclPermissionType.ALLOW));

Assertions.assertEquals(1, results.size());
Assertions.assertEquals(expected, results.stream().findFirst().get());

// DELETE the ACL and verify
client.exchange(HttpRequest.create(HttpMethod.DELETE,"/api/namespaces/ns1/acls/ns1-acl").bearerAuth(token).body(aclTopic)).blockingFirst();

accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

results = kafkaClient.describeAcls(user1Filter).values().get();

Assertions.assertTrue(results.isEmpty());
}

@Test
void createConnectOwnerACL() throws InterruptedException, ExecutionException {

AccessControlEntry aclTopic = AccessControlEntry.builder()
.metadata(ObjectMeta.builder()
.name("ns1-acl")
.namespace("ns1")
.build())
.spec(AccessControlEntrySpec.builder()
.resourceType(ResourceType.CONNECT)
.resource("ns1-")
.resourcePatternType(ResourcePatternType.PREFIXED)
.permission(Permission.OWNER)
.grantedTo("ns1")
.build())
.build();

client.exchange(HttpRequest.create(HttpMethod.POST,"/api/namespaces/ns1/acls").bearerAuth(token).body(aclTopic)).blockingFirst();

//force ACL Sync
accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

Admin kafkaClient = getAdminClient();

AclBindingFilter user1Filter = new AclBindingFilter(
ResourcePatternFilter.ANY,
new AccessControlEntryFilter("User:user1", null, AclOperation.ANY, AclPermissionType.ANY));
Collection<AclBinding> results = kafkaClient.describeAcls(user1Filter).values().get();

AclBinding expected = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.GROUP, "connect-ns1-", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.READ, AclPermissionType.ALLOW));

Assertions.assertEquals(1, results.size());
Assertions.assertEquals(expected, results.stream().findFirst().get());

// DELETE the ACL and verify
client.exchange(HttpRequest.create(HttpMethod.DELETE,"/api/namespaces/ns1/acls/ns1-acl").bearerAuth(token).body(aclTopic)).blockingFirst();

accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

results = kafkaClient.describeAcls(user1Filter).values().get();

Assertions.assertTrue(results.isEmpty());
}

@Test
void createStreamACL() throws InterruptedException, ExecutionException {
AccessControlEntry aclTopic = AccessControlEntry.builder()
.metadata(ObjectMeta.builder()
.name("ns1-acl-topic")
.namespace("ns1")
.build())
.spec(AccessControlEntrySpec.builder()
.resourceType(ResourceType.TOPIC)
.resource("ns1-")
.resourcePatternType(ResourcePatternType.PREFIXED)
.permission(Permission.OWNER)
.grantedTo("ns1")
.build())
.build();
AccessControlEntry aclGroup = AccessControlEntry.builder()
.metadata(ObjectMeta.builder()
.name("ns1-acl-group")
.namespace("ns1")
.build())
.spec(AccessControlEntrySpec.builder()
.resourceType(ResourceType.GROUP)
.resource("ns1-")
.resourcePatternType(ResourcePatternType.PREFIXED)
.permission(Permission.OWNER)
.grantedTo("ns1")
.build())
.build();

client.exchange(HttpRequest.create(HttpMethod.POST,"/api/namespaces/ns1/acls").bearerAuth(token).body(aclTopic)).blockingFirst();
client.exchange(HttpRequest.create(HttpMethod.POST,"/api/namespaces/ns1/acls").bearerAuth(token).body(aclGroup)).blockingFirst();

//force ACL Sync
accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

Admin kafkaClient = getAdminClient();

AclBindingFilter user1Filter = new AclBindingFilter(
ResourcePatternFilter.ANY,
new AccessControlEntryFilter("User:user1", null, AclOperation.ANY, AclPermissionType.ANY));
Collection<AclBinding> results = kafkaClient.describeAcls(user1Filter).values().get();

// Topic ns1- READ
// Topic ns1- WRITE
// Group ns1- READ

AclBinding ac1 = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "ns1-", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.READ, AclPermissionType.ALLOW));
AclBinding ac2 = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "ns1-", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
AclBinding ac3 = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.GROUP, "ns1-", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.READ, AclPermissionType.ALLOW));

Assertions.assertEquals(3, results.size());
Assertions.assertTrue(results.containsAll(List.of(ac1, ac2, ac3)));


KafkaStream stream = KafkaStream.builder()
.metadata(ObjectMeta.builder()
.name("ns1-stream1")
.namespace("ns1")
.build())
.build();

client.exchange(HttpRequest.create(HttpMethod.POST,"/api/namespaces/ns1/streams").bearerAuth(token).body(stream)).blockingFirst();

//force ACL Sync
accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

results = kafkaClient.describeAcls(user1Filter).values().get();

// Topic ns1- READ
// Topic ns1- WRITE
// Group ns1- READ
// Topic ns1-stream1 CREATE
// Topic ns1-stream1 DELETE
AclBinding ac4 = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "ns1-stream1", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.CREATE, AclPermissionType.ALLOW));
AclBinding ac5 = new AclBinding(
new ResourcePattern(org.apache.kafka.common.resource.ResourceType.TOPIC, "ns1-stream1", PatternType.PREFIXED),
new org.apache.kafka.common.acl.AccessControlEntry("User:user1", "*", AclOperation.DELETE, AclPermissionType.ALLOW));

Assertions.assertEquals(5, results.size());
Assertions.assertTrue(results.containsAll(List.of(ac1, ac2, ac3, ac4, ac5)));


// DELETE the Stream & ACL and verify
client.exchange(HttpRequest.create(HttpMethod.DELETE,"/api/namespaces/ns1/streams/ns1-stream1").bearerAuth(token).body(aclTopic)).blockingFirst();
client.exchange(HttpRequest.create(HttpMethod.DELETE,"/api/namespaces/ns1/acls/ns1-acl-topic").bearerAuth(token).body(aclTopic)).blockingFirst();
client.exchange(HttpRequest.create(HttpMethod.DELETE,"/api/namespaces/ns1/acls/ns1-acl-group").bearerAuth(token).body(aclTopic)).blockingFirst();

accessControlEntryAsyncExecutorList.forEach(AccessControlEntryAsyncExecutor::run);

AclBinding result = results.stream().findAny().get();
results = kafkaClient.describeAcls(user1Filter).values().get();

Assertions.assertEquals(AclOperation.READ, result.entry().operation());
Assertions.assertEquals(AclPermissionType.ALLOW, result.entry().permissionType());
Assertions.assertEquals("User:user1", result.entry().principal());
Assertions.assertEquals(org.apache.kafka.common.resource.ResourceType.TOPIC, result.pattern().resourceType());
Assertions.assertEquals(PatternType.PREFIXED, result.pattern().patternType());
Assertions.assertEquals("ns1-", result.pattern().name());
Assertions.assertTrue(results.isEmpty());
}
}

0 comments on commit 0313c4a

Please sign in to comment.