Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle wildcard parameter in Schema deletion API #446

Merged
merged 7 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Flux<SchemaList> list(String namespace, @QueryValue(defaultValue = "*") S
* @param namespace The namespace
* @param subject The subject
* @return A schema
* @deprecated use list(String, String name) instead.
* @deprecated use {@link #list(String, String)} instead.
*/
@Get("/{subject}")
@Deprecated(since = "1.12.0")
Expand Down Expand Up @@ -146,16 +146,71 @@ public Mono<HttpResponse<Schema>> apply(String namespace, @Valid @Body Schema sc
}

/**
* Delete all schema versions under the given subject, or a specific version of the schema if specified.
* Delete all schema versions or a specific schema version if specified, under all given subjects.
*
* @param namespace The namespace
* @param name The subject name parameter
* @param versionOptional The version of the schemas to delete
* @param dryrun Run in dry mode or not?
* @return A HTTP response
*/
@Status(HttpStatus.NO_CONTENT)
@Delete
public Mono<HttpResponse<Void>> bulkDelete(String namespace,
@QueryValue(defaultValue = "*") String name,
@QueryValue("version") Optional<String> versionOptional,
@QueryValue(defaultValue = "false") boolean dryrun) {
Namespace ns = getNamespace(namespace);

return schemaService.findByWildcardName(ns, name)
.flatMap(schema -> versionOptional
.map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version))
.orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName()))
.map(Optional::of)
.defaultIfEmpty(Optional.empty()))
.collectList()
.flatMap(schemas -> {
if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) {
return Mono.just(HttpResponse.notFound());
}

if (dryrun) {
return Mono.just(HttpResponse.noContent());
}

return Flux.fromIterable(schemas)
.map(Optional::get)
.flatMap(schema -> (versionOptional.isEmpty()
? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) :
schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get()))
.flatMap(deletedVersionIds -> {
sendEventLog(
schema,
ApplyStatus.deleted,
schema.getSpec(),
null,
versionOptional.map(v -> String.valueOf(deletedVersionIds))
.orElse(EMPTY_STRING)
);
return Mono.just(HttpResponse.noContent());
}))
.then(Mono.just(HttpResponse.noContent()));
});
}

/**
* Delete all schema versions or a specific schema version if specified, under the given subject.
*
* @param namespace The namespace
* @param subject The subject
* @param versionOptional The version of the schema to delete
* @param dryrun Run in dry mode or not
* @param dryrun Run in dry mode or not?
* @return A HTTP response
* @deprecated use {@link #bulkDelete(String, String, Optional, boolean)} instead.
*/
@Status(HttpStatus.NO_CONTENT)
@Delete("/{subject}")
@Deprecated(since = "1.13.0")
public Mono<HttpResponse<Void>> delete(String namespace,
@PathVariable String subject,
@QueryValue("version") Optional<String> versionOptional,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public Mono<Integer> register(Namespace namespace, Schema schema) {
*
* @param namespace The namespace
* @param subject The subject to delete
* @return The list of deleted versions
* @return The list of deleted schema versions
*/
public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
return schemaRegistryClient
Expand All @@ -246,7 +246,7 @@ public Mono<Integer[]> deleteAllVersions(Namespace namespace, String subject) {
* @param namespace The namespace
* @param subject The subject
* @param version The version of the schema to delete
* @return The latest subject after deletion
* @return The deleted schema version
*/
public Mono<Integer> deleteVersion(Namespace namespace, String subject, String version) {
return schemaRegistryClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Mono<Integer[]> deleteSubject(String kafkaCluster, String subject, boolea
}

/**
* Delete a subject.
* Delete schema version under a subject.
*
* @param kafkaCluster The Kafka cluster
* @param subject The subject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsWhenNotOwner() {
Namespace namespace = buildNamespace();

Expand All @@ -465,6 +466,7 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteOneSchemaVersionWhenNotOwner() {
Namespace namespace = buildNamespace();

Expand All @@ -488,6 +490,7 @@ void shouldNotDeleteOneSchemaVersionWhenNotOwner() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteAllSchemaVersions() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -514,6 +517,7 @@ void shouldDeleteAllSchemaVersions() {
}

@Test
@SuppressWarnings("deprecation")
void shouldDeleteSchemaVersion() {
Namespace namespace = buildNamespace();
Schema schema1 = buildSchema();
Expand All @@ -535,6 +539,7 @@ void shouldDeleteSchemaVersion() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsWhenEmpty() {
Namespace namespace = buildNamespace();

Expand All @@ -553,6 +558,7 @@ void shouldNotDeleteAllSchemaVersionsWhenEmpty() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteSchemaVersionWhenEmpty() {
Namespace namespace = buildNamespace();

Expand All @@ -571,6 +577,7 @@ void shouldNotDeleteSchemaVersionWhenEmpty() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteAllSchemaVersionsInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -590,6 +597,7 @@ void shouldNotDeleteAllSchemaVersionsInDryRunMode() {
}

@Test
@SuppressWarnings("deprecation")
void shouldNotDeleteSchemaVersionInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
Expand All @@ -608,6 +616,171 @@ void shouldNotDeleteSchemaVersionInDryRunMode() {
verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

@Test
void shouldBulkDeleteAllSchemaVersions() {
Namespace namespace = buildNamespace();
Schema schema1 = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema1));
when(schemaService.deleteAllVersions(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(new Integer[1]));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(applicationEventPublisher).publishEvent(any());
}

@Test
void shouldBulkDeleteSchemaVersion() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));
when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(1));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(applicationEventPublisher).publishEvent(any());
}


@Test
void shouldNotBulkDeleteAllSchemaVersionsWhenEmpty() {
Namespace namespace = buildNamespace();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of()));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionWhenEmpty() {
Namespace namespace = buildNamespace();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of()));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

@Test
void shouldNotBulkDeleteAllSchemaVersionsWhenVersionNotFound() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();
SchemaList schemaList2 = buildSchemaList2();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject*"))
.thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject2-value"))
.thenReturn(Mono.empty());

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.empty(), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject2-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionWhenVersionNotFound() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();
SchemaList schemaList2 = buildSchemaList2();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject*"))
.thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject2-value", "1"))
.thenReturn(Mono.empty());

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.of("1"), false))
.consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
verify(schemaService, never()).deleteVersion(namespace, "prefix.subject2-value", "1");
}

@Test
void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value"))
.thenReturn(Mono.just(schema));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value");
}

@Test
void shouldNotBulkDeleteSchemaVersionInDryRunMode() {
Namespace namespace = buildNamespace();
Schema schema = buildSchema();
SchemaList schemaList = buildSchemaList();

when(namespaceService.findByName("myNamespace"))
.thenReturn(Optional.of(namespace));
when(schemaService.findByWildcardName(namespace, "prefix.subject-value"))
.thenReturn(Flux.fromIterable(List.of(schemaList)));
when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1"))
.thenReturn(Mono.just(schema));

StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), true))
.consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus()))
.verifyComplete();

verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1");
}

private Namespace buildNamespace() {
return Namespace.builder()
.metadata(Metadata.builder()
Expand Down Expand Up @@ -669,4 +842,12 @@ private SchemaList buildSchemaList() {
.build())
.build();
}

private SchemaList buildSchemaList2() {
return SchemaList.builder()
.metadata(Metadata.builder()
.name("prefix.subject2-value")
.build())
.build();
}
}
Loading