From 782dc189cda1bf93a45010f523957a7452f85911 Mon Sep 17 00:00:00 2001 From: thcai Date: Wed, 18 Dec 2024 15:23:19 +0100 Subject: [PATCH] Split secured ns granting public ACL from granting ACL to basic ns --- .../controller/acl/AclController.java | 11 ++- .../michelin/ns4kafka/model/Namespace.java | 2 +- .../ns4kafka/util/FormatErrorUtils.java | 10 +++ .../controller/AclControllerTest.java | 74 ++++++++++++++----- .../integration/AclIntegrationTest.java | 2 +- 5 files changed, 76 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java index d0c8c012..1d7e9a6c 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java @@ -5,6 +5,7 @@ import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSecuredNamespaceGrantAcl; +import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSecuredNamespaceGrantPublicAcl; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSelfAssignedAclDelete; import static com.michelin.ns4kafka.util.enumation.Kind.ACCESS_CONTROL_ENTRY; import static io.micronaut.core.util.StringUtils.EMPTY_STRING; @@ -108,14 +109,20 @@ public HttpResponse apply(Authentication authentication, Namespace ns = getNamespace(namespace); boolean grantorIsSecured = getNamespace(accessControlEntry.getMetadata().getNamespace()).getSpec().isSecured(); - - boolean granteeIsSecured = !PUBLIC_GRANTED_TO.equals(accessControlEntry.getSpec().getGrantedTo()) + boolean granteeIsPublic = PUBLIC_GRANTED_TO.equals(accessControlEntry.getSpec().getGrantedTo()); + boolean granteeIsSecured = !granteeIsPublic && getNamespace(accessControlEntry.getSpec().getGrantedTo()).getSpec().isSecured(); boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN); boolean isSelfAssigned = namespace.equals(accessControlEntry.getSpec().getGrantedTo()); List validationErrors; + // secured namespaces are not allowed to grant public ACL + if (grantorIsSecured && granteeIsPublic) { + throw new ResourceValidationException(accessControlEntry, invalidSecuredNamespaceGrantPublicAcl()); + } + + // secured namespaces are not allowed to grant ACL to basic namespaces if (grantorIsSecured && !granteeIsSecured) { throw new ResourceValidationException(accessControlEntry, invalidSecuredNamespaceGrantAcl()); } diff --git a/src/main/java/com/michelin/ns4kafka/model/Namespace.java b/src/main/java/com/michelin/ns4kafka/model/Namespace.java index 36d2a846..dab6cd99 100644 --- a/src/main/java/com/michelin/ns4kafka/model/Namespace.java +++ b/src/main/java/com/michelin/ns4kafka/model/Namespace.java @@ -50,7 +50,7 @@ public static class NamespaceSpec { @NotBlank private String kafkaUser; @Builder.Default - private boolean isSecured = Boolean.FALSE; + private boolean secured = Boolean.FALSE; @Builder.Default private List connectClusters = List.of(); private TopicValidator topicValidator; diff --git a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java index a9f011ad..f3878657 100644 --- a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java +++ b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java @@ -648,6 +648,16 @@ public static String invalidSecuredNamespaceGrantAcl() { "secured namespace cannot grant ACL to basic namespaces"); } + /** + * Invalid secured namespace grant ACL to basic namespaces. + * + * @return the error message + */ + public static String invalidSecuredNamespaceGrantPublicAcl() { + return String.format(INVALID_OPERATION, OPERATION_APPLY, + "secured namespace cannot grant public ACL"); + } + /** * Invalid schema suffix. * diff --git a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java index dc34f2f5..c113de1e 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java @@ -313,7 +313,7 @@ void shouldGetAcl() { void shouldApplyFailsAsAdmin() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -351,7 +351,7 @@ void shouldApplyFailsAsAdmin() { void shouldApplyWithSuccessAsAdmin() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -391,7 +391,7 @@ void shouldApplyWithSuccessAsAdmin() { void shouldApplyFailWithAclValidationErrors() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -427,7 +427,7 @@ void shouldApplyFailWithAclValidationErrors() { void shouldApplyAclWithSuccess() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -468,7 +468,7 @@ void shouldApplyAclWithSuccess() { void shouldEndApplyWithSuccessWhenAclAlreadyExists() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -508,7 +508,7 @@ void shouldEndApplyWithSuccessWhenAclAlreadyExists() { void shouldApplyFailWhenSpecChanges() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -561,7 +561,7 @@ void shouldApplyFailWhenSpecChanges() { void shouldApplyAclWithSuccessWhenMetadataChanges() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -618,7 +618,7 @@ void shouldApplyAclWithSuccessWhenMetadataChanges() { void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { Namespace ns = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -674,7 +674,7 @@ void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { void shouldNotApplyAclInDryRunModeAsAdmin() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -684,7 +684,7 @@ void shouldNotApplyAclInDryRunModeAsAdmin() { Namespace adminNamespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("admin") @@ -724,7 +724,7 @@ void shouldNotApplyAclInDryRunModeAsAdmin() { void shouldNotApplyAclInDryRunMode() { Namespace namespace = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("test") @@ -760,7 +760,7 @@ void shouldNotApplyAclInDryRunMode() { void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() { Namespace namespace1 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.TRUE) + .secured(Boolean.TRUE) .build()) .metadata(Metadata.builder() .name("secured1") @@ -770,7 +770,7 @@ void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() { Namespace namespace2 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.TRUE) + .secured(Boolean.TRUE) .build()) .metadata(Metadata.builder() .name("secured2") @@ -810,7 +810,7 @@ void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() { void shouldApplyAclFromBasicNamespaceToBasicNamespace() { Namespace namespace1 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("basic1") @@ -820,7 +820,7 @@ void shouldApplyAclFromBasicNamespaceToBasicNamespace() { Namespace namespace2 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("basic2") @@ -860,7 +860,7 @@ void shouldApplyAclFromBasicNamespaceToBasicNamespace() { void shouldApplyAclFromBasicNamespaceToSecuredNamespace() { Namespace namespace1 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("basic") @@ -870,7 +870,7 @@ void shouldApplyAclFromBasicNamespaceToSecuredNamespace() { Namespace namespace2 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.TRUE) + .secured(Boolean.TRUE) .build()) .metadata(Metadata.builder() .name("secured") @@ -910,7 +910,7 @@ void shouldApplyAclFromBasicNamespaceToSecuredNamespace() { void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() { Namespace namespace1 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.TRUE) + .secured(Boolean.TRUE) .build()) .metadata(Metadata.builder() .name("secured") @@ -920,7 +920,7 @@ void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() { Namespace namespace2 = Namespace.builder() .spec(Namespace.NamespaceSpec.builder() - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .build()) .metadata(Metadata.builder() .name("basic") @@ -953,6 +953,42 @@ void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() { actual.getValidationErrors().getFirst()); } + @Test + void shouldNotApplyPublicAclFromSecuredNamespace() { + Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .secured(Boolean.TRUE) + .build()) + .metadata(Metadata.builder() + .name("test") + .cluster("local") + .build()) + .build(); + + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .namespace("test") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("*") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + + ResourceValidationException actual = assertThrows(ResourceValidationException.class, + () -> accessControlListController.apply(authentication, "test", accessControlEntry, false)); + assertEquals(1, actual.getValidationErrors().size()); + assertEquals("Invalid \"apply\" operation: secured namespace cannot grant public ACL.", + actual.getValidationErrors().getFirst()); + } + @Test @SuppressWarnings("deprecation") void shouldNotDeleteAclWhenNotFound() { diff --git a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java index f60abaf7..50fae787 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java @@ -66,7 +66,7 @@ void init() { .build()) .spec(NamespaceSpec.builder() .kafkaUser("user1") - .isSecured(Boolean.FALSE) + .secured(Boolean.FALSE) .connectClusters(List.of("test-connect")) .topicValidator(TopicValidator.makeDefaultOneBroker()) .build())