Skip to content

Commit

Permalink
Finalize fix
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier committed Jul 23, 2024
1 parent a681230 commit 2c9779b
Show file tree
Hide file tree
Showing 33 changed files with 1,059 additions and 715 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.property.AkhqProperties;
import com.michelin.ns4kafka.property.ManagedClusterProperties;
import com.michelin.ns4kafka.service.AccessControlEntryService;
import com.michelin.ns4kafka.service.AclService;
import com.michelin.ns4kafka.service.NamespaceService;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.http.annotation.Body;
Expand Down Expand Up @@ -46,7 +46,7 @@ public class AkhqClaimProviderController {
AkhqProperties config;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
NamespaceService namespaceService;
Expand Down Expand Up @@ -76,11 +76,11 @@ public AkhqClaimResponse generateClaim(@Valid @Body AkhqClaimRequest request) {
.stream()
.filter(namespace -> namespace.getMetadata().getLabels() != null
&& groups.contains(namespace.getMetadata().getLabels().getOrDefault(config.getGroupLabel(), "_")))
.flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace).stream())
.flatMap(namespace -> aclService.findAllGrantedToNamespace(namespace).stream())
.collect(Collectors.toList());

// Add all public ACLs.
relatedAcl.addAll(accessControlEntryService.findAllPublicGrantedTo());
relatedAcl.addAll(aclService.findAllPublicGrantedTo());

