Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Jul 25, 2024
1 parent 1428c1e commit fb096a5
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ public class SchemaController extends NamespacedResourceController {
SchemaService schemaService;

/**
* List schemas by namespace.
* List schemas by namespace, filtered by query parameters.
*
* @param namespace The namespace
* @param name The name filter
* @return A list of schemas
*/
@Get
public Flux<SchemaList> list(String namespace) {
return schemaService.findAllForNamespace(getNamespace(namespace));
public Flux<SchemaList> list(String namespace, @QueryValue(defaultValue = "*") String name) {
return schemaService.findAllForNamespace(getNamespace(namespace), name);
}

/**
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/michelin/ns4kafka/service/AclService.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,24 @@ public List<AccessControlEntry> findAllGrantedToNamespace(Namespace namespace) {
.toList();
}

/**
* Find all owner-ACLs on a resource for a given namespace.
*
* @param namespace The namespace
* @param resourceType The resource
* @return A list of ACLs
*/
public List<AccessControlEntry> findResourceOwnerGrantedToNamespace(Namespace namespace,
AccessControlEntry.ResourceType resourceType) {
return accessControlEntryRepository.findAll()
.stream()
.filter(accessControlEntry ->
accessControlEntry.getSpec().getGrantedTo().equals(namespace.getMetadata().getName())
&& accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER
&& accessControlEntry.getSpec().getResourceType() == resourceType)
.toList();
}

/**
* Find all public granted ACLs.
*
Expand Down Expand Up @@ -340,4 +358,18 @@ public boolean isNamespaceOwnerOfResource(String namespace, AccessControlEntry.R
public Optional<AccessControlEntry> findByName(String namespace, String name) {
return accessControlEntryRepository.findByName(namespace, name);
}

/**
* Check if there is any ACL concerning the given resource.
*
* @param acls The OWNER ACL list on resource
* @param resourceName The resource name to check ACL against
* @return true if there is any OWNER ACL concerning the given resource, false otherwise
*/
public boolean isAnyAclOfResource(List<AccessControlEntry> acls, String resourceName) {
return acls.stream().anyMatch(acl -> switch (acl.getSpec().getResourcePatternType()) {
case PREFIXED -> resourceName.startsWith(acl.getSpec().getResource());
case LITERAL -> resourceName.equals(acl.getSpec().getResource());
});
}
}
43 changes: 32 additions & 11 deletions src/main/java/com/michelin/ns4kafka/service/SchemaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.michelin.ns4kafka.service.client.schema.entities.SchemaCompatibilityResponse;
import com.michelin.ns4kafka.service.client.schema.entities.SchemaRequest;
import com.michelin.ns4kafka.service.client.schema.entities.SchemaResponse;
import com.michelin.ns4kafka.util.RegexUtils;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.micronaut.core.util.CollectionUtils;
Expand Down Expand Up @@ -42,26 +43,46 @@ public class SchemaService {
SchemaRegistryClient schemaRegistryClient;

/**
* Get all the schemas by namespace.
* Get all the schemas of a given namespace.
*
* @param namespace The namespace
* @return A list of schemas
*/
public Flux<SchemaList> findAllForNamespace(Namespace namespace) {
List<AccessControlEntry> acls = aclService.findAllGrantedToNamespace(namespace).stream()
.filter(acl -> acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER)
.filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC).toList();

List<AccessControlEntry> acls = aclService
.findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.TOPIC);
return schemaRegistryClient
.getSubjects(namespace.getMetadata().getCluster())
.filter(subject -> {
String underlyingTopicName = subject.replaceAll("(-key|-value)$", "");
String underlyingTopicName = subject.replaceAll("-(key|value)$", "");
return aclService.isAnyAclOfResource(acls, underlyingTopicName);
})
.map(subject -> SchemaList.builder()
.metadata(Metadata.builder()
.cluster(namespace.getMetadata().getCluster())
.namespace(namespace.getMetadata().getName())
.name(subject)
.build())
.build());
}

