Skip to content

Commit

Permalink
Simplify references validation
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier committed Jan 21, 2024
1 parent 8b8f79e commit c4be16b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.collect(Collectors.joining(", "));
return Mono.just(
List.of("Invalid value " + connector.getSpec().getConnectCluster()
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]"));
+ " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]."));
}

// If class doesn't exist, no need to go further
// If class does not exist, no need to go further
if (StringUtils.isEmpty(connector.getSpec().getConfig().get(CONNECTOR_CLASS))) {
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null"));
return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null."));
}

// Connector type exists on this target connect cluster ?
// Connector type exists on this target connect cluster
return kafkaConnectClient.connectPlugins(namespace.getMetadata().getCluster(),
connector.getSpec().getConnectCluster())
.map(connectorPluginInfos -> {
Expand All @@ -141,8 +141,8 @@ public Mono<List<String>> validateLocally(Namespace namespace, Connector connect
.findFirst();

if (connectorType.isEmpty()) {
return List.of("Failed to find any class that implements Connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS));
return List.of("Failed to find any class that implements connector and which name matches "
+ connector.getSpec().getConfig().get(CONNECTOR_CLASS) + ".");
}

return namespace.getSpec().getConnectValidator() != null
Expand Down
32 changes: 15 additions & 17 deletions src/main/java/com/michelin/ns4kafka/services/SchemaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ public Mono<Schema> getLatestSubject(Namespace namespace, String subject) {
/**
* Validate a schema when it is created or updated.
*
* @param ns The namespace
* @param schema The schema to validate
* @param namespace The namespace
* @param schema The schema to validate
* @return A list of errors
*/
public Mono<List<String>> validateSchema(Namespace ns, Schema schema) {
public Mono<List<String>> validateSchema(Namespace namespace, Schema schema) {
return Mono.defer(() -> {
List<String> validationErrors = new ArrayList<>();

Expand All @@ -188,7 +188,7 @@ public Mono<List<String>> validateSchema(Namespace ns, Schema schema) {
}

if (!CollectionUtils.isEmpty(schema.getSpec().getReferences())) {
return Mono.zip(validateReferences(ns, schema), Mono.just(validationErrors),
return Mono.zip(validateReferences(namespace, schema), Mono.just(validationErrors),
(referenceErrors, errors) -> {
errors.addAll(referenceErrors);
return errors;
Expand All @@ -207,19 +207,17 @@ public Mono<List<String>> validateSchema(Namespace ns, Schema schema) {
* @return A list of errors
*/
private Mono<List<String>> validateReferences(Namespace ns, Schema schema) {
return Flux.concat(schema.getSpec().getReferences()
.stream()
.map(reference -> getSubject(ns, reference.getSubject(), reference.getVersion())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.mapNotNull(schemaOptional -> {
if (schemaOptional.isEmpty()) {
return String.format("Reference %s with version %s not found.",
reference.getSubject(), reference.getVersion());
}
return null;
}))
.toList())
return Flux.fromIterable(schema.getSpec().getReferences())
.flatMap(reference -> getSubject(ns, reference.getSubject(), reference.getVersion())
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
.mapNotNull(schemaOptional -> {
if (schemaOptional.isEmpty()) {
return String.format("Reference %s version %s not found.",
reference.getSubject(), reference.getVersion());
}
return null;
}))
.collectList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void validateLocallyInvalidConnectCluster() {
StepVerifier.create(connectorService.validateLocally(ns, connector))
.consumeNextWith(response -> {
assertEquals(1, response.size());
assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name]",
assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name].",
response.get(0));
})
.verifyComplete();
Expand All @@ -374,6 +374,7 @@ void validateLocallyNoClassName() {
.config(Map.of())
.build())
.build();

Namespace ns = Namespace.builder()
.metadata(ObjectMeta.builder()
.name("namespace")
Expand All @@ -387,7 +388,7 @@ void validateLocallyNoClassName() {
StepVerifier.create(connectorService.validateLocally(ns, connector))
.consumeNextWith(response -> {
assertEquals(1, response.size());
assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null",
assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null.",
response.get(0));
})
.verifyComplete();
Expand Down Expand Up @@ -419,8 +420,8 @@ void validateLocallyInvalidClassName() {
.consumeNextWith(response -> {
assertEquals(1, response.size());
assertEquals(
"Failed to find any class that implements Connector and which name matches "
+ "org.apache.kafka.connect.file.FileStreamSinkConnector",
"Failed to find any class that implements connector and which name matches "
+ "org.apache.kafka.connect.file.FileStreamSinkConnector.",
response.get(0));
})
.verifyComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,18 @@ void isNamespaceOwnerOfSubjectTest() {
void shouldValidateSchema() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();

SchemaCompatibilityResponse compatibilityResponse = buildCompatibilityResponse();
schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder()
.name("reference")
.subject("subject-reference")
.version(1)
.build()));

when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(),
"subject-reference", 1))
.thenReturn(Mono.just(buildSchemaResponse("subject-reference")));
when(schemaRegistryClient.getCurrentCompatibilityBySubject(any(), any())).thenReturn(
Mono.just(compatibilityResponse));

StepVerifier.create(schemaService.validateSchema(namespace, schema))
.consumeNextWith(errors -> assertTrue(errors.isEmpty()))
Expand All @@ -302,11 +310,11 @@ void shouldNotValidateSchema() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
schema.getMetadata().setName("wrongSubjectName");
schema.getSpec().getReferences().add(Schema.SchemaSpec.Reference.builder()
schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder()
.name("reference")
.subject("subject-reference")
.version(1)
.build());
.build()));

when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(),
"subject-reference", 1)).thenReturn(Mono.empty());
Expand All @@ -315,7 +323,7 @@ void shouldNotValidateSchema() {
.consumeNextWith(errors -> {
assertTrue(errors.contains("Invalid value wrongSubjectName for name: "
+ "Value must end with -key or -value."));
assertTrue(errors.contains("Reference subject-reference with version 1 not found."));
assertTrue(errors.contains("Reference subject-reference version 1 not found."));
})
.verifyComplete();
}
Expand All @@ -334,7 +342,7 @@ private Namespace buildNamespace() {
private Schema buildSchema() {
return Schema.builder()
.metadata(ObjectMeta.builder()
.name("prefix.schema-one")
.name("prefix.schema-one-value")
.build())
.spec(Schema.SchemaSpec.builder()
.compatibility(Schema.Compatibility.BACKWARD)
Expand Down

0 comments on commit c4be16b

Please sign in to comment.