diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 3b78e73e..949ab98d 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -146,13 +146,14 @@ public Mono> apply(String namespace, @Valid @Body Schema sc * * @param namespace The namespace * @param subject The subject - * @param version The version of the schema to delete (optional) + * @param version The version of the schema to delete * @param dryrun Run in dry mode or not * @return A HTTP response */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") - public Mono> delete(String namespace, @PathVariable String subject, + public Mono> delete(String namespace, + @PathVariable String subject, @QueryValue Optional version, @QueryValue(defaultValue = "false") boolean dryrun) { Namespace ns = getNamespace(namespace); diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 0b26de8a..0060a20b 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -143,7 +143,7 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea * * @param kafkaCluster The Kafka cluster * @param subject The subject - * @param version The version + * @param version The version * @param hardDelete Should the subject be hard deleted or not * @return The version of the deleted subject */ diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index d116c5a0..8374f801 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -447,19 +447,19 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { Namespace namespace = buildNamespace(); when(namespaceService.findByName("myNamespace")) - .thenReturn(Optional.of(namespace)); + .thenReturn(Optional.of(namespace)); when(schemaService.isNamespaceOwnerOfSubject(namespace, "prefix.subject-value")) - .thenReturn(false); + .thenReturn(false); StepVerifier.create(schemaController.delete("myNamespace", "prefix.subject-value", Optional.empty(), false)) - .consumeErrorWith(error -> { - assertEquals(ResourceValidationException.class, error.getClass()); - assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); - assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " - + "namespace is not owner of the resource.", - ((ResourceValidationException) error).getValidationErrors().getFirst()); - }) - .verify(); + .consumeErrorWith(error -> { + assertEquals(ResourceValidationException.class, error.getClass()); + assertEquals(1, ((ResourceValidationException) error).getValidationErrors().size()); + assertEquals("Invalid value \"prefix.subject-value\" for field \"name\": " + + "namespace is not owner of the resource.", + ((ResourceValidationException) error).getValidationErrors().getFirst()); + }) + .verify(); verify(schemaService, never()).getLatestSubject(any(), any()); verify(schemaService, never()).deleteAllVersions(any(), any()); @@ -561,7 +561,7 @@ void shouldDeleteLatestSchemaVersionBySpecifyingLatest() { .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) .verifyComplete(); - verify(applicationEventPublisher, times(1)).publishEvent(any()); + verify(applicationEventPublisher).publishEvent(any()); } @Test @@ -588,7 +588,7 @@ void shouldDeleteLatestSchemaVersionBySpecifyingVersion() { .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) .verifyComplete(); - verify(applicationEventPublisher, times(1)).publishEvent(any()); + verify(applicationEventPublisher).publishEvent(any()); } @Test @@ -614,7 +614,7 @@ void shouldDeleteLastSchemaVersion() { .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) .verifyComplete(); - verify(applicationEventPublisher, times(1)).publishEvent(any()); + verify(applicationEventPublisher).publishEvent(any()); } @Test diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index 7c27a854..13a3036c 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -766,12 +766,110 @@ void shouldRegisterSameSchemaTwice() { assertNotNull(schemaAfterPostOnRegistry.id()); assertEquals(2, schemaAfterPostOnRegistry.version()); assertEquals("ns1-subject3-value", schemaAfterPostOnRegistry.subject()); + } + + @Test + void shouldDeleteSchema() { + Schema schemaV1 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field2\",\"type\":[\"null\",\"string\"]}]}") + .build()) + .build(); + + // Register V1 schema + var createV1Response = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV1), Schema.class); + + assertEquals("created", createV1Response.header("X-Ns4kafka-Result")); + + Schema schemaV2 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field2\",\"type\":[\"null\",\"string\"]}]}") + .build()) + .build(); + + // Register V2 schema + var createV2Response = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV2), Schema.class); + + assertEquals("created", createV2Response.header("X-Ns4kafka-Result")); + + Schema schemaV3 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field2\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field3\",\"type\":[\"null\",\"string\"]}]}") + .build()) + .build(); + + // Register V3 schema + var createV3Response = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV3), Schema.class); + + assertEquals("created", createV3Response.header("X-Ns4kafka-Result")); + + Schema schemaV4 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject4-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field2\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field3\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"field4\",\"type\":[\"null\",\"string\"]}]}") + .build()) + .build(); + + // Register V4 schema + var createV4Response = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schemaV4), Schema.class); + + assertEquals("created", createV4Response.header("X-Ns4kafka-Result")); // Delete latest schema version var deleteLatestVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject3-value?version=latest") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=latest") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteLatestVersionResponse.getStatus()); @@ -780,26 +878,18 @@ void shouldRegisterSameSchemaTwice() { var getSchemaAfterLatestVersionDeletionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject3-value") + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") .bearerAuth(token), Schema.class); - // Expects v1 is returned by ns4kafka + // Expects v3 is returned by ns4kafka assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().isPresent()); - assertEquals(1, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); - - // Post the v2 schema again - ns4KafkaClient - .toBlocking() - .exchange(HttpRequest - .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") - .bearerAuth(token) - .body(sameSchemaWithSwappedFields), Schema.class); + assertEquals(3, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); // Delete old schema version var deleteOldVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject3-value?version=1") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=1") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteOldVersionResponse.getStatus()); @@ -808,11 +898,30 @@ void shouldRegisterSameSchemaTwice() { var getSchemaAfterOldVersionDeletionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject3-value") + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") .bearerAuth(token), Schema.class); - // Expects v2 as returned schema + // Expects v3 as returned schema assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); - assertEquals(2, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertEquals(3, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + + // Delete all remaining schema versions + var deleteAllVersionsResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteAllVersionsResponse.getStatus()); + + // Get all schemas + var getSchemaAfterAllVersionsDeletionResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .bearerAuth(token), Schema.class); + + // Expects no returned schema + assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isEmpty()); } }