return acls.stream()
.anyMatch(accessControlEntry -> switch (accessControlEntry.getSpec().getResourcePatternType()) {
case PREFIXED -> underlyingTopicName.startsWith(accessControlEntry.getSpec().getResource());
case LITERAL -> underlyingTopicName.equals(accessControlEntry.getSpec().getResource());
});
/**
* Get all the schemas of a given namespace, filtered by given parameters.
*
* @param namespace The namespace
* @param name The name filter
* @return A list of schemas
*/
public Flux<SchemaList> findAllForNamespace(Namespace namespace, String name) {
List<AccessControlEntry> acls = aclService
.findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.TOPIC);
List<String> nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name));
return schemaRegistryClient
.getSubjects(namespace.getMetadata().getCluster())
.filter(subject -> {
String underlyingTopicName = subject.replaceAll("-(key|value)$", "");
return aclService.isAnyAclOfResource(acls, underlyingTopicName)
&& RegexUtils.filterByPattern(subject, nameFilterPatterns);
})
.map(subject -> SchemaList.builder()
.metadata(Metadata.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void applyNamespaceNotOwnerOfSubject() {
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": "
+ "namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().get(0));
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}
Expand All @@ -161,7 +161,7 @@ void applyValidationErrors() {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Errors",
((ResourceValidationException) error).getValidationErrors().get(0));
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();
}
Expand Down Expand Up @@ -237,22 +237,37 @@ void applyDryRunNotCompatible() {
.consumeErrorWith(error -> {
assertEquals(ResourceValidationException.class, error.getClass());
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Not compatible", ((ResourceValidationException) error).getValidationErrors().get(0));
assertEquals("Not compatible", ((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();

verify(schemaService, never()).register(namespace, schema);
}

@Test
void list() {
void listWithoutParameter() {
Namespace namespace = buildNamespace();
SchemaList schema = buildSchemaList();

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.findAllForNamespace(namespace)).thenReturn(Flux.fromIterable(List.of(schema)));
when(schemaService.findAllForNamespace(namespace, "*")).thenReturn(Flux.fromIterable(List.of(schema)));

StepVerifier.create(schemaController.list("myNamespace"))
StepVerifier.create(schemaController.list("myNamespace", "*"))
.consumeNextWith(
schemaResponse -> assertEquals("prefix.subject-value", schemaResponse.getMetadata().getName()))
.verifyComplete();
}

@Test
void listWithNameParameter() {
Namespace namespace = buildNamespace();
SchemaList schema = buildSchemaList();

when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace));
when(schemaService.findAllForNamespace(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schema)));

StepVerifier.create(schemaController.list("myNamespace", "prefix.subject-value"))
.consumeNextWith(
schemaResponse -> assertEquals("prefix.subject-value", schemaResponse.getMetadata().getName()))
.verifyComplete();
Expand Down Expand Up @@ -365,7 +380,7 @@ void compatibilityUpdateNamespaceNotOwnerOfSubject() {
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": "
+ "namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().get(0));
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();

Expand All @@ -385,7 +400,7 @@ void deleteSubjectNamespaceNotOwnerOfSubject() {
assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size());
assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": "
+ "namespace is not owner of the resource.",
((ResourceValidationException) error).getValidationErrors().get(0));
((ResourceValidationException) error).getValidationErrors().getFirst());
})
.verify();

Expand Down
111 changes: 111 additions & 0 deletions src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -932,4 +932,115 @@ void shouldNotCollideIfDifferentResource() {
assertFalse(aclService.topicAclsCollide(aceTopicPrefixedOwner, aceConnectLiteralOwner));
assertFalse(aclService.topicAclsCollide(aceConnectLiteralOwner, aceTopicPrefixedOwner));
}

@Test
void findResourceOwnerAclGrantedToNamespace() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder().name("namespace1").build()).build();
AccessControlEntry acl1 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.permission(AccessControlEntry.Permission.OWNER)
.grantedTo("namespace1").build())
.build();
AccessControlEntry acl2 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.permission(AccessControlEntry.Permission.READ)
.grantedTo("namespace1").build())
.build();
AccessControlEntry acl3 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.CONNECT)
.permission(AccessControlEntry.Permission.OWNER)
.grantedTo("namespace1").build())
.build();
AccessControlEntry acl4 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.permission(AccessControlEntry.Permission.OWNER)
.grantedTo("namespace2").build())
.build();
AccessControlEntry acl5 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.permission(AccessControlEntry.Permission.READ)
.grantedTo("*").build())
.build();
AccessControlEntry acl6 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.GROUP)
.permission(AccessControlEntry.Permission.WRITE)
.grantedTo("namespace1").build())
.build();

when(accessControlEntryRepository.findAll()).thenReturn(List.of(acl1, acl2, acl3, acl4, acl5, acl6));

assertEquals(List.of(acl1),
aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.TOPIC));
assertEquals(List.of(acl3),
aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT));
assertEquals(List.of(),
aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.GROUP));
}

@Test
void isPrefixedAclOfResource() {
AccessControlEntry acl1 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("abc.")
.grantedTo("namespace")
.build())
.build();

AccessControlEntry acl2 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.CONNECT)
.resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED)
.permission(AccessControlEntry.Permission.OWNER)
.resource("abc_")
.grantedTo("namespace")
.build())
.build();
List<AccessControlEntry> acls = List.of(acl1, acl2);

assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1"));
assertFalse(aclService.isAnyAclOfResource(acls, "topic1-abc"));
assertFalse(aclService.isAnyAclOfResource(acls, "abc-topic1"));
assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1"));
assertTrue(aclService.isAnyAclOfResource(acls, "abc_topic1"));
}

@Test
void isLiteralAclOfResource() {
AccessControlEntry acl1 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.TOPIC)
.resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL)
.permission(AccessControlEntry.Permission.OWNER)
.resource("abc.topic1")
.grantedTo("namespace")
.build())
.build();

AccessControlEntry acl2 = AccessControlEntry.builder()
.spec(AccessControlEntry.AccessControlEntrySpec.builder()
.resourceType(AccessControlEntry.ResourceType.CONNECT)
.resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL)
.permission(AccessControlEntry.Permission.OWNER)
.resource("abc-topic1")
.grantedTo("namespace")
.build())
.build();
List<AccessControlEntry> acls = List.of(acl1, acl2);

assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1"));
assertFalse(aclService.isAnyAclOfResource(acls, "abc.topic12"));
assertFalse(aclService.isAnyAclOfResource(acls, "abc_topic1"));
assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1"));
assertTrue(aclService.isAnyAclOfResource(acls, "abc-topic1"));
}
}
Loading

0 comments on commit fb096a5

Please sign in to comment.