From b3ce5548219bed8cf070c8585511f15582c0e3cc Mon Sep 17 00:00:00 2001 From: thcai Date: Wed, 25 Sep 2024 17:44:40 +0200 Subject: [PATCH 1/7] feature --- .../ns4kafka/controller/SchemaController.java | 54 ++++++++++++++++++- .../ns4kafka/service/SchemaService.java | 4 +- .../client/schema/SchemaRegistryClient.java | 2 +- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 5538c264..41da1a30 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -14,6 +14,7 @@ import com.michelin.ns4kafka.util.exception.ResourceValidationException; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; +import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; @@ -146,16 +147,65 @@ public Mono> apply(String namespace, @Valid @Body Schema sc } /** - * Delete all schema versions under the given subject, or a specific version of the schema if specified. + * Delete all schema versions or a specific schema version if specified, under all given subjects. + * + * @param namespace The namespace + * @param name The subject name parameter + * @param versionOptional The version of the schemas to delete + * @param dryrun Run in dry mode or not? + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, + @QueryValue(defaultValue = "*") String name, + @QueryValue("version") Optional versionOptional, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + return schemaService.findByWildcardName(ns, name) + .flatMap(schema -> versionOptional + .map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version)) + .orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName())) + .map(Optional::of) + .defaultIfEmpty(Optional.empty())) + .collectList() + .flatMap(schemasList -> schemasList.isEmpty() || schemasList.stream().anyMatch(Optional::isEmpty) + ? Mono.just(HttpResponse.notFound()) : + (dryrun ? Mono.just(HttpResponse.noContent()) : + Flux.fromIterable(schemasList) + .filter(Optional::isPresent) + .map(Optional::get) + .flatMap(schema -> (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : + schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) + .flatMap(deletedVersionIds -> { + sendEventLog( + schema, + ApplyStatus.deleted, + schema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)) + .orElse(EMPTY_STRING) + ); + return Mono.just(HttpResponse.noContent()); + })) + .then(Mono.just(HttpResponse.noContent())))); + } + + /** + * Delete all schema versions or a specific schema version if specified, under the given subject. * * @param namespace The namespace * @param subject The subject * @param versionOptional The version of the schema to delete - * @param dryrun Run in dry mode or not + * @param dryrun Run in dry mode or not? * @return A HTTP response + * @deprecated use bulkDelete(String, String name, Optional version) instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, @PathVariable String subject, @QueryValue("version") Optional versionOptional, diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index 597abc25..f659c5cd 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -230,7 +230,7 @@ public Mono register(Namespace namespace, Schema schema) { * * @param namespace The namespace * @param subject The subject to delete - * @return The list of deleted versions + * @return The list of deleted schema versions */ public Mono deleteAllVersions(Namespace namespace, String subject) { return schemaRegistryClient @@ -246,7 +246,7 @@ public Mono deleteAllVersions(Namespace namespace, String subject) { * @param namespace The namespace * @param subject The subject * @param version The version of the schema to delete - * @return The latest subject after deletion + * @return The deleted schema version */ public Mono deleteVersion(Namespace namespace, String subject, String version) { return schemaRegistryClient diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 0060a20b..288d5ab9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -139,7 +139,7 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea } /** - * Delete a subject. + * Delete schema version under a subject. * * @param kafkaCluster The Kafka cluster * @param subject The subject From 2fc68c9a93562f169219ff112e7a21186ea077e7 Mon Sep 17 00:00:00 2001 From: thcai Date: Mon, 30 Sep 2024 13:49:12 +0200 Subject: [PATCH 2/7] fix return type & unit tests --- .../ns4kafka/controller/SchemaController.java | 50 ++++--- .../controller/SchemaControllerTest.java | 125 ++++++++++++++++++ 2 files changed, 153 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 41da1a30..55a38400 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -157,7 +157,7 @@ public Mono> apply(String namespace, @Valid @Body Schema sc */ @Status(HttpStatus.NO_CONTENT) @Delete - public Mono> bulkDelete(String namespace, + public Mono> bulkDelete(String namespace, @QueryValue(defaultValue = "*") String name, @QueryValue("version") Optional versionOptional, @QueryValue(defaultValue = "false") boolean dryrun) { @@ -170,27 +170,33 @@ public Mono> bulkDelete(String namespace, .map(Optional::of) .defaultIfEmpty(Optional.empty())) .collectList() - .flatMap(schemasList -> schemasList.isEmpty() || schemasList.stream().anyMatch(Optional::isEmpty) - ? Mono.just(HttpResponse.notFound()) : - (dryrun ? Mono.just(HttpResponse.noContent()) : - Flux.fromIterable(schemasList) - .filter(Optional::isPresent) - .map(Optional::get) - .flatMap(schema -> (versionOptional.isEmpty() - ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : - schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) - .flatMap(deletedVersionIds -> { - sendEventLog( - schema, - ApplyStatus.deleted, - schema.getSpec(), - null, - versionOptional.map(v -> String.valueOf(deletedVersionIds)) - .orElse(EMPTY_STRING) - ); - return Mono.just(HttpResponse.noContent()); - })) - .then(Mono.just(HttpResponse.noContent())))); + .flatMap(schemas -> { + if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) { + return Mono.just(HttpResponse.notFound()); + } + + if (dryrun) { + return Mono.just(HttpResponse.noContent()); + } + + return Flux.fromIterable(schemas) + .map(Optional::get) + .flatMap(schema -> (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : + schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) + .flatMap(deletedVersionIds -> { + sendEventLog( + schema, + ApplyStatus.deleted, + schema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)) + .orElse(EMPTY_STRING) + ); + return Mono.just(HttpResponse.noContent()); + })) + .then(Mono.just(HttpResponse.noContent())); + }); } /** diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index 7c0097e6..5db99b97 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -442,6 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -465,6 +466,7 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteOneSchemaVersionWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -488,6 +490,7 @@ void shouldNotDeleteOneSchemaVersionWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteAllSchemaVersions() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -514,6 +517,7 @@ void shouldDeleteAllSchemaVersions() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteSchemaVersion() { Namespace namespace = buildNamespace(); Schema schema1 = buildSchema(); @@ -535,6 +539,7 @@ void shouldDeleteSchemaVersion() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenEmpty() { Namespace namespace = buildNamespace(); @@ -553,6 +558,7 @@ void shouldNotDeleteAllSchemaVersionsWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionWhenEmpty() { Namespace namespace = buildNamespace(); @@ -571,6 +577,7 @@ void shouldNotDeleteSchemaVersionWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -590,6 +597,7 @@ void shouldNotDeleteAllSchemaVersionsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -608,6 +616,123 @@ void shouldNotDeleteSchemaVersionInDryRunMode() { verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); } + @Test + void shouldBulkDeleteAllSchemaVersions() { + Namespace namespace = buildNamespace(); + Schema schema1 = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema1)); + when(schemaService.deleteAllVersions(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(new Integer[1])); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + @Test + void shouldBulkDeleteSchemaVersion() { + Namespace namespace = buildNamespace(); + Schema schema1 = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema1)); + when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(1)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + + @Test + void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + private Namespace buildNamespace() { return Namespace.builder() .metadata(Metadata.builder() From 06ea05a258dd944a64a01eed24f5e22f0e03d239 Mon Sep 17 00:00:00 2001 From: thcai Date: Mon, 30 Sep 2024 16:49:40 +0200 Subject: [PATCH 3/7] add unit tests + remove unused import --- .../ns4kafka/controller/SchemaController.java | 1 - .../controller/SchemaControllerTest.java | 60 ++++++++++++++++++- 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 55a38400..02cea6c0 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -14,7 +14,6 @@ import com.michelin.ns4kafka.util.exception.ResourceValidationException; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; -import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index 5db99b97..6801019a 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -641,7 +641,7 @@ void shouldBulkDeleteAllSchemaVersions() { @Test void shouldBulkDeleteSchemaVersion() { Namespace namespace = buildNamespace(); - Schema schema1 = buildSchema(); + Schema schema = buildSchema(); SchemaList schemaList = buildSchemaList(); when(namespaceService.findByName("myNamespace")) @@ -649,7 +649,7 @@ void shouldBulkDeleteSchemaVersion() { when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) .thenReturn(Flux.fromIterable(List.of(schemaList))); when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) - .thenReturn(Mono.just(schema1)); + .thenReturn(Mono.just(schema)); when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1")) .thenReturn(Mono.just(1)); @@ -693,6 +693,54 @@ void shouldNotBulkDeleteSchemaVersionWhenEmpty() { verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); } + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject2-value")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject2-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject2-value", "1")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject2-value", "1"); + } + @Test void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() { Namespace namespace = buildNamespace(); @@ -794,4 +842,12 @@ private SchemaList buildSchemaList() { .build()) .build(); } + + private SchemaList buildSchemaList2() { + return SchemaList.builder() + .metadata(Metadata.builder() + .name("prefix.subject2-value") + .build()) + .build(); + } } From 8cba207a3f8c97d8fca9042bec28b4fec8bb2652 Mon Sep 17 00:00:00 2001 From: thcai Date: Thu, 3 Oct 2024 14:19:57 +0200 Subject: [PATCH 4/7] Add integration test --- .../integration/SchemaIntegrationTest.java | 133 +++++++++++++++--- 1 file changed, 112 insertions(+), 21 deletions(-) diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index f829f208..6da3af77 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -768,7 +768,7 @@ void shouldRegisterSameSchemaTwice() { } @Test - void shouldDeleteSchema() { + void shouldDeleteSchemaVersion() { Schema schemaV1 = Schema.builder() .metadata(Metadata.builder() .name("ns1-subject4-value") @@ -861,61 +861,152 @@ void shouldDeleteSchema() { var deleteLatestVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=latest") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=latest") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteLatestVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterLatestVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterLatestVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 is returned by ns4kafka assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().get().containsAll(List.of("1", "2", "3"))); // Delete old schema version var deleteOldVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=1") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=1") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteOldVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterOldVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterOldVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 as returned schema assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().get().containsAll(List.of("2", "3"))); // Delete all remaining schema versions var deleteAllVersionsResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteAllVersionsResponse.getStatus()); // Get all schemas - var getSchemaAfterAllVersionsDeletionResponse = ns4KafkaClient + var getSchemaAfterAllVersionsDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas") - .bearerAuth(token), Argument.listOf(SchemaList.class)); + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isPresent()); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get() + assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get().isEmpty()); + } + + @Test + void shouldBulkDeleteSchemas() { + Schema schema1 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + Schema schema2 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-key") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + Schema schema3 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject6-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + // Register all schemas + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema1), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema2), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema3), Schema.class); + + var getSchemasBeforeDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemasBeforeDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasBeforeDeletionResponse.getBody().get() + .containsAll(List.of("ns1-subject5-value", "ns1-subject5-key", "ns1-subject6-value"))); + + // Delete schema with wildcard + var deleteResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject5-*") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteResponse.getStatus()); + + var getSchemasAfterDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemasAfterDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasAfterDeletionResponse.getBody().get() .stream() - .noneMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject4-value"))); + .noneMatch(subject -> List.of("ns1-subject5-key", "ns1-subject5-value").contains(subject))); + assertTrue(getSchemasAfterDeletionResponse.getBody().get().contains("ns1-subject6-value")); } } From a67627d4a72554c0db4b5a3f3aa1ac312795bcdf Mon Sep 17 00:00:00 2001 From: thcai Date: Thu, 3 Oct 2024 15:06:34 +0200 Subject: [PATCH 5/7] Fix test + fix javadoc for depreciation --- .../java/com/michelin/ns4kafka/controller/SchemaController.java | 2 +- .../michelin/ns4kafka/integration/SchemaIntegrationTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 02cea6c0..4981521d 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -62,7 +62,7 @@ public Flux list(String namespace, @QueryValue(defaultValue = "*") S * @param namespace The namespace * @param subject The subject * @return A schema - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{subject}") @Deprecated(since = "1.12.0") diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index 6da3af77..2ffd765d 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -908,7 +908,6 @@ void shouldDeleteSchemaVersion() { .exchange(HttpRequest .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isPresent()); assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get().isEmpty()); } From 3eca7f4f97ae3830f08bf8a4af8754c3d03c16e0 Mon Sep 17 00:00:00 2001 From: thcai Date: Thu, 3 Oct 2024 16:52:22 +0200 Subject: [PATCH 6/7] Fix test --- .../michelin/ns4kafka/integration/SchemaIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index 2ffd765d..56506c01 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.integration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -908,7 +909,8 @@ void shouldDeleteSchemaVersion() { .exchange(HttpRequest .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get().isEmpty()); + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); + assertFalse(getSchemaAfterAllVersionsDeletionResponse.getBody().get().contains("ns1-subject4-value")); } @Test From ad93343ffe09d0269fdc7385a9c28f261eb6f77b Mon Sep 17 00:00:00 2001 From: thcai Date: Thu, 3 Oct 2024 16:53:26 +0200 Subject: [PATCH 7/7] fix javadoc --- .../java/com/michelin/ns4kafka/controller/SchemaController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 4981521d..5c3c895f 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -206,7 +206,7 @@ public Mono> bulkDelete(String namespace, * @param versionOptional The version of the schema to delete * @param dryrun Run in dry mode or not? * @return A HTTP response - * @deprecated use bulkDelete(String, String name, Optional version) instead. + * @deprecated use {@link #bulkDelete(String, String, Optional, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}")