diff --git a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java index 49bc9e4c..d0c8c012 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/acl/AclController.java @@ -1,8 +1,10 @@ package com.michelin.ns4kafka.controller.acl; +import static com.michelin.ns4kafka.service.AclService.PUBLIC_GRANTED_TO; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidAclDeleteOnlyAdmin; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound; +import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSecuredNamespaceGrantAcl; import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSelfAssignedAclDelete; import static com.michelin.ns4kafka.util.enumation.Kind.ACCESS_CONTROL_ENTRY; import static io.micronaut.core.util.StringUtils.EMPTY_STRING; @@ -105,10 +107,19 @@ public HttpResponse apply(Authentication authentication, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); + boolean grantorIsSecured = getNamespace(accessControlEntry.getMetadata().getNamespace()).getSpec().isSecured(); + + boolean granteeIsSecured = !PUBLIC_GRANTED_TO.equals(accessControlEntry.getSpec().getGrantedTo()) + && getNamespace(accessControlEntry.getSpec().getGrantedTo()).getSpec().isSecured(); boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN); boolean isSelfAssigned = namespace.equals(accessControlEntry.getSpec().getGrantedTo()); List validationErrors; + + if (grantorIsSecured && !granteeIsSecured) { + throw new ResourceValidationException(accessControlEntry, invalidSecuredNamespaceGrantAcl()); + } + if (isAdmin && isSelfAssigned) { // Validate overlapping OWNER validationErrors = aclService.validateAsAdmin(accessControlEntry, ns); diff --git a/src/main/java/com/michelin/ns4kafka/model/Namespace.java b/src/main/java/com/michelin/ns4kafka/model/Namespace.java index c17d6b8e..36d2a846 100644 --- a/src/main/java/com/michelin/ns4kafka/model/Namespace.java +++ b/src/main/java/com/michelin/ns4kafka/model/Namespace.java @@ -50,6 +50,8 @@ public static class NamespaceSpec { @NotBlank private String kafkaUser; @Builder.Default + private boolean isSecured = Boolean.FALSE; + @Builder.Default private List connectClusters = List.of(); private TopicValidator topicValidator; private ConnectValidator connectValidator; diff --git a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java index 9d22c087..a9f011ad 100644 --- a/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java +++ b/src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java @@ -638,6 +638,16 @@ public static String invalidQuotaOperationCannotAdd(ResourceQuota.ResourceQuotaS toAdd)); } + /** + * Invalid secured namespace grant ACL to basic namespaces. + * + * @return the error message + */ + public static String invalidSecuredNamespaceGrantAcl() { + return String.format(INVALID_OPERATION, OPERATION_APPLY, + "secured namespace cannot grant ACL to basic namespaces"); + } + /** * Invalid schema suffix. * diff --git a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java index ea51bb5c..dc34f2f5 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/AclControllerTest.java @@ -312,6 +312,9 @@ void shouldGetAcl() { @Test void shouldApplyFailsAsAdmin() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -347,6 +350,9 @@ void shouldApplyFailsAsAdmin() { @Test void shouldApplyWithSuccessAsAdmin() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -356,6 +362,7 @@ void shouldApplyWithSuccessAsAdmin() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("acl-test") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -381,8 +388,11 @@ void shouldApplyWithSuccessAsAdmin() { } @Test - void shouldApplyFailWithValidationErrors() { + void shouldApplyFailWithAclValidationErrors() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -416,6 +426,9 @@ void shouldApplyFailWithValidationErrors() { @Test void shouldApplyAclWithSuccess() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -424,6 +437,7 @@ void shouldApplyAclWithSuccess() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -453,6 +467,9 @@ void shouldApplyAclWithSuccess() { @Test void shouldEndApplyWithSuccessWhenAclAlreadyExists() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -462,6 +479,7 @@ void shouldEndApplyWithSuccessWhenAclAlreadyExists() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -489,6 +507,9 @@ void shouldEndApplyWithSuccessWhenAclAlreadyExists() { @Test void shouldApplyFailWhenSpecChanges() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -498,6 +519,7 @@ void shouldApplyFailWhenSpecChanges() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -511,6 +533,7 @@ void shouldApplyFailWhenSpecChanges() { AccessControlEntry oldAccessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.CONNECT) // This line was changed @@ -537,6 +560,9 @@ void shouldApplyFailWhenSpecChanges() { @Test void shouldApplyAclWithSuccessWhenMetadataChanges() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -546,6 +572,7 @@ void shouldApplyAclWithSuccessWhenMetadataChanges() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .labels(Map.of("new-label", "label-value")) // This label is new .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() @@ -561,6 +588,7 @@ void shouldApplyAclWithSuccessWhenMetadataChanges() { AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -589,6 +617,9 @@ void shouldApplyAclWithSuccessWhenMetadataChanges() { @Test void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { Namespace ns = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -598,6 +629,7 @@ void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .labels(Map.of("new-label", "label-value")) // This label is new .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() @@ -613,6 +645,7 @@ void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { AccessControlEntry.builder() .metadata(Metadata.builder() .name("ace1") + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -638,14 +671,27 @@ void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() { } @Test - void shouldApplyAclInDryRunModeAsAdmin() { + void shouldNotApplyAclInDryRunModeAsAdmin() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") .build()) .build(); + Namespace adminNamespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) + .metadata(Metadata.builder() + .name("admin") + .cluster("local") + .build()) + .build(); + AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() @@ -665,6 +711,7 @@ void shouldApplyAclInDryRunModeAsAdmin() { Map.of(ROLES, List.of("isAdmin()"))); when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace)); + when(namespaceService.findByName("admin")).thenReturn(Optional.of(adminNamespace)); when(aclService.validateAsAdmin(accessControlEntry, namespace)).thenReturn(List.of()); var response = accessControlListController.apply(authentication, "test", accessControlEntry, true); @@ -674,8 +721,11 @@ void shouldApplyAclInDryRunModeAsAdmin() { } @Test - void shouldApplyAclInDryRunMode() { + void shouldNotApplyAclInDryRunMode() { Namespace namespace = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) .metadata(Metadata.builder() .name("test") .cluster("local") @@ -684,6 +734,7 @@ void shouldApplyAclInDryRunMode() { AccessControlEntry accessControlEntry = AccessControlEntry.builder() .metadata(Metadata.builder() + .namespace("test") .build()) .spec(AccessControlEntry.AccessControlEntrySpec.builder() .resourceType(AccessControlEntry.ResourceType.TOPIC) @@ -705,6 +756,203 @@ void shouldApplyAclInDryRunMode() { verify(aclService, never()).create(accessControlEntry); } + @Test + void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() { + Namespace namespace1 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.TRUE) + .build()) + .metadata(Metadata.builder() + .name("secured1") + .cluster("local") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.TRUE) + .build()) + .metadata(Metadata.builder() + .name("secured2") + .cluster("local") + .build()) + .build(); + + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .namespace("secured1") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("secured2") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("secured1")).thenReturn(Optional.of(namespace1)); + when(namespaceService.findByName("secured2")).thenReturn(Optional.of(namespace2)); + when(aclService.validate(accessControlEntry, namespace1)).thenReturn(List.of()); + when(aclService.create(accessControlEntry)).thenReturn(accessControlEntry); + + var response = accessControlListController.apply(authentication, "secured1", accessControlEntry, false); + AccessControlEntry actual = response.body(); + + assertEquals("created", response.header("X-Ns4kafka-Result")); + assertEquals("secured1", actual.getMetadata().getNamespace()); + assertEquals("local", actual.getMetadata().getCluster()); + } + + @Test + void shouldApplyAclFromBasicNamespaceToBasicNamespace() { + Namespace namespace1 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) + .metadata(Metadata.builder() + .name("basic1") + .cluster("local") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) + .metadata(Metadata.builder() + .name("basic2") + .cluster("local") + .build()) + .build(); + + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .namespace("basic1") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.WRITE) + .resource("prefix") + .grantedTo("basic2") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("basic1")).thenReturn(Optional.of(namespace1)); + when(namespaceService.findByName("basic2")).thenReturn(Optional.of(namespace2)); + when(aclService.validate(accessControlEntry, namespace1)).thenReturn(List.of()); + when(aclService.create(accessControlEntry)).thenReturn(accessControlEntry); + + var response = accessControlListController.apply(authentication, "basic1", accessControlEntry, false); + AccessControlEntry actual = response.body(); + + assertEquals("created", response.header("X-Ns4kafka-Result")); + assertEquals("basic1", actual.getMetadata().getNamespace()); + assertEquals("local", actual.getMetadata().getCluster()); + } + + @Test + void shouldApplyAclFromBasicNamespaceToSecuredNamespace() { + Namespace namespace1 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) + .metadata(Metadata.builder() + .name("basic") + .cluster("local") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.TRUE) + .build()) + .metadata(Metadata.builder() + .name("secured") + .cluster("local") + .build()) + .build(); + + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .namespace("basic") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.WRITE) + .resource("prefix") + .grantedTo("secured") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("basic")).thenReturn(Optional.of(namespace1)); + when(namespaceService.findByName("secured")).thenReturn(Optional.of(namespace2)); + when(aclService.validate(accessControlEntry, namespace1)).thenReturn(List.of()); + when(aclService.create(accessControlEntry)).thenReturn(accessControlEntry); + + var response = accessControlListController.apply(authentication, "basic", accessControlEntry, false); + AccessControlEntry actual = response.body(); + + assertEquals("created", response.header("X-Ns4kafka-Result")); + assertEquals("basic", actual.getMetadata().getNamespace()); + assertEquals("local", actual.getMetadata().getCluster()); + } + + @Test + void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() { + Namespace namespace1 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.TRUE) + .build()) + .metadata(Metadata.builder() + .name("secured") + .cluster("local") + .build()) + .build(); + + Namespace namespace2 = Namespace.builder() + .spec(Namespace.NamespaceSpec.builder() + .isSecured(Boolean.FALSE) + .build()) + .metadata(Metadata.builder() + .name("basic") + .cluster("local") + .build()) + .build(); + + AccessControlEntry accessControlEntry = AccessControlEntry.builder() + .metadata(Metadata.builder() + .namespace("secured") + .build()) + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.READ) + .resource("prefix") + .grantedTo("basic") + .build()) + .build(); + + Authentication authentication = Authentication.build("user", Map.of("roles", List.of())); + + when(namespaceService.findByName("secured")).thenReturn(Optional.of(namespace1)); + when(namespaceService.findByName("basic")).thenReturn(Optional.of(namespace2)); + + ResourceValidationException actual = assertThrows(ResourceValidationException.class, + () -> accessControlListController.apply(authentication, "secured", accessControlEntry, false)); + assertEquals(1, actual.getValidationErrors().size()); + assertEquals("Invalid \"apply\" operation: secured namespace cannot grant ACL to basic namespaces.", + actual.getValidationErrors().getFirst()); + } + @Test @SuppressWarnings("deprecation") void shouldNotDeleteAclWhenNotFound() { diff --git a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java index 727b1396..f60abaf7 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/AclIntegrationTest.java @@ -66,6 +66,7 @@ void init() { .build()) .spec(NamespaceSpec.builder() .kafkaUser("user1") + .isSecured(Boolean.FALSE) .connectClusters(List.of("test-connect")) .topicValidator(TopicValidator.makeDefaultOneBroker()) .build()) diff --git a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java index 5d827537..c0426673 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/StreamIntegrationTest.java @@ -64,6 +64,7 @@ void init() { AccessControlEntry acl1 = AccessControlEntry.builder() .metadata(Metadata.builder() .name("nskafkastream-acl-topic") + .namespace("nskafkastream") .build()) .spec(AccessControlEntrySpec.builder() .resourceType(ResourceType.TOPIC) @@ -98,6 +99,7 @@ void init() { AccessControlEntry acl2 = AccessControlEntry.builder() .metadata(Metadata.builder() .name("nskafkastream-acl-group") + .namespace("nskafkastream") .build()) .spec(AccessControlEntrySpec.builder() .resourceType(ResourceType.GROUP)