diff --git a/.docker/resources/user/schema-refs.yml b/.docker/resources/user/schema-refs.yml new file mode 100644 index 00000000..1c3925c4 --- /dev/null +++ b/.docker/resources/user/schema-refs.yml @@ -0,0 +1,48 @@ +--- +apiVersion: v1 +kind: Schema +metadata: + name: abc.header-value +spec: + schema: | + { + "namespace": "io.github.michelin.ns4kafka.avro", + "type": "record", + "name": "KafkaHeader", + "fields": [ + { + "name": "id", + "type": "long" + } + ] + } +--- +apiVersion: v1 +kind: Schema +metadata: + name: abc.person-value +spec: + schema: | + { + "namespace": "io.github.michelin.ns4kafka.avro", + "type": "record", + "name": "KafkaPerson", + "fields": [ + { + "name": "header", + "type": "io.github.michelin.ns4kafka.avro.KafkaHeader" + }, + { + "name": "firstName", + "type": "string" + }, + { + "name": "lastName", + "type": "string" + } + ] + } + references: + - name: io.github.michelin.ns4kafka.avro.KafkaHeader + subject: abc.header-value + version: 1 \ No newline at end of file diff --git a/build.gradle b/build.gradle index fed73fb1..dff569fc 100644 --- a/build.gradle +++ b/build.gradle @@ -12,6 +12,9 @@ group = "com.michelin.ns4kafka" repositories { mavenCentral() + maven { + url "https://packages.confluent.io/maven" + } } dependencies { @@ -34,6 +37,8 @@ dependencies { implementation("io.swagger.core.v3:swagger-annotations") implementation("jakarta.annotation:jakarta.annotation-api") implementation("jakarta.validation:jakarta.validation-api") + implementation('io.confluent:kafka-schema-registry-client:7.5.1') + compileOnly("org.projectlombok:lombok") compileOnly("com.google.code.findbugs:jsr305") // https://github.com/micronaut-projects/micronaut-core/pull/5691 diff --git a/src/main/java/com/michelin/ns4kafka/controllers/ConnectorController.java b/src/main/java/com/michelin/ns4kafka/controllers/ConnectorController.java index 24c3864f..7e45d63f 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/ConnectorController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/ConnectorController.java @@ -122,7 +122,6 @@ public Mono> apply(String namespace, @Valid @Body Connec @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); - // Validate ownership if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) { return Mono.error(new ResourceValidationException( List.of(String.format(NAMESPACE_NOT_OWNER, connector.getMetadata().getName())), diff --git a/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java index 437dcc99..607907ef 100644 --- a/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java @@ -9,6 +9,7 @@ import com.michelin.ns4kafka.services.SchemaService; import com.michelin.ns4kafka.utils.enums.ApplyStatus; import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.annotation.Body; @@ -25,8 +26,10 @@ import jakarta.inject.Inject; import jakarta.validation.Valid; import java.time.Instant; +import java.util.Comparator; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.Optional; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,6 +41,8 @@ @Controller(value = "/api/namespaces/{namespace}/schemas") @ExecuteOn(TaskExecutors.IO) public class SchemaController extends NamespacedResourceController { + private static final String NAMESPACE_NOT_OWNER = "Namespace not owner of this schema %s."; + @Inject SchemaService schemaService; @@ -83,66 +88,65 @@ public Mono> apply(String namespace, @Valid @Body Schema sc @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); - // Validate TopicNameStrategy - // https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicNameStrategy.java - if (!schema.getMetadata().getName().endsWith("-key") && !schema.getMetadata().getName().endsWith("-value")) { - return Mono.error( - new ResourceValidationException(List.of("Invalid value " + schema.getMetadata().getName() - + " for name: subject must end with -key or -value"), schema.getKind(), - schema.getMetadata().getName())); - } - - // Validate ownership if (!schemaService.isNamespaceOwnerOfSubject(ns, schema.getMetadata().getName())) { - return Mono.error( - new ResourceValidationException(List.of(String.format("Namespace not owner of this schema %s.", - schema.getMetadata().getName())), schema.getKind(), schema.getMetadata().getName())); + return Mono.error(new ResourceValidationException( + List.of(String.format(NAMESPACE_NOT_OWNER, schema.getMetadata().getName())), + schema.getKind(), schema.getMetadata().getName())); } - return schemaService - .validateSchemaCompatibility(ns.getMetadata().getCluster(), schema) - .flatMap(validationErrors -> { - if (!validationErrors.isEmpty()) { - return Mono.error(new ResourceValidationException(validationErrors, schema.getKind(), + return schemaService.validateSchema(ns, schema) + .flatMap(errors -> { + if (!errors.isEmpty()) { + return Mono.error(new ResourceValidationException(errors, schema.getKind(), schema.getMetadata().getName())); } - - return schemaService - .getLatestSubject(ns, schema.getMetadata().getName()) - .map(Optional::of) - .defaultIfEmpty(Optional.empty()) - .flatMap(latestSubjectOptional -> { - schema.getMetadata().setCreationTimestamp(Date.from(Instant.now())); - schema.getMetadata().setCluster(ns.getMetadata().getCluster()); - schema.getMetadata().setNamespace(ns.getMetadata().getName()); - latestSubjectOptional.ifPresent( - value -> schema.getSpec().setCompatibility(value.getSpec().getCompatibility())); - - if (dryrun) { - // Cannot compute the "unchanged" apply status before getting the ID at registration - return Mono.just(formatHttpResponse(schema, - latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created)); + return schemaService.getAllSubjectVersions(ns, schema.getMetadata().getName()) + .collectList() + .flatMap(subjects -> { + // If new schema matches any of the existing schemas, return unchanged + boolean unchanged = subjects.stream().anyMatch(subject -> { + var actualSchema = new AvroSchema(subject.getSpec().getSchema(), + schemaService.getReferences(subject), schemaService.getSchemaReferences(subject, ns), + null); + var newSchema = new AvroSchema(schema.getSpec().getSchema(), + schemaService.getReferences(schema), schemaService.getSchemaReferences(schema, ns), + null); + + return Objects.equals(newSchema.canonicalString(), actualSchema.canonicalString()) + && Objects.equals(newSchema.references(), actualSchema.references()); + }); + + if (unchanged) { + return Mono.just(formatHttpResponse(schema, ApplyStatus.unchanged)); } return schemaService - .register(ns, schema) - .map(id -> { - ApplyStatus status; - - if (latestSubjectOptional.isEmpty()) { - status = ApplyStatus.created; - sendEventLog(schema.getKind(), schema.getMetadata(), status, null, - schema.getSpec()); - } else if (id > latestSubjectOptional.get().getSpec().getId()) { - status = ApplyStatus.changed; - sendEventLog(schema.getKind(), schema.getMetadata(), status, - latestSubjectOptional.get().getSpec(), - schema.getSpec()); - } else { - status = ApplyStatus.unchanged; + .validateSchemaCompatibility(ns.getMetadata().getCluster(), schema) + .flatMap(validationErrors -> { + if (!validationErrors.isEmpty()) { + return Mono.error(new ResourceValidationException( + validationErrors, schema.getKind(), schema.getMetadata().getName())); } - return formatHttpResponse(schema, status); + schema.getMetadata().setCreationTimestamp(Date.from(Instant.now())); + schema.getMetadata().setCluster(ns.getMetadata().getCluster()); + schema.getMetadata().setNamespace(ns.getMetadata().getName()); + + ApplyStatus status = subjects.isEmpty() ? ApplyStatus.created : ApplyStatus.changed; + if (dryrun) { + return Mono.just(formatHttpResponse(schema, status)); + } + + return schemaService + .register(ns, schema) + .map(id -> { + sendEventLog(schema.getKind(), schema.getMetadata(), status, + subjects.isEmpty() ? null : subjects.stream() + .max(Comparator.comparingInt((Schema s) -> s.getSpec().getId())), + schema.getSpec()); + + return formatHttpResponse(schema, status); + }); }); }); }); @@ -165,7 +169,7 @@ public Mono> deleteSubject(String namespace, @PathVariable St // Validate ownership if (!schemaService.isNamespaceOwnerOfSubject(ns, subject)) { return Mono.error(new ResourceValidationException( - List.of(String.format("Namespace not owner of this schema %s.", subject)), + List.of(String.format(NAMESPACE_NOT_OWNER, subject)), AccessControlEntry.ResourceType.SCHEMA.toString(), subject)); } diff --git a/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java index 1be6cf98..967d259c 100644 --- a/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java @@ -121,15 +121,15 @@ public Mono> validateLocally(Namespace namespace, Connector connect .collect(Collectors.joining(", ")); return Mono.just( List.of("Invalid value " + connector.getSpec().getConnectCluster() - + " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]")); + + " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "].")); } - // If class doesn't exist, no need to go further + // If class does not exist, no need to go further if (StringUtils.isEmpty(connector.getSpec().getConfig().get(CONNECTOR_CLASS))) { - return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null")); + return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null.")); } - // Connector type exists on this target connect cluster ? + // Connector type exists on this target connect cluster return kafkaConnectClient.connectPlugins(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster()) .map(connectorPluginInfos -> { @@ -141,8 +141,8 @@ public Mono> validateLocally(Namespace namespace, Connector connect .findFirst(); if (connectorType.isEmpty()) { - return List.of("Failed to find any class that implements Connector and which name matches " - + connector.getSpec().getConfig().get(CONNECTOR_CLASS)); + return List.of("Failed to find any class that implements connector and which name matches " + + connector.getSpec().getConfig().get(CONNECTOR_CLASS) + "."); } return namespace.getSpec().getConnectValidator() != null diff --git a/src/main/java/com/michelin/ns4kafka/services/SchemaService.java b/src/main/java/com/michelin/ns4kafka/services/SchemaService.java index f829148b..35b15e8e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/services/SchemaService.java @@ -10,10 +10,17 @@ import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse; import com.michelin.ns4kafka.services.clients.schema.entities.SchemaRequest; import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse; +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; +import io.micronaut.core.util.CollectionUtils; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -61,6 +68,70 @@ public Flux findAllForNamespace(Namespace namespace) { .build()); } + /** + * Get all the subject versions for a given subject. + * + * @param namespace The namespace + * @param subject The subject + * @return All the subject versions + */ + public Flux getAllSubjectVersions(Namespace namespace, String subject) { + return schemaRegistryClient.getAllSubjectVersions(namespace.getMetadata().getCluster(), subject) + .map(subjectResponse -> Schema.builder() + .metadata(ObjectMeta.builder() + .cluster(namespace.getMetadata().getCluster()) + .namespace(namespace.getMetadata().getName()) + .name(subjectResponse.subject()) + .build()) + .spec(Schema.SchemaSpec.builder() + .id(subjectResponse.id()) + .version(subjectResponse.version()) + .schema(subjectResponse.schema()) + .schemaType(subjectResponse.schemaType() == null ? Schema.SchemaType.AVRO : + Schema.SchemaType.valueOf(subjectResponse.schemaType())) + .references(subjectResponse.references()) + .build()) + .build() + ); + } + + /** + * Get a subject by its name and version. + * + * @param namespace The namespace + * @param subject The subject + * @param version The version + * @return A Subject + */ + public Mono getSubject(Namespace namespace, String subject, Integer version) { + return schemaRegistryClient + .getSubject(namespace.getMetadata().getCluster(), subject, version) + .flatMap(latestSubjectOptional -> schemaRegistryClient + .getCurrentCompatibilityBySubject(namespace.getMetadata().getCluster(), subject) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .map(currentCompatibilityOptional -> { + Schema.Compatibility compatibility = currentCompatibilityOptional.isPresent() + ? currentCompatibilityOptional.get().compatibilityLevel() : Schema.Compatibility.GLOBAL; + + return Schema.builder() + .metadata(ObjectMeta.builder() + .cluster(namespace.getMetadata().getCluster()) + .namespace(namespace.getMetadata().getName()) + .name(latestSubjectOptional.subject()) + .build()) + .spec(Schema.SchemaSpec.builder() + .id(latestSubjectOptional.id()) + .version(latestSubjectOptional.version()) + .compatibility(compatibility) + .schema(latestSubjectOptional.schema()) + .schemaType(latestSubjectOptional.schemaType() == null ? Schema.SchemaType.AVRO : + Schema.SchemaType.valueOf(latestSubjectOptional.schemaType())) + .build()) + .build(); + })); + } + /** * Get the last version of a schema by namespace and subject. * @@ -97,6 +168,59 @@ public Mono getLatestSubject(Namespace namespace, String subject) { })); } + /** + * Validate a schema when it is created or updated. + * + * @param namespace The namespace + * @param schema The schema to validate + * @return A list of errors + */ + public Mono> validateSchema(Namespace namespace, Schema schema) { + return Mono.defer(() -> { + List validationErrors = new ArrayList<>(); + + // Validate TopicNameStrategy + // https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicNameStrategy.java + if (!schema.getMetadata().getName().endsWith("-key") + && !schema.getMetadata().getName().endsWith("-value")) { + validationErrors.add(String.format("Invalid value %s for name: Value must end with -key or -value.", + schema.getMetadata().getName())); + } + + if (!CollectionUtils.isEmpty(schema.getSpec().getReferences())) { + return Mono.zip(validateReferences(namespace, schema), Mono.just(validationErrors), + (referenceErrors, errors) -> { + errors.addAll(referenceErrors); + return errors; + }); + } + + return Mono.just(validationErrors); + }); + } + + /** + * Validate the references of a schema. + * + * @param ns The namespace + * @param schema The schema to validate + * @return A list of errors + */ + private Mono> validateReferences(Namespace ns, Schema schema) { + return Flux.fromIterable(schema.getSpec().getReferences()) + .flatMap(reference -> getSubject(ns, reference.getSubject(), reference.getVersion()) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .mapNotNull(schemaOptional -> { + if (schemaOptional.isEmpty()) { + return String.format("Reference %s version %s not found.", + reference.getSubject(), reference.getVersion()); + } + return null; + })) + .collectList(); + } + /** * Publish a schema. * @@ -191,4 +315,41 @@ public boolean isNamespaceOwnerOfSubject(Namespace namespace, String subjectName AccessControlEntry.ResourceType.TOPIC, underlyingTopicName); } + + /** + * Get all the schema of all the references for a given schema. + * + * @param schema The schema + * @param namespace The namespace + * @return The schema references + */ + public Map getSchemaReferences(Schema schema, Namespace namespace) { + if (CollectionUtils.isEmpty(schema.getSpec().getReferences())) { + return Collections.emptyMap(); + } + + return schema.getSpec().getReferences() + .stream() + .map(reference -> getSubject(namespace, reference.getSubject(), reference.getVersion()).block()) + .collect(Collectors.toMap(s -> + Objects.requireNonNull(s).getMetadata().getName(), s -> s.getSpec().getSchema())); + } + + /** + * Get the schema references. + * + * @param schema The schema + * @return The schema references + */ + public List getReferences(Schema schema) { + if (CollectionUtils.isEmpty(schema.getSpec().getReferences())) { + return Collections.emptyList(); + } + + return schema.getSpec().getReferences() + .stream() + .map(reference -> + new SchemaReference(reference.getName(), reference.getSubject(), reference.getVersion())) + .toList(); + } } diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java index 114f5ff3..a1908d50 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/SchemaRegistryClient.java @@ -15,7 +15,6 @@ import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; -import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpRequest; import io.micronaut.http.client.HttpClient; import io.micronaut.http.client.annotation.Client; @@ -23,6 +22,7 @@ import jakarta.inject.Inject; import jakarta.inject.Singleton; import java.net.URI; +import java.util.Arrays; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -68,11 +68,55 @@ public Flux getSubjects(String kafkaCluster) { public Mono getLatestSubject(String kafkaCluster, String subject) { ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); HttpRequest request = HttpRequest.GET( - URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/latest"))) + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/latest"))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + return Mono.from(httpClient.retrieve(request, SchemaResponse.class)) + .onErrorResume(HttpClientResponseException.class, + ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); + } + + /** + * Get a subject by it name and id. + * + * @param kafkaCluster The Kafka cluster + * @param subject The subject + * @param version The subject version + * @return A subject + */ + public Mono getSubject(String kafkaCluster, String subject, Integer version) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/" + version))) .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); return Mono.from(httpClient.retrieve(request, SchemaResponse.class)) + .onErrorResume(HttpClientResponseException.class, + ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); + } + + /** + * Get all the versions of a given subject. + * + * @param kafkaCluster The Kafka cluster + * @param subject The subject + * @return All the versions of a subject + */ + public Flux getAllSubjectVersions(String kafkaCluster, String subject) { + ManagedClusterProperties.SchemaRegistryProperties config = getSchemaRegistry(kafkaCluster); + HttpRequest request = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions"))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + + return Flux.from(httpClient.retrieve(request, Integer[].class)) + .flatMap(ids -> Flux.fromIterable(Arrays.asList(ids)) + .flatMap(id -> { + HttpRequest requestVersion = HttpRequest.GET( + URI.create(StringUtils.prependUri(config.getUrl(), SUBJECTS + subject + "/versions/" + id))) + .basicAuth(config.getBasicAuthUsername(), config.getBasicAuthPassword()); + + return httpClient.retrieve(requestVersion, SchemaResponse.class); + })) .onErrorResume(HttpClientResponseException.class, - ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Mono.empty() : Mono.error(ex)); + ex -> ex.getStatus().equals(HttpStatus.NOT_FOUND) ? Flux.empty() : Flux.error(ex)); } /** diff --git a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/SchemaResponse.java b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/SchemaResponse.java index 5d9c4388..1c861b6f 100644 --- a/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/SchemaResponse.java +++ b/src/main/java/com/michelin/ns4kafka/services/clients/schema/entities/SchemaResponse.java @@ -1,5 +1,7 @@ package com.michelin.ns4kafka.services.clients.schema.entities; +import com.michelin.ns4kafka.models.schema.Schema; +import java.util.List; import lombok.Builder; /** @@ -10,7 +12,13 @@ * @param subject The subject * @param schema The schema * @param schemaType The schema type + * @param references The schema references */ @Builder -public record SchemaResponse(Integer id, Integer version, String subject, String schema, String schemaType) { +public record SchemaResponse(Integer id, + Integer version, + String subject, + String schema, + String schemaType, + List references) { } diff --git a/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java index 7f7ade4e..d5962202 100644 --- a/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controllers/SchemaControllerTest.java @@ -57,8 +57,9 @@ void applyCreated() { when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); + when(schemaService.validateSchema(namespace, schema)).thenReturn(Mono.just(List.of())); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.empty()); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())).thenReturn(Flux.empty()); when(schemaService.register(namespace, schema)).thenReturn(Mono.just(1)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); @@ -77,18 +78,20 @@ void applyCreated() { void applyChanged() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); + Schema schemaV2 = buildSchemaV2(); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); - when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())) - .thenReturn(Mono.just(schema)); - when(schemaService.register(namespace, schema)).thenReturn(Mono.just(2)); + when(schemaService.validateSchema(namespace, schemaV2)).thenReturn(Mono.just(List.of())); + when(schemaService.validateSchemaCompatibility("local", schemaV2)).thenReturn(Mono.just(List.of())); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())) + .thenReturn(Flux.just(schema)); + when(schemaService.register(namespace, schemaV2)).thenReturn(Mono.just(2)); when(securityService.username()).thenReturn(Optional.of("test-user")); when(securityService.hasRole(ResourceBasedSecurityRule.IS_ADMIN)).thenReturn(false); doNothing().when(applicationEventPublisher).publishEvent(any()); - StepVerifier.create(schemaController.apply("myNamespace", schema, false)) + StepVerifier.create(schemaController.apply("myNamespace", schemaV2, false)) .consumeNextWith(response -> { assertEquals("changed", response.header("X-Ns4kafka-Result")); assertTrue(response.getBody().isPresent()); @@ -104,9 +107,9 @@ void applyUnchanged() { when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); - when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.just(schema)); - when(schemaService.register(namespace, schema)).thenReturn(Mono.just(1)); + when(schemaService.validateSchema(namespace, schema)).thenReturn(Mono.just(List.of())); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())) + .thenReturn(Flux.just(schema)); StepVerifier.create(schemaController.apply("myNamespace", schema, false)) .consumeNextWith(response -> { @@ -117,39 +120,39 @@ void applyUnchanged() { .verifyComplete(); } + @Test - void applyWrongSubjectName() { + void applyNamespaceNotOwnerOfSubject() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); - schema.getMetadata().setName("wrongSubjectName"); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); + when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(false); StepVerifier.create(schemaController.apply("myNamespace", schema, false)) .consumeErrorWith(error -> { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Invalid value wrongSubjectName for name: subject must end with -key or -value", + assertEquals("Namespace not owner of this schema prefix.subject-value.", ((ResourceValidationException) error).getValidationErrors().get(0)); }) .verify(); - - verify(schemaService, never()).register(namespace, schema); } @Test - void applyNamespaceNotOwnerOfSubject() { + void applyValidationErrors() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); - when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(false); + when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); + when(schemaService.validateSchema(namespace, schema)).thenReturn(Mono.just(List.of("Errors"))); StepVerifier.create(schemaController.apply("myNamespace", schema, false)) .consumeErrorWith(error -> { assertEquals(ResourceValidationException.class, error.getClass()); assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Namespace not owner of this schema prefix.subject-value.", + assertEquals("Errors", ((ResourceValidationException) error).getValidationErrors().get(0)); }) .verify(); @@ -162,8 +165,9 @@ void applyDryRunCreated() { when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); + when(schemaService.validateSchema(namespace, schema)).thenReturn(Mono.just(List.of())); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.empty()); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())).thenReturn(Flux.empty()); StepVerifier.create(schemaController.apply("myNamespace", schema, true)) .consumeNextWith(response -> { @@ -180,13 +184,17 @@ void applyDryRunCreated() { void applyDryRunChanged() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); + Schema schemaV2 = buildSchemaV2(); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); - when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); - when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn(Mono.just(List.of())); - when(schemaService.getLatestSubject(namespace, schema.getMetadata().getName())).thenReturn(Mono.just(schema)); - - StepVerifier.create(schemaController.apply("myNamespace", schema, true)) + when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())) + .thenReturn(true); + when(schemaService.validateSchema(namespace, schemaV2)).thenReturn(Mono.just(List.of())); + when(schemaService.validateSchemaCompatibility("local", schemaV2)).thenReturn(Mono.just(List.of())); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())) + .thenReturn(Flux.just(schema)); + + StepVerifier.create(schemaController.apply("myNamespace", schemaV2, true)) .consumeNextWith(response -> { assertEquals("changed", response.header("X-Ns4kafka-Result")); assertTrue(response.getBody().isPresent()); @@ -194,16 +202,20 @@ void applyDryRunChanged() { }) .verifyComplete(); - verify(schemaService, never()).register(namespace, schema); + verify(schemaService, never()).register(namespace, schemaV2); } @Test void applyDryRunNotCompatible() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); + Schema schemaV2 = buildSchemaV2(); when(namespaceService.findByName("myNamespace")).thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, schema.getMetadata().getName())).thenReturn(true); + when(schemaService.validateSchema(namespace, schema)).thenReturn(Mono.just(List.of())); + when(schemaService.getAllSubjectVersions(namespace, schema.getMetadata().getName())) + .thenReturn(Flux.just(schemaV2)); when(schemaService.validateSchemaCompatibility("local", schema)).thenReturn( Mono.just(List.of("Not compatible"))); @@ -446,6 +458,28 @@ private Schema buildSchema() { .build(); } + private Schema buildSchemaV2() { + return Schema.builder() + .metadata(ObjectMeta.builder() + .name("prefix.subject-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .id(1) + .version(2) + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\"" + + ",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"First name of the person\"},{\"name\":\"lastName\",\"type\":[\"null\"," + + "\"string\"],\"default\":null,\"doc\":\"Last name of the person\"}," + + "{\"name\":\"dateOfBirth\",\"type\":[\"null\",{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"},{\"name\":\"birthPlace\",\"type\":[\"null\"," + + "\"string\"],\"default\":null,\"doc\":\"Place of birth\"}]}") + .build()) + .build(); + } + private SchemaList buildSchemaList() { return SchemaList.builder() .metadata(ObjectMeta.builder() diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java index 42270d28..7c4032bc 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaTest.java @@ -14,6 +14,8 @@ import com.michelin.ns4kafka.models.schema.SchemaList; import com.michelin.ns4kafka.services.clients.schema.entities.SchemaCompatibilityResponse; import com.michelin.ns4kafka.services.clients.schema.entities.SchemaResponse; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; +import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Property; import io.micronaut.core.type.Argument; @@ -256,7 +258,7 @@ void registerSchemaIncompatibility() { } @Test - void registerSchemaWithReferences() { + void shouldRegisterSchemaWithReferences() { Schema schemaHeader = Schema.builder() .metadata(ObjectMeta.builder() .name("ns1-header-subject-value") @@ -353,6 +355,145 @@ void registerSchemaWithReferences() { assertEquals("ns1-person-subject-value", actualPerson.subject()); } + @Test + void shouldCheckSchemaStatus() { + // Create header + Schema schemaHeader = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-header-subject-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema("{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"HeaderAvro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"ID of the header\"}]}") + .build()) + .build(); + + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaHeader), Schema.class); + + // Create person + Schema schemaPersonWithRefs = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-person-subject-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema("{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\"," + + "\"type\":\"record\",\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"header\",\"type\":[\"null\"," + + "\"com.michelin.kafka.producer.showcase.avro.HeaderAvro\"]," + + "\"default\":null,\"doc\":\"Header of the person\"},{\"name\":\"firstName\"," + + "\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":" + + "\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\",{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .references(List.of(Schema.SchemaSpec.Reference.builder() + .name("com.michelin.kafka.producer.showcase.avro.HeaderAvro") + .subject("ns1-header-subject-value") + .version(1) + .build())) + .build()) + .build(); + + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaPersonWithRefs), Schema.class); + + // Create person, result should be unchanged + var personUnchangedResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaPersonWithRefs), Schema.class); + + assertEquals("unchanged", personUnchangedResponse.header("X-Ns4kafka-Result")); + + // Create person v2, result should be changed + Schema newSchemaVersionPersonWithRefs = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-person-subject-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema("{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\"," + + "\"type\":\"record\",\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"header\",\"type\":[\"null\"," + + "\"com.michelin.kafka.producer.showcase.avro.HeaderAvro\"]," + + "\"default\":null,\"doc\":\"Header of the person\"},{\"name\":\"firstName\"," + + "\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":" + + "\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\",{\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"},{\"name\":\"birthPlace\",\"type\":[\"null\"," + + "\"string\"],\"default\":null,\"doc\":\"Place of birth\"}]}") + .references(List.of(Schema.SchemaSpec.Reference.builder() + .name("com.michelin.kafka.producer.showcase.avro.HeaderAvro") + .subject("ns1-header-subject-value") + .version(1) + .build())) + .build()) + .build(); + + var newPersonCreateResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(newSchemaVersionPersonWithRefs), Schema.class); + + assertEquals("changed", newPersonCreateResponse.header("X-Ns4kafka-Result")); + + SchemaResponse newActualPerson = schemaRegistryClient.toBlocking() + .retrieve(HttpRequest.GET("/subjects/ns1-person-subject-value/versions/latest"), + SchemaResponse.class); + + Assertions.assertNotNull(newActualPerson.id()); + assertEquals(2, newActualPerson.version()); + assertEquals("ns1-person-subject-value", newActualPerson.subject()); + + // Recreate person v1, result should be unchanged + var personCreateV1Response = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaPersonWithRefs), Schema.class); + + assertEquals("unchanged", personCreateV1Response.header("X-Ns4kafka-Result")); + + Schema schemaHeaderV2 = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-header-subject-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema("{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"HeaderAvro\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"ID of the header\"},{\"name\":\"value\",\"type\":[\"null\"," + + "\"string\"],\"default\":null,\"doc\":\"value of the header\"}]}") + .build()) + .build(); + + // Create header v2 + var headerV2CreateResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaHeaderV2), Schema.class); + + assertEquals("changed", headerV2CreateResponse.header("X-Ns4kafka-Result")); + + SchemaResponse actualHeaderV2 = schemaRegistryClient.toBlocking() + .retrieve(HttpRequest.GET("/subjects/ns1-header-subject-value/versions/latest"), + SchemaResponse.class); + + Assertions.assertNotNull(actualHeaderV2.id()); + assertEquals(2, actualHeaderV2.version()); + assertEquals("ns1-header-subject-value", actualHeaderV2.subject()); + + // Create person referencing header v2, result should be changed + newSchemaVersionPersonWithRefs.getSpec().getReferences().get(0).setVersion(2); + + var newPersonCreateWithV2RefResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(newSchemaVersionPersonWithRefs), Schema.class); + + assertEquals("changed", newPersonCreateWithV2RefResponse.header("X-Ns4kafka-Result")); + } + @Test void registerSchemaWrongPrefix() { Schema wrongSchema = Schema.builder() @@ -458,4 +599,100 @@ void registerSchema() { assertEquals(HttpStatus.NOT_FOUND, getException.getStatus()); } + + @Test + void registerSameSchemaTwice() { + Schema schema = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-subject3-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + // Apply schema + var createResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema), Schema.class); + + assertEquals("created", createResponse.header("X-Ns4kafka-Result")); + + // Get all schemas + var getResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.GET, "/api/namespaces/ns1/schemas") + .bearerAuth(token), Argument.listOf(SchemaList.class)); + + assertTrue(getResponse.getBody().isPresent()); + assertTrue(getResponse.getBody().get() + .stream() + .anyMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject3-value"))); + + // Apply the same schema with swapped fields + Schema sameSchemaWithSwappedFields = Schema.builder() + .metadata(ObjectMeta.builder() + .name("ns1-subject3-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[ {\"name\":\"lastName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null, \"doc\":\"Last name of the person\"}," + + "{\"name\":\"firstName\",\"type\":[\"null\",\"string\"], \"default\":null," + + "\"doc\":\"First name of the person\"}, {\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + var createSwappedFieldsResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(sameSchemaWithSwappedFields), Schema.class); + + // Expects a new version + assertEquals("changed", createSwappedFieldsResponse.header("X-Ns4kafka-Result")); + + // Get the latest version to check if we are on v2 + SchemaResponse schemaAfterApply = schemaRegistryClient.toBlocking() + .retrieve(HttpRequest.GET("/subjects/ns1-subject3-value/versions/latest"), + SchemaResponse.class); + + Assertions.assertNotNull(schemaAfterApply.id()); + assertEquals(2, schemaAfterApply.version()); + assertEquals("ns1-subject3-value", schemaAfterApply.subject()); + + // Apply again the schema with swapped fields + var createAgainResponse = + ns4KafkaClient.toBlocking().exchange(HttpRequest.create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(sameSchemaWithSwappedFields), Schema.class); + + // Expects no new schema version but an unchanged status + assertEquals("unchanged", createAgainResponse.header("X-Ns4kafka-Result")); + + // Apply again with SR client to be sure that + RegisterSchemaRequest request = new RegisterSchemaRequest(); + request.setSchema(sameSchemaWithSwappedFields.getSpec().getSchema()); + + schemaRegistryClient.toBlocking() + .exchange(HttpRequest.create(HttpMethod.POST, "/subjects/ns1-subject3-value/versions") + .body(request), RegisterSchemaResponse.class); + + SchemaResponse schemaAfterPostOnRegistry = schemaRegistryClient.toBlocking() + .retrieve(HttpRequest.GET("/subjects/ns1-subject3-value/versions/latest"), + SchemaResponse.class); + + Assertions.assertNotNull(schemaAfterPostOnRegistry.id()); + assertEquals(2, schemaAfterPostOnRegistry.version()); + assertEquals("ns1-subject3-value", schemaAfterPostOnRegistry.subject()); + } } diff --git a/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java index 67da839e..6d71c67a 100644 --- a/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java @@ -359,7 +359,7 @@ void validateLocallyInvalidConnectCluster() { StepVerifier.create(connectorService.validateLocally(ns, connector)) .consumeNextWith(response -> { assertEquals(1, response.size()); - assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name]", + assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name].", response.get(0)); }) .verifyComplete(); @@ -374,6 +374,7 @@ void validateLocallyNoClassName() { .config(Map.of()) .build()) .build(); + Namespace ns = Namespace.builder() .metadata(ObjectMeta.builder() .name("namespace") @@ -387,7 +388,7 @@ void validateLocallyNoClassName() { StepVerifier.create(connectorService.validateLocally(ns, connector)) .consumeNextWith(response -> { assertEquals(1, response.size()); - assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null", + assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null.", response.get(0)); }) .verifyComplete(); @@ -419,8 +420,8 @@ void validateLocallyInvalidClassName() { .consumeNextWith(response -> { assertEquals(1, response.size()); assertEquals( - "Failed to find any class that implements Connector and which name matches " - + "org.apache.kafka.connect.file.FileStreamSinkConnector", + "Failed to find any class that implements connector and which name matches " + + "org.apache.kafka.connect.file.FileStreamSinkConnector.", response.get(0)); }) .verifyComplete(); diff --git a/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java index 4a900533..2652dde5 100644 --- a/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java @@ -120,6 +120,24 @@ void getBySubjectAndVersion() { .verifyComplete(); } + @Test + void getAllSubjectVersions() { + Namespace namespace = buildNamespace(); + SchemaResponse schemaResponse = buildSchemaResponse("prefix.schema-one"); + + when(schemaRegistryClient.getAllSubjectVersions(namespace.getMetadata().getCluster(), + "prefix.schema-one")).thenReturn(Flux.just(schemaResponse)); + + StepVerifier.create(schemaService.getAllSubjectVersions(namespace, "prefix.schema-one")) + .consumeNextWith(subjectVersion -> { + assertEquals("prefix.schema-one", subjectVersion.getMetadata().getName()); + assertEquals("local", subjectVersion.getMetadata().getCluster()); + assertEquals("myNamespace", subjectVersion.getMetadata().getNamespace()); + assertEquals(schemaResponse.references(), subjectVersion.getSpec().getReferences()); + }) + .verifyComplete(); + } + @Test void getBySubjectAndVersionEmptyResponse() { Namespace namespace = buildNamespace(); @@ -265,6 +283,51 @@ void isNamespaceOwnerOfSubjectTest() { assertTrue(schemaService.isNamespaceOwnerOfSubject(ns, "prefix.schema-one")); } + @Test + void shouldValidateSchema() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaCompatibilityResponse compatibilityResponse = buildCompatibilityResponse(); + schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder() + .name("reference") + .subject("subject-reference") + .version(1) + .build())); + + when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(), + "subject-reference", 1)) + .thenReturn(Mono.just(buildSchemaResponse("subject-reference"))); + when(schemaRegistryClient.getCurrentCompatibilityBySubject(any(), any())).thenReturn( + Mono.just(compatibilityResponse)); + + StepVerifier.create(schemaService.validateSchema(namespace, schema)) + .consumeNextWith(errors -> assertTrue(errors.isEmpty())) + .verifyComplete(); + } + + @Test + void shouldNotValidateSchema() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + schema.getMetadata().setName("wrongSubjectName"); + schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder() + .name("reference") + .subject("subject-reference") + .version(1) + .build())); + + when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(), + "subject-reference", 1)).thenReturn(Mono.empty()); + + StepVerifier.create(schemaService.validateSchema(namespace, schema)) + .consumeNextWith(errors -> { + assertTrue(errors.contains("Invalid value wrongSubjectName for name: " + + "Value must end with -key or -value.")); + assertTrue(errors.contains("Reference subject-reference version 1 not found.")); + }) + .verifyComplete(); + } + private Namespace buildNamespace() { return Namespace.builder() .metadata(ObjectMeta.builder() @@ -279,7 +342,7 @@ private Namespace buildNamespace() { private Schema buildSchema() { return Schema.builder() .metadata(ObjectMeta.builder() - .name("prefix.schema-one") + .name("prefix.schema-one-value") .build()) .spec(Schema.SchemaSpec.builder() .compatibility(Schema.Compatibility.BACKWARD) @@ -308,6 +371,11 @@ private SchemaResponse buildSchemaResponse(String subject) { + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + "\"doc\":\"Date of birth of the person\"}]}") + .references(List.of(Schema.SchemaSpec.Reference.builder() + .name("reference") + .subject("subject-reference") + .version(1) + .build())) .build(); }