From 7e7a2a46fc07c821ccad6d88d520c013ebe23928 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Tue, 23 Jul 2024 00:31:56 +0200 Subject: [PATCH] Fix ACL being deleted from owner Kafka user even when granted to another namespace --- CONTRIBUTING.md | 2 +- .../controller/acl/AclController.java | 4 +- .../kafka/KafkaNamespaceRepository.java | 1 - .../service/AccessControlEntryService.java | 3 +- .../AccessControlEntryAsyncExecutor.java | 73 +++++++++++-------- .../ns4kafka/util/FormatErrorUtils.java | 4 +- .../AccessControlEntryServiceTest.java | 18 +++-- 7 files changed, 59 insertions(+), 46 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e83638b9..95a1709e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -42,7 +42,7 @@ an Issue to discuss your proposal first. This is not required but can save time In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susam/gitpr) -- Fork the repository to your own Github account +- Fork the repository to your own GitHub account - Clone the project to your machine - Create a branch locally from master with a succinct but descriptive name - Commit changes to the branch diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java index 55cb2fea..531192cb 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java @@ -110,10 +110,10 @@ public HttpResponse apply(Authentication authentication, Str Namespace ns = getNamespace(namespace); boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN); - boolean isSelfAssignedAcl = namespace.equals(accessControlEntry.getSpec().getGrantedTo()); + boolean isSelfAssigned = namespace.equals(accessControlEntry.getSpec().getGrantedTo()); List validationErrors; - if (isAdmin && isSelfAssignedAcl) { + if (isAdmin && isSelfAssigned) { // Validate overlapping OWNER validationErrors = accessControlEntryService.validateAsAdmin(accessControlEntry, ns); } else { diff --git a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaNamespaceRepository.java b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaNamespaceRepository.java index 58d57a13..df854295 100644 --- a/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaNamespaceRepository.java +++ b/src/main/java/com/michelin/ns4kafka/repository/kafka/KafkaNamespaceRepository.java @@ -66,5 +66,4 @@ public Optional findByName(String namespace) { .filter(ns -> ns.getMetadata().getName().equals(namespace)) .findFirst(); } - } diff --git a/src/main/java/com/michelin/ns4kafka/service/AccessControlEntryService.java b/src/main/java/com/michelin/ns4kafka/service/AccessControlEntryService.java index 572358ff..8aa8db11 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AccessControlEntryService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AccessControlEntryService.java @@ -135,6 +135,7 @@ public List validateAsAdmin(AccessControlEntry accessControlEntry, Names parentOverlap = ace.getSpec().getResource().startsWith(accessControlEntry.getSpec().getResource()) || topicAclsCollideWithParentOrChild(ace, accessControlEntry); } + // new ACL would be covered by a PREFIXED existing ACLs boolean childOverlap = false; if (ace.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED) { @@ -241,7 +242,7 @@ public void delete(Namespace namespace, AccessControlEntry accessControlEntry) { AccessControlEntryAsyncExecutor accessControlEntryAsyncExecutor = applicationContext.getBean(AccessControlEntryAsyncExecutor.class, Qualifiers.byName(accessControlEntry.getMetadata().getCluster())); - accessControlEntryAsyncExecutor.deleteNs4KafkaAcl(namespace, accessControlEntry); + accessControlEntryAsyncExecutor.deleteAcl(namespace, accessControlEntry); accessControlEntryRepository.delete(accessControlEntry); } diff --git a/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java index 3244151e..72b8f62a 100644 --- a/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/service/executor/AccessControlEntryAsyncExecutor.java @@ -12,7 +12,6 @@ import com.michelin.ns4kafka.repository.NamespaceRepository; import com.michelin.ns4kafka.repository.kafka.KafkaStoreException; import com.michelin.ns4kafka.service.AccessControlEntryService; -import com.michelin.ns4kafka.service.ConnectorService; import com.michelin.ns4kafka.service.StreamService; import io.micronaut.context.annotation.EachBean; import jakarta.inject.Singleton; @@ -52,8 +51,6 @@ public class AccessControlEntryAsyncExecutor { private StreamService streamService; - private ConnectorService connectorService; - private NamespaceRepository namespaceRepository; /** @@ -130,9 +127,10 @@ private List collectNs4KafkaAcls() { .stream() .flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace) .stream() - .filter(accessControlEntry -> (List.of(TOPIC, GROUP, TRANSACTIONAL_ID) - .contains(accessControlEntry.getSpec().getResourceType()))) - .flatMap(accessControlEntry -> buildAclBindingsFromAccessControlEntry(accessControlEntry, + .filter(accessControlEntry -> List.of(TOPIC, GROUP, TRANSACTIONAL_ID) + .contains(accessControlEntry.getSpec().getResourceType())) + .flatMap(accessControlEntry -> buildAclBindingsFromAccessControlEntry( + accessControlEntry, namespace.getSpec().getKafkaUser()) .stream()) .distinct()); @@ -226,40 +224,45 @@ private List collectBrokerAcls(boolean managedUsersOnly) } /** - * Convert Ns4Kafka topic and group ACL into Kafka ACL. + * Convert Ns4Kafka topic, group and transactional ACL into Kafka ACL. * * @param accessControlEntry The Ns4Kafka ACL - * @param kafkaUser The ACL owner + * @param kafkaUser The Kafka user to link to the ACL * @return A list of Kafka ACLs */ private List buildAclBindingsFromAccessControlEntry(AccessControlEntry accessControlEntry, String kafkaUser) { // Convert pattern, convert resource type from Ns4Kafka to org.apache.kafka.common types - PatternType patternType = - PatternType.fromString(accessControlEntry.getSpec().getResourcePatternType().toString()); - ResourceType resourceType = org.apache.kafka.common.resource.ResourceType.fromString( + PatternType patternType = PatternType.fromString( + accessControlEntry.getSpec().getResourcePatternType().toString()); + + ResourceType resourceType = ResourceType.fromString( accessControlEntry.getSpec().getResourceType().toString()); - ResourcePattern resourcePattern = - new ResourcePattern(resourceType, accessControlEntry.getSpec().getResource(), patternType); - // Generate the required AclOperation based on ResourceType + ResourcePattern resourcePattern = new ResourcePattern(resourceType, + accessControlEntry.getSpec().getResource(), patternType); + + // Generate the required AclOperation and principal based on the permission List targetAclOperations; if (accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) { targetAclOperations = computeAclOperationForOwner(resourceType); } else { - // Should be READ or WRITE - targetAclOperations = - List.of(AclOperation.fromString(accessControlEntry.getSpec().getPermission().toString())); + targetAclOperations = List.of( + AclOperation.fromString(accessControlEntry.getSpec().getPermission().toString()) + ); } - final String aclUser = - accessControlEntry.getSpec().getGrantedTo().equals(PUBLIC_GRANTED_TO) ? PUBLIC_GRANTED_TO : kafkaUser; + final String aclUser = accessControlEntry.getSpec().getGrantedTo().equals(PUBLIC_GRANTED_TO) + ? PUBLIC_GRANTED_TO : kafkaUser; + return targetAclOperations .stream() - .map(aclOperation -> - new AclBinding(resourcePattern, - new org.apache.kafka.common.acl.AccessControlEntry(USER_PRINCIPAL + aclUser, - "*", aclOperation, AclPermissionType.ALLOW))) + .map(aclOperation -> new AclBinding(resourcePattern, + new org.apache.kafka.common.acl.AccessControlEntry( + USER_PRINCIPAL + aclUser, + "*", + aclOperation, + AclPermissionType.ALLOW))) .toList(); } @@ -363,20 +366,28 @@ private void deleteAcls(List toDelete) { * Delete a given Ns4Kafka ACL. * Convert Ns4Kafka ACL into Kafka ACLs before deletion. * - * @param namespace The namespace - * @param ns4kafkaAcl The Kafka ACL + * @param namespace The namespace + * @param acl The ACL */ - public void deleteNs4KafkaAcl(Namespace namespace, AccessControlEntry ns4kafkaAcl) { + public void deleteAcl(Namespace namespace, AccessControlEntry acl) { if (managedClusterProperties.isManageAcls()) { List results = new ArrayList<>(); - if (List.of(TOPIC, GROUP, TRANSACTIONAL_ID).contains(ns4kafkaAcl.getSpec().getResourceType())) { - results.addAll(buildAclBindingsFromAccessControlEntry(ns4kafkaAcl, namespace.getSpec().getKafkaUser())); + if (List.of(TOPIC, GROUP, TRANSACTIONAL_ID).contains(acl.getSpec().getResourceType())) { + boolean isSelfAssigned = acl.getMetadata().getNamespace().equals(acl.getSpec().getGrantedTo()); + + String kafkaUser = isSelfAssigned ? namespace.getSpec().getKafkaUser() + : namespaceRepository.findByName(acl.getSpec().getGrantedTo()) + .orElseThrow() + .getSpec() + .getKafkaUser(); + + results.addAll(buildAclBindingsFromAccessControlEntry(acl, kafkaUser)); } - if (ns4kafkaAcl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT - && ns4kafkaAcl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) { - results.addAll(buildAclBindingsFromConnector(ns4kafkaAcl, namespace.getSpec().getKafkaUser())); + if (acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT + && acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) { + results.addAll(buildAclBindingsFromConnector(acl, namespace.getSpec().getKafkaUser())); } deleteAcls(results); diff --git a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java index f7ab60ff..7b118bcd 100644 --- a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java +++ b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java @@ -87,7 +87,7 @@ public static String invalidAclNotOwnerOfTopLevel(String invalidResourceValue, invalidResourcePatternTypeValue) { return String.format(INVALID_FIELDS, invalidResourceValue, invalidResourcePatternTypeValue, "resource", "resourcePatternType", - "cannot grant ACL to yourself"); + "cannot grant ACL because namespace is not owner of the top level resource"); } /** @@ -736,6 +736,6 @@ public static String invalidTopicTags(String invalidTagValue) { */ public static String invalidTopicTagsFormat(String invalidTagFormatValue) { return String.format(INVALID_FIELD, invalidTagFormatValue, "tags", - "tags should start with letter and be followed by alphanumeric or _ characters"); + "tags should start with letter and be followed by alphanumeric or _ characters"); } } diff --git a/src/test/java/com/michelin/ns4kafka/service/AccessControlEntryServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AccessControlEntryServiceTest.java index 95226448..90d50782 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AccessControlEntryServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AccessControlEntryServiceTest.java @@ -74,7 +74,7 @@ void validateNotAllowedResources() { "Invalid value \"OWNER\" for field \"permission\": value must be one of \"READ, WRITE\".", "Invalid value \"target-ns\" for field \"grantedTo\": resource not found.", "Invalid value \"test/PREFIXED\" for fields \"resource/resourcePatternType\": " - + "cannot grant ACL to yourself."), + + "cannot grant ACL because namespace is not owner of the top level resource."), actual); } @@ -112,7 +112,7 @@ void validateNotAllowedSelfGrant() { assertLinesMatch(List.of( "Invalid value \"namespace\" for field \"grantedTo\": cannot grant ACL to yourself.", "Invalid value \"test/PREFIXED\" for fields \"resource/resourcePatternType\": " - + "cannot grant ACL to yourself."), + + "cannot grant ACL because namespace is not owner of the top level resource."), actual); } @@ -124,6 +124,7 @@ void validateNotAllowedOwnerOfBadPrefix() { .cluster("local") .build()) .build(); + AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("acl-name") @@ -137,6 +138,7 @@ void validateNotAllowedOwnerOfBadPrefix() { .grantedTo("target-ns") .build()) .build(); + when(applicationContext.getBean(NamespaceService.class)) .thenReturn(namespaceService); when(namespaceService.findByName("target-ns")) @@ -155,11 +157,11 @@ void validateNotAllowedOwnerOfBadPrefix() { )); List actual = accessControlEntryService.validate(accessControlEntry, ns); assertLinesMatch(List.of("Invalid value \"main/PREFIXED\" for fields \"resource/resourcePatternType\": " - + "cannot grant ACL to yourself."), actual); + + "cannot grant ACL because namespace is not owner of the top level resource."), actual); } @Test - void validate_NotAllowedOwnerOfBadLiteral() { + void validateNotAllowedOwnerOfBadLiteral() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("namespace") @@ -193,13 +195,13 @@ void validate_NotAllowedOwnerOfBadLiteral() { .permission(AccessControlEntry.Permission.OWNER) .resource("resource1") .grantedTo("namespace") - .build() - ) + .build()) .build() )); + List actual = accessControlEntryService.validate(accessControlEntry, ns); assertLinesMatch(List.of("Invalid value \"resource2/LITERAL\" for fields \"resource/resourcePatternType\": " - + "cannot grant ACL to yourself."), actual); + + "cannot grant ACL because namespace is not owner of the top level resource."), actual); } @Test @@ -286,7 +288,7 @@ void validate_AllowedOwnerOfPrefix() { } @Test - void validate_AllowedPublicGrantedTo() { + void validateAllowedPublicGrantedTo() { Namespace ns = Namespace.builder() .metadata(Metadata.builder() .name("namespace")