diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index a78c3628..0c9b43ef 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -44,14 +44,15 @@ public class SchemaController extends NamespacedResourceController { SchemaService schemaService; /** - * List schemas by namespace. + * List schemas by namespace, filtered by query parameters. * * @param namespace The namespace + * @param name The name filter * @return A list of schemas */ @Get - public Flux list(String namespace) { - return schemaService.findAllForNamespace(getNamespace(namespace)); + public Flux list(String namespace, @QueryValue(defaultValue = "*") String name) { + return schemaService.findAllForNamespace(getNamespace(namespace), name); } /** diff --git a/src/main/java/com/michelin/ns4kafka/service/AclService.java b/src/main/java/com/michelin/ns4kafka/service/AclService.java index 37abd2e4..f50b304c 100644 --- a/src/main/java/com/michelin/ns4kafka/service/AclService.java +++ b/src/main/java/com/michelin/ns4kafka/service/AclService.java @@ -262,6 +262,24 @@ public List findAllGrantedToNamespace(Namespace namespace) { .toList(); } + /** + * Find all owner-ACLs on a resource for a given namespace. + * + * @param namespace The namespace + * @param resourceType The resource + * @return A list of ACLs + */ + public List findResourceOwnerGrantedToNamespace(Namespace namespace, + AccessControlEntry.ResourceType resourceType) { + return accessControlEntryRepository.findAll() + .stream() + .filter(accessControlEntry -> + accessControlEntry.getSpec().getGrantedTo().equals(namespace.getMetadata().getName()) + && accessControlEntry.getSpec().getPermission() == AccessControlEntry.Permission.OWNER + && accessControlEntry.getSpec().getResourceType() == resourceType) + .toList(); + } + /** * Find all public granted ACLs. * @@ -340,4 +358,18 @@ public boolean isNamespaceOwnerOfResource(String namespace, AccessControlEntry.R public Optional findByName(String namespace, String name) { return accessControlEntryRepository.findByName(namespace, name); } + + /** + * Check if there is any ACL concerning the given resource. + * + * @param acls The OWNER ACL list on resource + * @param resourceName The resource name to check ACL against + * @return true if there is any OWNER ACL concerning the given resource, false otherwise + */ + public boolean isAnyAclOfResource(List acls, String resourceName) { + return acls.stream().anyMatch(acl -> switch (acl.getSpec().getResourcePatternType()) { + case PREFIXED -> resourceName.startsWith(acl.getSpec().getResource()); + case LITERAL -> resourceName.equals(acl.getSpec().getResource()); + }); + } } diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index 9ff41d3c..a8b75127 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -14,6 +14,7 @@ import com.michelin.ns4kafka.service.client.schema.entities.SchemaCompatibilityResponse; import com.michelin.ns4kafka.service.client.schema.entities.SchemaRequest; import com.michelin.ns4kafka.service.client.schema.entities.SchemaResponse; +import com.michelin.ns4kafka.util.RegexUtils; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; import io.micronaut.core.util.CollectionUtils; @@ -42,26 +43,46 @@ public class SchemaService { SchemaRegistryClient schemaRegistryClient; /** - * Get all the schemas by namespace. + * Get all the schemas of a given namespace. * * @param namespace The namespace * @return A list of schemas */ public Flux findAllForNamespace(Namespace namespace) { - List acls = aclService.findAllGrantedToNamespace(namespace).stream() - .filter(acl -> acl.getSpec().getPermission() == AccessControlEntry.Permission.OWNER) - .filter(acl -> acl.getSpec().getResourceType() == AccessControlEntry.ResourceType.TOPIC).toList(); - + List acls = aclService + .findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.TOPIC); return schemaRegistryClient .getSubjects(namespace.getMetadata().getCluster()) .filter(subject -> { - String underlyingTopicName = subject.replaceAll("(-key|-value)$", ""); + String underlyingTopicName = subject.replaceAll("-(key|value)$", ""); + return aclService.isAnyAclOfResource(acls, underlyingTopicName); + }) + .map(subject -> SchemaList.builder() + .metadata(Metadata.builder() + .cluster(namespace.getMetadata().getCluster()) + .namespace(namespace.getMetadata().getName()) + .name(subject) + .build()) + .build()); + } - return acls.stream() - .anyMatch(accessControlEntry -> switch (accessControlEntry.getSpec().getResourcePatternType()) { - case PREFIXED -> underlyingTopicName.startsWith(accessControlEntry.getSpec().getResource()); - case LITERAL -> underlyingTopicName.equals(accessControlEntry.getSpec().getResource()); - }); + /** + * Get all the schemas of a given namespace, filtered by given parameters. + * + * @param namespace The namespace + * @param name The name filter + * @return A list of schemas + */ + public Flux findAllForNamespace(Namespace namespace, String name) { + List acls = aclService + .findResourceOwnerGrantedToNamespace(namespace, AccessControlEntry.ResourceType.TOPIC); + List nameFilterPatterns = RegexUtils.wildcardStringsToRegexPatterns(List.of(name)); + return schemaRegistryClient + .getSubjects(namespace.getMetadata().getCluster()) + .filter(subject -> { + String underlyingTopicName = subject.replaceAll("-(key|value)$", ""); + return aclService.isAnyAclOfResource(acls, underlyingTopicName) + && RegexUtils.filterByPattern(subject, nameFilterPatterns); }) .map(subject -> SchemaList.builder() .metadata(Metadata.builder() diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index 1ba48ab5..f0ebf40d 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -142,7 +142,7 @@ void applyNamespaceNotOwnerOfSubject() { assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " + "namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().get(0)); + ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); } @@ -161,7 +161,7 @@ void applyValidationErrors() { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); assertEquals("Errors", - ((ResourceValidationException) error).getValidationErrors().get(0)); + ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); } @@ -237,7 +237,7 @@ void applyDryRunNotCompatible() { .consumeErrorWith(error -> { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Not compatible", ((ResourceValidationException) error).getValidationErrors().get(0)); + assertEquals("Not compatible", ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); @@ -245,14 +245,29 @@ void applyDryRunNotCompatible() { } @Test - void list() { + void listWithoutParameter() { Namespace namespace = buildNamespace(); SchemaList schema = buildSchemaList(); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); - when(schemaService.findAllForNamespace(namespace)).thenReturn(Flux.fromIterable(List.of(schema))); + when(schemaService.findAllForNamespace(namespace, "*")).thenReturn(Flux.fromIterable(List.of(schema))); - StepVerifier.create(schemaController.list("myNamespace")) + StepVerifier.create(schemaController.list("myNamespace", "*")) + .consumeNextWith( + schemaResponse -> assertEquals("prefix.subject-value", schemaResponse.getMetadata().getName())) + .verifyComplete(); + } + + @Test + void listWithNameParameter() { + Namespace namespace = buildNamespace(); + SchemaList schema = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); + when(schemaService.findAllForNamespace(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schema))); + + StepVerifier.create(schemaController.list("myNamespace", "prefix.subject-value")) .consumeNextWith( schemaResponse -> assertEquals("prefix.subject-value", schemaResponse.getMetadata().getName())) .verifyComplete(); @@ -365,7 +380,7 @@ void compatibilityUpdateNamespaceNotOwnerOfSubject() { assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " + "namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().get(0)); + ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); @@ -385,7 +400,7 @@ void deleteSubjectNamespaceNotOwnerOfSubject() { assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " + "namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().get(0)); + ((ResourceValidationException) error).getValidationErrors().getFirst()); }) .verify(); diff --git a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java index 0bc76966..e926534e 100644 --- a/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/AclServiceTest.java @@ -932,4 +932,115 @@ void shouldNotCollideIfDifferentResource() { assertFalse(aclService.topicAclsCollide(aceTopicPrefixedOwner, aceConnectLiteralOwner)); assertFalse(aclService.topicAclsCollide(aceConnectLiteralOwner, aceTopicPrefixedOwner)); } + + @Test + void findResourceOwnerAclGrantedToNamespace() { + Namespace ns = Namespace.builder() + .metadata(Metadata.builder().name("namespace1").build()).build(); + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl3 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace1").build()) + .build(); + AccessControlEntry acl4 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("namespace2").build()) + .build(); + AccessControlEntry acl5 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .permission(AccessControlEntry.Permission.READ) + .grantedTo("*").build()) + .build(); + AccessControlEntry acl6 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.GROUP) + .permission(AccessControlEntry.Permission.WRITE) + .grantedTo("namespace1").build()) + .build(); + + when(accessControlEntryRepository.findAll()).thenReturn(List.of(acl1, acl2, acl3, acl4, acl5, acl6)); + + assertEquals(List.of(acl1), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.TOPIC)); + assertEquals(List.of(acl3), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.CONNECT)); + assertEquals(List.of(), + aclService.findResourceOwnerGrantedToNamespace(ns, AccessControlEntry.ResourceType.GROUP)); + } + + @Test + void isPrefixedAclOfResource() { + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc.") + .grantedTo("namespace") + .build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc_") + .grantedTo("namespace") + .build()) + .build(); + List acls = List.of(acl1, acl2); + + assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1")); + assertFalse(aclService.isAnyAclOfResource(acls, "topic1-abc")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc-topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc_topic1")); + } + + @Test + void isLiteralAclOfResource() { + AccessControlEntry acl1 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc.topic1") + .grantedTo("namespace") + .build()) + .build(); + + AccessControlEntry acl2 = AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .resourceType(AccessControlEntry.ResourceType.CONNECT) + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .permission(AccessControlEntry.Permission.OWNER) + .resource("abc-topic1") + .grantedTo("namespace") + .build()) + .build(); + List acls = List.of(acl1, acl2); + + assertFalse(aclService.isAnyAclOfResource(acls, "xyz.topic1")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc.topic12")); + assertFalse(aclService.isAnyAclOfResource(acls, "abc_topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc.topic1")); + assertTrue(aclService.isAnyAclOfResource(acls, "abc-topic1")); + } } diff --git a/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java b/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java index 39d5fa16..21727201 100644 --- a/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/service/SchemaServiceTest.java @@ -3,6 +3,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,52 +42,40 @@ class SchemaServiceTest { SchemaRegistryClient schemaRegistryClient; @Test - void getAllByNamespace() { + void listSchemaWithoutParameter() { Namespace namespace = buildNamespace(); List subjectsResponse = Arrays.asList("prefix.schema-one", "prefix2.schema-two", "prefix2.schema-three"); + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix2.schema-two") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(namespace, + AccessControlEntry.ResourceType.TOPIC)).thenReturn(acls); + when(schemaRegistryClient.getSubjects(namespace.getMetadata().getCluster())).thenReturn( Flux.fromIterable(subjectsResponse)); - when(aclService.findAllGrantedToNamespace(namespace)) - .thenReturn(List.of( - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resource("prefix.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resource("prefix2.schema-two") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.READ) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.TOPIC) - .resource("prefix3.") - .build()) - .build(), - AccessControlEntry.builder() - .spec(AccessControlEntry.AccessControlEntrySpec.builder() - .permission(AccessControlEntry.Permission.OWNER) - .grantedTo("namespace") - .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) - .resourceType(AccessControlEntry.ResourceType.CONNECT) - .resource("ns-") - .build()) - .build() - )); + when(aclService.isAnyAclOfResource(acls, "prefix.schema-one")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "prefix2.schema-two")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "prefix2.schema-three")).thenReturn(false); StepVerifier.create(schemaService.findAllForNamespace(namespace)) .consumeNextWith(schema -> assertEquals("prefix.schema-one", schema.getMetadata().getName())) @@ -93,8 +83,9 @@ void getAllByNamespace() { .verifyComplete(); } + @Test - void getAllByNamespaceEmptyResponse() { + void listSchemaEmptyResponse() { Namespace namespace = buildNamespace(); when(schemaRegistryClient.getSubjects(namespace.getMetadata().getCluster())).thenReturn(Flux.empty()); @@ -103,6 +94,123 @@ void getAllByNamespaceEmptyResponse() { .verifyComplete(); } + @Test + void listSchemaWithNameParameter() { + Namespace namespace = buildNamespace(); + List subjectsResponse = List.of("prefix.schema-one", "prefix2.schema-two", "prefix2.schema-three"); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix2.schema-two") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(namespace, + AccessControlEntry.ResourceType.TOPIC)).thenReturn(acls); + when(schemaRegistryClient.getSubjects(namespace.getMetadata().getCluster())).thenReturn( + Flux.fromIterable(subjectsResponse)); + when(aclService.isAnyAclOfResource(acls, "prefix.schema-one")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "prefix2.schema-two")).thenReturn(true); + when(aclService.isAnyAclOfResource(acls, "prefix2.schema-three")).thenReturn(false); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix.schema-one")) + .consumeNextWith(schema -> assertEquals("prefix.schema-one", schema.getMetadata().getName())) + .verifyComplete(); + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix2.schema-three")).verifyComplete(); + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix3.schema-four")).verifyComplete(); + StepVerifier.create(schemaService.findAllForNamespace(namespace, "")) + .consumeNextWith(schema -> assertEquals("prefix.schema-one", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix2.schema-two", schema.getMetadata().getName())) + .verifyComplete(); + } + + @Test + void listSchemaWithWildcardNameParameter() { + Namespace namespace = buildNamespace(); + List subjectsResponse = List.of("prefix1.schema1-value", "prefix1.schema1-key", "prefix1.schema-one", + "prefix2.schema2-value", "prefix4.schema1-value", "prefix4.schema2-key"); + + List acls = List.of( + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix1.") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.LITERAL) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix2.schema2-value") + .build()) + .build(), + AccessControlEntry.builder() + .spec(AccessControlEntry.AccessControlEntrySpec.builder() + .permission(AccessControlEntry.Permission.OWNER) + .grantedTo("myNamespace") + .resourcePatternType(AccessControlEntry.ResourcePatternType.PREFIXED) + .resourceType(AccessControlEntry.ResourceType.TOPIC) + .resource("prefix4.") + .build()) + .build() + ); + + when(aclService.findResourceOwnerGrantedToNamespace(namespace, + AccessControlEntry.ResourceType.TOPIC)).thenReturn(acls); + when(schemaRegistryClient.getSubjects(namespace.getMetadata().getCluster())).thenReturn( + Flux.fromIterable(subjectsResponse)); + when(aclService.isAnyAclOfResource(eq(acls), anyString())).thenReturn(true); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix1.*")) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-key", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix1.schema-one", schema.getMetadata().getName())) + .verifyComplete(); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "*-value")) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix2.schema2-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix4.schema1-value", schema.getMetadata().getName())) + .verifyComplete(); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix?.schema1-key")) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-key", schema.getMetadata().getName())) + .verifyComplete(); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "prefix?.schema-*")) + .consumeNextWith(schema -> assertEquals("prefix1.schema-one", schema.getMetadata().getName())) + .verifyComplete(); + + StepVerifier.create(schemaService.findAllForNamespace(namespace, "*")) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix1.schema1-key", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix1.schema-one", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix2.schema2-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix4.schema1-value", schema.getMetadata().getName())) + .consumeNextWith(schema -> assertEquals("prefix4.schema2-key", schema.getMetadata().getName())) + .verifyComplete(); + } + @Test void getBySubjectAndVersion() { Namespace namespace = buildNamespace(); diff --git a/src/test/java/com/michelin/ns4kafka/util/RegexUtils.java b/src/test/java/com/michelin/ns4kafka/util/RegexUtilsTest.java similarity index 100% rename from src/test/java/com/michelin/ns4kafka/util/RegexUtils.java rename to src/test/java/com/michelin/ns4kafka/util/RegexUtilsTest.java