Skip to content

Commit

Permalink
Fix ACL being deleted from owner Kafka user even when granted to anot…
Browse files Browse the repository at this point in the history
…her namespace
  • Loading branch information
loicgreffier committed Jul 22, 2024
1 parent 189153d commit 7e7a2a4
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 46 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ an Issue to discuss your proposal first. This is not required but can save time

In general, we follow the ["fork-and-pull" Git workflow](https://github.com/susam/gitpr)

- Fork the repository to your own Github account
- Fork the repository to your own GitHub account
- Clone the project to your machine
- Create a branch locally from master with a succinct but descriptive name
- Commit changes to the branch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
Namespace ns = getNamespace(namespace);

boolean isAdmin = authentication.getRoles().contains(ResourceBasedSecurityRule.IS_ADMIN);
boolean isSelfAssignedAcl = namespace.equals(accessControlEntry.getSpec().getGrantedTo());
boolean isSelfAssigned = namespace.equals(accessControlEntry.getSpec().getGrantedTo());

List<String> validationErrors;
if (isAdmin && isSelfAssignedAcl) {
if (isAdmin && isSelfAssigned) {
// Validate overlapping OWNER
validationErrors = accessControlEntryService.validateAsAdmin(accessControlEntry, ns);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,4 @@ public Optional<Namespace> findByName(String namespace) {
.filter(ns -> ns.getMetadata().getName().equals(namespace))
.findFirst();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public List<String> validateAsAdmin(AccessControlEntry accessControlEntry, Names
parentOverlap = ace.getSpec().getResource().startsWith(accessControlEntry.getSpec().getResource())
|| topicAclsCollideWithParentOrChild(ace, accessControlEntry);
}

// new ACL would be covered by a PREFIXED existing ACLs
boolean childOverlap = false;
if (ace.getSpec().getResourcePatternType() == AccessControlEntry.ResourcePatternType.PREFIXED) {
Expand Down Expand Up @@ -241,7 +242,7 @@ public void delete(Namespace namespace, AccessControlEntry accessControlEntry) {
AccessControlEntryAsyncExecutor accessControlEntryAsyncExecutor =
applicationContext.getBean(AccessControlEntryAsyncExecutor.class,
Qualifiers.byName(accessControlEntry.getMetadata().getCluster()));
accessControlEntryAsyncExecutor.deleteNs4KafkaAcl(namespace, accessControlEntry);
accessControlEntryAsyncExecutor.deleteAcl(namespace, accessControlEntry);

accessControlEntryRepository.delete(accessControlEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.michelin.ns4kafka.repository.NamespaceRepository;
import com.michelin.ns4kafka.repository.kafka.KafkaStoreException;
import com.michelin.ns4kafka.service.AccessControlEntryService;
import com.michelin.ns4kafka.service.ConnectorService;
import com.michelin.ns4kafka.service.StreamService;
import io.micronaut.context.annotation.EachBean;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -52,8 +51,6 @@ public class AccessControlEntryAsyncExecutor {

private StreamService streamService;

private ConnectorService connectorService;

private NamespaceRepository namespaceRepository;

/**
Expand Down Expand Up @@ -130,9 +127,10 @@ private List<AclBinding> collectNs4KafkaAcls() {
.stream()
.flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace)
.stream()
.filter(accessControlEntry -> (List.of(TOPIC, GROUP, TRANSACTIONAL_ID)
.contains(accessControlEntry.getSpec().getResourceType())))
.flatMap(accessControlEntry -> buildAclBindingsFromAccessControlEntry(accessControlEntry,
.filter(accessControlEntry -> List.of(TOPIC, GROUP, TRANSACTIONAL_ID)
.contains(accessControlEntry.getSpec().getResourceType()))
.flatMap(accessControlEntry -> buildAclBindingsFromAccessControlEntry(
accessControlEntry,
namespace.getSpec().getKafkaUser())
.stream())
.distinct());
Expand Down Expand Up @@ -226,40 +224,45 @@ private List<AclBinding> collectBrokerAcls(boolean managedUsersOnly)
}

/**
* Convert Ns4Kafka topic and group ACL into Kafka ACL.
* Convert Ns4Kafka topic, group and transactional ACL into Kafka ACL.
*
* @param accessControlEntry The Ns4Kafka ACL
* @param kafkaUser The ACL owner
* @param kafkaUser The Kafka user to link to the ACL
* @return A list of Kafka ACLs
*/
private List<AclBinding> buildAclBindingsFromAccessControlEntry(AccessControlEntry accessControlEntry,
String kafkaUser) {
// Convert pattern, convert resource type from Ns4Kafka to org.apache.kafka.common types
PatternType patternType =
PatternType.fromString(accessControlEntry.getSpec().getResourcePatternType().toString());
ResourceType resourceType = org.apache.kafka.common.resource.ResourceType.fromString(
PatternType patternType = PatternType.fromString(
accessControlEntry.getSpec().getResourcePatternType().toString());

ResourceType resourceType = ResourceType.fromString(
accessControlEntry.getSpec().getResourceType().toString());
ResourcePattern resourcePattern =
new ResourcePattern(resourceType, accessControlEntry.getSpec().getResource(), patternType);

// Generate the required AclOperation based on ResourceType
ResourcePattern resourcePattern = new ResourcePattern(resourceType,
accessControlEntry.getSpec().getResource(), patternType);

// Generate the required AclOperation and principal based on the permission
List<AclOperation> targetAclOperations;
if (accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) {
targetAclOperations = computeAclOperationForOwner(resourceType);
} else {
// Should be READ or WRITE
targetAclOperations =
List.of(AclOperation.fromString(accessControlEntry.getSpec().getPermission().toString()));
targetAclOperations = List.of(
AclOperation.fromString(accessControlEntry.getSpec().getPermission().toString())
);
}

final String aclUser =
accessControlEntry.getSpec().getGrantedTo().equals(PUBLIC_GRANTED_TO) ? PUBLIC_GRANTED_TO : kafkaUser;
final String aclUser = accessControlEntry.getSpec().getGrantedTo().equals(PUBLIC_GRANTED_TO)
? PUBLIC_GRANTED_TO : kafkaUser;

return targetAclOperations
.stream()
.map(aclOperation ->
new AclBinding(resourcePattern,
new org.apache.kafka.common.acl.AccessControlEntry(USER_PRINCIPAL + aclUser,
"*", aclOperation, AclPermissionType.ALLOW)))
.map(aclOperation -> new AclBinding(resourcePattern,
new org.apache.kafka.common.acl.AccessControlEntry(
USER_PRINCIPAL + aclUser,
"*",
aclOperation,
AclPermissionType.ALLOW)))
.toList();
}

Expand Down Expand Up @@ -363,20 +366,28 @@ private void deleteAcls(List<AclBinding> toDelete) {
* Delete a given Ns4Kafka ACL.
* Convert Ns4Kafka ACL into Kafka ACLs before deletion.
*
* @param namespace The namespace
* @param ns4kafkaAcl The Kafka ACL
* @param namespace The namespace
* @param acl The ACL
*/
public void deleteNs4KafkaAcl(Namespace namespace, AccessControlEntry ns4kafkaAcl) {
public void deleteAcl(Namespace namespace, AccessControlEntry acl) {
if (managedClusterProperties.isManageAcls()) {
List<AclBinding> results = new ArrayList<>();

if (List.of(TOPIC, GROUP, TRANSACTIONAL_ID).contains(ns4kafkaAcl.getSpec().getResourceType())) {
results.addAll(buildAclBindingsFromAccessControlEntry(ns4kafkaAcl, namespace.getSpec().getKafkaUser()));
if (List.of(TOPIC, GROUP, TRANSACTIONAL_ID).contains(acl.getSpec().getResourceType())) {
boolean isSelfAssigned = acl.getMetadata().getNamespace().equals(acl.getSpec().getGrantedTo());

String kafkaUser = isSelfAssigned ? namespace.getSpec().getKafkaUser()
: namespaceRepository.findByName(acl.getSpec().getGrantedTo())
.orElseThrow()
.getSpec()
.getKafkaUser();

results.addAll(buildAclBindingsFromAccessControlEntry(acl, kafkaUser));
}

if (ns4kafkaAcl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT
&& ns4kafkaAcl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) {
results.addAll(buildAclBindingsFromConnector(ns4kafkaAcl, namespace.getSpec().getKafkaUser()));
if (acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT
&& acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) {
results.addAll(buildAclBindingsFromConnector(acl, namespace.getSpec().getKafkaUser()));
}

deleteAcls(results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static String invalidAclNotOwnerOfTopLevel(String invalidResourceValue,
invalidResourcePatternTypeValue) {
return String.format(INVALID_FIELDS, invalidResourceValue, invalidResourcePatternTypeValue, "resource",
"resourcePatternType",
"cannot grant ACL to yourself");
"cannot grant ACL because namespace is not owner of the top level resource");
}

/**
Expand Down Expand Up @@ -736,6 +736,6 @@ public static String invalidTopicTags(String invalidTagValue) {
*/
public static String invalidTopicTagsFormat(String invalidTagFormatValue) {
return String.format(INVALID_FIELD, invalidTagFormatValue, "tags",
"tags should start with letter and be followed by alphanumeric or _ characters");
"tags should start with letter and be followed by alphanumeric or _ characters");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void validateNotAllowedResources() {
"Invalid value \"OWNER\" for field \"permission\": value must be one of \"READ, WRITE\".",
"Invalid value \"target-ns\" for field \"grantedTo\": resource not found.",
"Invalid value \"test/PREFIXED\" for fields \"resource/resourcePatternType\": "
+ "cannot grant ACL to yourself."),
+ "cannot grant ACL because namespace is not owner of the top level resource."),
actual);

}
Expand Down Expand Up @@ -112,7 +112,7 @@ void validateNotAllowedSelfGrant() {
assertLinesMatch(List.of(
"Invalid value \"namespace\" for field \"grantedTo\": cannot grant ACL to yourself.",
"Invalid value \"test/PREFIXED\" for fields \"resource/resourcePatternType\": "
+ "cannot grant ACL to yourself."),
+ "cannot grant ACL because namespace is not owner of the top level resource."),
actual);
}

Expand All @@ -124,6 +124,7 @@ void validateNotAllowedOwnerOfBadPrefix() {
.cluster("local")
.build())
.build();

AccessControlEntry accessControlEntry = AccessControlEntry.builder()
.metadata(Metadata.builder()
.name("acl-name")
Expand All @@ -137,6 +138,7 @@ void validateNotAllowedOwnerOfBadPrefix() {
.grantedTo("target-ns")
.build())
.build();

when(applicationContext.getBean(NamespaceService.class))
.thenReturn(namespaceService);
when(namespaceService.findByName("target-ns"))
Expand All @@ -155,11 +157,11 @@ void validateNotAllowedOwnerOfBadPrefix() {
));
List<String> actual = accessControlEntryService.validate(accessControlEntry, ns);
assertLinesMatch(List.of("Invalid value \"main/PREFIXED\" for fields \"resource/resourcePatternType\": "
+ "cannot grant ACL to yourself."), actual);
+ "cannot grant ACL because namespace is not owner of the top level resource."), actual);
}

@Test
void validate_NotAllowedOwnerOfBadLiteral() {
void validateNotAllowedOwnerOfBadLiteral() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace")
Expand Down Expand Up @@ -193,13 +195,13 @@ void validate_NotAllowedOwnerOfBadLiteral() {
.permission(AccessControlEntry.Permission.OWNER)
.resource("resource1")
.grantedTo("namespace")
.build()
)
.build())
.build()
));

List<String> actual = accessControlEntryService.validate(accessControlEntry, ns);
assertLinesMatch(List.of("Invalid value \"resource2/LITERAL\" for fields \"resource/resourcePatternType\": "
+ "cannot grant ACL to yourself."), actual);
+ "cannot grant ACL because namespace is not owner of the top level resource."), actual);
}

@Test
Expand Down Expand Up @@ -286,7 +288,7 @@ void validate_AllowedOwnerOfPrefix() {
}

@Test
void validate_AllowedPublicGrantedTo() {
void validateAllowedPublicGrantedTo() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("namespace")
Expand Down

0 comments on commit 7e7a2a4

Please sign in to comment.