Skip to content

Commit

Permalink
Support unchanged status on schemas (#344)
Browse files Browse the repository at this point in the history
* Handling unchanged status for schema

* Fixing tests

* Fixing repositories

* Fixing checkstyle issues

* Light refactoring

* Check all the subject versions for unchanged schema

* Fix checkstyle

* Complete integration test for avro references

* Add schema with references in docker demo. Fix NPE on unknown reference. Refactor schema validation

* Simplify references validation

---------

Co-authored-by: Loïc Greffier <[email protected]>
  • Loading branch information
AlexisSouquiere and loicgreffier authored Jan 21, 2024
1 parent 27a37ca commit 0e369c5
Show file tree
Hide file tree
Showing 12 changed files with 701 additions and 92 deletions.
48 changes: 48 additions & 0 deletions .docker/resources/user/schema-refs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
apiVersion: v1
kind: Schema
metadata:
name: abc.header-value
spec:
schema: |
{
"namespace": "io.github.michelin.ns4kafka.avro",
"type": "record",
"name": "KafkaHeader",
"fields": [
{
"name": "id",
"type": "long"
}
]
}
---
apiVersion: v1
kind: Schema
metadata:
name: abc.person-value
spec:
schema: |
{
"namespace": "io.github.michelin.ns4kafka.avro",
"type": "record",
"name": "KafkaPerson",
"fields": [
{
"name": "header",
"type": "io.github.michelin.ns4kafka.avro.KafkaHeader"
},
{
"name": "firstName",
"type": "string"
},
{
"name": "lastName",
"type": "string"
}
]
}
references:
- name: io.github.michelin.ns4kafka.avro.KafkaHeader
subject: abc.header-value
version: 1
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ group = "com.michelin.ns4kafka"

repositories {
mavenCentral()
maven {
url "https://packages.confluent.io/maven"
}
}

dependencies {
Expand All @@ -34,6 +37,8 @@ dependencies {
implementation("io.swagger.core.v3:swagger-annotations")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("jakarta.validation:jakarta.validation-api")
implementation('io.confluent:kafka-schema-registry-client:7.5.1')


compileOnly("org.projectlombok:lombok")
compileOnly("com.google.code.findbugs:jsr305") // https://github.com/micronaut-projects/micronaut-core/pull/5691
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public Mono<HttpResponse<Connector>> apply(String namespace, @Valid @Body Connec
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

// Validate ownership
if (!connectorService.isNamespaceOwnerOfConnect(ns, connector.getMetadata().getName())) {
return Mono.error(new ResourceValidationException(
List.of(String.format(NAMESPACE_NOT_OWNER, connector.getMetadata().getName())),
Expand Down
106 changes: 55 additions & 51 deletions src/main/java/com/michelin/ns4kafka/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.michelin.ns4kafka.services.SchemaService;
import com.michelin.ns4kafka.utils.enums.ApplyStatus;
import com.michelin.ns4kafka.utils.exceptions.ResourceValidationException;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Body;
Expand All @@ -25,8 +26,10 @@
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import java.time.Instant;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -38,6 +41,8 @@
@Controller(value = "/api/namespaces/{namespace}/schemas")
@ExecuteOn(TaskExecutors.IO)
public class SchemaController extends NamespacedResourceController {
private static final String NAMESPACE_NOT_OWNER = "Namespace not owner of this schema %s.";

@Inject
SchemaService schemaService;

Expand Down Expand Up @@ -83,66 +88,65 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

// Validate TopicNameStrategy
// https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject/TopicNameStrategy.java
if (!schema.getMetadata().getName().endsWith("-key") && !schema.getMetadata().getName().endsWith("-value")) {
return Mono.error(
new ResourceValidationException(List.of("Invalid value " + schema.getMetadata().getName()
+ " for name: subject must end with -key or -value"), schema.getKind(),
schema.getMetadata().getName()));
}

// Validate ownership
if (!schemaService.isNamespaceOwnerOfSubject(ns, schema.getMetadata().getName())) {
return Mono.error(
new ResourceValidationException(List.of(String.format("Namespace not owner of this schema %s.",
schema.getMetadata().getName())), schema.getKind(), schema.getMetadata().getName()));
return Mono.error(new ResourceValidationException(
List.of(String.format(NAMESPACE_NOT_OWNER, schema.getMetadata().getName())),
schema.getKind(), schema.getMetadata().getName()));
}

return schemaService
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(validationErrors, schema.getKind(),
return schemaService.validateSchema(ns, schema)
.flatMap(errors -> {
if (!errors.isEmpty()) {
return Mono.error(new ResourceValidationException(errors, schema.getKind(),
schema.getMetadata().getName()));
}

return schemaService
.getLatestSubject(ns, schema.getMetadata().getName())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.flatMap(latestSubjectOptional -> {
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());
latestSubjectOptional.ifPresent(
value -> schema.getSpec().setCompatibility(value.getSpec().getCompatibility()));

if (dryrun) {
// Cannot compute the "unchanged" apply status before getting the ID at registration
return Mono.just(formatHttpResponse(schema,
latestSubjectOptional.isPresent() ? ApplyStatus.changed : ApplyStatus.created));
return schemaService.getAllSubjectVersions(ns, schema.getMetadata().getName())
.collectList()
.flatMap(subjects -> {
// If new schema matches any of the existing schemas, return unchanged
boolean unchanged = subjects.stream().anyMatch(subject -> {
var actualSchema = new AvroSchema(subject.getSpec().getSchema(),
schemaService.getReferences(subject), schemaService.getSchemaReferences(subject, ns),
null);
var newSchema = new AvroSchema(schema.getSpec().getSchema(),
schemaService.getReferences(schema), schemaService.getSchemaReferences(schema, ns),
null);

return Objects.equals(newSchema.canonicalString(), actualSchema.canonicalString())
&& Objects.equals(newSchema.references(), actualSchema.references());
});

if (unchanged) {
return Mono.just(formatHttpResponse(schema, ApplyStatus.unchanged));
}

return schemaService
.register(ns, schema)
.map(id -> {
ApplyStatus status;

if (latestSubjectOptional.isEmpty()) {
status = ApplyStatus.created;
sendEventLog(schema.getKind(), schema.getMetadata(), status, null,
schema.getSpec());
} else if (id > latestSubjectOptional.get().getSpec().getId()) {
status = ApplyStatus.changed;
sendEventLog(schema.getKind(), schema.getMetadata(), status,
latestSubjectOptional.get().getSpec(),
schema.getSpec());
} else {
status = ApplyStatus.unchanged;
.validateSchemaCompatibility(ns.getMetadata().getCluster(), schema)
.flatMap(validationErrors -> {
if (!validationErrors.isEmpty()) {
return Mono.error(new ResourceValidationException(
validationErrors, schema.getKind(), schema.getMetadata().getName()));
}

return formatHttpResponse(schema, status);
schema.getMetadata().setCreationTimestamp(Date.from(Instant.now()));
schema.getMetadata().setCluster(ns.getMetadata().getCluster());
schema.getMetadata().setNamespace(ns.getMetadata().getName());

ApplyStatus status = subjects.isEmpty() ? ApplyStatus.created : ApplyStatus.changed;
if (dryrun) {
return Mono.just(formatHttpResponse(schema, status));
}

return schemaService
.register(ns, schema)
.map(id -> {
sendEventLog(schema.getKind(), schema.getMetadata(), status,
subjects.isEmpty() ? null : subjects.stream()
.max(Comparator.comparingInt((Schema s) -> s.getSpec().getId())),
schema.getSpec());

return formatHttpResponse(schema, status);
});
});
});
});
Expand All @@ -165,7 +169,7 @@ public Mono<HttpResponse<Void>> deleteSubject(String namespace, @PathVariable St
// Validate ownership
if (!schemaService.isNamespaceOwnerOfSubject(ns, subject)) {
return Mono.error(new ResourceValidationException(
List.of(String.format("Namespace not owner of this schema %s.", subject)),
List.of(String.format(NAMESPACE_NOT_OWNER, subject)),
AccessControlEntry.ResourceType.SCHEMA.toString(), subject));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.collect(Collectors.joining(", "));
return Mono.just(
List.of("Invalid value " + connector.getSpec().getConnectCluster()
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]"));
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]."));
}

// If class doesn't exist, no need to go further
// If class does not exist, no need to go further
if (StringUtils.isEmpty(connector.getSpec().getConfig().get(CONNECTOR_CLASS))) {
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null"));
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null."));
}

// Connector type exists on this target connect cluster ?
// Connector type exists on this target connect cluster
return kafkaConnectClient.connectPlugins(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster())
.map(connectorPluginInfos -> {
Expand All @@ -141,8 +141,8 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.findFirst();

if (connectorType.isEmpty()) {
return List.of("Failed to find any class that implements Connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS));
return List.of("Failed to find any class that implements connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS) + ".");
}

return namespace.getSpec().getConnectValidator() != null
Expand Down
Loading

0 comments on commit 0e369c5

Please sign in to comment.