Skip to content

Commit

Permalink
Handle wildcard parameter in Schema deletion API (#446)
Browse files Browse the repository at this point in the history
* feature

* fix return type & unit tests

* add unit tests + remove unused import

* Add integration test

* Fix test + fix javadoc for depreciation

* Fix test

* fix javadoc
  • Loading branch information
ThomasCAI-mlv authored Oct 11, 2024
1 parent a0c6b8a commit 18ff01f
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Flux<SchemaList> list(String namespace, @QueryValue(defaultValue = "*") S
* @param namespace The namespace
* @param subject The subject
* @return A schema
* @deprecated use list(String, String name) instead.
* @deprecated use {@link #list(String, String)} instead.
*/
@Get("/{subject}")
@Deprecated(since = "1.12.0")
Expand Down Expand Up @@ -146,16 +146,71 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
* Delete all schema versions or a specific schema version if specified, under all given subjects.
*
* @param namespace The namespace
* @param name The subject name parameter
* @param versionOptional The version of the schemas to delete
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

return schemaService.findByWildcardName(ns, name)
.flatMap(schema -> versionOptional
.map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version))
.orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName()))
.map(Optional::of)
.defaultIfEmpty(Optional.empty()))
.collectList()
.flatMap(schemas -> {
if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(schemas)
.map(Optional::get)
.flatMap(schema -> (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) :
schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get()))
.flatMap(deletedVersionIds -> {
sendEventLog(
schema,
ApplyStatus.deleted,
schema.getSpec(),
null,
versionOptional.map(v -> String.valueOf(deletedVersionIds))
.orElse(EMPTY_STRING)
);
return Mono.just(HttpResponse.noContent());
}))
.then(Mono.just(HttpResponse.noContent()));
});
}

/**
* Delete all schema versions or a specific schema version if specified, under the given subject.
*
* @param namespace The namespace
* @param subject The subject
* @param versionOptional The version of the schema to delete
* @param dryrun Run in dry mode or not
* @param dryrun Run in dry mode or not?
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, Optional, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue("version") Optional<String> versionOptional,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public Mono<Integer> register(Namespace namespace, Schema schema) {
*
* @param namespace The namespace
* @param subject The subject to delete
* @return The list of deleted versions
* @return The list of deleted schema versions
*/
public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
return schemaRegistryClient
Expand All @@ -246,7 +246,7 @@ public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @return The latest subject after deletion
* @return The deleted schema version
*/
public Mono<Integer> deleteVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
}

/**
* Delete a subject.
* Delete schema version under a subject.
*
* @param kafkaCluster The Kafka cluster
* @param subject The subject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsWhenNotOwner() {
Namespace namespace = buildNamespace();

Expand All @@ -465,6 +466,7 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteOneSchemaVersionWhenNotOwner() {
Namespace namespace = buildNamespace();

Expand All @@ -488,6 +490,7 @@ void shouldNotDeleteOneSchemaVersionWhenNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteAllSchemaVersions() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -514,6 +517,7 @@ void shouldDeleteAllSchemaVersions() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteSchemaVersion() {
Namespace namespace = buildNamespace();
Schema schema1 = buildSchema();
Expand All @@ -535,6 +539,7 @@ void shouldDeleteSchemaVersion() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsWhenEmpty() {
Namespace namespace = buildNamespace();

Expand All @@ -553,6 +558,7 @@ void shouldNotDeleteAllSchemaVersionsWhenEmpty() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteSchemaVersionWhenEmpty() {
Namespace namespace = buildNamespace();

Expand All @@ -571,6 +577,7 @@ void shouldNotDeleteSchemaVersionWhenEmpty() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -590,6 +597,7 @@ void shouldNotDeleteAllSchemaVersionsInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteSchemaVersionInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -608,6 +616,171 @@ void shouldNotDeleteSchemaVersionInDryRunMode() {
verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

@Test
void shouldBulkDeleteAllSchemaVersions() {
Namespace namespace = buildNamespace();
Schema schema1 = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema1));
when(schemaService.deleteAllVersions(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(new Integer[1]));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(applicationEventPublisher).publishEvent(any());
}

@Test
void shouldBulkDeleteSchemaVersion() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));
when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(1));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(applicationEventPublisher).publishEvent(any());
}


@Test
void shouldNotBulkDeleteAllSchemaVersionsWhenEmpty() {
Namespace namespace = buildNamespace();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of()));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionWhenEmpty() {
Namespace namespace = buildNamespace();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of()));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

@Test
void shouldNotBulkDeleteAllSchemaVersionsWhenVersionNotFound() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();
SchemaList schemaList2 = buildSchemaList2();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject*"))
.thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject2-value"))
.thenReturn(Mono.empty());

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject2-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionWhenVersionNotFound() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();
SchemaList schemaList2 = buildSchemaList2();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject*"))
.thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject2-value", "1"))
.thenReturn(Mono.empty());

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
verify(schemaService, never()).deleteVersion(namespace, "prefix.subject2-value", "1");
}

@Test
void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

private Namespace buildNamespace() {
return Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -669,4 +842,12 @@ private SchemaList buildSchemaList() {
.build())
.build();
}

private SchemaList buildSchemaList2() {
return SchemaList.builder()
.metadata(Metadata.builder()
.name("prefix.subject2-value")
.build())
.build();
}
}
Loading

0 comments on commit 18ff01f

Please sign in to comment.