return AkhqClaimResponse.builder()
.roles(config.getFormerRoles())
Expand Down Expand Up @@ -117,7 +117,7 @@ public AkhqClaimResponseV2 generateClaimV2(@Valid @Body AkhqClaimRequest request
List<AccessControlEntry> relatedAcl = getAclsByGroups(groups);

// Add all public ACLs.
relatedAcl.addAll(accessControlEntryService.findAllPublicGrantedTo());
relatedAcl.addAll(aclService.findAllPublicGrantedTo());

return AkhqClaimResponseV2.builder()
.roles(config.getFormerRoles())
Expand Down Expand Up @@ -146,7 +146,7 @@ public AkhqClaimResponseV3 generateClaimV3(@Valid @Body AkhqClaimRequest request
List<AccessControlEntry> acls = getAclsByGroups(groups);

// Add all public ACLs
acls.addAll(accessControlEntryService.findAllPublicGrantedTo());
acls.addAll(aclService.findAllPublicGrantedTo());

// Remove unnecessary ACLs
// E.g., project.topic1 when project.* is granted on the same resource type and cluster
Expand Down Expand Up @@ -297,7 +297,7 @@ private List<AccessControlEntry> getAclsByGroups(List<String> groups) {
&& !Collections.disjoint(groups, List.of(namespace.getMetadata().getLabels()
.getOrDefault(config.getGroupLabel(), "_")
.split(","))))
.flatMap(namespace -> accessControlEntryService.findAllGrantedToNamespace(namespace).stream())
.flatMap(namespace -> aclService.findAllGrantedToNamespace(namespace).stream())
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.michelin.ns4kafka.controller.acl;

import static com.michelin.ns4kafka.service.AccessControlEntryService.PUBLIC_GRANTED_TO;
import static com.michelin.ns4kafka.service.AclService.PUBLIC_GRANTED_TO;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidAclDeleteOnlyAdmin;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidImmutableField;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound;
Expand All @@ -10,7 +10,7 @@
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.model.Namespace;
import com.michelin.ns4kafka.security.ResourceBasedSecurityRule;
import com.michelin.ns4kafka.service.AccessControlEntryService;
import com.michelin.ns4kafka.service.AclService;
import com.michelin.ns4kafka.util.enumation.ApplyStatus;
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.http.HttpResponse;
Expand Down Expand Up @@ -39,7 +39,7 @@
@Controller("/api/namespaces/{namespace}/acls")
public class AclController extends NamespacedResourceController {
@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

/**
* List ACLs by namespace.
Expand All @@ -56,19 +56,19 @@ public List<AccessControlEntry> list(String namespace, Optional<AclLimit> limit)

Namespace ns = getNamespace(namespace);
return switch (limit.get()) {
case GRANTEE -> accessControlEntryService.findAllGrantedToNamespace(ns)
case GRANTEE -> aclService.findAllGrantedToNamespace(ns)
.stream()
.sorted(Comparator.comparing(o -> o.getMetadata().getNamespace()))
.toList();
case GRANTOR -> accessControlEntryService.findAllForCluster(ns.getMetadata().getCluster())
case GRANTOR -> aclService.findAllForCluster(ns.getMetadata().getCluster())
.stream()
// granted by me
.filter(accessControlEntry -> accessControlEntry.getMetadata().getNamespace().equals(namespace))
// without the granted to me
.filter(accessControlEntry -> !accessControlEntry.getSpec().getGrantedTo().equals(namespace))
.sorted(Comparator.comparing(o -> o.getSpec().getGrantedTo()))
.toList();
default -> accessControlEntryService.findAllForCluster(ns.getMetadata().getCluster())
default -> aclService.findAllForCluster(ns.getMetadata().getCluster())
.stream()
.filter(accessControlEntry ->
accessControlEntry.getMetadata().getNamespace().equals(namespace)
Expand Down Expand Up @@ -115,9 +115,9 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
List<String> validationErrors;
if (isAdmin && isSelfAssigned) {
// Validate overlapping OWNER
validationErrors = accessControlEntryService.validateAsAdmin(accessControlEntry, ns);
validationErrors = aclService.validateAsAdmin(accessControlEntry, ns);
} else {
validationErrors = accessControlEntryService.validate(accessControlEntry, ns);
validationErrors = aclService.validate(accessControlEntry, ns);
}

if (!validationErrors.isEmpty()) {
Expand All @@ -127,7 +127,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
// AccessControlEntry spec is immutable
// This prevents accidental updates on ACL resources already declared with the same name (with different rules)
Optional<AccessControlEntry> existingAcl =
accessControlEntryService.findByName(namespace, accessControlEntry.getMetadata().getName());
aclService.findByName(namespace, accessControlEntry.getMetadata().getName());
if (existingAcl.isPresent() && !existingAcl.get().getSpec().equals(accessControlEntry.getSpec())) {
throw new ResourceValidationException(accessControlEntry,
invalidImmutableField("spec"));
Expand All @@ -150,7 +150,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
sendEventLog(accessControlEntry, status, existingAcl.<Object>map(AccessControlEntry::getSpec).orElse(null),
accessControlEntry.getSpec());

return formatHttpResponse(accessControlEntryService.create(accessControlEntry), status);
return formatHttpResponse(aclService.create(accessControlEntry), status);
}

/**
Expand All @@ -166,7 +166,7 @@ public HttpResponse<AccessControlEntry> apply(Authentication authentication, Str
@Status(HttpStatus.NO_CONTENT)
public HttpResponse<Void> delete(Authentication authentication, String namespace, String name,
@QueryValue(defaultValue = "false") boolean dryrun) {
AccessControlEntry accessControlEntry = accessControlEntryService
AccessControlEntry accessControlEntry = aclService
.findByName(namespace, name)
.orElseThrow(() -> new ResourceValidationException(ACCESS_CONTROL_ENTRY, name, invalidNotFound(name)));

Expand All @@ -183,7 +183,7 @@ public HttpResponse<Void> delete(Authentication authentication, String namespace

sendEventLog(accessControlEntry, ApplyStatus.deleted, accessControlEntry.getSpec(), null);

accessControlEntryService.delete(getNamespace(namespace), accessControlEntry);
aclService.delete(accessControlEntry);
return HttpResponse.noContent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.michelin.ns4kafka.controller.generic.NonNamespacedResourceController;
import com.michelin.ns4kafka.model.AccessControlEntry;
import com.michelin.ns4kafka.security.ResourceBasedSecurityRule;
import com.michelin.ns4kafka.service.AccessControlEntryService;
import com.michelin.ns4kafka.service.AclService;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.swagger.v3.oas.annotations.tags.Tag;
Expand All @@ -19,7 +19,7 @@
@RolesAllowed(ResourceBasedSecurityRule.IS_ADMIN)
public class AclNonNamespacedController extends NonNamespacedResourceController {
@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

/**
* List ACLs.
Expand All @@ -28,6 +28,6 @@ public class AclNonNamespacedController extends NonNamespacedResourceController
*/
@Get
public List<AccessControlEntry> listAll() {
return accessControlEntryService.findAll();
return aclService.findAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* Access control entry service.
*/
@Singleton
public class AccessControlEntryService {
public class AclService {
public static final String PUBLIC_GRANTED_TO = "*";

@Inject
Expand Down Expand Up @@ -235,14 +235,13 @@ public AccessControlEntry create(AccessControlEntry accessControlEntry) {
/**
* Delete an ACL from broker and from internal topic.
*
* @param namespace The namespace
* @param accessControlEntry The ACL
*/
public void delete(Namespace namespace, AccessControlEntry accessControlEntry) {
public void delete(AccessControlEntry accessControlEntry) {
AccessControlEntryAsyncExecutor accessControlEntryAsyncExecutor =
applicationContext.getBean(AccessControlEntryAsyncExecutor.class,
Qualifiers.byName(accessControlEntry.getMetadata().getCluster()));
accessControlEntryAsyncExecutor.deleteAcl(namespace, accessControlEntry);
accessControlEntryAsyncExecutor.deleteAcl(accessControlEntry);

accessControlEntryRepository.delete(accessControlEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ConnectClusterService {
KafkaConnectClient kafkaConnectClient;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
ConnectClusterRepository connectClusterRepository;
Expand Down Expand Up @@ -124,7 +124,7 @@ public Flux<ConnectCluster> findAll(boolean all) {
*/
public List<ConnectCluster> findAllByNamespace(Namespace namespace,
List<AccessControlEntry.Permission> permissions) {
List<AccessControlEntry> acls = accessControlEntryService.findAllGrantedToNamespace(namespace).stream()
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace).stream()
.filter(acl -> permissions.contains(acl.getSpec().getPermission()))
.filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.CONNECT_CLUSTER).toList();

Expand Down Expand Up @@ -346,7 +346,7 @@ public void delete(ConnectCluster connectCluster) {
* @return true if it is, false otherwise
*/
public boolean isNamespaceOwnerOfConnectCluster(Namespace namespace, String connectCluster) {
return accessControlEntryService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
return aclService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
AccessControlEntry.ResourceType.CONNECT_CLUSTER, connectCluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Singleton
public class ConnectorService {
@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
KafkaConnectClient kafkaConnectClient;
Expand All @@ -57,7 +57,7 @@ public class ConnectorService {
* @return A list of connectors
*/
public List<Connector> findAllForNamespace(Namespace namespace) {
List<AccessControlEntry> acls = accessControlEntryService.findAllGrantedToNamespace(namespace);
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace);
return connectorRepository.findAllForCluster(namespace.getMetadata().getCluster())
.stream()
.filter(connector -> acls.stream().anyMatch(accessControlEntry -> {
Expand Down Expand Up @@ -161,7 +161,7 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
* @return true if it is, false otherwise
*/
public boolean isNamespaceOwnerOfConnect(Namespace namespace, String connect) {
return accessControlEntryService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
return aclService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
AccessControlEntry.ResourceType.CONNECT, connect);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ConsumerGroupService {
ApplicationContext applicationContext;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

/**
* Check if a given namespace is owner of a given group.
Expand All @@ -47,7 +47,7 @@ public class ConsumerGroupService {
* @return true if it is, false otherwise
*/
public boolean isNamespaceOwnerOfConsumerGroup(String namespace, String groupId) {
return accessControlEntryService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.GROUP,
return aclService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.GROUP,
groupId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class NamespaceService {
RoleBindingService roleBindingService;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
ConnectorService connectorService;
Expand Down Expand Up @@ -166,7 +166,7 @@ public List<String> listAllNamespaceResources(Namespace namespace) {
.map(connector -> CONNECTOR + "/" + connector.getMetadata().getName()),
connectClusterService.findAllByNamespaceOwner(namespace).stream()
.map(connectCluster -> CONNECT_CLUSTER + "/" + connectCluster.getMetadata().getName()),
accessControlEntryService.findAllForNamespace(namespace).stream()
aclService.findAllForNamespace(namespace).stream()
.map(ace -> ACCESS_CONTROL_ENTRY + "/" + ace.getMetadata().getName()),
resourceQuotaService.findByNamespace(namespace.getMetadata().getName()).stream()
.map(resourceQuota -> RESOURCE_QUOTA + "/" + resourceQuota.getMetadata().getName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
@Singleton
public class SchemaService {
@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
SchemaRegistryClient schemaRegistryClient;
Expand All @@ -48,7 +48,7 @@ public class SchemaService {
* @return A list of schemas
*/
public Flux<SchemaList> findAllForNamespace(Namespace namespace) {
List<AccessControlEntry> acls = accessControlEntryService.findAllGrantedToNamespace(namespace).stream()
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace).stream()
.filter(acl -> acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
.filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC).toList();

Expand Down Expand Up @@ -291,7 +291,7 @@ public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(Namespace na
*/
public boolean isNamespaceOwnerOfSubject(Namespace namespace, String subjectName) {
String underlyingTopicName = subjectName.replaceAll("(-key|-value)$", "");
return accessControlEntryService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
return aclService.isNamespaceOwnerOfResource(namespace.getMetadata().getName(),
AccessControlEntry.ResourceType.TOPIC,
underlyingTopicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class StreamService {
StreamRepository streamRepository;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
ApplicationContext applicationContext;
Expand Down Expand Up @@ -62,7 +62,7 @@ public Optional<KafkaStream> findByName(Namespace namespace, String stream) {
* @return true if it is, false otherwise
*/
public boolean isNamespaceOwnerOfKafkaStream(Namespace namespace, String resource) {
return new HashSet<>(accessControlEntryService.findAllGrantedToNamespace(namespace)
return new HashSet<>(aclService.findAllGrantedToNamespace(namespace)
.stream()
.filter(accessControlEntry -> accessControlEntry.getSpec().getPermission()
== AccessControlEntry.Permission.OWNER)
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/michelin/ns4kafka/service/TopicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TopicService {
TopicRepository topicRepository;

@Inject
AccessControlEntryService accessControlEntryService;
AclService aclService;

@Inject
ApplicationContext applicationContext;
Expand All @@ -63,7 +63,7 @@ public List<Topic> findAll() {
* @return A list of topics
*/
public List<Topic> findAllForNamespace(Namespace namespace) {
List<AccessControlEntry> acls = accessControlEntryService.findAllGrantedToNamespace(namespace);
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace);
return topicRepository.findAllForCluster(namespace.getMetadata().getCluster())
.stream()
.filter(topic -> acls.stream().anyMatch(accessControlEntry -> {
Expand Down Expand Up @@ -106,7 +106,7 @@ public Optional<Topic> findByName(Namespace namespace, String topic) {
* @return true if it is, false otherwise
*/
public boolean isNamespaceOwnerOfTopic(String namespace, String topic) {
return accessControlEntryService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.TOPIC,
return aclService.isNamespaceOwnerOfResource(namespace, AccessControlEntry.ResourceType.TOPIC,
topic);
}

Expand Down Expand Up @@ -322,13 +322,13 @@ public Map<TopicPartition, Long> deleteRecords(Topic topic, Map<TopicPartition,
/**
* Check if all topic tags respect confluent format (starts with letter followed by alphanumerical or underscore).
*
* @param topic The topic which contains tags
* @param topic The topic which contains tags
* @return true if yes, false otherwise
*/
public boolean isTagsFormatValid(Topic topic) {
return topic.getSpec().getTags()
.stream()
.allMatch(tag -> tag.matches("^[a-zA-Z]\\w*$"));
.stream()
.allMatch(tag -> tag.matches("^[a-zA-Z]\\w*$"));
}

/**
Expand Down
Loading

0 comments on commit 2c9779b

Please sign in to comment.