Skip to content

Commit

Permalink
Split secured ns granting public ACL from granting ACL to basic ns
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Dec 18, 2024
1 parent e1f7bb8 commit 782dc18
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSecuredNamespaceGrantAcl;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSecuredNamespaceGrantPublicAcl;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidSelfAssignedAclDelete;
import static com.michelin.ns4kafka.util.enumation.Kind.ACCESS_CONTROL_ENTRY;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;
Expand Down Expand Up @@ -108,14 +109,20 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication,
Namespace ns = getNamespace(namespace);

boolean grantorIsSecured = getNamespace(accessControlEntry.getMetadata().getNamespace()).getSpec().isSecured();

boolean granteeIsSecured = !PUBLIC_GRANTED_TO.equals(accessControlEntry.getSpec().getGrantedTo())
boolean granteeIsPublic = PUBLIC_GRANTED_TO.equals(accessControlEntry.getSpec().getGrantedTo());
boolean granteeIsSecured = !granteeIsPublic
&& getNamespace(accessControlEntry.getSpec().getGrantedTo()).getSpec().isSecured();
boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN);
boolean isSelfAssigned = namespace.equals(accessControlEntry.getSpec().getGrantedTo());

List<String> validationErrors;

// secured namespaces are not allowed to grant public ACL
if (grantorIsSecured && granteeIsPublic) {
throw new ResourceValidationException(accessControlEntry, invalidSecuredNamespaceGrantPublicAcl());
}

// secured namespaces are not allowed to grant ACL to basic namespaces
if (grantorIsSecured && !granteeIsSecured) {
throw new ResourceValidationException(accessControlEntry, invalidSecuredNamespaceGrantAcl());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/michelin/ns4kafka/model/Namespace.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static class NamespaceSpec {
@NotBlank
private String kafkaUser;
@Builder.Default
private boolean isSecured = Boolean.FALSE;
private boolean secured = Boolean.FALSE;
@Builder.Default
private List<String> connectClusters = List.of();
private TopicValidator topicValidator;
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,16 @@ public static String invalidSecuredNamespaceGrantAcl() {
"secured namespace cannot grant ACL to basic namespaces");
}

/**
* Invalid secured namespace grant ACL to basic namespaces.
*
* @return the error message
*/
public static String invalidSecuredNamespaceGrantPublicAcl() {
return String.format(INVALID_OPERATION, OPERATION_APPLY,
"secured namespace cannot grant public ACL");
}

/**
* Invalid schema suffix.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ void shouldGetAcl() {
void shouldApplyFailsAsAdmin() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -351,7 +351,7 @@ void shouldApplyFailsAsAdmin() {
void shouldApplyWithSuccessAsAdmin() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -391,7 +391,7 @@ void shouldApplyWithSuccessAsAdmin() {
void shouldApplyFailWithAclValidationErrors() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -427,7 +427,7 @@ void shouldApplyFailWithAclValidationErrors() {
void shouldApplyAclWithSuccess() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -468,7 +468,7 @@ void shouldApplyAclWithSuccess() {
void shouldEndApplyWithSuccessWhenAclAlreadyExists() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -508,7 +508,7 @@ void shouldEndApplyWithSuccessWhenAclAlreadyExists() {
void shouldApplyFailWhenSpecChanges() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -561,7 +561,7 @@ void shouldApplyFailWhenSpecChanges() {
void shouldApplyAclWithSuccessWhenMetadataChanges() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -618,7 +618,7 @@ void shouldApplyAclWithSuccessWhenMetadataChanges() {
void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() {
Namespace ns = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -674,7 +674,7 @@ void shouldApplyAclWithSuccessWhenMetadataChangesInDryRunMode() {
void shouldNotApplyAclInDryRunModeAsAdmin() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand All @@ -684,7 +684,7 @@ void shouldNotApplyAclInDryRunModeAsAdmin() {

Namespace adminNamespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("admin")
Expand Down Expand Up @@ -724,7 +724,7 @@ void shouldNotApplyAclInDryRunModeAsAdmin() {
void shouldNotApplyAclInDryRunMode() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -760,7 +760,7 @@ void shouldNotApplyAclInDryRunMode() {
void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() {
Namespace namespace1 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.TRUE)
.secured(Boolean.TRUE)
.build())
.metadata(Metadata.builder()
.name("secured1")
Expand All @@ -770,7 +770,7 @@ void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() {

Namespace namespace2 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.TRUE)
.secured(Boolean.TRUE)
.build())
.metadata(Metadata.builder()
.name("secured2")
Expand Down Expand Up @@ -810,7 +810,7 @@ void shouldApplyAclFromSecuredNamespaceToSecuredNamespace() {
void shouldApplyAclFromBasicNamespaceToBasicNamespace() {
Namespace namespace1 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("basic1")
Expand All @@ -820,7 +820,7 @@ void shouldApplyAclFromBasicNamespaceToBasicNamespace() {

Namespace namespace2 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("basic2")
Expand Down Expand Up @@ -860,7 +860,7 @@ void shouldApplyAclFromBasicNamespaceToBasicNamespace() {
void shouldApplyAclFromBasicNamespaceToSecuredNamespace() {
Namespace namespace1 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("basic")
Expand All @@ -870,7 +870,7 @@ void shouldApplyAclFromBasicNamespaceToSecuredNamespace() {

Namespace namespace2 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.TRUE)
.secured(Boolean.TRUE)
.build())
.metadata(Metadata.builder()
.name("secured")
Expand Down Expand Up @@ -910,7 +910,7 @@ void shouldApplyAclFromBasicNamespaceToSecuredNamespace() {
void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() {
Namespace namespace1 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.TRUE)
.secured(Boolean.TRUE)
.build())
.metadata(Metadata.builder()
.name("secured")
Expand All @@ -920,7 +920,7 @@ void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() {

Namespace namespace2 = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.build())
.metadata(Metadata.builder()
.name("basic")
Expand Down Expand Up @@ -953,6 +953,42 @@ void shouldNotApplyAclFromSecuredNamespaceToBasicNamespace() {
actual.getValidationErrors().getFirst());
}

@Test
void shouldNotApplyPublicAclFromSecuredNamespace() {
Namespace namespace = Namespace.builder()
.spec(Namespace.NamespaceSpec.builder()
.secured(Boolean.TRUE)
.build())
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.build();

AccessControlEntry accessControlEntry = AccessControlEntry.builder()
.metadata(Metadata.builder()
.namespace("test")
.build())
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.READ)
.resource("prefix")
.grantedTo("*")
.build())
.build();

Authentication authentication = Authentication.build("user", Map.of("roles", List.of()));

when(namespaceService.findByName("test")).thenReturn(Optional.of(namespace));

ResourceValidationException actual = assertThrows(ResourceValidationException.class,
() -> accessControlListController.apply(authentication, "test", accessControlEntry, false));
assertEquals(1, actual.getValidationErrors().size());
assertEquals("Invalid \"apply\" operation: secured namespace cannot grant public ACL.",
actual.getValidationErrors().getFirst());
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAclWhenNotFound() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void init() {
.build())
.spec(NamespaceSpec.builder()
.kafkaUser("user1")
.isSecured(Boolean.FALSE)
.secured(Boolean.FALSE)
.connectClusters(List.of("test-connect"))
.topicValidator(TopicValidator.makeDefaultOneBroker())
.build())
Expand Down

0 comments on commit 782dc18

Please sign in to comment.