diff --git a/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java b/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java index a032703f..08cf6670 100644 --- a/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java +++ b/src/main/java/com/michelin/ns4kafka/services/executors/AccessControlEntryAsyncExecutor.java @@ -26,12 +26,14 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.acl.AccessControlEntryFilter; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; /** @@ -179,8 +181,14 @@ private List collectBrokerAcls(boolean managedUsersOnly) List validResourceTypes = List.of(ResourceType.TOPIC, ResourceType.GROUP, ResourceType.TRANSACTIONAL_ID); + AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter( + managedClusterProperties.getProvider() + .equals(ManagedClusterProperties.KafkaProvider.CONFLUENT_CLOUD) ? "UserV2:*" : null, + null, AclOperation.ANY, AclPermissionType.ANY); + AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY, accessControlEntryFilter); + List userAcls = getAdminClient() - .describeAcls(AclBindingFilter.ANY) + .describeAcls(aclBindingFilter) .values().get(10, TimeUnit.SECONDS) .stream() .filter(aclBinding -> validResourceTypes.contains(aclBinding.pattern().resourceType()))