From b3ce5548219bed8cf070c8585511f15582c0e3cc Mon Sep 17 00:00:00 2001 From: thcai Date: Wed, 25 Sep 2024 17:44:40 +0200 Subject: [PATCH] feature --- .../ns4kafka/controller/SchemaController.java | 54 ++++++++++++++++++- .../ns4kafka/service/SchemaService.java | 4 +- .../client/schema/SchemaRegistryClient.java | 2 +- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 5538c264..41da1a30 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -14,6 +14,7 @@ import com.michelin.ns4kafka.util.exception.ResourceValidationException; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; +import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; @@ -146,16 +147,65 @@ public Mono> apply(String namespace, @Valid @Body Schema sc } /** - * Delete all schema versions under the given subject, or a specific version of the schema if specified. + * Delete all schema versions or a specific schema version if specified, under all given subjects. + * + * @param namespace The namespace + * @param name The subject name parameter + * @param versionOptional The version of the schemas to delete + * @param dryrun Run in dry mode or not? + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, + @QueryValue(defaultValue = "*") String name, + @QueryValue("version") Optional versionOptional, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + return schemaService.findByWildcardName(ns, name) + .flatMap(schema -> versionOptional + .map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version)) + .orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName())) + .map(Optional::of) + .defaultIfEmpty(Optional.empty())) + .collectList() + .flatMap(schemasList -> schemasList.isEmpty() || schemasList.stream().anyMatch(Optional::isEmpty) + ? Mono.just(HttpResponse.notFound()) : + (dryrun ? Mono.just(HttpResponse.noContent()) : + Flux.fromIterable(schemasList) + .filter(Optional::isPresent) + .map(Optional::get) + .flatMap(schema -> (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : + schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) + .flatMap(deletedVersionIds -> { + sendEventLog( + schema, + ApplyStatus.deleted, + schema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)) + .orElse(EMPTY_STRING) + ); + return Mono.just(HttpResponse.noContent()); + })) + .then(Mono.just(HttpResponse.noContent())))); + } + + /** + * Delete all schema versions or a specific schema version if specified, under the given subject. * * @param namespace The namespace * @param subject The subject * @param versionOptional The version of the schema to delete - * @param dryrun Run in dry mode or not + * @param dryrun Run in dry mode or not? * @return A HTTP response + * @deprecated use bulkDelete(String, String name, Optional version) instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, @PathVariable String subject, @QueryValue("version") Optional versionOptional, diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index 597abc25..f659c5cd 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -230,7 +230,7 @@ public Mono register(Namespace namespace, Schema schema) { * * @param namespace The namespace * @param subject The subject to delete - * @return The list of deleted versions + * @return The list of deleted schema versions */ public Mono deleteAllVersions(Namespace namespace, String subject) { return schemaRegistryClient @@ -246,7 +246,7 @@ public Mono deleteAllVersions(Namespace namespace, String subject) { * @param namespace The namespace * @param subject The subject * @param version The version of the schema to delete - * @return The latest subject after deletion + * @return The deleted schema version */ public Mono deleteVersion(Namespace namespace, String subject, String version) { return schemaRegistryClient diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 0060a20b..288d5ab9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -139,7 +139,7 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea } /** - * Delete a subject. + * Delete schema version under a subject. * * @param kafkaCluster The Kafka cluster * @param subject The subject