From c4be16bddbac8363df30b8d9345fcff1b5408713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Greffier?= Date: Sun, 21 Jan 2024 19:09:40 +0100 Subject: [PATCH] Simplify references validation --- .../ns4kafka/services/ConnectorService.java | 12 +++---- .../ns4kafka/services/SchemaService.java | 32 +++++++++---------- .../services/ConnectorServiceTest.java | 9 +++--- .../ns4kafka/services/SchemaServiceTest.java | 18 ++++++++--- 4 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java b/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java index 1be6cf98..967d259c 100644 --- a/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java +++ b/src/main/java/com/michelin/ns4kafka/services/ConnectorService.java @@ -121,15 +121,15 @@ public Mono> validateLocally(Namespace namespace, Connector connect .collect(Collectors.joining(", ")); return Mono.just( List.of("Invalid value " + connector.getSpec().getConnectCluster() - + " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "]")); + + " for spec.connectCluster: Value must be one of [" + allowedConnectClusters + "].")); } - // If class doesn't exist, no need to go further + // If class does not exist, no need to go further if (StringUtils.isEmpty(connector.getSpec().getConfig().get(CONNECTOR_CLASS))) { - return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null")); + return Mono.just(List.of("Invalid value for spec.config.'connector.class': Value must be non-null.")); } - // Connector type exists on this target connect cluster ? + // Connector type exists on this target connect cluster return kafkaConnectClient.connectPlugins(namespace.getMetadata().getCluster(), connector.getSpec().getConnectCluster()) .map(connectorPluginInfos -> { @@ -141,8 +141,8 @@ public Mono> validateLocally(Namespace namespace, Connector connect .findFirst(); if (connectorType.isEmpty()) { - return List.of("Failed to find any class that implements Connector and which name matches " - + connector.getSpec().getConfig().get(CONNECTOR_CLASS)); + return List.of("Failed to find any class that implements connector and which name matches " + + connector.getSpec().getConfig().get(CONNECTOR_CLASS) + "."); } return namespace.getSpec().getConnectValidator() != null diff --git a/src/main/java/com/michelin/ns4kafka/services/SchemaService.java b/src/main/java/com/michelin/ns4kafka/services/SchemaService.java index ef1407cd..35b15e8e 100644 --- a/src/main/java/com/michelin/ns4kafka/services/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/services/SchemaService.java @@ -171,11 +171,11 @@ public Mono getLatestSubject(Namespace namespace, String subject) { /** * Validate a schema when it is created or updated. * - * @param ns The namespace - * @param schema The schema to validate + * @param namespace The namespace + * @param schema The schema to validate * @return A list of errors */ - public Mono> validateSchema(Namespace ns, Schema schema) { + public Mono> validateSchema(Namespace namespace, Schema schema) { return Mono.defer(() -> { List validationErrors = new ArrayList<>(); @@ -188,7 +188,7 @@ public Mono> validateSchema(Namespace ns, Schema schema) { } if (!CollectionUtils.isEmpty(schema.getSpec().getReferences())) { - return Mono.zip(validateReferences(ns, schema), Mono.just(validationErrors), + return Mono.zip(validateReferences(namespace, schema), Mono.just(validationErrors), (referenceErrors, errors) -> { errors.addAll(referenceErrors); return errors; @@ -207,19 +207,17 @@ public Mono> validateSchema(Namespace ns, Schema schema) { * @return A list of errors */ private Mono> validateReferences(Namespace ns, Schema schema) { - return Flux.concat(schema.getSpec().getReferences() - .stream() - .map(reference -> getSubject(ns, reference.getSubject(), reference.getVersion()) - .map(Optional::of) - .defaultIfEmpty(Optional.empty()) - .mapNotNull(schemaOptional -> { - if (schemaOptional.isEmpty()) { - return String.format("Reference %s with version %s not found.", - reference.getSubject(), reference.getVersion()); - } - return null; - })) - .toList()) + return Flux.fromIterable(schema.getSpec().getReferences()) + .flatMap(reference -> getSubject(ns, reference.getSubject(), reference.getVersion()) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()) + .mapNotNull(schemaOptional -> { + if (schemaOptional.isEmpty()) { + return String.format("Reference %s version %s not found.", + reference.getSubject(), reference.getVersion()); + } + return null; + })) .collectList(); } diff --git a/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java index 67da839e..6d71c67a 100644 --- a/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/ConnectorServiceTest.java @@ -359,7 +359,7 @@ void validateLocallyInvalidConnectCluster() { StepVerifier.create(connectorService.validateLocally(ns, connector)) .consumeNextWith(response -> { assertEquals(1, response.size()); - assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name]", + assertEquals("Invalid value wrong for spec.connectCluster: Value must be one of [local-name].", response.get(0)); }) .verifyComplete(); @@ -374,6 +374,7 @@ void validateLocallyNoClassName() { .config(Map.of()) .build()) .build(); + Namespace ns = Namespace.builder() .metadata(ObjectMeta.builder() .name("namespace") @@ -387,7 +388,7 @@ void validateLocallyNoClassName() { StepVerifier.create(connectorService.validateLocally(ns, connector)) .consumeNextWith(response -> { assertEquals(1, response.size()); - assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null", + assertEquals("Invalid value for spec.config.'connector.class': Value must be non-null.", response.get(0)); }) .verifyComplete(); @@ -419,8 +420,8 @@ void validateLocallyInvalidClassName() { .consumeNextWith(response -> { assertEquals(1, response.size()); assertEquals( - "Failed to find any class that implements Connector and which name matches " - + "org.apache.kafka.connect.file.FileStreamSinkConnector", + "Failed to find any class that implements connector and which name matches " + + "org.apache.kafka.connect.file.FileStreamSinkConnector.", response.get(0)); }) .verifyComplete(); diff --git a/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java b/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java index a65bd483..2652dde5 100644 --- a/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java +++ b/src/test/java/com/michelin/ns4kafka/services/SchemaServiceTest.java @@ -287,10 +287,18 @@ void isNamespaceOwnerOfSubjectTest() { void shouldValidateSchema() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); - + SchemaCompatibilityResponse compatibilityResponse = buildCompatibilityResponse(); + schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder() + .name("reference") + .subject("subject-reference") + .version(1) + .build())); + when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(), "subject-reference", 1)) .thenReturn(Mono.just(buildSchemaResponse("subject-reference"))); + when(schemaRegistryClient.getCurrentCompatibilityBySubject(any(), any())).thenReturn( + Mono.just(compatibilityResponse)); StepVerifier.create(schemaService.validateSchema(namespace, schema)) .consumeNextWith(errors -> assertTrue(errors.isEmpty())) @@ -302,11 +310,11 @@ void shouldNotValidateSchema() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); schema.getMetadata().setName("wrongSubjectName"); - schema.getSpec().getReferences().add(Schema.SchemaSpec.Reference.builder() + schema.getSpec().setReferences(List.of(Schema.SchemaSpec.Reference.builder() .name("reference") .subject("subject-reference") .version(1) - .build()); + .build())); when(schemaRegistryClient.getSubject(namespace.getMetadata().getCluster(), "subject-reference", 1)).thenReturn(Mono.empty()); @@ -315,7 +323,7 @@ void shouldNotValidateSchema() { .consumeNextWith(errors -> { assertTrue(errors.contains("Invalid value wrongSubjectName for name: " + "Value must end with -key or -value.")); - assertTrue(errors.contains("Reference subject-reference with version 1 not found.")); + assertTrue(errors.contains("Reference subject-reference version 1 not found.")); }) .verifyComplete(); } @@ -334,7 +342,7 @@ private Namespace buildNamespace() { private Schema buildSchema() { return Schema.builder() .metadata(ObjectMeta.builder() - .name("prefix.schema-one") + .name("prefix.schema-one-value") .build()) .spec(Schema.SchemaSpec.builder() .compatibility(Schema.Compatibility.BACKWARD)