From 18ff01ff124dc614cacc2958c6487114e067ac23 Mon Sep 17 00:00:00 2001 From: Thomas CAI <92149044+ThomasCAI-mlv@users.noreply.github.com> Date: Fri, 11 Oct 2024 19:48:21 +0200 Subject: [PATCH] Handle wildcard parameter in Schema deletion API (#446) * feature * fix return type & unit tests * add unit tests + remove unused import * Add integration test * Fix test + fix javadoc for depreciation * Fix test * fix javadoc --- .../ns4kafka/controller/SchemaController.java | 61 +++++- .../ns4kafka/service/SchemaService.java | 4 +- .../client/schema/SchemaRegistryClient.java | 2 +- .../controller/SchemaControllerTest.java | 181 ++++++++++++++++++ .../integration/SchemaIntegrationTest.java | 136 ++++++++++--- 5 files changed, 356 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java index 5538c264..5c3c895f 100644 --- a/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java +++ b/src/main/java/com/michelin/ns4kafka/controller/SchemaController.java @@ -62,7 +62,7 @@ public Flux list(String namespace, @QueryValue(defaultValue = "*") S * @param namespace The namespace * @param subject The subject * @return A schema - * @deprecated use list(String, String name) instead. + * @deprecated use {@link #list(String, String)} instead. */ @Get("/{subject}") @Deprecated(since = "1.12.0") @@ -146,16 +146,71 @@ public Mono> apply(String namespace, @Valid @Body Schema sc } /** - * Delete all schema versions under the given subject, or a specific version of the schema if specified. + * Delete all schema versions or a specific schema version if specified, under all given subjects. + * + * @param namespace The namespace + * @param name The subject name parameter + * @param versionOptional The version of the schemas to delete + * @param dryrun Run in dry mode or not? + * @return A HTTP response + */ + @Status(HttpStatus.NO_CONTENT) + @Delete + public Mono> bulkDelete(String namespace, + @QueryValue(defaultValue = "*") String name, + @QueryValue("version") Optional versionOptional, + @QueryValue(defaultValue = "false") boolean dryrun) { + Namespace ns = getNamespace(namespace); + + return schemaService.findByWildcardName(ns, name) + .flatMap(schema -> versionOptional + .map(version -> schemaService.getSubjectByVersion(ns, schema.getMetadata().getName(), version)) + .orElseGet(() -> schemaService.getSubjectLatestVersion(ns, schema.getMetadata().getName())) + .map(Optional::of) + .defaultIfEmpty(Optional.empty())) + .collectList() + .flatMap(schemas -> { + if (schemas.isEmpty() || schemas.stream().anyMatch(Optional::isEmpty)) { + return Mono.just(HttpResponse.notFound()); + } + + if (dryrun) { + return Mono.just(HttpResponse.noContent()); + } + + return Flux.fromIterable(schemas) + .map(Optional::get) + .flatMap(schema -> (versionOptional.isEmpty() + ? schemaService.deleteAllVersions(ns, schema.getMetadata().getName()) : + schemaService.deleteVersion(ns, schema.getMetadata().getName(), versionOptional.get())) + .flatMap(deletedVersionIds -> { + sendEventLog( + schema, + ApplyStatus.deleted, + schema.getSpec(), + null, + versionOptional.map(v -> String.valueOf(deletedVersionIds)) + .orElse(EMPTY_STRING) + ); + return Mono.just(HttpResponse.noContent()); + })) + .then(Mono.just(HttpResponse.noContent())); + }); + } + + /** + * Delete all schema versions or a specific schema version if specified, under the given subject. * * @param namespace The namespace * @param subject The subject * @param versionOptional The version of the schema to delete - * @param dryrun Run in dry mode or not + * @param dryrun Run in dry mode or not? * @return A HTTP response + * @deprecated use {@link #bulkDelete(String, String, Optional, boolean)} instead. */ @Status(HttpStatus.NO_CONTENT) @Delete("/{subject}") + @Deprecated(since = "1.13.0") public Mono> delete(String namespace, @PathVariable String subject, @QueryValue("version") Optional versionOptional, diff --git a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java index 597abc25..f659c5cd 100644 --- a/src/main/java/com/michelin/ns4kafka/service/SchemaService.java +++ b/src/main/java/com/michelin/ns4kafka/service/SchemaService.java @@ -230,7 +230,7 @@ public Mono register(Namespace namespace, Schema schema) { * * @param namespace The namespace * @param subject The subject to delete - * @return The list of deleted versions + * @return The list of deleted schema versions */ public Mono deleteAllVersions(Namespace namespace, String subject) { return schemaRegistryClient @@ -246,7 +246,7 @@ public Mono deleteAllVersions(Namespace namespace, String subject) { * @param namespace The namespace * @param subject The subject * @param version The version of the schema to delete - * @return The latest subject after deletion + * @return The deleted schema version */ public Mono deleteVersion(Namespace namespace, String subject, String version) { return schemaRegistryClient diff --git a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java index 0060a20b..288d5ab9 100644 --- a/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java +++ b/src/main/java/com/michelin/ns4kafka/service/client/schema/SchemaRegistryClient.java @@ -139,7 +139,7 @@ public Mono deleteSubject(String kafkaCluster, String subject, boolea } /** - * Delete a subject. + * Delete schema version under a subject. * * @param kafkaCluster The Kafka cluster * @param subject The subject diff --git a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java index 7c0097e6..6801019a 100644 --- a/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java +++ b/src/test/java/com/michelin/ns4kafka/controller/SchemaControllerTest.java @@ -442,6 +442,7 @@ void shouldNotUpdateCompatibilityWhenNamespaceNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -465,6 +466,7 @@ void shouldNotDeleteAllSchemaVersionsWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteOneSchemaVersionWhenNotOwner() { Namespace namespace = buildNamespace(); @@ -488,6 +490,7 @@ void shouldNotDeleteOneSchemaVersionWhenNotOwner() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteAllSchemaVersions() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -514,6 +517,7 @@ void shouldDeleteAllSchemaVersions() { } @Test + @SuppressWarnings("deprecation") void shouldDeleteSchemaVersion() { Namespace namespace = buildNamespace(); Schema schema1 = buildSchema(); @@ -535,6 +539,7 @@ void shouldDeleteSchemaVersion() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsWhenEmpty() { Namespace namespace = buildNamespace(); @@ -553,6 +558,7 @@ void shouldNotDeleteAllSchemaVersionsWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionWhenEmpty() { Namespace namespace = buildNamespace(); @@ -571,6 +577,7 @@ void shouldNotDeleteSchemaVersionWhenEmpty() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteAllSchemaVersionsInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -590,6 +597,7 @@ void shouldNotDeleteAllSchemaVersionsInDryRunMode() { } @Test + @SuppressWarnings("deprecation") void shouldNotDeleteSchemaVersionInDryRunMode() { Namespace namespace = buildNamespace(); Schema schema = buildSchema(); @@ -608,6 +616,171 @@ void shouldNotDeleteSchemaVersionInDryRunMode() { verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); } + @Test + void shouldBulkDeleteAllSchemaVersions() { + Namespace namespace = buildNamespace(); + Schema schema1 = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema1)); + when(schemaService.deleteAllVersions(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(new Integer[1])); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + @Test + void shouldBulkDeleteSchemaVersion() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + when(schemaService.deleteVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(1)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(applicationEventPublisher).publishEvent(any()); + } + + + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenEmpty() { + Namespace namespace = buildNamespace(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of())); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + + @Test + void shouldNotBulkDeleteAllSchemaVersionsWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject2-value")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.empty(), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject2-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionWhenVersionNotFound() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + SchemaList schemaList2 = buildSchemaList2(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject*")) + .thenReturn(Flux.fromIterable(List.of(schemaList, schemaList2))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject2-value", "1")) + .thenReturn(Mono.empty()); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject*", Optional.of("1"), false)) + .consumeNextWith(response -> assertEquals(HttpStatus.NOT_FOUND, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject2-value", "1"); + } + + @Test + void shouldNotBulkDeleteAllSchemaVersionsInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectLatestVersion(namespace, "prefix.subject-value")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.empty(), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteAllVersions(namespace, "prefix.subject-value"); + } + + @Test + void shouldNotBulkDeleteSchemaVersionInDryRunMode() { + Namespace namespace = buildNamespace(); + Schema schema = buildSchema(); + SchemaList schemaList = buildSchemaList(); + + when(namespaceService.findByName("myNamespace")) + .thenReturn(Optional.of(namespace)); + when(schemaService.findByWildcardName(namespace, "prefix.subject-value")) + .thenReturn(Flux.fromIterable(List.of(schemaList))); + when(schemaService.getSubjectByVersion(namespace, "prefix.subject-value", "1")) + .thenReturn(Mono.just(schema)); + + StepVerifier.create(schemaController.bulkDelete("myNamespace", "prefix.subject-value", Optional.of("1"), true)) + .consumeNextWith(response -> assertEquals(HttpStatus.NO_CONTENT, response.getStatus())) + .verifyComplete(); + + verify(schemaService, never()).deleteVersion(namespace, "prefix.subject-value", "1"); + } + private Namespace buildNamespace() { return Namespace.builder() .metadata(Metadata.builder() @@ -669,4 +842,12 @@ private SchemaList buildSchemaList() { .build()) .build(); } + + private SchemaList buildSchemaList2() { + return SchemaList.builder() + .metadata(Metadata.builder() + .name("prefix.subject2-value") + .build()) + .build(); + } } diff --git a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java index f829f208..56506c01 100644 --- a/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java +++ b/src/test/java/com/michelin/ns4kafka/integration/SchemaIntegrationTest.java @@ -1,6 +1,7 @@ package com.michelin.ns4kafka.integration; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -768,7 +769,7 @@ void shouldRegisterSameSchemaTwice() { } @Test - void shouldDeleteSchema() { + void shouldDeleteSchemaVersion() { Schema schemaV1 = Schema.builder() .metadata(Metadata.builder() .name("ns1-subject4-value") @@ -861,61 +862,152 @@ void shouldDeleteSchema() { var deleteLatestVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=latest") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=latest") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteLatestVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterLatestVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterLatestVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 is returned by ns4kafka assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterLatestVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterLatestVersionDeletionResponse.getBody().get().containsAll(List.of("1", "2", "3"))); // Delete old schema version var deleteOldVersionResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value?version=1") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value&version=1") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteOldVersionResponse.getStatus()); - // Get all schemas - var getSchemaAfterOldVersionDeletionResponse = ns4KafkaClient + // Get schemas versions + var getSchemaAfterOldVersionDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas/ns1-subject4-value") - .bearerAuth(token), Schema.class); + .create(HttpMethod.GET, "/subjects/ns1-subject4-value/versions"), Argument.listOf(String.class)); - // Expects v3 as returned schema assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); - assertEquals(3, getSchemaAfterOldVersionDeletionResponse.getBody().get().getSpec().getVersion()); + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().get().containsAll(List.of("2", "3"))); // Delete all remaining schema versions var deleteAllVersionsResponse = ns4KafkaClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas/ns1-subject4-value") + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject4-value") .bearerAuth(token), Schema.class); assertEquals(HttpStatus.NO_CONTENT, deleteAllVersionsResponse.getStatus()); // Get all schemas - var getSchemaAfterAllVersionsDeletionResponse = ns4KafkaClient + var getSchemaAfterAllVersionsDeletionResponse = schemaRegistryClient .toBlocking() .exchange(HttpRequest - .create(HttpMethod.GET, "/api/namespaces/ns1/schemas") - .bearerAuth(token), Argument.listOf(SchemaList.class)); + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemaAfterOldVersionDeletionResponse.getBody().isPresent()); + assertFalse(getSchemaAfterAllVersionsDeletionResponse.getBody().get().contains("ns1-subject4-value")); + } + + @Test + void shouldBulkDeleteSchemas() { + Schema schema1 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"},{\"name\":\"dateOfBirth\",\"type\":[\"null\"," + + "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null," + + "\"doc\":\"Date of birth of the person\"}]}") + .build()) + .build(); + + Schema schema2 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject5-key") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + Schema schema3 = Schema.builder() + .metadata(Metadata.builder() + .name("ns1-subject6-value") + .build()) + .spec(Schema.SchemaSpec.builder() + .schema( + "{\"namespace\":\"com.michelin.kafka.producer.showcase.avro\",\"type\":\"record\"," + + "\"name\":\"PersonAvro\",\"fields\":[{\"name\":\"firstName\",\"type\":[\"null\",\"string\"]," + + "\"default\":null,\"doc\":\"First name of the person\"}," + + "{\"name\":\"lastName\",\"type\":[\"null\",\"string\"],\"default\":null," + + "\"doc\":\"Last name of the person\"}]}") + .build()) + .build(); + + // Register all schemas + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema1), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema2), Schema.class); + + ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.POST, "/api/namespaces/ns1/schemas") + .bearerAuth(token) + .body(schema3), Schema.class); + + var getSchemasBeforeDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); + + assertTrue(getSchemasBeforeDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasBeforeDeletionResponse.getBody().get() + .containsAll(List.of("ns1-subject5-value", "ns1-subject5-key", "ns1-subject6-value"))); + + // Delete schema with wildcard + var deleteResponse = ns4KafkaClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.DELETE, "/api/namespaces/ns1/schemas?name=ns1-subject5-*") + .bearerAuth(token), Schema.class); + + assertEquals(HttpStatus.NO_CONTENT, deleteResponse.getStatus()); + + var getSchemasAfterDeletionResponse = schemaRegistryClient + .toBlocking() + .exchange(HttpRequest + .create(HttpMethod.GET, "/subjects"), Argument.listOf(String.class)); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().isPresent()); - assertTrue(getSchemaAfterAllVersionsDeletionResponse.getBody().get() + assertTrue(getSchemasAfterDeletionResponse.getBody().isPresent()); + assertTrue(getSchemasAfterDeletionResponse.getBody().get() .stream() - .noneMatch(schemaList -> schemaList.getMetadata().getName().equals("ns1-subject4-value"))); + .noneMatch(subject -> List.of("ns1-subject5-key", "ns1-subject5-value").contains(subject))); + assertTrue(getSchemasAfterDeletionResponse.getBody().get().contains("ns1-subject6-value")); } }