Skip to content

Commit

Permalink
feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Sep 25, 2024
1 parent 119d1b4 commit b3ce554
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.michelin.ns4kafka.util.exception.ResourceValidationException;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Delete;
Expand Down Expand Up @@ -146,16 +147,65 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
* Delete all schema versions or a specific schema version if specified, under all given subjects.
*
* @param namespace The namespace
* @param name The subject name parameter
* @param versionOptional The version of the schemas to delete
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<MutableHttpResponse<Object>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

return schemaService.findByWildcardName(ns, name)
.flatMap(schema -> versionOptional
.map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version))
.orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName()))
.map(Optional::of)
.defaultIfEmpty(Optional.empty()))
.collectList()
.flatMap(schemasList -> schemasList.isEmpty() || schemasList.stream().anyMatch(Optional::isEmpty)
? Mono.just(HttpResponse.notFound()) :
(dryrun ? Mono.just(HttpResponse.noContent()) :
Flux.fromIterable(schemasList)
.filter(Optional::isPresent)
.map(Optional::get)
.flatMap(schema -> (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) :
schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get()))
.flatMap(deletedVersionIds -> {
sendEventLog(
schema,
ApplyStatus.deleted,
schema.getSpec(),
null,
versionOptional.map(v -> String.valueOf(deletedVersionIds))
.orElse(EMPTY_STRING)
);
return Mono.just(HttpResponse.noContent());
}))
.then(Mono.just(HttpResponse.noContent()))));
}

/**
* Delete all schema versions or a specific schema version if specified, under the given subject.
*
* @param namespace The namespace
* @param subject The subject
* @param versionOptional The version of the schema to delete
* @param dryrun Run in dry mode or not
* @param dryrun Run in dry mode or not?
* @return A HTTP response
* @deprecated use bulkDelete(String, String name, Optional version) instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue("version") Optional<String> versionOptional,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public Mono<Integer> register(Namespace namespace, Schema schema) {
*
* @param namespace The namespace
* @param subject The subject to delete
* @return The list of deleted versions
* @return The list of deleted schema versions
*/
public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
return schemaRegistryClient
Expand All @@ -246,7 +246,7 @@ public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @return The latest subject after deletion
* @return The deleted schema version
*/
public Mono<Integer> deleteVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
}

/**
* Delete a subject.
* Delete schema version under a subject.
*
* @param kafkaCluster The Kafka cluster
* @param subject The subject
Expand Down

0 comments on commit b3ce554

Please sign in to comment.